Skip to content

SQL Unparser generates incorrect column references #23317

Description

@drewrip

Describe the bug

This is a follow-up to the previous issue, #23138. Many thanks to @Phoenix500526 for the last fix.

There still seem to be circumstances where the Unparser generates incorrect SQL. This is generally because it either qualifies column references with a relation that is no longer in scope, or it creates references to optimizer-generated columns that do not exist in the emitted SQL.

I've encountered two separate issues:

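  1. The SingleDistinctToGroupBy pass leaves behind group_alias_* (and alias*) references that the Unparser doesn't resolve, so the generated SQL refers to columns that no subquery defines.
  2. The generated SQL uses table-qualified column references (e.g. "c"."signup_date") to columns of an unnamed subquery, where that table alias is no longer in scope.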

In the steps to reproduce below I'll show an example of both.

To Reproduce

Generate the DuckDB tables:

duckdb warehouse.duckdb "
CREATE TABLE IF NOT EXISTS main.customers (
    customer_id   INTEGER NOT NULL,
    full_name     VARCHAR,
    email         VARCHAR,
    country       VARCHAR,
    signup_date   DATE,
    is_active     BOOLEAN,
    lifetime_spend DECIMAL(12, 2)
);
CREATE TABLE IF NOT EXISTS main.sales (
    customer_id    INTEGER NOT NULL,
    total_revenue  DECIMAL(12, 2),
    value_segment  VARCHAR
);
"

The Rust dependencies (DataFusion is pinned to the latest commit on main at the time of writing):

[dependencies]
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "0fcaef3e8a29fd174b6e3f22ee936a7283b599a4"}
duckdb = { version = "1.10503.1", features = ["bundled"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

The Rust reproducer:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{
    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
};
use datafusion::datasource::empty::EmptyTable;
use datafusion::optimizer::{OptimizerRule, single_distinct_to_groupby::SingleDistinctToGroupBy};
use datafusion::prelude::*;
use datafusion::sql::unparser::Unparser;
use datafusion::sql::unparser::dialect::DuckDBDialect;
use duckdb::Connection;

/// Input query that exposes both unparser failures described in this issue.
///
/// A CTE (`cohort`) re-aggregates an unnamed derived table. That table groups a
/// `sales`/`customers` join by `date_part('year', c.signup_date)` and uses
/// `count(DISTINCT ...)`. The query itself is valid in both DuckDB and
/// DataFusion. Only the SQL that the unparser produces from DataFusion's
/// optimized plan is invalid.
const QUERY: &str = r#"
WITH cohort AS (
    SELECT
        signup_year,
        sum(customers) AS customers,
        sum(revenue) AS revenue
    FROM
        (
            SELECT
                date_part('year', c.signup_date) AS signup_year,
                count(DISTINCT cs.customer_id) AS customers,
                round(sum(cs.total_revenue), 2) AS revenue
            FROM
                "warehouse"."main"."sales" cs
                JOIN "warehouse"."main"."customers" c USING (customer_id)
            GROUP BY
                1
        )
    GROUP BY
        signup_year
)
SELECT
    *
FROM
    cohort
"#;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    let schema_provider = Arc::new(MemorySchemaProvider::new());

    let customers_schema = Arc::new(Schema::new(vec![
        Field::new("customer_id", DataType::Int32, false),
        Field::new("signup_date", DataType::Date32, true),
    ]));
    schema_provider.register_table(
        "customers".to_string(),
        Arc::new(EmptyTable::new(customers_schema)),
    )?;

    let sales_schema = Arc::new(Schema::new(vec![
        Field::new("customer_id", DataType::Int32, false),
        Field::new("total_revenue", DataType::Decimal128(12, 2), true),
    ]));
    schema_provider.register_table("sales".to_string(), Arc::new(EmptyTable::new(sales_schema)))?;

    let catalog = Arc::new(MemoryCatalogProvider::new());
    catalog.register_schema("main", schema_provider)?;
    ctx.register_catalog("warehouse", catalog);

    let dialect = DuckDBDialect::new();
    let unparser = Unparser::new(&dialect);
    let conn = Connection::open("warehouse.duckdb")?;

    match conn.execute(QUERY, []) {
        Ok(_) => println!("input SQL is valid: executes fine directly against DuckDB"),
        Err(e) => println!("input SQL is NOT valid against DuckDB: {e}"),
    }
    match ctx.sql(QUERY).await?.collect().await {
        Ok(_) => println!("input SQL is valid: executes fine directly via DataFusion"),
        Err(e) => println!("input SQL is NOT valid via DataFusion: {e}"),
    }

    let plan = ctx.sql(QUERY).await?.into_optimized_plan()?;
    let sql = unparser.plan_to_sql(&plan)?;
    match conn.execute(&sql.to_string(), []) {
        Ok(_) => {
            println!("success");
        }
        Err(e) => {
            println!("failed: {e}");
            println!("Optimized sql =\n{sql}");
            println!("Optimized plan =\n{}", plan.display_indent());
        }
    }

    if ctx.sql(&sql.to_string()).await?.collect().await.is_ok() {
        println!("df succeeded");
    } else {
        println!("df also failed");
    };
    Ok(())
}

The result of this is:

input SQL is valid: executes fine directly against DuckDB
input SQL is valid: executes fine directly via DataFusion
failed: Binder Error: Referenced column "group_alias_0" not found in FROM clause!
Candidate bindings: "signup_date"

LINE 1: ..." AS "c" ON "cs"."customer_id" = "c"."customer_id") GROUP BY "group_alias_0") GROUP BY "signup_year") AS "cohort"
                                                                        ^
Optimized sql =
SELECT * FROM (SELECT "signup_year", sum("customers") AS "customers", sum("revenue") AS "revenue" FROM (SELECT "group_alias_0" AS "signup_year", count("alias1") AS "customers", round(sum("alias2"), 2) AS "revenue" FROM (SELECT "cs"."customer_id", "cs"."total_revenue", "c"."signup_date" FROM "warehouse"."main"."sales" AS "cs" INNER JOIN "warehouse"."main"."customers" AS "c" ON "cs"."customer_id" = "c"."customer_id") GROUP BY "group_alias_0") GROUP BY "signup_year") AS "cohort"
Optimized plan =
SubqueryAlias: cohort
  Projection: signup_year, sum(customers) AS customers, sum(revenue) AS revenue
    Aggregate: groupBy=[[signup_year]], aggr=[[sum(customers), sum(revenue)]]
      Projection: group_alias_0 AS signup_year, count(alias1) AS customers, round(sum(alias2), Int32(2)) AS revenue
        Aggregate: groupBy=[[group_alias_0]], aggr=[[count(alias1), sum(alias2)]]
          Aggregate: groupBy=[[date_part(Utf8("year"), c.signup_date) AS group_alias_0, cs.customer_id AS alias1]], aggr=[[sum(cs.total_revenue) AS alias2]]
            Projection: cs.customer_id, cs.total_revenue, c.signup_date
              Inner Join: cs.customer_id = c.customer_id
                SubqueryAlias: cs
                  TableScan: warehouse.main.sales projection=[customer_id, total_revenue]
                SubqueryAlias: c
                  TableScan: warehouse.main.customers projection=[customer_id, signup_date]
Error: Collection([Diagnostic(Diagnostic { kind: Error, message: "column 'group_alias_0' not found", span: None, notes: [], helps: [] }, SchemaError(FieldNotFound { field: Column { relation: None, name: "group_alias_0" }, valid_fields: [Column { relation: Some(Bare { table: "cs" }), name: "customer_id" }, Column { relation: Some(Bare { table: "cs" }), name: "total_revenue" }, Column { relation: Some(Bare { table: "c" }), name: "signup_date" }] }, Some(""))), Diagnostic(Diagnostic { kind: Error, message: "column 'alias1' not found", span: None, notes: [], helps: [] }, SchemaError(FieldNotFound { field: Column { relation: None, name: "alias1" }, valid_fields: [Column { relation: Some(Bare { table: "cs" }), name: "customer_id" }, Column { relation: Some(Bare { table: "cs" }), name: "total_revenue" }, Column { relation: Some(Bare { table: "c" }), name: "signup_date" }] }, Some(""))), Diagnostic(Diagnostic { kind: Error, message: "column 'alias2' not found", span: None, notes: [], helps: [] }, SchemaError(FieldNotFound { field: Column { relation: None, name: "alias2" }, valid_fields: [Column { relation: Some(Bare { table: "cs" }), name: "customer_id" }, Column { relation: Some(Bare { table: "cs" }), name: "total_revenue" }, Column { relation: Some(Bare { table: "c" }), name: "signup_date" }] }, Some("")))])

This demonstrates part (1) outlined above. Part (2) can be demonstrated by disabling the optimizer pass that creates these group_alias_* aliases. To do so, add the following line to the reproducer before any queries are planned:

ctx.remove_optimizer_rule(SingleDistinctToGroupBy::new().name());

With that rule removed, the reproducer prints:

input SQL is valid: executes fine directly against DuckDB
input SQL is valid: executes fine directly via DataFusion
failed: Binder Error: Referenced table "c" not found!
Candidate tables: "unnamed_subquery"

LINE 1: ...."customer_id" = "c"."customer_id") GROUP BY date_part('year', "c"."signup_date")) GROUP BY "signup_year") AS "cohort"
                                                                          ^
Optimized sql =
SELECT * FROM (SELECT "signup_year", sum("customers") AS "customers", sum("revenue") AS "revenue" FROM (SELECT date_part('year', "c"."signup_date") AS "signup_year", count(DISTINCT "cs"."customer_id") AS "customers", round(sum("cs"."total_revenue"), 2) AS "revenue" FROM (SELECT "cs"."customer_id", "cs"."total_revenue", "c"."signup_date" FROM "warehouse"."main"."sales" AS "cs" INNER JOIN "warehouse"."main"."customers" AS "c" ON "cs"."customer_id" = "c"."customer_id") GROUP BY date_part('year', "c"."signup_date")) GROUP BY "signup_year") AS "cohort"
Optimized plan =
SubqueryAlias: cohort
  Projection: signup_year, sum(customers) AS customers, sum(revenue) AS revenue
    Aggregate: groupBy=[[signup_year]], aggr=[[sum(customers), sum(revenue)]]
      Projection: date_part(Utf8("year"),c.signup_date) AS signup_year, count(DISTINCT cs.customer_id) AS customers, round(sum(cs.total_revenue), Int32(2)) AS revenue
        Aggregate: groupBy=[[date_part(Utf8("year"), c.signup_date)]], aggr=[[count(DISTINCT cs.customer_id), sum(cs.total_revenue)]]
          Projection: cs.customer_id, cs.total_revenue, c.signup_date
            Inner Join: cs.customer_id = c.customer_id
              SubqueryAlias: cs
                TableScan: warehouse.main.sales projection=[customer_id, total_revenue]
              SubqueryAlias: c
                TableScan: warehouse.main.customers projection=[customer_id, signup_date]
df succeeded

Expected behavior

The plans that DataFusion generates seem reasonable, so the Unparser should be able to generate valid SQL from an optimized plan. For instance, the first run of the reproducer generated this SQL (annotated):

SELECT
    *
FROM
    (
        SELECT
            "signup_year",
            sum("customers") AS "customers",
            sum("revenue") AS "revenue"
        FROM
            (
                SELECT
                    "group_alias_0" AS "signup_year",  --- <---------- "group_alias_0" is not a column of the FROM subquery; only the plan's inner Aggregate defines it, and that Aggregate is missing from this SQL
                    count("alias1") AS "customers",  --- <---------- "alias1" is not a column of the FROM subquery (same cause)
                    round(sum("alias2"), 2) AS "revenue" --- <---------- "alias2" is not a column of the FROM subquery (same cause)
                FROM
                    (
                        SELECT
                            "cs"."customer_id",
                            "cs"."total_revenue",
                            "c"."signup_date"
                        FROM
                            "warehouse"."main"."sales" AS "cs"
                            INNER JOIN "warehouse"."main"."customers" AS "c" ON "cs"."customer_id" = "c"."customer_id"
                    )
                GROUP BY
                    "group_alias_0"  --- <---------- "group_alias_0" is not a column of the FROM subquery
            )
        GROUP BY
            "signup_year"
    ) AS "cohort"

And the second run of the reproducer (with the SingleDistinctToGroupBy pass disabled) generated this SQL:

SELECT
    *
FROM
    (
        SELECT
            "signup_year",
            sum("customers") AS "customers",
            sum("revenue") AS "revenue"
        FROM
            (
                SELECT
                    date_part('year', "c"."signup_date") AS "signup_year",  --- <------ "c" is not in scope here: the FROM subquery is unnamed and does not expose the "c" alias
                    count(DISTINCT "cs"."customer_id") AS "customers",
                    round(sum("cs"."total_revenue"), 2) AS "revenue"
                FROM
                    (
                        SELECT
                            "cs"."customer_id",
                            "cs"."total_revenue",
                            "c"."signup_date"
                        FROM
                            "warehouse"."main"."sales" AS "cs"
                            INNER JOIN "warehouse"."main"."customers" AS "c" ON "cs"."customer_id" = "c"."customer_id"
                    )
                GROUP BY
                    date_part('year', "c"."signup_date")  --- <------ same problem: "c" is not in scope here (this is where DuckDB reports the error)
            )
        GROUP BY
            "signup_year"
    ) AS "cohort"

Additional context

No response

Metadata

Metadata

Assignees

Labels

bug: Something isn't working

Type

Fields

No fields configured for Bug.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions