提交 4c99c3ca authored 作者: Serhij S's avatar Serhij S

eapi calls timeout

上级 32e2a2c5
...@@ -27,6 +27,8 @@ static CARGO_PKG_AUTHORS: OnceCell<String> = OnceCell::new(); ...@@ -27,6 +27,8 @@ static CARGO_PKG_AUTHORS: OnceCell<String> = OnceCell::new();
static CARGO_PKG_DESCRIPTION: OnceCell<String> = OnceCell::new(); static CARGO_PKG_DESCRIPTION: OnceCell<String> = OnceCell::new();
static CARGO_PKG_VERSION: OnceCell<String> = OnceCell::new(); static CARGO_PKG_VERSION: OnceCell<String> = OnceCell::new();
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Sets the EAPI module information. Must be called only once. Usually not needed to be called /// Sets the EAPI module information. Must be called only once. Usually not needed to be called
/// directly, as executed by the `init_eapi!` macro. /// directly, as executed by the `init_eapi!` macro.
/// ///
...@@ -96,6 +98,20 @@ impl DataDeliveryPolicy for PushPayload { ...@@ -96,6 +98,20 @@ impl DataDeliveryPolicy for PushPayload {
} }
} }
async fn safe_rpc_call(
rpc: &RpcClient,
target: &str,
method: &str,
payload: busrt::borrow::Cow<'_>,
qos: QoS,
timeout: Duration,
) -> Result<RpcEvent> {
tokio::time::timeout(timeout, rpc.call(target, method, payload.into(), qos))
.await
.map_err(|_| Error::Timeout)?
.map_err(Error::io)
}
/// EAPI connection configuration /// EAPI connection configuration
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct EAPIConfig<D, V> pub struct EAPIConfig<D, V>
...@@ -104,7 +120,7 @@ where ...@@ -104,7 +120,7 @@ where
V: Send, V: Send,
{ {
path: String, path: String,
timeout: Option<f64>, timeout: Duration,
buf_size: Option<usize>, buf_size: Option<usize>,
queue_size: Option<usize>, queue_size: Option<usize>,
buf_ttl: Option<u64>, buf_ttl: Option<u64>,
...@@ -122,10 +138,7 @@ where ...@@ -122,10 +138,7 @@ where
V: Send, V: Send,
{ {
fn to_busrt_config(&self, name: &str) -> busrt::ipc::Config { fn to_busrt_config(&self, name: &str) -> busrt::ipc::Config {
let mut config = busrt::ipc::Config::new(&self.path, name); let mut config = busrt::ipc::Config::new(&self.path, name).timeout(self.timeout);
if let Some(timeout) = self.timeout {
config = config.timeout(Duration::from_secs_f64(timeout));
}
if let Some(buf_size) = self.buf_size { if let Some(buf_size) = self.buf_size {
config = config.buf_size(buf_size); config = config.buf_size(buf_size);
} }
...@@ -141,7 +154,7 @@ where ...@@ -141,7 +154,7 @@ where
pub fn new(path: &str) -> Self { pub fn new(path: &str) -> Self {
Self { Self {
path: path.to_owned(), path: path.to_owned(),
timeout: None, timeout: DEFAULT_TIMEOUT,
buf_size: None, buf_size: None,
queue_size: None, queue_size: None,
buf_ttl: None, buf_ttl: None,
...@@ -156,9 +169,9 @@ where ...@@ -156,9 +169,9 @@ where
self.name = Some(name.as_ref().to_owned()); self.name = Some(name.as_ref().to_owned());
self self
} }
/// Set timeout in seconds /// Set timeout
pub fn timeout(mut self, timeout: f64) -> Self { pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout); self.timeout = timeout;
self self
} }
/// Set buffer size /// Set buffer size
...@@ -432,6 +445,7 @@ where ...@@ -432,6 +445,7 @@ where
let rpc = Arc::new(RpcClient::new(client, handlers)); let rpc = Arc::new(RpcClient::new(client, handlers));
let rpc_c = rpc.clone(); let rpc_c = rpc.clone();
let rx = self.inner.rx.clone(); let rx = self.inner.rx.clone();
let timeout = self.inner.config.timeout;
let push_worker = tokio::spawn(async move { let push_worker = tokio::spawn(async move {
while let Ok(payload) = rx.recv().await { while let Ok(payload) = rx.recv().await {
match payload { match payload {
...@@ -462,9 +476,15 @@ where ...@@ -462,9 +476,15 @@ where
} }
match pack(&DobjPushPayload { i: &name, d: &data }) { match pack(&DobjPushPayload { i: &name, d: &data }) {
Ok(data) => { Ok(data) => {
if let Err(e) = rpc_c if let Err(e) = safe_rpc_call(
.call("eva.core", "dobj.push", data.into(), QoS::Realtime) &rpc_c,
.await "eva.core",
"dobj.push",
data.into(),
QoS::Realtime,
timeout,
)
.await
{ {
error!(%e, "failed to publish dobj"); error!(%e, "failed to publish dobj");
} }
...@@ -476,9 +496,15 @@ where ...@@ -476,9 +496,15 @@ where
} }
PushPayload::DObjError(name) => match pack(&ParamsId { i: &name }) { PushPayload::DObjError(name) => match pack(&ParamsId { i: &name }) {
Ok(data) => { Ok(data) => {
if let Err(e) = rpc_c if let Err(e) = safe_rpc_call(
.call("eva.core", "dobj.error", data.into(), QoS::Realtime) &rpc_c,
.await "eva.core",
"dobj.error",
data.into(),
QoS::Realtime,
timeout,
)
.await
{ {
error!(%e, "failed to publish dobj error"); error!(%e, "failed to publish dobj error");
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论