diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index cc855ee8..2eb94242 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -184,19 +184,31 @@ impl PySQLContext {
         Ok(ctx)
     }
 
+    /// Registers a Paimon catalog, built from `catalog_options`, under `catalog_name`.
+    ///
+    /// `default_database`: omitted / `None` → use `"default"` (back-compat);
+    /// `""` → skip default-db init (for principals without DESCRIBE on `default`);
+    /// `"name"` → use `name`.
+    #[pyo3(signature = (catalog_name, catalog_options, default_database=None))]
     fn register_catalog(
         &mut self,
         py: Python<'_>,
         catalog_name: String,
         catalog_options: HashMap,
+        default_database: Option,
     ) -> PyResult<()> {
         let rt = runtime();
         py.detach(|| {
             rt.block_on(async {
                 let options = Options::from_map(catalog_options);
                 let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?;
+                let default_db: Option = match default_database {
+                    None => Some("default".to_string()),
+                    Some(s) if s.is_empty() => None,
+                    Some(s) => Some(s),
+                };
                 self.inner
-                    .register_catalog(catalog_name, catalog)
+                    .register_catalog_with_default_db(catalog_name, catalog, default_db.as_deref())
                     .await
                     .map_err(df_to_py_err)
             })
diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs
index ec93741a..96790dec 100644
--- a/crates/integrations/datafusion/src/sql_context.rs
+++ b/crates/integrations/datafusion/src/sql_context.rs
@@ -122,18 +122,48 @@ impl SQLContext {
         catalog_name: impl Into,
         catalog: Arc,
     ) -> DFResult<()> {
+        self.register_catalog_with_default_db(catalog_name, catalog, Some("default"))
+            .await
+    }
+
+    /// Like [`Self::register_catalog`] but lets the caller control default-database init.
+    ///
+    /// `default_db = Some(name)` ensures `name` exists and sets it as current on the
+    /// first catalog. `default_db = None` skips both — required for principals that
+    /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s
+    /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`.
+    ///
+    /// `default_db = Some("")` is rejected with a `DataFusionError::Plan` — pass `None` instead.
+    ///
+    /// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when
+    /// `default_db = None`, bare table names inside these functions still resolve
+    /// against the literal namespace `"default"` (the fallback passed to
+    /// [`register_table_functions`]). Callers using `None` must qualify table names
+    /// (`'db.table'` or `'catalog.db.table'`) in those calls.
+    pub async fn register_catalog_with_default_db(
+        &mut self,
+        catalog_name: impl Into,
+        catalog: Arc,
+        default_db: Option<&str>,
+    ) -> DFResult<()> {
+        if matches!(default_db, Some("")) {
+            return Err(DataFusionError::Plan(
+                "default_db must not be empty; pass None to skip default-database init".to_string(),
+            ));
+        }
         let catalog_name = catalog_name.into();
         let is_first = self.catalogs.is_empty();
-        let default_db = "default";
-        match catalog.get_database(default_db).await {
-            Ok(_) => {}
-            Err(paimon::Error::DatabaseNotExist { .. }) => {
-                catalog
-                    .create_database(default_db, true, Default::default())
-                    .await
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        if let Some(default_db) = default_db {
+            match catalog.get_database(default_db).await {
+                Ok(_) => {}
+                Err(paimon::Error::DatabaseNotExist { .. }) => {
+                    catalog
+                        .create_database(default_db, true, Default::default())
+                        .await
+                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                }
+                Err(e) => return Err(DataFusionError::External(Box::new(e))),
             }
-            Err(e) => return Err(DataFusionError::External(Box::new(e))),
         }
         self.ctx.register_catalog(
             &catalog_name,
@@ -143,11 +173,13 @@ impl SQLContext {
                 self.blob_reader_registry.clone(),
             )),
         );
-        register_table_functions(&self.ctx, &catalog, default_db);
+        register_table_functions(&self.ctx, &catalog, default_db.unwrap_or("default"));
         self.catalogs.insert(catalog_name.clone(), catalog);
         if is_first {
             self.set_current_catalog(catalog_name).await?;
-            self.set_current_database(default_db).await?;
+            if let Some(default_db) = default_db {
+                self.set_current_database(default_db).await?;
+            }
         }
         Ok(())
     }
@@ -2466,6 +2498,197 @@ mod tests {
         ctx
     }
 
+    // ==================== register_catalog_with_default_db tests ====================
+
+    /// Counts get/create_database calls so tests can assert whether the default-db
+    /// init path fired. `get_database` returns `Unsupported` rather than
+    /// `DatabaseNotExist` so that *not* skipping the probe surfaces as a hard error
+    /// (mimics a "Forbidden: DESCRIBE on DATABASE default" failure).
+    struct ProbeTrackingCatalog {
+        get_calls: std::sync::atomic::AtomicUsize,
+        create_calls: std::sync::atomic::AtomicUsize,
+    }
+
+    impl ProbeTrackingCatalog {
+        fn new() -> Self {
+            Self {
+                get_calls: std::sync::atomic::AtomicUsize::new(0),
+                create_calls: std::sync::atomic::AtomicUsize::new(0),
+            }
+        }
+        fn get_count(&self) -> usize {
+            self.get_calls.load(std::sync::atomic::Ordering::SeqCst)
+        }
+        fn create_count(&self) -> usize {
+            self.create_calls.load(std::sync::atomic::Ordering::SeqCst)
+        }
+    }
+
+    #[async_trait]
+    impl Catalog for ProbeTrackingCatalog {
+        async fn list_databases(&self) -> paimon::Result> {
+            Ok(vec![])
+        }
+        async fn create_database(
+            &self,
+            _name: &str,
+            _ignore_if_exists: bool,
+            _properties: HashMap,
+        ) -> paimon::Result<()> {
+            self.create_calls
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            Ok(())
+        }
+        async fn get_database(&self, _name: &str) -> paimon::Result {
+            self.get_calls
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            Err(paimon::Error::Unsupported {
+                message: "simulated Forbidden".to_string(),
+            })
+        }
+        async fn drop_database(
+            &self,
+            _name: &str,
+            _ignore_if_not_exists: bool,
+            _cascade: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn get_table(&self, identifier: &Identifier) -> paimon::Result {
+            Err(paimon::Error::TableNotExist {
+                full_name: identifier.to_string(),
+            })
+        }
+        async fn list_tables(&self, _database_name: &str) -> paimon::Result> {
+            Ok(vec![])
+        }
+        async fn create_table(
+            &self,
+            _identifier: &Identifier,
+            _creation: PaimonSchema,
+            _ignore_if_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn drop_table(
+            &self,
+            _identifier: &Identifier,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn rename_table(
+            &self,
+            _from: &Identifier,
+            _to: &Identifier,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn alter_table(
+            &self,
+            _identifier: &Identifier,
+            _changes: Vec,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_none_skips_default_db_probe() {
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        ctx.register_catalog_with_default_db("paimon", catalog.clone(), None)
+            .await
+            .expect("None must skip probe so Forbidden-shaped error never fires");
+        assert_eq!(catalog.get_count(), 0, "get_database must not be called");
+        assert_eq!(
+            catalog.create_count(),
+            0,
+            "create_database must not be called"
+        );
+        // Current catalog is still set; the (untouched) current database is not asserted here.
+        assert_eq!(
+            ctx.ctx().state().config().options().catalog.default_catalog,
+            "paimon"
+        );
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_some_default_propagates_probe_error() {
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        let err = ctx
+            .register_catalog_with_default_db("paimon", catalog.clone(), Some("default"))
+            .await
+            .expect_err("non-DatabaseNotExist error from get_database must propagate");
+        assert!(
+            err.to_string().contains("simulated Forbidden"),
+            "unexpected error: {err}"
+        );
+        assert_eq!(catalog.get_count(), 1);
+        assert_eq!(catalog.create_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_some_empty_string_is_rejected() {
+        // Footgun guard: raw Rust callers could pass `Some("")` and silently probe
+        // `get_database("")`. Reject at the API boundary; tell them to use `None` instead.
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        let err = ctx
+            .register_catalog_with_default_db("paimon", catalog.clone(), Some(""))
+            .await
+            .expect_err("empty default_db must be rejected at the API");
+        assert!(
+            err.to_string().contains("must not be empty"),
+            "unexpected error: {err}"
+        );
+        assert_eq!(
+            catalog.get_count(),
+            0,
+            "guard must short-circuit before any catalog call"
+        );
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_none_table_function_resolves_bare_name_to_literal_default() {
+        // Documents the fallback passed to register_table_functions: `default_db.unwrap_or("default")`.
+        // When the caller opts out of default-db init, bare table names inside built-in TVFs
+        // (vector_search / full_text_search) still resolve against the literal namespace
+        // `"default"` — so a caller using `None` MUST use fully-qualified names with these
+        // functions or they'll hit a `default.<table>` lookup that may not exist / be readable.
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        ctx.register_catalog_with_default_db("paimon", catalog, None)
+            .await
+            .unwrap();
+
+        let err = ctx
+            .sql("SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)")
+            .await
+            .expect_err("bare name must error out — no `default.bare` table in mock catalog");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("default") && msg.contains("bare"),
+            "error must surface the fallback 'default' namespace + bare name, got: {msg}"
+        );
+    }
+
+    #[tokio::test]
+    async fn register_catalog_default_wrapper_uses_default_db() {
+        // The bare register_catalog() must delegate with Some("default"), so the
+        // probe fires and Forbidden propagates — same as Some("default") above.
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        assert!(ctx
+            .register_catalog("paimon", catalog.clone())
+            .await
+            .is_err());
+        assert_eq!(catalog.get_count(), 1);
+    }
+
     fn assert_sql_type_to_paimon(
         sql_type: datafusion::sql::sqlparser::ast::DataType,
         expected: PaimonDataType,