diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py index 86e2f775e769..b7356b45955b 100644 --- a/paimon-python/pypaimon/catalog/filesystem_catalog.py +++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py @@ -142,11 +142,11 @@ def _load_data_table(self, identifier: Identifier) -> FileStoreTable: table_schema = self.get_table_schema(identifier) # Create catalog environment for filesystem catalog - # Filesystem catalog doesn't support version management by default + from pypaimon.catalog.filesystem_catalog_loader import FileSystemCatalogLoader catalog_environment = CatalogEnvironment( identifier=identifier, - uuid=None, # Filesystem catalog doesn't track table UUIDs - catalog_loader=None, # No catalog loader for filesystem + uuid=None, + catalog_loader=FileSystemCatalogLoader(self.catalog_context), supports_version_management=False ) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index b2f0e019166a..7002b5317e75 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -241,6 +241,16 @@ class CoreOptions: ) ) + BLOB_VIEW_FIELD: ConfigOption[str] = ( + ConfigOptions.key("blob-view-field") + .string_type() + .no_default_value() + .with_description( + "Comma-separated BLOB field names that should be stored as serialized BlobViewStruct bytes " + "inline in normal data files and resolved from upstream tables at read time." + ) + ) + TARGET_FILE_SIZE: ConfigOption[MemorySize] = ( ConfigOptions.key("target-file-size") .memory_type() @@ -661,6 +671,20 @@ def variant_shredding_schema(self) -> Optional[str]: def blob_descriptor_fields(self, default=None): value = self.options.get(CoreOptions.BLOB_DESCRIPTOR_FIELD, default) + return CoreOptions._parse_field_set(value) + + def blob_view_fields(self, default=None): + value = self.options.get(CoreOptions.BLOB_VIEW_FIELD, default) + return CoreOptions._parse_field_set(value) + + def blob_inline_fields(self, default=None): + fields = set() + fields.update(self.blob_descriptor_fields(default)) + fields.update(self.blob_view_fields(default)) + return fields + + @staticmethod + def _parse_field_set(value): if value is None: return set() if isinstance(value, str): diff --git a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py index 35fe046a03ce..456ae35248d9 100644 --- a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py +++ b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py @@ -15,68 +15,192 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import Callable, Optional, Set +import pyarrow from pyarrow import RecordBatch from pypaimon.common.options.core_options import CoreOptions from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.table.row.blob import Blob, BlobViewStruct class BlobDescriptorConvertReader(RecordBatchReader): - def __init__(self, inner: RecordBatchReader, table): + """Resolves BlobView and BlobDescriptor fields in record batches. + + Processing is split into two clear stages: + Stage 1 (BlobView resolution): If view fields exist, use a lightweight + prescan reader (only projecting view columns) to collect + BlobViewStructs, bulk-preload their descriptors, then read + full data from the main reader and replace view field values + with the corresponding BlobDescriptor serialized bytes. + Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option. + If false, resolve all BlobDescriptor bytes (from both descriptor + fields and view fields) into real blob data bytes. + If true, return as-is. + """ + + def __init__(self, inner: RecordBatchReader, table, + prescan_reader_factory: Optional[Callable[[Set[str]], RecordBatchReader]] = None): + """ + Args: + inner: The main data reader (reads all columns). + table: The table instance. + prescan_reader_factory: Optional factory that creates a lightweight + reader projecting only the specified field names. Used for + prescan to collect BlobViewStructs without reading all columns. + Signature: (field_names: Set[str]) -> RecordBatchReader + """ self._inner = inner self._table = table + self._prescan_reader_factory = prescan_reader_factory self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options) self.file_io = inner.file_io self.blob_field_indices = inner.blob_field_indices + self._view_fields = CoreOptions.blob_view_fields(table.options) + self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options) + self._blob_as_descriptor = CoreOptions.blob_as_descriptor(table.options) + self._prescan_done = False + self._blob_view_lookup = None def read_arrow_batch(self) -> Optional[RecordBatch]: - import pyarrow + # Ensure prescan is done before reading (only needed for view fields) + if self._view_fields and not self._prescan_done: + self._prescan_view_structs() + batch = self._inner.read_arrow_batch() if batch is None: return None - return self._convert_batch(batch, pyarrow) + # Resolve view fields using the preloaded lookup + if self._view_fields and self._blob_view_lookup is not None: + batch = self._resolve_view_fields(batch, self._blob_view_lookup) + # Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false) + return self._resolve_blob_data(batch) + + # ------------------------------------------------------------------ + # Stage 1: BlobView prescan (lightweight, only reads view columns) + # ------------------------------------------------------------------ + + def _prescan_view_structs(self): + """Use a lightweight prescan reader (projecting only view columns) to + collect all BlobViewStructs and bulk-preload their descriptors.""" + from pypaimon.table.row.blob import BlobViewStruct + from pypaimon.utils.blob_view_lookup import BlobViewLookup + + self._prescan_done = True + all_view_structs = [] + + prescan_reader = self._prescan_reader_factory(self._view_fields) + try: + while True: + batch = prescan_reader.read_arrow_batch() + if batch is None: + break + for field_name in self._view_fields: + if field_name not in batch.schema.names: + continue + for value in batch.column(field_name).to_pylist(): + value = self._normalize_blob_to_bytes(value) + if isinstance(value, bytes) and BlobViewStruct.is_blob_view_struct(value): + all_view_structs.append(BlobViewStruct.deserialize(value)) + finally: + prescan_reader.close() - def _convert_batch(self, batch, pyarrow): - from pypaimon.table.row.blob import Blob, BlobDescriptor + # Bulk-preload BlobViewStruct -> BlobDescriptor mapping + if all_view_structs: + self._blob_view_lookup = BlobViewLookup(self._table) + self._blob_view_lookup.preload(all_view_structs) - result = batch - for field_name in self._descriptor_fields: - if field_name not in result.schema.names: + def _resolve_view_fields(self, batch, blob_view_lookup): + """Replace BlobViewStruct bytes in view fields with the corresponding + BlobDescriptor serialized bytes.""" + for field_name in self._view_fields: + if field_name not in batch.schema.names: continue - values = result.column(field_name).to_pylist() + values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()] converted_values = [] for value in values: if value is None: converted_values.append(None) continue - if hasattr(value, 'as_py'): - value = value.as_py() - if isinstance(value, str): - value = value.encode('utf-8') - if isinstance(value, bytearray): - value = bytes(value) if not isinstance(value, bytes): converted_values.append(value) continue - try: - descriptor = BlobDescriptor.deserialize(value) - if descriptor.serialize() != value: - converted_values.append(value) - continue - uri_reader = self._table.file_io.uri_reader_factory.create(descriptor.uri) - converted_values.append(Blob.from_descriptor(uri_reader, descriptor).to_data()) - except Exception: + if not BlobViewStruct.is_blob_view_struct(value): converted_values.append(value) + continue + view_struct = BlobViewStruct.deserialize(value) + descriptor = blob_view_lookup.resolve_descriptor(view_struct) + converted_values.append(descriptor.serialize()) - column_idx = result.schema.names.index(field_name) - result = result.set_column( + column_idx = batch.schema.names.index(field_name) + batch = batch.set_column( column_idx, pyarrow.field(field_name, pyarrow.large_binary(), nullable=True), pyarrow.array(converted_values, type=pyarrow.large_binary()), ) - return result + return batch + + # ------------------------------------------------------------------ + # Stage 2: BlobData resolution (unified exit) + # ------------------------------------------------------------------ + + def _resolve_blob_data(self, batch): + if self._blob_as_descriptor: + return batch + + all_inline_blob_fields = self._descriptor_fields | self._view_fields + for field_name in all_inline_blob_fields: + if field_name not in batch.schema.names: + continue + values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()] + converted_values = [] + for value in values: + blob = Blob.from_bytes(value, self._table.file_io) + converted_values.append(blob.to_data() if blob else None) + + column_idx = batch.schema.names.index(field_name) + batch = batch.set_column( + column_idx, + pyarrow.field(field_name, pyarrow.large_binary(), nullable=True), + pyarrow.array(converted_values, type=pyarrow.large_binary()), + ) + return batch + + # ------------------------------------------------------------------ + # Utilities + # ------------------------------------------------------------------ + + @staticmethod + def _normalize_blob_to_bytes(value): + if value is None: + return None + if hasattr(value, 'as_py'): + value = value.as_py() + if isinstance(value, str): + value = value.encode('utf-8') + if isinstance(value, bytearray): + value = bytes(value) + return value def close(self): self._inner.close() + + +class _CachedBatchReader(RecordBatchReader): + """A simple reader that replays pre-cached RecordBatches. + Used as fallback when no prescan_reader_factory is provided.""" + + def __init__(self, batches): + self._batches = batches + self._index = 0 + + def read_arrow_batch(self) -> Optional[RecordBatch]: + if self._index >= len(self._batches): + return None + batch = self._batches[self._index] + self._index += 1 + return batch + + def close(self): + self._batches = None diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index e432eca6b122..ce609b05fed9 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -729,6 +729,19 @@ def _push_down_predicate(self) -> Optional[Predicate]: return None def create_reader(self) -> RecordReader: + reader = self._create_raw_reader() + + if (CoreOptions.blob_view_fields(self.table.options) + or (not CoreOptions.blob_as_descriptor(self.table.options) + and CoreOptions.blob_descriptor_fields(self.table.options))): + reader = BlobDescriptorConvertReader( + reader, self.table, + prescan_reader_factory=lambda names: self._create_prescan_reader(names)) + + return reader + + def _create_raw_reader(self) -> RecordReader: + """Core read logic: split_by_row_id -> suppliers -> ConcatBatchReader -> filter.""" files = self.split.files suppliers = [] @@ -760,12 +773,27 @@ def create_reader(self) -> RecordReader: else: reader = merge_reader - if (not CoreOptions.blob_as_descriptor(self.table.options) - and CoreOptions.blob_descriptor_fields(self.table.options)): - reader = BlobDescriptorConvertReader(reader, self.table) - return reader + def _create_prescan_reader(self, field_names): + """Create a prescan reader by constructing a new DataEvolutionSplitRead + instance that only projects the specified field names.""" + from pypaimon.read.reader.iface.record_batch_reader import EmptyRecordBatchReader + + prescan_fields = [f for f in self.read_fields if f.name in field_names] + if not prescan_fields: + return EmptyRecordBatchReader() + + prescan_read = DataEvolutionSplitRead( + table=self.table, + predicate=self.predicate, + read_type=prescan_fields, + split=self.split, + row_tracking_enabled=False, + ) + prescan_read.row_ranges = self.row_ranges + return prescan_read._create_raw_reader() + def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: """Split files by firstRowId for data evolution.""" diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py index 912966732660..e758fc262512 100644 --- a/paimon-python/pypaimon/schema/schema.py +++ b/paimon-python/pypaimon/schema/schema.py @@ -62,39 +62,8 @@ def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str] if field.name in pk_set: field.type.nullable = False - # Check if Blob type exists in the schema - blob_names = [ - field.name for field in fields - if 'blob' in str(field.type).lower() - ] - - if blob_names: - if options is None: - options = {} - - if len(fields) <= len(blob_names): - raise ValueError( - "Table with BLOB type column must have other normal columns." - ) - - required_options = { - CoreOptions.ROW_TRACKING_ENABLED.key(): 'true', - CoreOptions.DATA_EVOLUTION_ENABLED.key(): 'true' - } - - missing_options = [] - for key, expected_value in required_options.items(): - if key not in options or options[key] != expected_value: - missing_options.append(f"{key}='{expected_value}'") - - if missing_options: - raise ValueError( - f"Schema contains Blob type but is missing required options: {', '.join(missing_options)}. " - f"Please add these options to the schema." - ) - - if primary_keys is not None: - raise ValueError("Blob type is not supported with primary key.") + # Validate Blob type fields in the schema + Schema._validate_blob_fields(fields, options, primary_keys) # Check if Vector type with dedicated file format vector_names = [ @@ -133,3 +102,61 @@ def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str] ) return Schema(fields, partition_keys, primary_keys, options, comment) + + @staticmethod + def _validate_blob_fields(fields, options, primary_keys): + """Validate blob field configurations in the schema.""" + blob_names = [ + field.name for field in fields + if 'blob' in str(field.type).lower() + ] + + if not blob_names: + return + + if options is None: + options = {} + + if len(fields) <= len(blob_names): + raise ValueError( + "Table with BLOB type column must have other normal columns." + ) + + blob_field_names = { + field.name for field in fields if 'blob' in str(field.type).lower() + } + core_options = CoreOptions.from_dict(options) + descriptor_fields = core_options.blob_descriptor_fields() + view_fields = core_options.blob_view_fields() + unknown_inline_fields = descriptor_fields.union(view_fields).difference(blob_field_names) + if unknown_inline_fields: + raise ValueError( + "Fields in 'blob-descriptor-field' or 'blob-view-field' must be blob fields " + "in schema. Unknown fields: {}".format(sorted(unknown_inline_fields)) + ) + + overlapping_inline_fields = descriptor_fields.intersection(view_fields) + if overlapping_inline_fields: + raise ValueError( + "Fields in 'blob-descriptor-field' and 'blob-view-field' must not overlap. " + "Overlapping fields: {}".format(sorted(overlapping_inline_fields)) + ) + + required_options = { + CoreOptions.ROW_TRACKING_ENABLED.key(): 'true', + CoreOptions.DATA_EVOLUTION_ENABLED.key(): 'true' + } + + missing_options = [] + for key, expected_value in required_options.items(): + if key not in options or options[key] != expected_value: + missing_options.append(f"{key}='{expected_value}'") + + if missing_options: + raise ValueError( + f"Schema contains Blob type but is missing required options: {', '.join(missing_options)}. " + f"Please add these options to the schema." + ) + + if primary_keys is not None: + raise ValueError("Blob type is not supported with primary key.") diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index 43391775bd8d..78f9df47c8d8 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -21,6 +21,7 @@ from typing import BinaryIO, Optional, Union from urllib.parse import urlparse +from pypaimon.common.identifier import Identifier from pypaimon.common.uri_reader import UriReader, FileUriReader @@ -162,6 +163,115 @@ def __repr__(self) -> str: return self.__str__() +class BlobViewStruct: + CURRENT_VERSION = 1 + MAGIC = 0x424C4F4256494557 # "BLOBVIEW" + + def __init__(self, identifier: Union[Identifier, str], field_id: int, row_id: int): + if isinstance(identifier, str): + identifier = Identifier.from_string(identifier) + if not isinstance(identifier, Identifier): + raise TypeError("BlobViewStruct identifier must be Identifier or str.") + self._identifier = identifier + self._field_id = field_id + self._row_id = row_id + + @property + def identifier(self) -> Identifier: + return self._identifier + + @property + def field_id(self) -> int: + return self._field_id + + @property + def row_id(self) -> int: + return self._row_id + + def serialize(self) -> bytes: + identifier_bytes = self._identifier.get_full_name().encode('utf-8') + data = struct.pack(' 'BlobViewStruct': + if len(data) < 25: + raise ValueError("Invalid BlobViewStruct data: too short") + + offset = 0 + version = struct.unpack(' len(data): + raise ValueError("Invalid BlobViewStruct data: identifier length exceeds data size") + + identifier = data[offset:offset + identifier_length].decode('utf-8') + offset += identifier_length + field_id = struct.unpack(' bool: + if not isinstance(data, (bytes, bytearray)): + return False + raw = bytes(data) + if len(raw) < 9: + return False + version = raw[0] + if version != cls.CURRENT_VERSION: + return False + try: + magic = struct.unpack(' bool: + if not isinstance(other, BlobViewStruct): + return False + return (self._identifier == other._identifier + and self._field_id == other._field_id + and self._row_id == other._row_id) + + def __hash__(self) -> int: + return hash((self._identifier.get_full_name(), self._field_id, self._row_id)) + + def __str__(self) -> str: + return ( + f"BlobViewStruct(identifier={self._identifier.get_full_name()}, " + f"field_id={self._field_id}, row_id={self._row_id})" + ) + + def __repr__(self) -> str: + return self.__str__() + + class OffsetInputStream(io.RawIOBase): def __init__(self, wrapped, offset: int, length: int): @@ -382,3 +492,42 @@ def __eq__(self, other) -> bool: def __hash__(self) -> int: return hash(self._descriptor) + + +class BlobView(Blob): + + def __init__(self, view_struct: BlobViewStruct): + self._view_struct: BlobViewStruct = view_struct + self._resolved_blob: Optional[BlobRef] = None + + @property + def view_struct(self) -> BlobViewStruct: + return self._view_struct + + def is_resolved(self) -> bool: + return self._resolved_blob is not None + + def resolve(self, uri_reader: UriReader, descriptor: BlobDescriptor): + self._resolved_blob = BlobRef(uri_reader, descriptor) + + def to_data(self) -> bytes: + return self._resolved().to_data() + + def to_descriptor(self) -> BlobDescriptor: + return self._resolved().to_descriptor() + + def new_input_stream(self) -> BinaryIO: + return self._resolved().new_input_stream() + + def _resolved(self) -> BlobRef: + if self._resolved_blob is None: + raise RuntimeError("BlobView is not resolved.") + return self._resolved_blob + + def __eq__(self, other) -> bool: + if not isinstance(other, BlobView): + return False + return self._view_struct == other._view_struct + + def __hash__(self) -> int: + return hash(self._view_struct) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index c4e5a4d1bd3f..ee907d548259 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -1315,6 +1315,148 @@ def test_blob_descriptor_fields_mixed_mode(self): self.assertEqual(result.column('pic1').to_pylist()[0], pic1_data) self.assertEqual(result.column('pic2').to_pylist()[0], pic2_data) + def test_blob_view_fields_resolve_upstream_blob(self): + from pypaimon import Schema + from pypaimon.common.options.core_options import CoreOptions + from pypaimon.table.row.blob import BlobViewStruct + + source_schema = pa.schema([ + ('id', pa.int32()), + ('picture', pa.large_binary()), + ]) + source = Schema.from_pyarrow_schema( + source_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + } + ) + self.catalog.create_table('test_db.blob_view_source', source, False) + source_table = self.catalog.get_table('test_db.blob_view_source') + payloads = [b'view-source-0', b'view-source-1'] + + write_builder = source_table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2], + 'picture': payloads, + }, schema=source_schema)) + commit_messages = writer.prepare_commit() + write_builder.new_commit().commit(commit_messages) + writer.close() + + picture_field_id = next( + field.id for field in source_table.table_schema.fields if field.name == 'picture' + ) + view_values = [ + BlobViewStruct('test_db.blob_view_source', picture_field_id, 0).serialize(), + BlobViewStruct('test_db.blob_view_source', picture_field_id, 1).serialize(), + ] + + target_schema = pa.schema([ + ('id', pa.int32()), + ('picture', pa.large_binary()), + ]) + target = Schema.from_pyarrow_schema( + target_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'blob-view-field': 'picture', + } + ) + self.catalog.create_table('test_db.blob_view_target', target, False) + target_table = self.catalog.get_table('test_db.blob_view_target') + + target_write_builder = target_table.new_batch_write_builder() + target_writer = target_write_builder.new_write() + target_writer.write_arrow(pa.Table.from_pydict({ + 'id': [10, 11], + 'picture': view_values, + }, schema=target_schema)) + target_commit_messages = target_writer.prepare_commit() + target_write_builder.new_commit().commit(target_commit_messages) + target_writer.close() + + all_target_files = [f for msg in target_commit_messages for f in msg.new_files] + self.assertFalse( + any(f.file_name.endswith('.blob') for f in all_target_files), + "Blob view fields should be stored inline without writing new blob files", + ) + + result = target_table.new_read_builder().new_read().to_arrow( + target_table.new_read_builder().new_scan().plan().splits() + ).sort_by('id') + self.assertEqual(result.column('picture').to_pylist(), payloads) + + descriptor_table = target_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): 'true'}) + descriptor_result = descriptor_table.new_read_builder().new_read().to_arrow( + descriptor_table.new_read_builder().new_scan().plan().splits() + ).sort_by('id') + # With blob-as-descriptor=true, view fields return BlobDescriptor bytes + from pypaimon.table.row.blob import BlobDescriptor + for value in descriptor_result.column('picture').to_pylist(): + self.assertTrue( + BlobDescriptor.is_blob_descriptor(value), + "Expected BlobDescriptor bytes when blob-as-descriptor=true" + ) + + def test_blob_view_fields_rejects_non_view_input(self): + from pypaimon import Schema + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('picture', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'blob-view-field': 'picture', + } + ) + self.catalog.create_table('test_db.blob_view_reject_test', schema, False) + table = self.catalog.get_table('test_db.blob_view_reject_test') + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + bad_data = pa.Table.from_pydict({ + 'id': [1], + 'picture': [b'not-a-view-struct'], + }, schema=pa_schema) + + with self.assertRaises(ValueError) as context: + writer.write_arrow(bad_data) + self.assertIn("blob-view-field", str(context.exception)) + + def test_blob_inline_fields_reject_overlap_and_unknown_fields(self): + from pypaimon import Schema + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('picture', pa.large_binary()), + ]) + base_options = { + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + } + + overlap_options = dict(base_options) + overlap_options.update({ + 'blob-descriptor-field': 'picture', + 'blob-view-field': 'picture', + }) + with self.assertRaises(ValueError) as overlap_context: + Schema.from_pyarrow_schema(pa_schema, options=overlap_options) + self.assertIn("must not overlap", str(overlap_context.exception)) + + unknown_options = dict(base_options) + unknown_options.update({'blob-view-field': 'missing_picture'}) + with self.assertRaises(ValueError) as unknown_context: + Schema.from_pyarrow_schema(pa_schema, options=unknown_options) + self.assertIn("must be blob fields", str(unknown_context.exception)) + def test_to_arrow_batch_reader(self): import random from pypaimon import Schema diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index e6b856432b50..47180a60d58c 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -31,7 +31,7 @@ from pypaimon.common.options import Options from pypaimon.read.reader.format_blob_reader import BlobRecordIterator, FormatBlobReader from pypaimon.schema.data_types import AtomicType, DataField -from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor +from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor, BlobViewStruct from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow from pypaimon.table.row.row_kind import RowKind @@ -166,6 +166,20 @@ def test_from_bytes_invalid_type_raises(self): with self.assertRaises(TypeError): Blob.from_bytes(12345) + def test_blob_view_struct_roundtrip(self): + """Test BlobViewStruct serialization compatibility.""" + view_struct = BlobViewStruct("test_db.source_table", 7, 42) + serialized = view_struct.serialize() + + self.assertTrue(BlobViewStruct.is_blob_view_struct(serialized)) + self.assertFalse(BlobDescriptor.is_blob_descriptor(serialized)) + + restored = BlobViewStruct.deserialize(serialized) + self.assertEqual(restored, view_struct) + self.assertEqual(restored.identifier.get_full_name(), "test_db.source_table") + self.assertEqual(restored.field_id, 7) + self.assertEqual(restored.row_id, 42) + def test_blob_data_interface_compliance(self): """Test that BlobData properly implements Blob interface.""" test_data = b"interface test data" diff --git a/paimon-python/pypaimon/utils/blob_view_lookup.py b/paimon-python/pypaimon/utils/blob_view_lookup.py new file mode 100644 index 000000000000..264652183d54 --- /dev/null +++ b/paimon-python/pypaimon/utils/blob_view_lookup.py @@ -0,0 +1,263 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Dict, List, Tuple + +from pypaimon.common.identifier import Identifier +from pypaimon.common.options.core_options import CoreOptions +from pypaimon.common.uri_reader import UriReader +from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct +from pypaimon.table.special_fields import SpecialFields +from pypaimon.utils.range import Range + +_PRELOAD_THREAD_NUM = 100 +_MIN_ROWS_PER_TASK = 100 + + +class TableReferences: + """Groups BlobViewStruct references by upstream table.""" + + def __init__(self, identifier: Identifier): + self.identifier: Identifier = identifier + self.references_by_field: Dict[int, List[BlobViewStruct]] = {} + self.row_ids: List[int] = [] + + def add(self, view_struct: BlobViewStruct) -> None: + self.references_by_field.setdefault(view_struct.field_id, []).append(view_struct) + self.row_ids.append(int(view_struct.row_id)) + + +class TableReadPlan: + """A plan for reading blob descriptors from one upstream table.""" + + def __init__(self, identifier: Identifier, upstream_table, + fields: List, row_ranges: List[Range]): + self.identifier: Identifier = identifier + self.upstream_table = upstream_table + self.fields: List = fields + self.row_ranges: List[Range] = row_ranges + + +class BlobViewLookup: + """Resolve BlobViewStruct references by reading upstream blob descriptors.""" + + def __init__(self, table): + self._table = table + self._uri_reader_cache: Dict[str, UriReader] = {} + self._descriptor_cache: Dict[BlobViewStruct, BlobDescriptor] = {} + + def preload(self, view_structs: List[BlobViewStruct]): + if not view_structs: + return + + grouped: Dict[str, TableReferences] = self._group_by_table(view_structs) + plans: List[TableReadPlan] = [] + for table_refs in grouped.values(): + plans.append(self._create_table_read_plan(table_refs)) + + target_rows: int = self._target_rows_per_task(plans) + tasks: List[Tuple[TableReadPlan, List[Range]]] = [] + for plan in plans: + for range_chunk in self._split_row_ranges(plan.row_ranges, target_rows): + tasks.append((plan, range_chunk)) + + if len(tasks) <= 1: + for plan, range_chunk in tasks: + self._descriptor_cache.update(self._load_descriptor_chunk(plan, range_chunk)) + return + + with ThreadPoolExecutor(max_workers=min(_PRELOAD_THREAD_NUM, len(tasks))) as executor: + futures = { + executor.submit(self._load_descriptor_chunk, plan, range_chunk): (plan, range_chunk) + for plan, range_chunk in tasks + } + for future in as_completed(futures): + try: + self._descriptor_cache.update(future.result()) + except Exception as exc: + raise RuntimeError("Failed to preload blob descriptors.") from exc + + def resolve_descriptor(self, view_struct: BlobViewStruct) -> BlobDescriptor: + descriptor: BlobDescriptor = self._descriptor_cache.get(view_struct) + if descriptor is None: + raise ValueError( + "Cannot resolve BlobViewStruct {} because row id {} was not found " + "in upstream table.".format(view_struct, view_struct.row_id) + ) + return descriptor + + def _group_by_table( + self, view_structs: List[BlobViewStruct] + ) -> Dict[str, TableReferences]: + grouped: Dict[str, TableReferences] = {} + for view_struct in view_structs: + key = view_struct.identifier.get_full_name() + if key not in grouped: + grouped[key] = TableReferences(view_struct.identifier) + grouped[key].add(view_struct) + return grouped + + def _create_table_read_plan(self, table_refs: TableReferences) -> TableReadPlan: + upstream_table = self._load_table(table_refs.identifier) + + fields: List = [] + for field_id in table_refs.references_by_field: + fields.append(self._field_by_id(upstream_table, field_id)) + + return TableReadPlan(table_refs.identifier, upstream_table, fields, Range.to_ranges(table_refs.row_ids)) + + def _load_descriptor_chunk( + self, plan: TableReadPlan, row_ranges: List[Range] + ) -> Dict[BlobViewStruct, BlobDescriptor]: + identifier: Identifier = plan.identifier + upstream_table = plan.upstream_table + fields: List = plan.fields + + field_names: List[str] = [f.name for f in fields] + projection: List[str] = field_names + [SpecialFields.ROW_ID.name] + + descriptor_table = upstream_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): "true"}) + read_builder = descriptor_table.new_read_builder().with_projection(projection) + + if SpecialFields.ROW_ID.name not in [ + data_field.name for data_field in read_builder.read_type() + ]: + raise ValueError( + "Cannot resolve blob view for table {} because row tracking is not readable." + .format(identifier.get_full_name()) + ) + + predicate_builder = read_builder.new_predicate_builder() + range_predicates: List = [] + for r in row_ranges: + if r.from_ == r.to: + range_predicates.append( + predicate_builder.equal(SpecialFields.ROW_ID.name, r.from_)) + else: + range_predicates.append( + predicate_builder.between(SpecialFields.ROW_ID.name, r.from_, r.to)) + if len(range_predicates) == 1: + predicate = range_predicates[0] + else: + predicate = predicate_builder.or_predicates(range_predicates) + read_builder.with_filter(predicate) + result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits()) + + if SpecialFields.ROW_ID.name not in result.schema.names: + raise ValueError( + "Cannot resolve blob view for table {} because row tracking is not readable." + .format(identifier.get_full_name()) + ) + + row_id_values: List = result.column(SpecialFields.ROW_ID.name).to_pylist() + resolved: Dict[BlobViewStruct, BlobDescriptor] = {} + for field in fields: + if field.name not in result.schema.names: + continue + values = result.column(field.name).to_pylist() + for row_id, value in zip(row_id_values, values): + if value is None: + continue + descriptor = self._to_descriptor(value) + view_struct = BlobViewStruct( + identifier.get_full_name(), field.id, int(row_id)) + resolved[view_struct] = descriptor + return resolved + + @staticmethod + def _split_row_ranges( + row_ranges: List[Range], target_rows_per_task: int + ) -> List[List[Range]]: + """ + Split row ranges into multiple chunks for parallel task processing. + """ + if not row_ranges: + return [] + + chunks: List[List[Range]] = [] + current_chunk: List[Range] = [] + current_chunk_rows: int = 0 + + for r in row_ranges: + next_from = r.from_ + # Process current range until all rows are allocated + while next_from <= r.to: + # If current chunk is full, save it and start a new one + if current_chunk_rows == target_rows_per_task: + chunks.append(current_chunk) + current_chunk = [] + current_chunk_rows = 0 + + # Calculate remaining capacity in current chunk + remaining = target_rows_per_task - current_chunk_rows + # Determine the end position for this allocation (don't exceed range boundary) + next_to = min(r.to, next_from + remaining - 1) + + # Add the allocated range to current chunk + current_chunk.append(Range(next_from, next_to)) + current_chunk_rows += next_to - next_from + 1 + + # Move to next unallocated position + next_from = next_to + 1 + + # Don't forget the last chunk if it has any ranges + if current_chunk: + chunks.append(current_chunk) + + return chunks + + @staticmethod + def _target_rows_per_task(plans: List[TableReadPlan]) -> int: + total_rows: int = 0 + for plan in plans: + for r in plan.row_ranges: + total_rows += r.count() + if total_rows <= 0: + return _MIN_ROWS_PER_TASK + + return max(_MIN_ROWS_PER_TASK, (total_rows + _PRELOAD_THREAD_NUM - 1) // _PRELOAD_THREAD_NUM) + + def _load_table(self, identifier: Identifier): + catalog = self._table.catalog_environment.catalog_loader.load() + return catalog.get_table(identifier) + + @staticmethod + def _field_by_id(table, field_id: int) -> 'DataField': + for field in table.table_schema.fields: + if field.id == field_id: + return field + raise ValueError( + "Cannot find blob fieldId {} in upstream table {}." + .format(field_id, table.identifier.get_full_name()) + ) + + def _to_descriptor(self, value) -> BlobDescriptor: + if hasattr(value, "as_py"): + value = value.as_py() + if isinstance(value, str): + value = value.encode("utf-8") + if isinstance(value, bytearray): + value = bytes(value) + if not isinstance(value, bytes): + raise ValueError("Blob view upstream value must be serialized blob bytes.") + if BlobViewStruct.is_blob_view_struct(value): + return self.resolve_descriptor(BlobViewStruct.deserialize(value)) + if not BlobDescriptor.is_blob_descriptor(value): + raise ValueError("Blob view upstream value is not a serialized BlobDescriptor.") + return BlobDescriptor.deserialize(value) diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py index 4c3289f5aa44..9bb25f518b22 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -84,6 +84,8 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op # Determine blob columns from table schema self.blob_column_names = self._get_blob_columns_from_schema() self.blob_descriptor_fields = CoreOptions.blob_descriptor_fields(self.options) + self.blob_view_fields = CoreOptions.blob_view_fields(self.options) + self.blob_inline_fields = self.blob_descriptor_fields.union(self.blob_view_fields) unknown_descriptor_fields = self.blob_descriptor_fields.difference( set(self.blob_column_names) @@ -95,10 +97,10 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op ) # Blob fields that should still be written to `.blob` files. - full_blob_file_column_names = [ - col for col in self.blob_column_names if col not in self.blob_descriptor_fields + self.blob_file_column_names = [ + col for col in self.blob_column_names if col not in self.blob_inline_fields ] - full_blob_file_set = set(full_blob_file_column_names) + full_blob_file_set = set(self.blob_file_column_names) all_column_names = self.table.field_names # Narrow columns when TableWrite.with_write_type(...) supplies a partial column list. @@ -107,13 +109,13 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op if write_cols is not None: write_col_set = set(write_cols) self.blob_file_column_names = [ - col for col in full_blob_file_column_names if col in write_col_set + col for col in self.blob_file_column_names if col in write_col_set ] self.normal_column_names = [ col for col in write_cols if col not in full_blob_file_set ] else: - self.blob_file_column_names = list(full_blob_file_column_names) + self.blob_file_column_names = list(self.blob_file_column_names) self.normal_column_names = [ col for col in all_column_names if col not in full_blob_file_set ] @@ -160,11 +162,12 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op logger.info( "Initialized DataBlobWriter with blob columns: %s, blob file columns: %s, descriptor " - "stored columns: %s, external storage fields: %s", + "stored columns: %s, external storage fields: %s, view stored columns: %s", self.blob_column_names, self.blob_file_column_names, sorted(self.blob_descriptor_fields), sorted(external_storage_fields) if external_storage_fields else [], + sorted(self.blob_view_fields) ) def _get_blob_columns_from_schema(self) -> List[str]: @@ -194,7 +197,7 @@ def write(self, data: pa.RecordBatch): # Split data into normal and blob parts normal_data, blob_data_map = self._split_data(data) - self._validate_descriptor_stored_fields_input(data) + self._validate_inline_stored_fields_input(data) # Process and accumulate normal data processed_normal = self._process_normal_data(normal_data) @@ -259,11 +262,11 @@ def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, Dict[str, p } return normal_data, blob_data_map - def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch): - if not self.blob_descriptor_fields: + def _validate_inline_stored_fields_input(self, data: pa.RecordBatch): + if not self.blob_inline_fields: return - from pypaimon.table.row.blob import BlobDescriptor + from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct for field_name in self.blob_descriptor_fields: if field_name not in data.schema.names: @@ -292,6 +295,33 @@ def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch): "BlobDescriptor." ) from e + for field_name in self.blob_view_fields: + if field_name not in data.schema.names: + continue + values = data.column(data.schema.get_field_index(field_name)).to_pylist() + for value in values: + if value is None: + continue + if hasattr(value, 'as_py'): + value = value.as_py() + if isinstance(value, str): + value = value.encode('utf-8') + if not isinstance(value, (bytes, bytearray)): + raise ValueError( + "blob-view-field requires blob field value to be a serialized " + "BlobViewStruct." + ) + try: + view_bytes = bytes(value) + view_struct = BlobViewStruct.deserialize(view_bytes) + if view_struct.serialize() != view_bytes: + raise ValueError("BlobViewStruct payload contains trailing bytes.") + except Exception as e: + raise ValueError( + "blob-view-field requires blob field value to be a serialized " + "BlobViewStruct." + ) from e + @staticmethod def _process_normal_data(data: pa.RecordBatch) -> pa.Table: """Process normal data (similar to base DataWriter)."""