Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion bindings/python/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,31 @@ impl PySQLContext {
Ok(ctx)
}

/// Registers a Paimon catalog under the given name.
///
/// `default_database`: omitted / `None` → use `"default"` (back-compat);
/// `""` → skip default-db init (for principals without DESCRIBE on `default`);
/// `"name"` → use `name`.
#[pyo3(signature = (catalog_name, catalog_options, default_database=None))]
fn register_catalog(
&mut self,
py: Python<'_>,
catalog_name: String,
catalog_options: HashMap<String, String>,
default_database: Option<String>,
) -> PyResult<()> {
let rt = runtime();
py.detach(|| {
rt.block_on(async {
let options = Options::from_map(catalog_options);
let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?;
let default_db: Option<String> = match default_database {
None => Some("default".to_string()),
Some(s) if s.is_empty() => None,
Some(s) => Some(s),
};
self.inner
.register_catalog(catalog_name, catalog)
.register_catalog_with_default_db(catalog_name, catalog, default_db.as_deref())
.await
.map_err(df_to_py_err)
})
Expand Down
245 changes: 234 additions & 11 deletions crates/integrations/datafusion/src/sql_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,48 @@ impl SQLContext {
catalog_name: impl Into<String>,
catalog: Arc<dyn Catalog>,
) -> DFResult<()> {
self.register_catalog_with_default_db(catalog_name, catalog, Some("default"))
.await
}

/// Like [`Self::register_catalog`] but lets the caller control default-database init.
///
/// `default_db = Some(name)` ensures `name` exists and sets it as current on the
/// first catalog. `default_db = None` skips both — required for principals that
/// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s
/// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`.
///
/// `default_db = Some("")` is rejected — pass `None` to opt out instead.
///
/// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when
/// `default_db = None`, bare table names inside these functions still resolve
/// against the literal namespace `"default"` (the fallback in
/// [`register_table_functions`]). Callers using `None` must qualify table names
/// (`'db.table'` or `'catalog.db.table'`) in those calls.
pub async fn register_catalog_with_default_db(
&mut self,
catalog_name: impl Into<String>,
catalog: Arc<dyn Catalog>,
default_db: Option<&str>,
) -> DFResult<()> {
if matches!(default_db, Some("")) {
return Err(DataFusionError::Plan(
"default_db must not be empty; pass None to skip default-database init".to_string(),
));
}
let catalog_name = catalog_name.into();
let is_first = self.catalogs.is_empty();
let default_db = "default";
match catalog.get_database(default_db).await {
Ok(_) => {}
Err(paimon::Error::DatabaseNotExist { .. }) => {
catalog
.create_database(default_db, true, Default::default())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
if let Some(default_db) = default_db {
match catalog.get_database(default_db).await {
Ok(_) => {}
Err(paimon::Error::DatabaseNotExist { .. }) => {
catalog
.create_database(default_db, true, Default::default())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}
Err(e) => return Err(DataFusionError::External(Box::new(e))),
}
Err(e) => return Err(DataFusionError::External(Box::new(e))),
}
self.ctx.register_catalog(
&catalog_name,
Expand All @@ -143,11 +173,13 @@ impl SQLContext {
self.blob_reader_registry.clone(),
)),
);
register_table_functions(&self.ctx, &catalog, default_db);
register_table_functions(&self.ctx, &catalog, default_db.unwrap_or("default"));
self.catalogs.insert(catalog_name.clone(), catalog);
if is_first {
self.set_current_catalog(catalog_name).await?;
self.set_current_database(default_db).await?;
if let Some(default_db) = default_db {
self.set_current_database(default_db).await?;
}
}
Ok(())
}
Expand Down Expand Up @@ -2466,6 +2498,197 @@ mod tests {
ctx
}

// ==================== register_catalog_with_default_db tests ====================

/// Test double that tallies `get_database` / `create_database` invocations, letting
/// tests assert whether the default-database init path ran at all.
///
/// Its `get_database` answers with `Unsupported` instead of `DatabaseNotExist`, so a
/// probe that should have been skipped shows up as a hard failure (standing in for a
/// "Forbidden: DESCRIBE on DATABASE default" response).
struct ProbeTrackingCatalog {
    get_calls: std::sync::atomic::AtomicUsize,
    create_calls: std::sync::atomic::AtomicUsize,
}

impl ProbeTrackingCatalog {
    /// Creates a tracker whose two counters both start at zero.
    fn new() -> Self {
        use std::sync::atomic::AtomicUsize;

        let get_calls = AtomicUsize::new(0);
        let create_calls = AtomicUsize::new(0);
        Self {
            get_calls,
            create_calls,
        }
    }

    /// Number of `get_database` calls recorded so far.
    fn get_count(&self) -> usize {
        Self::read(&self.get_calls)
    }

    /// Number of `create_database` calls recorded so far.
    fn create_count(&self) -> usize {
        Self::read(&self.create_calls)
    }

    /// Loads one counter with sequentially-consistent ordering.
    fn read(counter: &std::sync::atomic::AtomicUsize) -> usize {
        counter.load(std::sync::atomic::Ordering::SeqCst)
    }
}

#[async_trait]
impl Catalog for ProbeTrackingCatalog {
async fn list_databases(&self) -> paimon::Result<Vec<String>> {
Ok(vec![])
}
async fn create_database(
&self,
_name: &str,
_ignore_if_exists: bool,
_properties: HashMap<String, String>,
) -> paimon::Result<()> {
self.create_calls
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
async fn get_database(&self, _name: &str) -> paimon::Result<Database> {
self.get_calls
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Err(paimon::Error::Unsupported {
message: "simulated Forbidden".to_string(),
})
}
async fn drop_database(
&self,
_name: &str,
_ignore_if_not_exists: bool,
_cascade: bool,
) -> paimon::Result<()> {
Ok(())
}
async fn get_table(&self, identifier: &Identifier) -> paimon::Result<Table> {
Err(paimon::Error::TableNotExist {
full_name: identifier.to_string(),
})
}
async fn list_tables(&self, _database_name: &str) -> paimon::Result<Vec<String>> {
Ok(vec![])
}
async fn create_table(
&self,
_identifier: &Identifier,
_creation: PaimonSchema,
_ignore_if_exists: bool,
) -> paimon::Result<()> {
Ok(())
}
async fn drop_table(
&self,
_identifier: &Identifier,
_ignore_if_not_exists: bool,
) -> paimon::Result<()> {
Ok(())
}
async fn rename_table(
&self,
_from: &Identifier,
_to: &Identifier,
_ignore_if_not_exists: bool,
) -> paimon::Result<()> {
Ok(())
}
async fn alter_table(
&self,
_identifier: &Identifier,
_changes: Vec<SchemaChange>,
_ignore_if_not_exists: bool,
) -> paimon::Result<()> {
Ok(())
}
}

#[tokio::test]
async fn register_catalog_with_none_skips_default_db_probe() {
    // Opting out with `None` must register the catalog without ever touching the
    // catalog's database APIs, so the mock's Forbidden-shaped error cannot surface.
    let tracker = Arc::new(ProbeTrackingCatalog::new());
    let mut sql_ctx = SQLContext::new();
    let registered = sql_ctx
        .register_catalog_with_default_db("paimon", tracker.clone(), None)
        .await;
    registered.expect("None must skip probe so Forbidden-shaped error never fires");

    let probes = tracker.get_count();
    let creations = tracker.create_count();
    assert_eq!(probes, 0, "get_database must not be called");
    assert_eq!(creations, 0, "create_database must not be called");

    // The current catalog is still switched; only the catalog name is asserted here.
    let state = sql_ctx.ctx().state();
    let current_catalog = state.config().options().catalog.default_catalog.as_str();
    assert_eq!(current_catalog, "paimon");
}

#[tokio::test]
async fn register_catalog_with_some_default_propagates_probe_error() {
    // With an explicit default database the probe runs once; any failure other than
    // `DatabaseNotExist` must bubble up and must not trigger a create attempt.
    let tracker = Arc::new(ProbeTrackingCatalog::new());
    let mut sql_ctx = SQLContext::new();
    let outcome = sql_ctx
        .register_catalog_with_default_db("paimon", tracker.clone(), Some("default"))
        .await;
    let err = outcome.expect_err("non-DatabaseNotExist error from get_database must propagate");

    let rendered = err.to_string();
    assert!(
        rendered.contains("simulated Forbidden"),
        "unexpected error: {err}"
    );

    let (probes, creations) = (tracker.get_count(), tracker.create_count());
    assert_eq!(probes, 1);
    assert_eq!(creations, 0);
}

#[tokio::test]
async fn register_catalog_with_some_empty_string_is_rejected() {
    // `Some("")` from a raw Rust caller would otherwise turn into a silent
    // `get_database("")` probe. The API refuses it up front and points to `None`.
    let tracker = Arc::new(ProbeTrackingCatalog::new());
    let mut sql_ctx = SQLContext::new();
    let outcome = sql_ctx
        .register_catalog_with_default_db("paimon", tracker.clone(), Some(""))
        .await;
    let err = outcome.expect_err("empty default_db must be rejected at the API");

    let rendered = err.to_string();
    assert!(
        rendered.contains("must not be empty"),
        "unexpected error: {err}"
    );

    let probes = tracker.get_count();
    assert_eq!(
        probes, 0,
        "guard must short-circuit before any catalog call"
    );
}

#[tokio::test]
async fn register_catalog_with_none_table_function_resolves_bare_name_to_literal_default() {
    // Pins the `default_db.unwrap_or("default")` fallback handed to
    // register_table_functions: even after opting out of default-db init, an
    // unqualified table name inside a built-in TVF (vector_search /
    // full_text_search) is looked up under the literal `"default"` namespace.
    // Callers passing `None` therefore have to qualify table names in those calls,
    // otherwise they hit a `default.<name>` lookup that may not exist / be readable.
    let mut sql_ctx = SQLContext::new();
    let tracker = Arc::new(ProbeTrackingCatalog::new());
    sql_ctx
        .register_catalog_with_default_db("paimon", tracker, None)
        .await
        .unwrap();

    let query = "SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)";
    let outcome = sql_ctx.sql(query).await;
    let err = outcome.expect_err("bare name must error out — no `default.bare` table in mock catalog");

    let msg = err.to_string();
    let mentions_fallback = msg.contains("default") && msg.contains("bare");
    assert!(
        mentions_fallback,
        "error must surface the fallback 'default' namespace + bare name, got: {msg}"
    );
}

#[tokio::test]
async fn register_catalog_default_wrapper_uses_default_db() {
    // The bare register_catalog() must delegate with Some("default"), so the
    // probe fires and Forbidden propagates — same as Some("default") above.
    let catalog = Arc::new(ProbeTrackingCatalog::new());
    let mut ctx = SQLContext::new();
    let err = ctx
        .register_catalog("paimon", catalog.clone())
        .await
        .expect_err("register_catalog must probe `default` and surface the probe error");
    // Pin the failure to the probe itself: a bare `is_err()` would also accept an
    // unrelated error raised somewhere else in registration.
    assert!(
        err.to_string().contains("simulated Forbidden"),
        "unexpected error: {err}"
    );
    assert_eq!(
        catalog.get_count(),
        1,
        "default database must be probed exactly once"
    );
    // A non-DatabaseNotExist probe failure must not fall through to creation.
    assert_eq!(
        catalog.create_count(),
        0,
        "create_database must not be called after a failed probe"
    );
}

fn assert_sql_type_to_paimon(
sql_type: datafusion::sql::sqlparser::ast::DataType,
expected: PaimonDataType,
Expand Down
Loading