提交 7d4593e2 authored 作者: Serhij S's avatar Serhij S

session locking (disable reconnects)

上级 4ce4b196
[package] [package]
name = "roboplc" name = "roboplc"
version = "0.1.35" version = "0.1.36"
edition = "2021" edition = "2021"
authors = ["Serhij S. <div@altertech.com>"] authors = ["Serhij S. <div@altertech.com>"]
license = "Apache-2.0" license = "Apache-2.0"
......
...@@ -36,12 +36,39 @@ impl Client { ...@@ -36,12 +36,39 @@ impl Client {
pub fn protocol(&self) -> Protocol { pub fn protocol(&self) -> Protocol {
self.0.protocol() self.0.protocol()
} }
/// Get local IP address (for TCP/IP)
pub fn local_ip_addr(&self) -> Result<Option<SocketAddr>> { pub fn local_ip_addr(&self) -> Result<Option<SocketAddr>> {
self.0.local_ip_addr() self.0.local_ip_addr()
} }
/// Get the current session id
pub fn session_id(&self) -> usize { pub fn session_id(&self) -> usize {
self.0.session_id() self.0.session_id()
} }
/// lock the current session (disable reconnects)
pub fn lock_session(&self) -> Result<SessionGuard> {
let session_id = self.0.lock_session()?;
Ok(SessionGuard {
client: self.clone(),
session_id,
})
}
}
pub struct SessionGuard {
client: Client,
session_id: usize,
}
impl SessionGuard {
pub fn session_id(&self) -> usize {
self.session_id
}
}
impl Drop for SessionGuard {
fn drop(&mut self) {
self.client.0.unlock_session();
}
} }
pub enum Protocol { pub enum Protocol {
...@@ -61,6 +88,8 @@ trait Communicator { ...@@ -61,6 +88,8 @@ trait Communicator {
fn local_ip_addr(&self) -> Result<Option<SocketAddr>> { fn local_ip_addr(&self) -> Result<Option<SocketAddr>> {
Ok(None) Ok(None)
} }
fn lock_session(&self) -> Result<usize>;
fn unlock_session(&self);
} }
#[allow(clippy::module_name_repetitions)] #[allow(clippy::module_name_repetitions)]
......
...@@ -9,6 +9,7 @@ use serial::SystemPort; ...@@ -9,6 +9,7 @@ use serial::SystemPort;
use std::io; use std::io;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
...@@ -133,6 +134,7 @@ pub struct Serial { ...@@ -133,6 +134,7 @@ pub struct Serial {
busy: Mutex<()>, busy: Mutex<()>,
params: Parameters, params: Parameters,
session_id: AtomicUsize, session_id: AtomicUsize,
allow_reconnect: AtomicBool,
} }
#[derive(Default)] #[derive(Default)]
...@@ -197,6 +199,17 @@ impl Communicator for Serial { ...@@ -197,6 +199,17 @@ impl Communicator for Serial {
fn protocol(&self) -> Protocol { fn protocol(&self) -> Protocol {
Protocol::Serial Protocol::Serial
} }
fn lock_session(&self) -> Result<usize> {
let _lock = self.lock();
let _s = self.get_port()?;
self.allow_reconnect.store(false, Ordering::Release);
Ok(self.session_id())
}
fn unlock_session(&self) {
self.allow_reconnect.store(true, Ordering::Release);
}
} }
impl Serial { impl Serial {
...@@ -209,12 +222,16 @@ impl Serial { ...@@ -209,12 +222,16 @@ impl Serial {
busy: <_>::default(), busy: <_>::default(),
params, params,
session_id: <_>::default(), session_id: <_>::default(),
allow_reconnect: AtomicBool::new(true),
} }
.into()) .into())
} }
fn get_port(&self) -> Result<MutexGuard<SPort>> { fn get_port(&self) -> Result<MutexGuard<SPort>> {
let mut lock = self.port.lock(); let mut lock = self.port.lock();
if lock.system_port.as_mut().is_none() { if lock.system_port.as_mut().is_none() {
if !self.allow_reconnect.load(Ordering::Acquire) {
return Err(Error::io("not connected but reconnects not allowed"));
}
trace!(dev=%self.params.port_dev, "creating new serial connection"); trace!(dev=%self.params.port_dev, "creating new serial connection");
let port = open(&self.params, self.timeout)?; let port = open(&self.params, self.timeout)?;
lock.system_port.replace(port); lock.system_port.replace(port);
......
...@@ -9,7 +9,7 @@ use parking_lot::{Mutex, MutexGuard}; ...@@ -9,7 +9,7 @@ use parking_lot::{Mutex, MutexGuard};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{self, TcpStream}; use std::net::{self, TcpStream};
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tracing::trace; use tracing::trace;
...@@ -44,6 +44,7 @@ pub struct Tcp { ...@@ -44,6 +44,7 @@ pub struct Tcp {
timeouts: Timeouts, timeouts: Timeouts,
busy: Mutex<()>, busy: Mutex<()>,
session_id: AtomicUsize, session_id: AtomicUsize,
allow_reconnect: AtomicBool,
reader_tx: Option<pchannel::Sender<CommReader>>, reader_tx: Option<pchannel::Sender<CommReader>>,
chat: Option<Box<ChatFn>>, chat: Option<Box<ChatFn>>,
} }
...@@ -101,6 +102,16 @@ impl Communicator for Tcp { ...@@ -101,6 +102,16 @@ impl Communicator for Tcp {
fn protocol(&self) -> Protocol { fn protocol(&self) -> Protocol {
Protocol::Tcp Protocol::Tcp
} }
fn lock_session(&self) -> Result<usize> {
let _lock = self.lock();
let _s = self.get_stream()?;
self.allow_reconnect.store(false, Ordering::Release);
Ok(self.session_id())
}
fn unlock_session(&self) {
self.allow_reconnect.store(true, Ordering::Release);
}
} }
impl Tcp { impl Tcp {
...@@ -123,6 +134,7 @@ impl Tcp { ...@@ -123,6 +134,7 @@ impl Tcp {
busy: <_>::default(), busy: <_>::default(),
timeouts: options.timeouts, timeouts: options.timeouts,
session_id: <_>::default(), session_id: <_>::default(),
allow_reconnect: AtomicBool::new(true),
reader_tx: tx, reader_tx: tx,
chat: options.chat, chat: options.chat,
}; };
...@@ -131,6 +143,9 @@ impl Tcp { ...@@ -131,6 +143,9 @@ impl Tcp {
fn get_stream(&self) -> Result<MutexGuard<Option<TcpStream>>> { fn get_stream(&self) -> Result<MutexGuard<Option<TcpStream>>> {
let mut lock = self.stream.lock(); let mut lock = self.stream.lock();
if lock.as_mut().is_none() { if lock.as_mut().is_none() {
if !self.allow_reconnect.load(Ordering::Acquire) {
return Err(Error::io("not connected but reconnects not allowed"));
}
trace!(addr=%self.addr, "creating new TCP stream"); trace!(addr=%self.addr, "creating new TCP stream");
let zero_to = Duration::from_secs(0); let zero_to = Duration::from_secs(0);
let mut stream = if self.timeouts.connect > zero_to { let mut stream = if self.timeouts.connect > zero_to {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论