# Source code for julee.repositories.minio.knowledge_service_query

"""
Minio implementation of KnowledgeServiceQueryRepository.

This module provides a Minio-based implementation of the
KnowledgeServiceQueryRepository protocol that follows the Clean Architecture
patterns defined in the Fun-Police Framework. It handles knowledge service
query storage as JSON objects in Minio, ensuring idempotency and proper
error handling.

The implementation stores knowledge service queries as JSON objects in Minio,
following the large payload handling pattern from the architectural
guidelines.
Each query is stored as a separate object with the query ID as the key.
"""

import logging
import uuid

from typing import Optional, List, Dict


from julee.domain.models.assembly_specification import (
    KnowledgeServiceQuery,
)
from .client import MinioClient, MinioRepositoryMixin
from julee.domain.repositories.knowledge_service_query import (
    KnowledgeServiceQueryRepository,
)


# Module-level logger named after this module's import path; the repository
# methods below emit their debug/error records through it.
logger = logging.getLogger(__name__)
class MinioKnowledgeServiceQueryRepository(
    KnowledgeServiceQueryRepository, MinioRepositoryMixin
):
    """
    Minio implementation of KnowledgeServiceQueryRepository.

    This implementation stores knowledge service queries as JSON objects in
    Minio buckets, following the established patterns for Minio repositories
    in this system. Each query is stored as a separate object with
    deterministic naming: ``query/<query_id>`` inside the
    ``knowledge-service-queries`` bucket.
    """

    def __init__(self, client: MinioClient) -> None:
        """Initialize repository with Minio client.

        Records the client, a class-named logger and the bucket name on the
        instance, then asks the mixin to make sure the bucket exists.

        Args:
            client: MinioClient protocol implementation (real or fake)
        """
        self.client = client
        # NOTE(review): the methods in this class log through the module-level
        # ``logger``; ``self.logger`` is presumably consumed by the
        # MinioRepositoryMixin helpers -- confirm against the mixin.
        self.logger = logging.getLogger("MinioKnowledgeServiceQueryRepository")
        self.bucket_name = "knowledge-service-queries"
        self.ensure_buckets_exist(self.bucket_name)

    async def get(self, query_id: str) -> Optional[KnowledgeServiceQuery]:
        """Retrieve a knowledge service query by ID.

        Args:
            query_id: Unique query identifier

        Returns:
            KnowledgeServiceQuery object if found, None otherwise
        """
        logger.debug(
            "MinioKnowledgeServiceQueryRepository: Attempting to retrieve "
            "query",
            extra={"query_id": query_id, "bucket": self.bucket_name},
        )

        # Each query lives under the "query/" prefix, keyed by its ID.
        return self.get_json_object(
            bucket_name=self.bucket_name,
            object_name=f"query/{query_id}",
            model_class=KnowledgeServiceQuery,
            not_found_log_message="Knowledge service query not found",
            error_log_message="Error retrieving knowledge service query",
            extra_log_data={"query_id": query_id},
        )

    async def save(self, query: KnowledgeServiceQuery) -> None:
        """Store or update a knowledge service query.

        The object is written to ``query/<query.query_id>``, so saving a
        query with an existing ID targets the same object name.

        Args:
            query: KnowledgeServiceQuery object to store
        """
        logger.debug(
            "MinioKnowledgeServiceQueryRepository: Saving query",
            extra={"query_id": query.query_id, "bucket": self.bucket_name},
        )

        # Update the updated_at timestamp
        self.update_timestamps(query)

        # Store in Minio
        self.put_json_object(
            bucket_name=self.bucket_name,
            object_name=f"query/{query.query_id}",
            model=query,
            success_log_message="Knowledge service query saved successfully",
            error_log_message="Failed to save knowledge service query",
            extra_log_data={"query_id": query.query_id},
        )

    async def generate_id(self) -> str:
        """Generate a unique query identifier.

        Returns:
            Unique string identifier for a new query, of the form
            ``query-`` followed by the first 12 hex digits of a random UUID4
        """
        query_id = f"query-{uuid.uuid4().hex[:12]}"
        logger.debug(
            "MinioKnowledgeServiceQueryRepository: Generated query ID",
            extra={"query_id": query_id},
        )
        return query_id

    async def get_many(
        self, query_ids: List[str]
    ) -> Dict[str, Optional[KnowledgeServiceQuery]]:
        """Retrieve multiple knowledge service queries by ID.

        Args:
            query_ids: List of unique query identifiers

        Returns:
            Dict mapping query_id to KnowledgeServiceQuery (or None if not
            found)
        """
        logger.debug(
            "MinioKnowledgeServiceQueryRepository: Attempting to retrieve "
            "multiple queries",
            extra={
                "query_ids": query_ids,
                "count": len(query_ids),
                "bucket": self.bucket_name,
            },
        )

        # Nothing requested: skip the batch call entirely.
        if not query_ids:
            return {}

        # Convert query IDs to object names
        object_names = [f"query/{query_id}" for query_id in query_ids]

        # Get objects from Minio using batch method
        object_results = self.get_many_json_objects(
            bucket_name=self.bucket_name,
            object_names=object_names,
            model_class=KnowledgeServiceQuery,
            not_found_log_message="Knowledge service query not found",
            error_log_message="Error retrieving knowledge service query",
            extra_log_data={"query_ids": query_ids},
        )

        # Convert object names back to query IDs for the result. The two
        # lists are index-aligned, so zip pairs each ID with its object name.
        return {
            query_id: object_results[object_name]
            for query_id, object_name in zip(query_ids, object_names)
        }

    async def list_all(self) -> List[KnowledgeServiceQuery]:
        """List all knowledge service queries.

        Any exception raised while listing or fetching is logged and
        swallowed; the caller then receives an empty list.

        Returns:
            List of all knowledge service queries, sorted by query_id
        """
        try:
            # Extract query IDs from objects with the query/ prefix
            query_ids = self.list_objects_with_prefix_extract_ids(
                bucket_name=self.bucket_name,
                prefix="query/",
                entity_type_name="queries",
            )

            if not query_ids:
                return []

            # Get all queries using the existing get_many method
            query_results = await self.get_many(query_ids)

            # Filter out None results and sort by query_id
            queries = sorted(
                (
                    query
                    for query in query_results.values()
                    if query is not None
                ),
                key=lambda query: query.query_id,
            )

            logger.debug(
                "Retrieved queries",
                extra={"count": len(queries)},
            )
            return queries

        except Exception as e:
            # logger.exception logs at ERROR level with the traceback
            # attached, equivalent to logger.error(..., exc_info=True).
            logger.exception(
                "Error listing queries",
                extra={
                    "error_type": type(e).__name__,
                    "error_message": str(e),
                    "bucket": self.bucket_name,
                },
            )
            # Return empty list on error to avoid breaking the API
            return []