Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
package org.apache.paimon.flink.lineage;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalogFactory;
import org.apache.paimon.jdbc.JdbcCatalogOptions;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
Expand All @@ -39,7 +45,6 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Lineage utilities for building {@link SourceLineageVertex} and {@link LineageVertex} from a
Expand All @@ -57,8 +62,7 @@ public class LineageUtils {
new HashSet<>(
Arrays.asList(CatalogOptions.WAREHOUSE.key(), CatalogOptions.METASTORE.key()));

private static final Set<String> PAIMON_OPTION_KEYS =
CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet());
private static final String DEFAULT_CATALOG_IDENTIFIER = FlinkCatalogFactory.IDENTIFIER;

/** Extracts the {@link CatalogContext} from a table, or null if not available. */
@Nullable
Expand All @@ -73,19 +77,16 @@ private static CatalogContext catalogContext(Table table) {
}

/**
* Builds the config map for a dataset facet. Includes filtered Paimon {@link CoreOptions},
* partition keys, primary keys, and a safe subset of catalog-level options (warehouse,
* metastore) prefixed with {@code "catalog."}.
* Builds the config map for a dataset facet from a {@link Table}. Includes all table options,
* partition keys, and primary keys.
*/
private static Map<String, String> buildConfigMap(
Table table, @Nullable CatalogContext catalogContext) {
Map<String, String> config = new HashMap<>();
config.put("partition-keys", String.join(",", table.partitionKeys()));
config.put("primary-keys", String.join(",", table.primaryKeys()));

table.options().entrySet().stream()
.filter(e -> PAIMON_OPTION_KEYS.contains(e.getKey()))
.forEach(e -> config.put(e.getKey(), e.getValue()));
config.putAll(table.options());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now publishes every table option into the lineage config facet. Unlike the previous CoreOptions whitelist, table.options() can include arbitrary/dynamic options supplied by users, including credential-like keys such as tokens, passwords, keytabs, or filesystem secrets. Since lineage facets may be consumed outside the job, please keep this filtered to known-safe options (or explicitly redact secret-like keys) instead of exposing the full table option map.


config.put("type", "paimon");

Expand Down Expand Up @@ -122,6 +123,25 @@ public static String getNamespace(Table table, @Nullable CatalogContext catalogC
return DEFAULT_NAMESPACE;
}

@VisibleForTesting
static String resolveNameByMetastore(Table table, @Nullable String defaultName) {
CatalogContext ctx = catalogContext(table);
if (ctx != null) {
Options catalogOptions = ctx.options();
// If jdbc metastore is used, use catalog-key as the catalog identifier.
if (JdbcCatalogFactory.IDENTIFIER.equals(
catalogOptions.get(CatalogOptions.METASTORE))) {
String catalogKeyValue = catalogOptions.get(JdbcCatalogOptions.CATALOG_KEY);
if (!StringUtils.isNullOrWhitespaceOnly(catalogKeyValue)) {
return catalogKeyValue + "." + table.fullName();
}
}
}
return defaultName != null
? defaultName
: DEFAULT_CATALOG_IDENTIFIER + "." + table.fullName();
}

/**
* Creates a {@link SourceLineageVertex} for a Paimon source table.
*
Expand All @@ -134,12 +154,26 @@ public static SourceLineageVertex sourceLineageVertex(
CatalogContext ctx = catalogContext(table);
LineageDataset dataset =
new PaimonLineageDataset(
name, getNamespace(table, ctx), buildConfigMap(table, ctx));
resolveNameByMetastore(table, name),
getNamespace(table, ctx),
buildConfigMap(table, ctx),
table.rowType());
Boundedness boundedness =
isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset));
}

/**
* Creates a {@link SourceLineageVertex} for a Paimon DataStream source table. The table name is
* derived from the table's full name, prefixed with the {@code catalog-key} if available.
*
* @param isBounded whether the source is bounded (batch) or unbounded (streaming)
* @param table the Paimon table
*/
public static SourceLineageVertex sourceLineageVertex(boolean isBounded, Table table) {
return sourceLineageVertex(null, isBounded, table);
}

/**
* Creates a {@link LineageVertex} for a Paimon sink table.
*
Expand All @@ -150,7 +184,20 @@ public static LineageVertex sinkLineageVertex(String name, Table table) {
CatalogContext ctx = catalogContext(table);
LineageDataset dataset =
new PaimonLineageDataset(
name, getNamespace(table, ctx), buildConfigMap(table, ctx));
resolveNameByMetastore(table, name),
getNamespace(table, ctx),
buildConfigMap(table, ctx),
table.rowType());
return new PaimonSinkLineageVertex(Collections.singletonList(dataset));
}

/**
* Creates a {@link LineageVertex} for a Paimon DataStream sink table. The table name is derived
* from the table's full name, prefixed with the {@code catalog-key} if available.
*
* @param table the Paimon table
*/
public static LineageVertex sinkLineageVertex(Table table) {
return sinkLineageVertex(null, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

package org.apache.paimon.flink.lineage;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet;
import org.apache.flink.streaming.api.lineage.DatasetSchemaField;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
Expand All @@ -34,11 +42,21 @@ public class PaimonLineageDataset implements LineageDataset {
private final String name;
private final String namespace;
private final Map<String, String> tableOptions;
@Nullable private final RowType rowType;

public PaimonLineageDataset(String name, String namespace, Map<String, String> tableOptions) {
this(name, namespace, tableOptions, null);
}

public PaimonLineageDataset(
String name,
String namespace,
Map<String, String> tableOptions,
@Nullable RowType rowType) {
this.name = name;
this.namespace = namespace;
this.tableOptions = tableOptions;
this.rowType = rowType;
}

@Override
Expand Down Expand Up @@ -67,6 +85,39 @@ public Map<String, String> config() {
return tableOptions;
}
});
if (rowType != null) {
facets.put(
"schema",
new DatasetSchemaFacet() {
@Override
public String name() {
return "schema";
}

@Override
public Map<String, DatasetSchemaField<String>> fields() {
Map<String, DatasetSchemaField<String>> result = new LinkedHashMap<>();
for (DataField field : rowType.getFields()) {
String fieldName = field.name();
String fieldType = field.type().asSQLString();
result.put(
fieldName,
new DatasetSchemaField<String>() {
@Override
public String name() {
return fieldName;
}

@Override
public String type() {
return fieldType;
}
});
}
return result;
}
});
}
return facets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.format.FormatTableWrite;
import org.apache.paimon.table.sink.BatchTableCommit;
Expand All @@ -32,6 +33,8 @@
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;

import java.util.List;
Expand All @@ -55,7 +58,7 @@ public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream) {
return dataStream.sinkTo(new FormatTableSink(table, overwrite, staticPartitions));
}

private static class FormatTableSink implements Sink<RowData> {
private static class FormatTableSink implements Sink<RowData>, LineageVertexProvider {

private final FormatTable table;
private final boolean overwrite;
Expand All @@ -68,6 +71,11 @@ public FormatTableSink(
this.staticPartitions = staticPartitions;
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sinkLineageVertex(table);
}

/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink
* 2.0+.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -240,7 +239,7 @@ public DataStreamSink<?> doCommit(DataStream<Committable> written, String commit
}
configureSlotSharingGroup(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
return committed.sinkTo(new PaimonDiscardingSink<>(table)).name("end").setParallelism(1);
}

public static void configureSlotSharingGroup(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

/**
* A {@link DiscardingSink} that implements {@link LineageVertexProvider} so Flink's lineage graph
* discovers the Paimon sink table when using the DataStream API.
*/
public class PaimonDiscardingSink<T> extends DiscardingSink<T> implements LineageVertexProvider {

private static final long serialVersionUID = 1L;

private final FileStoreTable table;

public PaimonDiscardingSink(FileStoreTable table) {
this.table = table;
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sinkLineageVertex(table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private DataStream<RowData> buildAlignedContinuousFileSource() {
private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
DataStreamSource<RowData> dataStream =
env.fromSource(
source,
new PaimonDataStreamSource<>(source, table),
watermarkStrategy == null
? WatermarkStrategy.noWatermarks()
: watermarkStrategy,
Expand Down Expand Up @@ -354,7 +354,8 @@ private DataStream<RowData> buildDedicatedSplitGenSource(boolean isBounded) {
unordered,
outerProject(),
isBounded,
limit);
limit,
table);
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
Expand Down
Loading