提交 1c3e13ff authored 作者: Serhij S's avatar Serhij S

shutdown fixes

上级 49148746
...@@ -33,12 +33,12 @@ serial = "0.4.0" ...@@ -33,12 +33,12 @@ serial = "0.4.0"
sysinfo = "0.30.6" sysinfo = "0.30.6"
thiserror = "1.0.57" thiserror = "1.0.57"
tracing = "0.1.40" tracing = "0.1.40"
signal-hook = "0.3.17"
[dev-dependencies] [dev-dependencies]
env_logger = "0.11.3" env_logger = "0.11.3"
insta = "1.36.1" insta = "1.36.1"
log = "0.4.21" log = "0.4.21"
signal-hook = "0.3.17"
tokio = { version = "1.36.0", features = ["rt", "macros", "time"] } tokio = { version = "1.36.0", features = ["rt", "macros", "time"] }
tracing = { version = "0.1.40", features = ["log"] } tracing = { version = "0.1.40", features = ["log"] }
......
...@@ -75,6 +75,9 @@ impl Worker<Message, Variables> for ModbusPuller1 { ...@@ -75,6 +75,9 @@ impl Worker<Message, Variables> for ModbusPuller1 {
error!(worker=self.worker_name(), err=%e, "Modbus pull error"); error!(worker=self.worker_name(), err=%e, "Modbus pull error");
} }
} }
if !context.is_online() {
break;
}
} }
Ok(()) Ok(())
} }
...@@ -82,7 +85,13 @@ impl Worker<Message, Variables> for ModbusPuller1 { ...@@ -82,7 +85,13 @@ impl Worker<Message, Variables> for ModbusPuller1 {
// Second worker, to control relays // Second worker, to control relays
#[derive(WorkerOpts)] #[derive(WorkerOpts)]
#[worker_opts(name = "relays", cpu = 2, scheduling = "fifo", priority = 80)] #[worker_opts(
name = "relays",
cpu = 2,
scheduling = "fifo",
priority = 80,
blocking = true
)]
struct ModbusRelays1 { struct ModbusRelays1 {
fan_mapping: ModbusMapping, fan_mapping: ModbusMapping,
} }
...@@ -155,7 +164,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { ...@@ -155,7 +164,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// creates the second worker and spawns it // creates the second worker and spawns it
let worker = ModbusRelays1::create(&modbus_tcp_client)?; let worker = ModbusRelays1::create(&modbus_tcp_client)?;
controller.spawn_worker(worker)?; controller.spawn_worker(worker)?;
// block the main thread until the controller is in the online state // register SIGINT and SIGTERM signals with max shutdown timeout of 5 seconds
controller.block_while_online(); controller.register_signals(Duration::from_secs(5))?;
// blocks the main thread while the controller is online and the workers are running
controller.block();
Ok(()) Ok(())
} }
...@@ -23,7 +23,7 @@ struct EnvData { ...@@ -23,7 +23,7 @@ struct EnvData {
// A worker to collect data from incoming UDP packets // A worker to collect data from incoming UDP packets
#[derive(WorkerOpts)] #[derive(WorkerOpts)]
#[worker_opts(name = "udp_in")] #[worker_opts(name = "udp_in", blocking = true)]
struct UdpIn {} struct UdpIn {}
impl Worker<Message, ()> for UdpIn { impl Worker<Message, ()> for UdpIn {
...@@ -51,7 +51,7 @@ impl Worker<Message, ()> for UdpIn { ...@@ -51,7 +51,7 @@ impl Worker<Message, ()> for UdpIn {
struct UdpOut {} struct UdpOut {}
impl Worker<Message, ()> for UdpOut { impl Worker<Message, ()> for UdpOut {
fn run(&mut self, _context: &Context<Message, ()>) -> WResult { fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let mut client = UdpOutput::connect("localhost:25000")?; let mut client = UdpOutput::connect("localhost:25000")?;
for _ in interval(Duration::from_secs(1)) { for _ in interval(Duration::from_secs(1)) {
let data = EnvData { let data = EnvData {
...@@ -62,6 +62,9 @@ impl Worker<Message, ()> for UdpOut { ...@@ -62,6 +62,9 @@ impl Worker<Message, ()> for UdpOut {
if let Err(e) = client.send(data) { if let Err(e) = client.send(data) {
error!(worker=self.worker_name(), error=%e, "udp send error"); error!(worker=self.worker_name(), error=%e, "udp send error");
} }
if !context.is_online() {
break;
}
} }
Ok(()) Ok(())
} }
...@@ -69,7 +72,7 @@ impl Worker<Message, ()> for UdpOut { ...@@ -69,7 +72,7 @@ impl Worker<Message, ()> for UdpOut {
// A worker to print data, received by the `UdpIn` worker // A worker to print data, received by the `UdpIn` worker
#[derive(WorkerOpts)] #[derive(WorkerOpts)]
#[worker_opts(name = "printEnv")] #[worker_opts(name = "printEnv", blocking = true)]
struct PrintEnv {} struct PrintEnv {}
impl Worker<Message, ()> for PrintEnv { impl Worker<Message, ()> for PrintEnv {
...@@ -86,8 +89,6 @@ impl Worker<Message, ()> for PrintEnv { ...@@ -86,8 +89,6 @@ impl Worker<Message, ()> for PrintEnv {
} }
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
// sets the simulated mode for the real-time module, do not set any thread real-time parameters
roboplc::thread_rt::set_simulated();
// initializes a debug logger // initializes a debug logger
env_logger::builder() env_logger::builder()
.filter_level(log::LevelFilter::Info) .filter_level(log::LevelFilter::Info)
...@@ -98,7 +99,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { ...@@ -98,7 +99,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
controller.spawn_worker(UdpIn {})?; controller.spawn_worker(UdpIn {})?;
controller.spawn_worker(PrintEnv {})?; controller.spawn_worker(PrintEnv {})?;
controller.spawn_worker(UdpOut {})?; controller.spawn_worker(UdpOut {})?;
// block the main thread until the controller is in the online state // register SIGINT and SIGTERM signals with max shutdown timeout of 5 seconds
controller.block_while_online(); controller.register_signals(Duration::from_secs(5))?;
// blocks the main thread while the controller is online and the workers are running
controller.block();
Ok(()) Ok(())
} }
...@@ -9,12 +9,17 @@ use std::{ ...@@ -9,12 +9,17 @@ use std::{
use crate::{ use crate::{
hub::Hub, hub::Hub,
suicide,
supervisor::Supervisor, supervisor::Supervisor,
thread_rt::{Builder, RTParams, Scheduling}, thread_rt::{Builder, RTParams, Scheduling},
DataDeliveryPolicy, Error, DataDeliveryPolicy, Error,
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
pub use roboplc_derive::WorkerOpts; pub use roboplc_derive::WorkerOpts;
use signal_hook::{
consts::{SIGINT, SIGTERM},
iterator::Signals,
};
use tracing::error; use tracing::error;
pub mod prelude { pub mod prelude {
...@@ -22,6 +27,7 @@ pub mod prelude { ...@@ -22,6 +27,7 @@ pub mod prelude {
pub use roboplc_derive::WorkerOpts; pub use roboplc_derive::WorkerOpts;
} }
/// Result type, which must be returned by workers' `run` method
pub type WResult = Result<(), Box<dyn std::error::Error>>; pub type WResult = Result<(), Box<dyn std::error::Error>>;
const SLEEP_SLEEP: Duration = Duration::from_millis(100); const SLEEP_SLEEP: Duration = Duration::from_millis(100);
...@@ -165,6 +171,53 @@ where ...@@ -165,6 +171,53 @@ where
self.supervisor.spawn(Builder::new().name(name), f)?; self.supervisor.spawn(Builder::new().name(name), f)?;
Ok(()) Ok(())
} }
/// Registers SIGINT and SIGTERM signals to a thread which terminates the controller.
///
/// Note: to properly terminate all workers must either periodically check the controller state
/// with [`Context::is_online()`] or be marked as blocking by overriding
/// [`WorkerOptions::worker_is_blocking()`] (or setting `blocking` to `true` in [`WorkerOpts`]
/// derive macro).
///
/// Workers that listen to hub messages may also receive a custom termination message and gracefully
/// shut themselves down. For such functionality a custom signal handler should be implemented
/// (See <https://github.com/eva-ics/roboplc/blob/main/examples/shutdown.rs>).
///
/// The thread is automatically spawned with FIFO scheduling and the highest priority on CPU 0
/// or falled back to non-realtime.
pub fn register_signals(&mut self, shutdown_timeout: Duration) -> Result<(), Error> {
let mut builder = Builder::new().name("RoboPLCSigRT").rt_params(
RTParams::new()
.set_priority(99)
.set_scheduling(Scheduling::FIFO)
.set_cpu_ids(&[0]),
);
builder.park_on_errors = true;
macro_rules! sig_handler {
() => {{
let context = self.context();
let mut signals = Signals::new([SIGTERM, SIGINT])?;
move || {
if let Some(sig) = signals.forever().next() {
match sig {
SIGTERM | SIGINT => {
suicide(shutdown_timeout, true);
context.terminate();
}
_ => unreachable!(),
}
}
}
}};
}
if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!()) {
if !matches!(e, Error::RTSchedSetSchduler(_)) {
return Err(e);
}
}
let builder = builder.name("RoboPLCSig").rt_params(RTParams::new());
self.supervisor.spawn(builder, sig_handler!())?;
Ok(())
}
fn context(&self) -> Context<D, V> { fn context(&self) -> Context<D, V> {
Context { Context {
hub: self.hub.clone(), hub: self.hub.clone(),
......
...@@ -51,30 +51,43 @@ pub enum Error { ...@@ -51,30 +51,43 @@ pub enum Error {
/// Receive attempt failed because the channel is empty /// Receive attempt failed because the channel is empty
#[error("channel empty")] #[error("channel empty")]
ChannelEmpty, ChannelEmpty,
/// Hub send errors
#[error("hub send error {0}")] #[error("hub send error {0}")]
HubSend(Box<Error>), HubSend(Box<Error>),
/// Hub client with the given name is already registered
#[error("hub client already registered: {0}")] #[error("hub client already registered: {0}")]
HubAlreadyRegistered(Arc<str>), HubAlreadyRegistered(Arc<str>),
/// I/O and threading errors
#[error("I/O error {0}")] #[error("I/O error {0}")]
IO(String), IO(String),
/// Real-time engine error: unable to get the system thread id
#[error("RT SYS_gettid {0}")] #[error("RT SYS_gettid {0}")]
RTGetTId(libc::c_int), RTGetTId(libc::c_int),
/// Real-time engine error: unable to set the thread scheduler affinity
#[error("RT sched_setaffinity {0}")] #[error("RT sched_setaffinity {0}")]
RTSchedSetAffinity(libc::c_int), RTSchedSetAffinity(libc::c_int),
/// Real-time engine error: unable to set the thread scheduler policy
#[error("RT sched_setscheduler {0}")] #[error("RT sched_setscheduler {0}")]
RTSchedSetSchduler(libc::c_int), RTSchedSetSchduler(libc::c_int),
/// Supervisor error: task name is not specified in the thread builder
#[error("Task name must be specified when spawning by a supervisor")] #[error("Task name must be specified when spawning by a supervisor")]
SupervisorNameNotSpecified, SupervisorNameNotSpecified,
#[error("Task already registered")] /// Supervisor error: task with the given name is already registered
SupervisorDuplicateTask, #[error("Task already registered: `{0}`")]
SupervisorDuplicateTask(String),
/// Supervisor error: task with the given name is not found
#[error("Task not found")] #[error("Task not found")]
SupervisorTaskNotFound, SupervisorTaskNotFound,
/// Invalid data receied / parameters provided
#[error("Invalid data")] #[error("Invalid data")]
InvalidData(String), InvalidData(String),
/// [binrw](https://crates.io/crates/binrw) crate errors
#[error("binrw {0}")] #[error("binrw {0}")]
BinRw(String), BinRw(String),
/// The requested operation is not implemented
#[error("not implemented")] #[error("not implemented")]
Unimplemented, Unimplemented,
/// This error never happens and is used as a compiler hint only
#[error("never happens")] #[error("never happens")]
Infallible(#[from] std::convert::Infallible), Infallible(#[from] std::convert::Infallible),
} }
...@@ -192,11 +205,11 @@ pub fn suicide(delay: Duration, warn: bool) { ...@@ -192,11 +205,11 @@ pub fn suicide(delay: Duration, warn: bool) {
let mut builder = thread_rt::Builder::new().name("suicide").rt_params( let mut builder = thread_rt::Builder::new().name("suicide").rt_params(
RTParams::new() RTParams::new()
.set_priority(99) .set_priority(99)
.set_scheduling(Scheduling::FIFO), .set_scheduling(Scheduling::FIFO)
.set_cpu_ids(&[0]),
); );
builder.park_on_errors = true; builder.park_on_errors = true;
let res = builder.spawn(move || { let res = builder.spawn(move || {
dbg!("realtime");
thread_rt::suicide_myself(delay, warn); thread_rt::suicide_myself(delay, warn);
}); });
if res.is_err() { if res.is_err() {
......
...@@ -31,8 +31,8 @@ macro_rules! vacant_entry { ...@@ -31,8 +31,8 @@ macro_rules! vacant_entry {
let Some(name) = $builder.name.clone() else { let Some(name) = $builder.name.clone() else {
return Err(Error::SupervisorNameNotSpecified); return Err(Error::SupervisorNameNotSpecified);
}; };
let btree_map::Entry::Vacant(entry) = $self.tasks.entry(name) else { let btree_map::Entry::Vacant(entry) = $self.tasks.entry(name.clone()) else {
return Err(Error::SupervisorDuplicateTask); return Err(Error::SupervisorDuplicateTask(name));
}; };
entry entry
}}; }};
......
...@@ -25,7 +25,7 @@ pub fn set_simulated() { ...@@ -25,7 +25,7 @@ pub fn set_simulated() {
/// A thread builder object, similar to [`thread::Builder`] but with real-time capabilities /// A thread builder object, similar to [`thread::Builder`] but with real-time capabilities
/// ///
/// Warning: works on Linux systems only /// Warning: works on Linux systems only
#[derive(Default)] #[derive(Default, Clone)]
pub struct Builder { pub struct Builder {
pub(crate) name: Option<String>, pub(crate) name: Option<String>,
stack_size: Option<usize>, stack_size: Option<usize>,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论