提交 28d34fba authored 作者: Serhij S's avatar Serhij S

channel tuning

上级 696fad18
...@@ -85,7 +85,8 @@ where ...@@ -85,7 +85,8 @@ where
struct ChannelInner<T: DataDeliveryPolicy> { struct ChannelInner<T: DataDeliveryPolicy> {
id: UniqueId, id: UniqueId,
pc: Mutex<PolicyChannel<T>>, pc: Mutex<PolicyChannel<T>>,
available: Condvar, data_available: Condvar,
space_available: Condvar,
} }
impl<T: DataDeliveryPolicy> ChannelInner<T> { impl<T: DataDeliveryPolicy> ChannelInner<T> {
...@@ -96,7 +97,7 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> { ...@@ -96,7 +97,7 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> {
} }
let push_result = pc.queue.try_push(value); let push_result = pc.queue.try_push(value);
if push_result.value.is_none() { if push_result.value.is_none() {
self.available.notify_one(); self.data_available.notify_one();
if push_result.pushed { if push_result.pushed {
Ok(()) Ok(())
} else { } else {
...@@ -117,9 +118,9 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> { ...@@ -117,9 +118,9 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> {
break push_result.pushed; break push_result.pushed;
}; };
value = val; value = val;
self.available.wait(&mut pc); self.space_available.wait(&mut pc);
}; };
self.available.notify_one(); self.data_available.notify_one();
if pushed { if pushed {
Ok(()) Ok(())
} else { } else {
...@@ -130,18 +131,18 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> { ...@@ -130,18 +131,18 @@ impl<T: DataDeliveryPolicy> ChannelInner<T> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
loop { loop {
if let Some(val) = pc.queue.get() { if let Some(val) = pc.queue.get() {
self.available.notify_one(); self.space_available.notify_one();
return Ok(val); return Ok(val);
} else if pc.senders == 0 { } else if pc.senders == 0 {
return Err(Error::ChannelClosed); return Err(Error::ChannelClosed);
} }
self.available.wait(&mut pc); self.data_available.wait(&mut pc);
} }
} }
fn try_recv(&self) -> Result<T> { fn try_recv(&self) -> Result<T> {
let mut pc = self.pc.lock(); let mut pc = self.pc.lock();
if let Some(val) = pc.queue.get() { if let Some(val) = pc.queue.get() {
self.available.notify_one(); self.space_available.notify_one();
Ok(val) Ok(val)
} else if pc.senders == 0 { } else if pc.senders == 0 {
Err(Error::ChannelClosed) Err(Error::ChannelClosed)
...@@ -157,7 +158,8 @@ impl<T: DataDeliveryPolicy> Channel<T> { ...@@ -157,7 +158,8 @@ impl<T: DataDeliveryPolicy> Channel<T> {
ChannelInner { ChannelInner {
id: <_>::default(), id: <_>::default(),
pc: Mutex::new(PolicyChannel::new(capacity, ordering)), pc: Mutex::new(PolicyChannel::new(capacity, ordering)),
available: Condvar::new(), data_available: Condvar::new(),
space_available: Condvar::new(),
} }
.into(), .into(),
) )
...@@ -242,7 +244,7 @@ where ...@@ -242,7 +244,7 @@ where
let mut pc = self.channel.0.pc.lock(); let mut pc = self.channel.0.pc.lock();
pc.senders -= 1; pc.senders -= 1;
if pc.senders == 0 { if pc.senders == 0 {
self.channel.0.available.notify_all(); self.channel.0.data_available.notify_all();
} }
} }
} }
...@@ -305,7 +307,7 @@ where ...@@ -305,7 +307,7 @@ where
let mut pc = self.channel.0.pc.lock(); let mut pc = self.channel.0.pc.lock();
pc.receivers -= 1; pc.receivers -= 1;
if pc.receivers == 0 { if pc.receivers == 0 {
self.channel.0.available.notify_all(); self.channel.0.data_available.notify_all();
} }
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论