Skip to content

Commit 4d8108e

Browse files
committed
fix recovery after prolonged MQTT outages
A prolonged MQTT outage exposed two coupled recovery problems. First, the rumqttc event loop awaited subscribe() and subscriber-channel send() calls during reconnect handling. If the MQTT request queue or the internal subscriber queue was full after the outage, that backpressure could stall eventloop.poll() and delay or prevent recovery. Rework the loop to use try_subscribe with retry-on-later-iterations, honor ConnAck.session_present, and drop subscriber-bound request messages when the queue is saturated so the event loop stays responsive. Second, vcontrold could return fatal ERR responses such as ">FRAMER: Error 0x05 != 0x06 (P300_INIT_OK)" together with "Error in send, terminating". Those responses were previously treated like ordinary command errors, so the poisoned TCP session stayed cached and every later command in the polling cycle reused the broken session. Detect those fatal responses, invalidate the connection immediately, and force a clean reconnect before the next command. Keep connection invalidation under the mutex to avoid a re-lock race between the polling task and the MQTT subscriber task, document the request/response backpressure tradeoff, and add regression tests for the MQTT queueing behavior plus both fatal and non-fatal vcontrold ERR responses.
1 parent 1b5e29e commit 4d8108e

File tree

5 files changed

+443
-84
lines changed

5 files changed

