Skip to content

Commit 230d962

Browse files
Implement option for callback drop wait on undeclare for Publisher, Subscriber, Queryable, Querier, Matching listener and Session
1 parent 8d059ff commit 230d962

20 files changed

+1078
-226
lines changed

zenoh/src/api/admin.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ pub(crate) fn init(session: WeakSession) {
172172
let session = session.clone();
173173
move |q| on_admin_query(&session, &prefix, &prefix, q)
174174
}),
175+
#[cfg(feature = "unstable")]
176+
None,
175177
);
176178

177179
// Queryable simulating advanced publisher to allow advanced subscriber to receive historical data
@@ -185,6 +187,8 @@ pub(crate) fn init(session: WeakSession) {
185187
let session = session.clone();
186188
move |q| on_admin_query(&session, &adv_prefix, &prefix, q)
187189
}),
190+
#[cfg(feature = "unstable")]
191+
None,
188192
);
189193

190194
// Subscribe to transport events and publish them to the adminspace

zenoh/src/api/builders/close.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use zenoh_runtime::ZRuntime;
3434
pub struct CloseBuilder<TCloseable: Closeable> {
3535
closee: TCloseable::TClosee,
3636
timeout: Duration,
37+
close_args: <TCloseable::TClosee as Closee>::CloseArgs,
3738
}
3839

3940
// NOTE: `Closeable` is only pub(crate) because it is a zenoh-internal trait, so we don't
@@ -44,6 +45,7 @@ impl<TCloseable: Closeable> CloseBuilder<TCloseable> {
4445
Self {
4546
closee: closeable.get_closee(),
4647
timeout: Duration::from_secs(10),
48+
close_args: <TCloseable::TClosee as Closee>::CloseArgs::default(),
4749
}
4850
}
4951

@@ -115,7 +117,7 @@ impl<TCloseable: Closeable> IntoFuture for CloseBuilder<TCloseable> {
115117
fn into_future(self) -> Self::IntoFuture {
116118
Box::pin(
117119
async move {
118-
if tokio::time::timeout(self.timeout, self.closee.close_inner())
120+
if tokio::time::timeout(self.timeout, self.closee.close_inner(self.close_args))
119121
.await
120122
.is_err()
121123
{
@@ -220,10 +222,20 @@ impl<TOutput: Send + 'static> IntoFuture for NolocalJoinHandle<TOutput> {
220222

221223
#[async_trait]
222224
pub(crate) trait Closee: Send + Sync + 'static {
223-
async fn close_inner(&self);
225+
type CloseArgs: Default + Send + Sync;
226+
async fn close_inner(&self, close_arg: Self::CloseArgs);
224227
}
225228

226229
pub(crate) trait Closeable {
227230
type TClosee: Closee;
228231
fn get_closee(&self) -> Self::TClosee;
229232
}
233+
234+
impl CloseBuilder<crate::Session> {
235+
/// Block in undeclare operation until all currently running zenoh entities' callbacks (if any) return.
236+
#[zenoh_macros::unstable]
237+
pub fn wait_until_callback_execution_ends(mut self) -> Self {
238+
self.close_args.wait_until_callback_execution_ends = true;
239+
self
240+
}
241+
}

zenoh/src/api/builders/liveliness.rs

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,17 @@ where
255255
let key_expr = self.key_expr?;
256256
let session = self.session;
257257
let (callback, handler) = self.handler.into_handler();
258+
#[cfg(feature = "unstable")]
259+
let callback_sync_group = crate::api::cancellation::SyncGroup::default();
258260
session
259261
.0
260262
.declare_liveliness_subscriber_inner(
261263
&key_expr,
262264
Locality::default(),
263265
self.history,
264266
callback,
267+
#[cfg(feature = "unstable")]
268+
callback_sync_group.notifier(),
265269
)
266270
.map(|sub_state| Subscriber {
267271
inner: SubscriberInner {
@@ -272,6 +276,8 @@ where
272276
undeclare_on_drop: true,
273277
},
274278
handler,
279+
#[cfg(feature = "unstable")]
280+
callback_sync_group,
275281
})
276282
}
277283
}
@@ -300,6 +306,8 @@ impl Wait for LivelinessSubscriberBuilder<'_, '_, Callback<Sample>, true> {
300306
Locality::default(),
301307
self.history,
302308
self.handler,
309+
#[cfg(feature = "unstable")]
310+
None,
303311
)?;
304312
Ok(())
305313
}
@@ -510,35 +518,15 @@ where
510518
Handler::Handler: Send,
511519
{
512520
fn wait(self) -> <Self as Resolvable>::To {
513-
#[allow(unused_mut)] // mut is needed only for unstable cancellation_token
514-
let (mut callback, receiver) = self.handler.into_handler();
515-
#[cfg(feature = "unstable")]
516-
let cancellation_token = if let Some(ct) = self.cancellation_token {
517-
if let Some(notifier) = ct.notifier() {
518-
callback.set_on_drop_notifier(notifier);
519-
Some(ct)
520-
} else {
521-
return Ok(receiver);
522-
}
523-
} else {
524-
None
525-
};
526-
#[allow(unused_variables)] // qid is only needed for unstable cancellation_token
527-
self.session
528-
.0
529-
.liveliness_query(&self.key_expr?, self.timeout, callback)
530-
.map(|qid| {
531-
#[cfg(feature = "unstable")]
532-
if let Some(cancellation_token) = cancellation_token {
533-
let weak_session = self.session.downgrade();
534-
let on_cancel = move || {
535-
let _ = weak_session.cancel_liveliness_query(qid); // fails only if no associated query exists - likely because it was already finalized
536-
Ok(())
537-
};
538-
cancellation_token.add_on_cancel_handler(Box::new(on_cancel));
539-
}
540-
receiver
541-
})
521+
let (callback, receiver) = self.handler.into_handler();
522+
self.session.0.liveliness_query(
523+
&self.key_expr?,
524+
self.timeout,
525+
callback,
526+
#[cfg(feature = "unstable")]
527+
self.cancellation_token.into_iter().collect(),
528+
)?;
529+
Ok(receiver)
542530
}
543531
}
544532

zenoh/src/api/builders/matching_listener.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::{
2121
use zenoh_core::{Resolvable, Wait};
2222
use zenoh_result::ZResult;
2323

24+
#[cfg(feature = "unstable")]
25+
use crate::api::cancellation::{SyncGroup, SyncGroupNotifier};
2426
use crate::{
2527
api::{
2628
handlers::{Callback, DefaultHandler, IntoHandler},
@@ -44,6 +46,8 @@ pub struct MatchingListenerBuilder<'a, Handler, const BACKGROUND: bool = false>
4446
pub(crate) matching_listeners: &'a Arc<Mutex<HashSet<Id>>>,
4547
pub(crate) matching_status_type: MatchingStatusType,
4648
pub handler: Handler,
49+
#[cfg(feature = "unstable")]
50+
pub(crate) parent_callback_sync_group_notifier: Option<SyncGroupNotifier>,
4751
}
4852

4953
impl<'a> MatchingListenerBuilder<'a, DefaultHandler> {
@@ -140,6 +144,8 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> {
140144
matching_listeners: self.matching_listeners,
141145
matching_status_type: self.matching_status_type,
142146
handler,
147+
#[cfg(feature = "unstable")]
148+
parent_callback_sync_group_notifier: self.parent_callback_sync_group_notifier,
143149
}
144150
}
145151
}
@@ -179,6 +185,8 @@ impl<'a> MatchingListenerBuilder<'a, Callback<MatchingStatus>> {
179185
key_expr: self.key_expr,
180186
matching_status_type: self.matching_status_type,
181187
handler: self.handler,
188+
#[cfg(feature = "unstable")]
189+
parent_callback_sync_group_notifier: self.parent_callback_sync_group_notifier,
182190
}
183191
}
184192
}
@@ -197,12 +205,16 @@ where
197205
Handler::Handler: Send,
198206
{
199207
fn wait(self) -> <Self as Resolvable>::To {
208+
#[cfg(feature = "unstable")]
209+
let callback_sync_group = SyncGroup::default();
200210
let (callback, handler) = self.handler.into_handler();
201211
let state = self.session.declare_matches_listener_inner(
202212
self.key_expr,
203213
self.destination,
204214
self.matching_status_type,
205215
callback,
216+
#[cfg(feature = "unstable")]
217+
callback_sync_group.notifier(),
206218
)?;
207219
zlock!(self.matching_listeners).insert(state.id);
208220
Ok(MatchingListener {
@@ -213,6 +225,8 @@ where
213225
undeclare_on_drop: true,
214226
},
215227
handler,
228+
#[cfg(feature = "unstable")]
229+
callback_sync_group,
216230
})
217231
}
218232
}
@@ -241,6 +255,8 @@ impl Wait for MatchingListenerBuilder<'_, Callback<MatchingStatus>, true> {
241255
self.destination,
242256
self.matching_status_type,
243257
self.handler,
258+
#[cfg(feature = "unstable")]
259+
self.parent_callback_sync_group_notifier,
244260
)?;
245261
zlock!(self.matching_listeners).insert(state.id);
246262
Ok(())

