提交 79fb7822 authored 作者: Serhij S's avatar Serhij S

graceful shutdown example

上级 4d542bdf
...@@ -38,6 +38,7 @@ tracing = "0.1.40" ...@@ -38,6 +38,7 @@ tracing = "0.1.40"
env_logger = "0.11.3" env_logger = "0.11.3"
insta = "1.36.1" insta = "1.36.1"
log = "0.4.21" log = "0.4.21"
signal-hook = "0.3.17"
tokio = { version = "1.36.0", features = ["rt", "macros", "time"] } tokio = { version = "1.36.0", features = ["rt", "macros", "time"] }
tracing = { version = "0.1.40", features = ["log"] } tracing = { version = "0.1.40", features = ["log"] }
...@@ -48,3 +49,7 @@ path = "examples/plc-modbus.rs" ...@@ -48,3 +49,7 @@ path = "examples/plc-modbus.rs"
[[example]] [[example]]
name = "raw-udp" name = "raw-udp"
path = "examples/raw-udp.rs" path = "examples/raw-udp.rs"
[[example]]
name = "shutdown"
path = "examples/shutdown.rs"
// The example provides a graceful shutdown of the controller.
use roboplc::{prelude::*, time::interval};
use signal_hook::{
consts::{SIGINT, SIGTERM},
iterator::Signals,
};
use tracing::info;
#[derive(DataPolicy, Clone)]
enum Message {
Data(u8),
// Terminate signal for workers which listen on hub events
Terminate,
}
#[derive(WorkerOpts)]
#[worker_opts(name = "parser")]
struct DataParser {}
impl Worker<Message, ()> for DataParser {
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let hc = context.hub().register(
self.worker_name(),
event_matches!(Message::Data(_) | Message::Terminate),
)?;
for msg in hc {
match msg {
Message::Data(data) => {
info!(worker = self.worker_name(), data = data);
}
// This worker terminates itself when it receives the Terminate message
Message::Terminate => {
break;
}
}
}
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(name = "generator")]
struct DataGenerator {}
impl Worker<Message, ()> for DataGenerator {
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
for _ in interval(Duration::from_secs(1)) {
context.hub().send(Message::Data(42));
// This worker terminates itself when the controller goes to the stopping state
if !context.is_online() {
break;
}
}
Ok(())
}
}
#[derive(WorkerOpts)]
#[worker_opts(name = "sighandle")]
struct SignalHandler {}
impl Worker<Message, ()> for SignalHandler {
// this worker listens to SIGINT and SIGTERM signals, sends a Terminate message to the hub and
// sets the controller state to Stopping
fn run(&mut self, context: &Context<Message, ()>) -> WResult {
let mut signals = Signals::new([SIGTERM, SIGINT])?;
if let Some(sig) = signals.forever().next() {
match sig {
SIGTERM | SIGINT => {
info!("terminating");
// it is really important to set max shutdown timeout for the controller if the
// controller does not terminate in the given time, the process and all its
// sub-processes are forcibly killed
suicide(Duration::from_secs(2), true);
// set controller state to terminating
context.set_state(ControllerStateKind::Stopping);
// send Terminate message to workers who listen to the hub
context.hub().send(Message::Terminate);
}
_ => unreachable!(),
}
}
Ok(())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
let mut controller = Controller::<Message, ()>::new();
controller.spawn_worker(DataGenerator {})?;
controller.spawn_worker(DataParser {})?;
controller.spawn_worker(SignalHandler {})?;
info!("controller started");
controller.block();
info!("controller terminated");
Ok(())
}
...@@ -35,20 +35,20 @@ pub struct State { ...@@ -35,20 +35,20 @@ pub struct State {
impl State { impl State {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
state: AtomicI8::new(StateKind::Starting as i8).into(), state: AtomicI8::new(ControllerStateKind::Starting as i8).into(),
} }
} }
/// Set controller state /// Set controller state
pub fn set(&self, state: StateKind) { pub fn set(&self, state: ControllerStateKind) {
self.state.store(state as i8, Ordering::SeqCst); self.state.store(state as i8, Ordering::SeqCst);
} }
/// Get controller state /// Get controller state
pub fn get(&self) -> StateKind { pub fn get(&self) -> ControllerStateKind {
StateKind::from(self.state.load(Ordering::SeqCst)) ControllerStateKind::from(self.state.load(Ordering::SeqCst))
} }
/// Is the controller online (starting or running) /// Is the controller online (starting or running)
pub fn is_online(&self) -> bool { pub fn is_online(&self) -> bool {
self.get() >= StateKind::Starting self.get() >= ControllerStateKind::Starting
} }
} }
...@@ -61,7 +61,8 @@ impl Default for State { ...@@ -61,7 +61,8 @@ impl Default for State {
/// Controller state kind /// Controller state kind
#[derive(Default, Eq, PartialEq, Clone, Copy, Ord, PartialOrd)] #[derive(Default, Eq, PartialEq, Clone, Copy, Ord, PartialOrd)]
#[repr(i8)] #[repr(i8)]
pub enum StateKind { #[allow(clippy::module_name_repetitions)]
pub enum ControllerStateKind {
#[default] #[default]
Starting = 0, Starting = 0,
Active = 1, Active = 1,
...@@ -71,14 +72,14 @@ pub enum StateKind { ...@@ -71,14 +72,14 @@ pub enum StateKind {
Unknown = -128, Unknown = -128,
} }
impl From<i8> for StateKind { impl From<i8> for ControllerStateKind {
fn from(v: i8) -> Self { fn from(v: i8) -> Self {
match v { match v {
0 => StateKind::Starting, 0 => ControllerStateKind::Starting,
1 => StateKind::Active, 1 => ControllerStateKind::Active,
2 => StateKind::Running, 2 => ControllerStateKind::Running,
-100 => StateKind::Stopped, -100 => ControllerStateKind::Stopped,
_ => StateKind::Unknown, _ => ControllerStateKind::Unknown,
} }
} }
} }
...@@ -173,14 +174,14 @@ where ...@@ -173,14 +174,14 @@ where
/// Blocks until all tasks/workers are finished /// Blocks until all tasks/workers are finished
pub fn block(&mut self) { pub fn block(&mut self) {
self.supervisor.join_all(); self.supervisor.join_all();
self.state.set(StateKind::Stopped); self.state.set(ControllerStateKind::Stopped);
} }
/// Blocks until the controller goes into stopping/stopped /// Blocks until the controller goes into stopping/stopped
pub fn block_while_online(&self) { pub fn block_while_online(&self) {
while self.state.is_online() { while self.state.is_online() {
thread::sleep(SLEEP_SLEEP); thread::sleep(SLEEP_SLEEP);
} }
self.state.set(StateKind::Stopped); self.state.set(ControllerStateKind::Stopped);
} }
/// Is the controller online (starting or running) /// Is the controller online (starting or running)
pub fn is_online(&self) { pub fn is_online(&self) {
...@@ -240,11 +241,11 @@ where ...@@ -240,11 +241,11 @@ where
&self.variables &self.variables
} }
/// Controller's state /// Controller's state
pub fn get_state(&self) -> StateKind { pub fn get_state(&self) -> ControllerStateKind {
self.state.get() self.state.get()
} }
/// Set controller's state /// Set controller's state
pub fn set_state(&self, state: StateKind) { pub fn set_state(&self, state: ControllerStateKind) {
self.state.set(state); self.state.set(state);
} }
/// Is the controller online (starting or running) /// Is the controller online (starting or running)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论