Skip to content

Commit 3c8b3bc

Browse files
committed
fix: prevent polling loop hang when MQTT broker is unreachable
Add three layers of defense against the polling loop blocking indefinitely during extended broker outages: 1. Shared mqtt_connected flag (Arc<AtomicBool>) set by the event loop on ConnAck/Disconnect/Error, checked at the top of each polling cycle to skip vcontrold queries and publishes entirely when the broker is down. 2. Publish timeout (5s) around publish_retained calls so individual publishes never block the polling loop if rumqttc's internal channel fills up. 3. MissedTickBehavior::Skip on the polling interval to prevent burst-firing all missed ticks after a stall.
1 parent e804d3f commit 3c8b3bc

File tree

5 files changed

+140
-2
lines changed

5 files changed

+140
-2
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
1818
thiserror = "2"
1919
hostname = "0.4"
2020

21+
[dev-dependencies]
22+
tokio = { version = "1", features = ["test-util"] }
23+
2124
[profile.release]
2225
lto = true
2326
strip = true

src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod polling;
1313
mod process;
1414
mod vcontrold;
1515

16+
use std::sync::atomic::AtomicBool;
1617
use std::sync::Arc;
1718
use tokio::sync::mpsc;
1819
use tracing::{error, info};
@@ -91,6 +92,10 @@ async fn run() -> Result<()> {
9192
let (mqtt_client, eventloop) = MqttClient::new(&config.mqtt, &publisher_client_id)?;
9293
let mqtt_client = Arc::new(mqtt_client);
9394

95+
// Shared flag: tracks whether the MQTT broker is currently reachable.
96+
// Written by run_event_loop, read by run_polling_loop.
97+
let mqtt_connected = Arc::new(AtomicBool::new(false));
98+
9499
// Channel for subscriber messages (if enabled)
95100
let (message_tx, message_rx) = if config.mqtt_subscribe {
96101
let (tx, rx) = mpsc::channel(100);
@@ -114,15 +119,17 @@ async fn run() -> Result<()> {
114119
mqtt_client.clone_client(),
115120
subscribe_topics,
116121
message_tx,
122+
Arc::clone(&mqtt_connected),
117123
));
118124

119125
// Spawn polling loop (if commands are configured)
120126
let polling_handle = if !config.commands.is_empty() {
121127
let config_clone = config.clone();
122128
let vcontrold_clone = Arc::clone(&vcontrold_client);
123129
let mqtt_clone = Arc::clone(&mqtt_client);
130+
let connected = Arc::clone(&mqtt_connected);
124131
Some(tokio::spawn(async move {
125-
run_polling_loop(&config_clone, vcontrold_clone, mqtt_clone).await;
132+
run_polling_loop(&config_clone, vcontrold_clone, mqtt_clone, connected).await;
126133
}))
127134
} else {
128135
info!("No commands configured, polling disabled");

src/mqtt/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use rustls::ClientConfig;
1010
use std::fs::File;
1111
use std::io::BufReader;
1212
use std::path::Path;
13+
use std::sync::atomic::{AtomicBool, Ordering};
1314
use std::sync::Arc;
1415
use std::time::Duration;
1516
use tokio::sync::mpsc;
@@ -269,6 +270,7 @@ pub async fn run_event_loop(
269270
client: AsyncClient,
270271
subscribe_topics: Vec<String>,
271272
message_tx: Option<mpsc::Sender<IncomingMessage>>,
273+
mqtt_connected: Arc<AtomicBool>,
272274
) {
273275
loop {
274276
match eventloop.poll().await {
@@ -289,6 +291,7 @@ pub async fn run_event_loop(
289291
}
290292
rumqttc::v5::Incoming::ConnAck(_) => {
291293
info!("Connected to MQTT broker");
294+
mqtt_connected.store(true, Ordering::Relaxed);
292295

293296
// Re-subscribe to all topics on every (re)connection
294297
for topic in &subscribe_topics {
@@ -306,13 +309,15 @@ pub async fn run_event_loop(
306309
}
307310
rumqttc::v5::Incoming::Disconnect(_) => {
308311
warn!("Disconnected from MQTT broker");
312+
mqtt_connected.store(false, Ordering::Relaxed);
309313
}
310314
_ => {}
311315
}
312316
}
313317
}
314318
Err(e) => {
315319
error!("MQTT event loop error: {}", e);
320+
mqtt_connected.store(false, Ordering::Relaxed);
316321
// Wait before retrying
317322
tokio::time::sleep(Duration::from_secs(10)).await;
318323
}

src/mqtt/publisher.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@
22
//!
33
//! Publishes vcontrold command results to MQTT topics.
44
5+
use std::time::Duration;
6+
7+
use tokio::time::timeout;
58
use tracing::{debug, error, warn};
69

710
use crate::error::MqttError;
811
use crate::vcontrold::{CommandResult, Value};
912

13+
/// Timeout for individual MQTT publish operations.
14+
///
15+
/// Prevents the polling loop from blocking indefinitely when the MQTT
16+
/// client's internal channel is full (e.g. during a broker outage).
17+
const PUBLISH_TIMEOUT: Duration = Duration::from_secs(5);
18+
1019
use super::client::MqttClient;
1120

1221
/// Publisher for vcontrold polling results
@@ -48,7 +57,17 @@ impl<'a> Publisher<'a> {
4857
let topic = self.client.topic(&format!("command/{}", result.command));
4958
debug!("Publishing to {}: {}", topic, payload);
5059

51-
self.client.publish_retained(&topic, &payload).await
60+
match timeout(PUBLISH_TIMEOUT, self.client.publish_retained(&topic, &payload)).await {
61+
Ok(result) => result,
62+
Err(_) => {
63+
warn!(
64+
"Publish timeout for {} after {}s - MQTT client may be stalled",
65+
topic,
66+
PUBLISH_TIMEOUT.as_secs()
67+
);
68+
Ok(())
69+
}
70+
}
5271
}
5372

5473
/// Publish multiple command results
@@ -93,4 +112,23 @@ mod tests {
93112
assert_eq!(format_number(3.14159), "3.14159");
94113
assert_eq!(format_number(0.5), "0.5");
95114
}
115+
116+
#[test]
117+
fn test_publish_timeout_is_5_seconds() {
118+
assert_eq!(PUBLISH_TIMEOUT, Duration::from_secs(5));
119+
}
120+
121+
/// Verify that `tokio::time::timeout` with `PUBLISH_TIMEOUT` fires
122+
/// instead of blocking forever when the inner future never resolves.
123+
/// This simulates the scenario where `publish_retained` blocks because
124+
/// rumqttc's internal channel is full during a broker outage.
125+
#[tokio::test]
126+
async fn test_publish_timeout_fires_on_stalled_future() {
127+
tokio::time::pause();
128+
129+
let stalled = std::future::pending::<Result<(), MqttError>>();
130+
let result = timeout(PUBLISH_TIMEOUT, stalled).await;
131+
132+
assert!(result.is_err(), "timeout should fire on a stalled future");
133+
}
96134
}

src/polling.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//!
33
//! Handles command batching and periodic execution.
44
5+
use std::sync::atomic::{AtomicBool, Ordering};
56
use std::sync::Arc;
67
use tokio::time::interval;
78
use tracing::{debug, error, info, warn};
@@ -66,6 +67,7 @@ pub async fn run_polling_loop(
6667
config: &Config,
6768
vcontrold: Arc<VcontroldClient>,
6869
mqtt_client: Arc<MqttClient>,
70+
mqtt_connected: Arc<AtomicBool>,
6971
) {
7072
if config.commands.is_empty() {
7173
warn!("No commands configured for polling");
@@ -88,11 +90,32 @@ pub async fn run_polling_loop(
8890
}
8991

9092
let mut poll_interval = interval(config.interval);
93+
// Skip missed ticks instead of bursting them all at once. This prevents
94+
// overwhelming the MQTT client after a stall (e.g. broker outage where
95+
// publishes hit the timeout and the interval falls behind).
96+
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
9197
let publisher = Publisher::new(&mqtt_client);
9298

99+
let mut was_disconnected = false;
100+
93101
loop {
94102
poll_interval.tick().await;
95103

104+
// Skip entire cycle when the MQTT broker is unreachable. This avoids
105+
// unnecessary vcontrold/Optolink traffic and prevents filling the
106+
// rumqttc internal channel (which would block the polling loop).
107+
if !mqtt_connected.load(Ordering::Relaxed) {
108+
if !was_disconnected {
109+
warn!("MQTT broker disconnected, skipping polling cycles");
110+
was_disconnected = true;
111+
}
112+
continue;
113+
}
114+
if was_disconnected {
115+
info!("MQTT broker reconnected, resuming polling");
116+
was_disconnected = false;
117+
}
118+
96119
debug!("Starting polling cycle");
97120

98121
for (batch_idx, batch) in batches.iter().enumerate() {
@@ -183,4 +206,66 @@ mod tests {
183206
assert_eq!(batches.len(), 1);
184207
assert_eq!(batches[0], vec!["veryLongCommandName"]);
185208
}
209+
210+
/// Verify that the polling interval uses Skip behavior: after a long stall
211+
/// only one tick fires rather than a burst of all missed ticks.
212+
///
213+
/// With the default `Burst` behavior, advancing time by 5x the interval
214+
/// would yield 5 immediately-ready ticks. With `Skip`, only the next
215+
/// natural tick fires, so we get exactly 1.
216+
#[tokio::test]
217+
async fn test_interval_skips_missed_ticks() {
218+
use std::time::Duration;
219+
use tokio::time::{interval, MissedTickBehavior};
220+
221+
tokio::time::pause();
222+
223+
let period = Duration::from_secs(10);
224+
let mut ivl = interval(period);
225+
ivl.set_missed_tick_behavior(MissedTickBehavior::Skip);
226+
227+
// First tick fires immediately (interval semantics)
228+
ivl.tick().await;
229+
230+
// Simulate a stall: advance time by 5 periods without ticking
231+
tokio::time::advance(period * 5).await;
232+
233+
// After the stall, exactly one tick should be ready (Skip discards
234+
// missed ticks and resets to the next future deadline).
235+
ivl.tick().await;
236+
237+
// The next tick should NOT be immediately available — it should
238+
// require waiting another full period.
239+
let next = tokio::time::timeout(period / 2, ivl.tick()).await;
240+
assert!(
241+
next.is_err(),
242+
"no burst tick should be available; Skip must discard missed ticks"
243+
);
244+
}
245+
246+
#[test]
247+
fn test_mqtt_connected_flag_state_transitions() {
248+
// Verify the AtomicBool flag behaves correctly across the
249+
// state transitions that run_event_loop and run_polling_loop rely on.
250+
let connected = Arc::new(AtomicBool::new(false));
251+
252+
// Initial state: disconnected (same as main.rs)
253+
assert!(!connected.load(Ordering::Relaxed));
254+
255+
// Simulate ConnAck in event loop
256+
connected.store(true, Ordering::Relaxed);
257+
assert!(connected.load(Ordering::Relaxed));
258+
259+
// Simulate Disconnect
260+
connected.store(false, Ordering::Relaxed);
261+
assert!(!connected.load(Ordering::Relaxed));
262+
263+
// Simulate reconnect
264+
connected.store(true, Ordering::Relaxed);
265+
assert!(connected.load(Ordering::Relaxed));
266+
267+
// Simulate event loop error
268+
connected.store(false, Ordering::Relaxed);
269+
assert!(!connected.load(Ordering::Relaxed));
270+
}
186271
}

0 commit comments

Comments
 (0)