提交 1540c8ec authored 作者: Serhij S's avatar Serhij S

Latest DeliveryPolicy

上级 f6fe0aa1
......@@ -27,7 +27,7 @@ oneshot = { version = "0.1.6", default-features = false, features = ["std"] }
parking_lot = "0.12.1"
pin-project = "1.1.5"
rmodbus = { version = "0.9.3" }
roboplc-derive = { version = "0.1.3" }
roboplc-derive = { version = "0.1.4" }
serde = { version = "1.0.197", features = ["derive", "rc"] }
serial = "0.4.0"
sysinfo = "0.30.6"
......
......@@ -61,6 +61,8 @@ channels.
Data policies supported:
* **Always** a frame is always delivered
* **Latest** a frame is always delivered, previous are dropped if no room
(acts like a ring-buffer)
* **Optional** a frame can be skipped if no room
* **Single** a frame must be delivered only once (the latest one)
* **SingleOptional** a frame must be delivered only once (the latest one) and
......
......@@ -116,6 +116,9 @@ impl Error {
pub fn invalid_data<S: fmt::Display>(msg: S) -> Self {
Error::InvalidData(msg.to_string())
}
pub fn io<S: fmt::Display>(msg: S) -> Self {
Error::IO(msg.to_string())
}
}
/// Data delivery policies, used by [`hub::Hub`], [`pchannel::Receiver`] and [`pdeque::Deque`]
......@@ -124,6 +127,8 @@ pub enum DeliveryPolicy {
#[default]
/// always deliver, fail if no room (default)
Always,
/// always deliver, drop the previous if no room (act as a ring-buffer)
Latest,
/// skip delivery if no room
Optional,
/// always deliver the frame but always in a single copy (latest)
......@@ -153,6 +158,7 @@ impl fmt::Display for DeliveryPolicy {
"{}",
match self {
DeliveryPolicy::Always => "always",
DeliveryPolicy::Latest => "latest",
DeliveryPolicy::Optional => "optional",
DeliveryPolicy::Single => "single",
DeliveryPolicy::SingleOptional => "single-optional",
......
......@@ -68,6 +68,18 @@ where
if value.is_delivery_policy_single() {
self.data.retain(|d| !d.eq_kind(&value) && !d.is_expired());
}
macro_rules! push_final {
() => {
if self.data.len() < self.capacity {
push!()
} else {
TryPushOutput {
pushed: false,
value: Some(value),
}
}
};
}
if self.data.len() < self.capacity {
push!()
} else {
......@@ -84,14 +96,24 @@ where
true
}
});
if self.data.len() < self.capacity {
push!()
} else {
TryPushOutput {
pushed: false,
value: Some(value),
push_final!()
}
DeliveryPolicy::Latest => {
let mut entry_removed = false;
self.data.retain(|d| {
if entry_removed {
true
} else if d.is_expired()
|| d.is_delivery_policy_optional()
|| d.eq_kind(&value)
{
entry_removed = true;
false
} else {
true
}
});
push_final!()
}
DeliveryPolicy::Optional | DeliveryPolicy::SingleOptional => TryPushOutput {
pushed: false,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论