提交 22c02014 authored 作者: Serhij S's avatar Serhij S

light shutdown

上级 cc5060a1
......@@ -72,6 +72,10 @@ path = "examples/raw-udp.rs"
name = "shutdown"
path = "examples/shutdown.rs"
[[example]]
name = "shutdown-custom"
path = "examples/shutdown-custom.rs"
[[example]]
name = "eapi"
path = "examples/eapi.rs"
......
// The example provides a graceful shutdown of the controller using a custom signal handler.
use roboplc::{prelude::*, time::interval};
use signal_hook::{
consts::{SIGINT, SIGTERM},
iterator::Signals,
};
use tracing::info;
// The maximum shutdown time
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(DataPolicy, Clone)]
enum Message {
Data(u8),
// Terminate signal for workers which listen on hub events
Terminate,
}
/// A worker which has got a blocking loop (e.g. listening to a socket, having a long cycle etc.)
/// and it is not possible to terminate it immediately. In this case the worker is not joined on
/// shutdown
#[derive(WorkerOpts)]
#[worker_opts(name = "veryblocking", blocking = true)]
struct VeryBlocking {}
impl Worker<Message, ()> for VeryBlocking {
fn run(&mut self, _context: &Context<Message, ()>) -> WResult {
for _ in interval(Duration::from_secs(120)) {
info!(worker = self.worker_name(), "I am still running");
}
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(name = "parser")]
struct DataParser {}
impl Worker<Message, ()> for DataParser {
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let hc = context.hub().register(
self.worker_name(),
event_matches!(Message::Data(_) | Message::Terminate),
)?;
for msg in hc {
match msg {
Message::Data(data) => {
info!(worker = self.worker_name(), data = data);
}
// This worker terminates itself when it receives the Terminate message
Message::Terminate => {
break;
}
}
}
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(name = "generator")]
struct DataGenerator {}
impl Worker<Message, ()> for DataGenerator {
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
for _ in interval(Duration::from_secs(1)) {
context.hub().send(Message::Data(42));
// This worker terminates itself when the controller goes to the stopping state
if !context.is_online() {
break;
}
}
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(name = "sighandle")]
struct SignalHandler {}
impl Worker<Message, ()> for SignalHandler {
// this worker listens to SIGINT and SIGTERM signals, sends a Terminate message to the hub and
// sets the controller state to Stopping
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let mut signals = Signals::new([SIGTERM, SIGINT])?;
if let Some(sig) = signals.forever().next() {
match sig {
SIGTERM | SIGINT => {
info!("terminating");
// it is really important to set max shutdown timeout for the controller if the
// controller does not terminate in the given time, the process and all its
// sub-processes are forcibly killed
suicide(SHUTDOWN_TIMEOUT, true);
// set controller state to Stopping
context.terminate();
// send Terminate message to workers who listen to the hub
context.hub().send(Message::Terminate);
}
_ => unreachable!(),
}
}
Ok(())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
roboplc::configure_logger(roboplc::LevelFilter::Info);
let mut controller = Controller::<Message, ()>::new();
controller.spawn_worker(DataGenerator {})?;
controller.spawn_worker(DataParser {})?;
controller.spawn_worker(SignalHandler {})?;
controller.spawn_worker(VeryBlocking {})?;
info!("controller started");
controller.block();
info!("controller terminated");
Ok(())
}
// The example provides a graceful shutdown of the controller.
// The example provides a graceful shutdown of the controller using built-in methods.
use roboplc::{prelude::*, time::interval};
use signal_hook::{
consts::{SIGINT, SIGTERM},
iterator::Signals,
};
use tracing::info;
// The maximum shutdown time
......@@ -74,44 +70,18 @@ impl Worker<Message, ()> for DataGenerator {
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(name = "sighandle")]
struct SignalHandler {}
impl Worker<Message, ()> for SignalHandler {
// this worker listens to SIGINT and SIGTERM signals, sends a Terminate message to the hub and
// sets the controller state to Stopping
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let mut signals = Signals::new([SIGTERM, SIGINT])?;
if let Some(sig) = signals.forever().next() {
match sig {
SIGTERM | SIGINT => {
info!("terminating");
// it is really important to set max shutdown timeout for the controller if the
// controller does not terminate in the given time, the process and all its
// sub-processes are forcibly killed
suicide(SHUTDOWN_TIMEOUT, true);
// set controller state to Stopping
context.terminate();
// send Terminate message to workers who listen to the hub
context.hub().send(Message::Terminate);
}
_ => unreachable!(),
}
}
Ok(())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
roboplc::configure_logger(roboplc::LevelFilter::Info);
let mut controller = Controller::<Message, ()>::new();
controller.spawn_worker(DataGenerator {})?;
controller.spawn_worker(DataParser {})?;
controller.spawn_worker(SignalHandler {})?;
controller.spawn_worker(VeryBlocking {})?;
controller.register_signals_with_shutdown_handler(
move |context| {
context.hub().send(Message::Terminate);
},
SHUTDOWN_TIMEOUT,
)?;
info!("controller started");
controller.block();
info!("controller terminated");
......
......@@ -12,7 +12,7 @@ use crate::{
suicide,
supervisor::Supervisor,
thread_rt::{Builder, RTParams, Scheduling},
DataDeliveryPolicy, Error,
DataDeliveryPolicy, Error, Result,
};
use parking_lot::RwLock;
pub use roboplc_derive::WorkerOpts;
......@@ -28,7 +28,7 @@ pub mod prelude {
}
/// Result type, which must be returned by workers' `run` method
pub type WResult = Result<(), Box<dyn std::error::Error>>;
pub type WResult = std::result::Result<(), Box<dyn std::error::Error>>;
pub const SLEEP_STEP: Duration = Duration::from_millis(100);
......@@ -137,7 +137,7 @@ where
pub fn spawn_worker<W: Worker<D, V> + WorkerOptions + 'static>(
&mut self,
mut worker: W,
) -> Result<(), Error> {
) -> Result<()> {
let context = self.context();
let mut rt_params = RTParams::new().set_scheduling(worker.worker_scheduling());
if let Some(priority) = worker.worker_priority() {
......@@ -161,13 +161,18 @@ where
Ok(())
}
/// Spawns a task thread (non-real-time) with the default options
pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<(), Error>
pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<()>
where
F: FnOnce() + Send + 'static,
{
self.supervisor.spawn(Builder::new().name(name), f)?;
Ok(())
}
/// Registers SIGINT and SIGTERM signals to a thread which terminates the controller with dummy
/// handler (see [`Controller::register_signals_with_shutdown_handler()`]).
pub fn register_signals(&mut self, shutdown_timeout: Duration) -> Result<()> {
self.register_signals_with_shutdown_handler(|_| {}, shutdown_timeout)
}
/// Registers SIGINT and SIGTERM signals to a thread which terminates the controller.
///
/// Note: to properly terminate all workers must either periodically check the controller state
......@@ -181,7 +186,15 @@ where
///
/// The thread is automatically spawned with FIFO scheduling and the highest priority on CPU 0
/// or falled back to non-realtime.
pub fn register_signals(&mut self, shutdown_timeout: Duration) -> Result<(), Error> {
pub fn register_signals_with_shutdown_handler<H>(
&mut self,
handle_fn: H,
shutdown_timeout: Duration,
) -> Result<()>
where
H: Fn(&Context<D, V>) + Send + Sync + 'static,
{
let handler = Arc::new(handle_fn);
let mut builder = Builder::new().name("RoboPLCSigRT").rt_params(
RTParams::new()
.set_priority(99)
......@@ -190,7 +203,7 @@ where
);
builder.park_on_errors = true;
macro_rules! sig_handler {
() => {{
($handler: expr) => {{
let context = self.context();
let mut signals = Signals::new([SIGTERM, SIGINT])?;
move || {
......@@ -198,6 +211,7 @@ where
match sig {
SIGTERM | SIGINT => {
suicide(shutdown_timeout, true);
$handler(&context);
context.terminate();
}
_ => unreachable!(),
......@@ -206,13 +220,14 @@ where
}
}};
}
if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!()) {
let h = handler.clone();
if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!(h)) {
if !matches!(e, Error::RTSchedSetSchduler(_)) {
return Err(e);
}
}
let builder = builder.name("RoboPLCSig").rt_params(RTParams::new());
self.supervisor.spawn(builder, sig_handler!())?;
self.supervisor.spawn(builder, sig_handler!(handler))?;
Ok(())
}
fn context(&self) -> Context<D, V> {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论