Nether Framework Architecture Review
Framework Overview
The Nether framework is an event-driven architecture built around the mediator pattern, with support for asynchronous message processing, modular design, and workflow orchestration.
⚠️ Critical Architectural Guidance: DDD and Separation of Concerns
The Problem: Mixing Framework Modules with Business Logic
❌ COMMON MISTAKE: Putting business workflows and domain logic directly in framework modules
```python
# ❌ WRONG - Business logic in framework component
class OrderProcessingSaga(Component[ProcessOrder]):
    async def handle(self, message: ProcessOrder, *, dispatch, join_stream):
        # ❌ Business rules and workflows directly in framework component
        if message.total_amount > 1000:
            # Apply enterprise discount
            discount = message.total_amount * 0.1
        # ❌ Domain logic mixed with message handling
        if not self._validate_inventory(message.items):
            raise InvalidInventoryError()
        # ❌ Complex business workflow in infrastructure layer
        await self._process_payment_with_retry_logic(message)
```
The Solution: Proper DDD Layering
✅ CORRECT: Framework components delegate to application services
```python
# ✅ RIGHT - Framework component is thin, delegates to application service
class OrderCommandHandler(Component[ProcessOrder]):
    def __init__(self, app: Application, order_service: OrderProcessingService):
        super().__init__(app)
        self._order_service = order_service  # Application layer

    async def handle(self, message: ProcessOrder, *, dispatch, join_stream):
        # ✅ Delegate to the application service
        success, result = await self._order_service.process_order(
            message.order_id, message.customer_id, message.items
        )
        # ✅ The framework component only handles coordination and events
        if success:
            await dispatch(OrderCreated(order_id=message.order_id))
        else:
            await dispatch(OrderFailed(order_id=message.order_id, error=result))
```
Architecture Layers
┌─────────────────────────────────────────┐
│ INFRASTRUCTURE LAYER │
│ • Framework Modules (Message Handlers) │ ← Nether modules go here
│ • Repositories (Implementations) │
│ • External Service Adapters │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ APPLICATION LAYER │
│ • Application Services (Use Cases) │ ← Business workflows go here
│ • Command/Query Handlers │
│ • Workflow Orchestration │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ DOMAIN LAYER │
│ • Entities & Value Objects │ ← Business logic goes here
│ • Domain Services │
│ • Business Rules & Invariants │
└─────────────────────────────────────────┘
What Framework Modules Should Do
✅ Framework modules should be THIN and only handle:
Message Routing: Dispatch messages to appropriate application services
Event Emission: Convert service results into domain events
Coordination: Orchestrate calls between different services
Cross-cutting Concerns: Logging, metrics, error handling
What Framework Modules Should NOT Do
❌ Framework modules should NEVER contain:
Business Rules: Domain logic belongs in domain/application layers
Workflow Logic: Complex business processes belong in application services
Data Validation: Business validation belongs in domain entities
State Management: Domain state belongs in entities and aggregates
Example: Proper Separation
See examples/proper_ddd_example.py for a complete demonstration of:
✅ Domain entities with business logic
✅ Application services with use cases
✅ Framework modules that only handle messages
✅ Clear dependency flow: Infrastructure → Application → Domain
Core Components
1. Message Types (common.py
)
```python
@dataclass(frozen=True, kw_only=True)
class Message:
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

@dataclass(frozen=True)
class Command(Message): ...  # Intent to change state

@dataclass(frozen=True)
class Query(Message): ...  # Request for information

@dataclass(frozen=True)
class Event(Message): ...  # Something that happened
```
Key Features:
Commands: Represent intent to perform an action (imperative)
Queries: Request information without side effects
Events: Notifications about something that occurred (past tense)
All messages are immutable (frozen=True) and include a creation timestamp
2. Mediator (mediator.py)
The central orchestrator that routes messages between modules.
Key Features:
Singleton Pattern: Ensures a single instance across the application
Context Management: Isolated units of work with dedicated queues
Asynchronous Processing: Non-blocking message handling
Module Registration: Dynamic service discovery and routing
Architecture:
Application
↓
Mediator (Singleton)
↓
Context (Unit of Work)
↓
Modules (Message Handlers)
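The real routing logic lives in mediator.py; as a rough, framework-agnostic sketch of the idea (the MiniMediator name and API below are illustrative, not Nether's), a mediator is essentially a registry from message type to handlers:

```python
import asyncio
from collections import defaultdict

class MiniMediator:
    """Toy mediator: routes each message to every handler registered for its exact type."""

    def __init__(self):
        self._handlers = defaultdict(list)  # message type -> handlers

    def register(self, message_type: type, handler) -> None:
        self._handlers[message_type].append(handler)

    async def dispatch(self, message) -> None:
        # Fan the message out to all matching handlers concurrently
        await asyncio.gather(*(h(message) for h in self._handlers[type(message)]))

class Ping: ...

received = []

async def on_ping(msg: Ping) -> None:
    received.append(msg)

mediator = MiniMediator()
mediator.register(Ping, on_ping)
asyncio.run(mediator.dispatch(Ping()))
print(len(received))  # → 1
```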
3. Components (component.py)
Self-contained units that handle specific message types.
```python
class Component[T: Message](ComponentProtocol[T]):
    @property
    def supports(self) -> type[T]:
        # Automatically determined from the generic type parameter
        ...

    async def handle(self, message: Message, *, dispatch, join_stream):
        # Process the message and optionally dispatch new messages
        ...
```
4. Application (application.py)
The main application container that manages the lifecycle.
What You Can Do with Event Dispatching and Commands
1. Simple Command-Event Flow
```python
# Command: Intent to do something
@dataclass(frozen=True, slots=True, kw_only=True)
class ProcessOrder(Command):
    order_id: str
    customer_id: str

# Event: Something that happened
@dataclass(frozen=True, slots=True, kw_only=True)
class OrderProcessed(Event):
    order_id: str
    status: str

class OrderProcessor(Module[ProcessOrder]):
    async def handle(self, message: ProcessOrder, *, dispatch, join_stream):
        # Process the order
        result = await self._process_order(message.order_id)
        # Dispatch the result event
        await dispatch(OrderProcessed(
            order_id=message.order_id,
            status="completed",
        ))
```
2. Chain of Events (Event Sourcing Pattern)
```python
class OrderEventListener(Module[OrderProcessed]):
    async def handle(self, message: OrderProcessed, *, dispatch, join_stream):
        # React to the order being processed
        if message.status == "completed":
            await dispatch(SendConfirmationEmail(order_id=message.order_id))
            await dispatch(UpdateInventory(order_id=message.order_id))
```
3. Saga Pattern for Distributed Transactions
```python
class PaymentSaga(Module[OrderProcessed | PaymentFailed]):
    async def handle(self, message, *, dispatch, join_stream):
        match message:
            case OrderProcessed():
                await dispatch(ProcessPayment(order_id=message.order_id))
            case PaymentFailed():
                await dispatch(RefundOrder(order_id=message.order_id))
```
Creating Workflows and Pipelines
1. Sequential Workflows
```python
# Define workflow steps
steps = ["validate", "process", "notify"]

class WorkflowManager(Module[StartWorkflow]):
    async def handle(self, message: StartWorkflow, *, dispatch, join_stream):
        for step in steps:
            await dispatch(WorkflowStep(
                workflow_id=message.workflow_id,
                step_name=step,
            ))
```
2. Parallel Processing Pipelines
```python
class DataPipeline(Module[ProcessData]):
    async def handle(self, message: ProcessData, *, dispatch, join_stream):
        # Start the processing stages concurrently
        tasks = []
        for processor in ["validator", "enricher", "transformer"]:
            tasks.append(dispatch(ProcessDataStage(
                data=message.data,
                processor=processor,
            )))
        await asyncio.gather(*tasks)
```
3. Conditional Workflows
```python
class ConditionalWorkflow(Module[WorkflowStep]):
    async def handle(self, message: WorkflowStep, *, dispatch, join_stream):
        if message.step_name == "decision_point":
            if self._should_approve(message.data):
                await dispatch(WorkflowStep(step_name="approve"))
            else:
                await dispatch(WorkflowStep(step_name="reject"))
```
Cycle Detection in Workflows
Why Cycle Detection Matters
Infinite Loops: Prevents workflows from running indefinitely
Resource Exhaustion: Avoids unbounded memory/CPU consumption from circular dependencies
Deadlocks: Prevents mutual dependencies that can’t be resolved
Implementation Approaches
1. Depth-First Search (DFS) Cycle Detection
```python
class CycleDetector:
    def has_cycle(self, workflow_type: str) -> bool:
        visited = set()
        rec_stack = set()  # Recursion stack for DFS

        def dfs(node: str) -> bool:
            visited.add(node)
            rec_stack.add(node)
            for neighbor in self.graph.get(node, []):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in rec_stack:  # Back edge = cycle
                    return True
            rec_stack.remove(node)
            return False

        # Check every step, since the graph may be disconnected
        for step in self.workflow_definitions[workflow_type]:
            if step not in visited:
                if dfs(step):
                    return True
        return False
```
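The detector above depends on self.graph and self.workflow_definitions; the same DFS can be exercised as a standalone function over a plain adjacency-list dict:

```python
def has_cycle(graph: dict[str, list[str]]) -> bool:
    """DFS with a recursion stack: a back edge to a node still on the stack means a cycle."""
    visited: set[str] = set()
    rec_stack: set[str] = set()

    def dfs(node: str) -> bool:
        visited.add(node)
        rec_stack.add(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                if dfs(neighbor):
                    return True
            elif neighbor in rec_stack:
                return True
        rec_stack.remove(node)
        return False

    return any(dfs(node) for node in graph if node not in visited)

acyclic = {"a": ["b"], "b": ["c"], "c": []}
cyclic = {"a": ["b"], "b": ["c"], "c": ["a"]}
print(has_cycle(acyclic), has_cycle(cyclic))  # → False True
```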
2. Topological Sort for Execution Order
```python
def get_topological_order(self, workflow_type: str) -> list[str] | None:
    """Return an execution order if there are no cycles, or None if a cycle exists."""
    # Kahn's algorithm
    definitions = self.workflow_definitions[workflow_type]
    steps = list(definitions)
    in_degree = dict.fromkeys(steps, 0)

    # Each dependency contributes one to its step's in-degree
    for step, deps in definitions.items():
        for dep in deps:
            in_degree[step] += 1

    # Start with steps that have no dependencies
    queue = deque([step for step, degree in in_degree.items() if degree == 0])
    result = []
    while queue:
        current = queue.popleft()
        result.append(current)
        # Update dependent steps
        for dependent in self._get_dependents(current):
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    # If not every step was processed, there is a cycle
    return result if len(result) == len(steps) else None
```
3. Runtime Cycle Prevention
```python
class WorkflowExecutor:
    def __init__(self):
        self.execution_stack: dict[str, set[str]] = {}

    async def execute_step(self, workflow_id: str, step_name: str):
        if workflow_id not in self.execution_stack:
            self.execution_stack[workflow_id] = set()
        if step_name in self.execution_stack[workflow_id]:
            raise CyclicExecutionError(f"Cycle detected: {step_name}")
        self.execution_stack[workflow_id].add(step_name)
        try:
            await self._do_execute_step(step_name)
        finally:
            self.execution_stack[workflow_id].remove(step_name)
```
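To see the guard fire, here is a self-contained variant where each step drives its successors directly (the next_steps mapping and CyclicExecutionError definition are assumptions for this sketch):

```python
import asyncio

class CyclicExecutionError(RuntimeError):
    pass

class WorkflowExecutor:
    """Tracks steps currently on the call stack per workflow and rejects re-entry."""

    def __init__(self, next_steps: dict[str, list[str]]):
        self.next_steps = next_steps  # step -> steps it triggers (assumed shape)
        self.execution_stack: dict[str, set[str]] = {}

    async def execute_step(self, workflow_id: str, step_name: str):
        stack = self.execution_stack.setdefault(workflow_id, set())
        if step_name in stack:
            raise CyclicExecutionError(f"Cycle detected: {step_name}")
        stack.add(step_name)
        try:
            for nxt in self.next_steps.get(step_name, []):
                await self.execute_step(workflow_id, nxt)
        finally:
            stack.remove(step_name)

# "a" triggers "b", which triggers "a" again while "a" is still running
executor = WorkflowExecutor({"a": ["b"], "b": ["a"]})
try:
    asyncio.run(executor.execute_step("wf-1", "a"))
except CyclicExecutionError as exc:
    print(exc)  # → Cycle detected: a
```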
Advanced Patterns
1. Event Streaming with Join Streams
```python
class StreamProcessor(Module[DataEvent]):
    async def handle(self, message: DataEvent, *, dispatch, join_stream):
        stream_queue, stop_event = join_stream()
        # Consume real-time events until asked to stop
        while not stop_event.is_set():
            try:
                event_data = await asyncio.wait_for(
                    stream_queue.get(),
                    timeout=1.0,
                )
                await self._process_stream_data(event_data)
            except asyncio.TimeoutError:
                continue
```
2. Message Aggregation
```python
class OrderAggregator(Module[OrderItemProcessed]):
    def __init__(self, application):
        super().__init__(application)
        self.pending_orders: dict[str, set[str]] = {}

    async def handle(self, message: OrderItemProcessed, *, dispatch, join_stream):
        order_id = message.order_id
        item_id = message.item_id
        if order_id not in self.pending_orders:
            self.pending_orders[order_id] = set()
        self.pending_orders[order_id].add(item_id)
        # Emit a completion event once every item has been processed
        if len(self.pending_orders[order_id]) >= message.total_items:
            await dispatch(OrderFullyProcessed(order_id=order_id))
            del self.pending_orders[order_id]
```
3. Circuit Breaker Pattern
```python
class CircuitBreakerModule(Module[ExternalServiceCall]):
    def __init__(self, application):
        super().__init__(application)
        self.failure_count = 0
        self.last_failure_time = 0
        self.circuit_open = False

    async def handle(self, message: ExternalServiceCall, *, dispatch, join_stream):
        if self._should_allow_request():
            try:
                result = await self._call_external_service(message)
                self._on_success()
                await dispatch(ExternalServiceSuccess(result=result))
            except Exception as e:
                self._on_failure()
                await dispatch(ExternalServiceFailure(error=str(e)))
        else:
            # Circuit is open: fail fast without calling the service
            await dispatch(ExternalServiceFailure(error="circuit_open"))
```
Workflow Examples and Use Cases
1. E-commerce Order Processing
```python
# Workflow definition with dependencies
order_workflow = {
    "validate_order": [],                                  # Entry point
    "check_inventory": ["validate_order"],                 # Depends on validation
    "process_payment": ["validate_order"],                 # Parallel with inventory
    "ship_order": ["check_inventory", "process_payment"],  # Waits for both
    "send_confirmation": ["ship_order"],                   # Final step
}
```
Flow Visualization:
        validate_order
        ↓            ↓
check_inventory   process_payment
        ↓            ↓
        └── ship_order ──┘
                ↓
        send_confirmation
2. Data Processing Pipeline
```python
# ETL pipeline with sequential dependencies
data_pipeline = {
    "extract_data": [],
    "validate_data": ["extract_data"],
    "transform_data": ["validate_data"],
    "enrich_data": ["transform_data"],
    "load_data": ["enrich_data"],
    "generate_report": ["load_data"],
}
```
3. Content Publishing Workflow
```python
# Complex workflow with multiple branches
publishing_workflow = {
    "draft_content": [],
    "review_content": ["draft_content"],
    "approve_content": ["review_content"],
    "format_content": ["approve_content"],
    "generate_thumbnail": ["approve_content"],
    "publish_content": ["format_content", "generate_thumbnail"],
    "notify_subscribers": ["publish_content"],
    "update_analytics": ["publish_content"],
}
```
Compensation and Error Handling
1. Saga Pattern with Compensation
```python
class OrderSaga(Module[OrderEvent]):
    def __init__(self, application):
        super().__init__(application)
        self.compensation_stack: dict[str, list[str]] = {}

    async def handle(self, message: OrderEvent, *, dispatch, join_stream):
        match message:
            case PaymentFailed():
                # Execute compensation in reverse order
                compensations = self.compensation_stack.get(message.order_id, [])
                for action in reversed(compensations):
                    await dispatch(CompensationAction(action=action))
```
2. Circuit Breaker for External Services
```python
class ExternalServiceModule(Module[ExternalCall]):
    def __init__(self, application):
        super().__init__(application)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_time=30,
            expected_exception=ServiceUnavailableError,
        )

    async def handle(self, message: ExternalCall, *, dispatch, join_stream):
        try:
            result = await self.circuit_breaker.call(
                self._make_external_call, message.data,
            )
            await dispatch(ExternalCallSuccess(result=result))
        except CircuitBreakerOpenError:
            await dispatch(ExternalCallFailed(reason="circuit_breaker_open"))
```
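The CircuitBreaker class used above is not shown in the snippet; a minimal sketch of one plausible implementation (state simplified to closed/open with a timed half-open trial call; names mirror the constructor arguments above) might look like:

```python
import asyncio
import time

class CircuitBreakerOpenError(Exception):
    pass

class CircuitBreaker:
    """Minimal sketch: open after N consecutive failures, allow one trial call after recovery_time."""

    def __init__(self, failure_threshold: int, recovery_time: float,
                 expected_exception: type[Exception] = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.opened_at: float | None = None

    async def call(self, func, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_time:
                raise CircuitBreakerOpenError("circuit is open")
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = await func(*args)
        except self.expected_exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failure_count = 0
        return result

async def always_fails():
    raise ValueError("boom")

async def demo() -> str:
    breaker = CircuitBreaker(failure_threshold=2, recovery_time=60.0,
                             expected_exception=ValueError)
    for _ in range(2):
        try:
            await breaker.call(always_fails)
        except ValueError:
            pass  # two failures trip the breaker
    try:
        await breaker.call(always_fails)
    except CircuitBreakerOpenError:
        return "open"
    return "closed"

print(asyncio.run(demo()))  # → open
```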
Benefits of This Architecture
Loose Coupling: Modules don’t directly depend on each other
Testability: Easy to test individual components in isolation
Scalability: Can distribute modules across different processes/machines
Maintainability: Clear separation of concerns
Flexibility: Easy to add new features without modifying existing code
Error Isolation: Failures in one module don’t crash the entire system
Auditability: All messages are logged and traceable
Workflow Validation: Built-in cycle detection prevents infinite loops
Best Practices
1. Message Design
Keep messages immutable and focused
Use clear, descriptive names (OrderProcessed vs MessageReceived)
Include all necessary data to avoid additional lookups
Version your messages for backward compatibility
2. Module Responsibility
Each module should have a single responsibility
Modules should be stateless where possible
Use dependency injection for external services
Handle errors gracefully and emit appropriate events
3. Workflow Design
Always validate workflow definitions for cycles
Design for idempotency - steps should be safe to retry
Include compensation logic for complex workflows
Monitor workflow execution and completion rates
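"Safe to retry" usually comes down to de-duplicating on a message id before performing the side effect. A minimal in-memory sketch (a real system would persist the processed set, and the message_id field is an assumption here):

```python
class IdempotentStep:
    """Skips work for message ids it has already seen (in-memory; production would persist this)."""

    def __init__(self):
        self.processed: set[str] = set()
        self.executions = 0

    def handle(self, message_id: str) -> bool:
        if message_id in self.processed:
            return False  # duplicate delivery: safely ignored
        self.processed.add(message_id)
        self.executions += 1  # the real side effect would happen here
        return True

step = IdempotentStep()
print(step.handle("msg-1"), step.handle("msg-1"))  # → True False
print(step.executions)  # → 1
```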
4. Performance Considerations
Monitor message processing times and queue depths
Use connection pooling for database/external service calls
Implement backpressure handling for high-throughput scenarios
Consider batching for bulk operations
5. Error Handling
Always handle exceptions and emit appropriate events
Implement circuit breakers for external service calls
Use exponential backoff for retries
Log errors with sufficient context for debugging
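A small retry helper combining the last two points, with capped exponential backoff and jitter (the function name, defaults, and flaky service below are illustrative):

```python
import asyncio
import random

async def retry_with_backoff(func, *, attempts: int = 5,
                             base_delay: float = 0.5, max_delay: float = 30.0):
    """Retry an async callable, doubling the delay (with jitter) between failures."""
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** attempt, max_delay)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids thundering herds

calls = 0

async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient")
    return "ok"

print(asyncio.run(retry_with_backoff(flaky, base_delay=0.01)))  # → ok
```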
6. Testing Strategies
Unit test individual modules in isolation
Integration test workflow scenarios
Use mock dispatch functions for testing
Test error scenarios and compensation logic
This architecture provides a solid foundation for building complex, distributed systems with clear message flows, proper error handling, and scalable workflow orchestration.