Source code for julee.domain.models.document.document

"""
Document domain models for the Capture, Extract, Assemble, Publish workflow.

This module contains the core document domain objects that represent
documents and their metadata in the CEAP workflow system.

All domain models use Pydantic BaseModel for validation, serialization,
and type safety, following the patterns established in the sample project.
"""

from collections.abc import Callable
from datetime import datetime, timezone
from enum import Enum
from typing import Any

from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator

from julee.domain.models.custom_fields.content_stream import (
    ContentStream,
)


[docs] def delegate_to_content(*method_names: str) -> Callable[[type], type]: """Decorator to delegate IO methods to the content stream property.""" def decorator(cls: type) -> type: for method_name in method_names: def make_delegated_method(name: str) -> Callable[..., Any]: def delegated_method(self: Any, *args: Any, **kwargs: Any) -> Any: return getattr(self.content, name)(*args, **kwargs) delegated_method.__name__ = name delegated_method.__doc__ = f"Delegate {name} to content stream." return delegated_method setattr(cls, method_name, make_delegated_method(method_name)) return cls return decorator
[docs] class DocumentStatus(str, Enum): """Status of a document through the Capture, Extract, Assemble, Publish pipeline."""
[docs] CAPTURED = "captured"
[docs] REGISTERED = "registered" # Registered with knowledge service
# Assembly specification types determined
[docs] ASSEMBLY_SPECIFICATION_IDENTIFIED = "assembly_specification_identified"
[docs] EXTRACTED = "extracted" # Extractions completed
[docs] ASSEMBLED = "assembled" # Template rendered and policies applied
[docs] PUBLISHED = "published"
[docs] FAILED = "failed"
@delegate_to_content("read", "seek", "tell")
[docs] class Document(BaseModel): """Complete document entity including content and metadata. This is the primary domain model that represents a complete document in the CEAP workflow system. Content is provided as a ContentStream for efficient handling of both small and large documents. The content stream is excluded from JSON serialization - use separate content endpoints for streaming binary data over HTTP. """ # Core document identification
[docs] document_id: str
[docs] original_filename: str
[docs] content_type: str
[docs] size_bytes: int = Field(gt=0, description="Size must be positive")
[docs] content_multihash: str = Field( description="Multihash of document content for integrity verification" )
# Document processing state
[docs] status: DocumentStatus = DocumentStatus.CAPTURED
[docs] knowledge_service_id: str | None = None
[docs] assembly_types: list[str] = Field(default_factory=list)
# Timestamps
[docs] created_at: datetime | None = Field( default_factory=lambda: datetime.now(timezone.utc) )
[docs] updated_at: datetime | None = Field( default_factory=lambda: datetime.now(timezone.utc) )
# Additional data and content stream
[docs] additional_metadata: dict[str, Any] = Field(default_factory=dict)
[docs] content: ContentStream | None = Field(default=None, exclude=True)
[docs] content_bytes: bytes | None = Field( default=None, description="Raw content as bytes for cases where direct in-memory " "binary payloads are preferred over ContentStream.", )
@field_validator("document_id") @classmethod
[docs] def document_id_must_not_be_empty(cls, v: str) -> str: if not v or not v.strip(): raise ValueError("Document ID cannot be empty") return v.strip()
@field_validator("original_filename") @classmethod
[docs] def filename_must_not_be_empty(cls, v: str) -> str: if not v or not v.strip(): raise ValueError("Original filename cannot be empty") return v.strip()
@field_validator("content_type") @classmethod
[docs] def content_type_must_not_be_empty(cls, v: str) -> str: if not v or not v.strip(): raise ValueError("Content type cannot be empty") return v.strip()
@field_validator("content_multihash") @classmethod
[docs] def content_multihash_must_not_be_empty(cls, v: str) -> str: if not v or not v.strip(): raise ValueError("Content multihash cannot be empty") return v.strip()
@model_validator(mode="after")
[docs] def validate_content_fields(self, info: ValidationInfo) -> "Document": """Ensure document has at least content, or content_bytes.""" # Skip validation in Temporal deserialization context if info.context and info.context.get("temporal_validation"): return self has_content = self.content is not None has_content_bytes = self.content_bytes is not None if not (has_content or has_content_bytes): raise ValueError("Document must have one of: content, or content_bytes.") return self