提交 b544390a authored 作者: Serhij S's avatar Serhij S

semaphores

上级 6bde025e
...@@ -27,7 +27,7 @@ oneshot = { version = "0.1.6", default-features = false, features = ["std"] } ...@@ -27,7 +27,7 @@ oneshot = { version = "0.1.6", default-features = false, features = ["std"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
pin-project = "1.1.5" pin-project = "1.1.5"
rmodbus = { version = "0.9.3" } rmodbus = { version = "0.9.3" }
roboplc-derive = { version = "0.1.4" } roboplc-derive = { version = "0.1.5" }
serde = { version = "1.0.197", features = ["derive", "rc"] } serde = { version = "1.0.197", features = ["derive", "rc"] }
serial = "0.4.0" serial = "0.4.0"
sysinfo = "0.30.6" sysinfo = "0.30.6"
......
...@@ -24,6 +24,8 @@ pub mod pchannel; ...@@ -24,6 +24,8 @@ pub mod pchannel;
pub mod pchannel_async; pub mod pchannel_async;
/// Policy-based data storages /// Policy-based data storages
pub mod pdeque; pub mod pdeque;
/// A lighweight real-time safe semaphore
pub mod semaphore;
/// Task supervisor to manage real-time threads /// Task supervisor to manage real-time threads
pub mod supervisor; pub mod supervisor;
/// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone /// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone
...@@ -225,6 +227,7 @@ pub fn suicide(delay: Duration, warn: bool) { ...@@ -225,6 +227,7 @@ pub fn suicide(delay: Duration, warn: bool) {
}; };
} }
impl DataDeliveryPolicy for () {}
impl DataDeliveryPolicy for usize {} impl DataDeliveryPolicy for usize {}
pub mod prelude { pub mod prelude {
......
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
/// A lightweight real-time safe semaphore
pub struct Semaphore {
inner: Arc<SemaphoreInner>,
}
impl Semaphore {
pub fn new(capacity: usize) -> Self {
Self {
inner: SemaphoreInner {
permissions: <_>::default(),
capacity,
cv: Condvar::new(),
}
.into(),
}
}
/// Tries to acquire permission, returns None if failed
pub fn try_acquire(&self) -> Option<SemaphoreGuard> {
let mut count = self.inner.permissions.lock();
if *count == self.inner.capacity {
return None;
}
*count += 1;
Some(SemaphoreGuard {
inner: self.inner.clone(),
})
}
/// Acquires permission, blocks until it is available
pub fn acquire(&self) -> SemaphoreGuard {
let mut count = self.inner.permissions.lock();
while *count == self.inner.capacity {
self.inner.cv.wait(&mut count);
}
*count += 1;
SemaphoreGuard {
inner: self.inner.clone(),
}
}
pub fn capacity(&self) -> usize {
self.inner.capacity
}
pub fn available(&self) -> usize {
self.inner.capacity - *self.inner.permissions.lock()
}
pub fn used(&self) -> usize {
*self.inner.permissions.lock()
}
/// For tests only
#[allow(dead_code)]
fn is_poisoned(&self) -> bool {
*self.inner.permissions.lock() > self.inner.capacity
}
}
struct SemaphoreInner {
permissions: Mutex<usize>,
capacity: usize,
cv: Condvar,
}
impl SemaphoreInner {
fn release(&self) {
let mut count = self.permissions.lock();
*count -= 1;
self.cv.notify_one();
}
}
pub struct SemaphoreGuard {
inner: Arc<SemaphoreInner>,
}
impl Drop for SemaphoreGuard {
fn drop(&mut self) {
self.inner.release();
}
}
#[cfg(test)]
mod test {
use std::time::Instant;
use super::*;
#[test]
fn test_semaphore() {
let sem = Semaphore::new(2);
assert_eq!(sem.capacity(), 2);
assert_eq!(sem.available(), 2);
assert_eq!(sem.used(), 0);
let _g1 = sem.acquire();
assert_eq!(sem.available(), 1);
assert_eq!(sem.used(), 1);
let _g2 = sem.acquire();
assert_eq!(sem.available(), 0);
assert_eq!(sem.used(), 2);
let g3 = sem.try_acquire();
assert!(g3.is_none());
drop(_g1);
assert_eq!(sem.available(), 1);
assert_eq!(sem.used(), 1);
let _g4 = sem.acquire();
assert_eq!(sem.available(), 0);
assert_eq!(sem.used(), 2);
}
#[test]
fn test_semaphore_multithread() {
let start = Instant::now();
let sem = Semaphore::new(10);
let mut tasks = Vec::new();
for _ in 0..100 {
let perm = sem.acquire();
tasks.push(std::thread::spawn(move || {
let _perm = perm;
std::thread::sleep(std::time::Duration::from_millis(1));
}));
}
'outer: loop {
for task in &tasks {
if sem.is_poisoned() {
panic!("Semaphore is poisoned");
}
if !task.is_finished() {
continue 'outer;
}
}
break 'outer;
}
assert!(start.elapsed().as_millis() > 10);
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论