Source code for julee.repositories.memory.base

"""
Memory repository base classes and mixins.

This module provides common functionality for in-memory repository
implementations, reducing code duplication and ensuring consistent patterns
across all memory-based repositories in the julee domain.

The MemoryRepositoryMixin encapsulates common patterns like:
- Dictionary-based storage management
- Standardized logging patterns
- ID generation with consistent prefixes
- Timestamp management (created_at, updated_at)
- Generic CRUD operations with proper error handling

Classes using this mixin must provide:
- self.storage_dict: Dict[str, T] for entity storage
- self.entity_name: str for logging and ID generation
- self.logger: logging.Logger instance
"""

import uuid
from datetime import datetime, timezone
from typing import Any, Generic, TypeVar

from pydantic import BaseModel

[docs] T = TypeVar("T", bound=BaseModel)
[docs] class MemoryRepositoryMixin(Generic[T]): """ Mixin that provides common repository patterns for memory implementations. This mixin encapsulates common functionality used across all memory repository implementations, including: - Dictionary-based entity storage and retrieval - Standardized logging patterns with consistent messaging - ID generation with configurable prefixes - Timestamp management (created_at if None, always updated_at) - Generic error handling patterns Classes using this mixin must provide: - self.storage_dict: Dict[str, T] instance for entity storage - self.entity_name: str for logging and ID generation prefixes - self.logger: logging.Logger instance (typically set in __init__) """ # Type annotations for attributes that implementing classes must provide
[docs] storage_dict: dict[str, T]
[docs] entity_name: str
[docs] logger: Any # logging.Logger, but avoiding import
[docs] def get_entity(self, entity_id: str) -> T | None: """Get an entity from memory storage with standardized logging. Args: entity_id: Unique entity identifier Returns: Entity if found, None otherwise """ self.logger.debug( f"Memory{self.entity_name}Repository: Attempting to retrieve " f"{self.entity_name.lower()}", extra={f"{self.entity_name.lower()}_id": entity_id}, ) entity = self.storage_dict.get(entity_id) if entity is None: self.logger.debug( f"Memory{self.entity_name}Repository: {self.entity_name} " f"not found", extra={f"{self.entity_name.lower()}_id": entity_id}, ) return None # Log success with entity-specific details extra_data = {f"{self.entity_name.lower()}_id": entity_id} self._add_entity_specific_log_data(entity, extra_data) self.logger.info( f"Memory{self.entity_name}Repository: {self.entity_name} " f"retrieved successfully", extra=extra_data, ) return entity
[docs] def get_many_entities(self, entity_ids: list[str]) -> dict[str, T | None]: """Get multiple entities from memory storage with standardized logging. Args: entity_ids: List of unique entity identifiers Returns: Dict mapping entity_id to entity (or None if not found) """ self.logger.debug( f"Memory{self.entity_name}Repository: Attempting to retrieve " f"multiple {self.entity_name.lower()}s", extra={ f"{self.entity_name.lower()}_ids": entity_ids, "count": len(entity_ids), }, ) result: dict[str, T | None] = {} found_count = 0 for entity_id in entity_ids: entity = self.storage_dict.get(entity_id) result[entity_id] = entity if entity is not None: found_count += 1 self.logger.info( f"Memory{self.entity_name}Repository: Retrieved " f"{found_count}/{len(entity_ids)} {self.entity_name.lower()}s", extra={ f"{self.entity_name.lower()}_ids": entity_ids, "requested_count": len(entity_ids), "found_count": found_count, "missing_count": len(entity_ids) - found_count, }, ) return result
[docs] def save_entity(self, entity: T, entity_id_field: str) -> None: """Save an entity to memory storage with timestamp management. Args: entity: Entity to save entity_id_field: Name of the ID field on the entity """ entity_id = getattr(entity, entity_id_field) # Log save attempt with entity-specific details log_extra = {f"{self.entity_name.lower()}_id": entity_id} self._add_entity_specific_log_data(entity, log_extra) self.logger.debug( f"Memory{self.entity_name}Repository: Saving " f"{self.entity_name.lower()}", extra=log_extra, ) # Update timestamps self.update_timestamps(entity) # Store the entity (idempotent - will overwrite if exists) self.storage_dict[entity_id] = entity # Log success with final state success_extra = {f"{self.entity_name.lower()}_id": entity_id} self._add_entity_specific_log_data(entity, success_extra) self.logger.info( f"Memory{self.entity_name}Repository: {self.entity_name} " f"saved successfully", extra=success_extra, )
[docs] def generate_entity_id(self, prefix: str | None = None) -> str: """Generate a unique entity ID with consistent format. Args: prefix: Optional prefix for the ID. If None, uses entity_name Returns: Unique entity ID string in format "{prefix}-{uuid}" """ if prefix is None: prefix = self.entity_name.lower() entity_id = f"{prefix}-{uuid.uuid4()}" self.logger.debug( f"Memory{self.entity_name}Repository: Generated " f"{self.entity_name.lower()} ID", extra={f"{self.entity_name.lower()}_id": entity_id}, ) return entity_id
[docs] def update_timestamps(self, entity: T) -> None: """Update timestamps on an entity (created_at if None, always updated_at). Args: entity: Pydantic model with created_at and updated_at fields """ now = datetime.now(timezone.utc) # Set created_at if it's None (for new objects) if ( hasattr(entity, "created_at") and getattr(entity, "created_at", None) is None ): entity.created_at = now # Always update updated_at if hasattr(entity, "updated_at"): entity.updated_at = now
def _add_entity_specific_log_data( self, entity: T, log_data: dict[str, Any] ) -> None: """Add entity-specific data to log entries for richer logging. This method can be overridden by specific repository implementations to add domain-specific logging information. Args: entity: The entity being logged log_data: Dictionary to add logging data to """ # Default implementation adds basic model info if hasattr(entity, "status"): status = entity.status log_data["status"] = ( status.value if hasattr(status, "value") else str(status) ) if hasattr(entity, "updated_at"): updated_at = entity.updated_at if updated_at: log_data["updated_at"] = updated_at.isoformat()