提交 96b606ab authored 作者: Serhij S's avatar Serhij S

subprocess pipes

上级 2ee04f39
......@@ -46,10 +46,11 @@ parking_lot_rt = "0.12.1"
[features]
eapi = ["eva-common", "eva-sdk", "busrt", "tokio", "hostname"]
pipe = ["tokio/process", "tokio/io-util", "tokio/macros", "tokio/rt", "tokio/time"]
modbus = ["rmodbus"]
openssl-vendored = ["busrt/openssl-vendored", "eva-common/openssl-vendored"]
metrics = ["dep:metrics", "metrics-exporter-prometheus"]
full = ["eapi", "modbus", "metrics"]
full = ["eapi", "modbus", "metrics", "pipe"]
[dev-dependencies]
insta = "1.36.1"
......@@ -79,6 +80,11 @@ path = "examples/shutdown.rs"
name = "shutdown-custom"
path = "examples/shutdown-custom.rs"
[[example]]
name = "pipe"
path = "examples/pipe.rs"
required-features = ["pipe"]
[[example]]
name = "eapi"
path = "examples/eapi.rs"
......
......@@ -124,6 +124,9 @@ Currently supported:
* Raw UDP in/out via [`io::raw_udp`]
([Raw UDP in/out example](https://github.com/roboplc/roboplc/blob/main/examples/raw-udp.rs))
* Subprocess pipes via [`io::pipe`]
([Subprocess pipe example](https://github.com/roboplc/roboplc/blob/main/examples/pipe.rs))
* [EVA ICS](https://www.eva-ics.com/) EAPI in/out via [`io::eapi`] ([EVA ICS
example](https://github.com/roboplc/roboplc/blob/main/examples/eapi.rs)),
requires `eapi` crate feature.
......
/// Launches a subprocess and reads its output line by line. Useful to connect RoboPLC with 3rd
/// party software which can not be embedded.
use roboplc::controller::prelude::*;
use roboplc::io::pipe::{self, Pipe};
use roboplc::{prelude::*, Error};
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
type Message = ();
type Variables = ();
#[derive(WorkerOpts)]
#[worker_opts(cpu = 0, priority = 50, scheduling = "fifo", blocking = true)]
struct Worker1 {
reader: pipe::Reader,
}
impl Worker<Message, Variables> for Worker1 {
fn run(&mut self, _context: &Context<Message, Variables>) -> WResult {
loop {
let line = self.reader.line()?;
println!("Worker1: {}", line.trim_end());
}
}
}
#[derive(WorkerOpts)]
#[worker_opts(cpu = 0, priority = 50, scheduling = "fifo", blocking = true)]
struct PipeRunner {
pipe: Pipe,
}
impl Worker<Message, Variables> for PipeRunner {
/// The piped subprocess needs to be run by a worker. The subprocess inherits the scheduling
/// policy and priority of the worker.
fn run(&mut self, _context: &Context<Message, Variables>) -> WResult {
self.pipe.run();
Err(Error::failed("pipe exited").into())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
roboplc::setup_panic();
roboplc::configure_logger(roboplc::LevelFilter::Info);
if !roboplc::is_production() {
roboplc::thread_rt::set_simulated();
}
let _sys = roboplc::thread_rt::SystemConfig::new()
.set("kernel/sched_rt_runtime_us", -1)
.apply()
.expect("Unable to set system config");
roboplc::thread_rt::prealloc_heap(10_000_000)?;
let mut controller = Controller::<Message, Variables>::new();
let (pipe, reader) = Pipe::new("/path/to/subprogram");
controller.spawn_worker(Worker1 { reader })?;
controller.spawn_worker(PipeRunner { pipe })?;
controller.register_signals(SHUTDOWN_TIMEOUT)?;
controller.block();
Ok(())
}
......@@ -12,6 +12,10 @@ pub mod eapi;
#[cfg(feature = "modbus")]
/// Modbus communication
pub mod modbus;
/// Linux process communication
#[cfg(feature = "pipe")]
/// Subprocess pipes
pub mod pipe;
/// Raw UDP communication
pub mod raw_udp;
......
use std::{
collections::BTreeMap,
ffi::{OsStr, OsString},
io,
process::Stdio,
time::Duration,
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
process::Command,
};
use tracing::error;
use crate::{
pchannel_async::{self, Receiver},
DataDeliveryPolicy, Result,
};
pub struct Reader {
rx: Receiver<String>,
}
impl Reader {
pub fn line(&self) -> Result<String> {
self.rx.recv_blocking()
}
}
pub struct Pipe {
program: OsString,
args: Vec<OsString>,
environment: BTreeMap<String, String>,
input_data: Option<Vec<u8>>,
tx: pchannel_async::Sender<String>,
restart_delay: Duration,
}
impl Pipe {
pub fn new<P: AsRef<OsStr>>(program: P) -> (Self, Reader) {
let (tx, rx) = pchannel_async::bounded(10);
(
Self {
program: program.as_ref().to_owned(),
args: Vec::new(),
environment: BTreeMap::new(),
input_data: None,
tx,
restart_delay: Duration::from_secs(1),
},
Reader { rx },
)
}
pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
self.args.push(arg.as_ref().to_owned());
self
}
pub fn args(&mut self, args: impl IntoIterator<Item = impl AsRef<OsStr>>) -> &mut Self {
self.args
.extend(args.into_iter().map(|x| x.as_ref().to_owned()));
self
}
pub fn env(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
self.environment.insert(key.into(), value.into());
self
}
pub fn envs(
&mut self,
envs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> &mut Self {
self.environment
.extend(envs.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}
/// STDIN data for the subprocess
pub fn input_data(&mut self, data: impl Into<Vec<u8>>) -> &mut Self {
self.input_data = Some(data.into());
self
}
pub fn restart_delay(&mut self, delay: Duration) -> &mut Self {
self.restart_delay = delay;
self
}
/// Launches a subprocess pipe. The subprocess is restarted automatically if it terminates. The
/// subprocess inherits sheduling policy and priority of the parent thread.
///
/// # Panics
///
/// Will panic if the method is unable to create tokio runtime
pub fn run(&self) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(self.run_async());
}
async fn run_async(&self) {
loop {
match command_pipe(
&self.program,
&self.args,
&Options {
environment: self.environment.clone(),
input_data: self.input_data.clone(),
},
) {
Ok(rx) => {
while let Ok(v) = rx.recv().await {
match v {
CommandPipeOutput::Stdout(line) => {
if self.tx.send(line).await.is_err() {
return;
}
}
CommandPipeOutput::Stderr(line) => {
error!(program=%self.program.to_string_lossy(), "{}",
line.trim_end());
}
CommandPipeOutput::Terminated(code) => {
error!(program=%self.program.to_string_lossy(), "Command terminated with code {}", code);
break;
}
}
}
}
Err(error) => {
error!(program=%self.program.to_string_lossy(), %error, "Failed to start command pipe");
}
}
tokio::time::sleep(self.restart_delay).await;
}
}
}
#[derive(Default, Clone)]
struct Options {
environment: BTreeMap<String, String>,
input_data: Option<Vec<u8>>,
}
#[derive(Debug)]
enum CommandPipeOutput {
Stdout(String),
Stderr(String),
Terminated(i32),
}
impl DataDeliveryPolicy for CommandPipeOutput {}
fn command_pipe<P, I, S>(
program: P,
args: I,
opts: &Options,
) -> std::result::Result<Receiver<CommandPipeOutput>, io::Error>
where
P: AsRef<OsStr>,
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
let (output_tx, output_rx) = pchannel_async::bounded(10);
let mut child = Command::new(program)
.args(args)
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.kill_on_drop(true)
.envs(&opts.environment)
.spawn()?;
let stdin = if opts.input_data.is_some() {
match child.stdin.take() {
Some(v) => Some(v),
None => {
return Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Unable to create stdin writer",
))
}
}
} else {
None
};
let stdin_writer = stdin.map(BufWriter::new);
let stderr = child.stderr.take().ok_or_else(|| {
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to capture stderr of child process",
)
})?;
let stdout = child.stdout.take().ok_or_else(|| {
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to capture stdout of child process",
)
})?;
let fut_stdin = stdin_writer.map(|mut writer| {
let input_data = opts.input_data.as_ref().unwrap().clone();
tokio::spawn(async move {
if let Err(error) = writer.write_all(&input_data).await {
error!(%error, "Unable to write to stdin");
} else if let Err(error) = writer.flush().await {
error!(%error, "Unable to flush stdin");
}
})
});
tokio::spawn(async move {
let output_tx_stderr = output_tx.clone();
let stderr_handle = tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut line = String::new();
while reader.read_line(&mut line).await.is_ok() {
if line.is_empty()
|| (output_tx_stderr
.send(CommandPipeOutput::Stderr(line.clone()))
.await)
.is_err()
{
break;
}
line.clear();
}
});
let output_tx_stdout = output_tx.clone();
let stdout_handle = tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while reader.read_line(&mut line).await.is_ok() {
if line.is_empty()
|| (output_tx_stdout
.send(CommandPipeOutput::Stdout(line.clone()))
.await)
.is_err()
{
break;
}
line.clear();
}
});
let mut exit_code = -99;
if let Ok(x) = child.wait().await {
if let Some(code) = x.code() {
exit_code = code;
}
}
if let Some(v) = fut_stdin {
v.abort();
}
tokio::select!(
_ = stderr_handle => {},
_ = stdout_handle => {},
);
let _r = output_tx
.send(CommandPipeOutput::Terminated(exit_code))
.await;
});
Ok(output_rx)
}
......@@ -298,6 +298,8 @@ fn panic(info: &PanicInfo) -> ! {
impl DataDeliveryPolicy for () {}
impl DataDeliveryPolicy for usize {}
impl DataDeliveryPolicy for String {}
impl<T> DataDeliveryPolicy for Vec<T> {}
/// Returns true if started in production mode (as a systemd unit)
pub fn is_production() -> bool {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论