Skip to content

Commit fb4d1c8

Browse files
account for declared ke in reply (#2387)
1 parent b3b5161 commit fb4d1c8

File tree

6 files changed

+174
-17
lines changed

6 files changed

+174
-17
lines changed

zenoh/src/api/key_expr.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::api::session::{Session, SessionInner, UndeclarableSealed, WeakSession
3232
pub(crate) struct KeyExprWireDeclaration {
3333
expr_id: ExprId,
3434
prefix_len: u32,
35-
mapping: Mapping,
3635
session: WeakSession,
3736
undeclared: bool,
3837
}
@@ -47,7 +46,6 @@ impl KeyExprWireDeclaration {
4746
.map(|expr_id| Self {
4847
expr_id,
4948
prefix_len,
50-
mapping: Mapping::Sender,
5149
session: session.downgrade(),
5250
undeclared: false,
5351
}))
@@ -541,23 +539,31 @@ impl<'a> KeyExpr<'a> {
541539
.unwrap_or(false)
542540
}
543541

544-
pub(crate) fn to_wire(&'a self, session: &SessionInner) -> WireExpr<'a> {
542+
fn to_wire_inner(&'a self, session: &SessionInner, mapping: Mapping) -> WireExpr<'a> {
545543
match self.declaration() {
546544
Some(d) if d.is_declared_on_session_inner(session) => WireExpr {
547545
scope: d.expr_id,
548546
suffix: std::borrow::Cow::Borrowed(
549547
&self.key_expr().as_str()[((d.prefix_len) as usize)..],
550548
),
551-
mapping: d.mapping,
549+
mapping,
552550
},
553551
_ => WireExpr {
554552
scope: 0,
555553
suffix: std::borrow::Cow::Borrowed(self.key_expr().as_str()),
556-
mapping: Mapping::Sender,
554+
mapping,
557555
},
558556
}
559557
}
560558

559+
pub(crate) fn to_wire(&'a self, session: &SessionInner) -> WireExpr<'a> {
560+
self.to_wire_inner(session, Mapping::Sender)
561+
}
562+
563+
pub(crate) fn to_wire_local(&'a self, session: &SessionInner) -> WireExpr<'a> {
564+
self.to_wire_inner(session, Mapping::Receiver)
565+
}
566+
561567
fn undeclare_with_session_check(&mut self, parent_session: Option<&Session>) -> ZResult<()> {
562568
match self.declaration_mut().take() {
563569
Some(mut d) if self.key_expr().len() == d.prefix_len as usize => {

zenoh/src/api/queryable.rs

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,72 @@ use crate::{
5454
net::primitives::Primitives,
5555
};
5656

57+
pub(crate) struct LocalReplyPrimitives {
58+
session: WeakSession,
59+
}
60+
61+
pub(crate) struct RemoteReplyPrimitives {
62+
pub(crate) session: Option<WeakSession>,
63+
pub(crate) primitives: Arc<dyn Primitives>,
64+
}
65+
66+
pub(crate) enum ReplyPrimitives {
67+
Local(LocalReplyPrimitives),
68+
Remote(RemoteReplyPrimitives),
69+
}
70+
71+
impl ReplyPrimitives {
72+
pub(crate) fn new_local(session: WeakSession) -> Self {
73+
ReplyPrimitives::Local(LocalReplyPrimitives { session })
74+
}
75+
76+
pub(crate) fn new_remote(
77+
session: Option<WeakSession>,
78+
primitives: Arc<dyn Primitives>,
79+
) -> Self {
80+
ReplyPrimitives::Remote(RemoteReplyPrimitives {
81+
session,
82+
primitives,
83+
})
84+
}
85+
86+
pub(crate) fn send_response_final(&self, msg: &mut ResponseFinal) {
87+
match self {
88+
ReplyPrimitives::Local(local) => local.session.send_response_final(msg),
89+
ReplyPrimitives::Remote(remote) => remote.primitives.send_response_final(msg),
90+
}
91+
}
92+
93+
pub(crate) fn send_response(&self, msg: &mut Response) {
94+
match self {
95+
ReplyPrimitives::Local(local) => local.session.send_response(msg),
96+
ReplyPrimitives::Remote(remote) => remote.primitives.send_response(msg),
97+
}
98+
}
99+
100+
pub(crate) fn keyexpr_to_wire(&self, key_expr: &KeyExpr) -> WireExpr<'static> {
101+
match self {
102+
ReplyPrimitives::Local(local) => key_expr.to_wire_local(&local.session).to_owned(),
103+
ReplyPrimitives::Remote(remote) => match &remote.session {
104+
Some(s) => key_expr.to_wire(&s).to_owned(),
105+
None => WireExpr {
106+
scope: 0,
107+
suffix: std::borrow::Cow::Owned(key_expr.as_str().into()),
108+
mapping: Mapping::Sender,
109+
},
110+
},
111+
}
112+
}
113+
}
114+
57115
pub(crate) struct QueryInner {
58116
pub(crate) key_expr: KeyExpr<'static>,
59117
pub(crate) parameters: Parameters<'static>,
60118
pub(crate) qid: RequestId,
61119
pub(crate) zid: ZenohIdProto,
62120
#[cfg(feature = "unstable")]
63121
pub(crate) source_info: Option<SourceInfo>,
64-
pub(crate) primitives: Arc<dyn Primitives>,
122+
pub(crate) primitives: ReplyPrimitives,
65123
}
66124

67125
impl QueryInner {
@@ -74,7 +132,7 @@ impl QueryInner {
74132
zid: ZenohIdProto::default(),
75133
#[cfg(feature = "unstable")]
76134
source_info: None,
77-
primitives: Arc::new(DummyPrimitives),
135+
primitives: ReplyPrimitives::new_remote(None, Arc::new(DummyPrimitives)),
78136
}
79137
}
80138
}
@@ -478,11 +536,7 @@ impl Query {
478536
let ext_sinfo = sample.source_info.map(Into::into);
479537
self.inner.primitives.send_response(&mut Response {
480538
rid: self.inner.qid,
481-
wire_expr: WireExpr {
482-
scope: 0,
483-
suffix: std::borrow::Cow::Owned(sample.key_expr.into()),
484-
mapping: Mapping::Sender,
485-
},
539+
wire_expr: self.inner.primitives.keyexpr_to_wire(&sample.key_expr),
486540
payload: ResponseBody::Reply(zenoh::Reply {
487541
consolidation: zenoh::ConsolidationMode::DEFAULT,
488542
ext_unknown: vec![],

zenoh/src/api/session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ use crate::{
104104
ConsolidationMode, LivelinessQueryState, QueryConsolidation, QueryState, QueryTarget,
105105
Reply,
106106
},
107-
queryable::{Query, QueryInner, QueryableState},
107+
queryable::{Query, QueryInner, QueryableState, ReplyPrimitives},
108108
sample::{Locality, QoS, Sample, SampleKind},
109109
selector::Selector,
110110
subscriber::{SubscriberKind, SubscriberState},
@@ -2658,9 +2658,9 @@ impl SessionInner {
26582658
#[cfg(feature = "unstable")]
26592659
source_info,
26602660
primitives: if local {
2661-
Arc::new(WeakSession::new(self))
2661+
ReplyPrimitives::new_local(WeakSession::new(self))
26622662
} else {
2663-
primitives
2663+
ReplyPrimitives::new_remote(Some(WeakSession::new(self)), primitives)
26642664
},
26652665
});
26662666
if !queryables.is_empty() {

zenoh/src/net/runtime/adminspace.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use crate::{
5454
api::{
5555
bytes::ZBytes,
5656
key_expr::KeyExpr,
57-
queryable::{Query, QueryInner},
57+
queryable::{Query, QueryInner, ReplyPrimitives},
5858
},
5959
bytes::Encoding,
6060
net::{
@@ -470,7 +470,7 @@ impl Primitives for AdminSpace {
470470
zid: zid.into(),
471471
#[cfg(feature = "unstable")]
472472
source_info: query.ext_sinfo.map(Into::into),
473-
primitives,
473+
primitives: ReplyPrimitives::new_remote(None, primitives),
474474
}),
475475
eid: self.queryable_id,
476476
value: mem::take(&mut query.ext_body)

zenoh/src/tests/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@
1414

1515
mod interceptor_cache;
1616
mod link_weights;
17+
18+
mod query_reply;

zenoh/src/tests/query_reply.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use std::{
2+
any::Any,
3+
sync::{Arc, Mutex},
4+
time::Duration,
5+
};
6+
7+
use zenoh_core::ztimeout;
8+
use zenoh_protocol::{
9+
core::{Reliability, WireExpr, ZenohIdProto},
10+
network::{Declare, Interest, Mapping, Push, Request, Response, ResponseFinal},
11+
};
12+
13+
use crate::{
14+
api::queryable::{Query, QueryInner, ReplyPrimitives},
15+
net::primitives::Primitives,
16+
};
17+
18+
const TIMEOUT: Duration = Duration::from_secs(60);
19+
20+
struct ReplyTestPrimitives {
21+
wire_expr: Arc<Mutex<Option<WireExpr<'static>>>>,
22+
}
23+
24+
impl ReplyTestPrimitives {
25+
fn new() -> Self {
26+
ReplyTestPrimitives {
27+
wire_expr: Arc::new(Mutex::new(None)),
28+
}
29+
}
30+
31+
fn wire_expr(&self) -> Option<WireExpr> {
32+
self.wire_expr.lock().unwrap().clone()
33+
}
34+
}
35+
36+
impl Primitives for ReplyTestPrimitives {
37+
fn send_interest(&self, _msg: &mut Interest) {}
38+
39+
fn send_declare(&self, _msg: &mut Declare) {}
40+
41+
fn send_push(&self, _msg: &mut Push, _reliability: Reliability) {}
42+
43+
fn send_request(&self, _msg: &mut Request) {}
44+
45+
fn send_response(&self, msg: &mut Response) {
46+
let _ = self.wire_expr.lock().unwrap().insert(msg.wire_expr.clone());
47+
}
48+
49+
fn send_response_final(&self, _msg: &mut ResponseFinal) {}
50+
51+
fn send_close(&self) {}
52+
53+
fn as_any(&self) -> &dyn Any {
54+
self
55+
}
56+
}
57+
58+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
59+
async fn test_reply_preserves_optimized_ke() {
60+
use crate::Config;
61+
62+
let session = ztimeout!(crate::open(Config::default())).unwrap();
63+
64+
let primitives = Arc::new(ReplyTestPrimitives::new());
65+
66+
let query_inner = QueryInner {
67+
key_expr: "test/**".try_into().unwrap(),
68+
parameters: "".into(),
69+
qid: 1,
70+
zid: ZenohIdProto::default(),
71+
#[cfg(feature = "unstable")]
72+
source_info: None,
73+
primitives: ReplyPrimitives::new_remote(Some(session.downgrade()), primitives.clone()),
74+
};
75+
let query = Query {
76+
inner: Arc::new(query_inner),
77+
eid: 1,
78+
value: None,
79+
attachment: None,
80+
};
81+
82+
let ke = "test/reply_declared_ke";
83+
let declared_ke = ztimeout!(session.declare_keyexpr(ke)).unwrap();
84+
let _ = ztimeout!(query.reply(declared_ke, "payload")).unwrap();
85+
86+
let mut we = primitives.wire_expr().unwrap();
87+
assert!(we.suffix.is_empty());
88+
assert!(we.scope != 0);
89+
assert!(we.mapping == Mapping::Sender);
90+
91+
let _ = ztimeout!(query.reply(ke, "payload")).unwrap();
92+
we = primitives.wire_expr().unwrap();
93+
assert_eq!(&we.suffix, &ke);
94+
assert!(we.scope == 0);
95+
}

0 commit comments

Comments
 (0)