Chat Protocol Using AI Agents
Introduction
The Chat Protocol enables seamless communication between autonomous agents in the uAgents ecosystem. It standardizes message formats, session handling, and content types for efficient exchanges. Supporting text communication, resource sharing, metadata exchange, and AI-driven responses which simplifies integration for developers.
Chat Protocol Overview
The Chat Protocol comprises several essential elements:
Content Types
The protocol supports multiple content types, enabling flexible communication between agents:
- TextContent: Standard text-based messages.
- ResourceContent: Messages containing resources such as files, images, or videos.
- MetadataContent: Extra metadata associated with messages.
- StartSessionContent: Marks the beginning of a communication session.
- EndSessionContent: Marks the end of a communication session.
- StartStreamContent: Indicates the start of a streaming session.
- EndStreamContent: Indicates the end of a streaming session.
Message Structures
The protocol defines structured message formats to ensure consistency:
- ChatMessage: A container for messages, including a timestamp, unique message ID, and a list of content elements.
- ChatAcknowledgement: Acknowledges message receipt, including the timestamp and the ID of the received message.
class ChatMessage(Model): # the timestamp for the message, should be in UTC timestamp: datetime # a unique message id that is generated from the message instigator msg_id: UUID4 # the list of content elements in the chat content: list[AgentContent] class ChatAcknowledgement(Model): # the timestamp for the message, should be in UTC timestamp: datetime # the msg id that is being acknowledged acknowledged_msg_id: UUID4 # optional acknowledgement metadata metadata: dict[str, str] | None = None
Detailed Explanation of Key Functions
create_text_chat
This function is responsible for creating a ChatMessage
object with a simple text-based content. It:
- Assigns a current timestamp.
- Generates a unique message ID.
- Wraps the text in a
TextContent
model.
def create_text_chat(text: str) -> ChatMessage: return ChatMessage( timestamp=datetime.now(), msg_id=uuid4(), content=[TextContent(type="text", text=text)], )
chat_proto
Definition
Here, a protocol instance named AgentChatProtocol
is defined. This protocol facilitates communication between agents by handling structured messages.
chat_proto = Protocol(name="AgentChatProtocol", version="0.2.1")
Handling Incoming Chat Messages
The handle_message
function processes incoming chat messages. It:
- Logs the received message.
- Stores the sender information.
- Sends a
ChatAcknowledgement
to confirm receipt. - Calls the AI-powered
get_completion
function to generate a reply. - Sends back the AI-generated response as a new chat message.
@chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, ChatAcknowledgement(timestamp=datetime.now(), acknowledged_msg_id=msg.msg_id), ) completion = get_completion(context="", prompt=msg.content[0].text) await ctx.send(sender, create_text_chat(completion))
Handling Acknowledgements
The handle_ack
function processes incoming ChatAcknowledgement
messages. It logs a confirmation message indicating that a particular message has been acknowledged by the recipient.
@chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}")
Full Chat Protocol Script
from typing import Any, Literal, TypedDict from datetime import datetime from pydantic.v1 import UUID4 from uagents import Model, Protocol, Context from uuid import uuid4 from ai import get_completion class Metadata(TypedDict): # primarily used with hte `Resource` model. This field specifies the mime_type of # resource that is being referenced. A full list can be found at `docs/mime_types.md` mime_type: str # the role of the resource role: str class TextContent(Model): type: Literal["text"] # The text of the content. The format of this field is UTF-8 encoded strings. Additionally, # markdown based formatting can be used and will be supported by most clients text: str class Resource(Model): # the uri of the resource uri: str # the set of metadata for this resource, for more detailed description of the set of # fields see `docs/metadata.md` metadata: dict[str, str] class ResourceContent(Model): type: Literal["resource"] # The resource id resource_id: UUID4 # The resource or list of resource for this content. typically only a single # resource will be sent, however, if there are accompanying resources like # thumbnails and audo tracks these can be additionally referenced # # In the case of the a list of resources, the first element of the list is always # considered the primary resource resource: Resource | list[Resource] class MetadataContent(Model): type: Literal["metadata"] # the set of metadata for this content, for more detailed description of the set of # fields see `docs/metadata.md` metadata: dict[str, str] class StartSessionContent(Model): type: Literal["start-session"] class EndSessionContent(Model): type: Literal["end-session"] class StartStreamContent(Model): type: Literal["start-stream"] stream_id: UUID4 class EndStreamContent(Model): type: Literal["start-stream"] stream_id: UUID4 # The combined agent content types AgentContent = ( TextContent | ResourceContent | MetadataContent | StartSessionContent | EndSessionContent | StartStreamContent | EndStreamContent ) class ChatMessage(Model): # the timestamp for the message, should be in UTC timestamp: datetime # a unique message id that is generated from the message instigator msg_id: UUID4 # the list of content elements in the chat content: list[AgentContent] class ChatAcknowledgement(Model): # the timestamp for the message, should be in UTC timestamp: datetime # the msg id that is being acknowledged acknowledged_msg_id: UUID4 # optional acknowledgement metadata metadata: dict[str, str] | None = None def create_text_chat(text: str) -> ChatMessage: return ChatMessage( timestamp=datetime.now(), msg_id=uuid4(), content=[TextContent(type="text", text=text)], ) chat_proto = Protocol(name="AgentChatProtcol", version="0.2.1") @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, ChatAcknowledgement(timestamp=datetime.now(), acknowledged_msg_id=msg.msg_id), ) completion = get_completion(context="", prompt=msg.content[0].text) await ctx.send(sender, create_text_chat(completion)) @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}")
AI Integration
The AI integration provides a function get_completion
, which is responsible for generating intelligent responses using OpenAI's GPT models. It:
- Configures API settings and model selection using environment variables.
- Sends a user prompt and optional context to the AI model.
- Handles structured responses if a schema is provided.
- Returns the generated content or an error message if an issue occurs.
Key Components:
- API Configuration: The script sets up API keys and model parameters from environment variables.
get_completion
Function: Handles message processing and AI response generation.
Example:
ai.pydef get_completion( context: str, prompt: str, response_schema: dict[str, Any] | None = None, max_tokens: int = MAX_TOKENS, ) -> str: if response_schema is not None: response_format = { "type": "json_schema", "json_schema": { "name": response_schema["title"], "strict": False, "schema": response_schema, }, } else: response_format = None try: response = client.chat.completions.create( model=MODEL_ENGINE, messages=[ {"role": "system", "content": context}, {"role": "user", "content": prompt}, ], response_format=response_format, max_tokens=max_tokens, ) except OpenAIError as e: return f"An error occurred: {e}" return response.choices[0].message.content
This function enables agents to generate AI-driven responses dynamically based on incoming messages, enhancing the interaction experience.
Chat Protocol with Agents
The Chat Protocol is integrated into agents so they can exchange messages using a standardized format.
AI Agent: Explanation and Full Code
Explanation
This Agent is set up as an AI-powered agent. It:
- Listens for incoming structured prompts (using ContextPrompt).
- Processes messages using the AI module via
get_completion
function fromai.py
. - Sends back a response.
- Integrates the Chat Protocol
chat_proto
alongside additional protocols like quota and health-check mechanisms.
This agent uses a quota protocol to limit the rate of incoming requests and includes health check logic to ensure it remains operational.
ai-agent.pyimport json import os from enum im copyport Enum from typing import Any from ai import get_completion from chat_proto import chat_proto from uagents import Agent, Context, Model from uagents.experimental.quota import QuotaProtocol, RateLimit from uagents.models import ErrorMessage AGENT_SEED = os.getenv("AGENT_SEED", "openai-test-agent") AGENT_NAME = os.getenv("AGENT_NAME", "OpenAI Agent") class ContextPrompt(Model): context: str text: str class Response(Model): text: str class StructuredOutputPrompt(Model): prompt: str output_schema: dict[str, Any] class StructuredOutputResponse(Model): output: dict[str, Any] PORT = 8000 agent = Agent( name=AGENT_NAME, seed=AGENT_SEED, port=PORT, endpoint=f"http://localhost:{PORT}/submit", ) proto = QuotaProtocol( storage_reference=agent.storage, name="LLM-Context-Response", version="0.1.0", default_rate_limit=RateLimit(window_size_minutes=60, max_requests=6), ) struct_proto = QuotaProtocol( storage_reference=agent.storage, name="LLM-Structured-Response", version="0.1.0", default_rate_limit=RateLimit(window_size_minutes=60, max_requests=6), ) @proto.on_message(ContextPrompt, replies={Response, ErrorMessage}) async def handle_request(ctx: Context, sender: str, msg: ContextPrompt): ctx.logger.info(f"Received message {msg.text}") response = get_completion(context=msg.context, prompt=msg.text) print(response, "response") print(sender, "sender") await ctx.send(sender, Response(text=f"{response}")) @struct_proto.on_message( StructuredOutputPrompt, replies={StructuredOutputResponse, ErrorMessage} ) async def handle_structured_request( ctx: Context, sender: str, msg: StructuredOutputPrompt ): ctx.logger.info(f"Received message: {msg.prompt}") response = get_completion( context="", prompt=msg.prompt, response_schema=msg.output_schema ) await ctx.send(sender, StructuredOutputResponse(output=json.loads(response))) agent.include(proto, publish_manifest=True) agent.include(struct_proto, publish_manifest=True) agent.include(chat_proto, publish_manifest=True) ### Health check related code def agent_is_healthy() -> bool: \"\"\"Implement the actual health check logic here. For example, check if the agent can connect to a third party API, check if the agent has enough resources, etc. \"\"\"\n condition = True # TODO: logic here return bool(condition) class HealthCheck(Model): pass class HealthStatus(str, Enum): HEALTHY = "healthy" UNHEALTHY = "unhealthy" class AgentHealth(Model): agent_name: str status: HealthStatus health_protocol = QuotaProtocol( storage_reference=agent.storage, name="HealthProtocol", version="0.1.0" ) @health_protocol.on_message(HealthCheck, replies={AgentHealth}) async def handle_health_check(ctx: Context, sender: str, msg: HealthCheck): status = HealthStatus.UNHEALTHY try: if agent_is_healthy(): status = HealthStatus.HEALTHY except Exception as err: ctx.logger.error(err) finally: await ctx.send(sender, AgentHealth(agent_name=AGENT_NAME, status=status)) agent.include(health_protocol, publish_manifest=True) if __name__ == "__main__": agent.run()
Client Agent: Explanation and Full Code
Explanation
Client Agent acts as a client that initiates interactions by sending prompts to AI Agent. Its main roles include:
- Sending a code snippet for debugging and a query for structured output (in this example, querying weather information).
- Listening for and handling the responses from AI Agent.
from typing import Any, Literal from datetime import datetime from pydantic.v1 import UUID4 from uagents import Agent, Model, Context from uuid import uuid4 from chat_proto import AgentContent, ChatAcknowledgement class ContextPrompt(Model): context: str text: str class Response(Model): text: str class StructuredOutputPrompt(Model): prompt: str output_schema: dict[str, Any] class StructuredOutputResponse(Model): output: dict[str, Any] class TextContent(Model): type: Literal["text"] # The text of the content. The format of this field is UTF-8 encoded strings. Additionally, # markdown based formatting can be used and will be supported by most clients text: str class ChatMessage(Model): # the timestamp for the message, should be in UTC timestamp: datetime # a unique message id that is generated from the message instigator msg_id: UUID4 # the list of content elements in the chat content: list[AgentContent] agent = Agent(name="agent-2", port=8001, seed="agent-2" , endpoint="http://localhost:8001/submit") AI_AGENT_ADDRESS = "agent1q09xe6rk6lqcnchdrkcn92ma4wlmjyty3v2yzxlxdx4jylq2fcfa2rv458x" code = """ def do_something(): for i in range(10) pass """ class Location(Model): city: str country: str temperature: float prompts = [ ContextPrompt( context="Find and fix the bug in the provided code snippet", text=code, ), StructuredOutputPrompt( prompt="How is the weather in London today?", output_schema=Location.schema(), ), ChatMessage( timestamp=datetime.now(), msg_id=uuid4(), content=[TextContent(type="text", text="who is president of US")], ) ] @agent.on_interval(period=10.0) async def send_message(ctx: Context): for prompt in prompts: await ctx.send(AI_AGENT_ADDRESS, prompt) @agent.on_message(Response) async def handle_response_ai(ctx: Context, sender: str, msg: Response): ctx.logger.info(f"Received response from {sender}: {msg.text}") @agent.on_message(StructuredOutputResponse) async def handle_structured_output_response(ctx: Context, sender: str, msg: StructuredOutputResponse): ctx.logger.info(f"[Received response from ...{sender[-8:]}]:") response = Location.parse_obj(msg.output) ctx.logger.info(response) @agent.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") @agent.on_message(ChatMessage) async def handle_ack(ctx: Context, sender: str, msg: ChatMessage): ctx.logger.info(f"Received request from {sender} for {msg.content[0].text}") if __name__ == "__main__": agent.run()