提交 9eb698c1 authored 作者: Serhij S's avatar Serhij S

docs

上级 56944e5b
...@@ -25,3 +25,57 @@ width="350" /> ...@@ -25,3 +25,57 @@ width="350" />
* thread-safe out-of-the-box * thread-safe out-of-the-box
* frames may be forcibly pushed, overriding the previous ones, like in a ring-buffer. * frames may be forcibly pushed, overriding the previous ones, like in a ring-buffer.
## Hub
[`hub::Hub`] implements a data-hub (in process pub/sub) model, when multiple
clients (usually thread workers) exchange data via a single virtual bus instead
of using direct channels.
This brings some additional overhead into data exchange, however makes the
architecture significantly clearer, lowers code support costs and brings
additional features.
<img
src="https://raw.githubusercontent.com/eva-ics/roboplc/main/schemas/hub.png"
width="550" />
* classic pub/sub patterns with no data serialization overhead
* based on [`pchannel`] which allows to mix different kinds of data and apply
additional policies if required
* a fully passive model with no "server" thread.
## pdeque and pchannel
A policy-based deque [`pdeque::Deque`] is a component to build policy-based
channels.
[`pchannel`] is a channel module, based on the policy-based deque.
Data policies supported:
* **Always** a frame is always delivered
* **Optional** a frame can be skipped if no room
* **Single** a frame must be delivered only once (the latest one)
* **SingleOptional** a frame must be delivered only once (the latest one) and
is optional
Additionally, components support ordering by data priority and automatically
drop expired data if the data type has got an expiration marker method
implemented.
Without policies applied, speed is similar to other popular channel/storage
implementations. With policies data transfer speed can be lower, latency can
rise, however the overall effect is significantly better as the data is
processed directly inside a channel or a storage buffer.
## Real-time
[`thread_rt::Builder`] provides a thread builder component, which extends the
standard thread builder with real-time capabilities: scheduler policies and CPU
affinity (Linux only).
[`supervisor::Supervisor`] provides a lightweight task supervisor to manage
launched threads.
...@@ -3,7 +3,7 @@ use std::sync::Arc; ...@@ -3,7 +3,7 @@ use std::sync::Arc;
use parking_lot::Mutex; use parking_lot::Mutex;
use crate::pchannel::{self, Receiver}; use crate::pchannel::{self, Receiver};
use crate::{pchannel::Sender, MessageDeliveryPolicy}; use crate::{pchannel::Sender, DataDeliveryPolicy};
use crate::{Error, Result}; use crate::{Error, Result};
type ConditionFunction<T> = Box<dyn Fn(&T) -> bool + Send + Sync>; type ConditionFunction<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
...@@ -13,11 +13,11 @@ pub const DEFAULT_PRIORITY: usize = 100; ...@@ -13,11 +13,11 @@ pub const DEFAULT_PRIORITY: usize = 100;
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024; pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
/// Data communcation hub to implement in-process pub/sub model for thread workers /// Data communcation hub to implement in-process pub/sub model for thread workers
pub struct Hub<T: MessageDeliveryPolicy + Clone> { pub struct Hub<T: DataDeliveryPolicy + Clone> {
inner: Arc<Mutex<HubInner<T>>>, inner: Arc<Mutex<HubInner<T>>>,
} }
impl<T: MessageDeliveryPolicy + Clone> Clone for Hub<T> { impl<T: DataDeliveryPolicy + Clone> Clone for Hub<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
inner: self.inner.clone(), inner: self.inner.clone(),
...@@ -25,7 +25,7 @@ impl<T: MessageDeliveryPolicy + Clone> Clone for Hub<T> { ...@@ -25,7 +25,7 @@ impl<T: MessageDeliveryPolicy + Clone> Clone for Hub<T> {
} }
} }
impl<T: MessageDeliveryPolicy + Clone> Default for Hub<T> { impl<T: DataDeliveryPolicy + Clone> Default for Hub<T> {
fn default() -> Self { fn default() -> Self {
Self { Self {
inner: <_>::default(), inner: <_>::default(),
...@@ -33,7 +33,7 @@ impl<T: MessageDeliveryPolicy + Clone> Default for Hub<T> { ...@@ -33,7 +33,7 @@ impl<T: MessageDeliveryPolicy + Clone> Default for Hub<T> {
} }
} }
impl<T: MessageDeliveryPolicy + Clone> Hub<T> { impl<T: DataDeliveryPolicy + Clone> Hub<T> {
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
...@@ -173,14 +173,14 @@ impl<T: MessageDeliveryPolicy + Clone> Hub<T> { ...@@ -173,14 +173,14 @@ impl<T: MessageDeliveryPolicy + Clone> Hub<T> {
} }
} }
struct HubInner<T: MessageDeliveryPolicy + Clone> { struct HubInner<T: DataDeliveryPolicy + Clone> {
default_channel_capacity: usize, default_channel_capacity: usize,
subscriptions: Vec<Arc<Subscription<T>>>, subscriptions: Vec<Arc<Subscription<T>>>,
} }
impl<T> Default for HubInner<T> impl<T> Default for HubInner<T>
where where
T: MessageDeliveryPolicy + Clone, T: DataDeliveryPolicy + Clone,
{ {
fn default() -> Self { fn default() -> Self {
Self { Self {
...@@ -190,13 +190,13 @@ where ...@@ -190,13 +190,13 @@ where
} }
} }
pub struct Client<T: MessageDeliveryPolicy + Clone> { pub struct Client<T: DataDeliveryPolicy + Clone> {
name: Arc<str>, name: Arc<str>,
hub: Hub<T>, hub: Hub<T>,
rx: Receiver<T>, rx: Receiver<T>,
} }
impl<T: MessageDeliveryPolicy + Clone> Client<T> { impl<T: DataDeliveryPolicy + Clone> Client<T> {
/// Sends a message to hub-subscribed clients, ignores send errors /// Sends a message to hub-subscribed clients, ignores send errors
pub fn send(&self, message: T) { pub fn send(&self, message: T) {
self.hub.send(message); self.hub.send(message);
...@@ -221,13 +221,13 @@ impl<T: MessageDeliveryPolicy + Clone> Client<T> { ...@@ -221,13 +221,13 @@ impl<T: MessageDeliveryPolicy + Clone> Client<T> {
} }
} }
impl<T: MessageDeliveryPolicy + Clone> Drop for Client<T> { impl<T: DataDeliveryPolicy + Clone> Drop for Client<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.hub.unregister(&self.name); self.hub.unregister(&self.name);
} }
} }
pub struct ClientOptions<T: MessageDeliveryPolicy + Clone> { pub struct ClientOptions<T: DataDeliveryPolicy + Clone> {
name: Arc<str>, name: Arc<str>,
priority: usize, priority: usize,
capacity: Option<usize>, capacity: Option<usize>,
...@@ -235,7 +235,7 @@ pub struct ClientOptions<T: MessageDeliveryPolicy + Clone> { ...@@ -235,7 +235,7 @@ pub struct ClientOptions<T: MessageDeliveryPolicy + Clone> {
condition: ConditionFunction<T>, condition: ConditionFunction<T>,
} }
impl<T: MessageDeliveryPolicy + Clone> ClientOptions<T> { impl<T: DataDeliveryPolicy + Clone> ClientOptions<T> {
pub fn new<F>(name: &str, condition: F) -> Self pub fn new<F>(name: &str, condition: F) -> Self
where where
F: Fn(&T) -> bool + Send + Sync + 'static, F: Fn(&T) -> bool + Send + Sync + 'static,
...@@ -295,7 +295,7 @@ macro_rules! event_matches { ...@@ -295,7 +295,7 @@ macro_rules! event_matches {
}; };
} }
struct Subscription<T: MessageDeliveryPolicy + Clone> { struct Subscription<T: DataDeliveryPolicy + Clone> {
name: Arc<str>, name: Arc<str>,
tx: Sender<T>, tx: Sender<T>,
priority: usize, priority: usize,
...@@ -304,7 +304,7 @@ struct Subscription<T: MessageDeliveryPolicy + Clone> { ...@@ -304,7 +304,7 @@ struct Subscription<T: MessageDeliveryPolicy + Clone> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::{event_matches, MessageDeliveryPolicy}; use crate::{event_matches, DataDeliveryPolicy};
use super::Hub; use super::Hub;
...@@ -315,7 +315,7 @@ mod test { ...@@ -315,7 +315,7 @@ mod test {
Test, Test,
} }
impl MessageDeliveryPolicy for Message {} impl DataDeliveryPolicy for Message {}
#[test] #[test]
fn test_hub() { fn test_hub() {
......
...@@ -92,7 +92,7 @@ pub enum DeliveryPolicy { ...@@ -92,7 +92,7 @@ pub enum DeliveryPolicy {
} }
/// Implements delivery policies for own data types /// Implements delivery policies for own data types
pub trait MessageDeliveryPolicy pub trait DataDeliveryPolicy
where where
Self: Sized, Self: Sized,
{ {
......
use std::sync::Arc; use std::sync::Arc;
use crate::{pdeque::Deque, Error, MessageDeliveryPolicy, Result}; use crate::{pdeque::Deque, Error, DataDeliveryPolicy, Result};
use object_id::UniqueId; use object_id::UniqueId;
use parking_lot::{Condvar, Mutex}; use parking_lot::{Condvar, Mutex};
struct Channel<T: MessageDeliveryPolicy>(Arc<ChannelInner<T>>); struct Channel<T: DataDeliveryPolicy>(Arc<ChannelInner<T>>);
impl<T: MessageDeliveryPolicy> Channel<T> { impl<T: DataDeliveryPolicy> Channel<T> {
fn id(&self) -> usize { fn id(&self) -> usize {
self.0.id.as_usize() self.0.id.as_usize()
} }
} }
impl<T: MessageDeliveryPolicy> Eq for Channel<T> {} impl<T: DataDeliveryPolicy> Eq for Channel<T> {}
impl<T: MessageDeliveryPolicy> PartialEq for Channel<T> { impl<T: DataDeliveryPolicy> PartialEq for Channel<T> {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.id() == other.id() self.id() == other.id()
} }
...@@ -22,20 +22,20 @@ impl<T: MessageDeliveryPolicy> PartialEq for Channel<T> { ...@@ -22,20 +22,20 @@ impl<T: MessageDeliveryPolicy> PartialEq for Channel<T> {
impl<T> Clone for Channel<T> impl<T> Clone for Channel<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self(self.0.clone()) Self(self.0.clone())
} }
} }
struct ChannelInner<T: MessageDeliveryPolicy> { struct ChannelInner<T: DataDeliveryPolicy> {
id: UniqueId, id: UniqueId,
pc: Mutex<PolicyChannel<T>>, pc: Mutex<PolicyChannel<T>>,
available: Condvar, available: Condvar,
} }
impl<T: MessageDeliveryPolicy> ChannelInner<T> { impl<T: DataDeliveryPolicy> ChannelInner<T> {
fn try_send(&self, value: T) -> Result<()> { fn try_send(&self, value: T) -> Result<()> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
if pc.receivers == 0 { if pc.receivers == 0 {
...@@ -98,7 +98,7 @@ impl<T: MessageDeliveryPolicy> ChannelInner<T> { ...@@ -98,7 +98,7 @@ impl<T: MessageDeliveryPolicy> ChannelInner<T> {
} }
} }
impl<T: MessageDeliveryPolicy> Channel<T> { impl<T: DataDeliveryPolicy> Channel<T> {
fn new(capacity: usize, ordering: bool) -> Self { fn new(capacity: usize, ordering: bool) -> Self {
Self( Self(
ChannelInner { ChannelInner {
...@@ -111,7 +111,7 @@ impl<T: MessageDeliveryPolicy> Channel<T> { ...@@ -111,7 +111,7 @@ impl<T: MessageDeliveryPolicy> Channel<T> {
} }
} }
struct PolicyChannel<T: MessageDeliveryPolicy> { struct PolicyChannel<T: DataDeliveryPolicy> {
queue: Deque<T>, queue: Deque<T>,
senders: usize, senders: usize,
receivers: usize, receivers: usize,
...@@ -119,7 +119,7 @@ struct PolicyChannel<T: MessageDeliveryPolicy> { ...@@ -119,7 +119,7 @@ struct PolicyChannel<T: MessageDeliveryPolicy> {
impl<T> PolicyChannel<T> impl<T> PolicyChannel<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
fn new(capacity: usize, ordering: bool) -> Self { fn new(capacity: usize, ordering: bool) -> Self {
assert!(capacity > 0, "channel capacity MUST be > 0"); assert!(capacity > 0, "channel capacity MUST be > 0");
...@@ -134,14 +134,14 @@ where ...@@ -134,14 +134,14 @@ where
#[derive(Eq, PartialEq)] #[derive(Eq, PartialEq)]
pub struct Sender<T> pub struct Sender<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
channel: Channel<T>, channel: Channel<T>,
} }
impl<T> Sender<T> impl<T> Sender<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
#[inline] #[inline]
pub fn send(&self, value: T) -> Result<()> { pub fn send(&self, value: T) -> Result<()> {
...@@ -171,7 +171,7 @@ where ...@@ -171,7 +171,7 @@ where
impl<T> Clone for Sender<T> impl<T> Clone for Sender<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
self.channel.0.pc.lock().senders += 1; self.channel.0.pc.lock().senders += 1;
...@@ -183,7 +183,7 @@ where ...@@ -183,7 +183,7 @@ where
impl<T> Drop for Sender<T> impl<T> Drop for Sender<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
fn drop(&mut self) { fn drop(&mut self) {
self.channel.0.pc.lock().senders -= 1; self.channel.0.pc.lock().senders -= 1;
...@@ -194,14 +194,14 @@ where ...@@ -194,14 +194,14 @@ where
#[derive(Eq, PartialEq)] #[derive(Eq, PartialEq)]
pub struct Receiver<T> pub struct Receiver<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
channel: Channel<T>, channel: Channel<T>,
} }
impl<T> Receiver<T> impl<T> Receiver<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
#[inline] #[inline]
pub fn recv(&self) -> Result<T> { pub fn recv(&self) -> Result<T> {
...@@ -231,7 +231,7 @@ where ...@@ -231,7 +231,7 @@ where
impl<T> Clone for Receiver<T> impl<T> Clone for Receiver<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
self.channel.0.pc.lock().receivers += 1; self.channel.0.pc.lock().receivers += 1;
...@@ -243,7 +243,7 @@ where ...@@ -243,7 +243,7 @@ where
impl<T> Drop for Receiver<T> impl<T> Drop for Receiver<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
fn drop(&mut self) { fn drop(&mut self) {
self.channel.0.pc.lock().receivers -= 1; self.channel.0.pc.lock().receivers -= 1;
...@@ -251,7 +251,7 @@ where ...@@ -251,7 +251,7 @@ where
} }
} }
fn make_channel<T: MessageDeliveryPolicy>(ch: Channel<T>) -> (Sender<T>, Receiver<T>) { fn make_channel<T: DataDeliveryPolicy>(ch: Channel<T>) -> (Sender<T>, Receiver<T>) {
let tx = Sender { let tx = Sender {
channel: ch.clone(), channel: ch.clone(),
}; };
...@@ -259,24 +259,24 @@ fn make_channel<T: MessageDeliveryPolicy>(ch: Channel<T>) -> (Sender<T>, Receive ...@@ -259,24 +259,24 @@ fn make_channel<T: MessageDeliveryPolicy>(ch: Channel<T>) -> (Sender<T>, Receive
(tx, rx) (tx, rx)
} }
/// Creates a bounded channel which respects [`MessageDeliveryPolicy`] rules with no message /// Creates a bounded channel which respects [`DataDeliveryPolicy`] rules with no message
/// priority ordering /// priority ordering
/// ///
/// # Panics /// # Panics
/// ///
/// Will panic if the capacity is zero /// Will panic if the capacity is zero
pub fn bounded<T: MessageDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) { pub fn bounded<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let ch = Channel::new(capacity, false); let ch = Channel::new(capacity, false);
make_channel(ch) make_channel(ch)
} }
/// Creates a bounded channel which respects [`MessageDeliveryPolicy`] rules and has got message /// Creates a bounded channel which respects [`DataDeliveryPolicy`] rules and has got message
/// priority ordering turned on /// priority ordering turned on
/// ///
/// # Panics /// # Panics
/// ///
/// Will panic if the capacity is zero /// Will panic if the capacity is zero
pub fn ordered<T: MessageDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) { pub fn ordered<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let ch = Channel::new(capacity, true); let ch = Channel::new(capacity, true);
make_channel(ch) make_channel(ch)
} }
...@@ -285,7 +285,7 @@ pub fn ordered<T: MessageDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receive ...@@ -285,7 +285,7 @@ pub fn ordered<T: MessageDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receive
mod test { mod test {
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use crate::{DeliveryPolicy, MessageDeliveryPolicy}; use crate::{DeliveryPolicy, DataDeliveryPolicy};
use super::bounded; use super::bounded;
...@@ -296,7 +296,7 @@ mod test { ...@@ -296,7 +296,7 @@ mod test {
Spam, Spam,
} }
impl MessageDeliveryPolicy for Message { impl DataDeliveryPolicy for Message {
fn delivery_policy(&self) -> DeliveryPolicy { fn delivery_policy(&self) -> DeliveryPolicy {
match self { match self {
Message::Test(_) => DeliveryPolicy::Always, Message::Test(_) => DeliveryPolicy::Always,
......
use std::collections::VecDeque; use std::collections::VecDeque;
use crate::{DeliveryPolicy, MessageDeliveryPolicy}; use crate::{DeliveryPolicy, DataDeliveryPolicy};
/// A deque which stores values with respect of [`MessageDeliveryPolicy`] /// A deque which stores values with respect of [`DataDeliveryPolicy`]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Deque<T> pub struct Deque<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
data: VecDeque<T>, data: VecDeque<T>,
capacity: usize, capacity: usize,
...@@ -23,7 +23,7 @@ pub struct TryPushOutput<T> { ...@@ -23,7 +23,7 @@ pub struct TryPushOutput<T> {
impl<T> Deque<T> impl<T> Deque<T>
where where
T: MessageDeliveryPolicy, T: DataDeliveryPolicy,
{ {
/// Creates a new bounded deque /// Creates a new bounded deque
#[inline] #[inline]
...@@ -42,7 +42,7 @@ where ...@@ -42,7 +42,7 @@ where
} }
/// Tries to store the value /// Tries to store the value
/// ///
/// Returns the value back if there is no capacity even after all [`MessageDeliveryPolicy`] /// Returns the value back if there is no capacity even after all [`DataDeliveryPolicy`]
/// rules have been applied /// rules have been applied
pub fn try_push(&mut self, value: T) -> TryPushOutput<T> { pub fn try_push(&mut self, value: T) -> TryPushOutput<T> {
macro_rules! push { macro_rules! push {
...@@ -126,7 +126,7 @@ where ...@@ -126,7 +126,7 @@ where
} }
} }
fn sort_by_priority<T: MessageDeliveryPolicy>(v: &mut VecDeque<T>) { fn sort_by_priority<T: DataDeliveryPolicy>(v: &mut VecDeque<T>) {
v.rotate_right(v.as_slices().1.len()); v.rotate_right(v.as_slices().1.len());
assert!(v.as_slices().1.is_empty()); assert!(v.as_slices().1.is_empty());
v.as_mut_slices() v.as_mut_slices()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论