Agent Handlers
Introduction
Within the uAgents Framework, functions can be decorated as handlers so that they run only when the uAgents library detects a specific condition, such as an elapsed interval, an incoming message or query, or a lifecycle event.
Below, we show how to use the following event handlers:
- Interval tasks: .on_interval()
- Handle messages: .on_message()
- Answer queries: .on_query()
- Triggered by event: .on_event()
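To make the idea of "decorating a function as a handler" concrete, here is a minimal sketch of a decorator that registers a function under a trigger label. The MiniAgent class and its handlers dictionary are hypothetical names invented for this illustration; this is not how the uAgents library is implemented internally.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class MiniAgent:
    """Toy stand-in for an agent (hypothetical; NOT the uAgents implementation)."""

    def __init__(self, name: str) -> None:
        self.name = name
        # Maps a trigger label (e.g. "event:startup") to the registered functions.
        self.handlers: Dict[str, List[Callable]] = defaultdict(list)

    def on_event(self, event_type: str) -> Callable:
        """Return a decorator that registers a function for `event_type`."""

        def decorator(func: Callable) -> Callable:
            self.handlers[f"event:{event_type}"].append(func)
            return func  # the function itself is returned unchanged

        return decorator


agent = MiniAgent("alice")


@agent.on_event("startup")
def introduce() -> str:
    return f"Hello, I'm agent {agent.name}"


# Decorating only registers the function; nothing has been called yet.
registered = agent.handlers["event:startup"]
```

In this sketch the decorator stores the function and leaves it untouched, so the surrounding framework decides when to call it.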
Creating an interval task with on_interval() handler
Sometimes an agent needs to perform a task periodically. To do this, we can use the on_interval() decorator, which repeats a given function at a fixed period. For instance, an agent could send a message to another agent every 2 seconds.
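As a rough mental model of periodic execution, the following sketch uses plain asyncio to call a coroutine and then sleep for a fixed period. The run_every helper and its repeats parameter are assumptions made so the demo terminates; a real agent keeps repeating until it is stopped, and the uAgents scheduler may work differently.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_every(
    period: float, task: Callable[[], Awaitable[None]], repeats: int
) -> None:
    """Call `task`, then sleep `period` seconds, `repeats` times in total."""
    for _ in range(repeats):
        await task()
        await asyncio.sleep(period)


log: List[str] = []


async def say_hello() -> None:
    log.append("Hello!")


# A short period keeps the demo fast; the guide's example uses 2.0 seconds.
asyncio.run(run_every(period=0.01, task=say_hello, repeats=3))
```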
Let's get started and create our first interval task!
Prerequisites
Make sure you have read the following resources before going on with this guide:
- Quick Start Guide for uAgents Framework
- Creating your first agent
- Agents address
- Communicating with other agents
Walk-through
- Let's create a Python script for this task and name it interval_task.py. On Windows, you can create it by running: echo. > interval_task.py
- Then import the necessary classes, Agent and Context, from the uagents library, and create our agent:
interval_task.py
    from uagents import Agent, Context

    agent = Agent(name="alice", seed="alice recovery phrase", port=8000, endpoint=["http://localhost:8000/submit"])
- Create a function to handle the startup event, which will introduce the agent:
interval_task.py
    @agent.on_event("startup")
    async def introduce_agent(ctx: Context):
        ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
- We can now define our agent's interval behavior. We want our agent to log a message every 2 seconds using the on_interval decorator:
interval_task.py
    @agent.on_interval(period=2.0)
    async def say_hello(ctx: Context):
        ctx.logger.info("Hello!")

    if __name__ == "__main__":
        agent.run()
The output will be printed out using the ctx.logger.info() method.
- Save the script.
The overall script should look as follows:
interval_task.py
    from uagents import Agent, Context

    agent = Agent(name="alice", seed="alice recovery phrase", port=8000, endpoint=["http://localhost:8000/submit"])

    @agent.on_event("startup")
    async def introduce_agent(ctx: Context):
        ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")

    @agent.on_interval(period=2.0)
    async def say_hello(ctx: Context):
        ctx.logger.info("Hello!")

    if __name__ == "__main__":
        agent.run()
Run the script
Run the script: python interval_task.py
The output should be as follows:
    INFO: [alice]: Registration on Almanac API successful
    INFO: [alice]: Registering on almanac contract...
    INFO: [alice]: Registering on almanac contract...complete
    INFO: [alice]: Agent inspector available at https://agentverse.ai/inspect/?uri=http%3A//127.0.0.1%3A8000&address=agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t
    INFO: [alice]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)
    INFO: [alice]: Hello, I'm agent alice and my address is agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t.
    INFO: [alice]: Hello!
    INFO: [alice]: Hello!
    INFO: [alice]: Hello!
Handle messages using the on_message() handler
We now showcase a scenario where three agents, named alice, bob, and charles, use a custom protocol to communicate. In the example, Alice and Bob support the protocol, whereas Charles attempts to send broadcast messages to all agents using the protocol. Agents use the on_message() handler, which allows them to handle messages matching specific data models.
Let's get started!
Walk-through
- First of all, let's create a Python script for this task and name it broadcast.py. On Windows, you can create it by running: echo. > broadcast.py
- We then need to import the Agent, Bureau, Context, Model, and Protocol classes from the uagents library. Then, let's create the 3 different agents using the Agent class. Each agent is initialized with a unique name and a seed phrase for wallet recovery.
broadcast.py
    from uagents import Agent, Bureau, Context, Model, Protocol

    # create agents
    # alice and bob will support the protocol
    # charles will try to reach all agents supporting the protocol
    alice = Agent(name="alice", seed="alice recovery phrase", port=8000, endpoint=["http://127.0.0.1:8000/submit"])
    bob = Agent(name="bob", seed="bob recovery phrase", port=8001, endpoint=["http://127.0.0.1:8001/submit"])
    charles = Agent(name="charles", seed="charles recovery phrase", port=8002, endpoint=["http://127.0.0.1:8002/submit"])
It is optional but useful to include a seed parameter when creating an agent so that its address stays fixed. Otherwise, a random address will be generated every time you run the agent.
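The effect of a seed can be illustrated with a small sketch: the same seed always yields the same identifier, while omitting it yields a fresh random one. The toy_address function and the "toy1" prefix are hypothetical, and hashing the seed with SHA-256 is only an assumption for illustration; it is not the key derivation that uAgents actually uses.

```python
import hashlib
import secrets
from typing import Optional


def toy_address(seed: Optional[str] = None) -> str:
    """Derive an illustrative identifier; a random one if no seed is given."""
    # With a seed the input bytes are fixed; without one they are random.
    material = seed.encode() if seed is not None else secrets.token_bytes(32)
    return "toy1" + hashlib.sha256(material).hexdigest()[:16]


fixed_a = toy_address("alice recovery phrase")
fixed_b = toy_address("alice recovery phrase")
random_a = toy_address()
random_b = toy_address()
```

Here fixed_a and fixed_b are identical across runs, whereas random_a and random_b change every time the script is executed.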
- Let's then define the message data models to specify the type of messages being handled and exchanged by the agents. We define the BroadcastExampleRequest and BroadcastExampleResponse data models. Finally, we create a protocol named proto with version 1.0:
broadcast.py
    class BroadcastExampleRequest(Model):
        pass

    class BroadcastExampleResponse(Model):
        text: str

    # define protocol
    proto = Protocol(name="proto", version="1.0")
- Let's now define a message handler function for incoming messages of type BroadcastExampleRequest in the protocol:
broadcast.py
    @proto.on_message(model=BroadcastExampleRequest, replies=BroadcastExampleResponse)
    async def handle_request(ctx: Context, sender: str, _msg: BroadcastExampleRequest):
        await ctx.send(
            sender, BroadcastExampleResponse(text=f"Hello from {ctx.agent.name}")
        )
Here we defined a handle_request() function, which is called whenever a request is received and sends a response back to the sender. The .on_message() decorator indicates that the function is triggered whenever a message of type BroadcastExampleRequest is received. The response contains a greeting with the name of the agent handling the request (ctx.agent.name), so the sender can tell which agent replied.
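The idea that a handler is selected by the type of the incoming message can be sketched with a plain dictionary keyed by message class. The Request and Response dataclasses, the HANDLERS table, and the deliver function are hypothetical stand-ins chosen for this illustration; uAgents models and its dispatch logic are richer than this.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Request:
    pass


@dataclass
class Response:
    text: str


def handle_request(agent_name: str, sender: str, _msg: Request) -> Response:
    # The reply carries the name of the agent that handles the request.
    return Response(text=f"Hello from {agent_name}")


# Hypothetical dispatch table: message type -> handler function.
HANDLERS: Dict[type, Callable] = {Request: handle_request}


def deliver(agent_name: str, sender: str, msg: object) -> Optional[Response]:
    """Route `msg` to the handler registered for its type, if there is one."""
    handler = HANDLERS.get(type(msg))
    if handler is None:
        return None  # no handler registered for this message type
    return handler(agent_name, sender, msg)


reply = deliver("bob", "charles-address", Request())
ignored = deliver("bob", "charles-address", Response(text="hi"))
```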
- Now, we need to include the protocol in the agents. Specifically, the protocol is included in both the alice and bob agents. This means they will follow the rules defined in the protocol when communicating:
broadcast.py
    alice.include(proto)
    bob.include(proto)
After the first registration in the Almanac smart contract, it will take about 5 minutes before the agents can be found through the protocol.
- It is now time to define the behavior and function of the charles agent:
broadcast.py
    @charles.on_interval(period=5)
    async def say_hello(ctx: Context):
        status_list = await ctx.broadcast(proto.digest, message=BroadcastExampleRequest())
        ctx.logger.info(f"Trying to contact {len(status_list)} agents.")

    @charles.on_message(model=BroadcastExampleResponse)
    async def handle_response(ctx: Context, sender: str, msg: BroadcastExampleResponse):
        ctx.logger.info(f"Received response from {sender}: {msg.text}")
In the first part, we use the .on_interval() decorator to define an interval behavior for this agent when the script is being run. In this case, the agent will execute the say_hello() function every 5 seconds. The Context object is a collection of data and functions related to the agent. Inside the say_hello() function, the agent uses the ctx.broadcast() method to send a broadcast message. The message is of type BroadcastExampleRequest and is sent using the protocol's digest (proto.digest).
Then, we used the .on_message() decorator on the handle_response() function. This function handles all incoming messages of type BroadcastExampleResponse from other agents. When a response is received, it uses the ctx.logger.info() method to log the sender's address and the text content of the response.
- We are now ready to set up a Bureau object so that the agents run together at the same time, and we add alice, bob, and charles to it using the bureau.add() method:
broadcast.py
    bureau = Bureau(port=8000, endpoint="http://localhost:8000/submit")
    bureau.add(alice)
    bureau.add(bob)
    bureau.add(charles)

    if __name__ == "__main__":
        bureau.run()
The bureau is set to listen on port=8000 and specifies an endpoint at "http://localhost:8000/submit" for submitting data.
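Running several agents together in one process can be thought of as running several coroutines on one event loop. The following sketch uses asyncio.gather for that purpose; run_agent and run_together are hypothetical helpers, and this is only an analogy, not a description of how Bureau is implemented.

```python
import asyncio
from typing import List


async def run_agent(name: str, log: List[str]) -> str:
    log.append(f"{name} started")
    await asyncio.sleep(0)  # yield control so the other agents can start
    log.append(f"{name} finished")
    return name


async def run_together(names: List[str], log: List[str]) -> List[str]:
    # gather() schedules all coroutines concurrently and keeps result order.
    return await asyncio.gather(*(run_agent(name, log) for name in names))


events: List[str] = []
finished = asyncio.run(run_together(["alice", "bob", "charles"], events))
```

All three toy agents start before any of them finishes, because each one yields control to the event loop after starting.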
- Save the script.
The overall script should look as follows:
broadcast.py
    from uagents import Agent, Bureau, Context, Model, Protocol

    # create agents
    # alice and bob will support the protocol
    # charles will try to reach all agents supporting the protocol
    alice = Agent(name="alice", seed="alice recovery phrase", port=8000, endpoint=["http://127.0.0.1:8000/submit"])
    bob = Agent(name="bob", seed="bob recovery phrase", port=8001, endpoint=["http://127.0.0.1:8001/submit"])
    charles = Agent(name="charles", seed="charles recovery phrase", port=8002, endpoint=["http://127.0.0.1:8002/submit"])

    class BroadcastExampleRequest(Model):
        pass

    class BroadcastExampleResponse(Model):
        text: str

    # define protocol
    proto = Protocol(name="proto", version="1.0")

    @proto.on_message(model=BroadcastExampleRequest, replies=BroadcastExampleResponse)
    async def handle_request(ctx: Context, sender: str, _msg: BroadcastExampleRequest):
        await ctx.send(
            sender, BroadcastExampleResponse(text=f"Hello from {ctx.agent.name}")
        )

    # include protocol
    # Note: after the first registration on the almanac smart contract, it will
    # take about 5 minutes before the agents can be found through the protocol
    alice.include(proto)
    bob.include(proto)

    # let charles send the message to all agents supporting the protocol
    @charles.on_interval(period=5)
    async def say_hello(ctx: Context):
        status_list = await ctx.broadcast(proto.digest, message=BroadcastExampleRequest())
        ctx.logger.info(f"Trying to contact {len(status_list)} agents.")

    @charles.on_message(model=BroadcastExampleResponse)
    async def handle_response(ctx: Context, sender: str, msg: BroadcastExampleResponse):
        ctx.logger.info(f"Received response from {sender}: {msg.text}")

    bureau = Bureau(port=8000, endpoint="http://localhost:8000/submit")
    bureau.add(alice)
    bureau.add(bob)
    bureau.add(charles)

    if __name__ == "__main__":
        bureau.run()
Run the script
Make sure to have activated your virtual environment correctly.
Run the script: python broadcast.py
The output would be:
    INFO: [alice]: Registration on Almanac API successful
    INFO: [alice]: Registering on almanac contract...
    INFO: [alice]: Registering on almanac contract...complete
    INFO: [ bob]: Registration on Almanac API successful
    INFO: [ bob]: Registering on almanac contract...
    INFO: [ bob]: Registering on almanac contract...complete
    INFO: [charles]: Registration on Almanac API successful
    INFO: [charles]: Registration on Almanac API successful
    INFO: [charles]: Registering on almanac contract...
    INFO: [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)
    INFO: [charles]: Trying to contact 2 agents.
    INFO: [charles]: Received response from agent1q0mau8vkmg78xx0sh8cyl4tpl4ktx94pqp2e94cylu6haugt2hd7j9vequ7: Hello from bob
    INFO: [charles]: Received response from agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t: Hello from alice
Answer queries with on_query() handler
The on_query() handler is used to register a function as a handler for incoming queries that match a specified Model. This decorator enables the agent to respond to queries in an event-driven manner.
Walk-through
Agent's script
For the agent, the script sets up an agent to handle incoming queries. It defines two models: TestRequest and Response. Upon startup, it logs the agent's details. The core functionality lies in the query_handler, decorated with @agent.on_query(), which processes received queries and sends back a predefined response. This demonstrates creating responsive agents within the uAgents Framework, showcasing how they can interact with other agents or functions in an asynchronous, event-driven architecture.
agent.py
    from uagents import Agent, Context, Model

    class TestRequest(Model):
        message: str

    class Response(Model):
        text: str

    # Initialize the agent with its configuration.
    agent = Agent(
        name="your_agent_name_here",
        seed="your_agent_seed_here",
        port=8001,
        endpoint="http://localhost:8001/submit",
    )

    @agent.on_event("startup")
    async def startup(ctx: Context):
        ctx.logger.info(f"Starting up {agent.name}")
        ctx.logger.info(f"With address: {agent.address}")
        ctx.logger.info(f"And wallet address: {agent.wallet.address()}")

    # Decorator to handle incoming queries.
    @agent.on_query(model=TestRequest, replies={Response})
    async def query_handler(ctx: Context, sender: str, _query: TestRequest):
        ctx.logger.info("Query received")
        try:
            # do something here
            await ctx.send(sender, Response(text="success"))
        except Exception:
            await ctx.send(sender, Response(text="fail"))

    # Main execution block to run the agent.
    if __name__ == "__main__":
        agent.run()
The agent is created using the Agent class from the uagents library. You can initialize it by providing it with a name, seed, port, and endpoint. It defines an on_event() handler for the startup event, where it logs information about the agent's initialization. It defines an on_query() handler for handling queries of type TestRequest. Upon receiving a query, it processes it and sends back a Response. The agent is then set to run.
Proxy
The proxy is implemented using FastAPI. It sets up two routes: "/" for a simple root message and "/endpoint" for receiving requests. When a POST request is made to "/endpoint" with a JSON payload containing a TestRequest, it triggers the make_agent_call function. Inside make_agent_call, it calls agent_query to communicate with the agent. The agent receives the query, processes it, and sends back a response. The proxy receives the response from the agent and sends back a success message along with the response text.
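The request/response flow with a time limit can be sketched with asyncio.wait_for. In this illustration, query_with_timeout, fast_agent, and slow_agent are hypothetical names, and returning None on timeout is a convention chosen for the sketch rather than documented behavior of the uagents query function.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def query_with_timeout(
    handler: Callable[[str], Awaitable[str]], message: str, timeout: float
) -> Optional[str]:
    """Await the handler's reply, or give up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(handler(message), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # sketch convention: no reply arrived in time


async def fast_agent(message: str) -> str:
    return "success"


async def slow_agent(message: str) -> str:
    await asyncio.sleep(0.5)  # longer than the timeout used below
    return "too late"


async def demo() -> tuple:
    fast = await query_with_timeout(fast_agent, "test", timeout=0.2)
    slow = await query_with_timeout(slow_agent, "test", timeout=0.05)
    return fast, slow


fast_result, slow_result = asyncio.run(demo())
```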
Let's explore the Proxy code script step-by-step:
- First of all, navigate to the directory where you want to create your project.
- Create a Python script named on_query.py. On Windows, you can create it by running: echo. > on_query.py
- We need to import json, FastAPI and Request from fastapi, and Model, query, and Envelope from uagents. Then we define the query format using the TestRequest class as a subclass of Model:
on_query.py
    import json

    from fastapi import FastAPI, Request
    from uagents import Model
    from uagents.query import query
    from uagents.envelope import Envelope

    AGENT_ADDRESS = "agent1qt6ehs6kqdgtrsduuzslqnrzwkrcn3z0cfvwsdj22s27kvatrxu8sy3vag0"

    class TestRequest(Model):
        message: str
- Create the agent_query() function to send a query to the agent and decode the response received:
on_query.py
    async def agent_query(req):
        response = await query(destination=AGENT_ADDRESS, message=req, timeout=15)
        if isinstance(response, Envelope):
            data = json.loads(response.decode_payload())
            return data["text"]
        return response
- Initialize a FastAPI app:
on_query.py
    app = FastAPI()
- Define a root endpoint to test the server:
on_query.py
    @app.get("/")
    def read_root():
        return "Hello from the Agent controller"
- Define an endpoint to make agent calls:
on_query.py
    @app.post("/endpoint")
    async def make_agent_call(req: Request):
        model = TestRequest.parse_obj(await req.json())
        try:
            res = await agent_query(model)
            return f"successful call - agent response: {res}"
        except Exception:
            return "unsuccessful agent call"
- Save the script. Remember that you need to set AGENT_ADDRESS to your own agent's address for this code to run correctly.
The overall script should look as follows:
on_query.py
    import json

    from fastapi import FastAPI, Request
    from uagents import Model
    from uagents.query import query
    from uagents.envelope import Envelope

    AGENT_ADDRESS = "agent1qt6ehs6kqdgtrsduuzslqnrzwkrcn3z0cfvwsdj22s27kvatrxu8sy3vag0"

    class TestRequest(Model):
        message: str

    async def agent_query(req):
        response = await query(destination=AGENT_ADDRESS, message=req, timeout=15)
        if isinstance(response, Envelope):
            data = json.loads(response.decode_payload())
            return data["text"]
        return response

    app = FastAPI()

    @app.get("/")
    def read_root():
        return "Hello from the Agent controller"

    @app.post("/endpoint")
    async def make_agent_call(req: Request):
        model = TestRequest.parse_obj(await req.json())
        try:
            res = await agent_query(model)
            return f"successful call - agent response: {res}"
        except Exception:
            return "unsuccessful agent call"
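The decoding step in agent_query() reads a JSON document from the envelope payload and extracts its "text" field. The sketch below reproduces that step with the standard library only, assuming the payload is base64-encoded JSON; that encoding is an assumption of this example, and encode_payload and extract_text are hypothetical helpers, not part of the uagents API.

```python
import base64
import json


def encode_payload(obj: dict) -> str:
    """Serialize `obj` to JSON and base64-encode it (assumed wire format)."""
    return base64.b64encode(json.dumps(obj).encode()).decode()


def extract_text(payload: str) -> str:
    """Decode the payload back to JSON and return its "text" field."""
    data = json.loads(base64.b64decode(payload).decode())
    return data["text"]


payload = encode_payload({"text": "success"})
text = extract_text(payload)
```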
Run the example
In separate terminals:
- Run the FastAPI proxy: uvicorn on_query:app
- Run the agent: python agent.py
- Query the agent via the proxy: curl -d '{"message": "test"}' -H "Content-Type: application/json" -X POST http://localhost:8000/endpoint
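If curl is not available, the same POST request can be built with Python's standard library. This is a minimal sketch: build_query_request is a hypothetical helper, and the URL assumes the proxy is listening on localhost port 8000, as in the curl command above. The lines that actually send the request are commented out because they require the proxy and the agent to be running.

```python
import json
import urllib.request


def build_query_request(
    message: str, url: str = "http://localhost:8000/endpoint"
) -> urllib.request.Request:
    """Build a JSON POST request equivalent to the curl command."""
    body = json.dumps({"message": message}).encode()
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request("test")

# To send it (requires the proxy and the agent to be running):
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode())
```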
Catching events with on_event() handler
The uAgents library catches two lifecycle events, startup and shutdown, which are triggered when the agent starts and when it shuts down.
Here's an example:
on_event("startup")
    @agent.on_event("startup")
    async def introduce_agent(ctx: Context):
        ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
        ...

on_event("shutdown")
    @agent.on_event("shutdown")
    async def say_goodbye(ctx: Context):
        ctx.logger.info(f"Hello, I'm agent {agent.name} and I am shutting down")
        ...
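The ordering of lifecycle handlers can be sketched as follows: startup handlers run first, then the main work, then shutdown handlers. The run_lifecycle function is hypothetical, and wrapping the main work in try/finally so that shutdown handlers still run after an error is a design choice of this sketch, not a statement about how uAgents behaves.

```python
import asyncio
from typing import Awaitable, Callable, List

Handler = Callable[[], Awaitable[None]]


async def run_lifecycle(
    startup: List[Handler], main: Handler, shutdown: List[Handler]
) -> None:
    for handler in startup:
        await handler()
    try:
        await main()
    finally:
        # In this sketch, shutdown handlers run even if `main` raises.
        for handler in shutdown:
            await handler()


trace: List[str] = []


async def on_startup() -> None:
    trace.append("startup")


async def main_loop() -> None:
    trace.append("running")
    raise RuntimeError("simulated stop")


async def on_shutdown() -> None:
    trace.append("shutdown")


try:
    asyncio.run(run_lifecycle([on_startup], main_loop, [on_shutdown]))
except RuntimeError:
    pass  # the simulated stop propagates after the shutdown handlers ran
```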