diff --git a/libsql-ffi/build.rs b/libsql-ffi/build.rs index ea95d1ed28..61d924829d 100644 --- a/libsql-ffi/build.rs +++ b/libsql-ffi/build.rs @@ -267,7 +267,16 @@ pub fn build_bundled(out_dir: &str, out_path: &Path) { let mut sqlean_sources = Vec::new(); for pattern in sqlean_patterns { let full_pattern = format!("{BUNDLED_DIR}/sqlean/{}", pattern); - sqlean_sources.extend(glob(&full_pattern).unwrap().filter_map(Result::ok)); + sqlean_sources.extend( + glob(&full_pattern) + .unwrap() + .filter_map(Result::ok) + // Headers are glob'd in as a side effect but must not + // be passed to `cc::Build::files()`: on clang/macOS + // that turns them into precompiled-header .o files + // which fail to link. + .filter(|p| p.extension().map_or(false, |ext| ext == "c")), + ); } if cfg!(feature = "sqlean-extension-regexp") { diff --git a/libsql-server/src/http/admin/mod.rs b/libsql-server/src/http/admin/mod.rs index 6953696312..0f97efc0c3 100644 --- a/libsql-server/src/http/admin/mod.rs +++ b/libsql-server/src/http/admin/mod.rs @@ -158,6 +158,14 @@ where "/v1/namespaces/:namespace/checkpoint", post(handle_checkpoint), ) + .route( + "/v1/namespaces/:namespace/reset-replication", + post(handle_reset_replication), + ) + .route( + "/v1/namespaces/:namespace/integrity-check", + post(handle_integrity_check), + ) .route("/v1/namespaces/:namespace", delete(handle_delete_namespace)) .route("/v1/namespaces/:namespace/stats", get(stats::handle_stats)) .route( @@ -550,6 +558,81 @@ async fn handle_checkpoint( Ok(()) } +/// Rebuild the replication log for a namespace from its live DB file +/// without touching other namespaces on this pod. +/// +/// Use when the replication artifacts (wallog, snapshots/, to_compact/) +/// are corrupt but the live `data` file is intact (verify first with +/// `PRAGMA quick_check`). 
+///
+/// Side effects:
+/// - new `log_id` is minted
+/// - connected replicas see `LogIncompatible` and must re-bootstrap
+/// - live DB data is preserved
+/// - metastore config (jwt_key, block_writes, etc.) is preserved
+///
+/// Other namespaces on this pod are completely unaffected.
+#[derive(serde::Serialize)]
+struct ResetReplicationResp {
+    /// Wall-clock duration of the reset, for operator-visible metrics.
+    elapsed_ms: u64,
+}
+
+async fn handle_reset_replication<C: Connector>(
+    State(app_state): State<Arc<AppState<C>>>,
+    Path(namespace): Path<NamespaceName>,
+) -> crate::Result<axum::Json<ResetReplicationResp>> {
+    let elapsed_ms = app_state.namespaces.reset_replication(namespace).await?;
+    Ok(axum::Json(ResetReplicationResp { elapsed_ms }))
+}
+
+#[derive(serde::Deserialize, Default)]
+struct IntegrityCheckReq {
+    /// If true, run full `PRAGMA integrity_check` (O(DB size), thorough).
+    /// Default is `PRAGMA quick_check` which is fast and catches the
+    /// critical corruption classes.
+    #[serde(default)]
+    full: bool,
+}
+
+#[derive(serde::Serialize)]
+struct IntegrityCheckResp {
+    ok: bool,
+    /// Raw SQLite diagnostic text. `"ok"` on success, otherwise one or
+    /// more messages describing integrity issues.
+    message: String,
+    /// "quick" or "full", mirrors the `full` request field.
+    check: &'static str,
+}
+
+/// Run `PRAGMA quick_check` (default) or `PRAGMA integrity_check` on a
+/// namespace's live data file without touching other namespaces.
+///
+/// Use this to classify the failure mode before recovery:
+/// - `ok` → live DB is fine, any corruption is in wallog/snapshots (Mode A)
+///   → caller should use `POST /v1/namespaces/:ns/reset-replication`.
+/// - non-"ok" → live DB itself is corrupt (Mode B)
+///   → caller should restore from backup, not reset-replication.
+///
+/// Cheap: ~10ms for quick_check on small-to-medium namespaces.
+async fn handle_integrity_check<C: Connector>(
+    State(app_state): State<Arc<AppState<C>>>,
+    Path(namespace): Path<NamespaceName>,
+    payload: Option<Json<IntegrityCheckReq>>,
+) -> crate::Result<Json<IntegrityCheckResp>> {
+    let full = payload.map(|p| p.0.full).unwrap_or(false);
+    let message = app_state
+        .namespaces
+        .integrity_check(namespace, full)
+        .await?;
+    let ok = message.trim() == "ok";
+    Ok(Json(IntegrityCheckResp {
+        ok,
+        message,
+        check: if full { "full" } else { "quick" },
+    }))
+}
+
 #[derive(serde::Deserialize)]
 struct EnableHeapProfileRequest {
     #[serde(default)]
diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs
index ec45b50445..76eaa63ba0 100644
--- a/libsql-server/src/namespace/mod.rs
+++ b/libsql-server/src/namespace/mod.rs
@@ -72,6 +72,12 @@ impl Namespace {
         &self.name
     }
 
+    /// On-disk path of this namespace's files (data, wallog, snapshots/,
+    /// to_compact/, .sentinel).
+    pub(crate) fn path(&self) -> &Arc<Path> {
+        &self.path
+    }
+
     async fn destroy(mut self) -> anyhow::Result<()> {
         self.tasks.shutdown().await;
         self.db.destroy();
@@ -85,14 +91,79 @@ impl Namespace {
         Ok(())
     }
 
+    /// Run `PRAGMA quick_check` (or `integrity_check` if `full=true`) on
+    /// the namespace's live DB file and return the result string.
+    ///
+    /// For a healthy DB this returns `"ok"`. Anything else is an
+    /// integrity diagnostic message from SQLite.
+    ///
+    /// A catastrophically corrupt DB can fail before PRAGMA runs (e.g.
+    /// `malformed database schema` raised while the prepared statement
+    /// is parsing the schema). We normalize that into the same
+    /// `Ok(String)` return path so callers get a uniform classification
+    /// signal instead of a server error.
+    async fn integrity_check(&self, full: bool) -> anyhow::Result<String> {
+        // Even creating a connection can fail ("malformed database schema")
+        // when the DB is badly corrupt — that IS an integrity signal so we
+        // surface it as `Ok(String)` rather than an Err that becomes a 500.
+        let conn = match self.db.connection_maker().create().await {
+            Ok(c) => c,
+            Err(e) => {
+                return Ok(format!("connection failed: {e}"));
+            }
+        };
+        let pragma = if full { "integrity_check" } else { "quick_check" };
+        let result = conn.with_raw(move |raw| -> rusqlite::Result<Vec<String>> {
+            let mut stmt = raw.prepare(&format!("PRAGMA {pragma}"))?;
+            let mut rows = stmt.query([])?;
+            let mut out = Vec::new();
+            while let Some(row) = rows.next()? {
+                let s: String = row.get(0)?;
+                out.push(s);
+            }
+            Ok(out)
+        });
+        match result {
+            Ok(rows) => Ok(rows.join("\n")),
+            Err(e) => {
+                // SQLite surfaces integrity failures as prepare/query errors
+                // rather than PRAGMA rows. Treat those as integrity signals.
+                Ok(format!("{e}"))
+            }
+        }
+    }
+
     async fn shutdown(mut self, should_checkpoint: bool) -> anyhow::Result<()> {
         self.tasks.shutdown().await;
         if should_checkpoint {
             self.checkpoint().await?;
         }
         self.db.shutdown().await?;
-        if let Err(e) = tokio::fs::remove_file(self.path.join(".sentinel")).await {
-            tracing::error!("unable to remove .sentinel file: {}", e);
+        // Historically `.sentinel` was removed unconditionally on graceful
+        // shutdown. This makes the documented `touch .sentinel + kubectl
+        // delete pod` operator recovery path silently ineffective, because
+        // kubectl sends SIGTERM first which invokes this graceful shutdown
+        // and removes the sentinel before the pod actually stops.
+        //
+        // Guard the removal behind `LIBSQL_PRESERVE_SENTINEL_ON_SHUTDOWN`.
+        // When set, the sentinel survives graceful shutdown, so the next
+        // namespace init will correctly trigger dirty-recovery from the
+        // live `data` file.
+        //
+        // Default remains: remove (preserves existing behavior for the
+        // 99% of deployments that don't need this recovery path, now that
+        // `POST /v1/namespaces/:ns/reset-replication` is the primary
+        // recovery primitive).
+        let preserve_sentinel =
+            std::env::var("LIBSQL_PRESERVE_SENTINEL_ON_SHUTDOWN").is_ok();
+        if !preserve_sentinel {
+            if let Err(e) = tokio::fs::remove_file(self.path.join(".sentinel")).await {
+                tracing::error!("unable to remove .sentinel file: {}", e);
+            }
+        } else {
+            tracing::info!(
+                "LIBSQL_PRESERVE_SENTINEL_ON_SHUTDOWN set; keeping .sentinel for recovery"
+            );
         }
         Ok(())
     }
diff --git a/libsql-server/src/namespace/store.rs b/libsql-server/src/namespace/store.rs
index 86e9438ccd..55cec58011 100644
--- a/libsql-server/src/namespace/store.rs
+++ b/libsql-server/src/namespace/store.rs
@@ -153,6 +153,44 @@ impl NamespaceStore {
         Ok(())
     }
 
+    /// Run `PRAGMA quick_check` (or `integrity_check` if `full=true`) on
+    /// the namespace's live DB and return the result string. Takes only
+    /// a read lock on the namespace; other operations (including other
+    /// namespaces on this pod) are unaffected.
+    ///
+    /// A healthy DB returns `"ok"`. Any other text is diagnostic output
+    /// from SQLite. Use this to classify Mode A (wallog/snapshot
+    /// corruption, live DB OK) vs Mode B (live DB corruption) before
+    /// deciding on a recovery procedure.
+    pub async fn integrity_check(
+        &self,
+        namespace: NamespaceName,
+        full: bool,
+    ) -> crate::Result<String> {
+        if !self.inner.metadata.exists(&namespace).await {
+            return Err(Error::NamespaceDoesntExist(namespace.to_string()));
+        }
+        // Force-load the namespace so we can run a query against it.
+ let db_config = self.inner.metadata.handle(namespace.clone()).await; + let _ = self + .load_namespace(&namespace, db_config, RestoreOption::Latest) + .await?; + + let entry = self + .inner + .store + .get_with(namespace.clone(), async { Default::default() }) + .await; + let lock = entry.read().await; + if let Some(ns) = &*lock { + ns.integrity_check(full) + .await + .map_err(|e| Error::Internal(format!("integrity_check: {e}"))) + } else { + Err(Error::NamespaceDoesntExist(namespace.to_string())) + } + } + pub async fn reset( &self, namespace: NamespaceName, @@ -191,6 +229,177 @@ impl NamespaceStore { Ok(()) } + /// Rebuild the replication log for a namespace from its live DB file, + /// without wiping the DB or the metastore config. + /// + /// Use this when the replication artifacts (wallog, snapshots/, + /// to_compact/) are corrupt but the live `data` file (and metastore + /// config) are intact. + /// + /// Semantics: + /// 1. Acquire the single-namespace write lock (other namespaces on this + /// pod are unaffected). + /// 2. Checkpoint the current WAL into `data` so nothing in-flight is + /// lost. + /// 3. Destroy the in-memory namespace (closes connections, stops tasks). + /// This does NOT touch any files on disk. + /// 4. Remove only `wallog`, `snapshots/`, `to_compact/`. + /// 5. Create `.sentinel`. + /// 6. Re-initialize the namespace via `make_namespace()`. The + /// `PrimaryConfigurator` sees `.sentinel` → opens + /// `ReplicationLogger` with `dirty=true` → `recover()` rebuilds the + /// wallog page-by-page from the restored `data` file, mints a fresh + /// `log_id`, and wipes `snapshots/` + `to_compact/`. + /// + /// Effects for clients: + /// - connected embedded replicas see `LogIncompatible` on next sync + /// (new log_id) and self-reset (wipe local + re-bootstrap). + /// - fresh bootstrap succeeds against the rebuilt history. + /// - live DB data is fully preserved. + /// - metastore config (jwt_key, block_writes, bottomless_db_id, etc.) 
+    /// is fully preserved.
+    ///
+    /// Brief unavailability window: from the moment we take the write lock
+    /// until `make_namespace` returns. Other namespaces on the pod are
+    /// completely unaffected.
+    pub async fn reset_replication(&self, namespace: NamespaceName) -> crate::Result<u64> {
+        if self.inner.has_shutdown.load(Ordering::Relaxed) {
+            return Err(Error::NamespaceStoreShutdown);
+        }
+
+        if !self.inner.metadata.exists(&namespace).await {
+            return Err(Error::NamespaceDoesntExist(namespace.to_string()));
+        }
+
+        let start = Instant::now();
+        tracing::info!("reset_replication: starting for namespace {namespace}");
+
+        // Load the namespace first so we can resolve its on-disk path
+        // cleanly. This is effectively a no-op if it's already hot.
+        let db_config = self.inner.metadata.handle(namespace.clone()).await;
+        let _ = self
+            .load_namespace(&namespace, db_config.clone(), RestoreOption::Latest)
+            .await?;
+
+        let entry = self
+            .inner
+            .store
+            .get_with(namespace.clone(), async { Default::default() })
+            .await;
+        let mut lock = entry.write().await;
+
+        let ns_path: Arc<Path> = match lock.as_ref() {
+            Some(ns) => ns.path().clone(),
+            None => {
+                return Err(Error::NamespaceDoesntExist(namespace.to_string()));
+            }
+        };
+
+        // (1) Checkpoint before we tear down in-memory state so the data
+        // file has everything the WAL was holding.
+        //
+        // If checkpoint fails with a `DatabaseCorrupt` / `malformed`
+        // error, the live data file itself is corrupt. Rebuilding the
+        // wallog from a corrupt data file would just propagate the
+        // corruption — so bail out BEFORE we destroy the in-memory
+        // namespace. The caller should fall back to a restore-from-
+        // backup path (Mode B), not this endpoint.
+ if let Some(ns) = lock.as_ref() { + match ns.checkpoint().await { + Ok(()) => {} + Err(e) => { + let msg = e.to_string(); + let is_live_db_corrupt = msg.contains("malformed") + || msg.contains("DatabaseCorrupt") + || msg.contains("database disk image") + || msg.contains("file is not a database"); + if is_live_db_corrupt { + return Err(Error::Internal(format!( + "reset_replication: live data file appears corrupt \ + (checkpoint failed: {e}); refusing to rebuild \ + replication log from corrupt data. Use a \ + restore-from-backup procedure instead." + ))); + } + tracing::warn!( + "reset_replication: checkpoint failed: {e}; proceeding anyway" + ); + } + } + } + + // (2) Tear down in-memory namespace. Does not touch files. + if let Some(ns) = lock.take() { + if let Err(e) = ns.destroy().await { + // Best-effort: if we can't destroy cleanly, the next + // make_namespace will fail too, so surface the error now. + return Err(Error::Internal(format!( + "reset_replication: destroy failed: {e}" + ))); + } + } + + // (3) Remove only replication artifacts. Keep `data` and + // `data-wal`/`data-shm` + config. + for artifact in ["wallog"] { + let p = ns_path.join(artifact); + if let Err(e) = tokio::fs::remove_file(&p).await { + if e.kind() != std::io::ErrorKind::NotFound { + tracing::warn!( + "reset_replication: remove_file {} failed: {e}", + p.display() + ); + } + } + } + for artifact in ["snapshots", "to_compact"] { + let p = ns_path.join(artifact); + if let Err(e) = tokio::fs::remove_dir_all(&p).await { + if e.kind() != std::io::ErrorKind::NotFound { + tracing::warn!( + "reset_replication: remove_dir_all {} failed: {e}", + p.display() + ); + } + } + } + + // (4) Drop any stale .sentinel left by prior shutdown, then mint a + // fresh one so ReplicationLogger::open takes the dirty-recovery + // path on the next init. 
+ let sentinel = ns_path.join(".sentinel"); + let _ = tokio::fs::remove_file(&sentinel).await; + // Ensure parent dir exists (it should, because data still lives + // there, but be defensive). + if !ns_path.exists() { + tokio::fs::create_dir_all(&ns_path).await.map_err(|e| { + Error::Internal(format!( + "reset_replication: create_dir_all {} failed: {e}", + ns_path.display() + )) + })?; + } + tokio::fs::File::create(&sentinel).await.map_err(|e| { + Error::Internal(format!( + "reset_replication: create .sentinel failed: {e}" + )) + })?; + + // (5) Re-initialize the namespace. setup() sees .sentinel → opens + // ReplicationLogger with dirty=true → recover() rebuilds the + // wallog page-by-page from the intact `data` file. + let ns = self + .make_namespace(&namespace, db_config, RestoreOption::Latest) + .await?; + lock.replace(ns); + + let elapsed_ms = start.elapsed().as_millis() as u64; + tracing::info!( + "reset_replication: rebuilt replication log for namespace {namespace} in {elapsed_ms}ms" + ); + Ok(elapsed_ms) + } + // This is only called on replica fn make_reset_cb(&self) -> ResetCb { let this = self.clone(); diff --git a/libsql-server/tests/namespaces/mod.rs b/libsql-server/tests/namespaces/mod.rs index 37b373b76e..af98c164d7 100644 --- a/libsql-server/tests/namespaces/mod.rs +++ b/libsql-server/tests/namespaces/mod.rs @@ -137,3 +137,364 @@ fn delete_namespace() { sim.run().unwrap(); } + +#[test] +fn integrity_check_on_healthy_namespace() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + client + .post("http://primary:9090/v1/namespaces/chk/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://chk.primary:8080", + "", + TurmoilConnector, + )?; + let conn = db.connect()?; + conn.execute("create table t(v text)", ()).await?; + 
conn.execute("insert into t values ('alive')", ()).await?; + + // quick_check should report ok on a healthy DB. + let resp = client + .post( + "http://primary:9090/v1/namespaces/chk/integrity-check", + json!({ "full": false }), + ) + .await?; + assert_eq!(resp.status(), hyper::http::StatusCode::OK); + let v = resp.json_value().await?; + assert_eq!(v["ok"], json!(true)); + assert_eq!(v["message"], json!("ok")); + assert_eq!(v["check"], json!("quick")); + + // Full integrity_check should also succeed. + let resp = client + .post( + "http://primary:9090/v1/namespaces/chk/integrity-check", + json!({ "full": true }), + ) + .await?; + let v = resp.json_value().await?; + assert_eq!(v["ok"], json!(true)); + assert_eq!(v["check"], json!("full")); + + Ok(()) + }); + + sim.run().unwrap(); +} + +#[test] +fn reset_replication_preserves_data_on_healthy_namespace() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + client + .post("http://primary:9090/v1/namespaces/reset/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://reset.primary:8080", + "", + TurmoilConnector, + )?; + let conn = db.connect()?; + conn.execute("create table t(v text)", ()).await?; + for i in 0..100 { + conn.execute( + &format!("insert into t values ('row-{i}')"), + (), + ) + .await?; + } + + // Before reset: 100 rows. + let mut rows = conn.query("select count(*) from t", ()).await?; + assert!(matches!( + rows.next().await.unwrap().unwrap().get_value(0)?, + Value::Integer(100) + )); + + // Reset the replication log on a healthy namespace. + let resp = client + .post( + "http://primary:9090/v1/namespaces/reset/reset-replication", + json!({}), + ) + .await?; + assert_eq!(resp.status(), hyper::http::StatusCode::OK); + + // After reset: still 100 rows (data preserved). 
+ let db2 = Database::open_remote_with_connector( + "http://reset.primary:8080", + "", + TurmoilConnector, + )?; + let conn2 = db2.connect()?; + let mut rows = conn2.query("select count(*) from t", ()).await?; + assert!(matches!( + rows.next().await.unwrap().unwrap().get_value(0)?, + Value::Integer(100) + )); + + // And writes still work. + conn2 + .execute("insert into t values ('post-reset')", ()) + .await?; + let mut rows = conn2.query("select count(*) from t", ()).await?; + assert!(matches!( + rows.next().await.unwrap().unwrap().get_value(0)?, + Value::Integer(101) + )); + + // Integrity check confirms the new DB is fine. + let resp = client + .post( + "http://primary:9090/v1/namespaces/reset/integrity-check", + json!({}), + ) + .await?; + let v = resp.json_value().await?; + assert_eq!(v["ok"], json!(true)); + + Ok(()) + }); + + sim.run().unwrap(); +} + +#[test] +fn reset_replication_on_nonexistent_namespace_returns_404() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + let resp = client + .post( + "http://primary:9090/v1/namespaces/missing/reset-replication", + json!({}), + ) + .await; + // Server-error path: post_with_headers bails on 5xx, but 404 + // should come through cleanly as an error response. + match resp { + Ok(r) => assert_eq!(r.status(), hyper::http::StatusCode::NOT_FOUND), + Err(e) => panic!("expected 404 response, got error: {e}"), + } + Ok(()) + }); + + sim.run().unwrap(); +} + +#[test] +fn integrity_check_on_nonexistent_namespace_returns_404() { + // Symmetric to reset_replication_on_nonexistent_namespace_returns_404. 
+ // If someone regresses error handling (e.g., returning 500 on + // NamespaceDoesntExist), the streamer's classifier would treat that + // as a transient server error and retry, instead of falling through + // to the optimistic-reset path. + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + let resp = client + .post( + "http://primary:9090/v1/namespaces/missing/integrity-check", + json!({}), + ) + .await; + match resp { + Ok(r) => assert_eq!(r.status(), hyper::http::StatusCode::NOT_FOUND), + Err(e) => panic!("expected 404 response, got error: {e}"), + } + Ok(()) + }); + + sim.run().unwrap(); +} + +#[test] +fn integrity_check_defaults_to_quick_when_full_omitted() { + // Verifies backward compatibility: if the request body is {} (no + // `full` field), the server defaults to quick_check. This preserves + // the contract for any older/simpler client that doesn't send the + // field, and is also the documented default in the admin API. + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + client + .post("http://primary:9090/v1/namespaces/defchk/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://defchk.primary:8080", + "", + TurmoilConnector, + )?; + let conn = db.connect()?; + conn.execute("create table t(v text)", ()).await?; + + // Empty body: no `full` field at all. 
+ let resp = client + .post( + "http://primary:9090/v1/namespaces/defchk/integrity-check", + json!({}), + ) + .await?; + assert_eq!(resp.status(), hyper::http::StatusCode::OK); + let v = resp.json_value().await?; + assert_eq!(v["ok"], json!(true)); + assert_eq!(v["check"], json!("quick")); + + Ok(()) + }); + + sim.run().unwrap(); +} + +#[test] +fn reset_replication_response_includes_elapsed_ms() { + // Operators (and ops dashboards) want to see how long reset took + // without having to grep logs. The JSON response body includes + // elapsed_ms, which must be a positive integer for any successful + // reset. + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + client + .post("http://primary:9090/v1/namespaces/elapsed/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://elapsed.primary:8080", + "", + TurmoilConnector, + )?; + let conn = db.connect()?; + conn.execute("create table t(v text)", ()).await?; + conn.execute("insert into t values ('x')", ()).await?; + + let resp = client + .post( + "http://primary:9090/v1/namespaces/elapsed/reset-replication", + json!({}), + ) + .await?; + assert_eq!(resp.status(), hyper::http::StatusCode::OK); + let v = resp.json_value().await?; + let elapsed = v["elapsed_ms"].as_u64().expect("elapsed_ms must be u64"); + // We can't assert a specific value (depends on hardware) but we + // can assert it's present and non-negative. In practice this is + // typically single-digit milliseconds for an empty/small table. + // Max bound guards against a runaway regression. 
+ assert!(elapsed < 30_000, "elapsed_ms={elapsed} exceeds 30s sanity bound"); + + Ok(()) + }); + + sim.run().unwrap(); +} + +#[test] +fn reset_replication_is_idempotent() { + // An operator (or the streamer's retry-after-reset path) may call + // reset-replication multiple times in quick succession. Each call + // must succeed independently without corrupting the data file. + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + make_primary(&mut sim, tmp.path().to_path_buf()); + + sim.client("client", async { + let client = Client::new(); + client + .post("http://primary:9090/v1/namespaces/idem/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://idem.primary:8080", + "", + TurmoilConnector, + )?; + let conn = db.connect()?; + conn.execute("create table t(v text)", ()).await?; + for i in 0..20 { + conn.execute(&format!("insert into t values ('row-{i}')"), ()) + .await?; + } + + // Call reset-replication three times in a row. Each must 200. + for attempt in 0..3 { + let resp = client + .post( + "http://primary:9090/v1/namespaces/idem/reset-replication", + json!({}), + ) + .await?; + assert_eq!( + resp.status(), + hyper::http::StatusCode::OK, + "attempt {attempt} should return 200" + ); + } + + // After three resets, the 20 rows are still there. + let db2 = Database::open_remote_with_connector( + "http://idem.primary:8080", + "", + TurmoilConnector, + )?; + let conn2 = db2.connect()?; + let mut rows = conn2.query("select count(*) from t", ()).await?; + assert!(matches!( + rows.next().await.unwrap().unwrap().get_value(0)?, + Value::Integer(20) + )); + + // And integrity-check still passes. + let resp = client + .post( + "http://primary:9090/v1/namespaces/idem/integrity-check", + json!({}), + ) + .await?; + let v = resp.json_value().await?; + assert_eq!(v["ok"], json!(true)); + + Ok(()) + }); + + sim.run().unwrap(); +}