Skip to content

Save to s3 not working #124

@Tamerlan-hash

Description

@Tamerlan-hash

Describe the bug
Im trying to upload my processed hls playlist with fragments
Im using fastapi and streaming upload to s3.
First i do upload to cloud my raw mp4 video file
Second i get url and set to input ffmpeg_streaming
Third i get proccess hls file and do save to s3

In my function i dont get any errors, but hls files in Temp folder in Windows, but not in s3

def convert_to_hls(
        request: Request,
        folder_name: str,
        filename: str,
):
    s3 = S3(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION_NAME,
        endpoint_url=AWS_ENDPOINT_URL,
    )
    video = ffmpeg_streaming.input(
        s3,
        bucket_name=AWS_BUCKET_NAME,
        key=f"{folder_name}/{filename}",
    )
    hls = video.hls(Formats.h264())
    hls.auto_generate_representations()

    save_to_s3 = CloudManager().add(
        s3,
        bucket_name=AWS_BUCKET_NAME,
    )
    hls.output(clouds=save_to_s3, async_run=False, monitor=monitor)
async def upload_by_streaming(
        request: Request,
        file: UploadFile,
        filename: str,
        folder_name: str,
        content_type: str,
):
    MB = 1024 * 1024
    CHUNK_SIZE = 5 * MB

    parts = []
    part_number = 1
    upload_id = await request.state.cdn_client.start_multipart_upload(
        filename=filename,
        folder_name=folder_name,
        content_type=content_type,
    )

    chunks_list = []
    buffer_size = 0

    # Изменение здесь: используем while и file.read(size)
    while True:
        chunk = await file.read(CHUNK_SIZE)
        if not chunk:
            break

        chunks_list.append(chunk)
        buffer_size += len(chunk)

        if buffer_size >= CHUNK_SIZE:
            # Объединяем все чанки в один байтовый массив перед загрузкой
            buffer = b"".join(chunks_list)
            e_tag = await request.state.cdn_client.upload_part(
                chunk=buffer,
                filename=filename,
                folder_name=folder_name,
                part_number=part_number,
                upload_id=upload_id,
            )
            parts.append({'PartNumber': part_number, 'ETag': e_tag})
            part_number += 1
            # Очищаем список и сбрасываем размер буфера
            chunks_list = []
            buffer_size = 0

    # Обработка оставшихся чанков после завершения цикла
    if chunks_list:
        buffer = b"".join(chunks_list)
        e_tag = await request.state.cdn_client.upload_part(
            chunk=buffer,
            filename=filename,
            folder_name=folder_name,
            part_number=part_number,
            upload_id=upload_id,
        )
        parts.append({'PartNumber': part_number, 'ETag': e_tag})

    # Step 3: Complete the Multipart Upload
    await request.state.cdn_client.complete_multipart_upload(
        filename=filename,
        folder_name=folder_name,
        upload_id=upload_id,
        parts=parts,
    )
async def upload_video_handler(
        request: Request,
        file: UploadFile = File(...),
):
    filename = f"{uuid4()}"
    folder_name = "videos"
    await upload_by_streaming(
        request=request,
        file=file,
        filename=filename,
        folder_name=folder_name,
        content_type="video/mp4",
    )
    convert_to_hls(
        request=request,
        folder_name=folder_name,
        filename=filename,
    )

    mp4_url = f"https://{request.state.cdn_client._bucket_name}/{folder_name}/{filename}"
    hls_url = f"https://{request.state.cdn_client._bucket_name}/{folder_name}/{filename}.m3u8"
    return {
        "mp4_url": mp4_url,
        "hls_url": hls_url,
    }

Local machine (please complete the following information):

  • OS: Windows
  • FFMped version 2023-08-10-git-d9d5695390-full_build-www.gyan.dev Copyright (c) 2000-2023 the FFmpeg developers
    built with gcc 12.2.0 (Rev10, Built by MSYS2 project)

I tryied use scratch file, and took this error

import logging

import ffmpeg_streaming
from ffmpeg_streaming import CloudManager, Formats, S3

AWS_ACCESS_KEY_ID = "DO00KZKFXCU3D4JVUD9F"
AWS_SECRET_ACCESS_KEY = "mzIFi+fQsj0N2yT+7iBrn+0mEfP7F7QAceUzqkA2PDA"
AWS_ENDPOINT_URL = "https://ams3.digitaloceanspaces.com"
AWS_REGION_NAME = "ams3"
AWS_BUCKET_NAME = "test-cdn.project.com"

logging.basicConfig(filename='streaming.log', level=logging.NOTSET, format='[%(asctime)s] %(levelname)s: %(message)s')

s3 = S3(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION_NAME,
    endpoint_url=AWS_ENDPOINT_URL,
)

save_to_s3 = CloudManager().add(s3, bucket_name=AWS_BUCKET_NAME, folder="vod")
url = "https://test-cdn.project.com/videos/598551b1-de97-48b6-b430-97679f34b7fc"
video = ffmpeg_streaming.input(s3, bucket_name=AWS_BUCKET_NAME, key=f'videos/{url.split("/")[-1]}')

hls = video.hls(Formats.h264())
hls.auto_generate_representations()

hls.output(clouds=save_to_s3, async_run=False)
Exception ignored in atexit callback: <bound method HLS.finish_up of <ffmpeg_streaming._media.HLS object at 0x000001F913460CA0>>
Traceback (most recent call last):
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\ffmpeg_streaming\_media.py", line 228, in finish_up
    super(HLS, self).finish_up()
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\ffmpeg_streaming\_media.py", line 53, in finish_up
    self.clouds.transfer('upload_directory', os.path.dirname(self.output_))
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\ffmpeg_streaming\_clouds.py", line 200, in transfer
    getattr(cloud[0], method)(path, **cloud[1])
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\ffmpeg_streaming\_clouds.py", line 59, in upload_directory
    self.s3.upload_file(join(directory, file), bucket_name, join(folder, file).replace("\\", "/"))
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\boto3\s3\inject.py", line 143, in upload_file
    return transfer.upload_file(
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\boto3\s3\transfer.py", line 288, in upload_file
    future = self._manager.upload(
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\s3transfer\manager.py", line 333, in upload
    return self._submit_transfer(
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\s3transfer\manager.py", line 528, in _submit_transfer
    self._submission_executor.submit(
  File "C:\Users\dzhur\Desktop\project\Source\monolith\.venv\lib\site-packages\s3transfer\futures.py", line 474, in submit
    future = ExecutorFuture(self._executor.submit(task))
  File "C:\Users\dzhur\.pyenv\pyenv-win\versions\3.10.9\lib\concurrent\futures\thread.py", line 169, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions