Skip to content

Commit 201d2da

Browse files
add support for TransportEventsListener and LinkEventsListener
1 parent 230d962 commit 201d2da

File tree

6 files changed

+178
-49
lines changed

6 files changed

+178
-49
lines changed

zenoh/src/api/admin.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,12 @@ pub(crate) fn init(session: WeakSession) {
232232
}
233233
}
234234
});
235-
if let Err(e) = session.declare_transport_events_listener_inner(callback, false) {
235+
if let Err(e) = session.declare_transport_events_listener_inner(
236+
callback,
237+
false,
238+
#[cfg(feature = "unstable")]
239+
None,
240+
) {
236241
tracing::error!("Unable to subscribe to transport events: {}", e);
237242
}
238243

@@ -288,7 +293,13 @@ pub(crate) fn init(session: WeakSession) {
288293
}
289294
}
290295
});
291-
if let Err(e) = session.declare_transport_links_listener_inner(callback, false, None) {
296+
if let Err(e) = session.declare_transport_links_listener_inner(
297+
callback,
298+
false,
299+
None,
300+
#[cfg(feature = "unstable")]
301+
None,
302+
) {
292303
tracing::error!("Unable to subscribe to link events: {}", e);
293304
}
294305
}

zenoh/src/api/builders/info_links.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::api::info::{Link, LinkEvent};
3232
use crate::api::Id;
3333
#[zenoh_macros::unstable]
3434
use crate::{
35+
api::cancellation::SyncGroup,
3536
api::session::{UndeclarableSealed, WeakSession},
3637
handlers::{Callback, DefaultHandler, IntoHandler},
3738
};
@@ -168,6 +169,8 @@ impl std::fmt::Debug for LinkEventsListenerInner {
168169
pub struct LinkEventsListener<Handler> {
169170
pub(crate) inner: LinkEventsListenerInner,
170171
pub(crate) handler: Handler,
172+
#[cfg(feature = "unstable")]
173+
pub(crate) callback_sync_group: SyncGroup,
171174
}
172175

173176
#[zenoh_macros::unstable]
@@ -240,7 +243,11 @@ impl<Handler: Send> UndeclarableSealed<()> for LinkEventsListener<Handler> {
240243
type Undeclaration = LinkEventsListenerUndeclaration<Handler>;
241244

242245
fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
243-
LinkEventsListenerUndeclaration(self)
246+
LinkEventsListenerUndeclaration {
247+
listener: self,
248+
#[cfg(feature = "unstable")]
249+
wait_until_callback_execution_ends: false,
250+
}
244251
}
245252
}
246253

@@ -262,17 +269,35 @@ impl<Handler> std::ops::DerefMut for LinkEventsListener<Handler> {
262269

263270
/// A [`Resolvable`] returned by [`LinkEventsListener::undeclare`]
264271
#[zenoh_macros::unstable]
265-
pub struct LinkEventsListenerUndeclaration<Handler>(LinkEventsListener<Handler>);
272+
pub struct LinkEventsListenerUndeclaration<Handler> {
273+
listener: LinkEventsListener<Handler>,
274+
#[cfg(feature = "unstable")]
275+
wait_until_callback_execution_ends: bool,
276+
}
266277

267278
#[zenoh_macros::unstable]
268279
impl<Handler> Resolvable for LinkEventsListenerUndeclaration<Handler> {
269280
type To = ZResult<()>;
270281
}
271282

283+
impl<Handler> LinkEventsListenerUndeclaration<Handler> {
284+
/// Block in undeclare operation until all currently running instances of link events listener callback (if any) return.
285+
#[zenoh_macros::unstable]
286+
pub fn wait_until_callback_execution_ends(mut self) -> Self {
287+
self.wait_until_callback_execution_ends = true;
288+
self
289+
}
290+
}
291+
272292
#[zenoh_macros::unstable]
273293
impl<Handler> Wait for LinkEventsListenerUndeclaration<Handler> {
274294
fn wait(mut self) -> <Self as Resolvable>::To {
275-
self.0.undeclare_impl()
295+
self.listener.undeclare_impl()?;
296+
#[cfg(feature = "unstable")]
297+
if self.wait_until_callback_execution_ends {
298+
self.listener.callback_sync_group.wait();
299+
}
300+
Ok(())
276301
}
277302
}
278303

@@ -450,11 +475,15 @@ where
450475
Handler::Handler: Send,
451476
{
452477
fn wait(self) -> Self::To {
478+
#[cfg(feature = "unstable")]
479+
let callback_sync_group = SyncGroup::default();
453480
let (callback, handler) = self.handler.into_handler();
454481
let state = self.session.declare_transport_links_listener_inner(
455482
callback,
456483
self.history,
457484
self.transport,
485+
#[cfg(feature = "unstable")]
486+
callback_sync_group.notifier(),
458487
)?;
459488

460489
Ok(LinkEventsListener {
@@ -464,6 +493,8 @@ where
464493
undeclare_on_drop: true,
465494
},
466495
handler,
496+
#[cfg(feature = "unstable")]
497+
callback_sync_group,
467498
})
468499
}
469500
}
@@ -494,6 +525,8 @@ impl Wait for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
494525
self.handler,
495526
self.history,
496527
self.transport,
528+
#[cfg(feature = "unstable")]
529+
None,
497530
)?;
498531
// Set the listener to not undeclare on drop (background mode)
499532
// Note: We can't access the listener to set background flag, so we just don't keep a reference

