Skip to content

[lake/hudi] Introduce Hudi LakeCatalog to create table#3395

Open
fhan688 wants to merge 17 commits into
apache:mainfrom
fhan688:Introduce-hudi-LakeCatalog-to-create-table
Open

[lake/hudi] Introduce Hudi LakeCatalog to create table#3395
fhan688 wants to merge 17 commits into
apache:mainfrom
fhan688:Introduce-hudi-LakeCatalog-to-create-table

Conversation

@fhan688
Copy link
Copy Markdown
Contributor

@fhan688 fhan688 commented May 28, 2026

Purpose

Linked issue: #3275

This PR introduces the Hudi LakeCatalog implementation, enabling Fluss to create tables in Hudi data lake storage. This aligns with the existing Paimon and Iceberg lake catalog support, completing the trio of supported lake formats for table creation.

Brief change log

New modules & classes (fluss-lake/fluss-lake-hudi):

  • HudiLakeCatalog: Implements LakeCatalog interface, supporting both HMS (Hive Metastore) and DFS (filesystem) catalog modes. Handles table creation with schema compatibility check for crash-recovery idempotency, automatic database creation, and system column (__bucket, __offset, __timestamp) appending.

  • FlussDataTypeToHudiDataType: Implements DataTypeVisitor to convert Fluss data types to Flink types (Hudi's type system). Handles LocalZonedTimestampType specially: maps to BIGINT under HMS mode, TIMESTAMP_WITH_LOCAL_TIME_ZONE under DFS mode.

  • HudiConversions: Core conversion utility. Converts Fluss TablePath → Hudi ObjectPath, TableDescriptorResolvedSchema / Hudi table properties. Validates HUDI_UNSETTABLE_OPTIONS (6 protected options that Fluss auto-manages), checks system column name conflicts, and handles property prefix rewriting (hudi.xxxxxx, others → fluss.xxx).

  • HudiCatalogUtils: Factory for creating Hudi Catalog instances (HoodieHiveCatalog for HMS, HoodieCatalog for DFS). Uses copied Configuration to avoid mutating the original.

Modifications to existing modules (fluss-flink/fluss-flink-common):

  • LakeFlinkCatalog: Adds HUDI branch in getLakeCatalog() with a new HudiCatalogFactory inner class that uses reflection to instantiate Hudi catalog (mirroring the Iceberg pattern to avoid compile-time dependency on hudi-flink-bundle).

  • LakeTableFactory: Adds HUDI branch in getLakeTableFactory() with getHudiFactory() that reflectively loads HoodieTableFactory.

  • HudiLakeStorage: Replaces the UnsupportedOperationException in createLakeCatalog() with new HudiLakeCatalog(hudiConfig) to wire the SPI path.

Key design decisions:

Aspect Decision
Table type mapping PK table → MERGE_ON_READ, Log table → COPY_ON_WRITE
Index strategy BUCKET index type, aligned with Fluss's bucketing model
Dependency isolation Hudi bundle loaded via reflection/plugin classloader (no compile-time dependency in fluss-flink-common)
Catalog mode Supports hms (Hive Metastore) and dfs (filesystem)
Property rewriting hudi. prefix stripped; non-hudi properties prefixed with fluss.
Idempotency Schema-compatible duplicate creation is treated as success for crash recovery

Tests

HudiLakeCatalogTest (14 test cases):

  • testPropertyPrefixRewriting — verifies hudi.xxx → xxx and non-hudi → fluss.xxx prefix rewriting

  • testCreatePrimaryKeyTable — PK table (MOR) creation with system columns and primary key

  • testCreateLogTable — Log table (COW) creation with record key from customProperties

  • testIsHudiSchemaCompatibleWithSameSchema — compatible schemas return true

  • testIsHudiSchemaCompatibleWithDifferentColumnCount — different column count returns false

  • testIsHudiSchemaCompatibleWithDifferentColumnName — different column name returns false

  • testIsHudiSchemaCompatibleWithDifferentColumnType — different column type returns false

  • testCreateDuplicateTableWithCompatibleSchema — duplicate creation with compatible schema is idempotent

  • testCreateDuplicateTableWithIncompatibleSchema — duplicate creation with incompatible schema throws TableAlreadyExistException

  • testUnsettableOptionInPropertiesThrowsException — protected option in properties throws InvalidConfigException

  • testUnsettableOptionInCustomPropertiesThrowsException — protected option in customProperties throws InvalidConfigException

  • testNonProtectedHudiOptionPassesValidation — non-protected option passes validation

  • testSystemColumnBucketConflictThrowsException__bucket conflict throws InvalidTableException

  • testSystemColumnOffsetConflictThrowsException__offset conflict throws InvalidTableException

  • testSystemColumnTimestampConflictThrowsException__timestamp conflict throws InvalidTableException

API and Format

No API or storage format changes. This PR only adds new implementations for the existing LakeCatalog and LakeStorage SPI interfaces.

Documentation

A new feature — Hudi lake catalog support for table creation. Will need documentation updates for the Hudi integration guide.

@fhan688
Copy link
Copy Markdown
Contributor Author

fhan688 commented May 28, 2026

all tests are passed, please help review, thanks! @XuQianJin-Stars

// Create table in Hudi catalog
try {
createTable(objectPath, catalogTable, context.isCreatingFlussTable());
} catch (DatabaseNotExistException e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inner createTable throws org.apache.fluss.exception.TableAlreadyExistException (unchecked) on schema incompatibility, which is fine. The outer method signature declares throws org.apache.fluss.exception.TableAlreadyExistException — looks like it intends to comply with the LakeCatalog.createTable contract — but the exception is not actually thrown as checked via the throws clause because it's a RuntimeException subclass. OK, that's not really a problem. However:

The real bug is in the test testCreateDuplicateTableWithIncompatibleSchema: based on the implementation of isHudiSchemaCompatible(existingTable, catalogTable) (with the ResolvedCatalogBaseTable vs Schema.UnresolvedColumn branches), the catalogTable passed into hudiCatalog.createTable() is a CatalogTable.of(...) (unresolved), while hudiCatalog.getTable() may return a ResolvedCatalogBaseTable. extractColumns will go through two different code paths (one uses column.getDataType() returning DataType, the other uses UnresolvedColumn returning AbstractDataType<?>). Comparison between AbstractDataType and DataType via equals is not necessarily symmetric — even with identical column names/types it may be judged as "incompatible", and conversely different schemas may fail comparison because the AbstractDataType is not fully resolved.

Suggestion: First normalize both existingTable and expectedTable into the same representation (recommended: compare using org.apache.flink.table.types.logical.LogicalType, or convert via getUnresolvedSchema() to Schema then compare on column name + AbstractDataType). The dataType comparison should use the LogicalType.equals dimension (stripping the conversion class). Paimon's approach with isPaimonSchemaCompatible — directly comparing name+type+nullable on Paimon Schema fields — is more robust.

}
// if creating a new fluss table, we should ensure the lake table is empty
// TODO: add emptiness check for Hudi table once LakeTieringFactory is implemented
if (isCreatingFlussTable) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compare with PaimonLakeCatalog.createTable (line 180) which directly calls checkTableIsEmpty(...). Without the emptiness check, if a user attaches a new Fluss table to a Hudi table that already contains data, data corruption/mixing will occur. Even though TieringFactory is not yet ready, at the very least the semantics should be documented (in a comment or in the thrown exception), and consider throwing TableAlreadyExistException here as a safety fallback (safer), then loosening it once TieringFactory is ready. At minimum, do not silently warn and return success.

createTable(objectPath, catalogTable, context.isCreatingFlussTable());
} catch (DatabaseNotExistException t) {
// shouldn't happen in normal cases
throw new RuntimeException(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This throws a raw RuntimeException without passing t as the cause, losing the stack trace. Please change to new RuntimeException(msg, t). Similarly at line 144, the inner throw new RuntimeException(...) (schema fallback) also drops the cause (tableNotExistException) — please add it.

public static final String FILE_SYSTEM_TYPE = "dfs";

public static Catalog createHudiCatalog(Configuration configuration) {
Map<String, String> hudiProps = configuration.toMap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createHudiCatalog uses the custom MODE_CONFIG = "mode" read in HudiLakeCatalog, while buildHudiCatalog internally uses CatalogOptions.MODE.key() (also "mode") — same string by coincidence, but maintaining two sets of constants invites future drift. Standardize on CatalogOptions.MODE.key() as the authoritative key and drop the MODE_CONFIG constant to avoid long-term maintenance drift. Same for CATALOG_NAME_CONFIG = "name" — recommend using a public Hudi constant.

/** Convert from Fluss's data type to Hudi's internal data type. */
public class FlussDataTypeToHudiDataType implements DataTypeVisitor<DataType> {

private String catalogMode;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The catalogMode field is not declared final, yet the class exposes a public constructor and provides two singleton INSTANCEs. If any caller obtains DFS_INSTANCE and accidentally invokes a future mutator, concurrency issues arise. Please change the field to private final, and either remove the public constructor or change it to private (external code uses the two INSTANCEs uniformly).
  • The class-level Javadoc lacks the @ThreadSafe annotation; per spec (AGENTS.md §5), multiple threads will share the same INSTANCE, so this should be explicitly marked.

return recordKeyField;
}

private static void validateHudiOptions(Map<String, String> properties, boolean isPkTable) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic: non-PK tables allow users to set RECORD_KEY_FIELD (combined with getRecordKeyField); PK tables forbid setting it. However, the test testUnsettableOptionInCustomPropertiesThrowsException expects a PK table passing hudi.hoodie.datasource.write.recordkey.field to fail; this rule indeed works at runtime. But the test at line 152 testCreateLogTable sets the same key on a non-PK table and expects success — that looks OK. Readability issue: the two validation scenarios are easy to confuse — suggest adding a Javadoc on validateHudiOptions explaining that RECORD_KEY_FIELD is a legal usage for non-PK tables (i.e., "log table = use the user-specified primary key field as the index field").

More importantly, after the function lets RECORD_KEY_FIELD pass, it doesn't validate that "non-PK tables must provide RECORD_KEY_FIELD"; that's done in buildHudiTableProperties via IllegalArgumentException("Record key field should be set."). IllegalArgumentException does not belong to the Fluss InvalidConfigException family (Section 4); the user sees a raw IAE. This should be changed to InvalidConfigException with helpful hints including tablePath/the specific key for easier troubleshooting.

return columns;
}

private static AbstractDataType<?> getDataType(Schema.UnresolvedColumn column) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDataType does not handle UnresolvedComputedColumn and silently returns null, Schema.UnresolvedComputedColumn has no datatype; returning null causes ColumnSignature.equals to mistakenly consider different computed columns equal. Hudi tables shouldn't have computed columns in theory, but a safer approach is to throw IllegalStateException("Unexpected column kind: " + column.getClass()).


// Add regular columns
for (org.apache.fluss.metadata.Schema.Column column :
tableDescriptor.getSchema().getColumns()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SYSTEM_COLUMNS (defined at HudiLakeCatalog.java L66–72) adds __bucket / __offset / __timestamp, but Hudi itself reserves _hoodie_commit_time / _hoodie_record_key / _hoodie_partition_path, etc. In HudiConversions.createHudiCatalogTable (L105–119), only collisions against Fluss system columns are checked before writing fluss columns + system columns into the schema. Suggest adding a rejection rule for the _hoodie_ prefix (or a "reserved column" set parallel to HUDI_UNSETTABLE_OPTIONS) with corresponding tests.

@fhan688 fhan688 closed this Jun 1, 2026
@fhan688 fhan688 reopened this Jun 1, 2026
@fhan688 fhan688 closed this Jun 1, 2026
@fhan688 fhan688 reopened this Jun 1, 2026
@luoyuxia luoyuxia requested a review from Copilot June 1, 2026 08:25
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces Hudi lake catalog support so Fluss can create Hudi-backed lake tables (paralleling existing Paimon/Iceberg integrations), including schema conversion, option rewriting/validation, and Flink-side reflective loading to keep fluss-flink-common free of a compile-time Hudi dependency.

Changes:

  • Add new fluss-lake-hudi module implementing LakeCatalog/LakeStorage for Hudi table creation (schema + properties conversions, HMS/DFS modes, idempotent create).
  • Extend Flink integration to support HUDI in LakeFlinkCatalog (reflective catalog creation) and LakeTableFactory (reflective table factory loading).
  • Add unit tests for Hudi catalog behavior (properties rewriting, schema compatibility, idempotency, protected options, system-column conflicts).

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
fluss-lake/pom.xml Adds Flink table API dependency management needed by the new Hudi module.
fluss-lake/fluss-lake-hudi/pom.xml Introduces/adjusts Hudi + Flink/Hadoop dependencies and test utilities for the new module.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java New Hudi LakeCatalog implementation (create table, schema compatibility for idempotency, DB creation).
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java Wires LakeStorage#createLakeCatalog() to return the new HudiLakeCatalog.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java Converts Fluss types to Flink/Hudi types, with mode-specific handling for LTZ timestamps.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java Converts Fluss table descriptors to Hudi/Flink schema + options; validates protected Hudi options; rewrites property prefixes.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java Builds Hudi Catalog instances for HMS/DFS modes and helpers to create catalog table wrappers.
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java New unit test suite covering prefix rewriting, table creation, schema compatibility, idempotency, and validation.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java Adds HUDI branch + reflection-based HudiCatalogFactory to load Hudi from plugin classloader.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java Adds HUDI branch and reflective HoodieTableFactory instantiation for table sources.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +108 to +112
try {
Class<?> hudiFactoryClass = Class.forName("org.apache.hudi.table.HoodieTableFactory");
return (DynamicTableSourceFactory)
hudiFactoryClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
Comment on lines +70 to +74
LOG.info(
"create hudi catalog: {}, mode: {}, configuration: {}",
catalogName,
catalogMode,
configuration);
List<String> partitionKeys = tableDescriptor.getPartitionKeys();
Map<String, String> options =
buildHudiTableProperties(tablePath, tableDescriptor, isPkTable);
LOG.info("Hudi table properties: {}", options);
fhan688 and others added 4 commits June 1, 2026 18:49
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
… options log to debug;[flink] load Hudi table factory with context classloader
…reate-table' into Introduce-hudi-LakeCatalog-to-create-table
@fhan688 fhan688 closed this Jun 1, 2026
@fhan688 fhan688 reopened this Jun 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants