提交 7f0c73c2 authored 作者: Serhij S's avatar Serhij S

raw udp, WResult

上级 6c65eb25
......@@ -2,7 +2,7 @@ name: CI
on:
push:
branches: [ "main" ]
branches: [ "*" ]
pull_request:
env:
......
......@@ -32,6 +32,7 @@ serde = { version = "1.0.197", features = ["derive", "rc"] }
serial = "0.4.0"
sysinfo = "0.30.6"
thiserror = "1.0.57"
tracing = "0.1.40"
[dev-dependencies]
env_logger = "0.11.3"
......@@ -44,3 +45,6 @@ tracing = { version = "0.1.40", features = ["log"] }
name = "plc-modbus"
path = "examples/plc-modbus.rs"
[[example]]
name = "raw-udp"
path = "examples/raw-udp.rs"
......@@ -59,13 +59,13 @@ impl ModbusPuller1 {
// A worker implementation, contains a single function to run which has got access to the
// controller context
impl Worker<Message, Variables> for ModbusPuller1 {
fn run(&mut self, context: &Context<Message, Variables>) {
fn run(&mut self, context: &Context<Message, Variables>) -> WResult {
// this worker does not need to be subscribed to any events
let hub = context.hub();
for _ in interval(Duration::from_millis(500)) {
match self.sensor_mapping.read::<EnvironmentSensors>() {
Ok(v) => {
context.variables().lock().temperature = v.temperature;
context.variables().write().temperature = v.temperature;
hub.send(Message::EnvSensorData(TtlCell::new_with_value(
ENV_DATA_TTL,
v,
......@@ -76,6 +76,7 @@ impl Worker<Message, Variables> for ModbusPuller1 {
}
}
}
Ok(())
}
}
......@@ -94,7 +95,7 @@ impl ModbusRelays1 {
}
impl Worker<Message, Variables> for ModbusRelays1 {
fn run(&mut self, context: &Context<Message, Variables>) {
fn run(&mut self, context: &Context<Message, Variables>) -> WResult {
// this worker needs to be subscribed to EnvSensorData kind of events
let hc = context
.hub()
......@@ -132,6 +133,7 @@ impl Worker<Message, Variables> for ModbusRelays1 {
}
}
}
Ok(())
}
}
......
use roboplc::io::raw_udp::{UdpInput, UdpOutput};
use roboplc::prelude::*;
use roboplc::time::interval;
use tracing::{error, info};
#[derive(DataPolicy, Clone)]
enum Message {
Env(EnvData),
}
// A raw UDP structure, to be sent and received
//
// Raw UDP structures are used by various software, e.g. Matlab, LabView, etc. as well as by some
// fieldbus devices
#[derive(Debug, Clone)]
#[binrw]
#[brw(little)]
struct EnvData {
temp: f64,
hum: f64,
pressure: f64,
}
// A worker to collect data from incoming UDP packets
#[derive(WorkerOpts)]
#[worker_opts(name = "udp_in")]
struct UdpIn {}
impl Worker<Message, ()> for UdpIn {
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let server = UdpInput::<EnvData>::bind("127.0.0.1:25000", 24)?;
// [`UdpInput`] is an iterator of incoming UDP packets which are automatically parsed
for data in server {
match data {
Ok(data) => {
context.hub().send(Message::Env(data));
}
Err(e) => {
error!(worker=self.worker_name(), error=%e, "udp in error");
}
}
}
Ok(())
}
}
// A worker to send data to a remote UDP server
// (in this example data is just sent to UDP input worker)
#[derive(WorkerOpts)]
#[worker_opts(name = "udp_out")]
struct UdpOut {}
impl Worker<Message, ()> for UdpOut {
fn run(&mut self, _context: &Context<Message, ()>) -> WResult {
let mut client = UdpOutput::connect("localhost:25000")?;
for _ in interval(Duration::from_secs(1)) {
let data = EnvData {
temp: 25.0,
hum: 50.0,
pressure: 1000.0,
};
if let Err(e) = client.send(data) {
error!(worker=self.worker_name(), error=%e, "udp send error");
}
}
Ok(())
}
}
// A worker to print data, received by the `UdpIn` worker
#[derive(WorkerOpts)]
#[worker_opts(name = "printEnv")]
struct PrintEnv {}
impl Worker<Message, ()> for PrintEnv {
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let hc = context
.hub()
.register(self.worker_name(), event_matches!(Message::Env(_)))?;
for msg in hc {
let Message::Env(data) = msg;
info!(worker = self.worker_name(), data=?data);
}
Ok(())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
// sets the simulated mode for the real-time module, do not set any thread real-time parameters
roboplc::thread_rt::set_simulated();
// initializes a debug logger
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
// creates a controller instance
let mut controller = Controller::<Message, ()>::new();
// spawns workers
controller.spawn_worker(UdpIn {})?;
controller.spawn_worker(PrintEnv {})?;
controller.spawn_worker(UdpOut {})?;
// block the main thread until the controller is in the online state
controller.block_while_online();
Ok(())
}
......@@ -13,14 +13,17 @@ use crate::{
thread_rt::{Builder, RTParams, Scheduling},
DataDeliveryPolicy, Error,
};
use parking_lot::Mutex;
use parking_lot::RwLock;
pub use roboplc_derive::WorkerOpts;
use tracing::error;
pub mod prelude {
pub use super::{Context, Controller, Worker, WorkerOptions};
pub use super::{Context, Controller, WResult, Worker, WorkerOptions};
pub use roboplc_derive::WorkerOpts;
}
pub type WResult = Result<(), Box<dyn std::error::Error>>;
const SLEEP_SLEEP: Duration = Duration::from_millis(100);
/// Controller state beacon. Can be cloned and shared with no limitations.
......@@ -89,18 +92,18 @@ impl From<i8> for StateKind {
pub struct Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + 'static,
V: Send + Sync + 'static,
{
supervisor: Supervisor<()>,
hub: Hub<D>,
state: State,
variables: Arc<Mutex<V>>,
variables: Arc<RwLock<V>>,
}
impl<D, V> Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + 'static,
V: Send + Sync + 'static,
{
/// Creates a new controller instance, variables MUST implement [`Default`] trait
pub fn new() -> Self
......@@ -123,7 +126,7 @@ where
supervisor: <_>::default(),
hub: <_>::default(),
state: State::new(),
variables: Arc::new(Mutex::new(variables)),
variables: Arc::new(RwLock::new(variables)),
}
}
/// Spawns a worker
......@@ -146,7 +149,9 @@ where
builder = builder.stack_size(stack_size);
}
self.supervisor.spawn(builder, move || {
worker.run(&context);
if let Err(e) = worker.run(&context) {
error!(worker=worker.worker_name(), error=%e, "worker terminated");
}
})?;
Ok(())
}
......@@ -194,7 +199,7 @@ where
&self.supervisor
}
/// Controller shared variables
pub fn variables(&self) -> &Arc<Mutex<V>> {
pub fn variables(&self) -> &Arc<RwLock<V>> {
&self.variables
}
}
......@@ -202,7 +207,7 @@ where
impl<D, V> Default for Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + 'static + Default,
V: Send + Sync + 'static + Default,
{
fn default() -> Self {
Self::new()
......@@ -218,7 +223,7 @@ where
{
hub: Hub<D>,
state: State,
variables: Arc<Mutex<V>>,
variables: Arc<RwLock<V>>,
}
impl<D, V> Context<D, V>
......@@ -231,7 +236,7 @@ where
&self.hub
}
/// Controller's shared variables (locked)
pub fn variables(&self) -> &Arc<Mutex<V>> {
pub fn variables(&self) -> &Arc<RwLock<V>> {
&self.variables
}
/// Controller's state
......@@ -253,7 +258,7 @@ pub trait Worker<D: DataDeliveryPolicy + Clone + Send + Sync + 'static, V: Send>
Send + Sync
{
/// The worker's main function, started by [`Controller::spawn_worker()`]
fn run(&mut self, context: &Context<D, V>);
fn run(&mut self, context: &Context<D, V>) -> WResult;
}
/// The trait which MUST be implemented by all workers
......
......@@ -4,6 +4,7 @@ use binrw::{BinRead, BinWrite};
use crate::Result;
pub mod modbus;
pub mod raw_udp;
#[allow(clippy::module_name_repetitions)]
pub trait IoMapping {
......
use binrw::{BinRead, BinWrite};
use std::{
io::Cursor,
marker::PhantomData,
net::{SocketAddr, ToSocketAddrs, UdpSocket},
};
use crate::{Error, Result};
pub struct UdpInput<T>
where
T: for<'a> BinRead<Args<'a> = ()>,
{
server: UdpSocket,
buffer: Vec<u8>,
_phantom: PhantomData<T>,
}
impl<T> UdpInput<T>
where
T: for<'a> BinRead<Args<'a> = ()>,
{
pub fn bind<A: ToSocketAddrs>(addr: A, buf_size: usize) -> Result<Self> {
let server = UdpSocket::bind(addr)?;
Ok(Self {
server,
buffer: vec![0; buf_size],
_phantom: PhantomData,
})
}
}
impl<T> Iterator for UdpInput<T>
where
T: for<'a> BinRead<Args<'a> = ()>,
{
type Item = Result<T>;
fn next(&mut self) -> Option<Self::Item> {
match self.server.recv(&mut self.buffer) {
Ok(size) => {
let mut cursor = Cursor::new(&self.buffer[..size]);
Some(T::read_le(&mut cursor).map_err(Into::into))
}
Err(e) => Some(Err(e.into())),
}
}
}
pub struct UdpOutput {
socket: UdpSocket,
target: SocketAddr,
data_buf: Vec<u8>,
}
impl UdpOutput {
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
let socket = UdpSocket::bind(("0.0.0.0", 0))?;
let target = addr
.to_socket_addrs()?
.next()
.ok_or_else(|| Error::InvalidData("no target address provided".to_string()))?;
Ok(Self {
socket,
target,
data_buf: <_>::default(),
})
}
pub fn send<T>(&mut self, value: T) -> Result<()>
where
T: for<'a> BinWrite<Args<'a> = ()>,
{
let mut buf = Cursor::new(&mut self.data_buf);
value.write_le(&mut buf)?;
self.socket.send_to(&self.data_buf, self.target)?;
Ok(())
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论