提交 75e96170 authored 作者: Serhij S's avatar Serhij S

chat replaced with ConnectionHandler object. connct function

上级 cf414f4b
[package] [package]
name = "roboplc" name = "roboplc"
version = "0.3.2" version = "0.4.0"
edition = "2021" edition = "2021"
authors = ["Serhij S. <div@altertech.com>"] authors = ["Serhij S. <div@altertech.com>"]
license = "Apache-2.0" license = "Apache-2.0"
......
...@@ -21,6 +21,11 @@ impl Client { ...@@ -21,6 +21,11 @@ impl Client {
pub fn lock(&self) -> MutexGuard<()> { pub fn lock(&self) -> MutexGuard<()> {
self.0.lock() self.0.lock()
} }
/// Connect the client. Does not need to be called for request/response protocols as the client
/// is automatically connected when the first request is made.
pub fn connect(&self) -> Result<()> {
self.0.connect()
}
/// Reconnect the client in case of read/write problems /// Reconnect the client in case of read/write problems
pub fn reconnect(&self) { pub fn reconnect(&self) {
self.0.reconnect(); self.0.reconnect();
...@@ -81,6 +86,7 @@ pub trait Stream: Read + Write + Send {} ...@@ -81,6 +86,7 @@ pub trait Stream: Read + Write + Send {}
trait Communicator { trait Communicator {
fn lock(&self) -> MutexGuard<()>; fn lock(&self) -> MutexGuard<()>;
fn connect(&self) -> Result<()>;
fn reconnect(&self); fn reconnect(&self);
fn write(&self, buf: &[u8]) -> Result<()>; fn write(&self, buf: &[u8]) -> Result<()>;
fn read_exact(&self, buf: &mut [u8]) -> Result<()>; fn read_exact(&self, buf: &mut [u8]) -> Result<()>;
...@@ -130,14 +136,18 @@ impl Timeouts { ...@@ -130,14 +136,18 @@ impl Timeouts {
} }
} }
pub type ChatFn = dyn Fn(&mut dyn Stream) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> pub trait ConnectionHandler {
+ Send /// called right after the connection is established
+ Sync; fn on_connect(
&self,
stream: &mut dyn Stream,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
/// Connection Options /// Connection Options
pub struct ConnectionOptions { pub struct ConnectionOptions {
with_reader: bool, with_reader: bool,
chat: Option<Box<ChatFn>>, connection_handler: Option<Box<dyn ConnectionHandler + Send + Sync>>,
timeouts: Timeouts, timeouts: Timeouts,
} }
...@@ -146,7 +156,7 @@ impl ConnectionOptions { ...@@ -146,7 +156,7 @@ impl ConnectionOptions {
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Duration) -> Self {
Self { Self {
with_reader: false, with_reader: false,
chat: None, connection_handler: None,
timeouts: Timeouts { timeouts: Timeouts {
connect: timeout, connect: timeout,
read: timeout, read: timeout,
...@@ -161,16 +171,13 @@ impl ConnectionOptions { ...@@ -161,16 +171,13 @@ impl ConnectionOptions {
self.with_reader = true; self.with_reader = true;
self self
} }
/// Set the chat function. The chat function is called after the connection is established. The /// Set the connection handler. The connection handler is used to implement custom protocols
/// chat function can be used to implement custom protocols that require additional setup. /// that require additional setup/handling. Replaces "chat" function.
pub fn chat<F>(mut self, chat: F) -> Self pub fn connection_handler<T>(mut self, connection_handler: T) -> Self
where where
F: Fn(&mut dyn Stream) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> T: ConnectionHandler + Send + Sync + 'static,
+ Send
+ Sync
+ 'static,
{ {
self.chat = Some(Box::new(chat)); self.connection_handler = Some(Box::new(connection_handler));
self self
} }
/// Set timeouts /// Set timeouts
......
...@@ -153,6 +153,9 @@ impl Communicator for Serial { ...@@ -153,6 +153,9 @@ impl Communicator for Serial {
fn session_id(&self) -> usize { fn session_id(&self) -> usize {
self.session_id.load(Ordering::Acquire) self.session_id.load(Ordering::Acquire)
} }
fn connect(&self) -> Result<()> {
self.get_port().map(|_| ())
}
fn reconnect(&self) { fn reconnect(&self) {
let mut port = self.port.lock(); let mut port = self.port.lock();
port.system_port.take(); port.system_port.take();
......
...@@ -2,7 +2,8 @@ use crate::pchannel; ...@@ -2,7 +2,8 @@ use crate::pchannel;
use crate::{Error, Result}; use crate::{Error, Result};
use super::{ use super::{
ChatFn, Client, CommReader, Communicator, ConnectionOptions, Protocol, Stream, Timeouts, Client, CommReader, Communicator, ConnectionHandler, ConnectionOptions, Protocol, Stream,
Timeouts,
}; };
use core::fmt; use core::fmt;
use parking_lot_rt::{Mutex, MutexGuard}; use parking_lot_rt::{Mutex, MutexGuard};
...@@ -46,7 +47,7 @@ pub struct Tcp { ...@@ -46,7 +47,7 @@ pub struct Tcp {
session_id: AtomicUsize, session_id: AtomicUsize,
allow_reconnect: AtomicBool, allow_reconnect: AtomicBool,
reader_tx: Option<pchannel::Sender<CommReader>>, reader_tx: Option<pchannel::Sender<CommReader>>,
chat: Option<Box<ChatFn>>, connection_handler: Option<Box<dyn ConnectionHandler + Send + Sync>>,
} }
#[allow(clippy::module_name_repetitions)] #[allow(clippy::module_name_repetitions)]
...@@ -68,6 +69,9 @@ impl Communicator for Tcp { ...@@ -68,6 +69,9 @@ impl Communicator for Tcp {
fn session_id(&self) -> usize { fn session_id(&self) -> usize {
self.session_id.load(Ordering::Acquire) self.session_id.load(Ordering::Acquire)
} }
fn connect(&self) -> Result<()> {
self.get_stream().map(|_| ())
}
fn reconnect(&self) { fn reconnect(&self) {
self.stream self.stream
.lock() .lock()
...@@ -136,7 +140,7 @@ impl Tcp { ...@@ -136,7 +140,7 @@ impl Tcp {
session_id: <_>::default(), session_id: <_>::default(),
allow_reconnect: AtomicBool::new(true), allow_reconnect: AtomicBool::new(true),
reader_tx: tx, reader_tx: tx,
chat: options.chat, connection_handler: options.connection_handler,
}; };
Ok((client.into(), rx)) Ok((client.into(), rx))
} }
...@@ -160,9 +164,11 @@ impl Tcp { ...@@ -160,9 +164,11 @@ impl Tcp {
stream.set_write_timeout(Some(self.timeouts.write))?; stream.set_write_timeout(Some(self.timeouts.write))?;
} }
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
if let Some(ref chat) = self.chat { if let Some(ref connection_handler) = self.connection_handler {
trace!("chatting with the server"); trace!("starting connection handler");
chat(&mut stream).map_err(Error::io)?; connection_handler
.on_connect(&mut stream)
.map_err(Error::io)?;
} }
self.session_id.fetch_add(1, Ordering::Release); self.session_id.fetch_add(1, Ordering::Release);
trace!(addr=%self.addr, session_id=self.session_id(), "TCP session started"); trace!(addr=%self.addr, session_id=self.session_id(), "TCP session started");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论