提交 60bb0618 authored 作者: Serhij S's avatar Serhij S

moved to RTSC (TODO: move async channel)

上级 3186cd94
[package]
name = "roboplc"
version = "0.2.1"
version = "0.3.0"
edition = "2021"
authors = ["Serhij S. <div@altertech.com>"]
license = "Apache-2.0"
......@@ -26,7 +26,7 @@ object-id = "0.1.3"
oneshot = { version = "0.1.6", default-features = false, features = ["std"] }
pin-project = "1.1.5"
rmodbus = { version = "0.9.4", optional = true }
roboplc-derive = { version = "0.1" }
roboplc-derive = { version = "0.3" }
serde = { version = "1.0.197", features = ["derive", "rc"] }
serial = "0.4.0"
sysinfo = "0.29"
......@@ -44,6 +44,7 @@ metrics-exporter-prometheus = { version = "0.14.0", optional = true, default-fea
metrics = { version = "0.22.3", optional = true }
parking_lot_rt = "0.12.1"
snmp = { version = "0.2.2", optional = true }
rtsc = "0.1.4"
[features]
eapi = ["eva-common", "eva-sdk", "busrt", "tokio", "hostname"]
......
use std::collections::VecDeque;
use parking_lot_rt::Mutex;
/// A capacity-limited thread-safe deque-based data buffer
pub struct DataBuffer<T> {
data: Mutex<VecDeque<T>>,
capacity: usize,
}
impl<T> DataBuffer<T> {
/// # Panics
///
/// Will panic if the capacity is zero
#[inline]
pub fn bounded(capacity: usize) -> Self {
assert!(capacity > 0, "data buffer capacity MUST be > 0");
Self {
data: <_>::default(),
capacity,
}
}
/// Tries to push the value
/// returns the value back if not pushed
pub fn try_push(&self, value: T) -> Option<T> {
let mut buf = self.data.lock();
if buf.len() >= self.capacity {
return Some(value);
}
buf.push_back(value);
None
}
/// Forcibly pushes the value, removing the first element if necessary
///
/// returns true in case the buffer had enough capacity or false if the first element had been
/// removed
pub fn force_push(&self, value: T) -> bool {
let mut buf = self.data.lock();
let mut res = true;
while buf.len() >= self.capacity {
buf.pop_front();
res = false;
}
buf.push_back(value);
res
}
/// the current buffer length (number of elements)
pub fn len(&self) -> usize {
self.data.lock().len()
}
/// is the buffer empty
pub fn is_empty(&self) -> bool {
self.data.lock().is_empty()
}
/// takes the buffer content and keeps nothing inside
pub fn take(&self) -> VecDeque<T> {
std::mem::take(&mut *self.data.lock())
}
}
use parking_lot_rt::MutexGuard;
use rtsc::data_policy::DataDeliveryPolicy;
use std::{
io::{Read, Write},
net::SocketAddr,
......@@ -6,7 +7,7 @@ use std::{
time::Duration,
};
use crate::{DataDeliveryPolicy, Result};
use crate::Result;
pub mod serial; // Serial communications
pub mod tcp; // TCP communications
......
......@@ -13,10 +13,11 @@ use crate::{
suicide,
supervisor::Supervisor,
thread_rt::{Builder, RTParams, Scheduling},
DataDeliveryPolicy, Error, Result,
Error, Result,
};
use parking_lot_rt::RwLock;
pub use roboplc_derive::WorkerOpts;
use rtsc::data_policy::DataDeliveryPolicy;
use signal_hook::{
consts::{SIGINT, SIGTERM},
iterator::Signals,
......
use std::sync::Arc;
use parking_lot_rt::Mutex;
use rtsc::data_policy::DataDeliveryPolicy;
use crate::pchannel::{self, Receiver, Sender};
use crate::{DataDeliveryPolicy, Error, Result};
use crate::{Error, Result};
use self::prelude::DataChannel;
......@@ -12,8 +13,8 @@ type ConditionFunction<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
pub mod prelude {
pub use super::Hub;
pub use crate::event_matches;
pub use crate::pchannel::DataChannel;
pub use crate::{DataDeliveryPolicy, DeliveryPolicy};
pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy};
pub use rtsc::DataChannel;
}
pub const DEFAULT_PRIORITY: usize = 100;
......@@ -99,8 +100,9 @@ impl<T: DataDeliveryPolicy + Clone> Hub<T> {
macro_rules! send_checked {
($sub: expr, $msg: expr) => {
if let Err(e) = $sub.tx.send($msg) {
if !error_handler(&$sub.name, &e) {
return Err(Error::HubSend(e.into()));
let err = e.into();
if !error_handler(&$sub.name, &err) {
return Err(Error::HubSend(err.into()));
}
}
};
......@@ -202,18 +204,18 @@ impl<T> DataChannel<T> for Hub<T>
where
T: DataDeliveryPolicy + Clone,
{
fn send(&self, message: T) -> Result<()> {
fn send(&self, message: T) -> rtsc::Result<()> {
self.send(message);
Ok(())
}
fn recv(&self) -> Result<T> {
Err(Error::Unimplemented)
fn recv(&self) -> rtsc::Result<T> {
Err(rtsc::Error::Unimplemented)
}
fn try_recv(&self) -> Result<T> {
Err(Error::Unimplemented)
fn try_recv(&self) -> rtsc::Result<T> {
Err(rtsc::Error::Unimplemented)
}
fn try_send(&self, _message: T) -> Result<()> {
Err(Error::Unimplemented)
fn try_send(&self, _message: T) -> rtsc::Result<()> {
Err(rtsc::Error::Unimplemented)
}
}
......@@ -221,18 +223,18 @@ impl<T> DataChannel<T> for Client<T>
where
T: DataDeliveryPolicy + Clone,
{
fn send(&self, message: T) -> Result<()> {
fn send(&self, message: T) -> rtsc::Result<()> {
self.send(message);
Ok(())
}
fn recv(&self) -> Result<T> {
self.recv()
fn recv(&self) -> rtsc::Result<T> {
self.recv().map_err(Into::into)
}
fn try_recv(&self) -> Result<T> {
self.try_recv()
fn try_recv(&self) -> rtsc::Result<T> {
self.try_recv().map_err(Into::into)
}
fn try_send(&self, _message: T) -> Result<()> {
Err(Error::Unimplemented)
fn try_send(&self, _message: T) -> rtsc::Result<()> {
Err(rtsc::Error::Unimplemented)
}
}
......@@ -269,11 +271,11 @@ impl<T: DataDeliveryPolicy + Clone> Client<T> {
}
/// Receives a message from the hub (blocking)
pub fn recv(&self) -> Result<T> {
self.rx.recv()
self.rx.recv().map_err(Into::into)
}
/// Receives a message from the hub (non-blocking)
pub fn try_recv(&self) -> Result<T> {
self.rx.try_recv()
self.rx.try_recv().map_err(Into::into)
}
}
......@@ -360,7 +362,9 @@ struct Subscription<T: DataDeliveryPolicy + Clone> {
#[cfg(test)]
mod test {
use crate::{event_matches, DataDeliveryPolicy};
use rtsc::data_policy::DataDeliveryPolicy;
use crate::event_matches;
use super::Hub;
......
......@@ -2,22 +2,26 @@
use core::{fmt, num};
use std::io::Write;
use std::panic::PanicInfo;
use std::{env, mem, str::FromStr, sync::Arc, time::Duration};
use std::{env, sync::Arc, time::Duration};
use colored::Colorize as _;
#[cfg(target_os = "linux")]
use thread_rt::{RTParams, Scheduling};
pub use log::LevelFilter;
pub use roboplc_derive::DataPolicy;
pub use rtsc::{DataChannel, DataPolicy};
pub use parking_lot_rt as locking;
#[cfg(feature = "metrics")]
pub use metrics;
/// Event buffers
pub mod buf;
pub use rtsc::buf;
pub use rtsc::pchannel;
pub use rtsc::time;
pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy};
/// Reliable TCP/Serial communications
pub mod comm;
/// Controller and workers
......@@ -27,26 +31,16 @@ pub mod controller;
pub mod hub;
/// In-process data communication pub/sub hub, asynchronous edition
pub mod hub_async;
/// Async policy channel
pub mod pchannel_async;
/// I/O
pub mod io;
/// Policy-based channels, synchronous edition
pub mod pchannel;
/// Policy-based channels, asynchronous edition
pub mod pchannel_async;
/// Policy-based data storages
pub mod pdeque;
/// A lighweight real-time safe semaphore
pub mod semaphore;
/// Task supervisor to manage real-time threads
#[cfg(target_os = "linux")]
pub mod supervisor;
/// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone
#[cfg(target_os = "linux")]
pub mod thread_rt;
/// Various time tools for real-time applications
pub mod time;
/// A memory cell with an expiring value
pub mod ttlcell;
pub type Result<T> = std::result::Result<T, Error>;
......@@ -116,6 +110,36 @@ pub enum Error {
Failed(String),
}
impl From<rtsc::Error> for Error {
fn from(err: rtsc::Error) -> Self {
match err {
rtsc::Error::ChannelFull => Error::ChannelFull,
rtsc::Error::ChannelSkipped => Error::ChannelSkipped,
rtsc::Error::ChannelClosed => Error::ChannelClosed,
rtsc::Error::ChannelEmpty => Error::ChannelEmpty,
rtsc::Error::Unimplemented => Error::Unimplemented,
rtsc::Error::Timeout => Error::Timeout,
rtsc::Error::InvalidData(msg) => Error::InvalidData(msg),
rtsc::Error::Failed(msg) => Error::Failed(msg),
}
}
}
impl From<Error> for rtsc::Error {
fn from(err: Error) -> Self {
match err {
Error::ChannelFull => rtsc::Error::ChannelFull,
Error::ChannelSkipped => rtsc::Error::ChannelSkipped,
Error::ChannelClosed => rtsc::Error::ChannelClosed,
Error::ChannelEmpty => rtsc::Error::ChannelEmpty,
Error::Unimplemented => rtsc::Error::Unimplemented,
Error::Timeout => rtsc::Error::Timeout,
Error::InvalidData(msg) => rtsc::Error::InvalidData(msg),
_ => rtsc::Error::Failed(err.to_string()),
}
}
}
macro_rules! impl_error {
($t: ty, $key: ident) => {
impl From<$t> for Error {
......@@ -149,87 +173,6 @@ impl Error {
}
}
/// Data delivery policies, used by [`hub::Hub`], [`pchannel::Receiver`] and [`pdeque::Deque`]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
pub enum DeliveryPolicy {
#[default]
/// always deliver, fail if no room (default)
Always,
/// always deliver, drop the previous if no room (act as a ring-buffer)
Latest,
/// skip delivery if no room
Optional,
/// always deliver the frame but always in a single copy (latest)
Single,
/// deliver a single latest copy, skip if no room
SingleOptional,
}
impl FromStr for DeliveryPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"always" => Ok(DeliveryPolicy::Always),
"optional" => Ok(DeliveryPolicy::Optional),
"single" => Ok(DeliveryPolicy::Single),
"single-optional" => Ok(DeliveryPolicy::SingleOptional),
_ => Err(Error::invalid_data(s)),
}
}
}
impl fmt::Display for DeliveryPolicy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
match self {
DeliveryPolicy::Always => "always",
DeliveryPolicy::Latest => "latest",
DeliveryPolicy::Optional => "optional",
DeliveryPolicy::Single => "single",
DeliveryPolicy::SingleOptional => "single-optional",
}
)
}
}
/// Implements delivery policies for own data types
pub trait DataDeliveryPolicy
where
Self: Sized,
{
/// Delivery policy, the default is [`DeliveryPolicy::Always`]
fn delivery_policy(&self) -> DeliveryPolicy {
DeliveryPolicy::Always
}
/// Priority, for ordered, lower is better, the default is 100
fn priority(&self) -> usize {
100
}
/// Has equal kind with other
///
/// (default: check enum discriminant)
fn eq_kind(&self, other: &Self) -> bool {
mem::discriminant(self) == mem::discriminant(other)
}
/// If a frame expires during storing/delivering, it is not delivered
fn is_expired(&self) -> bool {
false
}
#[doc(hidden)]
fn is_delivery_policy_single(&self) -> bool {
let dp = self.delivery_policy();
dp == DeliveryPolicy::Single || dp == DeliveryPolicy::SingleOptional
}
#[doc(hidden)]
fn is_delivery_policy_optional(&self) -> bool {
let dp = self.delivery_policy();
dp == DeliveryPolicy::Optional || dp == DeliveryPolicy::SingleOptional
}
}
/// Immediately kills the current process and all its subprocesses with a message to stderr
#[cfg(target_os = "linux")]
pub fn critical(msg: &str) -> ! {
......@@ -296,11 +239,6 @@ fn panic(info: &PanicInfo) -> ! {
}
}
impl DataDeliveryPolicy for () {}
impl DataDeliveryPolicy for usize {}
impl DataDeliveryPolicy for String {}
impl<T> DataDeliveryPolicy for Vec<T> {}
/// Returns true if started in production mode (as a systemd unit)
pub fn is_production() -> bool {
env::var("INVOCATION_ID").map_or(false, |v| !v.is_empty())
......@@ -328,8 +266,7 @@ pub mod prelude {
#[cfg(target_os = "linux")]
pub use crate::supervisor::prelude::*;
pub use crate::time::DurationRT;
pub use crate::ttlcell::TtlCell;
pub use bma_ts::{Monotonic, Timestamp};
pub use roboplc_derive::DataPolicy;
pub use rtsc::DataPolicy;
pub use std::time::Duration;
}
差异被折叠。
......@@ -10,10 +10,11 @@ use std::{
task::{Context, Poll, Waker},
};
use crate::{pdeque::Deque, DataDeliveryPolicy, Error, Result};
use crate::{DataDeliveryPolicy, Error, Result};
use object_id::UniqueId;
use parking_lot_rt::{Condvar, Mutex};
use pin_project::{pin_project, pinned_drop};
use rtsc::{data_policy::StorageTryPushOutput, pdeque::Deque};
type ClientId = usize;
......@@ -250,15 +251,15 @@ where
}
if pc.send_fut_wakers.is_empty() || self.queued {
let push_result = pc.queue.try_push(self.value.take().unwrap());
if let Some(val) = push_result.value {
if let StorageTryPushOutput::Full(val) = push_result {
self.value = Some(val);
} else {
self.queued = false;
pc.notify_data_sent();
return Poll::Ready(if push_result.pushed {
Ok(())
} else {
Err(Error::ChannelSkipped)
return Poll::Ready(match push_result {
StorageTryPushOutput::Pushed => Ok(()),
StorageTryPushOutput::Skipped => Err(Error::ChannelSkipped),
StorageTryPushOutput::Full(_) => unreachable!(),
});
}
}
......@@ -294,16 +295,13 @@ where
if pc.receivers == 0 {
return Err(Error::ChannelClosed);
}
let push_result = pc.queue.try_push(value);
if push_result.value.is_none() {
pc.notify_data_sent();
if push_result.pushed {
match pc.queue.try_push(value) {
StorageTryPushOutput::Pushed => {
pc.notify_data_sent();
Ok(())
} else {
Err(Error::ChannelSkipped)
}
} else {
Err(Error::ChannelFull)
StorageTryPushOutput::Skipped => Err(Error::ChannelSkipped),
StorageTryPushOutput::Full(_) => Err(Error::ChannelFull),
}
}
pub fn send_blocking(&self, mut value: T) -> Result<()> {
......@@ -313,18 +311,18 @@ where
return Err(Error::ChannelClosed);
}
let push_result = pc.queue.try_push(value);
let Some(val) = push_result.value else {
break push_result.pushed;
let StorageTryPushOutput::Full(val) = push_result else {
break push_result;
};
value = val;
pc.append_send_sync_waker();
self.channel.0.space_available.wait(&mut pc);
};
pc.wake_next_recv();
if pushed {
Ok(())
} else {
Err(Error::ChannelSkipped)
match pushed {
StorageTryPushOutput::Pushed => Ok(()),
StorageTryPushOutput::Skipped => Err(Error::ChannelSkipped),
StorageTryPushOutput::Full(_) => unreachable!(),
}
}
#[inline]
......
use std::collections::VecDeque;
use crate::{DataDeliveryPolicy, DeliveryPolicy};
/// A deque which stores values with respect of [`DataDeliveryPolicy`]
#[derive(Clone, Debug)]
pub struct Deque<T>
where
T: DataDeliveryPolicy,
{
data: VecDeque<T>,
capacity: usize,
ordered: bool,
}
/// Result payload of try_push operation
pub struct TryPushOutput<T> {
/// has the value been really pushed
pub pushed: bool,
/// the value in case if push failed but the value kind is not an optional
pub value: Option<T>,
}
impl<T> Deque<T>
where
T: DataDeliveryPolicy,
{
/// Creates a new bounded deque
#[inline]
pub fn bounded(capacity: usize) -> Self {
Self {
data: VecDeque::with_capacity(capacity),
capacity,
ordered: false,
}
}
/// Enabled/disables priority ordering, can be used as a build pattern
#[inline]
pub fn set_ordering(mut self, v: bool) -> Self {
self.ordered = v;
self
}
/// Tries to store the value
///
/// Returns the value back if there is no capacity even after all [`DataDeliveryPolicy`]
/// rules have been applied
///
/// Note: expired values are dropped and the operation returns: pushed=true
pub fn try_push(&mut self, value: T) -> TryPushOutput<T> {
macro_rules! push {
() => {{
self.data.push_back(value);
if self.ordered {
sort_by_priority(&mut self.data);
}
TryPushOutput {
pushed: true,
value: None,
}
}};
}
if value.is_expired() {
return TryPushOutput {
pushed: true,
value: None,
};
}
if value.is_delivery_policy_single() {
self.data.retain(|d| !d.eq_kind(&value) && !d.is_expired());
}
macro_rules! push_final {
() => {
if self.data.len() < self.capacity {
push!()
} else {
TryPushOutput {
pushed: false,
value: Some(value),
}
}
};
}
if self.data.len() < self.capacity {
push!()
} else {
match value.delivery_policy() {
DeliveryPolicy::Always | DeliveryPolicy::Single => {
let mut entry_removed = false;
self.data.retain(|d| {
if entry_removed {
true
} else if d.is_expired() || d.is_delivery_policy_optional() {
entry_removed = true;
false
} else {
true
}
});
push_final!()
}
DeliveryPolicy::Latest => {
let mut entry_removed = false;
self.data.retain(|d| {
if entry_removed {
true
} else if d.is_expired()
|| d.is_delivery_policy_optional()
|| d.eq_kind(&value)
{
entry_removed = true;
false
} else {
true
}
});
push_final!()
}
DeliveryPolicy::Optional | DeliveryPolicy::SingleOptional => TryPushOutput {
pushed: false,
value: None,
},
}
}
}
/// Returns the first available value, ignores expired ones
#[inline]
pub fn get(&mut self) -> Option<T> {
loop {
let value = self.data.pop_front();
if let Some(ref val) = value {
if !val.is_expired() {
break value;
}
} else {
break None;
}
}
}
/// Clears the deque
#[inline]
pub fn clear(&mut self) {
self.data.clear();
}
/// Returns number of elements in deque
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
#[inline]
pub fn is_full(&self) -> bool {
self.len() == self.capacity
}
#[inline]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
fn sort_by_priority<T: DataDeliveryPolicy>(v: &mut VecDeque<T>) {
v.rotate_right(v.as_slices().1.len());
assert!(v.as_slices().1.is_empty());
v.as_mut_slices()
.0
.sort_by(|a, b| a.priority().partial_cmp(&b.priority()).unwrap());
}
use std::sync::Arc;
use parking_lot_rt::{Condvar, Mutex};
/// A lightweight real-time safe semaphore
pub struct Semaphore {
inner: Arc<SemaphoreInner>,
}
impl Semaphore {
pub fn new(capacity: usize) -> Self {
Self {
inner: SemaphoreInner {
permissions: <_>::default(),
capacity,
cv: Condvar::new(),
}
.into(),
}
}
/// Tries to acquire permission, returns None if failed
pub fn try_acquire(&self) -> Option<SemaphoreGuard> {
let mut count = self.inner.permissions.lock();
if *count == self.inner.capacity {
return None;
}
*count += 1;
Some(SemaphoreGuard {
inner: self.inner.clone(),
})
}
/// Acquires permission, blocks until it is available
pub fn acquire(&self) -> SemaphoreGuard {
let mut count = self.inner.permissions.lock();
while *count == self.inner.capacity {
self.inner.cv.wait(&mut count);
}
*count += 1;
SemaphoreGuard {
inner: self.inner.clone(),
}
}
pub fn capacity(&self) -> usize {
self.inner.capacity
}
pub fn available(&self) -> usize {
self.inner.capacity - *self.inner.permissions.lock()
}
pub fn used(&self) -> usize {
*self.inner.permissions.lock()
}
/// For tests only
#[allow(dead_code)]
fn is_poisoned(&self) -> bool {
*self.inner.permissions.lock() > self.inner.capacity
}
}
struct SemaphoreInner {
permissions: Mutex<usize>,
capacity: usize,
cv: Condvar,
}
impl SemaphoreInner {
fn release(&self) {
let mut count = self.permissions.lock();
*count -= 1;
self.cv.notify_one();
}
}
#[allow(clippy::module_name_repetitions)]
pub struct SemaphoreGuard {
inner: Arc<SemaphoreInner>,
}
impl Drop for SemaphoreGuard {
fn drop(&mut self) {
self.inner.release();
}
}
#[cfg(test)]
mod test {
use std::time::Instant;
use super::*;
#[test]
fn test_semaphore() {
let sem = Semaphore::new(2);
assert_eq!(sem.capacity(), 2);
assert_eq!(sem.available(), 2);
assert_eq!(sem.used(), 0);
let _g1 = sem.acquire();
assert_eq!(sem.available(), 1);
assert_eq!(sem.used(), 1);
let _g2 = sem.acquire();
assert_eq!(sem.available(), 0);
assert_eq!(sem.used(), 2);
let g3 = sem.try_acquire();
assert!(g3.is_none());
drop(_g1);
assert_eq!(sem.available(), 1);
assert_eq!(sem.used(), 1);
let _g4 = sem.acquire();
assert_eq!(sem.available(), 0);
assert_eq!(sem.used(), 2);
}
#[test]
fn test_semaphore_multithread() {
let start = Instant::now();
let sem = Semaphore::new(10);
let mut tasks = Vec::new();
for _ in 0..100 {
let perm = sem.acquire();
tasks.push(std::thread::spawn(move || {
let _perm = perm;
std::thread::sleep(std::time::Duration::from_millis(1));
}));
}
'outer: loop {
for task in &tasks {
std::hint::spin_loop();
assert!(!sem.is_poisoned(), "Semaphore is poisoned");
if !task.is_finished() {
continue 'outer;
}
}
break 'outer;
}
assert!(start.elapsed().as_millis() > 10);
}
}
use std::{thread, time::Duration};
use bma_ts::Monotonic;
/// A trait which extends the standard [`Duration`] and similar types with additional methods
///
pub trait DurationRT {
/// Returns true if all provided [`Monotonic`] times fit the duration
fn fits(&self, t: &[Monotonic]) -> bool;
/// Returns the absolute difference between two durations (provided until abs_diff become
/// stable)
fn diff_abs(&self, other: Self) -> Duration;
}
impl DurationRT for Duration {
fn fits(&self, t: &[Monotonic]) -> bool {
if t.is_empty() {
true
} else {
let min_ts = t.iter().min().unwrap();
let max_ts = t.iter().max().unwrap();
max_ts.as_duration() - min_ts.as_duration() <= *self
}
}
fn diff_abs(&self, other: Self) -> Duration {
if *self > other {
*self - other
} else {
other - *self
}
}
}
pub fn interval(period: Duration) -> Interval {
Interval::new(period)
}
/// A synchronous interval helper, similar to
/// <https://docs.rs/tokio/latest/tokio/time/struct.Interval.html>
pub struct Interval {
next_tick: Option<Monotonic>,
period: Duration,
missing_tick_behavior: MissedTickBehavior,
}
impl Iterator for Interval {
type Item = bool;
fn next(&mut self) -> Option<bool> {
Some(self.tick())
}
}
impl Interval {
pub fn new(period: Duration) -> Self {
Self {
next_tick: None,
period,
missing_tick_behavior: <_>::default(),
}
}
/// Ticks the interval
///
/// Returns false if a tick is missed
pub fn tick(&mut self) -> bool {
let now = Monotonic::now();
if let Some(mut next_tick) = self.next_tick {
match now.cmp(&next_tick) {
std::cmp::Ordering::Less => {
let to_sleep = next_tick - now;
self.next_tick = Some(next_tick + self.period);
thread::sleep(to_sleep);
true
}
std::cmp::Ordering::Equal => true,
std::cmp::Ordering::Greater => {
match self.missing_tick_behavior {
MissedTickBehavior::Burst => {
self.next_tick = Some(next_tick + self.period);
}
MissedTickBehavior::Delay => {
self.next_tick = Some(now + self.period);
}
MissedTickBehavior::Skip => {
while next_tick <= now {
next_tick += self.period;
}
self.next_tick = Some(next_tick);
}
}
false
}
}
} else {
self.next_tick = Some(now + self.period);
true
}
}
/// Sets missing tick behavior policy. Can be used as a build pattern
pub fn set_missing_tick_behavior(mut self, missing_tick_behavior: MissedTickBehavior) -> Self {
self.missing_tick_behavior = missing_tick_behavior;
self
}
}
/// Interval missing tick behavior
///
/// The behavior is similar to
/// <https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html>
/// but may differ in some details
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
pub enum MissedTickBehavior {
#[default]
/// `[Interval::tick()`] method has no delay for missed intervals, all the missed ones are
/// fired instantly
Burst,
/// The interval is restarted from the current point of time
Delay,
/// Missed ticks are skipped with no additional effect
Skip,
}
#[cfg(test)]
mod test {
use std::{thread, time::Duration};
use bma_ts::Monotonic;
use crate::time::DurationRT as _;
#[test]
fn test_fits() {
let first = Monotonic::now();
thread::sleep(Duration::from_millis(10));
let second = Monotonic::now();
thread::sleep(Duration::from_millis(10));
let third = Monotonic::now();
assert!(Duration::from_millis(100).fits(&[first, second, third]));
assert!(Duration::from_millis(25).fits(&[first, second, third]));
}
}
use bma_ts::Monotonic;
use core::fmt;
use std::{ops::Deref, time::Duration};
/// A memory ce;ll with an expiring value with API similar to the standard [`Option`]
pub struct TtlCell<T> {
value: Option<T>,
ttl: Duration,
set_at: Monotonic,
}
impl<T> fmt::Debug for TtlCell<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.as_ref() {
Some(v) => write!(f, "Some({:?})", v),
None => write!(f, "None"),
}
}
}
impl<T> PartialEq for TtlCell<T>
where
T: PartialEq,
{
fn eq(&self, other: &Self) -> bool {
self.as_ref() == other.as_ref()
}
}
impl<T> Eq for TtlCell<T> where T: Eq {}
impl<T> Clone for TtlCell<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
ttl: self.ttl,
set_at: self.set_at,
}
}
}
impl<T> TtlCell<T> {
/// Creates a new empty cell
#[inline]
pub fn new(ttl: Duration) -> Self {
Self {
value: None,
ttl,
set_at: Monotonic::now(),
}
}
/// Creates a new empty cell with a value set
#[inline]
pub fn new_with_value(ttl: Duration, value: T) -> Self {
Self {
value: Some(value),
ttl,
set_at: Monotonic::now(),
}
}
/// Replaces the current value, returns the previous one. The value set time is set to the
/// current time point
#[inline]
pub fn replace(&mut self, value: T) -> Option<T> {
let prev = self.value.replace(value);
let result = if self.is_expired() { None } else { prev };
self.touch();
result
}
/// Sets the current value. The value set time is set to the current time point
#[inline]
pub fn set(&mut self, value: T) {
self.value = Some(value);
self.touch();
}
/// Clears the current value
#[inline]
pub fn clear(&mut self) {
self.value = None;
}
/// Returns a refernce to the value if set and not expired
#[inline]
pub fn as_ref(&self) -> Option<&T> {
if self.is_expired() {
None
} else {
self.value.as_ref()
}
}
/// A value ref-coupler
///
/// Returns two references to two [`TtlCell`] values in case if both of them are not expired and
/// set time difference matches "max_time_delta" parameter
#[inline]
pub fn as_ref_with<'a, O>(
&'a self,
other: &'a TtlCell<O>,
max_time_delta: Duration,
) -> Option<(&T, &O)> {
let maybe_first = self.as_ref();
let maybe_second = other.as_ref();
if let Some(first) = maybe_first {
if let Some(second) = maybe_second {
if self.set_at.abs_diff(other.set_at) <= max_time_delta {
return Some((first, second));
}
}
}
None
}
/// Takes the value if set and not expired, clears the cell
#[inline]
pub fn take(&mut self) -> Option<T> {
if self.is_expired() {
None
} else {
self.value.take()
}
}
/// A value take-coupler
///
/// Takes two [`TtlCell`] values in case if both of them are not expired and set time
/// difference matches "max_time_delta" parameter. Both cells are cleared
#[inline]
pub fn take_with<O>(
&mut self,
other: &mut TtlCell<O>,
max_time_delta: Duration,
) -> Option<(T, O)> {
let maybe_first = self.take();
let maybe_second = other.take();
if let Some(first) = maybe_first {
if let Some(second) = maybe_second {
if self.set_at.abs_diff(other.set_at) <= max_time_delta {
return Some((first, second));
}
}
}
None
}
/// Returns a derefernce to the value if set and not expired
#[inline]
pub fn as_deref(&self) -> Option<&T::Target>
where
T: Deref,
{
match self.as_ref() {
Some(t) => Some(&**t),
None => None,
}
}
/// Returns true if the value is expired or not set
#[inline]
pub fn is_expired(&self) -> bool {
self.set_at.elapsed() > self.ttl || self.value.is_none()
}
/// Updates the value set time to the current point of time
#[inline]
pub fn touch(&mut self) {
self.set_at = Monotonic::now();
}
/// Returns the value set time (monotonic)
#[inline]
pub fn set_at(&self) -> Monotonic {
self.set_at
}
}
#[cfg(test)]
mod test {
use std::{thread, time::Duration};
use super::TtlCell;
#[test]
fn test_get_set() {
let ttl = Duration::from_millis(10);
let mut opt = TtlCell::new_with_value(ttl, 25);
thread::sleep(ttl / 2);
insta::assert_debug_snapshot!(opt.as_ref().copied(), @r###"
Some(
25,
)
"###);
thread::sleep(ttl);
insta::assert_debug_snapshot!(opt.as_ref().copied(), @"None");
opt.set(30);
thread::sleep(ttl / 2);
insta::assert_debug_snapshot!(opt.as_ref().copied(), @r###"
Some(
30,
)
"###);
thread::sleep(ttl);
insta::assert_debug_snapshot!(opt.as_ref().copied(), @"None");
}
#[test]
fn test_take_replace() {
let ttl = Duration::from_millis(10);
let mut opt = TtlCell::new_with_value(ttl, 25);
thread::sleep(ttl / 2);
insta::assert_debug_snapshot!(opt.take(), @r###"
Some(
25,
)
"###);
insta::assert_debug_snapshot!(opt.as_ref().copied(), @"None");
opt.set(30);
thread::sleep(ttl / 2);
insta::assert_debug_snapshot!(opt.replace(29), @r###"
Some(
30,
)
"###);
thread::sleep(ttl);
insta::assert_debug_snapshot!(opt.as_ref().copied(), @"None");
}
#[test]
fn test_take_with() {
let mut first = TtlCell::new_with_value(Duration::from_secs(1), 25);
thread::sleep(Duration::from_millis(10));
let mut second = TtlCell::new_with_value(Duration::from_secs(1), 25);
insta::assert_debug_snapshot!(first
.take_with(&mut second, Duration::from_millis(100)), @r###"
Some(
(
25,
25,
),
)
"###);
let mut first = TtlCell::new_with_value(Duration::from_secs(1), 25);
thread::sleep(Duration::from_millis(100));
let mut second = TtlCell::new_with_value(Duration::from_secs(1), 25);
insta::assert_debug_snapshot!(
first.take_with(&mut second, Duration::from_millis(50)), @"None");
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论