From be01502253370534bb55c067c32c080f1b889197 Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Wed, 27 May 2026 18:02:19 +0800 Subject: [PATCH 1/6] feat(datafusion): allow callers to skip default-database init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `SQLContext::register_catalog` always probes/creates a "default" database and `set_current_database("default")` on the first catalog. This breaks for non-admin principals who lack DESCRIBE / CREATEDATABASE on `default`: they can never construct a `SQLContext` even when their actual queries target databases they do have access to. Add `register_catalog_with_default_db(catalog_name, catalog, default_db)` where `default_db: Option<&str>`: - `Some(name)` — ensure `name` exists (create if missing), set as current on first registration. Same behavior as before, just configurable. - `None` — skip the default-database probe/create AND the implicit `set_current_database`. Callers are expected to use fully-qualified names or call `set_current_database` later. This mirrors Java `FlinkCatalog`'s `defaultDatabase` constructor parameter and `DISABLE_CREATE_TABLE_IN_DEFAULT_DB=true` option. Existing `register_catalog(name, catalog)` delegates with `Some("default")` — fully backwards compatible. Python binding exposes the same control via a new `default_database` keyword. Convention chosen to keep `None`/omitted as the historical behavior: - omitted / `None` -> Some("default") (back-compat) - `default_database=""` -> None (skip init) - `default_database=X` -> Some(X) (custom default) --- bindings/python/src/context.rs | 24 +++++++- .../datafusion/src/sql_context.rs | 59 +++++++++++++++---- 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs index cc855ee8..5f54db77 100644 --- a/bindings/python/src/context.rs +++ b/bindings/python/src/context.rs @@ -184,19 +184,41 @@ impl PySQLContext { Ok(ctx) } + /// Registers a Paimon catalog under the given name. + /// + /// `default_database` controls the implicit default-database initialization on the + /// underlying [`SQLContext::register_catalog_with_default_db`]: + /// - omitted / `None` — keep the historical behavior: probe `"default"` and create it + /// if missing, then `set_current_database("default")` on the first catalog. Requires + /// the principal to have DESCRIBE / CREATEDATABASE on `default`. + /// - `Some(name)` — same flow but against the named database. + /// - `Some("")` — skip default-database init entirely. Use this when running as a + /// non-admin principal that lacks DESCRIBE on `default`; callers are expected to use + /// fully-qualified table names or call `set_current_database` explicitly later. + #[pyo3(signature = (catalog_name, catalog_options, default_database=None))] fn register_catalog( &mut self, py: Python<'_>, catalog_name: String, catalog_options: HashMap, + default_database: Option, ) -> PyResult<()> { let rt = runtime(); py.detach(|| { rt.block_on(async { let options = Options::from_map(catalog_options); let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?; + // Convention: Some("") at the Python layer = caller wants to skip default-db + // init. Map empty string to None so the Rust layer's "None means skip" path + // fires. Some("name") and the default (None at the Python layer, which we + // translate to Some("default")) both go through register_catalog_with_default_db. + let default_db: Option = match default_database { + None => Some("default".to_string()), + Some(s) if s.is_empty() => None, + Some(s) => Some(s), + }; self.inner - .register_catalog(catalog_name, catalog) + .register_catalog_with_default_db(catalog_name, catalog, default_db.as_deref()) .await .map_err(df_to_py_err) }) diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index ec93741a..7ae1e9f1 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -117,23 +117,55 @@ impl SQLContext { /// for both Paimon-handled SQL and DataFusion-delegated SQL (SELECT, etc.). /// A "default" database is created if it does not already exist (matching /// the behavior of Spark/Flink Paimon catalogs). + /// + /// For callers running as a non-admin principal that lacks `CREATEDATABASE` + /// / `DESCRIBE` on `default`, use [`Self::register_catalog_with_default_db`] + /// with `None` to skip the default-database initialization entirely, or with + /// `Some(name)` to point at a database the principal does have access to. pub async fn register_catalog( &mut self, catalog_name: impl Into, catalog: Arc, + ) -> DFResult<()> { + self.register_catalog_with_default_db(catalog_name, catalog, Some("default")) + .await + } + + /// Registers a Paimon catalog under the given name, with caller-controlled + /// default-database initialization. + /// + /// `default_db`: + /// - `Some(name)` — ensure the named database exists (create if missing). The first + /// registered catalog also gets `set_current_database(name)`. Mirrors the behavior + /// of Java `FlinkCatalog`'s `defaultDatabase` constructor parameter. + /// - `None` — skip the default-database probe and create entirely, and skip the + /// implicit `set_current_database`. Mirrors Java `FlinkCatalog`'s + /// `DISABLE_CREATE_TABLE_IN_DEFAULT_DB=true` option. Required for non-admin + /// principals that lack DESCRIBE / CREATEDATABASE on `default` — without this they + /// hit a `Forbidden: ... DESCRIBE on DATABASE default` failure at session init even + /// when their actual queries target databases they do have access to. Such callers + /// are expected to set the current database themselves via + /// [`Self::set_current_database`] after registration, or to use fully-qualified + /// table names in every query. + pub async fn register_catalog_with_default_db( + &mut self, + catalog_name: impl Into, + catalog: Arc, + default_db: Option<&str>, ) -> DFResult<()> { let catalog_name = catalog_name.into(); let is_first = self.catalogs.is_empty(); - let default_db = "default"; - match catalog.get_database(default_db).await { - Ok(_) => {} - Err(paimon::Error::DatabaseNotExist { .. }) => { - catalog - .create_database(default_db, true, Default::default()) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + if let Some(default_db) = default_db { + match catalog.get_database(default_db).await { + Ok(_) => {} + Err(paimon::Error::DatabaseNotExist { .. }) => { + catalog + .create_database(default_db, true, Default::default()) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + Err(e) => return Err(DataFusionError::External(Box::new(e))), } - Err(e) => return Err(DataFusionError::External(Box::new(e))), } self.ctx.register_catalog( &catalog_name, @@ -143,11 +175,16 @@ impl SQLContext { self.blob_reader_registry.clone(), )), ); - register_table_functions(&self.ctx, &catalog, default_db); + // Built-in table functions carry a default database for namespacing only — this is a + // string passed into the function registry, not a remote probe. When the caller opts + // out of default-db init we fall back to the literal "default" here. + register_table_functions(&self.ctx, &catalog, default_db.unwrap_or("default")); self.catalogs.insert(catalog_name.clone(), catalog); if is_first { self.set_current_catalog(catalog_name).await?; - self.set_current_database(default_db).await?; + if let Some(default_db) = default_db { + self.set_current_database(default_db).await?; + } } Ok(()) } From 93d58aee50c27ea0b0ad55a6f4ddf120f8d40ba3 Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Wed, 27 May 2026 18:05:27 +0800 Subject: [PATCH 2/6] chore: trim verbose comments on register_catalog_with_default_db Squash the wall-of-text doc blocks down to the load-bearing lines. No code change. --- bindings/python/src/context.rs | 16 ++--------- .../datafusion/src/sql_context.rs | 28 ++++--------------- 2 files changed, 8 insertions(+), 36 deletions(-) diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs index 5f54db77..2eb94242 100644 --- a/bindings/python/src/context.rs +++ b/bindings/python/src/context.rs @@ -186,15 +186,9 @@ impl PySQLContext { /// Registers a Paimon catalog under the given name. /// - /// `default_database` controls the implicit default-database initialization on the - /// underlying [`SQLContext::register_catalog_with_default_db`]: - /// - omitted / `None` — keep the historical behavior: probe `"default"` and create it - /// if missing, then `set_current_database("default")` on the first catalog. Requires - /// the principal to have DESCRIBE / CREATEDATABASE on `default`. - /// - `Some(name)` — same flow but against the named database. - /// - `Some("")` — skip default-database init entirely. Use this when running as a - /// non-admin principal that lacks DESCRIBE on `default`; callers are expected to use - /// fully-qualified table names or call `set_current_database` explicitly later. + /// `default_database`: omitted / `None` → use `"default"` (back-compat); + /// `""` → skip default-db init (for principals without DESCRIBE on `default`); + /// `"name"` → use `name`. #[pyo3(signature = (catalog_name, catalog_options, default_database=None))] fn register_catalog( &mut self, @@ -208,10 +202,6 @@ impl PySQLContext { rt.block_on(async { let options = Options::from_map(catalog_options); let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?; - // Convention: Some("") at the Python layer = caller wants to skip default-db - // init. Map empty string to None so the Rust layer's "None means skip" path - // fires. Some("name") and the default (None at the Python layer, which we - // translate to Some("default")) both go through register_catalog_with_default_db. let default_db: Option = match default_database { None => Some("default".to_string()), Some(s) if s.is_empty() => None, diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 7ae1e9f1..a5327a43 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -117,11 +117,6 @@ impl SQLContext { /// for both Paimon-handled SQL and DataFusion-delegated SQL (SELECT, etc.). /// A "default" database is created if it does not already exist (matching /// the behavior of Spark/Flink Paimon catalogs). - /// - /// For callers running as a non-admin principal that lacks `CREATEDATABASE` - /// / `DESCRIBE` on `default`, use [`Self::register_catalog_with_default_db`] - /// with `None` to skip the default-database initialization entirely, or with - /// `Some(name)` to point at a database the principal does have access to. pub async fn register_catalog( &mut self, catalog_name: impl Into, @@ -131,22 +126,12 @@ impl SQLContext { .await } - /// Registers a Paimon catalog under the given name, with caller-controlled - /// default-database initialization. + /// Like [`Self::register_catalog`] but lets the caller control default-database init. /// - /// `default_db`: - /// - `Some(name)` — ensure the named database exists (create if missing). The first - /// registered catalog also gets `set_current_database(name)`. Mirrors the behavior - /// of Java `FlinkCatalog`'s `defaultDatabase` constructor parameter. - /// - `None` — skip the default-database probe and create entirely, and skip the - /// implicit `set_current_database`. Mirrors Java `FlinkCatalog`'s - /// `DISABLE_CREATE_TABLE_IN_DEFAULT_DB=true` option. Required for non-admin - /// principals that lack DESCRIBE / CREATEDATABASE on `default` — without this they - /// hit a `Forbidden: ... DESCRIBE on DATABASE default` failure at session init even - /// when their actual queries target databases they do have access to. Such callers - /// are expected to set the current database themselves via - /// [`Self::set_current_database`] after registration, or to use fully-qualified - /// table names in every query. + /// `default_db = Some(name)` ensures `name` exists and sets it as current on the + /// first catalog. `default_db = None` skips both — required for principals that + /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s + /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`. pub async fn register_catalog_with_default_db( &mut self, catalog_name: impl Into, @@ -175,9 +160,6 @@ impl SQLContext { self.blob_reader_registry.clone(), )), ); - // Built-in table functions carry a default database for namespacing only — this is a - // string passed into the function registry, not a remote probe. When the caller opts - // out of default-db init we fall back to the literal "default" here. register_table_functions(&self.ctx, &catalog, default_db.unwrap_or("default")); self.catalogs.insert(catalog_name.clone(), catalog); if is_first { From 7b78591201672dcf1a71ecaf8fb97514f4c3e39a Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Wed, 27 May 2026 18:16:46 +0800 Subject: [PATCH 3/6] test: cover register_catalog_with_default_db None/Some paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ProbeTrackingCatalog records get/create_database calls and returns Unsupported (not DatabaseNotExist) from get_database — so failing to skip the probe surfaces as a hard error rather than the silent create-on-missing path the existing MockCatalog allows. Three cases: - None -> neither get nor create_database is called; registration succeeds - Some -> Unsupported error from get_database propagates - bare register_catalog() -> delegates with Some("default") (probes) --- .../datafusion/src/sql_context.rs | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index a5327a43..45c76ed7 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -2485,6 +2485,152 @@ mod tests { ctx } + // ==================== register_catalog_with_default_db tests ==================== + + /// Counts get/create_database calls so tests can assert whether the default-db + /// init path fired. `get_database` returns `Unsupported` rather than + /// `DatabaseNotExist` so that *not* skipping the probe surfaces as a hard error + /// (mimics a "Forbidden: DESCRIBE on DATABASE default" failure). + struct ProbeTrackingCatalog { + get_calls: std::sync::atomic::AtomicUsize, + create_calls: std::sync::atomic::AtomicUsize, + } + + impl ProbeTrackingCatalog { + fn new() -> Self { + Self { + get_calls: std::sync::atomic::AtomicUsize::new(0), + create_calls: std::sync::atomic::AtomicUsize::new(0), + } + } + fn get_count(&self) -> usize { + self.get_calls.load(std::sync::atomic::Ordering::SeqCst) + } + fn create_count(&self) -> usize { + self.create_calls.load(std::sync::atomic::Ordering::SeqCst) + } + } + + #[async_trait] + impl Catalog for ProbeTrackingCatalog { + async fn list_databases(&self) -> paimon::Result> { + Ok(vec![]) + } + async fn create_database( + &self, + _name: &str, + _ignore_if_exists: bool, + _properties: HashMap, + ) -> paimon::Result<()> { + self.create_calls + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + async fn get_database(&self, _name: &str) -> paimon::Result { + self.get_calls + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Err(paimon::Error::Unsupported { + message: "simulated Forbidden".to_string(), + }) + } + async fn drop_database( + &self, + _name: &str, + _ignore_if_not_exists: bool, + _cascade: bool, + ) -> paimon::Result<()> { + Ok(()) + } + async fn get_table(&self, identifier: &Identifier) -> paimon::Result { + Err(paimon::Error::TableNotExist { + full_name: identifier.to_string(), + }) + } + async fn list_tables(&self, _database_name: &str) -> paimon::Result> { + Ok(vec![]) + } + async fn create_table( + &self, + _identifier: &Identifier, + _creation: PaimonSchema, + _ignore_if_exists: bool, + ) -> paimon::Result<()> { + Ok(()) + } + async fn drop_table( + &self, + _identifier: &Identifier, + _ignore_if_not_exists: bool, + ) -> paimon::Result<()> { + Ok(()) + } + async fn rename_table( + &self, + _from: &Identifier, + _to: &Identifier, + _ignore_if_not_exists: bool, + ) -> paimon::Result<()> { + Ok(()) + } + async fn alter_table( + &self, + _identifier: &Identifier, + _changes: Vec, + _ignore_if_not_exists: bool, + ) -> paimon::Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn register_catalog_with_none_skips_default_db_probe() { + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + ctx.register_catalog_with_default_db("paimon", catalog.clone(), None) + .await + .expect("None must skip probe so Forbidden-shaped error never fires"); + assert_eq!(catalog.get_count(), 0, "get_database must not be called"); + assert_eq!( + catalog.create_count(), + 0, + "create_database must not be called" + ); + // Current catalog still set; current database left unchanged. + assert_eq!( + ctx.ctx().state().config().options().catalog.default_catalog, + "paimon" + ); + } + + #[tokio::test] + async fn register_catalog_with_some_default_propagates_probe_error() { + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + let err = ctx + .register_catalog_with_default_db("paimon", catalog.clone(), Some("default")) + .await + .expect_err("non-DatabaseNotExist error from get_database must propagate"); + assert!( + err.to_string().contains("simulated Forbidden"), + "unexpected error: {err}" + ); + assert_eq!(catalog.get_count(), 1); + assert_eq!(catalog.create_count(), 0); + } + + #[tokio::test] + async fn register_catalog_default_wrapper_uses_default_db() { + // The bare register_catalog() must delegate with Some("default"), so the + // probe fires and Forbidden propagates — same as Some("default") above. + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + assert!(ctx + .register_catalog("paimon", catalog.clone()) + .await + .is_err()); + assert_eq!(catalog.get_count(), 1); + } + fn assert_sql_type_to_paimon( sql_type: datafusion::sql::sqlparser::ast::DataType, expected: PaimonDataType, From c3971e6c71b3acfe223483c118dc2983a5a24052 Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Thu, 28 May 2026 15:35:30 +0800 Subject: [PATCH 4/6] =?UTF-8?q?feat(datafusion):=20address=20review=20nits?= =?UTF-8?q?=20=E2=80=94=20empty-string=20guard,=20TVF=20fallback=20doc,=20?= =?UTF-8?q?integration=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three optional review comments, all addressed: 1. Doc note on built-in TVF behavior with default_db=None: bare table names inside vector_search / full_text_search still resolve against the literal "default" namespace (the fallback in register_table_functions). Callers using None must qualify table names in those TVF calls. 2. API-level guard against Some(""). Python binding already maps "" → None to express "skip init", but raw Rust callers could pass Some("") directly and silently probe get_database(""). Reject at the boundary with a Plan error telling them to use None instead. 3. New integration-shaped test `register_catalog_with_none_table_function_resolves_bare_name_to_literal_default` documents the expected error path: bare TVF name + default_db=None → surfaces an error naming `default.` (proving the fallback fired). Plus a unit test for the empty-string guard (`register_catalog_with_some_empty_string_is_rejected`). 5 tests total now, all green. --- .../datafusion/src/sql_context.rs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 45c76ed7..96790dec 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -132,12 +132,25 @@ impl SQLContext { /// first catalog. `default_db = None` skips both — required for principals that /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`. + /// + /// `default_db = Some("")` is rejected — pass `None` to opt out instead. + /// + /// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when + /// `default_db = None`, bare table names inside these functions still resolve + /// against the literal namespace `"default"` (the fallback in + /// [`register_table_functions`]). Callers using `None` must qualify table names + /// (`'db.table'` or `'catalog.db.table'`) in those calls. pub async fn register_catalog_with_default_db( &mut self, catalog_name: impl Into, catalog: Arc, default_db: Option<&str>, ) -> DFResult<()> { + if matches!(default_db, Some("")) { + return Err(DataFusionError::Plan( + "default_db must not be empty; pass None to skip default-database init".to_string(), + )); + } let catalog_name = catalog_name.into(); let is_first = self.catalogs.is_empty(); if let Some(default_db) = default_db { @@ -2618,6 +2631,51 @@ mod tests { assert_eq!(catalog.create_count(), 0); } + #[tokio::test] + async fn register_catalog_with_some_empty_string_is_rejected() { + // Footgun guard: raw Rust callers could pass `Some("")` and silently probe + // `get_database("")`. Reject at the API boundary; tell them to use `None` instead. + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + let err = ctx + .register_catalog_with_default_db("paimon", catalog.clone(), Some("")) + .await + .expect_err("empty default_db must be rejected at the API"); + assert!( + err.to_string().contains("must not be empty"), + "unexpected error: {err}" + ); + assert_eq!( + catalog.get_count(), + 0, + "guard must short-circuit before any catalog call" + ); + } + + #[tokio::test] + async fn register_catalog_with_none_table_function_resolves_bare_name_to_literal_default() { + // Documents the fallback in register_table_functions: `default_db.unwrap_or("default")`. + // When the caller opts out of default-db init, bare table names inside built-in TVFs + // (vector_search / full_text_search) still resolve against the literal namespace + // `"default"` — so a caller using `None` MUST use fully-qualified names with these + // functions or they'll hit a `default.` lookup that may not exist / be readable. + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + ctx.register_catalog_with_default_db("paimon", catalog, None) + .await + .unwrap(); + + let err = ctx + .sql("SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)") + .await + .expect_err("bare name must error out — no `default.bare` table in mock catalog"); + let msg = err.to_string(); + assert!( + msg.contains("default") && msg.contains("bare"), + "error must surface the fallback 'default' namespace + bare name, got: {msg}" + ); + } + #[tokio::test] async fn register_catalog_default_wrapper_uses_default_db() { // The bare register_catalog() must delegate with Some("default"), so the From 1727c47dfca9e1afe0127a89455a5e8caabf6264 Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Thu, 28 May 2026 19:26:17 +0800 Subject: [PATCH 5/6] =?UTF-8?q?Revert=20"feat(datafusion):=20address=20rev?= =?UTF-8?q?iew=20nits=20=E2=80=94=20empty-string=20guard,=20TVF=20fallback?= =?UTF-8?q?=20doc,=20integration=20test"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit c3971e6c71b3acfe223483c118dc2983a5a24052. --- .../datafusion/src/sql_context.rs | 58 ------------------- 1 file changed, 58 deletions(-) diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 96790dec..45c76ed7 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -132,25 +132,12 @@ impl SQLContext { /// first catalog. `default_db = None` skips both — required for principals that /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`. - /// - /// `default_db = Some("")` is rejected — pass `None` to opt out instead. - /// - /// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when - /// `default_db = None`, bare table names inside these functions still resolve - /// against the literal namespace `"default"` (the fallback in - /// [`register_table_functions`]). Callers using `None` must qualify table names - /// (`'db.table'` or `'catalog.db.table'`) in those calls. pub async fn register_catalog_with_default_db( &mut self, catalog_name: impl Into, catalog: Arc, default_db: Option<&str>, ) -> DFResult<()> { - if matches!(default_db, Some("")) { - return Err(DataFusionError::Plan( - "default_db must not be empty; pass None to skip default-database init".to_string(), - )); - } let catalog_name = catalog_name.into(); let is_first = self.catalogs.is_empty(); if let Some(default_db) = default_db { @@ -2631,51 +2618,6 @@ mod tests { assert_eq!(catalog.create_count(), 0); } - #[tokio::test] - async fn register_catalog_with_some_empty_string_is_rejected() { - // Footgun guard: raw Rust callers could pass `Some("")` and silently probe - // `get_database("")`. Reject at the API boundary; tell them to use `None` instead. - let catalog = Arc::new(ProbeTrackingCatalog::new()); - let mut ctx = SQLContext::new(); - let err = ctx - .register_catalog_with_default_db("paimon", catalog.clone(), Some("")) - .await - .expect_err("empty default_db must be rejected at the API"); - assert!( - err.to_string().contains("must not be empty"), - "unexpected error: {err}" - ); - assert_eq!( - catalog.get_count(), - 0, - "guard must short-circuit before any catalog call" - ); - } - - #[tokio::test] - async fn register_catalog_with_none_table_function_resolves_bare_name_to_literal_default() { - // Documents the fallback in register_table_functions: `default_db.unwrap_or("default")`. - // When the caller opts out of default-db init, bare table names inside built-in TVFs - // (vector_search / full_text_search) still resolve against the literal namespace - // `"default"` — so a caller using `None` MUST use fully-qualified names with these - // functions or they'll hit a `default.` lookup that may not exist / be readable. - let catalog = Arc::new(ProbeTrackingCatalog::new()); - let mut ctx = SQLContext::new(); - ctx.register_catalog_with_default_db("paimon", catalog, None) - .await - .unwrap(); - - let err = ctx - .sql("SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)") - .await - .expect_err("bare name must error out — no `default.bare` table in mock catalog"); - let msg = err.to_string(); - assert!( - msg.contains("default") && msg.contains("bare"), - "error must surface the fallback 'default' namespace + bare name, got: {msg}" - ); - } - #[tokio::test] async fn register_catalog_default_wrapper_uses_default_db() { // The bare register_catalog() must delegate with Some("default"), so the From d8ad09f0703a37c275447b56e57ae87fec84f09a Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Fri, 29 May 2026 10:19:43 +0800 Subject: [PATCH 6/6] =?UTF-8?q?Revert=20"Revert=20"feat(datafusion):=20add?= =?UTF-8?q?ress=20review=20nits=20=E2=80=94=20empty-string=20guard,=20TVF?= =?UTF-8?q?=20fallback=20doc,=20integration=20test""?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 1727c47dfca9e1afe0127a89455a5e8caabf6264. --- .../datafusion/src/sql_context.rs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 45c76ed7..96790dec 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -132,12 +132,25 @@ impl SQLContext { /// first catalog. `default_db = None` skips both — required for principals that /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`. + /// + /// `default_db = Some("")` is rejected — pass `None` to opt out instead. + /// + /// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when + /// `default_db = None`, bare table names inside these functions still resolve + /// against the literal namespace `"default"` (the fallback in + /// [`register_table_functions`]). Callers using `None` must qualify table names + /// (`'db.table'` or `'catalog.db.table'`) in those calls. pub async fn register_catalog_with_default_db( &mut self, catalog_name: impl Into, catalog: Arc, default_db: Option<&str>, ) -> DFResult<()> { + if matches!(default_db, Some("")) { + return Err(DataFusionError::Plan( + "default_db must not be empty; pass None to skip default-database init".to_string(), + )); + } let catalog_name = catalog_name.into(); let is_first = self.catalogs.is_empty(); if let Some(default_db) = default_db { @@ -2618,6 +2631,51 @@ mod tests { assert_eq!(catalog.create_count(), 0); } + #[tokio::test] + async fn register_catalog_with_some_empty_string_is_rejected() { + // Footgun guard: raw Rust callers could pass `Some("")` and silently probe + // `get_database("")`. Reject at the API boundary; tell them to use `None` instead. + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + let err = ctx + .register_catalog_with_default_db("paimon", catalog.clone(), Some("")) + .await + .expect_err("empty default_db must be rejected at the API"); + assert!( + err.to_string().contains("must not be empty"), + "unexpected error: {err}" + ); + assert_eq!( + catalog.get_count(), + 0, + "guard must short-circuit before any catalog call" + ); + } + + #[tokio::test] + async fn register_catalog_with_none_table_function_resolves_bare_name_to_literal_default() { + // Documents the fallback in register_table_functions: `default_db.unwrap_or("default")`. + // When the caller opts out of default-db init, bare table names inside built-in TVFs + // (vector_search / full_text_search) still resolve against the literal namespace + // `"default"` — so a caller using `None` MUST use fully-qualified names with these + // functions or they'll hit a `default.` lookup that may not exist / be readable. + let catalog = Arc::new(ProbeTrackingCatalog::new()); + let mut ctx = SQLContext::new(); + ctx.register_catalog_with_default_db("paimon", catalog, None) + .await + .unwrap(); + + let err = ctx + .sql("SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)") + .await + .expect_err("bare name must error out — no `default.bare` table in mock catalog"); + let msg = err.to_string(); + assert!( + msg.contains("default") && msg.contains("bare"), + "error must surface the fallback 'default' namespace + bare name, got: {msg}" + ); + } + #[tokio::test] async fn register_catalog_default_wrapper_uses_default_db() { // The bare register_catalog() must delegate with Some("default"), so the