zenoh/src/api/builders/publisher.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use zenoh_protocol::core::CongestionControl;
2121
#[cfg(feature = "unstable")]
2222
use zenoh_protocol::core::Reliability;
2323

24+
#[cfg(feature = "unstable")]
25+
use crate::api::cancellation::SyncGroup;
2426
#[cfg(feature = "unstable")]
2527
use crate::api::sample::SourceInfo;
2628
use crate::{
@@ -501,6 +503,8 @@ impl Wait for PublisherBuilder<'_, '_> {
501503
reliability: self.reliability,
502504
matching_listeners: Default::default(),
503505
undeclare_on_drop: true,
506+
#[cfg(feature = "unstable")]
507+
sync_group: SyncGroup::default(),
504508
})
505509
}
506510
}

zenoh/src/api/builders/querier.rs

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use zenoh_result::ZResult;
2525

2626
use super::sample::QoSBuilderTrait;
2727
#[cfg(feature = "unstable")]
28-
use crate::api::cancellation::CancellationTokenBuilderTrait;
28+
use crate::api::cancellation::{CancellationToken, CancellationTokenBuilderTrait};
2929
#[cfg(feature = "unstable")]
3030
use crate::api::query::ReplyKeyExpr;
3131
#[cfg(feature = "unstable")]
@@ -191,6 +191,8 @@ impl Wait for QuerierBuilder<'_, '_> {
191191
#[cfg(feature = "unstable")]
192192
accept_replies: self.accept_replies,
193193
matching_listeners: Default::default(),
194+
#[cfg(feature = "unstable")]
195+
cancellation_token: CancellationToken::default(),
194196
})
195197
}
196198
}
@@ -473,19 +475,7 @@ where
473475
Handler::Handler: Send,
474476
{
475477
fn wait(self) -> <Self as Resolvable>::To {
476-
#[allow(unused_mut)] // mut is needed only for unstable cancellation_token
477-
let (mut callback, receiver) = self.handler.into_handler();
478-
#[cfg(feature = "unstable")]
479-
let cancellation_token = if let Some(ct) = self.cancellation_token {
480-
if let Some(notifier) = ct.notifier() {
481-
callback.set_on_drop_notifier(notifier);
482-
Some(ct)
483-
} else {
484-
return Ok(receiver);
485-
}
486-
} else {
487-
None
488-
};
478+
let (callback, receiver) = self.handler.into_handler();
489479
#[allow(unused_mut)]
490480
// mut is only needed when building with "unstable" feature, which might add extra internal parameters on top of the user-provided ones
491481
let mut parameters = self.parameters.clone();
@@ -494,34 +484,26 @@ where
494484
parameters.set_reply_key_expr_any();
495485
}
496486
#[allow(unused_variables)] // qid is only needed for unstable cancellation_token
497-
self.querier
498-
.session
499-
.query(
500-
&self.querier.key_expr,
501-
&parameters,
502-
self.querier.target,
503-
self.querier.consolidation,
504-
self.querier.qos,
505-
self.querier.destination,
506-
self.querier.timeout,
507-
self.value,
508-
self.attachment,
509-
#[cfg(feature = "unstable")]
510-
self.source_info,
511-
callback,
512-
)
513-
.map(|qid| {
514-
#[cfg(feature = "unstable")]
515-
if let Some(cancellation_token) = cancellation_token {
516-
let weak_session = self.querier.session.clone();
517-
let on_cancel = move || {
518-
let _ = weak_session.cancel_query(qid); // fails only if no associated query exists - likely because it was already finalized
519-
Ok(())
520-
};
521-
cancellation_token.add_on_cancel_handler(Box::new(on_cancel));
522-
}
523-
receiver
524-
})
487+
self.querier.session.query(
488+
&self.querier.key_expr,
489+
&parameters,
490+
self.querier.target,
491+
self.querier.consolidation,
492+
self.querier.qos,
493+
self.querier.destination,
494+
self.querier.timeout,
495+
self.value,
496+
self.attachment,
497+
#[cfg(feature = "unstable")]
498+
self.source_info,
499+
callback,
500+
#[cfg(feature = "unstable")]
501+
match self.cancellation_token {
502+
Some(ct) => vec![self.querier.cancellation_token.clone(), ct],
503+
None => vec![self.querier.cancellation_token.clone()],
504+
},
505+
)?;
506+
Ok(receiver)
525507
}
526508
}
527509

