Nether Framework Architecture Review

Framework Overview

The Nether framework is a sophisticated event-driven architecture built around the Mediator Pattern with support for asynchronous message processing, modular design, and workflow orchestration.

⚠️ Critical Architectural Guidance: DDD and Separation of Concerns

The Problem: Mixing Framework Modules with Business Logic

COMMON MISTAKE: Putting business workflows and domain logic directly in framework modules

# ❌ WRONG - Business logic in framework component
class OrderProcessingSaga(Component[ProcessOrder]):
    async def handle(self, message: ProcessOrder, *, dispatch, join_stream):
        # ❌ Business rules and workflows directly in framework component
        if message.total_amount > 1000:
            # Apply enterprise discount
            discount = message.total_amount * 0.1
        
        # ❌ Domain logic mixed with message handling
        if not self._validate_inventory(message.items):
            raise InvalidInventoryError()
        
        # ❌ Complex business workflow in infrastructure layer
        await self._process_payment_with_retry_logic(message)

The Solution: Proper DDD Layering

CORRECT: Framework components delegate to application services

# ✅ RIGHT - Framework component is thin, delegates to application service
class OrderCommandHandler(Component[ProcessOrder]):
    def __init__(self, app: Application, order_service: OrderProcessingService):
        super().__init__(app)
        self._order_service = order_service  # Application layer
    
    async def handle(self, message: ProcessOrder, *, dispatch, join_stream):
        # ✅ Delegate to application service
        success, result = await self._order_service.process_order(
            message.order_id, message.customer_id, message.items
        )
        
        # ✅ Framework only handles coordination and events
        if success:
            await dispatch(OrderCreated(order_id=message.order_id))
        else:
            await dispatch(OrderFailed(order_id=message.order_id, error=result))

Architecture Layers

┌─────────────────────────────────────────┐
│ INFRASTRUCTURE LAYER                    │
│ • Framework Modules (Message Handlers)  │ ← Nether modules go here
│ • Repositories (Implementations)        │
│ • External Service Adapters            │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│ APPLICATION LAYER                       │
│ • Application Services (Use Cases)      │ ← Business workflows go here
│ • Command/Query Handlers                │
│ • Workflow Orchestration                │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│ DOMAIN LAYER                           │
│ • Entities & Value Objects             │ ← Business logic goes here
│ • Domain Services                      │
│ • Business Rules & Invariants          │
└─────────────────────────────────────────┘

What Framework Modules Should Do

Framework modules should be THIN and only handle:

  1. Message Routing: Dispatch messages to appropriate application services

  2. Event Emission: Convert service results into domain events

  3. Coordination: Orchestrate calls between different services

  4. Cross-cutting Concerns: Logging, metrics, error handling

What Framework Modules Should NOT Do

Framework modules should NEVER contain:

  1. Business Rules: Domain logic belongs in domain/application layers

  2. Workflow Logic: Complex business processes belong in application services

  3. Data Validation: Business validation belongs in domain entities

  4. State Management: Domain state belongs in entities and aggregates

Example: Proper Separation

See examples/proper_ddd_example.py for a complete demonstration of:

  • ✅ Domain entities with business logic

  • ✅ Application services with use cases

  • ✅ Framework modules that only handle messages

  • ✅ Clear dependency flow: Infrastructure → Application → Domain

Core Components

1. Message Types (common.py)

@dataclass(frozen=True, kw_only=True)
class Message:
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

@dataclass(frozen=True)
class Command(Message): ...    # Intent to change state

@dataclass(frozen=True)  
class Query(Message): ...      # Request for information

@dataclass(frozen=True)
class Event(Message): ...      # Something that happened

Key Features:

  • Commands: Represent intent to perform an action (imperative)

  • Queries: Request information without side effects

  • Events: Notifications about something that occurred (past tense)

  • All messages are immutable (frozen=True) and include timestamps

2. Mediator (mediator.py)

The central orchestrator that routes messages between modules.

