提交 7c06a2ea authored 作者: Serhij S's avatar Serhij S

speed tuning

上级 db3e73d1
...@@ -3,7 +3,10 @@ use std::{ ...@@ -3,7 +3,10 @@ use std::{
future::Future, future::Future,
mem, mem,
pin::Pin, pin::Pin,
sync::Arc, sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll, Waker}, task::{Context, Poll, Waker},
}; };
...@@ -42,6 +45,7 @@ where ...@@ -42,6 +45,7 @@ where
struct ChannelInner<T: DataDeliveryPolicy> { struct ChannelInner<T: DataDeliveryPolicy> {
id: UniqueId, id: UniqueId,
pc: Mutex<PolicyChannel<T>>, pc: Mutex<PolicyChannel<T>>,
next_op_id: AtomicUsize,
} }
impl<T: DataDeliveryPolicy> Channel<T> { impl<T: DataDeliveryPolicy> Channel<T> {
...@@ -50,10 +54,14 @@ impl<T: DataDeliveryPolicy> Channel<T> { ...@@ -50,10 +54,14 @@ impl<T: DataDeliveryPolicy> Channel<T> {
ChannelInner { ChannelInner {
id: <_>::default(), id: <_>::default(),
pc: Mutex::new(PolicyChannel::new(capacity, ordering)), pc: Mutex::new(PolicyChannel::new(capacity, ordering)),
next_op_id: <_>::default(),
} }
.into(), .into(),
) )
} }
fn op_id(&self) -> usize {
self.0.next_op_id.fetch_add(1, Ordering::SeqCst)
}
} }
struct PolicyChannel<T: DataDeliveryPolicy> { struct PolicyChannel<T: DataDeliveryPolicy> {
...@@ -169,7 +177,7 @@ where ...@@ -169,7 +177,7 @@ where
#[pin_project(PinnedDrop)] #[pin_project(PinnedDrop)]
struct Send<'a, T: DataDeliveryPolicy> { struct Send<'a, T: DataDeliveryPolicy> {
id: UniqueId, id: usize,
channel: &'a Channel<T>, channel: &'a Channel<T>,
queued: bool, queued: bool,
value: Option<T>, value: Option<T>,
...@@ -179,11 +187,9 @@ struct Send<'a, T: DataDeliveryPolicy> { ...@@ -179,11 +187,9 @@ struct Send<'a, T: DataDeliveryPolicy> {
#[allow(clippy::needless_lifetimes)] #[allow(clippy::needless_lifetimes)]
impl<'a, T: DataDeliveryPolicy> PinnedDrop for Send<'a, T> { impl<'a, T: DataDeliveryPolicy> PinnedDrop for Send<'a, T> {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
self.channel if self.queued {
.0 self.channel.0.pc.lock().notify_send_fut_drop(self.id);
.pc }
.lock()
.notify_send_fut_drop(self.id.as_usize());
} }
} }
...@@ -195,9 +201,10 @@ where ...@@ -195,9 +201,10 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut pc = self.channel.0.pc.lock(); let mut pc = self.channel.0.pc.lock();
if self.queued { if self.queued {
pc.confirm_send_fut_waked(self.id.as_usize()); pc.confirm_send_fut_waked(self.id);
} }
if pc.receivers == 0 { if pc.receivers == 0 {
self.queued = false;
return Poll::Ready(Err(Error::ChannelClosed)); return Poll::Ready(Err(Error::ChannelClosed));
} }
if pc.send_fut_wakers.is_empty() || self.queued { if pc.send_fut_wakers.is_empty() || self.queued {
...@@ -205,6 +212,7 @@ where ...@@ -205,6 +212,7 @@ where
if let Some(val) = push_result.value { if let Some(val) = push_result.value {
self.value = Some(val); self.value = Some(val);
} else { } else {
self.queued = false;
pc.notify_data_sent(); pc.notify_data_sent();
return Poll::Ready(if push_result.pushed { return Poll::Ready(if push_result.pushed {
Ok(()) Ok(())
...@@ -214,7 +222,7 @@ where ...@@ -214,7 +222,7 @@ where
} }
} }
self.queued = true; self.queued = true;
pc.append_send_fut_waker(cx.waker().clone(), self.id.as_usize()); pc.append_send_fut_waker(cx.waker().clone(), self.id);
Poll::Pending Poll::Pending
} }
} }
...@@ -234,7 +242,7 @@ where ...@@ -234,7 +242,7 @@ where
#[inline] #[inline]
pub fn send(&self, value: T) -> impl Future<Output = Result<()>> + '_ { pub fn send(&self, value: T) -> impl Future<Output = Result<()>> + '_ {
Send { Send {
id: <_>::default(), id: self.channel.op_id(),
channel: &self.channel, channel: &self.channel,
queued: false, queued: false,
value: Some(value), value: Some(value),
...@@ -301,18 +309,16 @@ where ...@@ -301,18 +309,16 @@ where
} }
struct Recv<'a, T: DataDeliveryPolicy> { struct Recv<'a, T: DataDeliveryPolicy> {
id: UniqueId, id: usize,
channel: &'a Channel<T>, channel: &'a Channel<T>,
queued: bool, queued: bool,
} }
impl<'a, T: DataDeliveryPolicy> Drop for Recv<'a, T> { impl<'a, T: DataDeliveryPolicy> Drop for Recv<'a, T> {
fn drop(&mut self) { fn drop(&mut self) {
self.channel if self.queued {
.0 self.channel.0.pc.lock().notify_recv_fut_drop(self.id);
.pc }
.lock()
.notify_recv_fut_drop(self.id.as_usize());
} }
} }
...@@ -324,18 +330,20 @@ where ...@@ -324,18 +330,20 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut pc = self.channel.0.pc.lock(); let mut pc = self.channel.0.pc.lock();
if self.queued { if self.queued {
pc.confirm_recv_fut_waked(self.id.as_usize()); pc.confirm_recv_fut_waked(self.id);
} }
if pc.recv_fut_wakers.is_empty() || self.queued { if pc.recv_fut_wakers.is_empty() || self.queued {
if let Some(val) = pc.queue.get() { if let Some(val) = pc.queue.get() {
pc.notify_data_received(); pc.notify_data_received();
self.queued = false;
return Poll::Ready(Ok(val)); return Poll::Ready(Ok(val));
} else if pc.senders == 0 { } else if pc.senders == 0 {
self.queued = false;
return Poll::Ready(Err(Error::ChannelClosed)); return Poll::Ready(Err(Error::ChannelClosed));
} }
} }
self.queued = true; self.queued = true;
pc.append_recv_fut_waker(cx.waker().clone(), self.id.as_usize()); pc.append_recv_fut_waker(cx.waker().clone(), self.id);
Poll::Pending Poll::Pending
} }
} }
...@@ -355,7 +363,7 @@ where ...@@ -355,7 +363,7 @@ where
#[inline] #[inline]
pub fn recv(&self) -> impl Future<Output = Result<T>> + '_ { pub fn recv(&self) -> impl Future<Output = Result<T>> + '_ {
Recv { Recv {
id: <_>::default(), id: self.channel.op_id(),
channel: &self.channel, channel: &self.channel,
queued: false, queued: false,
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论