Detailed Implementation

aevatar Framework#

Core Layers#

  • Aevatar.Core: Contains the main implementation of the framework
  • Aevatar.Core.Abstractions: Defines interfaces and abstractions
  • Aevatar.EventSourcing.Core: Implements event sourcing functionality
  • GAgentBase#

    The foundation class that provides:

  • Event sourcing capabilities through Orleans' JournaledGrain
  • State management with automatic persistence
  • Event publishing and subscription mechanisms
  • Hierarchical agent relationship management
  • Key Features#

  • Event Handling
    • Automatic event forwarding between agents
    • Custom event handler registration
    • Support for base and specialized handlers
  • State Management
    • Strongly-typed state containers
    • Automatic state persistence
    • State change notification system
  • Agent Registration
    • Dynamic agent registration/unregistration
    • Hierarchical agent relationships
    • Subscription management system
  • Stream Processing
    • Built-in stream provider integration
    • Automatic stream subscription management
    • Event forwarding capabilities
  • 1. Agent Creation and Initialization#

  • Client → GAgentFactory: “Create Agent” The client requests the creation of a new agent instance through the GAgentFactory.
  • GAgentFactory → GAgentBase: “Initialize Agent” Once the factory has decided how to create the agent, it calls into GAgentBase (the agent implementation) to initialize the new agent object.
  • GAgentBase → StateLogEventStorage: “Initialize State” During agent initialization, GAgentBase sets up its initial internal state by interacting with StateLogEventStorage. This storage component is presumably responsible for holding the persisted state or logs of state changes.
  • StateLogEventStorage → GAgentBase: “State Initialized” The storage component confirms that it has initialized the agent’s state (e.g., from a snapshot or a fresh instance).
  • GAgentBase → StreamProvider: “Setup Stream Subscriptions” The agent then subscribes to any relevant external streams (e.g., an event bus or message queue) via a StreamProvider.
  • StreamProvider → GAgentBase: “Subscriptions Active” The agent receives confirmation that these subscriptions are now active (meaning the agent can receive events from the streams).
  • Agent Ready At this point, the agent is considered fully initialized and ready for use.
  • 2. Publishing an Event#

  • Client → GAgentBase: “PublishEventAsync(event)” At runtime, the client wants the agent to handle a new event. The client calls an asynchronous method on GAgentBase to publish the event.
  • GAgentBase → StateLogEventStorage: “AppendEventAsync(event)” Upon receiving the event, the agent appends it to its event log in StateLogEventStorage.
  • GAgentBase → GAgentBase: “UpdateStateAsync(newState)” After logging the event, the agent updates its own in‐memory state to reflect the effect of the newly arrived event. (In some designs, this call might still happen via the storage component, but logically it’s “state is updated” as part of the agent logic.)
  • GAgentBase → StreamProvider: “Publish to Stream” The agent then publishes this event (or a derivative) to a broader system stream so that interested subscribers can act on it.
  • StreamProvider → OtherAgents: “Notify Subscribers” The StreamProvider notifies other subscribers (possibly other agents or external systems) that a new event has been published.
  • (Within GAgentBase) “HandleEventAsync(event)” Internally, the agent may also process the event via some kind of HandleEventAsync call (e.g., applying validations or domain logic).
  • Validate Event If there is any validation logic, the agent confirms the event is valid before applying it.
  • Apply Event to State The agent applies the event (again, or in more detail) to update its in‐memory state.
  • State Updated The in‐memory state is now up to date.
  • Record Event After successfully applying the event, the agent may record additional metadata or confirmations in the event log.
  • Event Handled The process of handling the event (inside the agent) is complete.
  • 3. State Recovery#

  • Client → GAgentBase: “Recover State” At some later point—perhaps after a restart or if we need to restore the agent’s state from persistent storage—the client requests that the agent recover or reconstruct its state.
  • GAgentBase → StateLogEventStorage: “GetSnapshot()” The agent asks the storage for the most recent snapshot of its state.
  • StateLogEventStorage → GAgentBase: “Snapshot(version)” The storage returns the snapshot along with its version number. (For instance, version N.)
  • GAgentBase → StateLogEventStorage: “GetLatestVersion()” The agent then checks the latest persisted version in case there have been subsequent events beyond the snapshot.
  • StateLogEventStorage → GAgentBase: “Latest Version” The storage provides the latest available version (for example, if additional events have been appended, this might be version N+2).
  • alt [Versions Match / Don’t Match]
    • If the snapshot’s version matches the latest version in storage (e.g., N == N), then the agent can use the snapshot directly and skip additional replay.
    • If the snapshot’s version does not match (e.g., N < N+2), there are new events in the log that occurred after the snapshot was taken. The agent must fetch these newer events and apply them.
  • GAgentBase → StateLogEventStorage: “GetEventRecentVersion(version)” (Only if there’s a mismatch.) The agent requests the missing events (those newer than the snapshot’s version).
  • StateLogEventStorage → GAgentBase: “New Events” The storage returns the list of events that happened after the snapshot was taken.
  • Apply Events to Snapshot The agent iterates through these new events and applies them to the snapshot state.
  • UpdateSnapshot(newState) The agent updates the snapshot with the resulting new state (now that the snapshot is caught up).
  • Snapshot Updated The persisted snapshot in StateLogEventStorage is refreshed (if desired) or at least the agent’s in‐memory state is fully up to date.
  • Recovery Complete Finally, the agent signals back to the client that state recovery is finished. The agent is now in the latest known good state and can resume normal operations.
  • Summary#

  • Agent Creation & Initialization: The client asks GAgentFactory to create an agent, which initializes state with StateLogEventStorage and sets up subscriptions via StreamProvider.
  • Event Publishing & Handling: The client (or other systems) publish events to the agent, which appends them to the event storage, updates its in‐memory state, and publishes to an external stream if necessary.
  • State Recovery: When needed, the agent retrieves a snapshot and any subsequent events from the storage, applies all the changes, and ends up with an up‐to‐date state.
  • System Architecture#

    Core Layers#

    Aevatar.Core#

    The main implementation layer contains:

  • GAgentBase: The foundational agent class
  • Event handling and processing mechanisms
  • State management implementation
  • Publishing and subscription logic
  • Aevatar.Core.Abstractions#

    Defines the contract layer including:

  • Interfaces for agents
  • Event definitions
  • State management contracts
  • Message handling protocols
  • Aevatar.EventSourcing.Core#

    Implements event sourcing functionality:

  • Event storage and retrieval
  • Event replay mechanisms
  • Event stream management
  • Consistency guarantees
  • Core Components#

    GAgentBase#

    public abstract class GAgentBase<TState, TEvent>

    The central component of the framework implementing:

  • Event sourcing through Orleans' JournaledGrain
  • State management with automatic persistence
  • Event publishing and subscription
  • Hierarchical agent relationships
  • Key Methods#

  • OnActivateAsync(): Agent activation
  • HandleEventAsync(): Event processing
  • PublishEventAsync(): Event publishing
  • SubscribeToAsync(): Event subscription
  • Event Handling System
  • Event Processing Pipeline#

  • Event Reception
  • Validation
  • Processing
  • State Update
  • Event Publication
  • Event Types Support#

  • System Events
  • Custom Events
  • State Change Events
  • Hierarchical Events
  • State Management
  • Features#

  • Strongly-typed state containers
  • Automatic persistence
  • State change notifications
  • Versioning support
  • State Operations#

  • State initialization
  • State updates
  • State querying
  • State validation
  • Agent Registration and Management
  • Registration Process#

  • Agent Creation
  • Initialization
  • State Setup
  • Subscription Configuration
  • Hierarchical Structure#

  • Parent-Child Relationships
  • Subscription Inheritance
  • Event Propagation
  • Stream Processing
  • Stream Provider Integration#

  • Orleans Stream Provider
  • Custom Stream Implementations
  • Stream Configuration
  • Stream Operations#

  • Stream Creation
  • Subscription Management
  • Event Broadcasting
  • Stream Monitoring
  • Implementation Details#

    Event Sourcing Implementation#

    Event Storage#

    public interface IEventStore<TEvent{Task AppendEventAsync(TEvent @event);
        Task<IEnumerable<TEvent>> GetEventsAsync();
    }

    State Management#

    public interface IStateManager<TState{Task<TState> GetStateAsync();Task UpdateStateAsync(TState newState);
    }

    Observer Pattern Implementation#

    The framework uses the Observer pattern for event handling:

  • EventWrapperBaseAsyncObserver: Base observer implementation
  • GAgentAsyncObserver: Agent-specific observer
  • GAgentBase.Observers: Observer management
  • Usage Guide#

    Creating a New Agent#

    [GAgent]
    public class CustomAgent : GAgentBase<CustomState, CustomEvent{protected override Task OnActivateAsync()
        {// Initialization logicreturn base.OnActivateAsync();
        }
    protected override Task HandleEventAsync(CustomEvent @event)
        {// Event handling logicreturn Task.CompletedTask;
        }
    }

    Event Publishing#

    await PublishEventAsync(new CustomEvent 
    {// Event properties
    });

    Subscription Setup#

    await SubscribeToAsync<CustomEvent>(async (@event) => 
    {// Subscription handling logic
    });

    Best Practices#

    Event Design#

  • Keep events immutable
  • Include timestamp and correlation ID
  • Use meaningful event names
  • Include necessary context
  • State Management#

  • Minimize state size
  • Use appropriate state versioning
  • Implement state validation
  • Handle state conflicts
  • Agent Design#

  • Single Responsibility Principle
  • Clear hierarchical structure
  • Proper error handling
  • Logging and monitoring
  • Performance Considerations#

  • Batch operations when possible
  • Implement proper indexing
  • Use appropriate grain placement
  • Monitor memory usage
  • Error Handling#

  • Implement retry mechanisms
  • Handle network failures
  • Proper exception logging
  • State recovery procedures
  • Security Considerations#

    Authentication#

  • Implement proper authentication
  • Use secure communication
  • Handle token management
  • Authorization#

  • Role-based access control
  • Agent-level permissions
  • Event-level security
  • Data Protection#

  • Encrypt sensitive data
  • Secure state persistence
  • Protected event streams
  • Monitoring and Maintenance#

    Logging#

  • Structured logging
  • Performance metrics
  • Error tracking
  • Event auditing
  • Diagnostics#

  • Health checks
  • Performance monitoring
  • Resource usage tracking
  • System metrics
  • Troubleshooting#

  • Debug logging
  • Event tracing
  • State inspection
  • Performance profiling
  • aevatar Station#

    aevatar Station is a cutting-edge developer platform designed to simplify the creation, management, and deployment of intelligent AI agents. With a focus on flexibility, scalability, and ease of use, aevatar Station empowers developers and organizations to harness the power of AI in a streamlined and efficient way.

    Interface Layer#

  • Aevatar.HttpApi: HTTP API interface definitions
  • Aevatar.HttpApi.Host: Web API host
  • Aevatar.AuthServer: Authentication server
  • Application Layer#

  • Aevatar.Application: Application service implementations
  • Aevatar.Application.Contracts: Application service interfaces and DTOs
  • Aevatar.Application.Grains: Orleans Grain application services
  • Domain Layer#

  • Aevatar.Domain: Domain models, services, and repository interfaces
  • Aevatar.Domain.Shared: Shared domain constants, enums, etc.
  • Aevatar.Domain.Grains: Orleans Grain domain services
  • Infrastructure Layer#

  • Aevatar.CQRS: CQRS implementation
  • Aevatar.MongoDB: MongoDB data access
  • Aevatar.EventSourcing.MongoDB: MongoDB-based event sourcing implementation
  • Aevatar.Worker: Background worker services
  • Aevatar.Silo: Orleans Silo host
  • Key Features#

  • CQRS Pattern
    • Command and Query Responsibility Segregation
    • Event storage through Event Sourcing
    • MongoDB for event store and read models
  • Distributed Architecture
    • Orleans as distributed computing framework
    • Grain-based microservices architecture
    • Horizontal scalability support
  • Modular Design
    • Clear layer separation
    • Low coupling between modules
    • Extensive use of dependency injection
  • Authentication & Authorization
    • Standalone authentication server
    • OpenID Connect-based identity
    • Fine-grained permission control
  • aevatar GAgents#

    Aevatar GAgents is a custom intelligent agent solution designed to enable developers to customize agents and quickly create, manage, and deploy them on aevatar Station.

    Component Overview#

    Aevatar GAgents is a sophisticated intelligent agent platform built on .NET 8.0, designed to enable rapid development, management, and deployment of custom agents on Aevatar Station. The system utilizes event-sourcing architecture and distributed computing principles to create a scalable and maintainable agent ecosystem.

    Key Features#

  • Custom agent creation and management
  • Event-sourcing based state management
  • Distributed computing support
  • Social media integration
  • Blockchain integration
  • Advanced AI capabilities
  • Technical Architecture
  • Technology Stack#

  • .NET 8.0: Core development framework
  • ABP 8.2.0: Application framework for enterprise development
  • Orleans 7.0: Distributed computing framework
  • Orleans Event Sourcing: State management
  • Orleans Stream: Event streaming
  • System Components#

  • Core Layer (Basic Module)
    • Base agent implementations
    • Event sourcing infrastructure
    • State management
    • Message publishing system
  • AI Layer
    • Autogen integration (will be replaced)
    • MicroAI processing
    • RAG (Retrieval-Augmented Generation)
  • Integration Layer
    • Social media connectors (Twitter, Telegram)
    • Blockchain integration (AElf)
    • External API handlers
  • Core Components
  • Basic Module (Aevatar.GAgents.Basic)#

    The foundation of the system, providing core functionality for agent creation and management.

    Key Components:#

  • GAgentBase<TState, TEvent>
    • Base class for all agents
    • Handles state management
    • Implements event sourcing
    • Manages agent lifecycle
  • PublishingGAgent
    • Handles event publishing
    • Manages message distribution
    • Implements IPublishingGAgent interface
  • GroupGAgent
    • Manages agent groups
    • Handles agent registration/unregistration
    • Coordinates group activities
  • Event Sourcing System#

  • StateBase: Base class for agent state
  • SEventBase: Base class for event sourcing events
  • EventBase: Base class for external messages
  • Integration Components
  • Social Media Integration#

  • Twitter Integration (Aevatar.GAgents.Twitter)
    • Tweet management
    • User authentication
    • Response handling
  • Telegram Integration (Aevatar.GAgents.Telegram)
    • Bot implementation
    • Message handling
    • User interaction
  • Blockchain Integration (Aevatar.GAgents.AElf)#

  • Smart contract interaction
  • Transaction management
  • Chain state monitoring
  • AI Integration#

  • Autogen Integration
    • AI model management
    • Response generation
    • Context handling
  • RAG System
    • Document retrieval
    • Knowledge base management
    • Query processing
  • Development Guide
  • Creating a New Agent#

    Define Agent State#

    [GenerateSerializer]
    public class CustomAgentState : StateBase
    {
        [Id(0)] public Guid Id { get; set; }// Add custom state properties
    }

    Create Event Classes#

    public class CustomGEvent : SEventBase
    {
        [Id(0)] public string EventData { get; set; }
    }

    Implement Agent Class#

    [StorageProvider(ProviderName = "PubSubStore")]
    [LogConsistencyProvider(ProviderName = "LogStorage")]
    public class CustomGAgent : GAgentBase<CustomAgentState, CustomGEvent{// Implement agent logic
    }

    Best Practices#

  • Always use event sourcing for state changes
  • Implement proper error handling
  • Use async/await for all I/O operations
  • Follow the Orleans actor model patterns
  • Implement proper logging
  • Deployment
  • Prerequisites#

  • .NET 8.0 SDK
  • Orleans runtime
  • Required dependency packages:
  • Aevatar.Core (v1.0.2)
    Aevatar.EventSourcing.Core (v1.0.2)
    Aevatar.Core.Abstractions (v1.0.2)

    Configuration#

  • Storage Provider setup
  • Log Consistency Provider configuration
  • Network configuration
  • Security settings
  • Deployment Steps#

  • Build the solution
  • Configure environment variables
  • Deploy Orleans silo hosts
  • Start agent services
  • Monitor system health
  • Security Considerations#

  • API key management
  • Authentication and Authorization
  • Data encryption
  • Secure communication channels
  • Audit logging
  • Performance Optimization#

  • State management optimization
  • Event sourcing best practices
  • Network communication optimization
  • Resource utilization monitoring
  • Edited on: 18 February 2025 07:14:54 GMT+0