Add 'ops/posix_mq.rs/' from commit 'f7d1a38da6'

git-subtree-dir: ops/posix_mq.rs
git-subtree-mainline: 8f68497269
git-subtree-split: f7d1a38da6
This commit is contained in:
Vincent Ambo 2020-01-20 11:32:02 +00:00
commit b59c7e693c
9 changed files with 521 additions and 0 deletions

View file

@ -0,0 +1,130 @@
use nix;
use std::error;
use std::fmt;
use std::io;
use std::num;
/// This module implements a simple error type to match the errors that can be thrown from the C
/// functions as well as some extra errors resulting from internal validations.
///
/// As this crate exposes an opinionated API to the POSIX queues certain errors have been
/// ignored:
///
/// * ETIMEDOUT: The low-level timed functions are not exported and this error can not occur.
/// * EAGAIN: Non-blocking queue calls are not supported.
/// * EINVAL: Same reason as ETIMEDOUT
/// * EMSGSIZE: The message size is immutable after queue creation and this crate checks it.
/// * ENAMETOOLONG: This crate performs name validation
///
/// If an unexpected error is encountered it will be wrapped appropriately and should be reported
/// as a bug on https://github.com/aprilabank/posix_mq.rs
#[derive(Debug)]
pub enum Error {
// These errors are raised inside of the library
InvalidQueueName(&'static str),
ValueReadingError(io::Error),
MessageSizeExceeded(),
MaximumMessageSizeExceeded(),
MaximumMessageCountExceeded(),
// These errors match what is described in the man pages (from mq_overview(7) onwards).
PermissionDenied(),
InvalidQueueDescriptor(),
QueueCallInterrupted(),
QueueAlreadyExists(),
QueueNotFound(),
InsufficientMemory(),
InsufficientSpace(),
// These two are (hopefully) unlikely in modern systems
ProcessFileDescriptorLimitReached(),
SystemFileDescriptorLimitReached(),
// If an unhandled / unknown / unexpected error occurs this error will be used.
// In those cases bug reports would be welcome!
UnknownForeignError(nix::Errno),
// Some other unexpected / unknown error occured. This is probably an error from
// the nix crate. Bug reports also welcome for this!
UnknownInternalError(Option<nix::Error>),
}
impl error::Error for Error {
fn description(&self) -> &str {
use Error::*;
match *self {
// This error contains more sensible description strings already
InvalidQueueName(e) => e,
ValueReadingError(_) => "error reading system configuration for message queues",
MessageSizeExceeded() => "message is larger than maximum size for specified queue",
MaximumMessageSizeExceeded() => "specified queue message size exceeds system maximum",
MaximumMessageCountExceeded() => "specified queue message count exceeds system maximum",
PermissionDenied() => "permission to the specified queue was denied",
InvalidQueueDescriptor() => "the internal queue descriptor was invalid",
QueueCallInterrupted() => "queue method interrupted by signal",
QueueAlreadyExists() => "the specified queue already exists",
QueueNotFound() => "the specified queue could not be found",
InsufficientMemory() => "insufficient memory to call queue method",
InsufficientSpace() => "insufficient space to call queue method",
ProcessFileDescriptorLimitReached() =>
"maximum number of process file descriptors reached",
SystemFileDescriptorLimitReached() =>
"maximum number of system file descriptors reached",
UnknownForeignError(_) => "unknown foreign error occured: please report a bug!",
UnknownInternalError(_) => "unknown internal error occured: please report a bug!",
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Explicitly import this to gain access to Error::description()
use std::error::Error;
f.write_str(self.description())
}
}
/// This from implementation is used to translate errors from the lower-level
/// C-calls into sensible Rust errors.
impl From<nix::Error> for Error {
fn from(e: nix::Error) -> Self {
match e {
nix::Error::Sys(e) => match_errno(e),
_ => Error::UnknownInternalError(Some(e)),
}
}
}
// This implementation is used when reading system queue settings.
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::ValueReadingError(e)
}
}
// This implementation is used when parsing system queue settings. The unknown error is returned
// here because the system is probably seriously broken if those files don't contain numbers.
impl From<num::ParseIntError> for Error {
fn from(_: num::ParseIntError) -> Self {
Error::UnknownInternalError(None)
}
}
fn match_errno(err: nix::Errno) -> Error {
use nix::errno::*;
match err {
EACCES => Error::PermissionDenied(),
EBADF => Error::InvalidQueueDescriptor(),
EINTR => Error::QueueCallInterrupted(),
EEXIST => Error::QueueAlreadyExists(),
EMFILE => Error::ProcessFileDescriptorLimitReached(),
ENFILE => Error::SystemFileDescriptorLimitReached(),
ENOENT => Error::QueueNotFound(),
ENOMEM => Error::InsufficientMemory(),
ENOSPC => Error::InsufficientSpace(),
_ => Error::UnknownForeignError(err),
}
}

