提交 30a6e082 authored 作者: Serhij S's avatar Serhij S

added send_timeout and recv_timeout methods to pchannel

上级 f8603f18
...@@ -60,6 +60,7 @@ trait Communicator { ...@@ -60,6 +60,7 @@ trait Communicator {
} }
} }
#[allow(clippy::module_name_repetitions)]
pub struct CommReader { pub struct CommReader {
reader: Option<Box<dyn Read + Send + 'static>>, reader: Option<Box<dyn Read + Send + 'static>>,
} }
......
...@@ -61,6 +61,9 @@ pub enum Error { ...@@ -61,6 +61,9 @@ pub enum Error {
/// Hub client with the given name is already registered /// Hub client with the given name is already registered
#[error("hub client already registered: {0}")] #[error("hub client already registered: {0}")]
HubAlreadyRegistered(Arc<str>), HubAlreadyRegistered(Arc<str>),
/// Timeouts
#[error("timed out")]
Timeout,
/// I/O and threading errors /// I/O and threading errors
#[error("I/O error {0}")] #[error("I/O error {0}")]
IO(String), IO(String),
......
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use crate::{pdeque::Deque, DataDeliveryPolicy, Error, Result}; use crate::{pdeque::Deque, DataDeliveryPolicy, Error, Result};
use object_id::UniqueId; use object_id::UniqueId;
...@@ -90,24 +90,27 @@ struct ChannelInner<T: DataDeliveryPolicy> { ...@@ -90,24 +90,27 @@ struct ChannelInner<T: DataDeliveryPolicy> {
} }
impl<T: DataDeliveryPolicy> ChannelInner<T> { impl<T: DataDeliveryPolicy> ChannelInner<T> {
fn try_send(&self, value: T) -> Result<()> { fn send(&self, mut value: T) -> Result<()> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
if pc.receivers == 0 { let pushed = loop {
return Err(Error::ChannelClosed); if pc.receivers == 0 {
} return Err(Error::ChannelClosed);
let push_result = pc.queue.try_push(value);
if push_result.value.is_none() {
self.data_available.notify_one();
if push_result.pushed {
Ok(())
} else {
Err(Error::ChannelSkipped)
} }
let push_result = pc.queue.try_push(value);
let Some(val) = push_result.value else {
break push_result.pushed;
};
value = val;
self.space_available.wait(&mut pc);
};
self.data_available.notify_one();
if pushed {
Ok(())
} else { } else {
Err(Error::ChannelFull) Err(Error::ChannelSkipped)
} }
} }
fn send(&self, mut value: T) -> Result<()> { fn send_timeout(&self, mut value: T, timeout: Duration) -> Result<()> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
let pushed = loop { let pushed = loop {
if pc.receivers == 0 { if pc.receivers == 0 {
...@@ -118,7 +121,9 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> { ...@@ -118,7 +121,9 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> {
break push_result.pushed; break push_result.pushed;
}; };
value = val; value = val;
self.space_available.wait(&mut pc); if self.space_available.wait_for(&mut pc, timeout).timed_out() {
return Err(Error::Timeout);
}
}; };
self.data_available.notify_one(); self.data_available.notify_one();
if pushed { if pushed {
...@@ -127,6 +132,23 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> { ...@@ -127,6 +132,23 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> {
Err(Error::ChannelSkipped) Err(Error::ChannelSkipped)
} }
} }
fn try_send(&self, value: T) -> Result<()> {
let mut pc = self.pc.lock();
if pc.receivers == 0 {
return Err(Error::ChannelClosed);
}
let push_result = pc.queue.try_push(value);
if push_result.value.is_none() {
self.data_available.notify_one();
if push_result.pushed {
Ok(())
} else {
Err(Error::ChannelSkipped)
}
} else {
Err(Error::ChannelFull)
}
}
fn recv(&self) -> Result<T> { fn recv(&self) -> Result<T> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
loop { loop {
...@@ -139,6 +161,20 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> { ...@@ -139,6 +161,20 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> {
self.data_available.wait(&mut pc); self.data_available.wait(&mut pc);
} }
} }
fn recv_timeout(&self, timeout: Duration) -> Result<T> {
let mut pc = self.pc.lock();
loop {
if let Some(val) = pc.queue.get() {
self.space_available.notify_one();
return Ok(val);
} else if pc.senders == 0 {
return Err(Error::ChannelClosed);
}
if self.data_available.wait_for(&mut pc, timeout).timed_out() {
return Err(Error::Timeout);
};
}
}
fn try_recv(&self) -> Result<T> { fn try_recv(&self) -> Result<T> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
if let Some(val) = pc.queue.get() { if let Some(val) = pc.queue.get() {
...@@ -203,6 +239,10 @@ where ...@@ -203,6 +239,10 @@ where
self.channel.0.send(value) self.channel.0.send(value)
} }
#[inline] #[inline]
pub fn send_timeout(&self, value: T, timeout: Duration) -> Result<()> {
self.channel.0.send_timeout(value, timeout)
}
#[inline]
pub fn try_send(&self, value: T) -> Result<()> { pub fn try_send(&self, value: T) -> Result<()> {
self.channel.0.try_send(value) self.channel.0.try_send(value)
} }
...@@ -276,6 +316,10 @@ where ...@@ -276,6 +316,10 @@ where
self.channel.0.recv() self.channel.0.recv()
} }
#[inline] #[inline]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T> {
self.channel.0.recv_timeout(timeout)
}
#[inline]
pub fn try_recv(&self) -> Result<T> { pub fn try_recv(&self) -> Result<T> {
self.channel.0.try_recv() self.channel.0.try_recv()
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论