Key Features:

  • Singleton Pattern: Ensures single instance across application

  • Context Management: Isolated units of work with dedicated queues

  • Asynchronous Processing: Non-blocking message handling

  • Module Registration: Dynamic service discovery and routing

Architecture:

Application
    ↓
Mediator (Singleton)
    ↓
Context (Unit of Work)
    ↓
Modules (Message Handlers)

3. Components (component.py)

Self-contained units that handle specific message types.

class Component[T: Message](ComponentProtocol[T]):
    @property
    def supports(self) -> type[T]:
        # Automatically determined from generic type parameter
        
    async def handle(self, message: Message, *, dispatch, join_stream):
        # Process the message and optionally dispatch new messages

4. Application (application.py)

The main application container that manages the lifecycle.

What You Can Do with Event Dispatching and Commands

1. Simple Command-Event Flow

# Command: Intent to do something
@dataclass(frozen=True, slots=True, kw_only=True)
class ProcessOrder(Command):
    order_id: str
    customer_id: str

# Event: Something that happened
@dataclass(frozen=True, slots=True, kw_only=True)
class OrderProcessed(Event):
    order_id: str
    status: str

class OrderProcessor(Module[ProcessOrder]):
    async def handle(self, message: ProcessOrder, *, dispatch, join_stream):
        # Process the order
        result = await self._process_order(message.order_id)
        
        # Dispatch result event
        await dispatch(OrderProcessed(
            order_id=message.order_id,
            status="completed"
        ))

2. Chain of Events (Event Sourcing Pattern)

class OrderEventListener(Module[OrderProcessed]):
    async def handle(self, message: OrderProcessed, *, dispatch, join_stream):
        # React to order being processed
        if message.status == "completed":
            await dispatch(SendConfirmationEmail(order_id=message.order_id))
            await dispatch(UpdateInventory(order_id=message.order_id))

3. Saga Pattern for Distributed Transactions

class PaymentSaga(Module[OrderProcessed | PaymentFailed]):
    async def handle(self, message, *, dispatch, join_stream):
        match message:
            case OrderProcessed():
                await dispatch(ProcessPayment(order_id=message.order_id))
            case PaymentFailed():
                await dispatch(RefundOrder(order_id=message.order_id))

Creating Workflows and Pipelines

1. Sequential Workflows

# Define workflow steps
steps = ["validate", "process", "notify"]

class WorkflowManager(Module[StartWorkflow]):
    async def handle(self, message: StartWorkflow, *, dispatch, join_stream):
        for step in steps:
            await dispatch(WorkflowStep(
                workflow_id=message.workflow_id,
                step_name=step
            ))

2. Parallel Processing Pipelines

class DataPipeline(Module[ProcessData]):
    async def handle(self, message: ProcessData, *, dispatch, join_stream):
        # Start parallel processing stages
        tasks = []
        for processor in ["validator", "enricher", "transformer"]:
            tasks.append(dispatch(ProcessDataStage(
                data=message.data,
                processor=processor
            )))
        
        await asyncio.gather(*tasks)

3. Conditional Workflows

class ConditionalWorkflow(Module[WorkflowStep]):
    async def handle(self, message: WorkflowStep, *, dispatch, join_stream):
        if message.step_name == "decision_point":
            if self._should_approve(message.data):
                await dispatch(WorkflowStep(step_name="approve"))
            else:
                await dispatch(WorkflowStep(step_name="reject"))

Cycle Detection in Workflows

Why Cycle Detection Matters

  • Infinite Loops: Prevents workflows from running indefinitely

  • Resource Exhaustion: Avoids memory/CPU consumption from circular dependencies

  • Deadlocks: Prevents mutual dependencies that can’t be resolved

Implementation Approaches

1. Depth-First Search (DFS) Cycle Detection

