Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions daft/io/lance/lance_data_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ def _load_existing_dataset(self, lance_module: ModuleType) -> Any | None:
try:
return lance_module.dataset(self._table_uri, storage_options=self._storage_options)
except (ValueError, FileNotFoundError, OSError) as e:
# Check if this is specifically a "dataset not found" error
if "not found" in str(e).lower() or "does not exist" in str(e).lower():
err_msg = str(e).lower()
# Check if this is specifically a "dataset not found" error or a "not a directory" error
# (e.g. when the target path points to a file instead of a directory)
if "not found" in err_msg or "does not exist" in err_msg or "not a directory" in err_msg:
if self._mode == "append":
raise ValueError("Cannot append to non-existent Lance dataset.")
return None
Expand Down
32 changes: 27 additions & 5 deletions daft/io/lance/lance_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ def __init__(
column: str,
index_type: str,
name: str,
fragment_uuid: str,
index_uuid: str,
replace: bool,
**kwargs: Any,
) -> None:
self.lance_ds = lance_ds
self.column = column
self.index_type = index_type
self.name = name
self.fragment_uuid = fragment_uuid
self.index_uuid = index_uuid
self.replace = replace
self.kwargs = kwargs

Expand All @@ -51,7 +51,7 @@ def __call__(self, fragment_ids: list[int]) -> bool:
index_type=self.index_type,
name=self.name,
replace=self.replace,
fragment_uuid=self.fragment_uuid,
index_uuid=self.index_uuid,
fragment_ids=fragment_ids,
**self.kwargs,
)
Expand Down Expand Up @@ -180,7 +180,7 @@ def create_scalar_index_internal(
column=column,
index_type=index_type,
name=name,
fragment_uuid=index_id,
index_uuid=index_id,
replace=replace,
**kwargs,
)
Expand Down Expand Up @@ -208,9 +208,31 @@ def create_scalar_index_internal(
fragment_ids=set(fragment_ids_to_use),
index_version=0,
)

# When replacing, find and remove existing indices with the same name
removed_indices = []
if replace:
try:
existing_indices = lance_ds.list_indices()
for idx in existing_indices:
if idx["name"] == name:
removed_indices.append(
lance.Index(
uuid=idx["uuid"],
name=idx["name"],
fields=[lance_ds.schema.get_field_index(f) for f in idx["fields"]],
dataset_version=lance_ds.version,
fragment_ids=idx.get("fragment_ids", set()),
index_version=0,
)
)
Comment on lines +218 to +227
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 dataset_version for removed indices uses current version instead of the index's creation version

lance_ds.version here is the version after merge_index_metadata was called, which is later than the version at which the existing (to-be-replaced) index was originally committed. Consider using the version from the index metadata if available:

dataset_version=idx.get("dataset_version", lance_ds.version),

except Exception:
# If we can't check existing indices, continue without removing
pass
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Silent failure re-triggers the panic being fixed

If list_indices() raises for any reason (network hiccup, API change, etc.), execution falls through with removed_indices = []. Committing a CreateIndex operation with an empty removed_indices while an index of the same name already exists was exactly the scenario that caused the pylance 4.0.0 panic, so swallowing the exception silently defeats the purpose of this block. At a minimum, log at warning level so the failure is visible; note that logging alone still lets the commit proceed with an empty removed_indices, so re-raise instead if the replace must not continue without the cleanup:

except Exception as e:
    logger.warning(
        "Could not fetch existing indices for removal; old index may not be cleaned up: %s", e
    )


create_index_op = lance.LanceOperation.CreateIndex(
new_indices=[index],
removed_indices=[],
removed_indices=removed_indices,
)

# Commit the index operation atomically
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ gravitino = ["requests>=2.28.0,<3.0.0"]
hudi = ["pyarrow >= 8.0.0,<22.1.0"]
huggingface = ["huggingface-hub<1.5.0", "datasets<4.6.0"]
iceberg = ["pyiceberg >= 0.7.0, <= 0.11.0, != 0.9.1, != 0.10.0"]
lance = ["pylance<0.40.0"]
lance = ["pylance>=0.39.0,<5.0.0"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Version lower-bound incompatible with renamed API

pylance>=0.39.0 allows versions that do not accept the index_uuid keyword argument introduced in pylance 4.0.0. The FragmentIndexHandler.__call__ now passes index_uuid=self.index_uuid to lance_ds.create_scalar_index(); on pylance 0.39.x (where the parameter was still called fragment_uuid) this raises TypeError: create_scalar_index() got an unexpected keyword argument 'index_uuid' at runtime. The lower bound should be raised to match the new API.

Suggested change
lance = ["pylance>=0.39.0,<5.0.0"]
lance = ["pylance>=4.0.0,<5.0.0"]

numpy = ["numpy<2.4.0"]
openai = ["openai<2.21.0", "numpy<2.4.0", "pillow==12.1.1"]
pandas = ["pandas<2.4.0"]
Expand Down Expand Up @@ -108,7 +108,7 @@ dev = [
# Ray
"ray[data, client]==2.53.0",
# Lance
"pylance==0.39.0",
"pylance==4.0.0",
# Iceberg
"pyiceberg==0.11.0",
"pydantic==2.12.4",
Expand Down
2 changes: 1 addition & 1 deletion tests/io/lancedb/test_lancedb_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def test_build_distributed_index_invalid_index_type(self, multi_fragment_lance_d

with pytest.raises(
NotImplementedError,
match=r'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", or "INVERTED" or "BLOOMFILTER" are supported for scalar columns. Received INVALID',
match=r'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", "INVERTED", "BLOOMFILTER" or "RTREE" are supported for scalar columns. Received INVALID',
):
Comment on lines 198 to 201
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Error-message match is pylance-version-specific

The regex now matches the exact error text emitted by pylance 4.0.0 (including "RTREE"). Whenever pylance adds or renames index types in a future release, the match will fail. Because pytest applies match with re.search, a narrower pattern that matches only the invariant part of the message is sufficient and more robust:

Suggested change
with pytest.raises(
NotImplementedError,
match=r'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", or "INVERTED" or "BLOOMFILTER" are supported for scalar columns. Received INVALID',
match=r'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", "INVERTED", "BLOOMFILTER" or "RTREE" are supported for scalar columns. Received INVALID',
):
match=r'Received INVALID',

create_scalar_index(
uri=dataset_uri,
Expand Down
34 changes: 15 additions & 19 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading