Source code for julee.util.temporal.activities

"""
Utilities for working with Temporal activities.

This module provides helper functions for automatically discovering and
collecting activity methods from decorated instances, eliminating the need
for manual activity registration boilerplate.
"""

import inspect
import logging
from typing import Any

# Module-level logger shared by the discovery and collection helpers below.
logger = logging.getLogger(__name__)
def discover_protocol_methods(
    cls_hierarchy: tuple[type, ...],
) -> dict[str, Any]:
    """
    Discover protocol methods that should be wrapped as activities.

    Walks a class hierarchy, picks out the classes that look like protocol
    interfaces, and collects every public async method those interfaces
    declare. The value stored for each method is the attribute looked up on
    the concrete class (the first entry of the hierarchy), not the
    protocol's own stub.

    A class is treated as a protocol when any of the following holds:
    it has a ``__protocol__`` attribute, its ``_is_protocol`` attribute is
    truthy, or the text ``"Protocol"`` appears in ``str(cls)``.

    Args:
        cls_hierarchy: The class MRO (method resolution order); the first
            entry is taken to be the concrete class being decorated.

    Returns:
        Dict mapping method names to method objects from the concrete class.
        Empty when the hierarchy is empty or no protocol declares a public
        async method that the concrete class also exposes.
    """
    methods_to_wrap: dict[str, Any] = {}

    # An empty hierarchy has no concrete class to resolve methods against.
    if not cls_hierarchy:
        return methods_to_wrap

    concrete_class = cls_hierarchy[0]  # The actual class being decorated

    for base_class in cls_hierarchy:
        # Skip object base class
        if base_class is object:
            continue

        # Only protocol classes contribute method names (architectural
        # compliance): concrete classes may define extra async helpers that
        # must not become activities.
        is_protocol = (
            hasattr(base_class, "__protocol__")
            or getattr(base_class, "_is_protocol", False)
            or "Protocol" in str(base_class)
        )
        if not is_protocol:
            continue

        for name in base_class.__dict__:
            # Filter on the name first so private/dunder attributes are
            # never resolved through getattr at all.
            if name.startswith("_") or name in methods_to_wrap:
                continue

            base_method = getattr(base_class, name)
            if not inspect.iscoroutinefunction(base_method):
                continue

            # Take the implementation from the concrete class, not the
            # protocol's declaration.
            if hasattr(concrete_class, name):
                methods_to_wrap[name] = getattr(concrete_class, name)

    # Lazy %-style arguments: the list is only rendered when DEBUG is on.
    logger.debug(
        "Method discovery found %d: %s",
        len(methods_to_wrap),
        list(methods_to_wrap),
    )

    return methods_to_wrap
def collect_activities_from_instances(*instances: Any) -> list[Any]:
    """Gather the bound activity methods exposed by the given instances.

    For each instance, the class MRO is passed to
    ``discover_protocol_methods`` and every discovered method name that the
    instance actually has is looked up on the instance, yielding a bound
    method. This replaces hand-maintained activity lists when registering
    with a Worker.

    Args:
        *instances: Repository and service instances decorated with
            @temporal_activity_registration

    Returns:
        List of bound methods, in instance order and then discovery order.

    Example:
        # Rather than enumerating each activity by hand:
        activities = [
            repo.generate_id,
            repo.save,
            repo.get,
            # ... many more lines
        ]

        # let discovery build the list:
        activities = collect_activities_from_instances(
            temporal_assembly_repo,
            temporal_document_repo,
            temporal_knowledge_service,
        )
    """
    collected = []

    for instance in instances:
        # Same discovery routine the registration decorator relies on.
        discovered = discover_protocol_methods(instance.__class__.__mro__)

        for method_name in discovered:
            if not hasattr(instance, method_name):
                continue

            # Resolve through the instance so the result is bound to it.
            collected.append(getattr(instance, method_name))
            logger.debug(
                f"Collected activity method: "
                f"{instance.__class__.__name__}.{method_name}"
            )

    logger.info(
        f"Collected {len(collected)} activities from "
        f"{len(instances)} instances"
    )
    return collected