"""
Minio implementation of KnowledgeServiceConfigRepository.
This module provides a Minio-based implementation of the
KnowledgeServiceConfigRepository
protocol that follows the Clean Architecture patterns defined in the
Fun-Police Framework. It handles knowledge service configuration storage
as JSON objects in Minio, ensuring idempotency and proper error handling.
The implementation stores knowledge service configurations as JSON objects
in Minio, following the large payload handling pattern from the architectural
guidelines. Each configuration is stored with its knowledge_service_id as the
key.
"""
import logging
from julee.domain.models.knowledge_service_config import (
KnowledgeServiceConfig,
)
from julee.domain.repositories.knowledge_service_config import (
KnowledgeServiceConfigRepository,
)
from .client import MinioClient, MinioRepositoryMixin
[docs]
class MinioKnowledgeServiceConfigRepository(
KnowledgeServiceConfigRepository, MinioRepositoryMixin
):
"""
Minio implementation of KnowledgeServiceConfigRepository using Minio for
persistence.
This implementation stores knowledge service configurations as JSON
objects:
- Knowledge Service Configs: JSON objects in the
"knowledge-service-configs" bucket
Each configuration is stored with its knowledge_service_id as the object
name for efficient retrieval and updates.
"""
def __init__(self, client: MinioClient) -> None:
"""Initialize repository with Minio client.
Args:
client: MinioClient protocol implementation (real or fake)
"""
[docs]
self.logger = logging.getLogger("MinioKnowledgeServiceConfigRepository")
[docs]
self.bucket_name = "knowledge-service-configs"
self.ensure_buckets_exist(self.bucket_name)
[docs]
async def get(self, knowledge_service_id: str) -> KnowledgeServiceConfig | None:
"""Retrieve a knowledge service configuration by ID.
Args:
knowledge_service_id: Unique knowledge service identifier
Returns:
KnowledgeServiceConfig object if found, None otherwise
"""
object_name = f"config/{knowledge_service_id}"
return self.get_json_object(
bucket_name=self.bucket_name,
object_name=object_name,
model_class=KnowledgeServiceConfig,
not_found_log_message="Knowledge service config not found",
error_log_message="Error retrieving knowledge service config",
extra_log_data={"knowledge_service_id": knowledge_service_id},
)
[docs]
async def save(self, knowledge_service: KnowledgeServiceConfig) -> None:
"""Save a knowledge service configuration.
Args:
knowledge_service: Complete KnowledgeServiceConfig to save
"""
# Update timestamps
self.update_timestamps(knowledge_service)
object_name = f"config/{knowledge_service.knowledge_service_id}"
self.put_json_object(
bucket_name=self.bucket_name,
object_name=object_name,
model=knowledge_service,
success_log_message="Knowledge service config saved successfully",
error_log_message="Error saving knowledge service config",
extra_log_data={
"knowledge_service_id": (knowledge_service.knowledge_service_id),
"service_name": knowledge_service.name,
"service_api": knowledge_service.service_api.value,
},
)
[docs]
async def get_many(
self, knowledge_service_ids: list[str]
) -> dict[str, KnowledgeServiceConfig | None]:
"""Retrieve multiple knowledge service configs by ID.
Args:
knowledge_service_ids: List of unique knowledge service
identifiers
Returns:
Dict mapping knowledge_service_id to KnowledgeServiceConfig (or
None if not found)
"""
# Convert knowledge service IDs to object names
object_names = [f"config/{service_id}" for service_id in knowledge_service_ids]
# Get objects from Minio using batch method
object_results = self.get_many_json_objects(
bucket_name=self.bucket_name,
object_names=object_names,
model_class=KnowledgeServiceConfig,
not_found_log_message="Knowledge service config not found",
error_log_message="Error retrieving knowledge service config",
extra_log_data={"knowledge_service_ids": knowledge_service_ids},
)
# Convert object names back to knowledge service IDs for the result
result: dict[str, KnowledgeServiceConfig | None] = {}
for i, service_id in enumerate(knowledge_service_ids):
object_name = object_names[i]
result[service_id] = object_results[object_name]
return result
[docs]
async def generate_id(self) -> str:
"""Generate a unique knowledge service identifier.
Returns:
Unique knowledge service ID string
"""
return self.generate_id_with_prefix("ks")
[docs]
async def list_all(self) -> list[KnowledgeServiceConfig]:
"""List all knowledge service configurations.
Returns:
List of all knowledge service configurations, sorted by
knowledge_service_id
"""
try:
# Extract knowledge service IDs from objects with config/ prefix
service_ids = self.list_objects_with_prefix_extract_ids(
bucket_name=self.bucket_name,
prefix="config/",
entity_type_name="configs",
)
if not service_ids:
return []
# Get all configurations using the existing get_many method
config_results = await self.get_many(service_ids)
# Filter out None results and sort by knowledge_service_id
configs = [
config for config in config_results.values() if config is not None
]
configs.sort(key=lambda x: x.knowledge_service_id)
self.logger.debug(
"Retrieved configs",
extra={"count": len(configs)},
)
return configs
except Exception as e:
self.logger.error(
"Error listing configs",
exc_info=True,
extra={
"error_type": type(e).__name__,
"error_message": str(e),
"bucket": self.bucket_name,
},
)
# Return empty list on error to avoid breaking the API
return []