+443
-84
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ Response format (JSON):
149149
{"getTempA":21.5,"getTempWW":48.1}
150150
```
151151

152+
Notes:
153+
- Request/response handling is best-effort under overload.
154+
- If the internal subscriber queue is full, new request messages may be dropped
155+
so the MQTT event loop can stay responsive and reconnect cleanly.
156+
- Clients should retry a request if no response arrives.
157+
152158
## Health Check
153159

154160
The container includes a built-in health check that verifies all critical components:

SPEC.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ The parser extracts:
204204
- Execute each command via TCP connection
205205
- Build JSON response
206206
- Publish response to `${MQTT_TOPIC}/response`
207+
- If the internal subscriber queue is saturated, drop new request messages
208+
instead of blocking the MQTT event loop
207209
4. On disconnect: automatic reconnection via rumqttc
208210

209211
## Error Handling

src/mqtt/client.rs

Lines changed: 186 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,19 @@ fn build_tls_transport(host: &str, config: &TlsConfig) -> Result<Transport, Mqtt
103103
if let Some(ca_file) = &config.ca_file {
104104
let certs = load_certs(ca_file)?;
105105
for cert in certs {
106-
root_cert_store
107-
.add(cert)
108-
.map_err(|e| MqttError::ConnectionFailed(format!("Failed to add CA cert: {}", e)))?;
106+
root_cert_store.add(cert).map_err(|e| {
107+
MqttError::ConnectionFailed(format!("Failed to add CA cert: {}", e))
108+
})?;
109109
}
110110
} else if let Some(ca_path) = &config.ca_path {
111111
// Load all .crt and .pem files from directory
112112
if let Ok(entries) = std::fs::read_dir(ca_path) {
113113
for entry in entries.flatten() {
114114
let path = entry.path();
115-
if path.extension().is_some_and(|ext| ext == "crt" || ext == "pem") {
115+
if path
116+
.extension()
117+
.is_some_and(|ext| ext == "crt" || ext == "pem")
118+
{
116119
if let Ok(certs) = load_certs(&path) {
117120
for cert in certs {
118121
let _ = root_cert_store.add(cert);
@@ -129,19 +132,18 @@ fn build_tls_transport(host: &str, config: &TlsConfig) -> Result<Transport, Mqtt
129132
// Build client config
130133
let builder = ClientConfig::builder().with_root_certificates(root_cert_store);
131134

132-
let tls_config = if let (Some(cert_file), Some(key_file)) =
133-
(&config.cert_file, &config.key_file)
134-
{
135-
// Client certificate authentication
136-
let certs = load_certs(cert_file)?;
137-
let key = load_private_key(key_file)?;
138-
builder
139-
.with_client_auth_cert(certs, key)
140-
.map_err(|e| MqttError::ConnectionFailed(format!("Failed to set client cert: {}", e)))?
141-
} else {
142-
// No client certificate
143-
builder.with_no_client_auth()
144-
};
135+
let tls_config =
136+
if let (Some(cert_file), Some(key_file)) = (&config.cert_file, &config.key_file) {
137+
// Client certificate authentication
138+
let certs = load_certs(cert_file)?;
139+
let key = load_private_key(key_file)?;
140+
builder.with_client_auth_cert(certs, key).map_err(|e| {
141+
MqttError::ConnectionFailed(format!("Failed to set client cert: {}", e))
142+
})?
143+
} else {
144+
// No client certificate
145+
builder.with_no_client_auth()
146+
};
145147

146148
// Create rustls ClientConfig with dangerous verifier if insecure mode
147149
let tls_config = if config.insecure {
@@ -162,7 +164,9 @@ fn build_tls_transport(host: &str, config: &TlsConfig) -> Result<Transport, Mqtt
162164
.try_into()
163165
.map_err(|_| MqttError::ConnectionFailed(format!("Invalid server name: {}", host)))?;
164166

165-
Ok(Transport::tls_with_config(rumqttc::TlsConfiguration::Rustls(Arc::new(tls_config))))
167+
Ok(Transport::tls_with_config(
168+
rumqttc::TlsConfiguration::Rustls(Arc::new(tls_config)),
169+
))
166170
}
167171

168172
/// Load certificates from a PEM file
@@ -260,19 +264,86 @@ impl rustls::client::danger::ServerCertVerifier for InsecureServerCertVerifier {
260264
}
261265
}
262266

267+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
268+
enum SubscriptionQueueStatus {
269+
Complete,
270+
Pending,
271+
}
272+
273+
fn queue_pending_subscriptions(
274+
client: &AsyncClient,
275+
subscribe_topics: &[String],
276+
next_subscription: &mut usize,
277+
) -> SubscriptionQueueStatus {
278+
while *next_subscription < subscribe_topics.len() {
279+
let topic = &subscribe_topics[*next_subscription];
280+
if client.try_subscribe(topic, QoS::AtLeastOnce).is_err() {
281+
return SubscriptionQueueStatus::Pending;
282+
}
283+
284+
info!("Subscribing to {}", topic);
285+
*next_subscription += 1;
286+
}
287+
288+
SubscriptionQueueStatus::Complete
289+
}
290+
291+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
292+
enum ForwardMessageStatus {
293+
Sent,
294+
DroppedFull,
295+
DroppedClosed,
296+
Ignored,
297+
}
298+
299+
fn forward_incoming_message(
300+
message_tx: Option<&mpsc::Sender<IncomingMessage>>,
301+
msg: IncomingMessage,
302+
) -> ForwardMessageStatus {
303+
let Some(tx) = message_tx else {
304+
return ForwardMessageStatus::Ignored;
305+
};
306+
307+
match tx.try_send(msg) {
308+
Ok(()) => ForwardMessageStatus::Sent,
309+
Err(mpsc::error::TrySendError::Full(_)) => ForwardMessageStatus::DroppedFull,
310+
Err(mpsc::error::TrySendError::Closed(_)) => ForwardMessageStatus::DroppedClosed,
311+
}
312+
}
313+
263314
/// Run the MQTT event loop and forward incoming messages
264315
///
265-
/// Re-subscribes to all topics on every ConnAck (reconnection), since
266-
/// rumqttc uses `clean_start = true` by default and the broker discards
267-
/// session state (including subscriptions) when the client reconnects.
316+
/// When the broker does not resume a previous session on ConnAck, subscriptions
317+
/// are re-queued with `try_subscribe` and retried across loop iterations so the
318+
rumqttc event loop never blocks waiting for channel capacity.
268319
pub async fn run_event_loop(
269320
mut eventloop: EventLoop,
270321
client: AsyncClient,
271322
subscribe_topics: Vec<String>,
272323
message_tx: Option<mpsc::Sender<IncomingMessage>>,
273324
mqtt_connected: Arc<AtomicBool>,
274325
) {
326+
let mut pending_subscription_index: Option<usize> = None;
327+
let mut subscription_restore_stalled = false;
328+
275329
loop {
330+
if let Some(next_subscription) = pending_subscription_index.as_mut() {
331+
match queue_pending_subscriptions(&client, &subscribe_topics, next_subscription) {
332+
SubscriptionQueueStatus::Complete => {
333+
pending_subscription_index = None;
334+
subscription_restore_stalled = false;
335+
}
336+
SubscriptionQueueStatus::Pending => {
337+
if !subscription_restore_stalled {
338+
warn!(
339+
"Could not queue all MQTT subscriptions yet; will retry without blocking the event loop"
340+
);
341+
subscription_restore_stalled = true;
342+
}
343+
}
344+
}
345+
}
346+
276347
match eventloop.poll().await {
277348
Ok(event) => {
278349
if let Event::Incoming(incoming) = event {
@@ -282,22 +353,38 @@ pub async fn run_event_loop(
282353
let payload = String::from_utf8_lossy(&publish.payload).to_string();
283354
debug!("Received message on {}: {}", topic, payload);
284355

285-
if let Some(tx) = &message_tx {
286-
let msg = IncomingMessage { topic, payload };
287-
if tx.send(msg).await.is_err() {
356+
let msg = IncomingMessage {
357+
topic: topic.clone(),
358+
payload,
359+
};
360+
match forward_incoming_message(message_tx.as_ref(), msg) {
361+
ForwardMessageStatus::Sent | ForwardMessageStatus::Ignored => {}
362+
ForwardMessageStatus::DroppedFull => {
363+
warn!(
364+
"Dropping incoming message on {} because the subscriber queue is full",
365+
topic
366+
);
367+
}
368+
ForwardMessageStatus::DroppedClosed => {
288369
warn!("Failed to forward incoming message - receiver dropped");
289370
}
290371
}
291372
}
292-
rumqttc::v5::Incoming::ConnAck(_) => {
373+
rumqttc::v5::Incoming::ConnAck(connack) => {
293374
info!("Connected to MQTT broker");
294375
mqtt_connected.store(true, Ordering::Relaxed);
295-
296-
// Re-subscribe to all topics on every (re)connection
297-
for topic in &subscribe_topics {
298-
info!("Subscribing to {}", topic);
299-
if let Err(e) = client.subscribe(topic, QoS::AtLeastOnce).await {
300-
error!("Failed to subscribe to {}: {}", topic, e);
376+
subscription_restore_stalled = false;
377+
378+
if !subscribe_topics.is_empty() {
379+
if connack.session_present {
380+
debug!("MQTT session resumed; keeping existing subscriptions");
381+
pending_subscription_index = None;
382+
} else {
383+
debug!(
384+
"Scheduling restore of {} MQTT subscription(s)",
385+
subscribe_topics.len()
386+
);
387+
pending_subscription_index = Some(0);
301388
}
302389
}
303390
}
@@ -310,6 +397,8 @@ pub async fn run_event_loop(
310397
rumqttc::v5::Incoming::Disconnect(_) => {
311398
warn!("Disconnected from MQTT broker");
312399
mqtt_connected.store(false, Ordering::Relaxed);
400+
pending_subscription_index = None;
401+
subscription_restore_stalled = false;
313402
}
314403
_ => {}
315404
}
@@ -318,9 +407,75 @@ pub async fn run_event_loop(
318407
Err(e) => {
319408
error!("MQTT event loop error: {}", e);
320409
mqtt_connected.store(false, Ordering::Relaxed);
410+
pending_subscription_index = None;
411+
subscription_restore_stalled = false;
321412
// Wait before retrying
322413
tokio::time::sleep(Duration::from_secs(10)).await;
323414
}
324415
}
325416
}
326417
}
418+
419+
#[cfg(test)]
420+
mod tests {
421+
use super::*;
422+
423+
#[test]
424+
fn queue_pending_subscriptions_completes_when_capacity_is_available() {
425+
let options = MqttOptions::new("test-client", "localhost", 1883);
426+
let (client, _eventloop) = AsyncClient::new(options, 2);
427+
let topics = vec![
428+
"heating/request".to_string(),
429+
"heating/response".to_string(),
430+
];
431+
let mut next_subscription = 0;
432+
433+
let status = queue_pending_subscriptions(&client, &topics, &mut next_subscription);
434+
435+
assert_eq!(status, SubscriptionQueueStatus::Complete);
436+
assert_eq!(next_subscription, topics.len());
437+
}
438+
439+
#[test]
440+
fn queue_pending_subscriptions_stays_pending_when_request_channel_is_full() {
441+
let options = MqttOptions::new("test-client", "localhost", 1883);
442+
let (client, _eventloop) = AsyncClient::new(options, 1);
443+
let topics = vec!["heating/request".to_string()];
444+
let mut next_subscription = 0;
445+
446+
client
447+
.try_publish("heating/command/getTempA", QoS::AtLeastOnce, false, "21.5")
448+
.expect("request channel should accept the first queued publish");
449+
450+
let status = queue_pending_subscriptions(&client, &topics, &mut next_subscription);
451+
452+
assert_eq!(status, SubscriptionQueueStatus::Pending);
453+
assert_eq!(next_subscription, 0);
454+
}
455+
456+
#[tokio::test]
457+
async fn forward_incoming_message_drops_when_subscriber_queue_is_full() {
458+
let (tx, mut rx) = mpsc::channel(1);
459+
let first = IncomingMessage {
460+
topic: "heating/request".to_string(),
461+
payload: "first".to_string(),
462+
};
463+
let second = IncomingMessage {
464+
topic: "heating/request".to_string(),
465+
payload: "second".to_string(),
466+
};
467+
468+
tx.send(first).await.unwrap();
469+
470+
let status = forward_incoming_message(Some(&tx), second);
471+
472+
assert_eq!(status, ForwardMessageStatus::DroppedFull);
473+
474+
let queued = rx.recv().await.expect("first message should stay queued");
475+
assert_eq!(queued.payload, "first");
476+
assert!(
477+
rx.try_recv().is_err(),
478+
"full queue must not accept second message"
479+
);
480+
}
481+
}

0 commit comments

Comments
 (0)