zenoh/src/api/builders/info_transport.rs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::api::handlers::locked;
2828
use crate::api::info::{Transport, TransportEvent};
2929
#[zenoh_macros::unstable]
3030
use crate::{
31+
api::cancellation::SyncGroup,
3132
api::session::{UndeclarableSealed, WeakSession},
3233
api::Id,
3334
handlers::{Callback, DefaultHandler, IntoHandler},
@@ -134,6 +135,8 @@ impl std::fmt::Debug for TransportEventsListenerInner {
134135
pub struct TransportEventsListener<Handler> {
135136
pub(crate) inner: TransportEventsListenerInner,
136137
pub(crate) handler: Handler,
138+
#[cfg(feature = "unstable")]
139+
pub(crate) callback_sync_group: SyncGroup,
137140
}
138141

139142
#[zenoh_macros::unstable]
@@ -206,7 +209,11 @@ impl<Handler: Send> UndeclarableSealed<()> for TransportEventsListener<Handler>
206209
type Undeclaration = TransportEventsListenerUndeclaration<Handler>;
207210

208211
fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
209-
TransportEventsListenerUndeclaration(self)
212+
TransportEventsListenerUndeclaration {
213+
listener: self,
214+
#[cfg(feature = "unstable")]
215+
wait_until_callback_execution_ends: false,
216+
}
210217
}
211218
}
212219

