Source code for file_storage

import logging
from typing import Any, Optional

from temporalio.client import Client

from julee.util.domain import FileMetadata, FileUploadArgs
from julee.util.repositories import FileStorageRepository

# Module-level logger; named after this module so its output can be filtered
# and configured per module by the application's logging setup.
logger = logging.getLogger(__name__)
class TemporalFileStorageRepository(FileStorageRepository):
    """Client-side FileStorageRepository proxy backed by a Temporal client.

    Every repository method starts a Temporal workflow by name through
    ``client.start_workflow``, waits for it to finish, and returns whatever
    the workflow produced. No storage backend is touched directly by this
    class.

    NOTE(review): the workflow names used here (``util.file_storage.minio.*``)
    and the log messages speak of "activities", yet the calls go through
    ``Client.start_workflow`` -- confirm that workflows are registered under
    these names on the worker side.
    """

    # Task queue used when the caller does not supply one. This is the value
    # that every call previously hard-coded.
    DEFAULT_TASK_QUEUE = "order-fulfillment-queue"

    def __init__(
        self,
        client: Client,
        concrete_repo: Optional[FileStorageRepository] = None,
        *,
        task_queue: str = DEFAULT_TASK_QUEUE,
    ) -> None:
        """Store the Temporal client and the routing configuration.

        Args:
            client: Temporal client used to start workflows.
            concrete_repo: Optional concrete repository. It is only stored on
                the instance; none of the methods in this class use it.
            task_queue: Task queue the workflows are started on. Defaults to
                ``DEFAULT_TASK_QUEUE`` so existing callers are unaffected.
        """
        self.client = client
        self.concrete_repo = concrete_repo
        self.task_queue = task_queue
        logger.debug("Initialized TemporalFileStorageRepository")

    async def _run_workflow(self, workflow: str, arg: Any, workflow_id: str) -> Any:
        """Start ``workflow`` with a single argument and await its result.

        Args:
            workflow: Name of the workflow to start.
            arg: The single argument passed to the workflow.
            workflow_id: Identifier given to the started workflow.

        Returns:
            Whatever ``handle.result()`` yields; no conversion or validation
            is applied here.
        """
        handle = await self.client.start_workflow(
            workflow,
            arg,
            id=workflow_id,
            task_queue=self.task_queue,
        )
        return await handle.result()

    async def upload_file(self, args: FileUploadArgs) -> FileMetadata:
        """Upload a file by running the ``upload_file`` workflow.

        Args:
            args: Upload arguments; ``args.file_id`` is used for logging and
                to build the workflow id ``upload-<file_id>``.

        Returns:
            The workflow result, returned as-is.
        """
        logger.debug("Client calling activity to upload file: %s", args.file_id)
        return await self._run_workflow(  # type: ignore[no-any-return]
            "util.file_storage.minio.upload_file",
            args,
            f"upload-{args.file_id}",
        )

    async def download_file(self, file_id: str) -> Optional[bytes]:
        """Download a file by running the ``download_file`` workflow.

        Args:
            file_id: Identifier of the file; also used to build the workflow
                id ``download-<file_id>``.

        Returns:
            The workflow result, returned as-is.
        """
        logger.debug("Client calling activity to download file: %s", file_id)
        return await self._run_workflow(  # type: ignore[no-any-return]
            "util.file_storage.minio.download_file",
            file_id,
            f"download-{file_id}",
        )

    async def get_file_metadata(self, file_id: str) -> Optional[FileMetadata]:
        """Fetch file metadata by running the ``get_file_metadata`` workflow.

        Args:
            file_id: Identifier of the file; also used to build the workflow
                id ``metadata-<file_id>``.

        Returns:
            The workflow result, returned as-is.
        """
        logger.debug("Client calling activity to get file metadata: %s", file_id)
        return await self._run_workflow(  # type: ignore[no-any-return]
            "util.file_storage.minio.get_file_metadata",
            file_id,
            f"metadata-{file_id}",
        )