# Patch summary. This free-form preamble sits before the first "diff --git"
# line; patch tools are expected to skip such leading text (NOTE(review):
# confirm for the tool used to apply this patch).
#
# pyathena/filesystem/s3.py
#   S3File._upload_chunk() returns ``not final`` instead of ``True`` at both
#   return sites shown in the hunks below. Per the comment added in the first
#   hunk, fsspec's flush() resets self.buffer only when _upload_chunk() returns
#   something other than False, so the final flush now keeps the buffer for a
#   deferred commit() (autocommit=False, i.e. inside an fsspec transaction),
#   while mid-stream chunks still return True and have their buffer cleared.
#
# tests/pyathena/filesystem/test_s3.py
#   Adds transaction round-trip and rollback tests (sizes 2**10 and 10 * 2**20),
#   two mock-based S3File builders, and unit tests for _upload_chunk(),
#   commit() and discard() that run against a mocked filesystem.
#
# tests/pyathena/filesystem/test_s3_async.py
#   Adds the same transaction round-trip test for the async filesystem and a
#   small-file-only rollback test.
#
diff --git a/pyathena/filesystem/s3.py b/pyathena/filesystem/s3.py
index 7885514c..fa4ae558 100644
--- a/pyathena/filesystem/s3.py
+++ b/pyathena/filesystem/s3.py
@@ -1409,11 +1409,18 @@ def _initiate_upload(self) -> None:
         )
 
     def _upload_chunk(self, final: bool = False) -> bool:
+        # The return value controls whether fsspec's flush() resets self.buffer
+        # afterwards: it does so only when this returns a value other than False.
+        # Returning ``not final`` keeps the buffer intact on the final flush so a
+        # deferred commit() (autocommit=False, i.e. inside an fsspec transaction)
+        # can still read the bytes; resetting it there would upload an empty
+        # object for small files. Mid-stream chunks (final=False) return True so
+        # fsspec clears the already-uploaded buffer between parts.
         if self.tell() < self.blocksize:
             # Files smaller than block size in size cannot be multipart uploaded.
             if self.autocommit and final:
                 self.commit()
-            return True
+            return not final
 
         if not self.multipart_upload:
             raise RuntimeError("Multipart upload is not initialized.")
@@ -1456,7 +1463,7 @@ def _upload_chunk(self, final: bool = False) -> bool:
 
         if self.autocommit and final:
             self.commit()
-        return True
+        return not final
 
     def commit(self) -> None:
         if self.tell() == 0:
diff --git a/tests/pyathena/filesystem/test_s3.py b/tests/pyathena/filesystem/test_s3.py
index 676060da..06b49066 100644
--- a/tests/pyathena/filesystem/test_s3.py
+++ b/tests/pyathena/filesystem/test_s3.py
@@ -1,12 +1,16 @@
+import io
 import os
 import tempfile
 import time
 import urllib.parse
 import urllib.request
 import uuid
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timezone
 from itertools import chain
 from pathlib import Path
+from types import SimpleNamespace
+from unittest import mock
 
 import fsspec
 import pytest
@@ -194,6 +198,56 @@ def test_write(self, fs, base, exp):
         assert len(actual) == len(data)
         assert actual == data
 