class CycleDetector:
    def has_cycle(self, workflow_type: str) -> bool:
        visited = set()
        rec_stack = set()  # Recursion stack for DFS
        
        def dfs(node: str) -> bool:
            visited.add(node)
            rec_stack.add(node)
            
            for neighbor in self.graph.get(node, []):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in rec_stack:  # Back edge = cycle
                    return True
                    
            rec_stack.remove(node)
            return False
        
        # Check all components
        for step in self.workflow_definitions[workflow_type]:
            if step not in visited:
                if dfs(step):
                    return True
        return False

2. Topological Sort for Execution Order

def get_topological_order(self, workflow_type: str) -> list[str] | None:
    """Returns execution order if no cycles, None if cycles exist"""
    
    # Kahn's Algorithm
    in_degree = dict.fromkeys(steps, 0)
    
    # Calculate in-degrees
    for step, deps in workflow_definitions.items():
        for dep in deps:
            in_degree[step] += 1
    
    # Start with steps having no dependencies
    queue = deque([step for step, degree in in_degree.items() if degree == 0])
    result = []
    
    while queue:
        current = queue.popleft()
        result.append(current)
        
        # Update dependent steps
        for dependent in self._get_dependents(current):
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)
    
    # If not all steps processed, there's a cycle
    return result if len(result) == len(steps) else None

3. Runtime Cycle Prevention

class WorkflowExecutor:
    def __init__(self):
        self.execution_stack: dict[str, set[str]] = {}
    
    async def execute_step(self, workflow_id: str, step_name: str):
        if workflow_id not in self.execution_stack:
            self.execution_stack[workflow_id] = set()
            
        if step_name in self.execution_stack[workflow_id]:
            raise CyclicExecutionError(f"Cycle detected: {step_name}")
            
        self.execution_stack[workflow_id].add(step_name)
        try:
            await self._do_execute_step(step_name)
        finally:
            self.execution_stack[workflow_id].remove(step_name)

Advanced Patterns

1. Event Streaming with Join Streams

class StreamProcessor(Module[DataEvent]):
    async def handle(self, message: DataEvent, *, dispatch, join_stream):
        stream_queue, stop_event = join_stream()
        
        # Subscribe to real-time events
        while not stop_event.is_set():
            try:
                event_data = await asyncio.wait_for(
                    stream_queue.get(), 
                    timeout=1.0
                )
                await self._process_stream_data(event_data)
            except asyncio.TimeoutError:
                continue

2. Message Aggregation

class OrderAggregator(Module[OrderItemProcessed]):
    def __init__(self, application):
        super().__init__(application)
        self.pending_orders: dict[str, set[str]] = {}
        
    async def handle(self, message: OrderItemProcessed, *, dispatch, join_stream):
        order_id = message.order_id
        item_id = message.item_id
        
        if order_id not in self.pending_orders:
            self.pending_orders[order_id] = set()
            
        self.pending_orders[order_id].add(item_id)
        
        # Check if all items processed
        if len(self.pending_orders[order_id]) >= message.total_items:
            await dispatch(OrderFullyProcessed(order_id=order_id))
            del self.pending_orders[order_id]

3. Circuit Breaker Pattern

class CircuitBreakerModule(Module[ExternalServiceCall]):
    def __init__(self, application):
        super().__init__(application)
        self.failure_count = 0
        self.last_failure_time = 0
        self.circuit_open = False
        
    async def handle(self, message: ExternalServiceCall, *, dispatch, join_stream):
        if self._should_allow_request():
            try:
                result = await self._call_external_service(message)
                self._on_success()
                await dispatch(ExternalServiceSuccess(result=result))
            except Exception as e:
                self._on_failure()
                await dispatch(ExternalServiceFailure(error=str(e)))

Workflow Examples and Use Cases

1. E-commerce Order Processing

# Workflow definition with dependencies
order_workflow = {
    "validate_order": [],                    # Entry point
    "check_inventory": ["validate_order"],   # Depends on validation
    "process_payment": ["validate_order"],   # Parallel with inventory
    "ship_order": ["check_inventory", "process_payment"],  # Waits for both
    "send_confirmation": ["ship_order"]      # Final step
}

Flow Visualization:

validate_order
    ↓
[check_inventory] [process_payment]
    ↓                ↓
    └── ship_order ──┘
           ↓
    send_confirmation

2. Data Processing Pipeline

# ETL Pipeline with sequential dependencies
data_pipeline = {
    "extract_data": [],
    "validate_data": ["extract_data"],
    "transform_data": ["validate_data"],
    "enrich_data": ["transform_data"],
    "load_data": ["enrich_data"],
    "generate_report": ["load_data"]
}

3. Content Publishing Workflow

# Complex workflow with multiple branches
publishing_workflow = {
    "draft_content": [],
    "review_content": ["draft_content"],
    "approve_content": ["review_content"],
    "format_content": ["approve_content"],
    "generate_thumbnail": ["approve_content"],
    "publish_content": ["format_content", "generate_thumbnail"],
    "notify_subscribers": ["publish_content"],
    "update_analytics": ["publish_content"]
}

Compensation and Error Handling

1. Saga Pattern with Compensation

class OrderSaga(Module[OrderEvent]):
    def __init__(self, application):
        super().__init__(application)
        self.compensation_stack: dict[str, list[str]] = {}
    
    async def handle(self, message: OrderEvent, *, dispatch, join_stream):
        match message:
            case PaymentFailed():
                # Execute compensation in reverse order
                compensations = self.compensation_stack.get(message.order_id, [])
                for action in reversed(compensations):
                    await dispatch(CompensationAction(action=action))

2. Circuit Breaker for External Services

class ExternalServiceModule(Module[ExternalCall]):
    def __init__(self, application):
        super().__init__(application)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_time=30,
            expected_exception=ServiceUnavailableError
        )
    
    async def handle(self, message: ExternalCall, *, dispatch, join_stream):
        try:
            result = await self.circuit_breaker.call(
                self._make_external_call, message.data
            )
            await dispatch(ExternalCallSuccess(result=result))
        except CircuitBreakerOpenError:
            await dispatch(ExternalCallFailed(reason="circuit_breaker_open"))

Benefits of This Architecture

  1. Loose Coupling: Modules don’t directly depend on each other

  2. Testability: Easy to test individual components in isolation

  3. Scalability: Can distribute modules across different processes/machines

  4. Maintainability: Clear separation of concerns

  5. Flexibility: Easy to add new features without modifying existing code

  6. Error Isolation: Failures in one module don’t crash the entire system

  7. Auditability: All messages are logged and traceable

  8. Workflow Validation: Built-in cycle detection prevents infinite loops

Best Practices

1. Message Design

  • Keep messages immutable and focused

  • Use clear, descriptive names (OrderProcessed vs MessageReceived)

  • Include all necessary data to avoid additional lookups

  • Version your messages for backward compatibility

2. Module Responsibility

  • Each module should have a single responsibility

  • Modules should be stateless where possible

  • Use dependency injection for external services

  • Handle errors gracefully and emit appropriate events

3. Workflow Design

  • Always validate workflow definitions for cycles

  • Design for idempotency - steps should be safe to retry

  • Include compensation logic for complex workflows

  • Monitor workflow execution and completion rates

4. Performance Considerations

  • Monitor message processing times and queue depths

  • Use connection pooling for database/external service calls

  • Implement backpressure handling for high-throughput scenarios

  • Consider batching for bulk operations

5. Error Handling

  • Always handle exceptions and emit appropriate events

  • Implement circuit breakers for external service calls

  • Use exponential backoff for retries

  • Log errors with sufficient context for debugging

6. Testing Strategies

  • Unit test individual modules in isolation

  • Integration test workflow scenarios

  • Use mock dispatch functions for testing

  • Test error scenarios and compensation logic

This architecture provides a solid foundation for building complex, distributed systems with clear message flows, proper error handling, and scalable workflow orchestration.