Source code for file_storage
import logging
from temporalio.client import Client
from julee.util.domain import FileMetadata, FileUploadArgs
from julee.util.repositories import FileStorageRepository
[docs]
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
[docs]
class TemporalFileStorageRepository(FileStorageRepository):
    """
    Client-side proxy for FileStorageRepository backed by a Temporal client.

    Instead of touching file storage directly, each method submits the
    request through the Temporal ``Client`` (``start_workflow``) and then
    awaits the result of the returned handle.

    NOTE(review): the names submitted ("util.file_storage.minio.*") read
    like activity names, but this class dispatches them with
    ``Client.start_workflow``. Confirm that the worker registers workflows
    under these names.
    """

    # Task queue that every request issued by this proxy is submitted to.
    _TASK_QUEUE = "order-fulfillment-queue"

    def __init__(
        self,
        client: Client,
        concrete_repo: FileStorageRepository | None = None,
    ):
        """
        Create the proxy.

        Args:
            client: Temporal client used to submit upload/download requests.
            concrete_repo: Optional concrete repository. It is only stored
                on the instance; no method of this class reads it.
        """
        # upload_file/download_file call self.client.start_workflow, so the
        # client has to be kept on the instance.
        self.client = client
        self.concrete_repo = concrete_repo
        logger.debug("Initialized TemporalFileStorageRepository")

    async def upload_file(self, args: FileUploadArgs) -> FileMetadata:
        """
        Upload a file by dispatching the request through Temporal.

        Args:
            args: Upload request; ``args.file_id`` is used to build the
                Temporal workflow id (``upload-<file_id>``).

        Returns:
            Whatever the remote execution returns (annotated as
            ``FileMetadata``).
        """
        # Lazy %-style arguments: the message is only formatted when DEBUG
        # logging is enabled.
        logger.debug("Client calling activity to upload file: %s", args.file_id)
        handle = await self.client.start_workflow(
            "util.file_storage.minio.upload_file",
            args,
            id=f"upload-{args.file_id}",
            task_queue=self._TASK_QUEUE,
        )
        # Block until the remote execution finishes and hand back its result.
        return await handle.result()  # type: ignore[no-any-return]

    async def download_file(self, file_id: str) -> bytes | None:
        """
        Download a file by dispatching the request through Temporal.

        Args:
            file_id: Identifier of the file; also used to build the Temporal
                workflow id (``download-<file_id>``).

        Returns:
            Whatever the remote execution returns (annotated as
            ``bytes | None``).
        """
        logger.debug("Client calling activity to download file: %s", file_id)
        handle = await self.client.start_workflow(
            "util.file_storage.minio.download_file",
            file_id,
            id=f"download-{file_id}",
            task_queue=self._TASK_QUEUE,
        )
        # Block until the remote execution finishes and hand back its result.
        return await handle.result()  # type: ignore[no-any-return]