提交 41da870a authored 作者: Serhij S's avatar Serhij S

scheduling and system tools moved to rtsc

上级 3c81fd0a
......@@ -47,7 +47,7 @@ metrics-exporter-prometheus = { version = "0.16.0", optional = true, default-fea
metrics-exporter-scope = { version = "0.2.0", optional = true }
metrics = { version = "0.24", optional = true }
snmp2 = { version = "0.3", optional = true }
rtsc = "0.3"
rtsc = "^0.3.12"
rvideo = { version = "0.5", optional = true, default-features = false }
rflow = { version = "0.1", optional = true, default-features = false }
once_cell = { version = "1.19.0", optional = true }
......
......@@ -106,6 +106,7 @@ pub mod io;
pub mod supervisor;
/// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone, Linux only
pub mod thread_rt;
pub use rtsc::system;
/// State helper functions
#[cfg(any(feature = "json", feature = "msgpack"))]
......@@ -153,10 +154,10 @@ pub enum Error {
RTGetTId(libc::c_int),
/// Real-time engine error: unable to set the thread scheduler affinity
#[error("RT sched_setaffinity {0}")]
RTSchedSetAffinity(libc::c_int),
RTSchedSetAffinity(String),
/// Real-time engine error: unable to set the thread scheduler policy
#[error("RT sched_setscheduler {0}")]
RTSchedSetSchduler(libc::c_int),
RTSchedSetSchduler(String),
/// Supervisor error: task name is not specified in the thread builder
#[error("Task name must be specified when spawning by a supervisor")]
SupervisorNameNotSpecified,
......@@ -198,6 +199,9 @@ impl From<rtsc::Error> for Error {
rtsc::Error::InvalidData(msg) => Error::InvalidData(msg),
rtsc::Error::Failed(msg) => Error::Failed(msg),
rtsc::Error::AccessDenied => Error::AccessDenied,
rtsc::Error::RTSchedSetAffinity(msg) => Error::RTSchedSetAffinity(msg),
rtsc::Error::RTSchedSetScheduler(msg) => Error::RTSchedSetSchduler(msg),
rtsc::Error::IO(err) => Error::IO(err),
}
}
}
......@@ -213,6 +217,9 @@ impl From<Error> for rtsc::Error {
Error::Timeout => rtsc::Error::Timeout,
Error::InvalidData(msg) => rtsc::Error::InvalidData(msg),
Error::AccessDenied => rtsc::Error::AccessDenied,
Error::RTSchedSetAffinity(msg) => rtsc::Error::RTSchedSetAffinity(msg),
Error::RTSchedSetSchduler(msg) => rtsc::Error::RTSchedSetScheduler(msg),
Error::IO(err) => rtsc::Error::IO(err),
_ => rtsc::Error::Failed(err.to_string()),
}
}
......
......@@ -3,14 +3,10 @@ use bma_ts::{Monotonic, Timestamp};
use colored::Colorize;
use core::fmt;
#[cfg(target_os = "linux")]
use libc::cpu_set_t;
#[cfg(target_os = "linux")]
use nix::{sys::signal, unistd};
use serde::{Deserialize, Serialize, Serializer};
use std::{
collections::{BTreeMap, BTreeSet},
fs,
io::BufRead,
collections::BTreeSet,
sync::atomic::{AtomicBool, Ordering},
thread::{self, JoinHandle, Scope, ScopedJoinHandle},
time::Duration,
......@@ -18,7 +14,6 @@ use std::{
#[cfg(target_os = "linux")]
use sysinfo::PidExt;
use sysinfo::{Pid, ProcessExt, System, SystemExt};
use tracing::warn;
static REALTIME_MODE: AtomicBool = AtomicBool::new(true);
......@@ -53,34 +48,7 @@ pub fn prealloc_heap(size: usize) -> Result<()> {
if !is_realtime() {
return Ok(());
}
#[cfg(target_os = "linux")]
{
let page_size = unsafe {
if libc::mallopt(libc::M_MMAP_MAX, 0) != 1 {
return Err(Error::failed(
"unable to disable mmap for allocation of large mem regions",
));
}
if libc::mallopt(libc::M_TRIM_THRESHOLD, -1) != 1 {
return Err(Error::failed("unable to disable trimming"));
}
if libc::mlockall(libc::MCL_FUTURE) == -1 {
return Err(Error::failed("unable to lock memory pages"));
};
usize::try_from(libc::sysconf(libc::_SC_PAGESIZE)).expect("Page size too large")
};
let mut heap_mem = vec![0_u8; size];
std::hint::black_box(move || {
for i in (0..size).step_by(page_size) {
heap_mem[i] = 0xff;
}
})();
Ok(())
}
#[cfg(not(target_os = "linux"))]
{
panic_os!();
}
rtsc::thread_rt::preallocate_heap(size).map_err(Into::into)
}
/// A thread builder object, similar to [`thread::Builder`] but with real-time capabilities
......@@ -118,33 +86,46 @@ pub enum Scheduling {
Other,
}
#[cfg(target_os = "linux")]
impl From<Scheduling> for libc::c_int {
impl From<Scheduling> for rtsc::thread_rt::Scheduling {
fn from(value: Scheduling) -> Self {
match value {
Scheduling::RoundRobin => libc::SCHED_RR,
Scheduling::FIFO => libc::SCHED_FIFO,
Scheduling::Idle => libc::SCHED_IDLE,
Scheduling::Batch => libc::SCHED_BATCH,
Scheduling::DeadLine => libc::SCHED_DEADLINE,
Scheduling::Other => libc::SCHED_NORMAL,
}
}
}
#[cfg(target_os = "linux")]
impl From<libc::c_int> for Scheduling {
fn from(value: libc::c_int) -> Self {
match value {
libc::SCHED_RR => Scheduling::RoundRobin,
libc::SCHED_FIFO => Scheduling::FIFO,
libc::SCHED_IDLE => Scheduling::Idle,
libc::SCHED_BATCH => Scheduling::Batch,
libc::SCHED_DEADLINE => Scheduling::DeadLine,
_ => Scheduling::Other,
}
}
}
Scheduling::RoundRobin => rtsc::thread_rt::Scheduling::RoundRobin,
Scheduling::FIFO => rtsc::thread_rt::Scheduling::FIFO,
Scheduling::Idle => rtsc::thread_rt::Scheduling::Idle,
Scheduling::Batch => rtsc::thread_rt::Scheduling::Batch,
Scheduling::DeadLine => rtsc::thread_rt::Scheduling::DeadLine,
Scheduling::Other => rtsc::thread_rt::Scheduling::Other,
}
}
}
//#[cfg(target_os = "linux")]
//impl From<Scheduling> for libc::c_int {
//fn from(value: Scheduling) -> Self {
//match value {
//Scheduling::RoundRobin => libc::SCHED_RR,
//Scheduling::FIFO => libc::SCHED_FIFO,
//Scheduling::Idle => libc::SCHED_IDLE,
//Scheduling::Batch => libc::SCHED_BATCH,
//Scheduling::DeadLine => libc::SCHED_DEADLINE,
//Scheduling::Other => libc::SCHED_NORMAL,
//}
//}
//}
//#[cfg(target_os = "linux")]
//impl From<libc::c_int> for Scheduling {
//fn from(value: libc::c_int) -> Self {
//match value {
//libc::SCHED_RR => Scheduling::RoundRobin,
//libc::SCHED_FIFO => Scheduling::FIFO,
//libc::SCHED_IDLE => Scheduling::Idle,
//libc::SCHED_BATCH => Scheduling::Batch,
//libc::SCHED_DEADLINE => Scheduling::DeadLine,
//_ => Scheduling::Other,
//}
//}
//}
macro_rules! impl_builder_from {
($t: ty) => {
......@@ -472,6 +453,12 @@ impl RTParams {
pub fn new() -> Self {
Self::default()
}
fn as_rtsc_thread_params(&self) -> rtsc::thread_rt::Params {
rtsc::thread_rt::Params::new()
.with_priority(self.priority)
.with_scheduling(self.scheduling.into())
.with_cpu_ids(&self.cpu_ids)
}
/// Sets thread scheduling policy (can be used as build pattern)
pub fn set_scheduling(mut self, scheduling: Scheduling) -> Self {
self.scheduling = scheduling;
......@@ -561,53 +548,7 @@ fn apply_thread_params(tid: libc::c_int, params: &RTParams, quiet: bool) -> Resu
if !is_realtime() {
return Ok(());
}
#[cfg(target_os = "linux")]
{
if !params.cpu_ids.is_empty() {
unsafe {
let mut cpuset: cpu_set_t = std::mem::zeroed();
for cpu in &params.cpu_ids {
libc::CPU_SET(*cpu, &mut cpuset);
}
let res =
libc::sched_setaffinity(tid, std::mem::size_of::<libc::cpu_set_t>(), &cpuset);
if res != 0 {
if !quiet {
eprintln!(
"Error setting CPU affinity: {}",
std::io::Error::last_os_error()
);
}
return Err(Error::RTSchedSetAffinity(res));
}
}
}
if let Some(priority) = params.priority {
let res = unsafe {
libc::sched_setscheduler(
tid,
params.scheduling.into(),
&libc::sched_param {
sched_priority: priority,
},
)
};
if res != 0 {
if !quiet {
eprintln!(
"Error setting scheduler: {}",
std::io::Error::last_os_error()
);
}
return Err(Error::RTSchedSetSchduler(res));
}
}
Ok(())
}
#[cfg(not(target_os = "linux"))]
{
panic_os!();
}
rtsc::thread_rt::apply(tid, &params.as_rtsc_thread_params()).map_err(Into::into)
}
macro_rules! impl_serialize_join_handle {
......@@ -711,7 +652,8 @@ fn get_child_pids_recursive(pid: Pid, sys: &System, to: &mut BTreeSet<Pid>) {
}
/// Configure system parameters (global) while the process is running. Does nothing in simulated
/// mode
/// mode. A wrapper around [`rtsc::system::linux::SystemConfig`] which respects simulated/real-time
/// mode.
///
/// Example:
///
......@@ -725,10 +667,7 @@ fn get_child_pids_recursive(pid: Pid, sys: &System, to: &mut BTreeSet<Pid>) {
/// // system config is restored at the end of the scope
/// ```
#[derive(Default)]
pub struct SystemConfig {
values: BTreeMap<&'static str, String>,
prev_values: BTreeMap<&'static str, String>,
}
pub struct SystemConfig(rtsc::system::linux::SystemConfig);
impl SystemConfig {
/// Creates a new system config object
......@@ -738,44 +677,23 @@ impl SystemConfig {
}
/// Set a parameter to configure
pub fn set<V: fmt::Display>(mut self, key: &'static str, value: V) -> Self {
self.values.insert(key, value.to_string());
self
}
/// Apply values to /proc/sys keys
pub fn apply(mut self) -> Result<SystemConfigGuard> {
if is_realtime() {
for (key, value) in &self.values {
let fname = format!("/proc/sys/{}", key);
let prev_value = fs::read_to_string(&fname)?;
self.prev_values.insert(key, prev_value);
fs::write(fname, value)?;
}
self.0 = self.0.set(key, value);
}
Ok(SystemConfigGuard { config: self })
self
}
}
/// A guard object to restore system parameters when dropped
pub struct SystemConfigGuard {
config: SystemConfig,
}
impl Drop for SystemConfigGuard {
fn drop(&mut self) {
/// Apply values to /proc/sys keys
pub fn apply(self) -> Result<rtsc::system::linux::SystemConfigGuard> {
if is_realtime() {
for (key, value) in &self.config.prev_values {
if let Err(error) = fs::write(format!("/proc/sys/{}", key), value) {
warn!(key, value, %error, "Failed to restore system config");
}
}
return self.0.apply().map_err(Into::into);
}
Ok(rtsc::system::linux::SystemConfigGuard::default())
}
}
/// Configure CPU governors for the given CPUs
pub struct CpuGovernor {
prev_governor: BTreeMap<usize, String>,
}
/// Configure CPU governors for the given CPUs. A wrapper around
/// [`rtsc::system::linux::CpuGovernor`] which respects simulated/real-time mode.
pub struct CpuGovernor(#[allow(dead_code)] rtsc::system::linux::CpuGovernor);
impl CpuGovernor {
/// Set performance governor for the given CPUs. This sets the maximum frequency for the CPUs,
......@@ -786,59 +704,11 @@ impl CpuGovernor {
where
I: IntoIterator<Item = usize>,
{
let mut prev_governor = BTreeMap::new();
for cpu in performance_cpus {
let fname = format!(
"/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
cpu
);
let prev_value = fs::read_to_string(fname)?;
prev_governor.insert(cpu, prev_value.trim().to_string());
}
for cpu in prev_governor.keys() {
let fname = format!(
"/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
cpu
);
fs::write(fname, "performance")?;
}
Ok(CpuGovernor { prev_governor })
}
}
impl Drop for CpuGovernor {
fn drop(&mut self) {
for (cpu, governor) in &self.prev_governor {
if let Err(error) = fs::write(
format!(
"/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
cpu
),
governor,
) {
warn!(cpu, %error, "Failed to restore governor");
}
}
}
}
/// Get absolute number of CPUs, including isolated
pub fn num_cpus() -> Result<usize> {
let f = std::fs::File::open("/proc/cpuinfo")?;
let reader = std::io::BufReader::new(f);
let lines = reader.lines();
let mut count = 0;
for line in lines {
let line = line?;
if line
.split(':')
.next()
.ok_or_else(|| Error::failed("invalid line"))?
.trim_end()
== "processor"
{
count += 1;
if is_realtime() {
let inner = rtsc::system::linux::CpuGovernor::performance(performance_cpus)?;
Ok(Self(inner))
} else {
Ok(Self(rtsc::system::linux::CpuGovernor::default()))
}
}
Ok(count)
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论