提交 d404109e authored 作者: Serhij S's avatar Serhij S

reader moved to comm

上级 2017434a
use parking_lot::MutexGuard; use parking_lot::MutexGuard;
use std::{ use std::{
io::{Read, Write}, io::{Read, Write},
net::SocketAddr,
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use crate::Result; use crate::{Result, DataDeliveryPolicy};
pub mod serial; // Serial communications pub mod serial; // Serial communications
pub mod tcp; // TCP communications pub mod tcp; // TCP communications
...@@ -35,6 +36,9 @@ impl Client { ...@@ -35,6 +36,9 @@ impl Client {
pub fn protocol(&self) -> Protocol { pub fn protocol(&self) -> Protocol {
self.0.protocol() self.0.protocol()
} }
pub fn local_ip_addr(&self) -> Result<Option<SocketAddr>> {
self.0.local_ip_addr()
}
} }
pub enum Protocol { pub enum Protocol {
...@@ -51,6 +55,9 @@ trait Communicator { ...@@ -51,6 +55,9 @@ trait Communicator {
fn read_exact(&self, buf: &mut [u8]) -> Result<()>; fn read_exact(&self, buf: &mut [u8]) -> Result<()>;
fn protocol(&self) -> Protocol; fn protocol(&self) -> Protocol;
fn session_id(&self) -> usize; fn session_id(&self) -> usize;
fn local_ip_addr(&self) -> Result<Option<SocketAddr>> {
Ok(None)
}
} }
/// Connection Options /// Connection Options
...@@ -60,6 +67,18 @@ pub struct ConnectionOptions { ...@@ -60,6 +67,18 @@ pub struct ConnectionOptions {
timeout: Duration, timeout: Duration,
} }
pub struct CommReader {
reader: Option<Box<dyn Read + Send + 'static>>,
}
impl CommReader {
pub fn take(&mut self) -> Option<Box<dyn Read + Send + 'static>> {
self.reader.take()
}
}
impl DataDeliveryPolicy for CommReader {}
pub type ChatFn = dyn Fn(&mut dyn Stream) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> pub type ChatFn = dyn Fn(&mut dyn Stream) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>
+ Send + Send
+ Sync; + Sync;
......
use crate::pchannel; use crate::pchannel;
use crate::{DataDeliveryPolicy, Error, Result}; use crate::{Error, Result};
use super::{ChatFn, Client, Communicator, ConnectionOptions, Protocol, Stream}; use super::{ChatFn, Client, CommReader, Communicator, ConnectionOptions, Protocol, Stream};
use core::fmt; use core::fmt;
use parking_lot::{Mutex, MutexGuard}; use parking_lot::{Mutex, MutexGuard};
use std::io::{Read, Write}; use std::io::{Read, Write};
...@@ -27,25 +27,13 @@ pub fn connect<A: ToSocketAddrs + fmt::Debug>(addr: A, timeout: Duration) -> Res ...@@ -27,25 +27,13 @@ pub fn connect<A: ToSocketAddrs + fmt::Debug>(addr: A, timeout: Duration) -> Res
pub fn connect_with_options<A: ToSocketAddrs + fmt::Debug>( pub fn connect_with_options<A: ToSocketAddrs + fmt::Debug>(
addr: A, addr: A,
options: ConnectionOptions, options: ConnectionOptions,
) -> Result<(Client, pchannel::Receiver<TcpReader>)> { ) -> Result<(Client, Option<pchannel::Receiver<CommReader>>)> {
let (tcp, rx) = Tcp::create(addr, options)?; let (tcp, maybe_rx) = Tcp::create(addr, options)?;
Ok((Client(tcp), rx.unwrap())) Ok((Client(tcp), maybe_rx))
} }
impl Stream for TcpStream {} impl Stream for TcpStream {}
pub struct TcpReader {
reader: Option<Box<dyn Read + Send + 'static>>,
}
impl TcpReader {
pub fn take(&mut self) -> Option<Box<dyn Read + Send + 'static>> {
self.reader.take()
}
}
impl DataDeliveryPolicy for TcpReader {}
#[allow(clippy::module_name_repetitions)] #[allow(clippy::module_name_repetitions)]
pub struct Tcp { pub struct Tcp {
addr: SocketAddr, addr: SocketAddr,
...@@ -53,7 +41,7 @@ pub struct Tcp { ...@@ -53,7 +41,7 @@ pub struct Tcp {
timeout: Duration, timeout: Duration,
busy: Mutex<()>, busy: Mutex<()>,
session_id: AtomicUsize, session_id: AtomicUsize,
reader_tx: Option<pchannel::Sender<TcpReader>>, reader_tx: Option<pchannel::Sender<CommReader>>,
chat: Option<Box<ChatFn>>, chat: Option<Box<ChatFn>>,
} }
...@@ -99,6 +87,15 @@ impl Communicator for Tcp { ...@@ -99,6 +87,15 @@ impl Communicator for Tcp {
.read_exact(buf) .read_exact(buf)
.map_err(|e| handle_tcp_stream_error!(self.session_id, stream, e, false)) .map_err(|e| handle_tcp_stream_error!(self.session_id, stream, e, false))
} }
fn local_ip_addr(&self) -> Result<Option<SocketAddr>> {
let mut stream = self.get_stream()?;
stream
.as_mut()
.unwrap()
.local_addr()
.map(Some)
.map_err(|e| handle_tcp_stream_error!(self.session_id, stream, e, false))
}
fn protocol(&self) -> Protocol { fn protocol(&self) -> Protocol {
Protocol::Tcp Protocol::Tcp
} }
...@@ -108,7 +105,7 @@ impl Tcp { ...@@ -108,7 +105,7 @@ impl Tcp {
fn create<A: ToSocketAddrs + fmt::Debug>( fn create<A: ToSocketAddrs + fmt::Debug>(
addr: A, addr: A,
options: ConnectionOptions, options: ConnectionOptions,
) -> Result<(TcpClient, Option<pchannel::Receiver<TcpReader>>)> { ) -> Result<(TcpClient, Option<pchannel::Receiver<CommReader>>)> {
let (tx, rx) = if options.with_reader { let (tx, rx) = if options.with_reader {
let (tx, rx) = pchannel::bounded(READER_CHANNEL_CAPACITY); let (tx, rx) = pchannel::bounded(READER_CHANNEL_CAPACITY);
(Some(tx), Some(rx)) (Some(tx), Some(rx))
...@@ -140,7 +137,7 @@ impl Tcp { ...@@ -140,7 +137,7 @@ impl Tcp {
chat(&mut stream).map_err(Error::io)?; chat(&mut stream).map_err(Error::io)?;
} }
if let Some(ref tx) = self.reader_tx { if let Some(ref tx) = self.reader_tx {
tx.send(TcpReader { tx.send(CommReader {
reader: Some(Box::new(stream.try_clone()?)), reader: Some(Box::new(stream.try_clone()?)),
})?; })?;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论