277
ops/posix_mq.rs/src/lib.rs Normal file
View file

@ -0,0 +1,277 @@
extern crate nix;
extern crate libc;
use error::Error;
use libc::mqd_t;
use nix::mqueue;
use nix::sys::stat;
use std::ffi::CString;
use std::fs::File;
use std::io::Read;
use std::string::ToString;
use std::ops::Drop;
pub mod error;
#[cfg(test)]
mod tests;
/*
TODO:
* what happens if permissions change after FD was opened?
* drop dependency on nix crate?
*/
/// Wrapper type for queue names that performs basic validation of queue names before calling
/// out to C code.
#[derive(Debug, Clone, PartialEq)]
pub struct Name(CString);
impl Name {
pub fn new<S: ToString>(s: S) -> Result<Self, Error> {
let string = s.to_string();
if !string.starts_with('/') {
return Err(Error::InvalidQueueName("Queue name must start with '/'"));
}
// The C library has a special error return for this case, so I assume people must actually
// have tried just using '/' as a queue name.
if string.len() == 1 {
return Err(Error::InvalidQueueName(
"Queue name must be a slash followed by one or more characters"
));
}
if string.len() > 255 {
return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters"));
}
if string.matches('/').count() > 1 {
return Err(Error::InvalidQueueName("Queue name can not contain more than one slash"));
}
// TODO: What error is being thrown away here? Is it possible?
Ok(Name(CString::new(string).unwrap()))
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
pub data: Vec<u8>,
pub priority: u32,
}
/// Represents an open queue descriptor to a POSIX message queue. This carries information
/// about the queue's limitations (i.e. maximum message size and maximum message count).
#[derive(Debug)]
pub struct Queue {
name: Name,
/// Internal file/queue descriptor.
queue_descriptor: mqd_t,
/// Maximum number of pending messages in this queue.
max_pending: i64,
/// Maximum size of this queue.
max_size: usize,
}
impl Queue {
/// Creates a new queue and fails if it already exists.
/// By default the queue will be read/writable by the current user with no access for other
/// users.
/// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue.
pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> {
if max_pending > read_i64_from_file(MSG_MAX)? {
return Err(Error::MaximumMessageCountExceeded());
}
if max_size > read_i64_from_file(MSGSIZE_MAX)? {
return Err(Error::MaximumMessageSizeExceeded());
}
let oflags = {
let mut flags = mqueue::MQ_OFlag::empty();
// Put queue in r/w mode
flags.toggle(mqueue::O_RDWR);
// Enable queue creation
flags.toggle(mqueue::O_CREAT);
// Fail if queue exists already
flags.toggle(mqueue::O_EXCL);
flags
};
let attr = mqueue::MqAttr::new(
0, max_pending, max_size, 0
);
let queue_descriptor = mqueue::mq_open(
&name.0,
oflags,
default_mode(),
Some(&attr),
)?;
Ok(Queue {
name,
queue_descriptor,
max_pending,
max_size: max_size as usize,
})
}
/// Opens an existing queue.
pub fn open(name: Name) -> Result<Queue, Error> {
// No extra flags need to be constructed as the default is to open and fail if the
// queue does not exist yet - which is what we want here.
let oflags = mqueue::O_RDWR;
let queue_descriptor = mqueue::mq_open(
&name.0,
oflags,
default_mode(),
None,
)?;
let attr = mq_getattr(queue_descriptor)?;
Ok(Queue {
name,
queue_descriptor,
max_pending: attr.mq_maxmsg,
max_size: attr.mq_msgsize as usize,
})
}
/// Opens an existing queue or creates a new queue with the OS default settings.
pub fn open_or_create(name: Name) -> Result<Queue, Error> {
let oflags = {
let mut flags = mqueue::MQ_OFlag::empty();
// Put queue in r/w mode
flags.toggle(mqueue::O_RDWR);
// Enable queue creation
flags.toggle(mqueue::O_CREAT);
flags
};
let default_pending = read_i64_from_file(MSG_DEFAULT)?;
let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?;
let attr = mqueue::MqAttr::new(
0, default_pending, default_size, 0
);
let queue_descriptor = mqueue::mq_open(
&name.0,
oflags,
default_mode(),
Some(&attr),
)?;
let actual_attr = mq_getattr(queue_descriptor)?;
Ok(Queue {
name,
queue_descriptor,
max_pending: actual_attr.mq_maxmsg,
max_size: actual_attr.mq_msgsize as usize,
})
}
/// Delete a message queue from the system. This method will make the queue unavailable for
/// other processes after their current queue descriptors have been closed.
pub fn delete(self) -> Result<(), Error> {
mqueue::mq_unlink(&self.name.0)?;
drop(self);
Ok(())
}
/// Send a message to the message queue.
/// If the queue is full this call will block until a message has been consumed.
pub fn send(&self, msg: &Message) -> Result<(), Error> {
if msg.data.len() > self.max_size as usize {
return Err(Error::MessageSizeExceeded());
}
mqueue::mq_send(
self.queue_descriptor,
msg.data.as_ref(),
msg.priority,
).map_err(|e| e.into())
}
/// Receive a message from the message queue.
/// If the queue is empty this call will block until a message arrives.
pub fn receive(&self) -> Result<Message, Error> {
let mut data: Vec<u8> = vec![0; self.max_size as usize];
let mut priority: u32 = 0;
let msg_size = mqueue::mq_receive(
self.queue_descriptor,
data.as_mut(),
&mut priority,
)?;
data.truncate(msg_size);
Ok(Message { data, priority })
}
pub fn max_pending(&self) -> i64 {
self.max_pending
}
pub fn max_size(&self) -> usize {
self.max_size
}
}
impl Drop for Queue {
fn drop(&mut self) {
// Attempt to close the queue descriptor and discard any possible errors.
// The only error thrown in the C-code is EINVAL, which would mean that the
// descriptor has already been closed.
mqueue::mq_close(self.queue_descriptor).ok();
}
}
// Creates the default queue mode (0600).
fn default_mode() -> stat::Mode {
let mut mode = stat::Mode::empty();
mode.toggle(stat::S_IRUSR);
mode.toggle(stat::S_IWUSR);
mode
}
/// This file defines the default number of maximum pending messages in a queue.
const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default";
/// This file defines the system maximum number of pending messages in a queue.
const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max";
/// This file defines the default maximum size of messages in a queue.
const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default";
/// This file defines the system maximum size for messages in a queue.
const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max";
/// This method is used in combination with the above constants to find system limits.
fn read_i64_from_file(name: &str) -> Result<i64, Error> {
let mut file = File::open(name.to_string())?;
let mut content = String::new();
file.read_to_string(&mut content)?;
Ok(content.trim().parse()?)
}
/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which
/// is very impractical.
/// To work around it, this method calls the C-function directly.
fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> {
use std::mem;
let mut attr = unsafe { mem::uninitialized::<libc::mq_attr>() };
let res = unsafe { libc::mq_getattr(mqd, &mut attr) };
nix::Errno::result(res)
.map(|_| attr)
.map_err(|e| e.into())
}

View file

@ -0,0 +1,22 @@
use super::*;
#[test]
fn test_open_delete() {
// Simple test with default queue settings
let name = Name::new("/test-queue").unwrap();
let queue = Queue::open_or_create(name)
.expect("Opening queue failed");
let message = Message {
data: "test-message".as_bytes().to_vec(),
priority: 0,
};
queue.send(&message).expect("message sending failed");
let result = queue.receive().expect("message receiving failed");
assert_eq!(message, result);
queue.delete();
}