Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pyathena/filesystem/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -1409,11 +1409,18 @@ def _initiate_upload(self) -> None:
)

def _upload_chunk(self, final: bool = False) -> bool:
# The return value controls whether fsspec's flush() resets self.buffer
# afterwards: it does so only when this returns a value other than False.
# Returning ``not final`` keeps the buffer intact on the final flush so a
# deferred commit() (autocommit=False, i.e. inside an fsspec transaction)
# can still read the bytes; resetting it there would upload an empty
# object for small files. Mid-stream chunks (final=False) return True so
# fsspec clears the already-uploaded buffer between parts.
if self.tell() < self.blocksize:
# Files smaller than the block size cannot be multipart uploaded.
if self.autocommit and final:
self.commit()
return True
return not final

if not self.multipart_upload:
raise RuntimeError("Multipart upload is not initialized.")
Expand Down Expand Up @@ -1456,7 +1463,7 @@ def _upload_chunk(self, final: bool = False) -> bool:

if self.autocommit and final:
self.commit()
return True
return not final

def commit(self) -> None:
if self.tell() == 0:
Expand Down
160 changes: 160 additions & 0 deletions tests/pyathena/filesystem/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import io
import os
import tempfile
import time
import urllib.parse
import urllib.request
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from itertools import chain
from pathlib import Path
from types import SimpleNamespace
from unittest import mock

import fsspec
import pytest
Expand Down Expand Up @@ -194,6 +198,56 @@ def test_write(self, fs, base, exp):
assert len(actual) == len(data)
assert actual == data

@pytest.mark.parametrize(
    "size",
    [
        2**10,  # below the block size: one-shot PutObject path (the GH-719 regression)
        10 * 2**20,  # above the 5 MiB block size: CompleteMultipartUpload path
    ],
)
def test_write_transaction(self, fs, size):
    """Round-trip a file written inside an fsspec transaction (GH-719).

    Files opened under a transaction use autocommit=False. Small files were
    previously committed as empty objects, so the object is read back and
    compared against what was written.
    """
    payload = b"a" * size
    target = (
        f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
        f"filesystem/test_write_transaction/{uuid.uuid4()}"
    )

    # The file is closed first, then the transaction is completed.
    with fs.transaction:
        with fs.open(target, "wb") as writer:
            writer.write(payload)

    with fs.open(target, "rb") as reader:
        round_tripped = reader.read()

    assert len(round_tripped) == len(payload)
    assert round_tripped == payload

@pytest.mark.parametrize(
    "size",
    [
        2**10,  # < block size: small-file discard() is a no-op
        10 * 2**20,  # > block size: discard() aborts the multipart upload
    ],
)
def test_write_transaction_rollback(self, fs, size):
    """Raising inside the transaction must leave no object behind."""
    data = b"a" * size
    path = (
        f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
        f"filesystem/test_write_transaction_rollback/{uuid.uuid4()}"
    )

    def write_then_fail():
        with fs.transaction:
            # Context-manage the handle so it is closed even if write() raises.
            with fs.open(path, "wb") as f:
                f.write(data)
            raise RuntimeError("rollback")

    # Match on the message: a RuntimeError raised by the filesystem itself
    # must not satisfy this expectation, otherwise a broken write path (which
    # also leaves no object behind) would make the test pass for the wrong
    # reason.
    with pytest.raises(RuntimeError, match="rollback"):
        write_then_fail()
    fs.invalidate_cache(path)
    assert not fs.exists(path)

@pytest.mark.parametrize(
("base", "exp"),
[
Expand Down Expand Up @@ -820,6 +874,40 @@ def test_pandas_write_csv(self, line_count):


class TestS3File:
@staticmethod
def _make_write_file(data: bytes, autocommit: bool):
    """Build a minimal write-mode S3File without touching AWS.

    ``__init__`` is bypassed because it would require a real connection; the
    attributes set below are assigned by hand instead.
    """
    stub = S3File.__new__(S3File)
    attributes = {
        "fs": mock.MagicMock(spec=S3FileSystem),
        "path": "s3://bucket/key.txt",
        "bucket": "bucket",
        "key": "key.txt",
        "s3_additional_kwargs": {},
        "autocommit": autocommit,
        "blocksize": S3FileSystem.MULTIPART_UPLOAD_MIN_PART_SIZE,
        "multipart_upload": None,
        "multipart_upload_parts": [],
        "buffer": io.BytesIO(data),
        "loc": len(data),  # tell() returns self.loc
    }
    for attr_name, attr_value in attributes.items():
        setattr(stub, attr_name, attr_value)
    return stub

@staticmethod
def _make_multipart_write_file(data: bytes, autocommit: bool):
    """Build a write-mode S3File set up to take the multipart branch.

    The block size is set below ``len(data)`` and the executor and S3 part
    calls are mocked, so no AWS access is needed.
    """

    def fake_upload_part(**kwargs):
        # Echo the requested part number back with a synthetic quoted etag.
        return SimpleNamespace(
            etag=f'"e{kwargs["part_number"]}"', part_number=kwargs["part_number"]
        )

    stub = TestS3File._make_write_file(data, autocommit=autocommit)
    stub.blocksize = 4
    stub.fs.MULTIPART_UPLOAD_MIN_PART_SIZE = 4
    stub.fs.MULTIPART_UPLOAD_MAX_PART_SIZE = 8
    stub.multipart_upload = SimpleNamespace(upload_id="uploadid")
    stub._executor = ThreadPoolExecutor(max_workers=1)
    stub.fs._upload_part.side_effect = fake_upload_part
    return stub

@pytest.mark.parametrize(
("objects", "target"),
[
Expand Down Expand Up @@ -867,3 +955,75 @@ def test_get_ranges(self, start, end, max_workers, worker_block_size, ranges):

def test_format_ranges(self):
    """The (0, 100) range is rendered as the header value "bytes=0-99"."""
    formatted = S3File._format_ranges((0, 100))
    assert formatted == "bytes=0-99"

@pytest.mark.parametrize("autocommit", [True, False])
def test_upload_chunk_small_file(self, autocommit):
    """Single (one-shot PutObject) upload via _upload_chunk + commit.

    autocommit=True is a normal open(): the object is committed immediately.
    autocommit=False is the transaction case (GH-719): commit() is deferred to
    Transaction.complete(), so _upload_chunk must return False for
    fsspec.flush() to keep self.buffer for the deferred commit instead of
    resetting it and uploading an empty object.
    """
    payload = b"hello world"
    writer = self._make_write_file(payload, autocommit=autocommit)
    put_object = writer.fs._put_object

    chunk_result = writer._upload_chunk(final=True)
    assert chunk_result is False

    if not autocommit:
        # Deferred: nothing is uploaded until commit() runs.
        put_object.assert_not_called()
        writer.commit()

    put_object.assert_called_once_with(bucket="bucket", key="key.txt", body=payload)

def test_upload_chunk_empty_file_touches(self):
    """An intentionally empty file (tell() == 0) is created via touch().

    The one-shot _put_object call must never be used for it.
    """
    empty_writer = self._make_write_file(b"", autocommit=True)

    chunk_result = empty_writer._upload_chunk(final=True)

    assert chunk_result is False
    empty_writer.fs.touch.assert_called_once()
    empty_writer.fs._put_object.assert_not_called()

@pytest.mark.parametrize("multipart", [False, True])
def test_discard(self, multipart):
    """Rollback (Transaction.complete(commit=False)) never creates the object.

    For a single small file discard() is a no-op; for a multipart upload it
    aborts the upload.
    """
    if not multipart:
        file = self._make_write_file(b"hello world", autocommit=False)
        assert file._upload_chunk(final=True) is False
    else:
        file = self._make_multipart_write_file(b"x" * 16, autocommit=False)

    file.discard()

    s3_call = file.fs._call
    if not multipart:
        s3_call.assert_not_called()
    else:
        s3_call.assert_called_once()
        assert s3_call.call_args.args[0] == "abort_multipart_upload"

    # Whichever path was taken, nothing was put and the upload state is cleared.
    file.fs._put_object.assert_not_called()
    assert file.multipart_upload is None
    assert file.multipart_upload_parts == []

@pytest.mark.parametrize("autocommit", [True, False])
def test_upload_chunk_multipart(self, autocommit):
    """Multipart upload is completed from the uploaded parts.

    The buffer is never read for the body and no one-shot PutObject is
    issued. With autocommit=True the upload is completed inside
    _upload_chunk; with autocommit=False (transaction) completion is deferred
    to commit().

    The mid-stream assertion also guards why _upload_chunk returns
    ``not final`` rather than a plain False (as s3fs does): PyAthena delegates
    mid-stream buffer management to fsspec, so a non-final chunk MUST return
    True to have fsspec reset the buffer between parts. Returning False
    mid-stream would re-upload the already-sent buffer.
    """
    writer = self._make_multipart_write_file(b"x" * 16, autocommit=autocommit)
    complete_upload = writer.fs._complete_multipart_upload

    mid_stream_result = writer._upload_chunk(final=False)
    assert mid_stream_result is True  # mid-stream -> buffer reset

    final_result = writer._upload_chunk(final=True)
    assert final_result is False  # final -> buffer kept

    if not autocommit:
        # Deferred: the multipart upload is completed by commit().
        complete_upload.assert_not_called()
        writer.commit()

    complete_upload.assert_called_once()
    writer.fs._put_object.assert_not_called()
44 changes: 44 additions & 0 deletions tests/pyathena/filesystem/test_s3_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,50 @@ def test_write(self, fs, base, exp):
assert len(actual) == len(data)
assert actual == data

@pytest.mark.parametrize(
    "size",
    [
        2**10,  # below the block size: one-shot PutObject path (the GH-719 regression)
        10 * 2**20,  # above the 5 MiB block size: multipart path via the async executor
    ],
)
def test_write_transaction(self, fs, size):
    """GH-719 regression check for the async filesystem.

    AioS3File inherits _upload_chunk/commit from S3File, so the transaction
    fix must hold here too, including the multipart path driven by the async
    executor.
    """
    payload = b"a" * size
    target = (
        f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
        f"filesystem/test_async_write_transaction/{uuid.uuid4()}"
    )

    # The file is closed first, then the transaction is completed.
    with fs.transaction:
        with fs.open(target, "wb") as writer:
            writer.write(payload)

    with fs.open(target, "rb") as reader:
        round_tripped = reader.read()

    assert len(round_tripped) == len(payload)
    assert round_tripped == payload

def test_write_transaction_rollback(self, fs):
    """Raising inside the transaction must leave no object behind.

    Kept small-only on purpose: the multipart discard()/abort path is already
    exercised by the sync test (AioS3File inherits discard()), so there is no
    need to pay for another large upload here.
    """
    path = (
        f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
        f"filesystem/test_async_write_transaction_rollback/{uuid.uuid4()}"
    )

    def write_then_fail():
        with fs.transaction:
            # Context-manage the handle so it is closed even if write() raises.
            with fs.open(path, "wb") as f:
                f.write(b"hello world")
            raise RuntimeError("rollback")

    # Match on the message: a RuntimeError raised by the filesystem itself
    # must not satisfy this expectation, otherwise a broken write path (which
    # also leaves no object behind) would make the test pass for the wrong
    # reason.
    with pytest.raises(RuntimeError, match="rollback"):
        write_then_fail()
    fs.invalidate_cache(path)
    assert not fs.exists(path)

@pytest.mark.parametrize(
("base", "exp"),
[
Expand Down