提交 95caa6dc authored 作者: Serhij S's avatar Serhij S

mock all methods on non-linux platforms

上级 f0f993eb
...@@ -7,16 +7,18 @@ use std::{ ...@@ -7,16 +7,18 @@ use std::{
time::Duration, time::Duration,
}; };
#[cfg(target_os = "linux")]
use crate::suicide;
use crate::{ use crate::{
critical, critical,
hub::Hub, hub::Hub,
suicide,
supervisor::Supervisor, supervisor::Supervisor,
thread_rt::{Builder, RTParams, Scheduling}, thread_rt::{Builder, RTParams, Scheduling},
Error, Result, Error, Result,
}; };
pub use roboplc_derive::WorkerOpts; pub use roboplc_derive::WorkerOpts;
use rtsc::data_policy::DataDeliveryPolicy; use rtsc::data_policy::DataDeliveryPolicy;
#[cfg(target_os = "linux")]
use signal_hook::{ use signal_hook::{
consts::{SIGINT, SIGTERM}, consts::{SIGINT, SIGTERM},
iterator::Signals, iterator::Signals,
...@@ -203,7 +205,7 @@ where ...@@ -203,7 +205,7 @@ where
pub fn register_signals_with_shutdown_handler<H>( pub fn register_signals_with_shutdown_handler<H>(
&mut self, &mut self,
handle_fn: H, handle_fn: H,
shutdown_timeout: Duration, #[allow(unused_variables)] shutdown_timeout: Duration,
) -> Result<()> ) -> Result<()>
where where
H: Fn(&Context<D, V>) + Send + Sync + 'static, H: Fn(&Context<D, V>) + Send + Sync + 'static,
...@@ -218,22 +220,30 @@ where ...@@ -218,22 +220,30 @@ where
builder.park_on_errors = true; builder.park_on_errors = true;
macro_rules! sig_handler { macro_rules! sig_handler {
($handler: expr) => {{ ($handler: expr) => {{
let context = self.context(); #[cfg(target_os = "linux")]
let mut signals = Signals::new([SIGTERM, SIGINT])?; {
move || { let context = self.context();
if let Some(sig) = signals.forever().next() { let mut signals = Signals::new([SIGTERM, SIGINT])?;
match sig { move || {
SIGTERM | SIGINT => { if let Some(sig) = signals.forever().next() {
suicide(shutdown_timeout, true); match sig {
$handler(&context); SIGTERM | SIGINT => {
context.terminate(); suicide(shutdown_timeout, true);
$handler(&context);
context.terminate();
}
_ => unreachable!(),
} }
_ => unreachable!(),
} }
} }
} }
#[cfg(not(target_os = "linux"))]
{
move || {}
}
}}; }};
} }
#[allow(unused_variables)]
let h = handler.clone(); let h = handler.clone();
if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!(h)) { if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!(h)) {
if !matches!(e, Error::RTSchedSetSchduler(_)) { if !matches!(e, Error::RTSchedSetSchduler(_)) {
......
...@@ -6,7 +6,6 @@ use std::panic::PanicInfo; ...@@ -6,7 +6,6 @@ use std::panic::PanicInfo;
use std::{env, sync::Arc, time::Duration}; use std::{env, sync::Arc, time::Duration};
use colored::Colorize as _; use colored::Colorize as _;
#[cfg(target_os = "linux")]
use thread_rt::{RTParams, Scheduling}; use thread_rt::{RTParams, Scheduling};
pub use log::LevelFilter; pub use log::LevelFilter;
...@@ -90,7 +89,6 @@ pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy}; ...@@ -90,7 +89,6 @@ pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy};
/// Reliable TCP/Serial communications /// Reliable TCP/Serial communications
pub mod comm; pub mod comm;
/// Controller and workers /// Controller and workers
#[cfg(target_os = "linux")]
pub mod controller; pub mod controller;
/// In-process data communication pub/sub hub, synchronous edition /// In-process data communication pub/sub hub, synchronous edition
pub mod hub; pub mod hub;
...@@ -100,10 +98,8 @@ pub mod hub_async; ...@@ -100,10 +98,8 @@ pub mod hub_async;
/// I/O /// I/O
pub mod io; pub mod io;
/// Task supervisor to manage real-time threads /// Task supervisor to manage real-time threads
#[cfg(target_os = "linux")]
pub mod supervisor; pub mod supervisor;
/// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone /// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone, Linux only
#[cfg(target_os = "linux")]
pub mod thread_rt; pub mod thread_rt;
/// The crate result type /// The crate result type
...@@ -245,7 +241,6 @@ impl Error { ...@@ -245,7 +241,6 @@ impl Error {
} }
/// Immediately kills the current process and all its subprocesses with a message to stderr /// Immediately kills the current process and all its subprocesses with a message to stderr
#[cfg(target_os = "linux")]
pub fn critical(msg: &str) -> ! { pub fn critical(msg: &str) -> ! {
eprintln!("{}", msg.red().bold()); eprintln!("{}", msg.red().bold());
thread_rt::suicide_myself(Duration::from_secs(0), false); thread_rt::suicide_myself(Duration::from_secs(0), false);
...@@ -257,7 +252,6 @@ pub fn critical(msg: &str) -> ! { ...@@ -257,7 +252,6 @@ pub fn critical(msg: &str) -> ! {
/// period of time. /// period of time.
/// ///
/// Prints warnings to STDOUT if warn is true /// Prints warnings to STDOUT if warn is true
#[cfg(target_os = "linux")]
pub fn suicide(delay: Duration, warn: bool) { pub fn suicide(delay: Duration, warn: bool) {
let mut builder = thread_rt::Builder::new().name("suicide").rt_params( let mut builder = thread_rt::Builder::new().name("suicide").rt_params(
RTParams::new() RTParams::new()
...@@ -336,14 +330,12 @@ pub fn metrics_exporter_install( ...@@ -336,14 +330,12 @@ pub fn metrics_exporter_install(
/// Sets panic handler to immediately kill the process and its childs with SIGKILL. The process is /// Sets panic handler to immediately kill the process and its childs with SIGKILL. The process is
/// killed when panic happens in ANY thread /// killed when panic happens in ANY thread
#[cfg(target_os = "linux")]
pub fn setup_panic() { pub fn setup_panic() {
std::panic::set_hook(Box::new(move |info: &PanicInfo| { std::panic::set_hook(Box::new(move |info: &PanicInfo| {
panic(info); panic(info);
})); }));
} }
#[cfg(target_os = "linux")]
fn panic(info: &PanicInfo) -> ! { fn panic(info: &PanicInfo) -> ! {
eprintln!("{}", info.to_string().red().bold()); eprintln!("{}", info.to_string().red().bold());
thread_rt::suicide_myself(Duration::from_secs(0), false); thread_rt::suicide_myself(Duration::from_secs(0), false);
...@@ -372,13 +364,10 @@ pub fn configure_logger(filter: LevelFilter) { ...@@ -372,13 +364,10 @@ pub fn configure_logger(filter: LevelFilter) {
/// Prelude module /// Prelude module
pub mod prelude { pub mod prelude {
#[cfg(target_os = "linux")]
pub use super::suicide; pub use super::suicide;
#[cfg(target_os = "linux")]
pub use crate::controller::*; pub use crate::controller::*;
pub use crate::hub::prelude::*; pub use crate::hub::prelude::*;
pub use crate::io::prelude::*; pub use crate::io::prelude::*;
#[cfg(target_os = "linux")]
pub use crate::supervisor::prelude::*; pub use crate::supervisor::prelude::*;
pub use crate::time::DurationRT; pub use crate::time::DurationRT;
pub use bma_ts::{Monotonic, Timestamp}; pub use bma_ts::{Monotonic, Timestamp};
......
...@@ -29,8 +29,12 @@ impl<T> Default for Supervisor<T> { ...@@ -29,8 +29,12 @@ impl<T> Default for Supervisor<T> {
macro_rules! vacant_entry { macro_rules! vacant_entry {
($self:ident, $builder:ident) => {{ ($self:ident, $builder:ident) => {{
let Some(name) = $builder.name.clone() else { return Err(Error::SupervisorNameNotSpecified); }; let Some(name) = $builder.name.clone() else {
let btree_map::Entry::Vacant(entry) = $self.tasks.entry(name.clone()) else { return Err(Error::SupervisorDuplicateTask(name)); }; return Err(Error::SupervisorNameNotSpecified);
};
let btree_map::Entry::Vacant(entry) = $self.tasks.entry(name.clone()) else {
return Err(Error::SupervisorDuplicateTask(name));
};
entry entry
}}; }};
} }
......
...@@ -2,19 +2,22 @@ use crate::{time::Interval, Error, Result}; ...@@ -2,19 +2,22 @@ use crate::{time::Interval, Error, Result};
use bma_ts::{Monotonic, Timestamp}; use bma_ts::{Monotonic, Timestamp};
use colored::Colorize; use colored::Colorize;
use core::fmt; use core::fmt;
#[cfg(target_os = "linux")]
use libc::cpu_set_t; use libc::cpu_set_t;
#[cfg(target_os = "linux")]
use nix::{sys::signal, unistd}; use nix::{sys::signal, unistd};
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use std::{ use std::{
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet},
fs, fs,
io::BufRead, io::BufRead,
mem,
sync::atomic::{AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
thread::{self, JoinHandle, Scope, ScopedJoinHandle}, thread::{self, JoinHandle, Scope, ScopedJoinHandle},
time::Duration, time::Duration,
}; };
use sysinfo::{Pid, PidExt, ProcessExt, System, SystemExt}; #[cfg(target_os = "linux")]
use sysinfo::PidExt;
use sysinfo::{Pid, ProcessExt, System, SystemExt};
use tracing::warn; use tracing::warn;
static REALTIME_MODE: AtomicBool = AtomicBool::new(true); static REALTIME_MODE: AtomicBool = AtomicBool::new(true);
...@@ -29,6 +32,12 @@ fn is_realtime() -> bool { ...@@ -29,6 +32,12 @@ fn is_realtime() -> bool {
REALTIME_MODE.load(Ordering::Relaxed) REALTIME_MODE.load(Ordering::Relaxed)
} }
macro_rules! panic_os {
() => {
panic!("The function is not supported on this OS");
};
}
/// The method preallocates a heap memory region with the given size. The method is useful to /// The method preallocates a heap memory region with the given size. The method is useful to
/// prevent memory fragmentation and speed up memory allocation. It is highly recommended to call /// prevent memory fragmentation and speed up memory allocation. It is highly recommended to call
/// the method at the beginning of the program. /// the method at the beginning of the program.
...@@ -38,31 +47,39 @@ fn is_realtime() -> bool { ...@@ -38,31 +47,39 @@ fn is_realtime() -> bool {
/// # Panics /// # Panics
/// ///
/// Will panic if the page size is too large (more than usize) /// Will panic if the page size is too large (more than usize)
#[allow(unused_variables)]
pub fn prealloc_heap(size: usize) -> Result<()> { pub fn prealloc_heap(size: usize) -> Result<()> {
if !is_realtime() { if !is_realtime() {
return Ok(()); return Ok(());
} }
let page_size = unsafe { #[cfg(target_os = "linux")]
if libc::mallopt(libc::M_MMAP_MAX, 0) != 1 { {
return Err(Error::failed( let page_size = unsafe {
"unable to disable mmap for allocation of large mem regions", if libc::mallopt(libc::M_MMAP_MAX, 0) != 1 {
)); return Err(Error::failed(
} "unable to disable mmap for allocation of large mem regions",
if libc::mallopt(libc::M_TRIM_THRESHOLD, -1) != 1 { ));
return Err(Error::failed("unable to disable trimming")); }
} if libc::mallopt(libc::M_TRIM_THRESHOLD, -1) != 1 {
if libc::mlockall(libc::MCL_FUTURE) == -1 { return Err(Error::failed("unable to disable trimming"));
return Err(Error::failed("unable to lock memory pages")); }
if libc::mlockall(libc::MCL_FUTURE) == -1 {
return Err(Error::failed("unable to lock memory pages"));
};
usize::try_from(libc::sysconf(libc::_SC_PAGESIZE)).expect("Page size too large")
}; };
usize::try_from(libc::sysconf(libc::_SC_PAGESIZE)).expect("Page size too large") let mut heap_mem = vec![0_u8; size];
}; std::hint::black_box(move || {
let mut heap_mem = vec![0_u8; size]; for i in (0..size).step_by(page_size) {
std::hint::black_box(move || { heap_mem[i] = 0xff;
for i in (0..size).step_by(page_size) { }
heap_mem[i] = 0xff; })();
} Ok(())
})(); }
Ok(()) #[cfg(not(target_os = "linux"))]
{
panic_os!();
}
} }
/// A thread builder object, similar to [`thread::Builder`] but with real-time capabilities /// A thread builder object, similar to [`thread::Builder`] but with real-time capabilities
...@@ -100,6 +117,7 @@ pub enum Scheduling { ...@@ -100,6 +117,7 @@ pub enum Scheduling {
Other, Other,
} }
#[cfg(target_os = "linux")]
impl From<Scheduling> for libc::c_int { impl From<Scheduling> for libc::c_int {
fn from(value: Scheduling) -> Self { fn from(value: Scheduling) -> Self {
match value { match value {
...@@ -113,6 +131,7 @@ impl From<Scheduling> for libc::c_int { ...@@ -113,6 +131,7 @@ impl From<Scheduling> for libc::c_int {
} }
} }
#[cfg(target_os = "linux")]
impl From<libc::c_int> for Scheduling { impl From<libc::c_int> for Scheduling {
fn from(value: libc::c_int) -> Self { fn from(value: libc::c_int) -> Self {
match value { match value {
...@@ -488,27 +507,36 @@ impl RTParams { ...@@ -488,27 +507,36 @@ impl RTParams {
} }
} }
#[allow(unused_variables)]
fn thread_init_internal( fn thread_init_internal(
tx_tid: oneshot::Sender<(libc::c_int, oneshot::Sender<bool>)>, tx_tid: oneshot::Sender<(libc::c_int, oneshot::Sender<bool>)>,
park_on_errors: bool, park_on_errors: bool,
) { ) {
let tid = unsafe { i32::try_from(libc::syscall(libc::SYS_gettid)).unwrap_or(-200) }; #[cfg(target_os = "linux")]
let (tx_ok, rx_ok) = oneshot::channel::<bool>(); {
tx_tid.send((tid, tx_ok)).unwrap(); let tid = unsafe { i32::try_from(libc::syscall(libc::SYS_gettid)).unwrap_or(-200) };
if !rx_ok.recv().unwrap() { let (tx_ok, rx_ok) = oneshot::channel::<bool>();
if park_on_errors { tx_tid.send((tid, tx_ok)).unwrap();
loop { if !rx_ok.recv().unwrap() {
thread::park(); if park_on_errors {
loop {
thread::park();
}
} else {
panic!(
"THREAD SETUP FAILED FOR `{}`",
thread::current().name().unwrap_or_default()
);
} }
} else {
panic!(
"THREAD SETUP FAILED FOR `{}`",
thread::current().name().unwrap_or_default()
);
} }
} }
#[cfg(not(target_os = "linux"))]
{
panic_os!();
}
} }
#[allow(unused_variables)]
fn thread_init_external( fn thread_init_external(
rx_tid: oneshot::Receiver<(libc::c_int, oneshot::Sender<bool>)>, rx_tid: oneshot::Receiver<(libc::c_int, oneshot::Sender<bool>)>,
params: &RTParams, params: &RTParams,
...@@ -527,49 +555,58 @@ fn thread_init_external( ...@@ -527,49 +555,58 @@ fn thread_init_external(
Ok(tid) Ok(tid)
} }
#[allow(unused_variables)]
fn apply_thread_params(tid: libc::c_int, params: &RTParams, quiet: bool) -> Result<()> { fn apply_thread_params(tid: libc::c_int, params: &RTParams, quiet: bool) -> Result<()> {
if !is_realtime() { if !is_realtime() {
return Ok(()); return Ok(());
} }
if !params.cpu_ids.is_empty() { #[cfg(target_os = "linux")]
unsafe { {
let mut cpuset: cpu_set_t = mem::zeroed(); if !params.cpu_ids.is_empty() {
for cpu in &params.cpu_ids { unsafe {
libc::CPU_SET(*cpu, &mut cpuset); let mut cpuset: cpu_set_t = std::mem::zeroed();
for cpu in &params.cpu_ids {
libc::CPU_SET(*cpu, &mut cpuset);
}
let res =
libc::sched_setaffinity(tid, std::mem::size_of::<libc::cpu_set_t>(), &cpuset);
if res != 0 {
if !quiet {
eprintln!(
"Error setting CPU affinity: {}",
std::io::Error::last_os_error()
);
}
return Err(Error::RTSchedSetAffinity(res));
}
} }
let res = libc::sched_setaffinity(tid, std::mem::size_of::<libc::cpu_set_t>(), &cpuset); }
if let Some(priority) = params.priority {
let res = unsafe {
libc::sched_setscheduler(
tid,
params.scheduling.into(),
&libc::sched_param {
sched_priority: priority,
},
)
};
if res != 0 { if res != 0 {
if !quiet { if !quiet {
eprintln!( eprintln!(
"Error setting CPU affinity: {}", "Error setting scheduler: {}",
std::io::Error::last_os_error() std::io::Error::last_os_error()
); );
} }
return Err(Error::RTSchedSetAffinity(res)); return Err(Error::RTSchedSetSchduler(res));
} }
} }
Ok(())
} }
if let Some(priority) = params.priority { #[cfg(not(target_os = "linux"))]
let res = unsafe { {
libc::sched_setscheduler( panic_os!();
tid,
params.scheduling.into(),
&libc::sched_param {
sched_priority: priority,
},
)
};
if res != 0 {
if !quiet {
eprintln!(
"Error setting scheduler: {}",
std::io::Error::last_os_error()
);
}
return Err(Error::RTSchedSetSchduler(res));
}
} }
Ok(())
} }
macro_rules! impl_serialize_join_handle { macro_rules! impl_serialize_join_handle {
...@@ -597,19 +634,36 @@ pub(crate) fn suicide_myself(delay: Duration, warn: bool) { ...@@ -597,19 +634,36 @@ pub(crate) fn suicide_myself(delay: Duration, warn: bool) {
eprintln!("{}", "KILLING THE PROCESS".red().bold()); eprintln!("{}", "KILLING THE PROCESS".red().bold());
} }
kill_pstree(pid as i32, false, None); kill_pstree(pid as i32, false, None);
#[cfg(target_os = "linux")]
let _ = signal::kill(unistd::Pid::from_raw(pid as i32), signal::Signal::SIGKILL); let _ = signal::kill(unistd::Pid::from_raw(pid as i32), signal::Signal::SIGKILL);
#[cfg(not(target_os = "linux"))]
{
panic_os!();
}
} }
/// Terminates a process tree with SIGTERM, waits "term_kill_interval" and repeats the opeation /// Terminates a process tree with SIGTERM, waits "term_kill_interval" and repeats the opeation
/// with SIGKILL /// with SIGKILL
/// ///
/// If "term_kill_interval" is not set, SIGKILL is used immediately. /// If "term_kill_interval" is not set, SIGKILL is used immediately.
#[allow(clippy::cast_possible_wrap, clippy::cast_sign_loss)] #[allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, unused_variables)]
pub fn kill_pstree(pid: i32, kill_parent: bool, term_kill_interval: Option<Duration>) { pub fn kill_pstree(pid: i32, kill_parent: bool, term_kill_interval: Option<Duration>) {
let mut sys = System::new(); #[cfg(target_os = "linux")]
sys.refresh_processes(); {
let mut pids = BTreeSet::new(); let mut sys = System::new();
if let Some(delay) = term_kill_interval { sys.refresh_processes();
let mut pids = BTreeSet::new();
if let Some(delay) = term_kill_interval {
kill_process_tree(
Pid::from_u32(pid as u32),
&mut sys,
&mut pids,
signal::Signal::SIGTERM,
kill_parent,
);
thread::sleep(delay);
sys.refresh_processes();
}
kill_process_tree( kill_process_tree(
Pid::from_u32(pid as u32), Pid::from_u32(pid as u32),
&mut sys, &mut sys,
...@@ -617,18 +671,14 @@ pub fn kill_pstree(pid: i32, kill_parent: bool, term_kill_interval: Option<Durat ...@@ -617,18 +671,14 @@ pub fn kill_pstree(pid: i32, kill_parent: bool, term_kill_interval: Option<Durat
signal::Signal::SIGTERM, signal::Signal::SIGTERM,
kill_parent, kill_parent,
); );
thread::sleep(delay);
sys.refresh_processes();
} }
kill_process_tree( #[cfg(not(target_os = "linux"))]
Pid::from_u32(pid as u32), {
&mut sys, panic_os!();
&mut pids, }
signal::Signal::SIGTERM,
kill_parent,
);
} }
#[cfg(target_os = "linux")]
fn kill_process_tree( fn kill_process_tree(
pid: Pid, pid: Pid,
sys: &mut sysinfo::System, sys: &mut sysinfo::System,
...@@ -647,6 +697,7 @@ fn kill_process_tree( ...@@ -647,6 +697,7 @@ fn kill_process_tree(
} }
} }
#[allow(dead_code)]
fn get_child_pids_recursive(pid: Pid, sys: &System, to: &mut BTreeSet<Pid>) { fn get_child_pids_recursive(pid: Pid, sys: &System, to: &mut BTreeSet<Pid>) {
for (i, p) in sys.processes() { for (i, p) in sys.processes() {
if let Some(parent) = p.parent() { if let Some(parent) = p.parent() {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论