提交 49148746 authored 作者: Serhij S's avatar Serhij S

blocking workers

上级 6a844150
...@@ -27,7 +27,7 @@ oneshot = { version = "0.1.6", default-features = false, features = ["std"] } ...@@ -27,7 +27,7 @@ oneshot = { version = "0.1.6", default-features = false, features = ["std"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
pin-project = "1.1.5" pin-project = "1.1.5"
rmodbus = { version = "0.9.3" } rmodbus = { version = "0.9.3" }
roboplc-derive = { version = "0.1.2" } roboplc-derive = { version = "0.1.3" }
serde = { version = "1.0.197", features = ["derive", "rc"] } serde = { version = "1.0.197", features = ["derive", "rc"] }
serial = "0.4.0" serial = "0.4.0"
sysinfo = "0.30.6" sysinfo = "0.30.6"
......
...@@ -17,6 +17,22 @@ enum Message { ...@@ -17,6 +17,22 @@ enum Message {
Terminate, Terminate,
} }
/// A worker which has got a blocking loop (e.g. listening to a socket, having a long cycle etc.)
/// and it is not possible to terminate it immediately. In this case the worker is not joined on
/// shutdown
#[derive(WorkerOpts)]
#[worker_opts(name = "veryblocking", blocking = true)]
struct VeryBlocking {}
impl Worker<Message, ()> for VeryBlocking {
fn run(&mut self, _context: &Context<Message, ()>) -> WResult {
for _ in interval(Duration::from_secs(120)) {
info!(worker = self.worker_name(), "I am still running");
}
Ok(())
}
}
#[derive(WorkerOpts)] #[derive(WorkerOpts)]
#[worker_opts(name = "parser")] #[worker_opts(name = "parser")]
struct DataParser {} struct DataParser {}
...@@ -97,6 +113,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { ...@@ -97,6 +113,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
controller.spawn_worker(DataGenerator {})?; controller.spawn_worker(DataGenerator {})?;
controller.spawn_worker(DataParser {})?; controller.spawn_worker(DataParser {})?;
controller.spawn_worker(SignalHandler {})?; controller.spawn_worker(SignalHandler {})?;
controller.spawn_worker(VeryBlocking {})?;
info!("controller started"); info!("controller started");
controller.block(); controller.block();
info!("controller terminated"); info!("controller terminated");
......
...@@ -145,7 +145,8 @@ where ...@@ -145,7 +145,8 @@ where
} }
let mut builder = Builder::new() let mut builder = Builder::new()
.name(worker.worker_name()) .name(worker.worker_name())
.rt_params(rt_params); .rt_params(rt_params)
.blocking(worker.worker_is_blocking());
if let Some(stack_size) = worker.worker_stack_size() { if let Some(stack_size) = worker.worker_stack_size() {
builder = builder.stack_size(stack_size); builder = builder.stack_size(stack_size);
} }
...@@ -156,7 +157,7 @@ where ...@@ -156,7 +157,7 @@ where
})?; })?;
Ok(()) Ok(())
} }
/// Spawns a task thread (non-real-time) /// Spawns a task thread (non-real-time) with the default options
pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<(), Error> pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<(), Error>
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
...@@ -290,4 +291,10 @@ pub trait WorkerOptions { ...@@ -290,4 +291,10 @@ pub trait WorkerOptions {
fn worker_cpu_ids(&self) -> Option<&[usize]> { fn worker_cpu_ids(&self) -> Option<&[usize]> {
None None
} }
/// A hint for task supervisors that the worker blocks the thread (e.g. listens to a socket or
/// has got a big interval in the main loop, does not return any useful result and should not
/// be joined)
fn worker_is_blocking(&self) -> bool {
false
}
} }
...@@ -97,7 +97,9 @@ impl<T> Supervisor<T> { ...@@ -97,7 +97,9 @@ impl<T> Supervisor<T> {
pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> { pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> {
let mut result = BTreeMap::new(); let mut result = BTreeMap::new();
for (name, task) in mem::take(&mut self.tasks) { for (name, task) in mem::take(&mut self.tasks) {
result.insert(name, task.join()); if !task.is_blocking() {
result.insert(name, task.join());
}
} }
result result
} }
...@@ -178,7 +180,9 @@ impl<'a, 'env, T> ScopedSupervisor<'a, 'env, T> { ...@@ -178,7 +180,9 @@ impl<'a, 'env, T> ScopedSupervisor<'a, 'env, T> {
pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> { pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> {
let mut result = BTreeMap::new(); let mut result = BTreeMap::new();
for (name, task) in mem::take(&mut self.tasks) { for (name, task) in mem::take(&mut self.tasks) {
result.insert(name, task.join()); if !task.is_blocking() {
result.insert(name, task.join());
}
} }
result result
} }
......
...@@ -29,6 +29,7 @@ pub fn set_simulated() { ...@@ -29,6 +29,7 @@ pub fn set_simulated() {
pub struct Builder { pub struct Builder {
pub(crate) name: Option<String>, pub(crate) name: Option<String>,
stack_size: Option<usize>, stack_size: Option<usize>,
blocking: bool,
rt_params: RTParams, rt_params: RTParams,
// an internal parameter to suspend (park) failed threads instead of panic // an internal parameter to suspend (park) failed threads instead of panic
pub(crate) park_on_errors: bool, pub(crate) park_on_errors: bool,
...@@ -89,6 +90,15 @@ impl Builder { ...@@ -89,6 +90,15 @@ impl Builder {
self.stack_size = Some(size); self.stack_size = Some(size);
self self
} }
/// A hint for task supervisors that the task blocks the thread (e.g. listens to a socket or
/// has got a big interval in the main loop, does not return any useful result and should not
/// be joined)
///
/// For scoped tasks: the task may be still forcibly joined at the end of the scope
pub fn blocking(mut self, blocking: bool) -> Self {
self.blocking = blocking;
self
}
/// Applies real-time parameters to the task /// Applies real-time parameters to the task
/// ///
/// See [`RTParams`] /// See [`RTParams`]
...@@ -98,7 +108,7 @@ impl Builder { ...@@ -98,7 +108,7 @@ impl Builder {
} }
fn try_into_thread_builder_name_and_params( fn try_into_thread_builder_name_and_params(
self, self,
) -> Result<(thread::Builder, String, RTParams, bool)> { ) -> Result<(thread::Builder, String, bool, RTParams, bool)> {
let mut builder = thread::Builder::new(); let mut builder = thread::Builder::new();
if let Some(ref name) = self.name { if let Some(ref name) = self.name {
if name.len() > 15 { if name.len() > 15 {
...@@ -115,6 +125,7 @@ impl Builder { ...@@ -115,6 +125,7 @@ impl Builder {
Ok(( Ok((
builder, builder,
self.name.unwrap_or_default(), self.name.unwrap_or_default(),
self.blocking,
self.rt_params, self.rt_params,
self.park_on_errors, self.park_on_errors,
)) ))
...@@ -130,7 +141,7 @@ impl Builder { ...@@ -130,7 +141,7 @@ impl Builder {
F: FnOnce() -> T + Send + 'static, F: FnOnce() -> T + Send + 'static,
T: Send + 'static, T: Send + 'static,
{ {
let (builder, name, rt_params, park_on_errors) = let (builder, name, blocking, rt_params, park_on_errors) =
self.try_into_thread_builder_name_and_params()?; self.try_into_thread_builder_name_and_params()?;
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let handle = builder.spawn(move || { let handle = builder.spawn(move || {
...@@ -141,6 +152,7 @@ impl Builder { ...@@ -141,6 +152,7 @@ impl Builder {
Ok(Task { Ok(Task {
name, name,
handle, handle,
blocking,
tid, tid,
rt_params, rt_params,
info: <_>::default(), info: <_>::default(),
...@@ -180,7 +192,7 @@ impl Builder { ...@@ -180,7 +192,7 @@ impl Builder {
F: FnOnce() -> T + Send + 'scope, F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope, T: Send + 'scope,
{ {
let (builder, name, rt_params, park_on_errors) = let (builder, name, blocking, rt_params, park_on_errors) =
self.try_into_thread_builder_name_and_params()?; self.try_into_thread_builder_name_and_params()?;
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let handle = builder.spawn_scoped(scope, move || { let handle = builder.spawn_scoped(scope, move || {
...@@ -191,6 +203,7 @@ impl Builder { ...@@ -191,6 +203,7 @@ impl Builder {
Ok(ScopedTask { Ok(ScopedTask {
name, name,
handle, handle,
blocking,
tid, tid,
rt_params, rt_params,
info: <_>::default(), info: <_>::default(),
...@@ -239,6 +252,7 @@ pub struct Task<T> { ...@@ -239,6 +252,7 @@ pub struct Task<T> {
serialize_with = "serialize_join_handle_active" serialize_with = "serialize_join_handle_active"
)] )]
handle: JoinHandle<T>, handle: JoinHandle<T>,
blocking: bool,
tid: libc::c_int, tid: libc::c_int,
rt_params: RTParams, rt_params: RTParams,
info: TaskInfo, info: TaskInfo,
...@@ -277,6 +291,9 @@ impl<T> Task<T> { ...@@ -277,6 +291,9 @@ impl<T> Task<T> {
pub fn elapsed(&self) -> Duration { pub fn elapsed(&self) -> Duration {
self.info.started_mt.elapsed() self.info.started_mt.elapsed()
} }
pub fn is_blocking(&self) -> bool {
self.blocking
}
} }
impl<T> From<Task<T>> for JoinHandle<T> { impl<T> From<Task<T>> for JoinHandle<T> {
...@@ -296,6 +313,7 @@ pub struct ScopedTask<'scope, T> { ...@@ -296,6 +313,7 @@ pub struct ScopedTask<'scope, T> {
serialize_with = "serialize_scoped_join_handle_active" serialize_with = "serialize_scoped_join_handle_active"
)] )]
handle: ScopedJoinHandle<'scope, T>, handle: ScopedJoinHandle<'scope, T>,
blocking: bool,
tid: libc::c_int, tid: libc::c_int,
rt_params: RTParams, rt_params: RTParams,
info: TaskInfo, info: TaskInfo,
...@@ -334,6 +352,9 @@ impl<'scope, T> ScopedTask<'scope, T> { ...@@ -334,6 +352,9 @@ impl<'scope, T> ScopedTask<'scope, T> {
pub fn elapsed(&self) -> Duration { pub fn elapsed(&self) -> Duration {
self.info.started_mt.elapsed() self.info.started_mt.elapsed()
} }
pub fn is_blocking(&self) -> bool {
self.blocking
}
} }
impl<'scope, T> From<ScopedTask<'scope, T>> for ScopedJoinHandle<'scope, T> { impl<'scope, T> From<ScopedTask<'scope, T>> for ScopedJoinHandle<'scope, T> {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论