Agent to Agent message conversion system
Introduction
The MessageHandler
framework provides a robust, extensible system for validating, transforming, and managing structured communication between distributed AI subjects (e.g., agents, services, components). It enables seamless interoperability through:
- Schema-driven input/output validation
- Template-aware DSL execution for data transformation
- Subject-aware caching and introspection
- Metadata-rich message type descriptions
This system is designed to support dynamic, runtime-configurable messaging standards across diverse subjects in large-scale, modular AI systems. Each subject defines its communication patterns and supported message types using templates and DSL workflows, which are stored and accessed via a centralized subject registry.
Core Capabilities
- Validation: Validates messages against subject-specific input/output templates, supporting nested schemas and field-level constraints.
- Transformation: Executes DSL workflows to transform messages between subjects using configurable field mappings and metadata.
- Discovery: Provides structured inspection of supported message types, associated metadata, and registered DSLs.
- Caching: Uses an in-memory cache to minimize registry queries and ensure fast local access.
Whether you are building an AI coordination network, a message routing system, or a multi-agent orchestration engine, this framework offers a unified interface to manage structured communication effectively.
Schema Description
This section documents the structure and purpose of the key schemas used in the message validation and communication system.
Communication Schema
The SubjectCommunication
schema defines how a subject communicates, including the protocols it supports and the types of messages it can send or receive.
Data Class
@dataclass
class SubjectCommunication:
protocols: List[Dict[str, Any]]
message_types: List[MessageType]
@staticmethod
def from_dict(data: Dict[str, Any]) -> "SubjectCommunication":
return SubjectCommunication(
protocols=data.get("protocols", []),
message_types=[MessageType.from_dict(mt) for mt in data.get("message_types", [])]
)
Field Descriptions
Field | Type | Description |
---|---|---|
protocols |
List[Dict[str, Any]] |
A list of supported communication protocols. Each dictionary may include keys like type , port , path , etc. |
message_types |
List[MessageType] |
A list of message types that the subject can handle, defined by the MessageType schema. |
Message Type Schema
The MessageType
schema describes a specific type of communication message, including templates for input and output validation, DSL transformation logic, and metadata.
Data Class
@dataclass
class MessageType:
type: str
input_template: Dict[str, Any]
output_template: Dict[str, Any]
metadata: MessageTypeMetadata
standard_fields_mapping_input: Dict[str, str]
standard_fields_mapping_output: Dict[str, str]
input_dsl_workflow_id: str
output_dsl_workflow_id: str
@staticmethod
def from_dict(data: Dict[str, Any]) -> "MessageType":
return MessageType(
type=data["type"],
input_template=data.get("input_template", {}),
output_template=data.get("output_template", {}),
metadata=MessageTypeMetadata.from_dict(data.get("metadata", {})),
standard_fields_mapping_input=data.get("standard_fields_mapping_input", {}),
standard_fields_mapping_output=data.get("standard_fields_mapping_output", {}),
input_dsl_workflow_id=data.get("input_dsl_workflow_id", ""),
output_dsl_workflow_id=data.get("output_dsl_workflow_id", "")
)
Field Descriptions
Field | Type | Description |
---|---|---|
type |
str |
Unique identifier for the message type. |
input_template |
Dict[str, Any] |
A schema that defines the structure and constraints of incoming data. |
output_template |
Dict[str, Any] |
A schema that defines the structure and constraints of outgoing data. |
metadata |
MessageTypeMetadata |
Additional descriptive information such as description, usage docs, etc. |
standard_fields_mapping_input |
Dict[str, str] |
Mapping from standardized field names to keys in the input template. |
standard_fields_mapping_output |
Dict[str, str] |
Mapping from standardized field names to keys in the output template. |
input_dsl_workflow_id |
str |
DSL workflow ID for transforming input from external to internal structure. |
output_dsl_workflow_id |
str |
DSL workflow ID for transforming output from internal to external structure. |
Templates
Message templates define the expected structure and validation constraints for the input and output of a MessageType
. These templates are used by the MessageHandler
to verify correctness and by the DSL executor to assist in transformation.
Templates support field-level validation for types, patterns, enumerations, structure, and value constraints. The format is recursive and can express deeply nested schemas for objects and arrays.
Writing a Template
Each template is a Dict[str, Any]
, where each key represents a field name, and the corresponding value is a schema dictionary containing validation rules.
Supported Validation Fields
Field | Type | Applicable To | Description |
---|---|---|---|
type |
string |
All | Data type: string , number , boolean , array , object , any . |
description |
string |
All | Optional human-readable description of the field. |
pattern |
string |
string | Regular expression the string must match. |
length |
number |
string | Maximum allowed length of the string. |
choices |
array |
string, number | Allowed set of values. |
min |
number |
number | Minimum allowed numeric value. |
max |
number |
number | Maximum allowed numeric value. |
max_length |
number |
array | Maximum number of array elements. |
items |
object |
array | Template for validating each item in an array. |
properties |
object |
object | Dictionary defining field schemas inside an object. |
optional |
boolean |
All | Whether the field is optional (default is false). |
Example: Agentic Template
This input template could be used by a messaging agent that receives a structured task request:
input_template = {
"task_id": {
"type": "string",
"description": "Unique identifier for the task",
"pattern": "^[a-zA-Z0-9_-]{8,32}$"
},
"priority": {
"type": "number",
"description": "Priority level from 1 (low) to 10 (high)",
"min": 1,
"max": 10
},
"mode": {
"type": "string",
"description": "Execution mode",
"choices": ["sync", "async", "stream"]
},
"parameters": {
"type": "object",
"description": "Parameter block for the task execution",
"properties": {
"temperature": {
"type": "number",
"min": 0.0,
"max": 1.0
},
"max_tokens": {
"type": "number",
"max": 2048
},
"debug": {
"type": "boolean"
}
}
},
"tags": {
"type": "array",
"description": "Optional search or routing tags",
"items": {
"type": "string"
},
"max_length": 10,
"optional": True
}
}
This template defines:
- Required scalar fields with pattern, numeric range, and enumeration
- A nested
parameters
object with its own validation rules - An optional list of tags with a maximum length
SubjectsLocalDB
SubjectsLocalDB
provides an in-memory cache of all subject metadata and communication configurations retrieved from the subject registry. It reduces repetitive API calls by locally storing subject definitions and communication schemas.
Class Initialization
SubjectsLocalDB(db_client: SubjectDBAPIClient)
db_client
: an instance ofSubjectDBAPIClient
used to fetch subject definitions from the registry.
Methods
populate_cache()
Fetches all runtime subjects from the registry and populates the internal cache.
local_db.populate_cache()
get_runtime_subject(subject_id: str) -> Optional[Subject]
Retrieves a subject by ID from the local cache.
subject = local_db.get_runtime_subject("agent-123")
list_runtime_subjects() -> List[Subject]
Returns a list of all cached subjects.
subjects = local_db.list_runtime_subjects()
query_runtime_subjects(query: Dict[str, Any]) -> List[Subject]
Filters subjects based on top-level equality conditions. Supports list containment for array fields.
agents = local_db.query_runtime_subjects({"subject_type": "agent"})
get_communication_data(subject_id: str) -> Optional[SubjectCommunication]
Parses and returns the SubjectCommunication
object from the subject_config["communication"]
field.
comm = local_db.get_communication_data("agent-123")
get_communications_searchable_fields(subject_id: str) -> List[Dict[str, Any]]
Returns a list of message types and associated templates and metadata for indexing/search purposes.
fields = local_db.get_communications_searchable_fields("agent-123")
get_communications_searchable_representation(subject_id: str) -> Optional[str]
Formats the communication metadata into a human-readable string (useful for LLMs or descriptions).
summary = local_db.get_communications_searchable_representation("agent-123")
MessageHandler
MessageHandler
provides schema-aware validation, transformation, and DSL execution utilities for communication between subjects.
Class Initialization
MessageHandler(subject_cache: SubjectsLocalDB)
subject_cache
: an instance ofSubjectsLocalDB
with preloaded subject data.
Validation Methods
validate_input_source(message_type: str, data: Dict[str, Any])
Validates data
against the input template of the current subject (determined by SUBJECT_ID
).
handler.validate_input_source("chat", {"message": "Hello"})
validate_output_source(message_type: str, data: Dict[str, Any])
Validates data
against the output template of the current subject.
handler.validate_output_source("chat", {"response": "Hi"})
validate_input_for_subject(subject_id: str, message_type: str, data: Dict[str, Any])
Validates data as input for the given subject's message type.
handler.validate_input_for_subject("agent-2", "chat", {"message": "Hello"})
validate_output_for_subject(subject_id: str, message_type: str, data: Dict[str, Any])
Validates data as output for the given subject's message type.
handler.validate_output_for_subject("agent-2", "chat", {"response": "Hi"})
Transformation Methods
Each conversion method uses the DSL executor with appropriate input/output templates and field mappings.
convert_input_to_source(message_type: str, data: Dict[str, Any]) -> Dict
Converts raw input into the current subject's expected input format.
converted = handler.convert_input_to_source("chat", {"user_input": "Hello"})
convert_output_to_source(message_type: str, data: Dict[str, Any]) -> Dict
Converts output data into the current subject's expected output format.
converted = handler.convert_output_to_source("chat", {"reply": "Hi"})
convert_input_to_target_subject(subject_id: str, message_type: str, data: Dict[str, Any]) -> Dict
Transforms input data from the current subject to the target subject’s expected format.
converted = handler.convert_input_to_target_subject("agent-2", "chat", {"user_input": "Hi"})
convert_output_to_target_subject(subject_id: str, message_type: str, data: Dict[str, Any]) -> Dict
Transforms output data from the current subject to the target subject’s output format.
converted = handler.convert_output_to_target_subject("agent-2", "chat", {"reply": "Hello"})
Introspection Utilities
is_valid_message_type(subject_id: str, message_type: str) -> bool
Checks whether a message type is defined for a subject.
handler.is_valid_message_type("agent-2", "chat")
has_input_dsl(subject_id: str, message_type: str) -> bool
Checks if input DSL is defined for the message type.
handler.has_input_dsl("agent-2", "chat")
has_output_dsl(subject_id: str, message_type: str) -> bool
Checks if output DSL is defined for the message type.
handler.has_output_dsl("agent-2", "chat")
get_input_dsl(subject_id: str, message_type: str) -> Optional[str]
Returns the input DSL workflow ID for a message type.
dsl = handler.get_input_dsl("agent-2", "chat")
get_output_dsl(subject_id: str, message_type: str) -> Optional[str]
Returns the output DSL workflow ID for a message type.
dsl = handler.get_output_dsl("agent-2", "chat")
get_all_message_types(subject_id: str) -> List[str]
Returns all supported message types for the subject.
types = handler.get_all_message_types("agent-2")
get_message_type_metadata(subject_id: str, message_type: str) -> Dict[str, Any]
Returns metadata associated with the message type.
meta = handler.get_message_type_metadata("agent-2", "chat")
DSL Workflows for Message Translation
In a distributed system of AI agents or services, each subject may have its own data representation standards. To enable seamless communication, the MessageHandler
supports message translation via DSL workflows.
DSL (Domain-Specific Language) workflows act as transformation bridges that convert input/output data structures between source and target subjects, while preserving semantics and respecting schema constraints.
Why DSL Workflows?
Subjects may differ in how they:
- Name fields (
user_input
vsmessage
) - Structure objects (flat vs nested)
- Interpret values (enum strings vs numeric codes)
Rather than hard-coding these mappings, DSL workflows provide a declarative, reusable mechanism to encode transformation logic and apply it dynamically at runtime.
DSL Workflow Types
Each message type may define two transformation workflows:
Field | Purpose |
---|---|
input_dsl_workflow_id |
Transforms incoming messages into the source subject's input format |
output_dsl_workflow_id |
Transforms outgoing messages into the target subject's output format |
These are stored per MessageType
and executed by the MessageHandler
as needed.
Workflow Execution Context
When a DSL is executed for message translation, the runtime is provided with structured context to support intelligent transformation:
Field | Description |
---|---|
data |
The input data to transform |
message_type |
The type identifier of the message |
communication |
The full SubjectCommunication object of the current subject |
field_mapping |
Mapping from standardized fields to subject-specific fields |
source_template |
Schema for the source subject's input/output structure |
target_template |
Schema for the target subject's input/output structure |
metadata |
Human-readable metadata describing the message (description, example, etc.) |
This context is injected as part of the addons
argument when executing the workflow via:
executor = new_dsl_workflow_executor(
workflow_id=workflow_id,
workflows_base_uri=WORKFLOWS_API_URL,
is_remote=False,
addons=context
)
result = executor.execute({"data": input_data})
Example Use Case
Let’s say we have two agents:
- Agent A expects:
{ "user_input": "Hello" }
- Agent B expects:
{ "message": "Hello" }
The DSL workflow for translating A’s output to B’s input might contain rules like:
{
"workflow": {
"steps": [
{
"transform": {
"output.message": "input.user_input"
}
}
]
}
}
When the MessageHandler
calls convert_input_to_target_subject(...)
, this DSL is executed to reshape the message appropriately.
Design Principles
- DSL workflows should not embed hardcoded assumptions about external systems.
- They should rely on the provided
field_mapping
,source_template
, andtarget_template
. - Workflows should be reusable across subjects with similar roles or message semantics.
- Errors in DSL execution should be caught and logged with fallback if needed.
Here is the documentation for the new section:
Selecting Message Type
In systems with multiple message types per subject, it becomes essential to support discovery, matching, and intelligent selection of message types for a given communication goal. The MessageHandler
and SubjectsLocalDB
provide utilities for listing, inspecting, and rendering message types into a searchable representation that can be used by humans, scripts, or language models (LLMs).
Purpose
- Identify the most appropriate
message_type
for a given subject - Present the available types in a structured, searchable form
- Enable LLMs or rule engines to make automated decisions about message routing
Methods for Message Type Discovery
get_all_message_types(subject_id: str) -> List[str]
Returns a flat list of all message types defined for a subject.
types = handler.get_all_message_types("agent-123")
# Output: ['chat', 'search_query', 'summarization_request']
get_message_type_metadata(subject_id: str, message_type: str) -> Dict[str, Any]
Returns the metadata dictionary for a given message type.
meta = handler.get_message_type_metadata("agent-123", "chat")
# Output: { "description": "Chat response agent", ... }
get_communications_searchable_fields(subject_id: str) -> List[Dict[str, Any]]
Returns a list of message types with input/output templates and metadata fields that can be indexed or used in selection logic.
search_fields = local_db.get_communications_searchable_fields("agent-123")
# Output: list of dicts for each message_type
Searchable Representation for LLM Selection
To support automated reasoning by LLMs or interactive UI selection, the framework provides a formatted, structured textual representation of the available message types.
Method: get_communications_searchable_representation(subject_id: str) -> str
Returns a multi-line string that includes:
- Message type identifier
- Input and output template summaries
- Metadata fields: description, usage docs, examples
Example Output
Type: chat
Input Template: {'message': {'type': 'string'}}
Output Template: {'response': {'type': 'string'}}
Metadata:
description: Handles conversational input and produces agent replies
usage_doc_link: http://docs.aios/agent/chat
example: {"message": "What's the weather like today?"}
Type: summarization_request
Input Template: {'content': {'type': 'string'}}
Output Template: {'summary': {'type': 'string'}}
Metadata:
description: Converts long text into short summaries
usage_doc_link: http://docs.aios/agent/summarize
example: {"content": "Article text here..."}
Example Usage
representation = local_db.get_communications_searchable_representation("agent-123")
print(representation)
This output can be:
- Indexed for search
- Embedded into a chat prompt
- Displayed in a selection UI
- Fed into an LLM to ask: “Which message type should I use to summarize a document?”
Use in Agentic Systems
- Use the searchable representation to dynamically select the best
message_type
based on user intent - LLMs can be used to map user prompts to message types by inspecting their
description
,input_template
, andexample
- Rule engines can select types based on expected field structure and target format