提交 fc1674f4 authored 作者: Serhij S's avatar Serhij S

modbus client

上级 4e5a4b9e
......@@ -10,6 +10,7 @@ keywords = ["realtime", "robots", "plc", "industrial"]
readme = "README.md"
[dependencies]
binrw = "0.13.3"
bma-ts = { version = "0.1.8", features = ["serde"] }
colored = "2.1.0"
libc = "0.2.153"
......@@ -18,7 +19,10 @@ object-id = "0.1.3"
oneshot = { version = "0.1.6", default-features = false, features = ["std"] }
parking_lot = "0.12.1"
pin-project = "1.1.5"
rmodbus = { version = "0.9.2" }
roboplc-derive = { version = "0.1.0" }
serde = { version = "1.0.197", features = ["derive", "rc"] }
serial = "0.4.0"
sysinfo = "0.30.6"
thiserror = "1.0.57"
......
[package]
name = "roboplc-derive"
version = "0.1.0"
edition = "2021"
authors = ["Serhij S. <div@altertech.com>"]
license = "Apache-2.0"
description = "Derive macros for RoboPLC"
repository = "https://github.com/eva-ics/roboplc"
keywords = ["realtime", "robots", "plc", "industrial"]
readme = "README.md"
[lib]
proc-macro = true
[dependencies]
syn = { version = "1.0", features = ["full"] }
quote = "1.0"
proc-macro2 = "1.0"
darling = "0.13.0"
# roboplc-derive
Derive macros for the [roboplc](https://crates.io/crates/roboplc) crate.
extern crate proc_macro;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput, Fields, Meta, NestedMeta};
/// # Panics
///
/// Will panic on parse errors
#[allow(clippy::too_many_lines)]
#[proc_macro_derive(DataPolicy, attributes(data_delivery, data_priority, data_expires))]
pub fn data_policy_derive(input: TokenStream) -> TokenStream {
let ast = parse_macro_input!(input as DeriveInput);
match ast.data {
Data::Enum(ref data_enum) => {
let enum_name = &ast.ident;
let mut delivery_policy_cases = vec![];
let mut priority_cases = vec![];
let mut expires_cases = vec![];
let mut default_policy_impl = true;
let mut default_priority_impl = true;
let mut default_expires_impl = true;
for variant in &data_enum.variants {
let variant_name = &variant.ident;
let mut priority_value = quote! { 100 };
let mut delivery_policy_value = quote! { ::roboplc::DeliveryPolicy::Always };
let mut expires_value = quote! { false };
for attr in &variant.attrs {
if attr.path.is_ident("data_delivery") {
default_policy_impl = false;
if let Meta::List(meta_list) = attr.parse_meta().unwrap() {
for nested_meta in meta_list.nested {
if let NestedMeta::Meta(meta) = nested_meta {
match meta
.path()
.get_ident()
.map(ToString::to_string)
.as_deref()
{
Some("single") => {
delivery_policy_value =
quote! { ::roboplc::DeliveryPolicy::Single }
}
Some("single_optional") => {
delivery_policy_value =
quote! { ::roboplc::DeliveryPolicy::SingleOptional }
}
Some("optional") => {
delivery_policy_value =
quote! { ::roboplc::DeliveryPolicy::Optional }
}
Some("always") => {
delivery_policy_value =
quote! { ::roboplc::DeliveryPolicy::Always }
}
Some(v) => panic!("Unknown policy variant: {}", v),
None => panic!("Policy variant not specified"),
}
}
}
} else {
panic!("unable to parse data_delivery attribute");
}
} else if attr.path.is_ident("data_expires") {
default_expires_impl = false;
if let Meta::List(meta_list) = attr.parse_meta().unwrap() {
for nested_meta in meta_list.nested {
if let NestedMeta::Meta(lit) = nested_meta {
expires_value = quote! { #lit(value) }
} else {
panic!("data_expires value must be a function",);
}
}
} else {
panic!("unable to parse data_expires attribute");
}
} else if attr.path.is_ident("data_priority") {
default_priority_impl = false;
if let Ok(Meta::List(meta_list)) = attr.parse_meta() {
for nested_meta in meta_list.nested {
if let NestedMeta::Lit(lit_int) = nested_meta {
priority_value = quote! { #lit_int };
} else {
panic!("data_priority value must be an integer");
}
}
} else {
panic!("unable to parse data_priority attribute");
}
}
}
let pattern = match &variant.fields {
Fields::Unnamed(_) => quote! { #enum_name::#variant_name(..) },
Fields::Named(_) => quote! { #enum_name::#variant_name{..} },
Fields::Unit => quote! { #enum_name::#variant_name },
};
let pattern_expires = match &variant.fields {
Fields::Unnamed(_) => quote! { #enum_name::#variant_name(value, ..) },
Fields::Named(_) => quote! { #enum_name::#variant_name{value, ..} },
Fields::Unit => quote! { #enum_name::#variant_name },
};
delivery_policy_cases.push(quote! {
#pattern => #delivery_policy_value,
});
priority_cases.push(quote! {
#pattern => #priority_value,
});
expires_cases.push(quote! {
#pattern_expires => #expires_value,
});
}
let fn_delivery_policy = if default_policy_impl {
quote! {
fn delivery_policy(&self) -> ::roboplc::DeliveryPolicy {
::roboplc::DeliveryPolicy::Always
}
}
} else {
quote! {
fn delivery_policy(&self) -> ::roboplc::DeliveryPolicy {
match self {
#(#delivery_policy_cases)*
}
}
}
};
let fn_priority = if default_priority_impl {
quote! {
fn priority(&self) -> usize {
100
}
}
} else {
quote! {
fn priority(&self) -> usize {
match self {
#(#priority_cases)*
}
}
}
};
let fn_expires = if default_expires_impl {
quote! {
fn is_expired(&self) -> bool {
false
}
}
} else {
quote! {
fn is_expired(&self) -> bool {
match self {
#(#expires_cases)*
}
}
}
};
let generated = quote! {
impl ::roboplc::DataDeliveryPolicy for #enum_name {
#fn_delivery_policy
#fn_priority
#fn_expires
}
};
generated.into()
}
_ => panic!("DataPolicy can only be derived for enums"),
}
}
use parking_lot::MutexGuard;
use std::sync::Arc;
pub mod serial;
pub mod tcp;
#[derive(Clone)]
pub struct Client(Arc<dyn Communicator + Send + Sync>);
impl Client {
pub fn lock(&self) -> MutexGuard<()> {
self.0.lock()
}
pub fn reconnect(&self) {
self.0.reconnect();
}
pub fn write(&self, buf: &[u8]) -> Result<(), std::io::Error> {
self.0.write(buf)
}
pub fn read_exact(&self, buf: &mut [u8]) -> Result<(), std::io::Error> {
self.0.read_exact(buf)
}
pub fn protocol(&self) -> Protocol {
self.0.protocol()
}
}
pub enum Protocol {
Tcp,
Serial,
}
trait Communicator {
fn lock(&self) -> MutexGuard<()>;
fn reconnect(&self);
fn write(&self, buf: &[u8]) -> Result<(), std::io::Error>;
fn read_exact(&self, buf: &mut [u8]) -> Result<(), std::io::Error>;
fn protocol(&self) -> Protocol;
}
use crate::Error;
use super::Client;
use super::Communicator;
use super::Protocol;
use parking_lot::{Mutex, MutexGuard};
use serial::prelude::*;
use serial::SystemPort;
use std::io::{Read, Write};
use std::sync::Arc;
use std::time::{Duration, Instant};
pub fn connect(path: &str, timeout: Duration, frame_delay: Duration) -> Result<Client, Error> {
Ok(Client(Serial::create(path, timeout, frame_delay)?))
}
fn parse_path(
path: &str,
) -> (
&str,
serial::BaudRate,
serial::CharSize,
serial::Parity,
serial::StopBits,
) {
let mut sp = path.split(':');
let port_dev = sp.next().unwrap();
let s_baud_rate = sp
.next()
.unwrap_or_else(|| panic!("serial baud rate not specified: {}", path));
let s_char_size = sp
.next()
.unwrap_or_else(|| panic!("serial char size not specified: {}", path));
let s_parity = sp
.next()
.unwrap_or_else(|| panic!("serial parity not specified: {}", path));
let s_stop_bits = sp
.next()
.unwrap_or_else(|| panic!("serial stopbits not specified: {}", path));
let baud_rate = match s_baud_rate {
"110" => serial::Baud110,
"300" => serial::Baud300,
"600" => serial::Baud600,
"1200" => serial::Baud1200,
"2400" => serial::Baud2400,
"4800" => serial::Baud4800,
"9600" => serial::Baud9600,
"19200" => serial::Baud19200,
"38400" => serial::Baud38400,
"57600" => serial::Baud57600,
"115200" => serial::Baud115200,
v => panic!("specified serial baud rate not supported: {}", v),
};
let char_size = match s_char_size {
"5" => serial::Bits5,
"6" => serial::Bits6,
"7" => serial::Bits7,
"8" => serial::Bits8,
v => panic!("specified serial char size not supported: {}", v),
};
let parity = match s_parity {
"N" => serial::ParityNone,
"E" => serial::ParityEven,
"O" => serial::ParityOdd,
v => panic!("specified serial parity not supported: {}", v),
};
let stop_bits = match s_stop_bits {
"1" => serial::Stop1,
"2" => serial::Stop2,
v => unimplemented!("specified serial stop bits not supported: {}", v),
};
(port_dev, baud_rate, char_size, parity, stop_bits)
}
/// # Panics
///
/// Will panic on misconfigured listen string
pub fn check_path(path: &str) {
let _ = parse_path(path);
}
/// # Panics
///
/// Will panic on misconfigured listen string
pub fn open(listen: &str, timeout: Duration) -> Result<SystemPort, serial::Error> {
let (port_dev, baud_rate, char_size, parity, stop_bits) = parse_path(listen);
let mut port = serial::open(&port_dev)?;
port.reconfigure(&|settings| {
(settings.set_baud_rate(baud_rate).unwrap());
settings.set_char_size(char_size);
settings.set_parity(parity);
settings.set_stop_bits(stop_bits);
settings.set_flow_control(serial::FlowNone);
Ok(())
})?;
port.set_timeout(timeout)?;
Ok(port)
}
#[allow(clippy::module_name_repetitions)]
pub struct Serial {
path: String,
port: Mutex<SPort>,
timeout: Duration,
frame_delay: Duration,
busy: Mutex<()>,
}
#[derive(Default)]
struct SPort {
system_port: Option<SystemPort>,
last_frame: Option<Instant>,
}
#[allow(clippy::module_name_repetitions)]
pub type SerialClient = Arc<Serial>;
impl Communicator for Serial {
fn lock(&self) -> MutexGuard<()> {
self.busy.lock()
}
fn reconnect(&self) {
let mut port = self.port.lock();
port.system_port.take();
port.last_frame.take();
}
fn write(&self, buf: &[u8]) -> Result<(), std::io::Error> {
let mut port = self.get_port()?;
if let Some(last_frame) = port.last_frame {
let el = last_frame.elapsed();
if el < self.frame_delay {
std::thread::sleep(self.frame_delay - el);
}
}
let result = port
.system_port
.as_mut()
.unwrap()
.write_all(buf)
.map_err(|e| {
self.reconnect();
e
});
if result.is_ok() {
port.last_frame.replace(Instant::now());
}
result
}
fn read_exact(&self, buf: &mut [u8]) -> Result<(), std::io::Error> {
let mut port = self.get_port()?;
port.system_port
.as_mut()
.unwrap()
.read_exact(buf)
.map_err(|e| {
self.reconnect();
e
})
}
fn protocol(&self) -> Protocol {
Protocol::Serial
}
}
impl Serial {
/// # Panics
///
/// Will panic on misconfigured path string
pub fn create(
path: &str,
timeout: Duration,
frame_delay: Duration,
) -> Result<Arc<Self>, Error> {
check_path(path);
Ok(Self {
path: path.to_owned(),
port: <_>::default(),
timeout,
frame_delay,
busy: <_>::default(),
}
.into())
}
fn get_port(&self) -> Result<MutexGuard<SPort>, std::io::Error> {
let mut lock = self.port.lock();
if lock.system_port.as_mut().is_none() {
let port = open(&self.path, self.timeout)?;
lock.system_port.replace(port);
lock.last_frame.take();
}
Ok(lock)
}
}
use crate::Error;
use super::{Client, Communicator, Protocol};
use core::fmt;
use parking_lot::{Mutex, MutexGuard};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
pub fn connect<A: ToSocketAddrs + fmt::Debug>(addr: A, timeout: Duration) -> Result<Client, Error> {
Ok(Client(Tcp::create(addr, timeout)?))
}
#[allow(clippy::module_name_repetitions)]
pub struct Tcp {
addr: SocketAddr,
stream: Mutex<Option<TcpStream>>,
timeout: Duration,
busy: Mutex<()>,
}
#[allow(clippy::module_name_repetitions)]
pub type TcpClient = Arc<Tcp>;
macro_rules! handle_tcp_stream_error {
($stream: expr, $err: expr, $any: expr) => {{
if $any || $err.kind() == std::io::ErrorKind::TimedOut {
$stream.take();
}
$err
}};
}
impl Communicator for Tcp {
fn lock(&self) -> MutexGuard<()> {
self.busy.lock()
}
fn reconnect(&self) {
self.stream.lock().take();
}
fn write(&self, buf: &[u8]) -> Result<(), std::io::Error> {
let mut stream = self.get_stream()?;
stream
.as_mut()
.unwrap()
.write_all(buf)
.map_err(|e| handle_tcp_stream_error!(stream, e, true))
}
fn read_exact(&self, buf: &mut [u8]) -> Result<(), std::io::Error> {
let mut stream = self.get_stream()?;
stream
.as_mut()
.unwrap()
.read_exact(buf)
.map_err(|e| handle_tcp_stream_error!(stream, e, false))
}
fn protocol(&self) -> Protocol {
Protocol::Tcp
}
}
impl Tcp {
fn create<A: ToSocketAddrs + fmt::Debug>(
addr: A,
timeout: Duration,
) -> Result<TcpClient, Error> {
Ok(Self {
addr: addr
.to_socket_addrs()?
.next()
.ok_or_else(|| Error::invalid_data(format!("Invalid address: {:?}", addr)))?,
stream: <_>::default(),
busy: <_>::default(),
timeout,
}
.into())
}
fn get_stream(&self) -> Result<MutexGuard<Option<TcpStream>>, std::io::Error> {
let mut lock = self.stream.lock();
if lock.as_mut().is_none() {
let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?;
stream.set_read_timeout(Some(self.timeout))?;
stream.set_write_timeout(Some(self.timeout))?;
stream.set_nodelay(true)?;
lock.replace(stream);
}
Ok(lock)
}
}
use std::{
sync::{
atomic::{AtomicI8, Ordering},
Arc,
},
thread,
time::Duration,
};
use crate::{
hub::Hub,
supervisor::Supervisor,
thread_rt::{Builder, RTParams, Scheduling},
DataDeliveryPolicy, Error,
};
use parking_lot::Mutex;
pub mod prelude {
pub use super::{Context, Controller, Worker, WorkerOptions};
}
const SLEEP_SLEEP: Duration = Duration::from_millis(100);
/// Controller state beacon. Can be cloned and shared with no limitations.
#[derive(Clone)]
pub struct State {
state: Arc<AtomicI8>,
}
impl State {
pub fn new() -> Self {
Self {
state: AtomicI8::new(StateKind::Starting as i8).into(),
}
}
/// Set controller state
pub fn set(&self, state: StateKind) {
self.state.store(state as i8, Ordering::SeqCst);
}
/// Get controller state
pub fn get(&self) -> StateKind {
StateKind::from(self.state.load(Ordering::SeqCst))
}
/// Is the controller online (starting or running)
pub fn is_online(&self) -> bool {
self.get() >= StateKind::Starting
}
}
impl Default for State {
fn default() -> Self {
Self::new()
}
}
/// Controller state kind
#[derive(Default, Eq, PartialEq, Clone, Copy, Ord, PartialOrd)]
#[repr(i8)]
pub enum StateKind {
#[default]
Starting = 0,
Active = 1,
Running = 2,
Stopping = -1,
Stopped = -100,
Unknown = -128,
}
impl From<i8> for StateKind {
fn from(v: i8) -> Self {
match v {
0 => StateKind::Starting,
1 => StateKind::Active,
2 => StateKind::Running,
-100 => StateKind::Stopped,
_ => StateKind::Unknown,
}
}
}
/// Controller, used to manage workers and their context
///
/// Generic parameter `D` is the message type for the controller's [`Hub`] messages.
/// Generic parameter `V` is the type of shared variables. If shared variables are not required, it
/// can be set to `()`.
///
pub struct Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + 'static,
{
supervisor: Supervisor<()>,
hub: Hub<D>,
state: State,
variables: Arc<Mutex<V>>,
}
impl<D, V> Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + 'static,
{
/// Creates a new controller instance, variables MUST implement [`Default`] trait
pub fn new() -> Self
where
V: Default,
{
Self {
supervisor: <_>::default(),
hub: <_>::default(),
state: State::new(),
variables: <_>::default(),
}
}
/// Creates a new controller instance with a pre-defined variables object
pub fn new_with_variables(variables: V) -> Self
where
V: Default,
{
Self {
supervisor: <_>::default(),
hub: <_>::default(),
state: State::new(),
variables: Arc::new(Mutex::new(variables)),
}
}
/// Spawns a worker
pub fn spawn_worker<W: Worker<D, V> + WorkerOptions + 'static>(
&mut self,
mut worker: W,
) -> Result<(), Error> {
let context = self.context();
let mut rt_params = RTParams::new().set_scheduling(worker.worker_scheduling());
if let Some(priority) = worker.worker_priority() {
rt_params = rt_params.set_priority(priority);
}
if let Some(cpu_ids) = worker.worker_cpu_ids() {
rt_params = rt_params.set_cpu_ids(cpu_ids);
}
let mut builder = Builder::new()
.name(worker.worker_name())
.rt_params(rt_params);
if let Some(stack_size) = worker.worker_stack_size() {
builder = builder.stack_size(stack_size);
}
self.supervisor.spawn(builder, move || {
worker.run(&context);
})?;
Ok(())
}
/// Spawns a task thread (non-real-time)
pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<(), Error>
where
F: FnOnce() + Send + 'static,
{
self.supervisor.spawn(Builder::new().name(name), f)?;
Ok(())
}
fn context(&self) -> Context<D, V> {
Context {
hub: self.hub.clone(),
state: self.state.clone(),
variables: self.variables.clone(),
}
}
/// Blocks until all tasks/workers are finished
pub fn block(&mut self) {
self.supervisor.join_all();
self.state.set(StateKind::Stopped);
}
/// Blocks until the controller goes into stopping/stopped
pub fn block_while_online(&self) {
while self.state.is_online() {
thread::sleep(SLEEP_SLEEP);
}
self.state.set(StateKind::Stopped);
}
/// Is the controller online (starting or running)
pub fn is_online(&self) {
self.state.is_online();
}
/// State beacon
pub fn state(&self) -> &State {
&self.state
}
/// Controller [`Hub`] instance
pub fn hub(&self) -> &Hub<D> {
&self.hub
}
/// Controller [`Supervisor`] instance
pub fn supervisor(&self) -> &Supervisor<()> {
&self.supervisor
}
/// Controller shared variables
pub fn variables(&self) -> &Arc<Mutex<V>> {
&self.variables
}
}
impl<D, V> Default for Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + 'static + Default,
{
fn default() -> Self {
Self::new()
}
}
/// The context type is used to give workers access to the controller's hub, state, and shared
/// variables.
pub struct Context<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send,
{
hub: Hub<D>,
state: State,
variables: Arc<Mutex<V>>,
}
impl<D, V> Context<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send,
{
/// Controller's hub instance
pub fn hub(&self) -> &Hub<D> {
&self.hub
}
/// Controller's shared variables (locked)
pub fn variables(&self) -> &Arc<Mutex<V>> {
&self.variables
}
/// Controller's state
pub fn get_state(&self) -> StateKind {
self.state.get()
}
/// Set controller's state
pub fn set_state(&self, state: StateKind) {
self.state.set(state);
}
/// Is the controller online (starting or running)
pub fn is_online(&self) -> bool {
self.state.is_online()
}
}
/// The trait which MUST be implemented by all workers
pub trait Worker<D: DataDeliveryPolicy + Clone + Send + Sync + 'static, V: Send>:
Send + Sync
{
/// The worker's main function, started by [`Controller::spawn_worker()`]
fn run(&mut self, context: &Context<D, V>);
}
/// The trait which MUST be implemented by all workers
pub trait WorkerOptions {
/// A mandatory method, an unique name for the worker
fn worker_name(&self) -> &str;
/// The stack size for the worker thread
fn worker_stack_size(&self) -> Option<usize> {
None
}
/// The [`Scheduling`] policy for the worker thread
fn worker_scheduling(&self) -> Scheduling {
Scheduling::default()
}
/// The scheduled priority for the worker thread
fn worker_priority(&self) -> Option<i32> {
None
}
/// The CPU ID(s) affinity for the worker thread
fn worker_cpu_ids(&self) -> Option<&[usize]> {
None
}
}
......@@ -5,8 +5,17 @@ use parking_lot::Mutex;
use crate::pchannel::{self, Receiver, Sender};
use crate::{DataDeliveryPolicy, Error, Result};
use self::prelude::DataChannel;
type ConditionFunction<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
pub mod prelude {
pub use super::Hub;
pub use crate::event_matches;
pub use crate::pchannel::DataChannel;
pub use crate::{DataDeliveryPolicy, DeliveryPolicy};
}
pub const DEFAULT_PRIORITY: usize = 100;
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
......@@ -189,6 +198,44 @@ where
}
}
impl<T> DataChannel<T> for Hub<T>
where
T: DataDeliveryPolicy + Clone,
{
fn send(&self, message: T) -> Result<()> {
self.send(message);
Ok(())
}
fn recv(&self) -> Result<T> {
Err(Error::Unimplemented)
}
fn try_recv(&self) -> Result<T> {
Err(Error::Unimplemented)
}
fn try_send(&self, _message: T) -> Result<()> {
Err(Error::Unimplemented)
}
}
impl<T> DataChannel<T> for Client<T>
where
T: DataDeliveryPolicy + Clone,
{
fn send(&self, message: T) -> Result<()> {
self.send(message);
Ok(())
}
fn recv(&self) -> Result<T> {
self.recv()
}
fn try_recv(&self) -> Result<T> {
self.try_recv()
}
fn try_send(&self, _message: T) -> Result<()> {
Err(Error::Unimplemented)
}
}
pub struct Client<T: DataDeliveryPolicy + Clone> {
name: Arc<str>,
hub: Hub<T>,
......
pub use binrw;
use binrw::{BinRead, BinWrite};
use crate::Result;
pub mod modbus;
#[allow(clippy::module_name_repetitions)]
pub trait IoMapping {
type Options;
fn read<T>(&mut self) -> Result<T>
where
T: for<'a> BinRead<Args<'a> = ()>;
fn write<T>(&mut self, value: T) -> Result<()>
where
T: for<'a> BinWrite<Args<'a> = ()>;
}
pub mod prelude {
pub use super::IoMapping as _;
pub use binrw::prelude::*;
}
use std::io::Cursor;
use crate::comm::{Client, Protocol};
use crate::{Error, Result};
use binrw::{BinRead, BinWrite};
#[allow(clippy::module_name_repetitions)]
pub use regs::{Kind as ModbusRegisterKind, Register as ModbusRegister};
use rmodbus::guess_response_frame_len;
use rmodbus::{client::ModbusRequest as RModbusRequest, ModbusProto};
use super::IoMapping;
mod regs;
pub mod prelude {
pub use super::{ModbusMapping, ModbusMappingOptions, ModbusRegister, ModbusRegisterKind};
}
pub trait SwapModbusEndianess {
fn to_swapped_modbus_endianness(&self) -> Self;
}
impl SwapModbusEndianess for f32 {
fn to_swapped_modbus_endianness(&self) -> Self {
let b = self.to_be_bytes();
Self::from_be_bytes([b[2], b[3], b[0], b[1]])
}
}
impl SwapModbusEndianess for f64 {
fn to_swapped_modbus_endianness(&self) -> Self {
let b = self.to_be_bytes();
Self::from_be_bytes([b[6], b[7], b[4], b[5], b[2], b[3], b[0], b[1]])
}
}
impl From<Protocol> for ModbusProto {
fn from(value: Protocol) -> Self {
match value {
Protocol::Tcp => ModbusProto::TcpUdp,
Protocol::Serial => ModbusProto::Rtu,
}
}
}
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct ModbusMappingOptions {
bulk_write: bool,
}
impl ModbusMappingOptions {
pub fn new() -> Self {
Self { bulk_write: true }
}
pub fn bulk_write(mut self, value: bool) -> Self {
self.bulk_write = value;
self
}
}
impl Default for ModbusMappingOptions {
fn default() -> Self {
Self { bulk_write: true }
}
}
#[allow(clippy::module_name_repetitions)]
pub struct ModbusMapping {
client: Client,
unit_id: u8,
register: ModbusRegister,
count: u16,
request_id: u16,
buf: Vec<u8>,
rest_buf: Vec<u8>,
data_buf: Vec<u8>,
options: ModbusMappingOptions,
}
impl ModbusMapping {
pub fn create<R>(client: &Client, unit_id: u8, register: R, count: u16) -> Result<Self>
where
R: TryInto<ModbusRegister>,
Error: From<<R as TryInto<ModbusRegister>>::Error>,
{
Ok(Self {
client: client.clone(),
unit_id,
register: register.try_into()?,
count,
request_id: 1,
// pre-allocate buffers
buf: Vec::with_capacity(256),
rest_buf: Vec::with_capacity(256),
data_buf: vec![],
options: <_>::default(),
})
}
pub fn with_options(mut self, options: ModbusMappingOptions) -> Self {
self.options = options;
self
}
}
macro_rules! prepare_transaction {
($self: expr) => {{
let mut mreq = RModbusRequest::new($self.unit_id, $self.client.protocol().into());
mreq.tr_id = $self.request_id;
$self.request_id += 1;
$self.buf.truncate(0);
mreq
}};
}
macro_rules! communicate {
($self: expr) => {
$self.client.write(&$self.buf)?;
let mut buf = [0u8; 6];
$self.client.read_exact(&mut buf)?;
$self.buf.truncate(0);
$self.buf.extend(buf);
let len = guess_response_frame_len(&buf, $self.client.protocol().into())?;
if len > 6 {
$self.rest_buf.resize(usize::from(len - 6), 0);
$self.client.read_exact(&mut $self.rest_buf)?;
$self.buf.extend(&$self.rest_buf);
}
};
}
impl IoMapping for ModbusMapping {
type Options = ModbusMappingOptions;
fn read<T>(&mut self) -> Result<T>
where
T: for<'a> BinRead<Args<'a> = ()>,
{
let _lock = self.client.lock();
let mut mreq = prepare_transaction!(self);
match self.register.kind {
ModbusRegisterKind::Coil => {
mreq.generate_get_coils(self.register.offset, self.count, &mut self.buf)?;
}
ModbusRegisterKind::Discrete => {
mreq.generate_get_discretes(self.register.offset, self.count, &mut self.buf)?;
}
ModbusRegisterKind::Input => {
mreq.generate_get_inputs(self.register.offset, self.count, &mut self.buf)?;
}
ModbusRegisterKind::Holding => {
mreq.generate_get_holdings(self.register.offset, self.count, &mut self.buf)?;
}
};
communicate!(self);
match self.register.kind {
ModbusRegisterKind::Coil | ModbusRegisterKind::Discrete => {
self.data_buf.truncate(0);
mreq.parse_bool_u8(&self.buf, &mut self.data_buf)?;
let mut reader = Cursor::new(&self.data_buf);
T::read_be(&mut reader).map_err(Into::into)
}
ModbusRegisterKind::Input | ModbusRegisterKind::Holding => {
let data = mreq.parse_slice(&self.buf)?;
if data.is_empty() {
return Err(Error::invalid_data("invalid modbus response"));
}
let mut reader = Cursor::new(&data[1..]);
T::read_be(&mut reader).map_err(Into::into)
}
}
}
fn write<T>(&mut self, value: T) -> Result<()>
where
T: for<'a> BinWrite<Args<'a> = ()>,
{
let _lock = self.client.lock();
let mut data_buf = Cursor::new(&mut self.data_buf);
value.write_be(&mut data_buf)?;
if self.options.bulk_write {
let mut mreq = prepare_transaction!(self);
match self.register.kind {
ModbusRegisterKind::Coil => {
mreq.generate_set_coils_bulk(
self.register.offset,
&self.data_buf,
&mut self.buf,
)?;
}
ModbusRegisterKind::Holding => {
mreq.generate_set_holdings_bulk_from_slice(
self.register.offset,
&self.data_buf,
&mut self.buf,
)?;
}
ModbusRegisterKind::Discrete | ModbusRegisterKind::Input => {
return Err(Error::IO(
"unsupported modbus register kind for writing".to_owned(),
));
}
}
communicate!(self);
mreq.parse_ok(&self.buf)?;
} else {
let mut i = 0;
for offset in self.register.offset..self.register.offset + self.count {
let mut mreq = prepare_transaction!(self);
match self.register.kind {
ModbusRegisterKind::Coil => {
mreq.generate_set_coil(
offset,
self.data_buf.get(i).copied().unwrap_or_default(),
&mut self.buf,
)?;
i += 1;
}
ModbusRegisterKind::Holding => {
let high = self.data_buf.get(i).copied().unwrap_or_default();
let low = self.data_buf.get(i + 1).copied().unwrap_or_default();
let value: u16 = u16::from(high) << 8 | u16::from(low);
mreq.generate_set_holding(offset, value, &mut self.buf)?;
i += 2;
}
ModbusRegisterKind::Discrete | ModbusRegisterKind::Input => {
return Err(Error::IO(
"unsupported modbus register kind for writing".to_owned(),
));
}
}
communicate!(self);
mreq.parse_ok(&self.buf)?;
}
}
Ok(())
}
}
use std::str::FromStr;
use crate::{Error, Result};
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum Kind {
Coil,
Discrete,
Input,
Holding,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Register {
pub kind: Kind,
pub offset: u16,
}
impl Register {
pub fn new(kind: Kind, offset: u16) -> Self {
Self { kind, offset }
}
}
fn parse_kind_offset(r: &str) -> Result<(Kind, u16)> {
if let Some(v) = r.strip_prefix('c') {
Ok((Kind::Coil, v.parse()?))
} else if let Some(v) = r.strip_prefix('d') {
Ok((Kind::Discrete, v.parse()?))
} else if let Some(v) = r.strip_prefix('i') {
Ok((Kind::Input, v.parse()?))
} else if let Some(v) = r.strip_prefix('h') {
Ok((Kind::Holding, v.parse()?))
} else {
Err(Error::invalid_data(format!("invalid register kind: {}", r)))
}
}
impl FromStr for Register {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
let (kind, offset) = parse_kind_offset(s)?;
Ok(Register { kind, offset })
}
}
impl TryFrom<&str> for Register {
type Error = Error;
fn try_from(s: &str) -> Result<Self> {
s.parse()
}
}
#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
use std::{mem, sync::Arc, time::Duration};
use core::{fmt, num};
use std::{mem, str::FromStr, sync::Arc, time::Duration};
use thread_rt::{RTParams, Scheduling};
pub use roboplc_derive::DataPolicy;
/// Event buffers
pub mod buf;
/// Reliable TCP/Serial communications
pub mod comm;
/// Controller and workers
pub mod controller;
/// In-process data communication pub/sub hub, synchronous edition
pub mod hub;
/// In-process data communication pub/sub hub, asynchronous edition
pub mod hub_async;
/// I/O
pub mod io;
/// Policy-based channels, synchronous edition
pub mod pchannel;
/// Policy-based channels, asynchronous edition
......@@ -60,6 +69,14 @@ pub enum Error {
SupervisorDuplicateTask,
#[error("Task not found")]
SupervisorTaskNotFound,
#[error("Invalid data")]
InvalidData(String),
#[error("binrw {0}")]
BinRw(String),
#[error("not implemented")]
Unimplemented,
#[error("never happens")]
Infallible(#[from] std::convert::Infallible),
}
macro_rules! impl_error {
......@@ -73,12 +90,19 @@ macro_rules! impl_error {
}
impl_error!(std::io::Error, IO);
impl_error!(rmodbus::ErrorKind, IO);
impl_error!(oneshot::RecvError, IO);
impl_error!(num::ParseIntError, InvalidData);
impl_error!(num::ParseFloatError, InvalidData);
impl_error!(binrw::Error, BinRw);
impl Error {
pub fn is_skipped(&self) -> bool {
pub fn is_data_skipped(&self) -> bool {
matches!(self, Error::ChannelSkipped)
}
pub fn invalid_data<S: fmt::Display>(msg: S) -> Self {
Error::InvalidData(msg.to_string())
}
}
/// Data delivery policies, used by [`hub::Hub`], [`pchannel::Receiver`] and [`pdeque::Deque`]
......@@ -95,6 +119,35 @@ pub enum DeliveryPolicy {
SingleOptional,
}
impl FromStr for DeliveryPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"always" => Ok(DeliveryPolicy::Always),
"optional" => Ok(DeliveryPolicy::Optional),
"single" => Ok(DeliveryPolicy::Single),
"single-optional" => Ok(DeliveryPolicy::SingleOptional),
_ => Err(Error::invalid_data(s)),
}
}
}
impl fmt::Display for DeliveryPolicy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
match self {
DeliveryPolicy::Always => "always",
DeliveryPolicy::Optional => "optional",
DeliveryPolicy::Single => "single",
DeliveryPolicy::SingleOptional => "single-optional",
}
)
}
}
/// Implements delivery policies for own data types
pub trait DataDeliveryPolicy
where
......@@ -106,7 +159,7 @@ where
}
/// Priority, for ordered
fn priority(&self) -> usize {
0
100
}
/// Has equal kind with other
///
......@@ -152,3 +205,16 @@ pub fn suicide(delay: Duration, warn: bool) {
});
};
}
pub mod prelude {
pub use super::suicide;
pub use crate::controller::*;
pub use crate::hub::prelude::*;
pub use crate::io::prelude::*;
pub use crate::supervisor::prelude::*;
pub use crate::time::DurationRT;
pub use crate::ttlcell::TtlCell;
pub use bma_ts::{Monotonic, Timestamp};
pub use roboplc_derive::DataPolicy;
pub use std::time::Duration;
}
......@@ -4,6 +4,59 @@ use crate::{pdeque::Deque, DataDeliveryPolicy, Error, Result};
use object_id::UniqueId;
use parking_lot::{Condvar, Mutex};
/// An abstract trait for data channels and hubs
pub trait DataChannel<T: DataDeliveryPolicy> {
fn send(&self, value: T) -> Result<()>;
fn try_send(&self, value: T) -> Result<()>;
fn recv(&self) -> Result<T>;
fn try_recv(&self) -> Result<T>;
fn is_alive(&self) -> bool {
true
}
}
impl<T> DataChannel<T> for Sender<T>
where
T: DataDeliveryPolicy,
{
fn send(&self, value: T) -> Result<()> {
self.send(value)
}
fn try_send(&self, value: T) -> Result<()> {
self.try_send(value)
}
fn try_recv(&self) -> Result<T> {
Err(Error::Unimplemented)
}
fn recv(&self) -> Result<T> {
Err(Error::Unimplemented)
}
fn is_alive(&self) -> bool {
self.is_alive()
}
}
impl<T> DataChannel<T> for Receiver<T>
where
T: DataDeliveryPolicy,
{
fn send(&self, _value: T) -> Result<()> {
Err(Error::Unimplemented)
}
fn try_send(&self, _value: T) -> Result<()> {
Err(Error::Unimplemented)
}
fn try_recv(&self) -> Result<T> {
self.try_recv()
}
fn recv(&self) -> Result<T> {
self.recv()
}
fn is_alive(&self) -> bool {
self.is_alive()
}
}
struct Channel<T: DataDeliveryPolicy>(Arc<ChannelInner<T>>);
impl<T: DataDeliveryPolicy> Channel<T> {
......@@ -319,7 +372,7 @@ mod test {
for _ in 0..10 {
tx.send(Message::Test(123)).unwrap();
if let Err(e) = tx.send(Message::Spam) {
assert!(e.is_skipped(), "{}", e);
assert!(e.is_data_skipped(), "{}", e);
}
tx.send(Message::Temperature(123.0)).unwrap();
}
......@@ -343,7 +396,7 @@ mod test {
for _ in 0..10 {
tx.send(Message::Test(123)).unwrap();
if let Err(e) = tx.send(Message::Spam) {
assert!(e.is_skipped(), "{}", e);
assert!(e.is_data_skipped(), "{}", e);
}
tx.send(Message::Temperature(123.0)).unwrap();
}
......
......@@ -484,7 +484,7 @@ mod test {
for _ in 0..10 {
tx.send(Message::Test(123)).await.unwrap();
if let Err(e) = tx.send(Message::Spam).await {
assert!(e.is_skipped(), "{}", e);
assert!(e.is_data_skipped(), "{}", e);
}
tx.send(Message::Temperature(123.0)).await.unwrap();
}
......@@ -508,7 +508,7 @@ mod test {
for _ in 0..10 {
tx.send(Message::Test(123)).await.unwrap();
if let Err(e) = tx.send(Message::Spam).await {
assert!(e.is_skipped(), "{}", e);
assert!(e.is_data_skipped(), "{}", e);
}
tx.send(Message::Temperature(123.0)).await.unwrap();
}
......
......@@ -3,10 +3,15 @@ use std::{mem, thread};
use serde::Serialize;
use crate::thread_rt::{Builder, Task};
use crate::thread_rt::{Builder, ScopedTask, Task};
use crate::time::Interval;
use crate::{Error, Result};
pub mod prelude {
pub use super::Supervisor;
pub use crate::thread_rt::{Builder, Scheduling};
}
/// A supervisor object used to manage tasks spawned with [`Builder`]
#[derive(Serialize)]
pub struct Supervisor<T> {
......@@ -21,6 +26,18 @@ impl<T> Default for Supervisor<T> {
}
}
macro_rules! vacant_entry {
($self:ident, $builder:ident) => {{
let Some(name) = $builder.name.clone() else {
return Err(Error::SupervisorNameNotSpecified);
};
let btree_map::Entry::Vacant(entry) = $self.tasks.entry(name) else {
return Err(Error::SupervisorDuplicateTask);
};
entry
}};
}
impl<T> Supervisor<T> {
pub fn new() -> Self {
Self::default()
......@@ -34,7 +51,7 @@ impl<T> Supervisor<T> {
T: Send + 'static,
{
let builder = builder.into();
let entry = self.vacant_entry(&builder)?;
let entry = vacant_entry!(self, builder);
let task = builder.spawn(f)?;
Ok(entry.insert(task))
}
......@@ -47,7 +64,7 @@ impl<T> Supervisor<T> {
B: Into<Builder>,
{
let builder = builder.into();
let entry = self.vacant_entry(&builder)?;
let entry = vacant_entry!(self, builder);
let task = builder.spawn_periodic(f, interval)?;
Ok(entry.insert(task))
}
......@@ -84,16 +101,85 @@ impl<T> Supervisor<T> {
}
result
}
fn vacant_entry(
}
#[allow(clippy::module_name_repetitions)]
#[derive(Serialize)]
pub struct ScopedSupervisor<'a, 'env: 'a, T> {
tasks: BTreeMap<String, ScopedTask<'a, T>>,
#[serde(skip_serializing)]
scope: &'a thread::Scope<'a, 'env>,
}
impl<'a, 'env, T> ScopedSupervisor<'a, 'env, T> {
pub fn new(scope: &'a thread::Scope<'a, 'env>) -> Self {
Self {
tasks: <_>::default(),
scope,
}
}
/// Spawns a new task using a [`Builder`] object and registers it. The task name MUST be unique
/// and SHOULD be 15 characters or less to set a proper thread name
pub fn spawn<F, B>(&mut self, builder: B, f: F) -> Result<&ScopedTask<T>>
where
B: Into<Builder>,
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
let builder = builder.into();
let entry = vacant_entry!(self, builder);
let task = builder.spawn_scoped(self.scope, f)?;
Ok(entry.insert(task))
}
/// Spawns a new periodic task using a [`Builder`] object and registers it. The task name MUST
/// be unique and SHOULD be 15 characters or less to set a proper thread name
pub fn spawn_periodic<F, B>(
&mut self,
builder: &Builder,
) -> Result<btree_map::VacantEntry<String, Task<T>>> {
let Some(name) = builder.name.clone() else {
return Err(Error::SupervisorNameNotSpecified);
};
let btree_map::Entry::Vacant(entry) = self.tasks.entry(name) else {
return Err(Error::SupervisorDuplicateTask);
};
Ok(entry)
builder: B,
f: F,
interval: Interval,
) -> Result<&ScopedTask<T>>
where
F: Fn() -> T + Send + 'a,
T: Send + 'a,
B: Into<Builder>,
{
let builder = builder.into();
let entry = vacant_entry!(self, builder);
let task = builder.spawn_scoped_periodic(self.scope, f, interval)?;
Ok(entry.insert(task))
}
/// Gets a task by its name
pub fn get_task(&self, name: &str) -> Option<&ScopedTask<T>> {
self.tasks.get(name)
}
/// Gets a task by its name as a mutable object
pub fn get_task_mut(&mut self, name: &str) -> Option<&mut ScopedTask<'a, T>> {
self.tasks.get_mut(name)
}
/// Takes a task by its name and removes it from the internal registry
pub fn take_task(&mut self, name: &str) -> Option<ScopedTask<T>> {
self.tasks.remove(name)
}
/// Removes a task from the internal registry
pub fn forget_task(&mut self, name: &str) -> Result<()> {
if self.tasks.remove(name).is_some() {
Ok(())
} else {
Err(Error::SupervisorTaskNotFound)
}
}
/// Removes all finished tasks from the internal registry
pub fn purge(&mut self) {
self.tasks.retain(|_, task| !task.is_finished());
}
/// Joins all tasks in the internal registry and returns a map with their results. After the
/// operation the registry is cleared
pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> {
let mut result = BTreeMap::new();
for (name, task) in mem::take(&mut self.tasks) {
result.insert(name, task.join());
}
result
}
}
......@@ -96,20 +96,28 @@ impl Builder {
self.rt_params = rt_params;
self
}
fn into_thread_builder_name_and_params(self) -> (thread::Builder, String, RTParams, bool) {
fn try_into_thread_builder_name_and_params(
self,
) -> Result<(thread::Builder, String, RTParams, bool)> {
let mut builder = thread::Builder::new();
if let Some(ref name) = self.name {
if name.len() > 15 {
return Err(Error::invalid_data(format!(
"Thread name '{}' is too long (max 15 characters)",
name
)));
}
builder = builder.name(name.to_owned());
}
if let Some(stack_size) = self.stack_size {
builder = builder.stack_size(stack_size);
}
(
Ok((
builder,
self.name.unwrap_or_default(),
self.rt_params,
self.park_on_errors,
)
))
}
/// Spawns a task
///
......@@ -122,7 +130,8 @@ impl Builder {
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (builder, name, rt_params, park_on_errors) = self.into_thread_builder_name_and_params();
let (builder, name, rt_params, park_on_errors) =
self.try_into_thread_builder_name_and_params()?;
let (tx, rx) = oneshot::channel();
let handle = builder.spawn(move || {
thread_init_internal(tx, park_on_errors);
......@@ -171,7 +180,8 @@ impl Builder {
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{
let (builder, name, rt_params, park_on_errors) = self.into_thread_builder_name_and_params();
let (builder, name, rt_params, park_on_errors) =
self.try_into_thread_builder_name_and_params()?;
let (tx, rx) = oneshot::channel();
let handle = builder.spawn_scoped(scope, move || {
thread_init_internal(tx, park_on_errors);
......@@ -347,6 +357,12 @@ impl RTParams {
/// Sets thread scheduling policy (can be used as build pattern)
pub fn set_scheduling(mut self, scheduling: Scheduling) -> Self {
self.scheduling = scheduling;
if scheduling == Scheduling::FIFO
|| scheduling == Scheduling::RoundRobin
|| scheduling == Scheduling::DeadLine && self.priority.is_none()
{
self.priority = Some(1);
}
self
}
/// Sets thread priority (can be used as build pattern)
......
......@@ -21,6 +21,10 @@ impl DurationRT for Duration {
}
}
pub fn interval(period: Duration) -> Interval {
Interval::new(period)
}
/// A synchronous interval helper, similar to
/// <https://docs.rs/tokio/latest/tokio/time/struct.Interval.html>
pub struct Interval {
......@@ -29,6 +33,14 @@ pub struct Interval {
missing_tick_behavior: MissedTickBehavior,
}
impl Iterator for Interval {
type Item = bool;
fn next(&mut self) -> Option<bool> {
Some(self.tick())
}
}
impl Interval {
pub fn new(period: Duration) -> Self {
Self {
......
use bma_ts::Monotonic;
use core::fmt;
use std::{ops::Deref, time::Duration};
/// A memory cell with an expiring value with API similar to the standard [`Option`]
/// A memory ce;ll with an expiring value with API similar to the standard [`Option`]
pub struct TtlCell<T> {
value: Option<T>,
ttl: Duration,
set_at: Monotonic,
}
impl<T> fmt::Debug for TtlCell<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.as_ref() {
Some(v) => write!(f, "Some({:?})", v),
None => write!(f, "None"),
}
}
}
impl<T> PartialEq for TtlCell<T>
where
T: PartialEq,
{
fn eq(&self, other: &Self) -> bool {
self.as_ref() == other.as_ref()
}
}
impl<T> Eq for TtlCell<T> where T: Eq {}
impl<T> Clone for TtlCell<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
ttl: self.ttl,
set_at: self.set_at,
}
}
}
impl<T> TtlCell<T> {
/// Creates a new empty cell
#[inline]
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论