+    @pytest.mark.parametrize(
+        "size",
+        [
+            2**10,  # < block size: one-shot PutObject path (the GH-719 regression)
+            10 * 2**20,  # > block size (5 MiB): multipart CompleteMultipartUpload path
+        ],
+    )
+    def test_write_transaction(self, fs, size):
+        # Regression test for GH-719: files written inside an fsspec transaction
+        # (autocommit=False) must round-trip with their real content (small files
+        # were previously committed as empty objects).
+        data = b"a" * size
+        path = (
+            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
+            f"filesystem/test_write_transaction/{uuid.uuid4()}"
+        )
+        with fs.transaction, fs.open(path, "wb") as f:
+            f.write(data)
+        with fs.open(path, "rb") as f:
+            actual = f.read()
+        assert len(actual) == len(data)
+        assert actual == data
+
+    @pytest.mark.parametrize(
+        "size",
+        [
+            2**10,  # < block size: small-file discard() is a no-op
+            10 * 2**20,  # > block size: discard() aborts the multipart upload
+        ],
+    )
+    def test_write_transaction_rollback(self, fs, size):
+        # Raising inside the transaction must leave no object behind.
+        data = b"a" * size
+        path = (
+            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
+            f"filesystem/test_write_transaction_rollback/{uuid.uuid4()}"
+        )
+
+        def write_then_fail():
+            with fs.transaction:
+                f = fs.open(path, "wb")
+                f.write(data)
+                f.close()
+                raise RuntimeError("rollback")
+
+        with pytest.raises(RuntimeError):
+            write_then_fail()
+        fs.invalidate_cache(path)
+        assert not fs.exists(path)
+
     @pytest.mark.parametrize(
         ("base", "exp"),
         [
@@ -820,6 +874,40 @@ def test_pandas_write_csv(self, line_count):
 
 
 class TestS3File:
+    @staticmethod
+    def _make_write_file(data: bytes, autocommit: bool):
+        # Build a minimal write-mode S3File without touching AWS, bypassing
+        # __init__ which would require a real connection.
+        file = S3File.__new__(S3File)
+        file.fs = mock.MagicMock(spec=S3FileSystem)
+        file.path = "s3://bucket/key.txt"
+        file.bucket = "bucket"
+        file.key = "key.txt"
+        file.s3_additional_kwargs = {}
+        file.autocommit = autocommit
+        file.blocksize = S3FileSystem.MULTIPART_UPLOAD_MIN_PART_SIZE
+        file.multipart_upload = None
+        file.multipart_upload_parts = []
+        file.buffer = io.BytesIO(data)
+        file.loc = len(data)  # tell() returns self.loc
+        return file
+
+    @staticmethod
+    def _make_multipart_write_file(data: bytes, autocommit: bool):
+        # A write-mode S3File set up to take the multipart branch (blocksize <
+        # len(data)), with the executor and S3 part calls mocked so no AWS
+        # access is needed.
+        file = TestS3File._make_write_file(data, autocommit=autocommit)
+        file.blocksize = 4
+        file.fs.MULTIPART_UPLOAD_MIN_PART_SIZE = 4
+        file.fs.MULTIPART_UPLOAD_MAX_PART_SIZE = 8
+        file.multipart_upload = SimpleNamespace(upload_id="uploadid")
+        file._executor = ThreadPoolExecutor(max_workers=1)
+        file.fs._upload_part.side_effect = lambda **kw: SimpleNamespace(
+            etag=f'"e{kw["part_number"]}"', part_number=kw["part_number"]
+        )
+        return file
+
     @pytest.mark.parametrize(
         ("objects", "target"),
         [
@@ -867,3 +955,75 @@ def test_get_ranges(self, start, end, max_workers, worker_block_size, ranges):
 
     def test_format_ranges(self):
         assert S3File._format_ranges((0, 100)) == "bytes=0-99"
+
+    @pytest.mark.parametrize("autocommit", [True, False])
+    def test_upload_chunk_small_file(self, autocommit):
+        # Single (one-shot PutObject) upload via _upload_chunk + commit.
+        # autocommit=True  -> normal open(), committed immediately.
+        # autocommit=False -> inside a transaction, commit() is deferred to
+        #                     Transaction.complete(). This is the GH-719 case:
+        #                     _upload_chunk must return False so fsspec.flush()
+        #                     keeps self.buffer for the deferred commit, instead
+        #                     of resetting it and uploading an empty object.
+        data = b"hello world"
+        file = self._make_write_file(data, autocommit=autocommit)
+
+        assert file._upload_chunk(final=True) is False
+        if not autocommit:
+            # Deferred: nothing is uploaded until commit() runs.
+            file.fs._put_object.assert_not_called()
+            file.commit()
+        file.fs._put_object.assert_called_once_with(bucket="bucket", key="key.txt", body=data)
+
+    def test_upload_chunk_empty_file_touches(self):
+        # An intentionally empty file (tell() == 0) is created via touch(),
+        # never via _put_object.
+        file = self._make_write_file(b"", autocommit=True)
+
+        assert file._upload_chunk(final=True) is False
+        file.fs.touch.assert_called_once()
+        file.fs._put_object.assert_not_called()
+
+    @pytest.mark.parametrize("multipart", [False, True])
+    def test_discard(self, multipart):
+        # Rollback (Transaction.complete(commit=False)) never creates the object:
+        # a single small file is a no-op, while a multipart upload is aborted.
+        if multipart:
+            file = self._make_multipart_write_file(b"x" * 16, autocommit=False)
+        else:
+            file = self._make_write_file(b"hello world", autocommit=False)
+        assert file._upload_chunk(final=True) is False
+
+        file.discard()
+
+        if multipart:
+            file.fs._call.assert_called_once()
+            assert file.fs._call.call_args.args[0] == "abort_multipart_upload"
+        else:
+            file.fs._call.assert_not_called()
+        file.fs._put_object.assert_not_called()
+        assert file.multipart_upload is None
+        assert file.multipart_upload_parts == []
+
+    @pytest.mark.parametrize("autocommit", [True, False])
+    def test_upload_chunk_multipart(self, autocommit):
+        # Multipart upload (CompleteMultipartUpload), completed from the uploaded
+        # parts; the buffer is never read for the body and no one-shot PutObject
+        # is issued. autocommit=True completes inside _upload_chunk; autocommit=
+        # False (transaction) defers completion to commit().
+        #
+        # The mid-stream assertion also guards why _upload_chunk returns
+        # `not final` rather than a plain False (as s3fs does): PyAthena delegates
+        # mid-stream buffer management to fsspec, so a non-final chunk MUST return
+        # True to have fsspec reset the buffer between parts. Returning False
+        # mid-stream would re-upload the already-sent buffer.
+        file = self._make_multipart_write_file(b"x" * 16, autocommit=autocommit)
+
+        assert file._upload_chunk(final=False) is True  # mid-stream -> buffer reset
+        assert file._upload_chunk(final=True) is False  # final -> buffer kept
+        if not autocommit:
+            # Deferred: the multipart upload is completed by commit().
+            file.fs._complete_multipart_upload.assert_not_called()
+            file.commit()
+        file.fs._complete_multipart_upload.assert_called_once()
+        file.fs._put_object.assert_not_called()
diff --git a/tests/pyathena/filesystem/test_s3_async.py b/tests/pyathena/filesystem/test_s3_async.py
index ba2e07b8..6162be8b 100644
--- a/tests/pyathena/filesystem/test_s3_async.py
+++ b/tests/pyathena/filesystem/test_s3_async.py
@@ -188,6 +188,50 @@ def test_write(self, fs, base, exp):
         assert len(actual) == len(data)
         assert actual == data
 
+    @pytest.mark.parametrize(
+        "size",
+        [
+            2**10,  # < block size: one-shot PutObject path (the GH-719 regression)
+            10 * 2**20,  # > block size (5 MiB): multipart path via the async executor
+        ],
+    )
+    def test_write_transaction(self, fs, size):
+        # GH-719 regression for the async filesystem: AioS3File inherits
+        # _upload_chunk/commit from S3File, so the transaction fix must hold here
+        # too, including the multipart path driven by the async executor.
+        data = b"a" * size
+        path = (
+            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
+            f"filesystem/test_async_write_transaction/{uuid.uuid4()}"
+        )
+        with fs.transaction, fs.open(path, "wb") as f:
+            f.write(data)
+        with fs.open(path, "rb") as f:
+            actual = f.read()
+        assert len(actual) == len(data)
+        assert actual == data
+
+    def test_write_transaction_rollback(self, fs):
+        # Kept small-only on purpose: the multipart discard()/abort path is
+        # already exercised by the sync test (AioS3File inherits discard()),
+        # so there is no need to pay for another large upload here.
+        path = (
+            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
+            f"filesystem/test_async_write_transaction_rollback/{uuid.uuid4()}"
+        )
+
+        def write_then_fail():
+            with fs.transaction:
+                f = fs.open(path, "wb")
+                f.write(b"hello world")
+                f.close()
+                raise RuntimeError("rollback")
+
+        with pytest.raises(RuntimeError):
+            write_then_fail()
+        fs.invalidate_cache(path)
+        assert not fs.exists(path)
+
     @pytest.mark.parametrize(
         ("base", "exp"),
         [