@@ -228,7 +235,20 @@ impl<Handler> std::ops::DerefMut for TransportEventsListener<Handler> {
228235

229236
/// A [`Resolvable`] returned by [`TransportEventsListener::undeclare`]
230237
#[zenoh_macros::unstable]
231-
pub struct TransportEventsListenerUndeclaration<Handler>(TransportEventsListener<Handler>);
238+
pub struct TransportEventsListenerUndeclaration<Handler> {
239+
listener: TransportEventsListener<Handler>,
240+
#[cfg(feature = "unstable")]
241+
wait_until_callback_execution_ends: bool,
242+
}
243+
244+
impl<Handler> TransportEventsListenerUndeclaration<Handler> {
245+
/// Block in undeclare operation until all currently running instances of transport events listener callback (if any) return.
246+
#[zenoh_macros::unstable]
247+
pub fn wait_until_callback_execution_ends(mut self) -> Self {
248+
self.wait_until_callback_execution_ends = true;
249+
self
250+
}
251+
}
232252

233253
#[zenoh_macros::unstable]
234254
impl<Handler> Resolvable for TransportEventsListenerUndeclaration<Handler> {
@@ -238,7 +258,12 @@ impl<Handler> Resolvable for TransportEventsListenerUndeclaration<Handler> {
238258
#[zenoh_macros::unstable]
239259
impl<Handler> Wait for TransportEventsListenerUndeclaration<Handler> {
240260
fn wait(mut self) -> <Self as Resolvable>::To {
241-
self.0.undeclare_impl()
261+
self.listener.undeclare_impl()?;
262+
#[cfg(feature = "unstable")]
263+
if self.wait_until_callback_execution_ends {
264+
self.listener.callback_sync_group.wait();
265+
}
266+
Ok(())
242267
}
243268
}
244269

@@ -391,10 +416,14 @@ where
391416
Handler::Handler: Send,
392417
{
393418
fn wait(self) -> Self::To {
419+
#[cfg(feature = "unstable")]
420+
let callback_sync_group = SyncGroup::default();
394421
let (callback, handler) = self.handler.into_handler();
395-
let state = self
396-
.session
397-
.declare_transport_events_listener_inner(callback, self.history)?;
422+
let state = self.session.declare_transport_events_listener_inner(
423+
callback,
424+
self.history,
425+
callback_sync_group.notifier(),
426+
)?;
398427

399428
Ok(TransportEventsListener {
400429
inner: TransportEventsListenerInner {
@@ -403,6 +432,7 @@ where
403432
undeclare_on_drop: true,
404433
},
405434
handler,
435+
callback_sync_group,
406436
})
407437
}
408438
}
@@ -428,9 +458,12 @@ impl Resolvable for TransportEventsListenerBuilder<'_, Callback<TransportEvent>,
428458
#[zenoh_macros::unstable]
429459
impl Wait for TransportEventsListenerBuilder<'_, Callback<TransportEvent>, true> {
430460
fn wait(self) -> <Self as Resolvable>::To {
431-
let state = self
432-
.session
433-
.declare_transport_events_listener_inner(self.handler, self.history)?;
461+
let state = self.session.declare_transport_events_listener_inner(
462+
self.handler,
463+
self.history,
464+
#[cfg(feature = "unstable")]
465+
None,
466+
)?;
434467
// Set the listener to not undeclare on drop (background mode)
435468
// Note: We can't access the listener to set background flag, so we just don't keep a reference
436469
// The listener will live until explicitly undeclared or session closes

zenoh/src/api/publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ pub struct PublisherUndeclaration<'a> {
427427
}
428428

429429
impl<'a> PublisherUndeclaration<'a> {
430-
/// Block in undeclare operation until all currently instances of matching listeners' callbacks (if any) return.
430+
/// Block in undeclare operation until all currently running instances of matching listeners' callbacks (if any) return.
431431
#[zenoh_macros::unstable]
432432
pub fn wait_until_callback_execution_ends(mut self) -> Self {
433433
self.wait_until_callback_execution_ends = true;

zenoh/src/api/session.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,14 +2133,17 @@ impl SessionInner {
21332133
}
21342134
}
21352135

2136+
#[allow(unused_mut)] // for callback drop on undeclare
21362137
pub(crate) fn declare_transport_events_listener_inner(
21372138
&self,
2138-
callback: Callback<TransportEvent>,
2139+
mut callback: Callback<TransportEvent>,
21392140
history: bool,
2141+
#[cfg(feature = "unstable")] callback_drop_notifier: Option<SyncGroupNotifier>,
21402142
) -> ZResult<Arc<TransportEventsListenerState>> {
21412143
let id = self.runtime.next_id();
21422144
trace!("declare_transport_events_listener_inner() => {id}");
2143-
2145+
#[cfg(feature = "unstable")]
2146+
self.register_callback_drop_notifier(callback_drop_notifier, &mut callback);
21442147
let listener_state = Arc::new(TransportEventsListenerState { id, callback });
21452148

21462149
zwrite!(self.state)
@@ -2200,15 +2203,18 @@ impl SessionInner {
22002203
}
22012204
}
22022205

2206+
#[allow(unused_mut)] // for callback drop on undeclare
22032207
pub(crate) fn declare_transport_links_listener_inner(
22042208
&self,
2205-
callback: Callback<LinkEvent>,
2209+
mut callback: Callback<LinkEvent>,
22062210
history: bool,
22072211
transport: Option<Transport>,
2212+
#[cfg(feature = "unstable")] callback_drop_notifier: Option<SyncGroupNotifier>,
22082213
) -> ZResult<Arc<LinkEventsListenerState>> {
22092214
let id = self.runtime.next_id();
22102215
trace!("declare_transport_links_listener_inner() => {id}");
2211-
2216+
#[cfg(feature = "unstable")]
2217+
self.register_callback_drop_notifier(callback_drop_notifier, &mut callback);
22122218
let listener_state = Arc::new(LinkEventsListenerState {
22132219
id,
22142220
callback,

0 commit comments

Comments
 (0)