提交 9c8c0cee authored 作者: Serhij S's avatar Serhij S

Modbus write access control, locking, gateway example

上级 7a7a8a3f
......@@ -2,6 +2,12 @@
## RoboPLC
### 0.2.0 (2024-05-09)
* Re-exported locking primitives are re-exported as `locking`
* Modbus server write access control
### 0.1.49 (2024-05-07)
* Added subprocess pipe I/O
......
[package]
name = "roboplc"
version = "0.1.50"
version = "0.2.0"
edition = "2021"
authors = ["Serhij S. <div@altertech.com>"]
license = "Apache-2.0"
......@@ -43,6 +43,7 @@ log = "0.4.21"
metrics-exporter-prometheus = { version = "0.14.0", optional = true, default-features = false, features = ["http-listener"] }
metrics = { version = "0.22.3", optional = true }
parking_lot_rt = "0.12.1"
snmp = { version = "0.2.2", optional = true }
[features]
eapi = ["eva-common", "eva-sdk", "busrt", "tokio", "hostname"]
......@@ -51,6 +52,7 @@ modbus = ["rmodbus"]
openssl-vendored = ["busrt/openssl-vendored", "eva-common/openssl-vendored"]
metrics = ["dep:metrics", "metrics-exporter-prometheus"]
full = ["eapi", "modbus", "metrics", "pipe"]
#default = ["modbus"]
[dev-dependencies]
insta = "1.36.1"
......@@ -89,3 +91,8 @@ required-features = ["pipe"]
name = "eapi"
path = "examples/eapi.rs"
required-features = ["eapi"]
[[example]]
name = "snmp-modbus"
path = "examples/snmp-modbus.rs"
required-features = ["modbus", "snmp"]
/// An example of SNMP->Modbus TCP gateway for a 16-port relay board with SNMP-enabled control.
///
/// The program reads the current state of the relay board to coils 0-15 of the Modbus context
/// storage available as unit 1. If the coils are modified by a Modbus client, the program writes
/// the new state to the relay board. State changes are not written unless modified.
///
/// The discrete register 0 displays the relay board state. (0 - unavailable, 1 - ok)
use std::ops::Range;
use roboplc::controller::prelude::*;
use roboplc::io::modbus::{prelude::*, ModbusServerWritePermission};
use roboplc::locking::Mutex;
use roboplc::prelude::*;
use roboplc::time::interval;
use tracing::{error, warn};
const MODBUS_TIMEOUT: Duration = Duration::from_secs(1);
const MODBUS_LISTEN: &str = "0.0.0.0:5502";
const MODBUS_UNIT: u8 = 1;
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
const SNMP_TIMEOUT: Duration = Duration::from_millis(400);
const RELAY_ADDR: &str = "10.210.110.26:161";
const RELAY_COMMUNITY: &[u8] = b"private";
/// Relay board lock, prevents concurrent access to Modbus coils 0-15
static RELAY_MODBUS_CONTEXT_LOCK: Mutex<()> = Mutex::new(());
type ModbusServerMapping = roboplc::io::modbus::ModbusServerMapping<16, 1, 0, 0>;
type ModbusServer = roboplc::io::modbus::ModbusServer<16, 1, 0, 0>;
/// A 16-port relay state
#[derive(Default, Clone)]
#[binrw]
struct Relays16 {
ports: [u8; 16],
}
type Message = ();
type Variables = ();
#[derive(WorkerOpts)]
#[worker_opts(cpu = 2, priority = 50, scheduling = "fifo", blocking = true)]
struct Relay {
port_mapping: ModbusServerMapping,
state_mapping: ModbusServerMapping,
}
impl Worker<Message, Variables> for Relay {
fn run(&mut self, _context: &Context<Message, Variables>) -> WResult {
let mut first_run = true;
let mut sess = snmp::SyncSession::new(RELAY_ADDR, RELAY_COMMUNITY, Some(SNMP_TIMEOUT), 0)?;
let relay_oid = &[1, 3, 6, 1, 4, 1, 42505, 6, 2, 3, 1, 3];
let mut prev_relay_state = Relays16::default();
let mut relay_down = false;
for int_state in interval(Duration::from_millis(500)) {
if !int_state {
warn!("Relay worker loop timeout");
}
let _lock = RELAY_MODBUS_CONTEXT_LOCK.lock();
let mut relays: Relays16 = self.port_mapping.read().unwrap_or_default();
if first_run {
// we do not have a previous state yet, so do not process any changes
first_run = false;
} else {
// write changes to the relay board in case if Modbus context storage coils are
// changed
for (i, (prev, current)) in prev_relay_state
.ports
.iter()
.zip(relays.ports.iter())
.enumerate()
{
if prev != current {
let port_oid = &[
1,
3,
6,
1,
4,
1,
42505,
6,
2,
3,
1,
3,
u32::try_from(i).unwrap(),
];
let value = snmp::Value::Integer((*current).into());
match sess.set(&[(port_oid, value)]) {
Ok(res) => {
if res.error_status != snmp::snmp::ERRSTATUS_NOERROR {
error!(status = res.error_status, "Relay SNMP set error");
}
}
Err(error) => {
error!(?error, "Relay SNMP set error");
}
}
}
}
}
// read the current relay board state
match sess.getbulk(&[relay_oid], 0, 16) {
Ok(response) => {
for (name, val) in response.varbinds {
let snmp::Value::Integer(value) = val else {
continue;
};
let Ok(value) = u8::try_from(value) else {
continue;
};
let Some(port) = name.raw().last() else {
continue;
};
if usize::from(*port) >= relays.ports.len() {
continue;
}
relays.ports[usize::from(*port)] = value;
}
// save the current relay board state
prev_relay_state = relays.clone();
// write the current relay board state to the Modbus context storage
self.port_mapping.write(relays)?;
if relay_down {
self.state_mapping.write(1u8)?;
tracing::info!("Relay back online");
relay_down = false;
}
}
Err(error) => {
if !relay_down {
self.state_mapping.write(0u8)?;
error!(?error, "Relay down");
relay_down = true;
}
}
}
}
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(cpu = 3, priority = 50, scheduling = "fifo", blocking = true)]
struct ModbusSrv {
server: ModbusServer,
}
impl Worker<Message, Variables> for ModbusSrv {
fn run(&mut self, _context: &Context<Message, Variables>) -> WResult {
self.server.serve()?;
Ok(())
}
}
fn relay_modbus_write_allow(
kind: ModbusRegisterKind,
range: Range<u16>,
) -> ModbusServerWritePermission {
if kind == ModbusRegisterKind::Coil && range.end < 16 {
ModbusServerWritePermission::AllowLock(RELAY_MODBUS_CONTEXT_LOCK.lock())
} else {
ModbusServerWritePermission::Allow
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
roboplc::setup_panic();
roboplc::configure_logger(roboplc::LevelFilter::Info);
if !roboplc::is_production() {
roboplc::thread_rt::set_simulated();
}
roboplc::thread_rt::prealloc_heap(10_000_000)?;
let mut server = ModbusServer::bind(
roboplc::comm::Protocol::Tcp,
MODBUS_UNIT,
MODBUS_LISTEN,
MODBUS_TIMEOUT,
1,
)?;
server.set_allow_external_write_fn(relay_modbus_write_allow);
let port_mapping = server.mapping("c@0".parse()?, 16);
let mut state_mapping = server.mapping("d@0".parse()?, 1);
state_mapping.write(1u8)?;
let mut controller = Controller::<Message, Variables>::new();
controller.spawn_worker(ModbusSrv { server })?;
controller.spawn_worker(Relay {
port_mapping,
state_mapping,
})?;
controller.register_signals(SHUTDOWN_TIMEOUT)?;
controller.block();
Ok(())
}
......@@ -14,7 +14,10 @@ pub use regs::{Kind as ModbusRegisterKind, Register as ModbusRegister};
use rmodbus::guess_response_frame_len;
use rmodbus::{client::ModbusRequest as RModbusRequest, ModbusProto};
#[allow(clippy::module_name_repetitions)]
pub use server::{ModbusServer, ModbusServerMapping};
pub use server::{
AllowFn as ModbusServerAllowFn, ModbusServer, ModbusServerMapping,
WritePermission as ModbusServerWritePermission,
};
use super::IoMapping;
......
......@@ -5,7 +5,7 @@ use crate::{
Error, Result,
};
use binrw::{BinRead, BinWrite};
use parking_lot_rt::Mutex;
use parking_lot_rt::{Mutex, MutexGuard};
use rmodbus::{
server::{context::ModbusContext, storage::ModbusStorage, ModbusFrame},
ModbusFrameBuf, ModbusProto,
......@@ -38,6 +38,7 @@ fn handle_client<
unit: u8,
storage: Arc<Mutex<ModbusStorage<C, D, I, H>>>,
modbus_proto: ModbusProto,
allow_write: &AllowFn,
) -> Result<()> {
let mut buf: ModbusFrameBuf = [0; 256];
let mut response = Vec::with_capacity(256);
......@@ -52,9 +53,30 @@ fn handle_client<
if frame.readonly {
frame.process_read(&*storage.lock()).map_err(Error::io)?;
} else {
frame
.process_write(&mut *storage.lock())
.map_err(Error::io)?;
let (process, _guard) = if let Some(changes) = frame.changes() {
let (kind, range) = match changes {
rmodbus::server::Changes::Coils { reg, count } => {
(ModbusRegisterKind::Coil, reg..reg + count)
}
rmodbus::server::Changes::Holdings { reg, count } => {
(ModbusRegisterKind::Holding, reg..reg + count)
}
};
match allow_write(kind, range) {
WritePermission::Allow => (true, None),
WritePermission::AllowLock(guard) => (true, Some(guard)),
WritePermission::Deny => (false, None),
}
} else {
(true, None)
};
if process {
frame
.process_write(&mut *storage.lock())
.map_err(Error::io)?;
} else {
frame.set_modbus_error_if_unset(&rmodbus::ErrorKind::NegativeAcknowledge)?;
}
}
}
if frame.response_required {
......@@ -65,6 +87,33 @@ fn handle_client<
Ok(())
}
pub type AllowFn = fn(ModbusRegisterKind, std::ops::Range<u16>) -> WritePermission;
pub enum WritePermission {
/// Write is allowed.
Allow,
/// Write is allowed with a lock, the lock is released after the write operation.
AllowLock(MutexGuard<'static, ()>),
/// Write is forbidden.
Deny,
}
impl From<bool> for WritePermission {
fn from(value: bool) -> Self {
if value {
Self::Allow
} else {
Self::Deny
}
}
}
impl From<MutexGuard<'static, ()>> for WritePermission {
fn from(guard: MutexGuard<'static, ()>) -> Self {
Self::AllowLock(guard)
}
}
/// Modbus server. Requires to be run in a separate thread manually.
#[allow(clippy::module_name_repetitions)]
pub struct ModbusServer<const C: usize, const D: usize, const I: usize, const H: usize> {
......@@ -73,6 +122,7 @@ pub struct ModbusServer<const C: usize, const D: usize, const I: usize, const H:
server: Server,
timeout: Duration,
semaphore: Semaphore,
allow_external_write_fn: Arc<AllowFn>,
}
impl<const C: usize, const D: usize, const I: usize, const H: usize> ModbusServer<C, D, I, H> {
pub fn bind(
......@@ -92,8 +142,15 @@ impl<const C: usize, const D: usize, const I: usize, const H: usize> ModbusServe
server,
timeout,
semaphore: Semaphore::new(max_workers),
allow_external_write_fn: Arc::new(|_, _| WritePermission::Allow),
})
}
/// Set a function which checks if an external client write operation is allowed.
/// The function allows to block a client until a certain storage context range is processed by
/// an internal task.
pub fn set_allow_external_write_fn(&mut self, f: AllowFn) {
self.allow_external_write_fn = f.into();
}
pub fn mapping(&self, register: ModbusRegister, count: u16) -> ModbusServerMapping<C, D, I, H> {
let buf_capacity = match register.kind {
ModbusRegisterKind::Coil | ModbusRegisterKind::Discrete => usize::from(count),
......@@ -121,17 +178,24 @@ impl<const C: usize, const D: usize, const I: usize, const H: usize> ModbusServe
continue;
}
let storage = self.storage.clone();
let allow_write = self.allow_external_write_fn.clone();
thread::spawn(move || {
let _permission = permission;
if let Err(error) = handle_client(stream, unit, storage, ModbusProto::TcpUdp) {
if let Err(error) =
handle_client(stream, unit, storage, ModbusProto::TcpUdp, &allow_write)
{
error!(%addr, %error, "error handling Modbus client");
}
});
},
Server::Serial(ref mut serial) => loop {
if let Err(e) =
handle_client(&mut *serial, unit, self.storage.clone(), ModbusProto::Rtu)
{
if let Err(e) = handle_client(
&mut *serial,
unit,
self.storage.clone(),
ModbusProto::Rtu,
&self.allow_external_write_fn,
) {
error!(%e, "error handling Modbus client");
}
},
......
......@@ -11,7 +11,7 @@ use thread_rt::{RTParams, Scheduling};
pub use log::LevelFilter;
pub use roboplc_derive::DataPolicy;
pub use parking_lot_rt::{Condvar, Mutex, Once, RwLock};
pub use parking_lot_rt as locking;
#[cfg(feature = "metrics")]
pub use metrics;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论