提交 f8603f18 authored 作者: Serhij S's avatar Serhij S

comm options timeouts

上级 f751a9ef
...@@ -60,13 +60,6 @@ trait Communicator { ...@@ -60,13 +60,6 @@ trait Communicator {
} }
} }
/// Connection Options
pub struct ConnectionOptions {
with_reader: bool,
chat: Option<Box<ChatFn>>,
timeout: Duration,
}
pub struct CommReader { pub struct CommReader {
reader: Option<Box<dyn Read + Send + 'static>>, reader: Option<Box<dyn Read + Send + 'static>>,
} }
...@@ -79,16 +72,34 @@ impl CommReader { ...@@ -79,16 +72,34 @@ impl CommReader {
impl DataDeliveryPolicy for CommReader {} impl DataDeliveryPolicy for CommReader {}
pub(crate) struct Timeouts {
pub connect: Duration,
pub read: Duration,
pub write: Duration,
}
pub type ChatFn = dyn Fn(&mut dyn Stream) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> pub type ChatFn = dyn Fn(&mut dyn Stream) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>
+ Send + Send
+ Sync; + Sync;
/// Connection Options
pub struct ConnectionOptions {
with_reader: bool,
chat: Option<Box<ChatFn>>,
timeouts: Timeouts,
}
impl ConnectionOptions { impl ConnectionOptions {
/// timeout = the default timeout
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Duration) -> Self {
Self { Self {
with_reader: false, with_reader: false,
chat: None, chat: None,
timeout, timeouts: Timeouts {
connect: timeout,
read: timeout,
write: timeout,
},
} }
} }
/// Enable the reader channel. The reader channel allows the client to receive a clone of the /// Enable the reader channel. The reader channel allows the client to receive a clone of the
...@@ -110,4 +121,19 @@ impl ConnectionOptions { ...@@ -110,4 +121,19 @@ impl ConnectionOptions {
self.chat = Some(Box::new(chat)); self.chat = Some(Box::new(chat));
self self
} }
/// Set the connect timeout
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.timeouts.connect = timeout;
self
}
/// Set the read timeout
pub fn read_timeout(mut self, timeout: Duration) -> Self {
self.timeouts.read = timeout;
self
}
/// Set the write timeout
pub fn write_timeout(mut self, timeout: Duration) -> Self {
self.timeouts.write = timeout;
self
}
} }
use crate::pchannel; use crate::pchannel;
use crate::{Error, Result}; use crate::{Error, Result};
use super::{ChatFn, Client, CommReader, Communicator, ConnectionOptions, Protocol, Stream}; use super::{
ChatFn, Client, CommReader, Communicator, ConnectionOptions, Protocol, Stream, Timeouts,
};
use core::fmt; use core::fmt;
use parking_lot::{Mutex, MutexGuard}; use parking_lot::{Mutex, MutexGuard};
use std::io::{Read, Write}; use std::io::{Read, Write};
...@@ -38,7 +40,7 @@ impl Stream for TcpStream {} ...@@ -38,7 +40,7 @@ impl Stream for TcpStream {}
pub struct Tcp { pub struct Tcp {
addr: SocketAddr, addr: SocketAddr,
stream: Mutex<Option<TcpStream>>, stream: Mutex<Option<TcpStream>>,
timeout: Duration, timeouts: Timeouts,
busy: Mutex<()>, busy: Mutex<()>,
session_id: AtomicUsize, session_id: AtomicUsize,
reader_tx: Option<pchannel::Sender<CommReader>>, reader_tx: Option<pchannel::Sender<CommReader>>,
...@@ -119,7 +121,7 @@ impl Tcp { ...@@ -119,7 +121,7 @@ impl Tcp {
.ok_or_else(|| Error::invalid_data(format!("Invalid address: {:?}", addr)))?, .ok_or_else(|| Error::invalid_data(format!("Invalid address: {:?}", addr)))?,
stream: <_>::default(), stream: <_>::default(),
busy: <_>::default(), busy: <_>::default(),
timeout: options.timeout, timeouts: options.timeouts,
session_id: <_>::default(), session_id: <_>::default(),
reader_tx: tx, reader_tx: tx,
chat: options.chat, chat: options.chat,
...@@ -129,9 +131,9 @@ impl Tcp { ...@@ -129,9 +131,9 @@ impl Tcp {
fn get_stream(&self) -> Result<MutexGuard<Option<TcpStream>>> { fn get_stream(&self) -> Result<MutexGuard<Option<TcpStream>>> {
let mut lock = self.stream.lock(); let mut lock = self.stream.lock();
if lock.as_mut().is_none() { if lock.as_mut().is_none() {
let mut stream = TcpStream::connect_timeout(&self.addr, self.timeout)?; let mut stream = TcpStream::connect_timeout(&self.addr, self.timeouts.connect)?;
stream.set_read_timeout(Some(self.timeout))?; stream.set_read_timeout(Some(self.timeouts.read))?;
stream.set_write_timeout(Some(self.timeout))?; stream.set_write_timeout(Some(self.timeouts.write))?;
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
if let Some(ref chat) = self.chat { if let Some(ref chat) = self.chat {
chat(&mut stream).map_err(Error::io)?; chat(&mut stream).map_err(Error::io)?;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论