提交 47c340e2 authored 作者: Serhij S's avatar Serhij S

import

上级 091b3282
target
Cargo.lock
[package]
name = "roboplc"
version = "0.1.0"
edition = "2021"
authors = ["Serhij S. <div@altertech.com>"]
license = "Apache-2.0"
description = "Kit for PLCs and real-time micro-services"
repository = "https://github.com/eva-ics/roboplc"
keywords = ["realtime", "robots", "plc", "industrial"]
readme = "README.md"
[dependencies]
bma-ts = { version = "0.1.8", features = ["serde"] }
colored = "2.1.0"
libc = "0.2.153"
nix = { version = "0.28.0", features = ["signal"] }
object-id = "0.1.3"
oneshot = { version = "0.1.6", default-features = false, features = ["std"] }
parking_lot = "0.12.1"
serde = { version = "1.0.197", features = ["derive", "rc"] }
sysinfo = "0.30.6"
thiserror = "1.0.57"
# roboplc
An ultimate pack of tools to create real-time micro-services, PLCs and industrial-grade robots in Rust
# RoboPLC
An ultimate pack of tools for creating real-time micro-services, PLCs and
industrial-grade robots in Rust.
Note: the crate is actively developed. API can be changed at any time. Use at
your own risk!
RoboPLC is a component of [EVA ICS](https://www.eva-ics.com/) industrial
automation platform.
use std::collections::VecDeque;
use parking_lot::Mutex;
/// A capacity-limited thread-safe deque-based data buffer
pub struct DataBuffer<T> {
data: Mutex<VecDeque<T>>,
capacity: usize,
}
impl<T> DataBuffer<T> {
/// # Panics
///
/// Will panic if the capacity is zero
#[inline]
pub fn bounded(capacity: usize) -> Self {
assert!(capacity > 0, "data buffer capacity MUST be > 0");
Self {
data: <_>::default(),
capacity,
}
}
/// Tries to push the value
/// returns the value back if not pushed
pub fn try_push(&self, value: T) -> Option<T> {
let mut buf = self.data.lock();
if buf.len() >= self.capacity {
return Some(value);
}
buf.push_back(value);
None
}
/// Forcibly pushes the value, removing the first element if necessary
///
/// returns true in case the buffer had enough capacity or false if the first element had been
/// removed
pub fn force_push(&self, value: T) -> bool {
let mut buf = self.data.lock();
let mut res = true;
while buf.len() >= self.capacity {
buf.pop_front();
res = false;
}
buf.push_back(value);
res
}
/// the current buffer length (number of elements)
pub fn len(&self) -> usize {
self.data.lock().len()
}
/// is the buffer empty
pub fn is_empty(&self) -> bool {
self.data.lock().is_empty()
}
/// takes the buffer content and keeps nothing inside
pub fn take(&self) -> VecDeque<T> {
std::mem::take(&mut *self.data.lock())
}
}
use std::sync::Arc;
use parking_lot::Mutex;
use crate::pchannel::{self, Receiver};
use crate::{pchannel::Sender, MessageDeliveryPolicy};
use crate::{Error, Result};
type ConditionFunction<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
pub const DEFAULT_PRIORITY: usize = 100;
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
/// Data communcation hub to implement in-process pub/sub model for thread workers
pub struct Hub<T: MessageDeliveryPolicy + Clone> {
inner: Arc<Mutex<HubInner<T>>>,
}
impl<T: MessageDeliveryPolicy + Clone> Clone for Hub<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: MessageDeliveryPolicy + Clone> Default for Hub<T> {
fn default() -> Self {
Self {
inner: <_>::default(),
}
}
}
impl<T: MessageDeliveryPolicy + Clone> Hub<T> {
pub fn new() -> Self {
Self::default()
}
/// Sets the default client channel capacity (the default is 1024), can be used as a build
/// pattern
pub fn set_default_channel_capacity(self, capacity: usize) -> Self {
self.inner.lock().default_channel_capacity = capacity;
self
}
/// Sends a message to subscribed clients, ignores send errors
///
/// # Panics
///
/// Should not panic
pub fn send(&self, message: T) {
macro_rules! send {
($sub: expr, $msg: expr) => {
let _ = $sub.tx.send($msg);
};
}
// clones matching subscribers to keep the internal mutex unlocked and avoid deadlocks
let targets: Vec<Arc<Subscription<T>>> = self
.inner
.lock()
.subscriptions
.iter()
.filter(|c| (c.condition)(&message))
.cloned()
.collect();
if targets.is_empty() {
return;
}
for sub in targets.iter().take(targets.len() - 1) {
if (sub.condition)(&message) {
send!(sub, message.clone());
}
}
let sub = targets.last().unwrap();
if (sub.condition)(&message) {
send!(sub, message);
}
}
/// Sends a message to subscribed clients, calls an error handlers function in case of errors
/// with some subsciber
///
/// If the error function returns false, the whole operation is aborted
///
/// # Panics
///
/// Should not panic
pub fn send_checked<F>(&self, message: T, error_handler: F) -> Result<()>
where
F: Fn(&str, &Error) -> bool,
{
macro_rules! send_checked {
($sub: expr, $msg: expr) => {
if let Err(e) = $sub.tx.send($msg) {
if !error_handler(&$sub.name, &e) {
return Err(Error::HubSend(e.into()));
}
}
};
}
let targets: Vec<Arc<Subscription<T>>> = self
.inner
.lock()
.subscriptions
.iter()
.filter(|c| (c.condition)(&message))
.cloned()
.collect();
if targets.is_empty() {
return Ok(());
}
for sub in targets.iter().take(targets.len() - 1) {
if (sub.condition)(&message) {
send_checked!(sub, message.clone());
}
}
let sub = targets.last().unwrap();
if (sub.condition)(&message) {
send_checked!(sub, message);
}
Ok(())
}
/// Registers a sender-only client with no subscriptions
///
/// If attempting to receive a message from such client, [`Error::ChannelClosed`] is returned
pub fn sender(&self) -> Client<T> {
let (_, rx) = pchannel::bounded(1);
Client {
name: "".into(),
hub: self.clone(),
rx,
}
}
/// Registers a regular client. The condition function is used to check which kinds of
/// messages should be delivered (returns true for subscribed)
pub fn register<F>(&self, name: &str, condition: F) -> Result<Client<T>>
where
F: Fn(&T) -> bool + Send + Sync + 'static,
{
self.register_with_options(ClientOptions::new(name, condition))
}
/// Registers a regular client with custom options
pub fn register_with_options(&self, client_options: ClientOptions<T>) -> Result<Client<T>> {
let name = client_options.name.clone();
let mut inner = self.inner.lock();
if inner.subscriptions.iter().any(|client| client.name == name) {
return Err(Error::HubAlreadyRegistered(name));
}
let capacity = client_options
.capacity
.unwrap_or(inner.default_channel_capacity);
let (tx, rx) = if client_options.ordering {
pchannel::ordered(capacity)
} else {
pchannel::bounded(capacity)
};
inner
.subscriptions
.push(client_options.into_subscription(tx).into());
inner
.subscriptions
.sort_by(|a, b| a.priority.cmp(&b.priority));
Ok(Client {
name,
hub: self.clone(),
rx,
})
}
fn unregister(&self, name: &str) {
self.inner
.lock()
.subscriptions
.retain(|client| &*client.name != name);
}
}
struct HubInner<T: MessageDeliveryPolicy + Clone> {
default_channel_capacity: usize,
subscriptions: Vec<Arc<Subscription<T>>>,
}
impl<T> Default for HubInner<T>
where
T: MessageDeliveryPolicy + Clone,
{
fn default() -> Self {
Self {
default_channel_capacity: DEFAULT_CHANNEL_CAPACITY,
subscriptions: <_>::default(),
}
}
}
pub struct Client<T: MessageDeliveryPolicy + Clone> {
name: Arc<str>,
hub: Hub<T>,
rx: Receiver<T>,
}
impl<T: MessageDeliveryPolicy + Clone> Client<T> {
/// Sends a message to hub-subscribed clients, ignores send errors
pub fn send(&self, message: T) {
self.hub.send(message);
}
/// Sends a message to subscribed clients, calls an error handlers function in case of errors
/// with some subsciber
///
/// If the error function returns false, the whole operation is aborted
pub fn send_checked<F>(&self, message: T, error_handler: F) -> Result<()>
where
F: Fn(&str, &Error) -> bool,
{
self.hub.send_checked(message, error_handler)
}
/// Receives a message from the hub (blocking)
pub fn recv(&self) -> Result<T> {
self.rx.recv()
}
/// Receives a message from the hub (non-blocking)
pub fn try_recv(&self) -> Result<T> {
self.rx.try_recv()
}
}
impl<T: MessageDeliveryPolicy + Clone> Drop for Client<T> {
fn drop(&mut self) {
self.hub.unregister(&self.name);
}
}
pub struct ClientOptions<T: MessageDeliveryPolicy + Clone> {
name: Arc<str>,
priority: usize,
capacity: Option<usize>,
ordering: bool,
condition: ConditionFunction<T>,
}
impl<T: MessageDeliveryPolicy + Clone> ClientOptions<T> {
pub fn new<F>(name: &str, condition: F) -> Self
where
F: Fn(&T) -> bool + Send + Sync + 'static,
{
Self {
name: name.to_owned().into(),
priority: DEFAULT_PRIORITY,
capacity: None,
ordering: false,
condition: Box::new(condition),
}
}
/// Enables client channel priority ordering
pub fn ordering(mut self, ordering: bool) -> Self {
self.ordering = ordering;
self
}
/// Sets client priority (the default is 100)
pub fn priority(mut self, priority: usize) -> Self {
self.priority = priority;
self
}
/// Overrides the default hub client channel capacity
pub fn capacity(mut self, capacity: usize) -> Self {
self.capacity = Some(capacity);
self
}
fn into_subscription(self, tx: Sender<T>) -> Subscription<T> {
Subscription {
name: self.name,
tx,
priority: self.priority,
condition: self.condition,
}
}
}
/// A macro which can be used to match an event with enum for [`Hub`] subscription condition
///
/// # Examples
///
/// ```rust
/// enum Message {
/// Temperature(f64),
/// Flush,
/// Other
/// }
///
/// let condition = move || { event_matches!(Message::Temperature(_)) ||
/// event_matches!(Message::Flush) };
/// ```
#[macro_export]
macro_rules! event_matches {
($m: pat) => {
|msg| matches!(msg, $m)
};
}
struct Subscription<T: MessageDeliveryPolicy + Clone> {
name: Arc<str>,
tx: Sender<T>,
priority: usize,
condition: ConditionFunction<T>,
}
#[cfg(test)]
mod test {
use crate::{event_matches, MessageDeliveryPolicy};
use super::Hub;
#[derive(Clone)]
enum Message {
Temperature(f64),
Humidity(f64),
Test,
}
impl MessageDeliveryPolicy for Message {}
#[test]
fn test_hub() {
let hub = Hub::<Message>::new().set_default_channel_capacity(20);
let sender = hub.sender();
let recv = hub
.register(
"test_recv",
event_matches!(Message::Temperature(_) | Message::Humidity(_)),
)
.unwrap();
for _ in 0..10 {
sender.send(Message::Temperature(1.0));
sender.send(Message::Humidity(2.0));
sender.send(Message::Test);
}
let mut c = 0;
while let Ok(msg) = recv.try_recv() {
match msg {
Message::Temperature(_) | Message::Humidity(_) => c += 1,
Message::Test => panic!(),
}
}
assert_eq!(c, 20);
}
}
#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
use std::{mem, sync::Arc, time::Duration};
use thread_rt::{RTParams, Scheduling};
/// Event buffers
pub mod buf;
/// In-process data communication pub/sub hub
pub mod hub;
/// Policy-based channels
pub mod pchannel;
/// Policy-based data storages
pub mod pdeque;
/// Task supervisor to manage real-time threads
pub mod supervisor;
/// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone
pub mod thread_rt;
/// Various time tools for real-time applications
pub mod time;
/// A memory cell with an expiring value
pub mod ttlcell;
pub type Result<T> = std::result::Result<T, Error>;
/// The crate error type
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// the channel is full and the value can not be sent
#[error("channel full")]
ChannelFull,
/// the channel is full, an optional value is skipped. the error can be ignored but should be
/// logged
#[error("channel message skipped")]
ChannelSkipped,
/// The channel is closed (all transmitters/receivers gone)
#[error("channel closed")]
ChannelClosed,
/// Receive attempt failed because the channel is empty
#[error("channel empty")]
ChannelEmpty,
#[error("hub send error {0}")]
HubSend(Box<Error>),
#[error("hub client already registered: {0}")]
HubAlreadyRegistered(Arc<str>),
#[error("I/O error {0}")]
IO(String),
#[error("RT SYS_gettid {0}")]
RTGetTId(libc::c_int),
#[error("RT sched_setaffinity {0}")]
RTSchedSetAffinity(libc::c_int),
#[error("RT sched_setscheduler {0}")]
RTSchedSetSchduler(libc::c_int),
#[error("Task name must be specified when spawning by a supervisor")]
SupervisorNameNotSpecified,
#[error("Task already registered")]
SupervisorDuplicateTask,
#[error("Task not found")]
SupervisorTaskNotFound,
}
macro_rules! impl_error {
($t: ty, $key: ident) => {
impl From<$t> for Error {
fn from(err: $t) -> Self {
Error::$key(err.to_string())
}
}
};
}
impl_error!(std::io::Error, IO);
impl_error!(oneshot::RecvError, IO);
impl Error {
pub fn is_skipped(&self) -> bool {
matches!(self, Error::ChannelSkipped)
}
}
/// Message delivery policies, used by [`hub::Hub`], [`pchannel::Receiver`] and [`pdeque::Deque`]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
pub enum DeliveryPolicy {
#[default]
/// always deliver, fail if no room (default)
Always,
/// skip delivery if no room
Optional,
/// always deliver the message but always in a single copy (latest)
Single,
/// deliver a single latest copy, skip if no room
SingleOptional,
}
/// Implements delivery policies for own data types
pub trait MessageDeliveryPolicy
where
Self: Sized,
{
/// Delivery policy
fn delivery_policy(&self) -> DeliveryPolicy {
DeliveryPolicy::Always
}
/// Priority, for ordered
fn priority(&self) -> usize {
0
}
/// Has equal kind with other
///
/// (default: check enum discriminant)
fn eq_kind(&self, other: &Self) -> bool {
mem::discriminant(self) == mem::discriminant(other)
}
/// If a message expires during storing/delivering, it is not delivered
fn is_expired(&self) -> bool {
false
}
#[doc(hidden)]
fn is_delivery_policy_single(&self) -> bool {
let dp = self.delivery_policy();
dp == DeliveryPolicy::Single || dp == DeliveryPolicy::SingleOptional
}
#[doc(hidden)]
fn is_delivery_policy_optional(&self) -> bool {
let dp = self.delivery_policy();
dp == DeliveryPolicy::Optional || dp == DeliveryPolicy::SingleOptional
}
}
/// Terminates the current process and all its subprocesses in the specified period of time with
/// SIGKILL command. Useful if a process is unable to shut it down gracefully within a specified
/// period of time.
///
/// Prints warnings to STDOUT if warn is true
pub fn suicide(delay: Duration, warn: bool) {
let mut builder = thread_rt::Builder::new().name("suicide").rt_params(
RTParams::new()
.set_priority(99)
.set_scheduling(Scheduling::FIFO),
);
builder.park_on_errors = true;
let res = builder.spawn(move || {
dbg!("realtime");
thread_rt::suicide_myself(delay, warn);
});
if res.is_err() {
std::thread::spawn(move || {
thread_rt::suicide_myself(delay, warn);
});
};
}
use std::sync::Arc;
use crate::{pdeque::Deque, Error, MessageDeliveryPolicy, Result};
use object_id::UniqueId;
use parking_lot::{Condvar, Mutex};
struct Channel<T: MessageDeliveryPolicy>(Arc<ChannelInner<T>>);
impl<T: MessageDeliveryPolicy> Channel<T> {
fn id(&self) -> usize {
self.0.id.as_usize()
}
}
impl<T: MessageDeliveryPolicy> Eq for Channel<T> {}
impl<T: MessageDeliveryPolicy> PartialEq for Channel<T> {
fn eq(&self, other: &Self) -> bool {
self.id() == other.id()
}
}
impl<T> Clone for Channel<T>
where
T: MessageDeliveryPolicy,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
struct ChannelInner<T: MessageDeliveryPolicy> {
id: UniqueId,
pc: Mutex<PolicyChannel<T>>,
available: Condvar,
}
impl<T: MessageDeliveryPolicy> ChannelInner<T> {
fn try_send(&self, value: T) -> Result<()> {
let mut pc = self.pc.lock();
if pc.receivers == 0 {
return Err(Error::ChannelClosed);
}
let push_result = pc.queue.try_push(value);
if push_result.value.is_none() {
self.available.notify_one();
if push_result.pushed {
Ok(())
} else {
Err(Error::ChannelSkipped)
}
} else {
Err(Error::ChannelFull)
}
}
fn send(&self, mut value: T) -> Result<()> {
let mut pc = self.pc.lock();
let pushed = loop {
if pc.receivers == 0 {
return Err(Error::ChannelClosed);
}
let push_result = pc.queue.try_push(value);
let Some(val) = push_result.value else {
break push_result.pushed;
};
value = val;
self.available.wait(&mut pc);
};
self.available.notify_one();
if pushed {
Ok(())
} else {
Err(Error::ChannelSkipped)
}
}
fn recv(&self) -> Result<T> {
let mut pc = self.pc.lock();
loop {
if let Some(val) = pc.queue.get() {
self.available.notify_one();
return Ok(val);
} else if pc.senders == 0 {
return Err(Error::ChannelClosed);
}
self.available.wait(&mut pc);
}
}
fn try_recv(&self) -> Result<T> {
let mut pc = self.pc.lock();
if let Some(val) = pc.queue.get() {
self.available.notify_one();
Ok(val)
} else if pc.senders == 0 {
Err(Error::ChannelClosed)
} else {
Err(Error::ChannelEmpty)
}
}
}
impl<T: MessageDeliveryPolicy> Channel<T> {
fn new(capacity: usize, ordering: bool) -> Self {
Self(
ChannelInner {
id: <_>::default(),
pc: Mutex::new(PolicyChannel::new(capacity, ordering)),
available: Condvar::new(),
}
.into(),
)
}
}
struct PolicyChannel<T: MessageDeliveryPolicy> {
queue: Deque<T>,
senders: usize,
receivers: usize,
}
impl<T> PolicyChannel<T>
where
T: MessageDeliveryPolicy,
{
fn new(capacity: usize, ordering: bool) -> Self {
assert!(capacity > 0, "channel capacity MUST be > 0");
Self {
queue: Deque::bounded(capacity).set_ordering(ordering),
senders: 1,
receivers: 1,
}
}
}
#[derive(Eq, PartialEq)]
pub struct Sender<T>
where
T: MessageDeliveryPolicy,
{
channel: Channel<T>,
}
impl<T> Sender<T>
where
T: MessageDeliveryPolicy,
{
/// Returns true if the value has been really sent
#[inline]
pub fn send(&self, value: T) -> Result<()> {
self.channel.0.send(value)
}
#[inline]
pub fn try_send(&self, value: T) -> Result<()> {
self.channel.0.try_send(value)
}
#[inline]
pub fn len(&self) -> usize {
self.channel.0.pc.lock().queue.len()
}
#[inline]
pub fn is_full(&self) -> bool {
self.channel.0.pc.lock().queue.is_full()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.channel.0.pc.lock().queue.is_empty()
}
#[inline]
pub fn is_alive(&self) -> bool {
self.channel.0.pc.lock().receivers > 0
}
}
impl<T> Clone for Sender<T>
where
T: MessageDeliveryPolicy,
{
fn clone(&self) -> Self {
self.channel.0.pc.lock().senders += 1;
Self {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Sender<T>
where
T: MessageDeliveryPolicy,
{
fn drop(&mut self) {
self.channel.0.pc.lock().senders -= 1;
self.channel.0.available.notify_all();
}
}
#[derive(Eq, PartialEq)]
pub struct Receiver<T>
where
T: MessageDeliveryPolicy,
{
channel: Channel<T>,
}
impl<T> Receiver<T>
where
T: MessageDeliveryPolicy,
{
#[inline]
pub fn recv(&self) -> Result<T> {
self.channel.0.recv()
}
#[inline]
pub fn try_recv(&self) -> Result<T> {
self.channel.0.try_recv()
}
#[inline]
pub fn len(&self) -> usize {
self.channel.0.pc.lock().queue.len()
}
#[inline]
pub fn is_full(&self) -> bool {
self.channel.0.pc.lock().queue.is_full()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.channel.0.pc.lock().queue.is_empty()
}
#[inline]
pub fn is_alive(&self) -> bool {
self.channel.0.pc.lock().senders > 0
}
}
impl<T> Clone for Receiver<T>
where
T: MessageDeliveryPolicy,
{
fn clone(&self) -> Self {
self.channel.0.pc.lock().receivers += 1;
Self {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Receiver<T>
where
T: MessageDeliveryPolicy,
{
fn drop(&mut self) {
self.channel.0.pc.lock().receivers -= 1;
self.channel.0.available.notify_all();
}
}
fn make_channel<T: MessageDeliveryPolicy>(ch: Channel<T>) -> (Sender<T>, Receiver<T>) {
let tx = Sender {
channel: ch.clone(),
};
let rx = Receiver { channel: ch };
(tx, rx)
}
/// Creates a bounded channel which respects [`MessageDeliveryPolicy`] rules with no message
/// priority ordering
///
/// # Panics
///
/// Will panic if the capacity is zero
pub fn bounded<T: MessageDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let ch = Channel::new(capacity, false);
make_channel(ch)
}
/// Creates a bounded channel which respects [`MessageDeliveryPolicy`] rules and has got message
/// priority ordering turned on
///
/// # Panics
///
/// Will panic if the capacity is zero
pub fn ordered<T: MessageDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let ch = Channel::new(capacity, true);
make_channel(ch)
}
#[cfg(test)]
mod test {
use std::{thread, time::Duration};
use crate::{DeliveryPolicy, MessageDeliveryPolicy};
use super::bounded;
#[derive(Debug)]
enum Message {
Test(usize),
Temperature(f64),
Spam,
}
impl MessageDeliveryPolicy for Message {
fn delivery_policy(&self) -> DeliveryPolicy {
match self {
Message::Test(_) => DeliveryPolicy::Always,
Message::Temperature(_) => DeliveryPolicy::Single,
Message::Spam => DeliveryPolicy::Optional,
}
}
}
#[test]
fn test_delivery_policy_optional() {
let (tx, rx) = bounded::<Message>(1);
thread::spawn(move || {
for _ in 0..10 {
tx.send(Message::Test(123)).unwrap();
if let Err(e) = tx.send(Message::Spam) {
assert!(e.is_skipped(), "{}", e);
}
tx.send(Message::Temperature(123.0)).unwrap();
}
});
thread::sleep(Duration::from_secs(1));
while let Ok(msg) = rx.recv() {
thread::sleep(Duration::from_millis(10));
if matches!(msg, Message::Spam) {
panic!("delivery policy not respected ({:?})", msg);
}
}
}
#[test]
fn test_delivery_policy_single() {
let (tx, rx) = bounded::<Message>(512);
thread::spawn(move || {
for _ in 0..10 {
tx.send(Message::Test(123)).unwrap();
if let Err(e) = tx.send(Message::Spam) {
assert!(e.is_skipped(), "{}", e);
}
tx.send(Message::Temperature(123.0)).unwrap();
}
});
thread::sleep(Duration::from_secs(1));
let mut c = 0;
let mut t = 0;
while let Ok(msg) = rx.recv() {
match msg {
Message::Test(_) => c += 1,
Message::Temperature(_) => t += 1,
Message::Spam => {}
}
}
assert_eq!(c, 10);
assert_eq!(t, 1);
}
#[test]
fn test_poisoning() {
let n = 20_000;
for i in 0..n {
let (tx, rx) = bounded::<Message>(512);
let rx_t = thread::spawn(move || while rx.recv().is_ok() {});
thread::spawn(move || {
let _t = tx;
});
for _ in 0..100 {
if rx_t.is_finished() {
break;
}
thread::sleep(Duration::from_millis(1));
}
assert!(rx_t.is_finished(), "RX poisined {}", i);
}
}
}
use std::collections::VecDeque;
use crate::{DeliveryPolicy, MessageDeliveryPolicy};
/// A deque which stores values with respect of [`MessageDeliveryPolicy`]
#[derive(Clone, Debug)]
pub struct Deque<T>
where
T: MessageDeliveryPolicy,
{
data: VecDeque<T>,
capacity: usize,
ordered: bool,
}
/// Result payload of try_push operation
pub struct TryPushOutput<T> {
/// has the value been really pushed
pub pushed: bool,
/// the value in case if push failed but the value kind is not an optional
pub value: Option<T>,
}
impl<T> Deque<T>
where
T: MessageDeliveryPolicy,
{
/// Creates a new bounded deque
#[inline]
pub fn bounded(capacity: usize) -> Self {
Self {
data: VecDeque::with_capacity(capacity),
capacity,
ordered: false,
}
}
/// Enabled/disables priority ordering, can be used as a build pattern
#[inline]
pub fn set_ordering(mut self, v: bool) -> Self {
self.ordered = v;
self
}
/// Tries to store the value
///
/// Returns the value back if there is no capacity even after all [`MessageDeliveryPolicy`]
/// rules have been applied
pub fn try_push(&mut self, value: T) -> TryPushOutput<T> {
macro_rules! push {
() => {{
self.data.push_back(value);
if self.ordered {
sort_by_priority(&mut self.data);
}
TryPushOutput {
pushed: true,
value: None,
}
}};
}
if value.is_delivery_policy_single() {
self.data.retain(|d| !d.eq_kind(&value) && !d.is_expired());
}
if self.data.len() < self.capacity {
push!()
} else {
match value.delivery_policy() {
DeliveryPolicy::Always | DeliveryPolicy::Single => {
let mut entry_removed = false;
self.data.retain(|d| {
if entry_removed {
true
} else if d.is_expired() || d.is_delivery_policy_optional() {
entry_removed = true;
false
} else {
true
}
});
if self.data.len() < self.capacity {
push!()
} else {
TryPushOutput {
pushed: false,
value: Some(value),
}
}
}
DeliveryPolicy::Optional | DeliveryPolicy::SingleOptional => TryPushOutput {
pushed: false,
value: None,
},
}
}
}
/// Returns the first available value, ignores expired ones
#[inline]
pub fn get(&mut self) -> Option<T> {
loop {
let value = self.data.pop_front();
if let Some(ref val) = value {
if !val.is_expired() {
break value;
}
} else {
break None;
}
}
}
/// Clears the deque
#[inline]
pub fn clear(&mut self) {
self.data.clear();
}
/// Returns number of elements in deque
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
#[inline]
pub fn is_full(&self) -> bool {
self.len() == self.capacity
}
#[inline]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
fn sort_by_priority<T: MessageDeliveryPolicy>(v: &mut VecDeque<T>) {
v.rotate_right(v.as_slices().1.len());
assert!(v.as_slices().1.is_empty());
v.as_mut_slices()
.0
.sort_by(|a, b| a.priority().partial_cmp(&b.priority()).unwrap());
}
use std::collections::{btree_map, BTreeMap};
use std::{mem, thread};
use serde::Serialize;
use crate::thread_rt::{Builder, Task};
use crate::time::Interval;
use crate::{Error, Result};
/// A supervisor object used to manage tasks spawned with [`Builder`]
#[derive(Serialize)]
pub struct Supervisor<T> {
tasks: BTreeMap<String, Task<T>>,
}
impl<T> Default for Supervisor<T> {
fn default() -> Self {
Self {
tasks: <_>::default(),
}
}
}
impl<T> Supervisor<T> {
pub fn new() -> Self {
Self::default()
}
/// Spawns a new task using a [`Builder`] object and registers it. The task name MUST be unique
/// and SHOULD be 16 characters or less to set a proper thread name
pub fn spawn<F, B>(&mut self, builder: B, f: F) -> Result<&Task<T>>
where
B: Into<Builder>,
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let builder = builder.into();
let entry = self.vacant_entry(&builder)?;
let task = builder.spawn(f)?;
Ok(entry.insert(task))
}
/// Spawns a new periodic task using a [`Builder`] object and registers it. The task name MUST
/// be unique and SHOULD be 16 characters or less to set a proper thread name
pub fn spawn_periodic<F, B>(&mut self, builder: B, f: F, interval: Interval) -> Result<&Task<T>>
where
F: Fn() -> T + Send + 'static,
T: Send + 'static,
B: Into<Builder>,
{
let builder = builder.into();
let entry = self.vacant_entry(&builder)?;
let task = builder.spawn_periodic(f, interval)?;
Ok(entry.insert(task))
}
/// Gets a task by its name
pub fn get_task(&self, name: &str) -> Option<&Task<T>> {
self.tasks.get(name)
}
/// Gets a task by its name as a mutable object
pub fn get_task_mut(&mut self, name: &str) -> Option<&mut Task<T>> {
self.tasks.get_mut(name)
}
/// Takes a task by its name and removes it from the internal registry
pub fn take_task(&mut self, name: &str) -> Option<Task<T>> {
self.tasks.remove(name)
}
/// Removes a task from the internal registry
pub fn forget_task(&mut self, name: &str) -> Result<()> {
if self.tasks.remove(name).is_some() {
Ok(())
} else {
Err(Error::SupervisorTaskNotFound)
}
}
/// Removes all finished tasks from the internal registry
pub fn purge(&mut self) {
self.tasks.retain(|_, task| !task.is_finished());
}
/// Joins all tasks in the internal registry and returns a map with their results. After the
/// operation the registry is cleared
pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> {
let mut result = BTreeMap::new();
for (name, task) in mem::take(&mut self.tasks) {
result.insert(name, task.join());
}
result
}
fn vacant_entry(
&mut self,
builder: &Builder,
) -> Result<btree_map::VacantEntry<String, Task<T>>> {
let Some(name) = builder.name.clone() else {
return Err(Error::SupervisorNameNotSpecified);
};
let btree_map::Entry::Vacant(entry) = self.tasks.entry(name) else {
return Err(Error::SupervisorDuplicateTask);
};
Ok(entry)
}
}
差异被折叠。
use std::{thread, time::Duration};
use bma_ts::Monotonic;
/// A trait which extends the standard [`Duration`] and similar types with additional methods
///
pub trait DurationRT {
/// Returns true if all provided [`Monotonic`] times fit the duration
fn fits(&self, t: &[Monotonic]) -> bool;
}
impl DurationRT for Duration {
fn fits(&self, t: &[Monotonic]) -> bool {
if t.is_empty() {
true
} else {
let min_ts = t.iter().min().unwrap();
let max_ts = t.iter().max().unwrap();
max_ts.as_duration() - min_ts.as_duration() <= *self
}
}
}
/// A synchronous interval helper, similar to
/// <https://docs.rs/tokio/latest/tokio/time/struct.Interval.html>
pub struct Interval {
next_tick: Option<Monotonic>,
period: Duration,
missing_tick_behavior: MissedTickBehavior,
}
impl Interval {
pub fn new(period: Duration) -> Self {
Self {
next_tick: None,
period,
missing_tick_behavior: <_>::default(),
}
}
/// Ticks the interval
///
/// Returns false if a tick is missed
pub fn tick(&mut self) -> bool {
let now = Monotonic::now();
if let Some(mut next_tick) = self.next_tick {
match now.cmp(&next_tick) {
std::cmp::Ordering::Less => {
let to_sleep = next_tick - now;
self.next_tick = Some(next_tick + self.period);
thread::sleep(to_sleep);
true
}
std::cmp::Ordering::Equal => true,
std::cmp::Ordering::Greater => {
match self.missing_tick_behavior {
MissedTickBehavior::Burst => {
self.next_tick = Some(next_tick + self.period);
}
MissedTickBehavior::Delay => {
self.next_tick = Some(now + self.period);
}
MissedTickBehavior::Skip => {
while next_tick <= now {
next_tick += self.period;
}
self.next_tick = Some(next_tick);
}
}
false
}
}
} else {
self.next_tick = Some(now + self.period);
true
}
}
/// Sets missing tick behavior policy. Can be used as a build pattern
pub fn set_missing_tick_behavior(mut self, missing_tick_behavior: MissedTickBehavior) -> Self {
self.missing_tick_behavior = missing_tick_behavior;
self
}
}
/// Interval missing tick behavior
///
/// The behavior is similar to
/// <https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html>
/// but may differ in some details
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
pub enum MissedTickBehavior {
#[default]
/// `[Interval::tick()`] method has no delay for missed intervals, all the missed ones are
/// fired instantly
Burst,
/// The interval is restarted from the current point of time
Delay,
/// Missed ticks are skipped with no additional effect
Skip,
}
#[cfg(test)]
mod test {
use std::{thread, time::Duration};
use bma_ts::Monotonic;
use crate::time::DurationRT as _;
#[test]
fn test_fits() {
let first = Monotonic::now();
thread::sleep(Duration::from_millis(10));
let second = Monotonic::now();
thread::sleep(Duration::from_millis(10));
let third = Monotonic::now();
assert!(Duration::from_millis(100).fits(&[first, second, third]));
assert!(Duration::from_millis(25).fits(&[first, second, third]));
}
}
use bma_ts::Monotonic;
use std::{ops::Deref, time::Duration};
/// A memory cell with an expiring value with API similar to the standard [`Option`]
pub struct TtlCell<T> {
value: Option<T>,
ttl: Duration,
set_at: Monotonic,
}
impl<T> TtlCell<T> {
/// Creates a new empty cell
#[inline]
pub fn new(ttl: Duration) -> Self {
Self {
value: None,
ttl,
set_at: Monotonic::now(),
}
}
/// Creates a new empty cell with a value set
#[inline]
pub fn new_with_value(ttl: Duration, value: T) -> Self {
Self {
value: Some(value),
ttl,
set_at: Monotonic::now(),
}
}
/// Replaces the current value, returns the previous one. The value set time is set to the
/// current time point
#[inline]
pub fn replace(&mut self, value: T) -> Option<T> {
let prev = self.value.replace(value);
let result = if self.is_expired() { None } else { prev };
self.touch();
result
}
/// Sets the current value. The value set time is set to the current time point
#[inline]
pub fn set(&mut self, value: T) {
self.value = Some(value);
self.touch();
}
/// Clears the current value
#[inline]
pub fn clear(&mut self) {
self.value = None;
}
/// Returns a refernce to the value if set and not expired
#[inline]
pub fn as_ref(&self) -> Option<&T> {
if self.is_expired() {
None
} else {
self.value.as_ref()
}
}
/// A value ref-coupler
///
/// Returns two references to two [`TtlCell`] values in case if both of them are not expired and
/// set time difference matches "max_time_delta" parameter
#[inline]
pub fn as_ref_with<'a, O>(
&'a self,
other: &'a TtlCell<O>,
max_time_delta: Duration,
) -> Option<(&T, &O)> {
let maybe_first = self.as_ref();
let maybe_second = other.as_ref();
if let Some(first) = maybe_first {
if let Some(second) = maybe_second {
if self.set_at.abs_diff(other.set_at) <= max_time_delta {
return Some((first, second));
}
}
}
None
}
/// Takes the value if set and not expired, clears the cell
#[inline]
pub fn take(&mut self) -> Option<T> {
if self.is_expired() {
None
} else {
self.value.take()
}
}
/// A value take-coupler
///
/// Takes two [`TtlCell`] values in case if both of them are not expired and set time
/// difference matches "max_time_delta" parameter. Both cells are cleared.
#[inline]
pub fn take_with<O>(
&mut self,
other: &mut TtlCell<O>,
max_time_delta: Duration,
) -> Option<(T, O)> {
let maybe_first = self.take();
let maybe_second = other.take();
if let Some(first) = maybe_first {
if let Some(second) = maybe_second {
if self.set_at.abs_diff(other.set_at) <= max_time_delta {
return Some((first, second));
}
}
}
None
}
/// Returns a derefernce to the value if set and not expired
#[inline]
pub fn as_deref(&self) -> Option<&T::Target>
where
T: Deref,
{
match self.as_ref() {
Some(t) => Some(&**t),
None => None,
}
}
/// Returns true if the value is expired or not set
#[inline]
pub fn is_expired(&self) -> bool {
self.set_at.elapsed() > self.ttl || self.value.is_none()
}
/// Updates the value set time to the current point of time
#[inline]
pub fn touch(&mut self) {
self.set_at = Monotonic::now();
}
/// Returns the value set time (monotonic)
#[inline]
pub fn set_at(&self) -> Monotonic {
self.set_at
}
}
#[cfg(test)]
mod test {
use std::{thread, time::Duration};
use super::TtlCell;
#[test]
fn test_get_set() {
let ttl = Duration::from_millis(10);
let mut opt = TtlCell::new_with_value(ttl, 25);
thread::sleep(ttl / 2);
assert_eq!(opt.as_ref().copied(), Some(25));
thread::sleep(ttl);
assert_eq!(opt.as_ref().copied(), None);
opt.set(30);
thread::sleep(ttl / 2);
assert_eq!(opt.as_ref().copied(), Some(30));
thread::sleep(ttl);
assert_eq!(opt.as_ref().copied(), None);
}
#[test]
fn test_take_replace() {
let ttl = Duration::from_millis(10);
let mut opt = TtlCell::new_with_value(ttl, 25);
thread::sleep(ttl / 2);
assert_eq!(opt.take(), Some(25));
assert_eq!(opt.as_ref().copied(), None);
opt.set(30);
thread::sleep(ttl / 2);
assert_eq!(opt.replace(29), Some(30));
assert_eq!(opt.as_ref().copied(), Some(29));
thread::sleep(ttl);
assert_eq!(opt.as_ref().copied(), None);
}
#[test]
fn test_take_with() {
let mut first = TtlCell::new_with_value(Duration::from_secs(1), 25);
thread::sleep(Duration::from_millis(10));
let mut second = TtlCell::new_with_value(Duration::from_secs(1), 25);
assert!(first
.take_with(&mut second, Duration::from_millis(100))
.is_some());
let mut first = TtlCell::new_with_value(Duration::from_secs(1), 25);
thread::sleep(Duration::from_millis(100));
let mut second = TtlCell::new_with_value(Duration::from_secs(1), 25);
assert!(first
.take_with(&mut second, Duration::from_millis(50))
.is_none());
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论