Detailed Implementation

aevatar Framework#

Core Layers#

  • Aevatar.Core: Contains the main implementation of the framework
  • Aevatar.Core.Abstractions: Defines interfaces and abstractions
  • Aevatar.EventSourcing.Core: Implements event sourcing functionality
  • GAgentBase#

    The foundation class that provides:

  • Event sourcing capabilities through Orleans' JournaledGrain
  • State management with automatic persistence
  • Event publishing and subscription mechanisms
  • Hierarchical agent relationship management
  • Key Features#

  • Event Handling
    • Automatic event forwarding between agents
    • Custom event handler registration
    • Support for base and specialized handlers
  • State Management
    • Strongly-typed state containers
    • Automatic state persistence
    • State change notification system
  • Agent Registration
    • Dynamic agent registration/unregistration
    • Hierarchical agent relationships
    • Subscription management system
  • Stream Processing
    • Built-in stream provider integration
    • Automatic stream subscription management
    • Event forwarding capabilities
  • 1. Agent Creation and Initialization#

  • Client → GAgentFactory: “Create Agent” The client requests the creation of a new agent instance through the GAgentFactory.
  • GAgentFactory → GAgentBase: “Initialize Agent” Once the factory has decided how to create the agent, it calls into GAgentBase (the agent implementation) to initialize the new agent object.
  • GAgentBase → StateLogEventStorage: “Initialize State” During agent initialization, GAgentBase sets up its initial internal state by interacting with StateLogEventStorage. This storage component is presumably responsible for holding the persisted state or logs of state changes.
  • StateLogEventStorage → GAgentBase: “State Initialized” The storage component confirms that it has initialized the agent’s state (e.g., from a snapshot or a fresh instance).
  • GAgentBase → StreamProvider: “Setup Stream Subscriptions” The agent then subscribes to any relevant external streams (e.g., an event bus or message queue) via a StreamProvider.
  • StreamProvider → GAgentBase: “Subscriptions Active” The agent receives confirmation that these subscriptions are now active (meaning the agent can receive events from the streams).
  • Agent Ready At this point, the agent is considered fully initialized and ready for use.
  • 2. Publishing an Event#

  • Client → GAgentBase: “PublishEventAsync(event)” At runtime, the client wants the agent to handle a new event. The client calls an asynchronous method on GAgentBase to publish the event.
  • GAgentBase → StateLogEventStorage: “AppendEventAsync(event)” Upon receiving the event, the agent appends it to its event log in StateLogEventStorage.
  • GAgentBase → GAgentBase: “UpdateStateAsync(newState)” After logging the event, the agent updates its own in‐memory state to reflect the effect of the newly arrived event. (In some designs, this call might still happen via the storage component, but logically it’s “state is updated” as part of the agent logic.)
  • GAgentBase → StreamProvider: “Publish to Stream” The agent then publishes this event (or a derivative) to a broader system stream so that interested subscribers can act on it.
  • StreamProvider → OtherAgents: “Notify Subscribers” The StreamProvider notifies other subscribers (possibly other agents or external systems) that a new event has been published.
  • (Within GAgentBase) “HandleEventAsync(event)” Internally, the agent may also process the event via some kind of HandleEventAsync call (e.g., applying validations or domain logic).
  • Validate Event If there is any validation logic, the agent confirms the event is valid before applying it.
  • Apply Event to State The agent applies the event (again, or in more detail) to update its in‐memory state.
  • State Updated The in‐memory state is now up to date.
  • Record Event After successfully applying the event, the agent may record additional metadata or confirmations in the event log.
  • Event Handled The process of handling the event (inside the agent) is complete.
  • 3. State Recovery#

  • Client → GAgentBase: “Recover State” At some later point—perhaps after a restart or if we need to restore the agent’s state from persistent storage—the client requests that the agent recover or reconstruct its state.
  • GAgentBase → StateLogEventStorage: “GetSnapshot()” The agent asks the storage for the most recent snapshot of its state.
  • StateLogEventStorage → GAgentBase: “Snapshot(version)” The storage returns the snapshot along with its version number. (For instance, version N.)
  • GAgentBase → StateLogEventStorage: “GetLatestVersion()” The agent then checks the latest persisted version in case there have been subsequent events beyond the snapshot.
  • StateLogEventStorage → GAgentBase: “Latest Version” The storage provides the latest available version (for example, if additional events have been appended, this might be version N+2).
  • alt [Versions Match / Don’t Match]
    • If the snapshot’s version matches the latest version in storage (e.g., N == N), then the agent can use the snapshot directly and skip additional replay.
    • If the snapshot’s version does not match (e.g., N < N+2), there are new events in the log that occurred after the snapshot was taken. The agent must fetch these newer events and apply them.
  • GAgentBase → StateLogEventStorage: “GetEventRecentVersion(version)” (Only if there’s a mismatch.) The agent requests the missing events (those newer than the snapshot’s version).
  • StateLogEventStorage → GAgentBase: “New Events” The storage returns the list of events that happened after the snapshot was taken.
  • Apply Events to Snapshot The agent iterates through these new events and applies them to the snapshot state.
  • UpdateSnapshot(newState) The agent updates the snapshot with the resulting new state (now that the snapshot is caught up).
  • Snapshot Updated The persisted snapshot in StateLogEventStorage is refreshed (if desired) or at least the agent’s in‐memory state is fully up to date.
  • Recovery Complete Finally, the agent signals back to the client that state recovery is finished. The agent is now in the latest known good state and can resume normal operations.
  • Summary#

  • Agent Creation & Initialization: The client asks GAgentFactory to create an agent, which initializes state with StateLogEventStorage and sets up subscriptions via StreamProvider.
  • Event Publishing & Handling: The client (or other systems) publish events to the agent, which appends them to the event storage, updates its in‐memory state, and publishes to an external stream if necessary.
  • State Recovery: When needed, the agent retrieves a snapshot and any subsequent events from the storage, applies all the changes, and ends up with an up‐to‐date state.
  • System Architecture#

    Core Layers#

    Aevatar.Core#

    The main implementation layer contains:

  • GAgentBase: The foundational agent class
  • Event handling and processing mechanisms
  • State management implementation
  • Publishing and subscription logic
  • Aevatar.Core.Abstractions#

    Defines the contract layer including:

  • Interfaces for agents
  • Event definitions
  • State management contracts
  • Message handling protocols
  • Aevatar.EventSourcing.Core#

    Implements event sourcing functionality:

  • Event storage and retrieval
  • Event replay mechanisms
  • Event stream management
  • Consistency guarantees
  • Core Components#

    GAgentBase#

    public abstract class GAgentBase<TState, TEvent>

    The central component of the framework implementing:

  • Event sourcing through Orleans' JournaledGrain
  • State management with automatic persistence
  • Event publishing and subscription
  • Hierarchical agent relationships
  • Key Methods#

  • OnActivateAsync(): Agent activation
  • HandleEventAsync(): Event processing
  • PublishEventAsync(): Event publishing
  • SubscribeToAsync(): Event subscription
  • Event Handling System
  • Event Processing Pipeline#

  • Event Reception
  • Validation
  • Processing
  • State Update
  • Event Publication
  • Event Types Support#

  • System Events
  • Custom Events
  • State Change Events
  • Hierarchical Events
  • State Management
  • Features#

  • Strongly-typed state containers
  • Automatic persistence
  • State change notifications
  • Versioning support
  • State Operations#

  • State initialization
  • State updates
  • State querying
  • State validation
  • Agent Registration and Management
  • Registration Process#

  • Agent Creation
  • Initialization
  • State Setup
  • Subscription Configuration
  • Hierarchical Structure#

  • Parent-Child Relationships
  • Subscription Inheritance
  • Event Propagation
  • Stream Processing
  • Stream Provider Integration#

  • Orleans Stream Provider
  • Custom Stream Implementations
  • Stream Configuration
  • Stream Operations#

  • Stream Creation
  • Subscription Management
  • Event Broadcasting
  • Stream Monitoring
  • Implementation Details#

    Event Sourcing Implementation#

    Event Storage#

    public interface IEventStore<TEvent{Task AppendEventAsync(TEvent @event);
        Task<IEnumerable<TEvent>> GetEventsAsync();
    }

    State Management#

    public interface IStateManager<TState{Task<TState> GetStateAsync();Task UpdateStateAsync(TState newState);
    }

    Observer Pattern Implementation#

    The framework uses the Observer pattern for event handling:

  • EventWrapperBaseAsyncObserver: Base observer implementation
  • GAgentAsyncObserver: Agent-specific observer
  • GAgentBase.Observers: Observer management
  • Usage Guide#

    Creating a New Agent#

    [GAgent]
    public class CustomAgent : GAgentBase<CustomState, CustomEvent{protected override Task OnActivateAsync()
        {// Initialization logicreturn base.OnActivateAsync();
        }
    protected override Task HandleEventAsync(CustomEvent @event)
        {// Event handling logicreturn Task.CompletedTask;
        }
    }

    Event Publishing#

    await PublishEventAsync(new CustomEvent 
    {// Event properties
    });

    Subscription Setup#

    await SubscribeToAsync<CustomEvent>(async (@event) => 
    {// Subscription handling logic
    });

    Best Practices#

    Event Design#

  • Keep events immutable
  • Include timestamp and correlation ID
  • Use meaningful event names
  • Include necessary context
  • State Management#

  • Minimize state size
  • Use appropriate state versioning
  • Implement state validation
  • Handle state conflicts
  • Agent Design#

  • Single Responsibility Principle
  • Clear hierarchical structure
  • Proper error handling
  • Logging and monitoring
  • Performance Considerations#

  • Batch operations when possible
  • Implement proper indexing
  • Use appropriate grain placement
  • Monitor memory usage
  • Error Handling#

  • Implement retry mechanisms
  • Handle network failures
  • Proper exception logging
  • State recovery procedures
  • Security Considerations#

    Authentication#

  • Implement proper authentication
  • Use secure communication
  • Handle token management
  • Authorization#

  • Role-based access control
  • Agent-level permissions
  • Event-level security
  • Data Protection#

  • Encrypt sensitive data
  • Secure state persistence
  • Protected event streams
  • Monitoring and Maintenance#

    Logging#

  • Structured logging
  • Performance metrics
  • Error tracking
  • Event auditing
  • Diagnostics#

  • Health checks
  • Performance monitoring
  • Resource usage tracking
  • System metrics
  • Troubleshooting#

  • Debug logging
  • Event tracing
  • State inspection
  • Performance profiling
  • Edited on: 26 February 2025 09:43:35 GMT+0