zenoh/src/api/builders/query.rs

Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -374,52 +374,28 @@ where
374374
Handler::Handler: Send,
375375
{
376376
fn wait(self) -> <Self as Resolvable>::To {
377-
#[allow(unused_mut)] // mut is needed only for unstable cancellation_token
378-
let (mut callback, receiver) = self.handler.into_handler();
379-
#[cfg(feature = "unstable")]
380-
let cancellation_token = if let Some(ct) = self.cancellation_token {
381-
if let Some(notifier) = ct.notifier() {
382-
callback.set_on_drop_notifier(notifier);
383-
Some(ct)
384-
} else {
385-
return Ok(receiver);
386-
}
387-
} else {
388-
None
389-
};
377+
let (callback, receiver) = self.handler.into_handler();
390378
let Selector {
391379
key_expr,
392380
parameters,
393381
} = self.selector?;
394-
#[allow(unused_variables)] // qid is only needed for unstable cancellation_token
395-
self.session
396-
.0
397-
.query(
398-
&key_expr,
399-
&parameters,
400-
self.target,
401-
self.consolidation,
402-
self.qos.into(),
403-
self.destination,
404-
self.timeout,
405-
self.value,
406-
self.attachment,
407-
#[cfg(feature = "unstable")]
408-
self.source_info,
409-
callback,
410-
)
411-
.map(|qid| {
412-
#[cfg(feature = "unstable")]
413-
if let Some(cancellation_token) = cancellation_token {
414-
let weak_session = self.session.downgrade();
415-
let on_cancel = move || {
416-
let _ = weak_session.cancel_query(qid); // fails only if no associated query exists - likely because it was already finalized
417-
Ok(())
418-
};
419-
cancellation_token.add_on_cancel_handler(Box::new(on_cancel));
420-
}
421-
receiver
422-
})
382+
self.session.0.query(
383+
&key_expr,
384+
&parameters,
385+
self.target,
386+
self.consolidation,
387+
self.qos.into(),
388+
self.destination,
389+
self.timeout,
390+
self.value,
391+
self.attachment,
392+
#[cfg(feature = "unstable")]
393+
self.source_info,
394+
callback,
395+
#[cfg(feature = "unstable")]
396+
self.cancellation_token.into_iter().collect(),
397+
)?;
398+
Ok(receiver)
423399
}
424400
}
425401

0 commit comments

Comments
 (0)