Walk-through
You first need to satisfy the requirements in the Getting Started section for this example.
Organize the project
Inside the src folder in the fetch-holiday directory for the project, you will need to create 3 different directories and a Python script:
cd src
mkdir agents
mkdir messages
mkdir utils
touch main.py
These directories will contain the different files you will need to correctly run the project.
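For readers who prefer to script the setup, the same layout can be created from Python. This is a minimal sketch using only the standard library; the helper name create_layout and its src argument are illustrative assumptions, not part of the project.

```python
from pathlib import Path
from typing import List


def create_layout(src: Path) -> List[Path]:
    """Create agents/, messages/ and utils/ plus an empty main.py under src."""
    created = []
    for name in ("agents", "messages", "utils"):
        folder = src / name
        folder.mkdir(parents=True, exist_ok=True)  # same effect as `mkdir <name>`
        created.append(folder)
    main_script = src / "main.py"
    main_script.touch(exist_ok=True)  # same effect as `touch main.py`
    created.append(main_script)
    return created
```

Calling create_layout(Path("src")) from the fetch-holiday directory should leave the same four entries as the shell commands above. It is safe to run more than once because existing folders and files are left alone.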
Define the scripts
Agents
Let's start by defining the agents needed for this example, along with their respective behaviors and functions. Within the agents folder created above, create 3 different sub-folders, one for each agent we plan to define:
mkdir activities
mkdir destinations
mkdir flights
Top activities agent
Let's start by defining the script for a top activities agent which provides tourist activities based on specified city and date criteria.
-
Enter the activities folder and create a script for this agent:
touch top_activities.py
-
Within this script, import the necessary modules, create an agent named top_activities using a secret seed, and make sure it has enough funds to register in the Almanac contract:
from langchain.chat_models import ChatOpenAI
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
from messages import UAgentResponse, UAgentResponseType, TopActivities, KeyValue
from uagents import Agent, Context, Protocol
from uagents.setup import fund_agent_if_low
import os

TOP_ACTIVITIES_SEED = os.getenv("TOP_ACTIVITIES_SEED", "top_activities really secret phrase :)")

agent = Agent(
    name="top_activities",
    seed=TOP_ACTIVITIES_SEED
)

fund_agent_if_low(agent.wallet.address())
-
Let's then define an output parser using CommaSeparatedListOutputParser, to help parse the results from the agent's response, and a prompt template defined using a PromptTemplate instance, which provides a structured message that explains the agent's role and expectations. It includes placeholders for variables like city, date, and preferred_activities_str:
output_parser = CommaSeparatedListOutputParser()
format_instructions = output_parser.get_format_instructions()
prompt = PromptTemplate(
    template="""
You are an expert AI in suggesting travel, holiday activities based on the date and city specified in user input.\n
The question that SerpAPI has to answer: What are the top 5 tourist activities in {city} on {date}?\n
{preferred_activities_str}\n
You should find tourist attractions and programs which are available exactly on the specified date.\n
{format_instructions}""",
    input_variables=["city", "date", "preferred_activities_str"],
    partial_variables={"format_instructions": format_instructions}
)
-
Let's create an instance of the ChatOpenAI class (referred to as llm) to interact with a large language model that understands and generates human-like text. Next, load the tools for the agent (here, SerpAPI) and initialize the agent by connecting the loaded tools and the large language model to it. Finally, define the communication protocol, called TopActivities:
llm = ChatOpenAI(temperature=0.1)
tools = load_tools(["serpapi"], llm=llm)
langchain_agent = initialize_agent(tools, llm, agent="chat-zero-shot-react-description", verbose=True)

top_activities_protocol = Protocol("TopActivities")
-
We then need to define how the top_activities agent responds when it receives a TopActivities message. The handler takes the user's message, fills in the prompt template, sends it to the AI for a response, turns the AI's answer into a list of options, and sends these options back to the user. If something goes wrong, it lets the user know there was an issue:
@top_activities_protocol.on_message(model=TopActivities, replies=UAgentResponse)
async def get_top_activity(ctx: Context, sender: str, msg: TopActivities):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
    preferred_activities_str = f"You should only offer programs and activities related to {msg.preferred_activities}" if msg.preferred_activities else ""
    _input = prompt.format(city=msg.city, date=msg.date, preferred_activities_str = preferred_activities_str)
    try:
        output = await langchain_agent.arun(_input)
        result = output_parser.parse(output)
        options = list(map(lambda x: KeyValue(key=x, value=x), result))
        ctx.logger.info(f"Agent executed and got following result: {result}. Mapped to options: {options}")
        await ctx.send(
            sender,
            UAgentResponse(
                options=options,
                type=UAgentResponseType.FINAL_OPTIONS,
            )
        )
    except Exception as exc:
        ctx.logger.warn(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))

agent.include(top_activities_protocol)
We defined a function named get_top_activity() that handles TopActivities messages, as defined by the .on_message() decorator. The function first logs the sender and session of the received message using the ctx.logger.info() method. It then checks whether msg.preferred_activities is provided; if it is, a sentence about the preferred activities is generated. We proceed by filling in the prompt and passing it to the LangChain agent to generate a response. The code then parses the output using the output parser previously defined. The parsed result is used to create a list of options, each represented as a KeyValue object. The code then logs the result and the corresponding options. A response is sent back to the sender using the ctx.send() method. The response includes the options generated from the parsed output and specifies the response type as UAgentResponseType.FINAL_OPTIONS. Any exceptions that occur during this process are logged as warnings, and an error response is sent back to the sender. We finally use the .include() method to include the TopActivities protocol in the agent, ensuring it's ready to handle messages of this type.
-
Save the script.
The overall script should look as follows:
from langchain.chat_models import ChatOpenAI
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
from messages import UAgentResponse, UAgentResponseType, TopActivities, KeyValue
from uagents import Agent, Context, Protocol
from uagents.setup import fund_agent_if_low
import os

TOP_ACTIVITIES_SEED = os.getenv("TOP_ACTIVITIES_SEED", "top_activities really secret phrase :)")

agent = Agent(
    name="top_activities",
    seed=TOP_ACTIVITIES_SEED
)

fund_agent_if_low(agent.wallet.address())

output_parser = CommaSeparatedListOutputParser()
format_instructions = output_parser.get_format_instructions()
prompt = PromptTemplate(
    template="""
You are an expert AI in suggesting travel, holiday activities based on the date and city specified in user input.\n
The question that SerpAPI has to answer: What are the top 5 tourist activities in {city} on {date}?\n
{preferred_activities_str}\n
You should find tourist attractions and programs which are available exactly on the specified date.\n
{format_instructions}""",
    input_variables=["city", "date", "preferred_activities_str"],
    partial_variables={"format_instructions": format_instructions}
)

llm = ChatOpenAI(temperature=0.1)
tools = load_tools(["serpapi"], llm=llm)
langchain_agent = initialize_agent(tools, llm, agent="chat-zero-shot-react-description", verbose=True)

top_activities_protocol = Protocol("TopActivities")

@top_activities_protocol.on_message(model=TopActivities, replies=UAgentResponse)
async def get_top_activity(ctx: Context, sender: str, msg: TopActivities):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
    preferred_activities_str = f"You should only offer programs and activities related to {msg.preferred_activities}" if msg.preferred_activities else ""
    _input = prompt.format(city=msg.city, date=msg.date, preferred_activities_str = preferred_activities_str)
    try:
        output = await langchain_agent.arun(_input)
        result = output_parser.parse(output)
        options = list(map(lambda x: KeyValue(key=x, value=x), result))
        ctx.logger.info(f"Agent executed and got following result: {result}. Mapped to options: {options}")
        await ctx.send(
            sender,
            UAgentResponse(
                options=options,
                type=UAgentResponseType.FINAL_OPTIONS,
            )
        )
    except Exception as exc:
        ctx.logger.warn(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))

agent.include(top_activities_protocol)
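The parse-and-map step inside get_top_activity() can be tried in isolation, without LangChain or uAgents installed. The sketch below is an approximation under stated assumptions: split_comma_list() is a hypothetical helper that only roughly mimics a comma-separated list parser, KeyValue is a plain dataclass standing in for the message model, and the sample text is invented.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class KeyValue:
    """Stand-in for the KeyValue message model (assumption: plain dataclass)."""
    key: str
    value: str


def split_comma_list(text: str) -> List[str]:
    """Hypothetical helper: split on commas, trim whitespace, drop empty entries."""
    return [item.strip() for item in text.split(",") if item.strip()]


def to_options(items: List[str]) -> List[KeyValue]:
    """Mirror the tutorial's mapping: each item is used as both key and value."""
    return [KeyValue(key=item, value=item) for item in items]


# Invented sample of what the language model might return.
sample_output = "British Museum, Tower of London, Thames river cruise"
options = to_options(split_comma_list(sample_output))
```

Each activity ends up as one KeyValue whose key and value are the same string, which is the shape the agent sends back with the FINAL_OPTIONS response type.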
Top destinations agent
Let's now define the script for a top destinations agent which suggests tourist destinations based on the preferences the user provides.
-
Within the destinations directory previously created, create a Python script for this agent and name it top_destinations.py:
touch top_destinations.py
-
Let's import the needed modules and initialize an agent named top_destinations using a secret seed. Let's ensure the agent has sufficient funds in its wallet. We then need to set up the large language model and define the communication protocol, which sets the rules for handling messages about top destinations:
from uagents import Agent, Context, Protocol
from messages import TopDestinations, UAgentResponse, UAgentResponseType, KeyValue
from uagents.setup import fund_agent_if_low
from utils.llm import get_llm
import os

TOP_DESTINATIONS_SEED = os.getenv("TOP_DESTINATIONS_SEED", "top_destinations really secret phrase :)")

agent = Agent(
    name="top_destinations",
    seed=TOP_DESTINATIONS_SEED
)

fund_agent_if_low(agent.wallet.address())

llm = get_llm()
top_destinations_protocol = Protocol("TopDestinations")
-
Let's then define a get_top_destinations() function which is triggered whenever a TopDestinations message is received. This function will reply with a message of type UAgentResponse:
@top_destinations_protocol.on_message(model=TopDestinations, replies=UAgentResponse)
async def get_top_destinations(ctx: Context, sender: str, msg: TopDestinations):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
    prompt = f"""You are an expert AI in suggesting travel, holiday destination based on some user input.
User input might not be provided, in which case suggest popular destinations.
If user input is present, then suggest destinations based on user input.
The response should be a list of destinations, each destination should have information about why it is a good destination.
After listing all the suggestions say END. Every destination should be separated by a new line.
Example:
User input: I want to go to a place with good weather and beaches.
Response:
1. Goa, India. Goa is a popular destination for tourists. It has good weather and beaches.
2. Malé, Maldives. Maldives is a popular destination for tourists. It has good weather and beaches.
END
User preferences: {msg.preferences}
"""
    try:
        response = await llm.complete("", prompt, "Response:", max_tokens=500, stop=["END"])
        result = response.strip()
        result = result.split("\n")
        await ctx.send(
            sender,
            UAgentResponse(
                options=list(map(lambda x: KeyValue(key=x, value=x), result)),
                type=UAgentResponseType.FINAL_OPTIONS
            )
        )
    except Exception as exc:
        ctx.logger.warn(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))

agent.include(top_destinations_protocol)
The get_top_destinations() function logs the sender and session of the received message using the ctx.logger.info() method. It then defines a prompt explaining what the AI does and providing an example. It then uses the large language model to generate a response based on the prompt, and splits the response on newlines to get a list of suggested destinations. Afterwards, it sends back the list as a response to the sender. Any exceptions that occur during this process are logged as warnings, and an error response is sent back to the sender. Finally, we use the .include() method to add the TopDestinations protocol to the agent, enabling it to handle messages of this type.
-
Save the script.
The overall script should look as follows:
from uagents import Agent, Context, Protocol
from messages import TopDestinations, UAgentResponse, UAgentResponseType, KeyValue
from uagents.setup import fund_agent_if_low
from utils.llm import get_llm
import os

TOP_DESTINATIONS_SEED = os.getenv("TOP_DESTINATIONS_SEED", "top_destinations really secret phrase :)")

agent = Agent(
    name="top_destinations",
    seed=TOP_DESTINATIONS_SEED
)

fund_agent_if_low(agent.wallet.address())

llm = get_llm()
top_destinations_protocol = Protocol("TopDestinations")

@top_destinations_protocol.on_message(model=TopDestinations, replies=UAgentResponse)
async def get_top_destinations(ctx: Context, sender: str, msg: TopDestinations):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
    prompt = f"""You are an expert AI in suggesting travel, holiday destination based on some user input.
User input might not be provided, in which case suggest popular destinations.
If user input is present, then suggest destinations based on user input.
The response should be a list of destinations, each destination should have information about why it is a good destination.
After listing all the suggestions say END. Every destination should be separated by a new line.
Example:
User input: I want to go to a place with good weather and beaches.
Response:
1. Goa, India. Goa is a popular destination for tourists. It has good weather and beaches.
2. Malé, Maldives. Maldives is a popular destination for tourists. It has good weather and beaches.
END
User preferences: {msg.preferences}
"""
    try:
        response = await llm.complete("", prompt, "Response:", max_tokens=500, stop=["END"])
        result = response.strip()
        result = result.split("\n")
        await ctx.send(
            sender,
            UAgentResponse(
                options=list(map(lambda x: KeyValue(key=x, value=x), result)),
                type=UAgentResponseType.FINAL_OPTIONS
            )
        )
    except Exception as exc:
        ctx.logger.warn(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))

agent.include(top_destinations_protocol)
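The response handling in the script above strips the reply and splits it on newlines, so a blank line in the model's answer would become an empty option. The following sketch shows a small variation that skips blank lines. It is not what the script does; parse_destinations() is a hypothetical helper and the sample reply is invented.

```python
from typing import List


def parse_destinations(response: str) -> List[str]:
    """Split an LLM reply into one destination per line, skipping blank lines."""
    lines = response.strip().split("\n")
    return [line.strip() for line in lines if line.strip()]


# Invented reply containing a blank line between the two suggestions.
sample = """
1. Goa, India. Goa has good weather and beaches.

2. Malé, Maldives. The Maldives has good weather and beaches.
"""
destinations = parse_destinations(sample)
```

With this variation the sample reply yields two entries instead of three, so no empty KeyValue would be sent back to the user.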
Flights agent
Let's now define the script for a flights agent which provides flight information based on the specified origin, destination, and date.
-
Within the flights directory previously created, create a Python script for this agent and name it flights.py:
touch flights.py
-
Let's import the needed modules and initialize an agent named flights_adaptor using a secret seed. Let's ensure the agent has sufficient funds in its wallet:
from uagents import Agent, Context, Protocol
from uagents.setup import fund_agent_if_low
import requests
from messages import Flights, UAgentResponse, UAgentResponseType, KeyValue
import os
import uuid

FLIGHTS_SEED = os.getenv("FLIGHTS_SEED", "flights really secret phrase :)")

agent = Agent(
    name="flights_adaptor",
    seed=FLIGHTS_SEED
)

fund_agent_if_low(agent.wallet.address())
-
We then need to retrieve the RapidAPI key from the environment variables and assert its presence. We proceed by setting up the URL for the Skyscanner API and defining the headers needed for sending requests to it, including the RapidAPI key:
RAPIDAPI_API_KEY = os.environ.get("RAPIDAPI_API_KEY", "")

assert RAPIDAPI_API_KEY, "RAPIDAPI_API_KEY environment variable is missing from .env"

SKY_SCANNER_URL = "https://skyscanner-api.p.rapidapi.com/v3e/flights/live/search/synced"
headers = {
    "content-type": "application/json",
    "X-RapidAPI-Key": RAPIDAPI_API_KEY,
    "X-RapidAPI-Host": "skyscanner-api.p.rapidapi.com"
}
-
Let's then define a skyscanner_format_data() function to process data from the Skyscanner API response and extract essential flight details:
def skyscanner_format_data(data):
    r = data["content"]["results"]
    carriers = r["carriers"]
    itineraries = r["itineraries"]
    segments = r["segments"]
    sorted_itineraries = data["content"]["sortingOptions"]["cheapest"]
    results = []
    for o in sorted_itineraries:
        _id = o["itineraryId"]
        it = itineraries[_id]
        for option in it["pricingOptions"]:
            price = option["price"]["amount"]
            if len(option["items"]) != 1:
                continue
            fares = option["items"][0]["fares"]
            if len(fares) != 1:
                continue
            segment_id = fares[0]["segmentId"]
            seg = segments[segment_id]
            carrier = carriers[seg["marketingCarrierId"]]
            duration = seg["durationInMinutes"]
            departure = seg["departureDateTime"]
            arrival = seg["arrivalDateTime"]
            results.append({
                "price": price,
                "duration": duration,
                "departure": departure,
                "arrival": arrival,
                "carrier": carrier
            })
    return results
This function iterates through the itineraries sorted by price, looks at each pricing option, and skips any option that does not consist of exactly one item with exactly one fare. For the remaining options it captures the price, carrier, duration, departure time, and arrival time. This information is collected into a list of formatted flight options, which the function returns for further use.
-
We proceed and define a function named skyscanner_top5() which takes a list of flight offers (fares) and returns the first 5 options from the list. We then initialize a protocol named Flights which will guide the agent's interactions related to flight offers:
def skyscanner_top5(fares):
    return [fares[i] for i in range(len(fares)) if i < 5]

flights_protocol = Protocol("Flights")
-
We then need to define a function flight_offers() to handle messages related to flight offers within the Flights protocol:
@flights_protocol.on_message(model=Flights, replies={UAgentResponse})
async def flight_offers(ctx: Context, sender: str, msg: Flights):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")

    dept_year, dept_month, dept_day = msg.date.split("-")
    payload = {
        "query": {
            "market": "UK",
            "locale": "en-GB",
            "currency": "GBP",
            "queryLegs": [
                {
                    "originPlaceId": {"iata": msg.from_},
                    "destinationPlaceId": {"iata": msg.to},
                    "date": {
                        "year": int(dept_year),
                        "month": int(dept_month),
                        "day": int(dept_day)
                    }
                }
            ],
            "cabinClass": "CABIN_CLASS_ECONOMY",
            "adults": msg.persons
        }
    }
    print("CALLING SKYSCANNER, payload ", payload)
    try:
        response = requests.request("POST", SKY_SCANNER_URL, json=payload, headers=headers)
        if response.status_code != 200:
            print("SKYSCANNER STATUS CODE not 200: ", response.json())
            await ctx.send(sender, UAgentResponse(message=response.text, type=UAgentResponseType.ERROR))
            return
        formatted = skyscanner_format_data(response.json())
        top5 = skyscanner_top5(formatted)
        print("SKYSCANNER TOP5 RESPONSE: ", top5)
        request_id = str(uuid.uuid4())
        options = []
        for idx, o in enumerate(top5):
            dep = f"{o['departure']['hour']:02}:{o['departure']['minute']:02}"
            arr = f"{o['arrival']['hour']:02}:{o['arrival']['minute']:02}"
            option = f"""{o["carrier"]["name"]} for £{o['price']}, {dep} - {arr}, flight time {o['duration']} min"""
            options.append(KeyValue(key=idx, value=option))
        await ctx.send(sender, UAgentResponse(options=options, type=UAgentResponseType.SELECT_FROM_OPTIONS, request_id=request_id))
    except Exception as exc:
        ctx.logger.error(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))

agent.include(flights_protocol)
Whenever a message of the Flights model type is received, this function first extracts the necessary information (e.g., origin, destination, date, and number of persons) from the user's input, and constructs a payload for a Skyscanner API request, which is then sent. If the API does not return status code 200, the function sends an error response and stops. Otherwise, it formats the API response to extract essential flight details and selects the top 5 flight options from the data. It then creates a unique request ID for tracking, constructs a response message containing the top flight options, and sends it to the user. The function also handles any exception that occurs during this process and communicates the error to the user. Finally, we include the Flights protocol using the .include() method, enabling the agent to use the flight_offers() function to manage and respond to flight offer-related messages.
-
Save the script.
The overall script should look as follows:
from uagents import Agent, Context, Protocol
from uagents.setup import fund_agent_if_low
import requests
from messages import Flights, UAgentResponse, UAgentResponseType, KeyValue
import os
import uuid

FLIGHTS_SEED = os.getenv("FLIGHTS_SEED", "flights really secret phrase :)")

agent = Agent(
    name="flights_adaptor",
    seed=FLIGHTS_SEED
)

fund_agent_if_low(agent.wallet.address())

RAPIDAPI_API_KEY = os.environ.get("RAPIDAPI_API_KEY", "")

assert RAPIDAPI_API_KEY, "RAPIDAPI_API_KEY environment variable is missing from .env"

SKY_SCANNER_URL = "https://skyscanner-api.p.rapidapi.com/v3e/flights/live/search/synced"
headers = {
    "content-type": "application/json",
    "X-RapidAPI-Key": RAPIDAPI_API_KEY,
    "X-RapidAPI-Host": "skyscanner-api.p.rapidapi.com"
}

def skyscanner_format_data(data):
    r = data["content"]["results"]
    carriers = r["carriers"]
    itineraries = r["itineraries"]
    segments = r["segments"]
    sorted_itineraries = data["content"]["sortingOptions"]["cheapest"]
    results = []
    for o in sorted_itineraries:
        _id = o["itineraryId"]
        it = itineraries[_id]
        for option in it["pricingOptions"]:
            price = option["price"]["amount"]
            if len(option["items"]) != 1:
                continue
            fares = option["items"][0]["fares"]
            if len(fares) != 1:
                continue
            segment_id = fares[0]["segmentId"]
            seg = segments[segment_id]
            carrier = carriers[seg["marketingCarrierId"]]
            duration = seg["durationInMinutes"]
            departure = seg["departureDateTime"]
            arrival = seg["arrivalDateTime"]
            results.append({
                "price": price,
                "duration": duration,
                "departure": departure,
                "arrival": arrival,
                "carrier": carrier
            })
    return results

def skyscanner_top5(fares):
    return [fares[i] for i in range(len(fares)) if i < 5]

flights_protocol = Protocol("Flights")

@flights_protocol.on_message(model=Flights, replies={UAgentResponse})
async def flight_offers(ctx: Context, sender: str, msg: Flights):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")

    dept_year, dept_month, dept_day = msg.date.split("-")
    payload = {
        "query": {
            "market": "UK",
            "locale": "en-GB",
            "currency": "GBP",
            "queryLegs": [
                {
                    "originPlaceId": {"iata": msg.from_},
                    "destinationPlaceId": {"iata": msg.to},
                    "date": {
                        "year": int(dept_year),
                        "month": int(dept_month),
                        "day": int(dept_day)
                    }
                }
            ],
            "cabinClass": "CABIN_CLASS_ECONOMY",
            "adults": msg.persons
        }
    }
    print("CALLING SKYSCANNER, payload ", payload)
    try:
        response = requests.request("POST", SKY_SCANNER_URL, json=payload, headers=headers)
        if response.status_code != 200:
            print("SKYSCANNER STATUS CODE not 200: ", response.json())
            await ctx.send(sender, UAgentResponse(message=response.text, type=UAgentResponseType.ERROR))
            return
        formatted = skyscanner_format_data(response.json())
        top5 = skyscanner_top5(formatted)
        print("SKYSCANNER TOP5 RESPONSE: ", top5)
        request_id = str(uuid.uuid4())
        options = []
        for idx, o in enumerate(top5):
            dep = f"{o['departure']['hour']:02}:{o['departure']['minute']:02}"
            arr = f"{o['arrival']['hour']:02}:{o['arrival']['minute']:02}"
            option = f"""{o["carrier"]["name"]} for £{o['price']}, {dep} - {arr}, flight time {o['duration']} min"""
            options.append(KeyValue(key=idx, value=option))
        await ctx.send(sender, UAgentResponse(options=options, type=UAgentResponseType.SELECT_FROM_OPTIONS, request_id=request_id))
    except Exception as exc:
        ctx.logger.error(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))

agent.include(flights_protocol)
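The last part of flight_offers(), which limits the list to five fares and turns each one into a readable option, can be exercised without calling the Skyscanner API. The sketch below is a simplified illustration: top_n(), clock() and describe() are hypothetical helper names, and the fare data is hand-written in the shape that skyscanner_format_data() returns according to the script above.

```python
from typing import Any, Dict, List


def top_n(fares: List[Dict[str, Any]], n: int = 5) -> List[Dict[str, Any]]:
    """Return at most the first n fares; slicing is safe on short lists."""
    return fares[:n]


def clock(moment: Dict[str, int]) -> str:
    """Format an {'hour': h, 'minute': m} mapping as zero-padded HH:MM."""
    return f"{moment['hour']:02}:{moment['minute']:02}"


def describe(fare: Dict[str, Any]) -> str:
    """Build the human-readable option text used in the reply."""
    return (
        f"{fare['carrier']['name']} for £{fare['price']}, "
        f"{clock(fare['departure'])} - {clock(fare['arrival'])}, "
        f"flight time {fare['duration']} min"
    )


# Hypothetical fares; a real list would come from skyscanner_format_data().
fares = [
    {
        "price": "120",
        "duration": 95,
        "departure": {"hour": 7, "minute": 5},
        "arrival": {"hour": 8, "minute": 40},
        "carrier": {"name": "Example Air"},
    }
] * 7

labels = [describe(fare) for fare in top_n(fares)]
```

Slicing with fares[:5] selects the same elements as the index-based list comprehension in skyscanner_top5(), and it also works when fewer than five fares are available.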
Messages
Now that we have defined our agents, we need to define the structured data formats that these agents will use to communicate with each other or with external systems. There are 4 structured data models which we need to define within the messages directory previously created within the src folder:
Flight
The Flights model is designed to hold flight-related information with various fields, each annotated with descriptions and optional constraints. It's used to standardize and validate data exchanged between agents or systems within the uAgents integration.
-
Within the messages folder, let's create a Python script and name it flight.py:
touch flight.py
-
In here, we need to import the necessary modules and define a model to represent structured data for flight-related information:
flight.py
from uagents import Model
from pydantic import Field
from typing import Optional

class Flights(Model):
    from_: str = Field(alias="from", description="This field is the airport IATA code for of the airport from where the user wants to fly from. This should be airport IATA code. IATA airport code is a three-character alphanumeric geocode.")
    to: str = Field(description="This field is the airport IATA code of the destination airport! This should be airport IATA code. IATA airport code is a three-character alphanumeric geocode.")
    trip: Optional[str] = Field(description="This can be oneway or return")
    date: str = Field(description="Contains the date of flying out.")
    back_date: Optional[str] = Field(description="Optional field only for return flight. This is the date when the user wants to fly back")
    route: Optional[int] = Field(description="Selects the maximum number of stops, 0 means direct flight, 1 means with maximum 1 stop.")
    persons: int = Field(description="Describes how many persons are going to fly.")

    class Config:
        allow_population_by_field_name = True
Here, we define a class named Flights which inherits from the Model class. Inside this class, we define a field named from_ which represents the airport IATA code for the departure airport. It includes a description and uses the Field class to provide additional metadata; the alias "from" is used because from is a reserved word in Python and cannot be a field name. We then define a field named to which represents the airport IATA code for the destination airport. It includes a description. An optional field named trip is defined. This can hold values like "oneway" or "return" to indicate the type of trip. We also define a field date containing the date of the outbound flight and a field back_date for return flights. The route field indicates the maximum number of stops for the flight route. The field named persons represents the number of people flying.
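The date field is a plain string, and the flights agent shown earlier splits it on "-" into year, month, and day, which implies a YYYY-MM-DD value. A sender might want to check its input before building a Flights message. The following is a minimal sketch using only the standard library; to_query_date() and is_iata_code() are hypothetical helpers, not part of the project.

```python
from datetime import date
from typing import Dict


def to_query_date(value: str) -> Dict[str, int]:
    """Parse 'YYYY-MM-DD' into the year/month/day mapping used in the query leg."""
    parsed = date.fromisoformat(value)  # raises ValueError on malformed or impossible dates
    return {"year": parsed.year, "month": parsed.month, "day": parsed.day}


def is_iata_code(code: str) -> bool:
    """Check for a three-character alphanumeric code, as the field descriptions state."""
    return len(code) == 3 and code.isascii() and code.isalnum()
```

For example, to_query_date("2023-11-05") gives the mapping for 5 November 2023, while an impossible date such as "2023-02-30" raises ValueError before any request is made.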
General
-
Within the messages folder, let's create another Python script and name it general.py:
touch general.py
-
In here, we need to import the necessary modules and define the different data models:
general.py
from uagents import Model
from enum import Enum
from typing import Optional, List

class UAgentResponseType(Enum):
    ERROR = "error"
    SELECT_FROM_OPTIONS = "select_from_options"
    FINAL_OPTIONS = "final_options"

class KeyValue(Model):
    key: str
    value: str

class UAgentResponse(Model):
    type: UAgentResponseType
    agent_address: Optional[str]
    message: Optional[str]
    options: Optional[List[KeyValue]]
    request_id: Optional[str]

class BookingRequest(Model):
    request_id: str
    user_response: str
    user_email: str
    user_full_name: str
Here, we defined structured data models to represent different types of data within the uAgents framework, making communication between agents and systems more organized and standardized:
UAgentResponseType: This is an enumeration that defines the different response types an agent can use.
KeyValue: A model representing a key-value pair, often used to display options or details.
UAgentResponse: This model represents a response from an agent. It includes the response type, an optional agent address, an optional message, an optional list of key-value options, and an optional request ID.
BookingRequest: This model represents a booking request. It includes a request ID, user response, user email, and user full name.
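To make the role of the response type concrete, the sketch below shows how a receiving side might branch on it. It is an illustration under stated assumptions: ResponseType, Option and Response are simplified dataclass stand-ins for the models above (so it runs without uAgents), and summarize() is a hypothetical function that does not appear in the project.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class ResponseType(Enum):
    ERROR = "error"
    SELECT_FROM_OPTIONS = "select_from_options"
    FINAL_OPTIONS = "final_options"


@dataclass
class Option:
    """Stand-in for KeyValue."""
    key: str
    value: str


@dataclass
class Response:
    """Stand-in for UAgentResponse; only the type is required."""
    type: ResponseType
    message: Optional[str] = None
    options: Optional[List[Option]] = None
    request_id: Optional[str] = None


def summarize(response: Response) -> str:
    """Branch on the response type the way a receiver might."""
    if response.type is ResponseType.ERROR:
        return f"error: {response.message}"
    labels = ", ".join(option.value for option in response.options or [])
    if response.type is ResponseType.SELECT_FROM_OPTIONS:
        return f"choose one ({response.request_id}): {labels}"
    return f"results: {labels}"
```

In the agents above, the flights agent replies with SELECT_FROM_OPTIONS and a request ID, while the activities and destinations agents reply with FINAL_OPTIONS; all three use ERROR together with a message when something fails.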
Top activities
-
Within the messages folder, let's create an additional Python script and name it top_activities.py:
touch top_activities.py
-
In here, we need to import the necessary modules and define the data model through which agents can exchange data about users' preferences for top activities in a specific city, helping them to make more informed and tailored suggestions to users:
top_activities.py
from uagents import Model
from typing import Optional
from pydantic import Field

class TopActivities(Model):
    city: str = Field(description="Describes the city where we want to go and find activity, for example: London. This shouldn't be based on user current location, but the city where user wants to go.")
    date: str = Field(description="Describes which date user wants to find top activities in given city")
    preferred_activities: Optional[str] = Field(description="Describes what activities the user prefers. It is optional")
This data model represents information for finding top activities in a city. It takes in different parameters:
- city: A required field that describes the city where users want to find activities.
- date: A required field that specifies the date when the user wants to find top activities in the given city.
- preferred_activities: An optional field that describes the activities the user prefers.
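Because preferred_activities is optional, the top activities agent only adds a preference sentence to its prompt when a value is present. The sketch below reproduces that conditional with plain string formatting; TEMPLATE is a shortened, hypothetical version of the agent's prompt and build_question() is an illustrative helper.

```python
from typing import Optional

# Shortened stand-in for the agent's prompt template (assumption).
TEMPLATE = (
    "What are the top 5 tourist activities in {city} on {date}?\n"
    "{preferred_activities_str}"
)


def build_question(city: str, date: str, preferred_activities: Optional[str] = None) -> str:
    """Fill the template; the preference sentence appears only when a preference is given."""
    preferred = (
        f"You should only offer programs and activities related to {preferred_activities}"
        if preferred_activities
        else ""
    )
    return TEMPLATE.format(city=city, date=date, preferred_activities_str=preferred)
```

Calling build_question("London", "2023-11-05") leaves the last line of the prompt empty, whereas passing preferred_activities="museums" appends the restriction sentence.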
Top destinations
By using this model, agents can communicate and understand user preferences for holiday destinations, allowing them to provide more tailored suggestions and recommendations based on the user's interests.
-
Within the messages folder, let's create a final Python script and name it top_destinations.py:
touch top_destinations.py
-
In here, we need to import the necessary modules and define the data model through which agents can exchange data about users' preferences for top destinations:
top_destinations.py
from uagents import Model
from pydantic import Field

class TopDestinations(Model):
    preferences: str = Field(description="The field expresses what the user prefer. Can be left empty. For example: 'beach', 'mountains, rivers', 'city', etc.")
Here, we defined a TopDestinations model which represents user preferences for top holiday destinations. This class has a single preferences field, which expresses what the user prefers for their holiday destination. It can be left empty or contain preferences like beach, mountains, rivers, city, and so on.
LLM - Large Language Model
We need to define a convenient way to interact with OpenAI's large language model (LLM) for text generation, embedding, and completions.
-
To start, we now need to enter the utils folder created at the beginning within the src folder.
-
In here, let's create a Python file and name it llm.py:
touch llm.py
-
First of all, we need to import the needed modules. Then, we proceed and define an OpenAILLM class. This class encapsulates methods for interacting with OpenAI's large language model:
import openai
import time
from typing import List
import os

class OpenAILLM:
    def __init__(self,
                 completion_model: str = "text-davinci-003",
                 embedding_model: str = "text-embedding-ada-002"
                 ):
        self.completion_model = completion_model
        self.embedding_model = embedding_model
        self.usage_counter = None

    async def embedding(self, text: str) -> str:
        text = text.replace("\n", " ")
        emb = await openai.Embedding.acreate(input=[text], model=self.embedding_model)
        if self.usage_counter:
            await self.usage_counter.add_embedding_usage(emb["usage"])
        return emb["data"][0]["embedding"]

    async def complete(
        self,
        header: str,
        prompt: str,
        complete: str,
        temperature: float = 0.5,
        max_tokens: int = 100,
        stop: List[str] = None,
    ) -> str:
        try:
            if self.completion_model != "gpt-4" and self.completion_model.find("gpt-3.5") == -1:
                #Call GPT-3 DaVinci model
                response = await openai.Completion.acreate(
                    engine=self.completion_model,
                    prompt=header+prompt+complete,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    top_p=1,
                    frequency_penalty=0,
                    presence_penalty=0,
                    stop=stop
                )
                if self.usage_counter:
                    await self.usage_counter.add_completion_usage(response.usage)
                return response.choices[0].text.strip()
            else:
                #Call GPT-4/gpt-3.5 chat model
                messages=[{"role": "system", "content": header}, {"role": "user", "content": prompt}, {"role": "assistant", "content": complete}]
                response = await openai.ChatCompletion.acreate(
                    model=self.completion_model,
                    messages = messages,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    n=1,
                    stop=stop,
                )
                if self.usage_counter:
                    await self.usage_counter.add_completion_usage(response.usage)
                return response.choices[0].message.content.strip()
        except openai.InvalidRequestError as err:
            print("openai_call InvalidRequestError: ", err)
            print("\n\n§§§§§§§§§§§§§§§§§§§§§§§§§§§§PROMPT§§§§§§§§§§§§§§§§§§§§§§§§§§§§")
            print(prompt)
            print("§§§§§§§§§§§§§§§§§§§§§§§§§§§§\n\n")
            raise err
        except Exception as err:
            print("openai_call Exception: ", err)
            print("Retry...")
            time.sleep(2)
            return await self.complete(header, prompt, complete, temperature, max_tokens, stop)
The OpenAILLM class provides methods for interacting with OpenAI's LLM. It accepts two arguments during initialization:
- completion_model: The model to use for completions (default: "text-davinci-003").
- embedding_model: The model to use for text embeddings (default: "text-embedding-ada-002").
The class has two methods:
- embedding(): This takes a text input, creates an embedding using the specified embedding model, and returns the embedding.
- complete(): This generates completions based on the input prompt using the specified completion model. It supports both the GPT-3 DaVinci model and the GPT-4/gpt-3.5 chat models.
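The branch that complete() takes depends only on the model name. The following is a minimal sketch of that decision, pulled out into standalone helpers so it can be read and tested without calling OpenAI. The helper names (uses_chat_endpoint, build_chat_messages, build_completion_prompt) are illustrative assumptions and are not part of the project.

```python
from typing import Dict, List


def uses_chat_endpoint(completion_model: str) -> bool:
    """Mirror the branch condition in OpenAILLM.complete().

    The chat branch is taken for the exact name "gpt-4" or for any
    name containing "gpt-3.5"; everything else uses the completion branch.
    """
    return completion_model == "gpt-4" or "gpt-3.5" in completion_model


def build_chat_messages(header: str, prompt: str, complete: str) -> List[Dict[str, str]]:
    """Map the three text arguments onto chat roles, as the chat branch does."""
    return [
        {"role": "system", "content": header},
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": complete},
    ]


def build_completion_prompt(header: str, prompt: str, complete: str) -> str:
    """Concatenate the three text arguments, as the completion branch does."""
    return header + prompt + complete


if __name__ == "__main__":
    for model in ("text-davinci-003", "gpt-3.5-turbo", "gpt-4"):
        branch = "chat" if uses_chat_endpoint(model) else "completion"
        print(f"{model}: {branch} branch")
```

Note that, under this condition, only the exact string "gpt-4" selects the chat branch, so a name such as "gpt-4-32k" would be routed to the completion branch.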
-
We then need to define a get_llm() function which returns an instance of the OpenAILLM class with a specific completion model ("gpt-3.5-turbo"):
def get_llm():
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
    if len(OPENAI_API_KEY) > 0:
        openai.api_key = OPENAI_API_KEY
    return OpenAILLM(completion_model="gpt-3.5-turbo")
The get_llm() function retrieves the OpenAI API key from the environment variable OPENAI_API_KEY. If the key is available, it sets the API key for the OpenAI library. It then returns an instance of OpenAILLM with the specified completion model ("gpt-3.5-turbo").
-
Save the script.
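As written, get_llm() still returns an OpenAILLM instance when OPENAI_API_KEY is missing, so a missing key only surfaces later, when the first request is made. The following is a hedged sketch of a stricter check that could run at startup instead. The require_api_key helper is a hypothetical name introduced here for illustration; it is not part of the project or of the openai library.

```python
import os
from typing import Mapping, Optional


def require_api_key(
    env: Optional[Mapping[str, str]] = None,
    name: str = "OPENAI_API_KEY",
) -> str:
    """Return the API key from the environment, or raise if it is missing.

    `env` defaults to os.environ; passing a plain dict makes the helper
    easy to exercise without touching the real environment.
    """
    env = os.environ if env is None else env
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set; export it before starting the agents")
    return key


if __name__ == "__main__":
    try:
        key = require_api_key()
        print("API key found, length:", len(key))
    except RuntimeError as err:
        print("Configuration error:", err)
```

Inside get_llm(), the returned value could then be assigned to openai.api_key before the OpenAILLM instance is created.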
The overall script should look as follows:
import openai
import time
from typing import List
import os

class OpenAILLM:
    def __init__(self,
        completion_model: str = "text-davinci-003",
        embedding_model: str = "text-embedding-ada-002"
    ):
        self.completion_model = completion_model
        self.embedding_model = embedding_model
        self.usage_counter = None

    async def embedding(self, text: str) -> str:
        text = text.replace("\n", " ")
        emb = await openai.Embedding.acreate(input=[text], model=self.embedding_model)
        if self.usage_counter:
            await self.usage_counter.add_embedding_usage(emb["usage"])
        return emb["data"][0]["embedding"]

    async def complete(
        self,
        header: str,
        prompt: str,
        complete: str,
        temperature: float = 0.5,
        max_tokens: int = 100,
        stop: List[str] = None,
    ) -> str:
        try:
            if self.completion_model != "gpt-4" and self.completion_model.find("gpt-3.5") == -1:
                # Call GPT-3 DaVinci model
                response = await openai.Completion.acreate(
                    engine=self.completion_model,
                    prompt=header+prompt+complete,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    top_p=1,
                    frequency_penalty=0,
                    presence_penalty=0,
                    stop=stop
                )
                if self.usage_counter:
                    await self.usage_counter.add_completion_usage(response.usage)
                return response.choices[0].text.strip()
            else:
                # Call GPT-4/gpt-3.5 chat model
                messages=[{"role": "system", "content": header}, {"role": "user", "content": prompt}, {"role": "assistant", "content": complete}]
                response = await openai.ChatCompletion.acreate(
                    model=self.completion_model,
                    messages = messages,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    n=1,
                    stop=stop,
                )
                if self.usage_counter:
                    await self.usage_counter.add_completion_usage(response.usage)
                return response.choices[0].message.content.strip()
        except openai.InvalidRequestError as err:
            print("openai_call InvalidRequestError: ", err)
            print("\n\n§§§§§§§§§§§§§§§§§§§§§§§§§§§§PROMPT§§§§§§§§§§§§§§§§§§§§§§§§§§§§")
            print(prompt)
            print("§§§§§§§§§§§§§§§§§§§§§§§§§§§§\n\n")
            raise err
        except Exception as err:
            print("openai_call Exception: ", err)
            print("Retry...")
            time.sleep(2)
            return await self.complete(header, prompt, complete, temperature, max_tokens, stop)

def get_llm():
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
    if len(OPENAI_API_KEY) > 0:
        openai.api_key = OPENAI_API_KEY
    return OpenAILLM(completion_model="gpt-3.5-turbo")
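The generic except branch in complete() retries without a limit and pauses with time.sleep(), which blocks the event loop while other agents are waiting. One possible alternative, sketched here under the assumption that a small fixed number of attempts is acceptable, is a bounded retry that waits with asyncio.sleep(). The retry_async helper and its parameters are hypothetical and are not part of the project.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 2.0,
) -> T:
    """Await `call()` up to `max_attempts` times, doubling the delay each time.

    The last failure is re-raised, so the caller always learns
    when every attempt has failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except Exception as err:
            if attempt == max_attempts:
                raise
            print(f"Attempt {attempt} failed: {err}. Retrying...")
            # asyncio.sleep yields control, so other coroutines keep running
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


if __name__ == "__main__":
    attempts = {"count": 0}

    async def flaky_call() -> str:
        # Stand-in for a network request that fails on the first try
        attempts["count"] += 1
        if attempts["count"] < 2:
            raise ConnectionError("temporary failure")
        return "done"

    print(asyncio.run(retry_async(flaky_call, base_delay=0.1)))
```

This sketch retries on every exception type. Errors that should not be retried, such as the InvalidRequestError handled separately in complete(), would still need to be caught and re-raised before the retry logic is reached.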
Main script
We are now ready to write the code for our main script. In the src folder of the fetch-holiday integration, open the main.py script created earlier.
-
Let's import the needed class and create a Bureau object as an instance of the Bureau class. We need to add the three different agents to it, and then run the Bureau to manage and coordinate their execution:
main.py
from uagents import Bureau

from agents.activities.top_activities import agent as top_activities_agent
from agents.destinations.top_destinations import agent as top_destinations_agent
from agents.flights.flights import agent as flights_agent

if __name__ == "__main__":
    bureau = Bureau(endpoint="http://127.0.0.1:8000/submit", port=8000)
    print(f"Adding top activities agent to Bureau: {top_activities_agent.address}")
    bureau.add(top_activities_agent)
    print(f"Adding top destinations agent to Bureau: {top_destinations_agent.address}")
    bureau.add(top_destinations_agent)
    print(f"Adding flights agent to Bureau: {flights_agent.address}")
    bureau.add(flights_agent)
    bureau.run()
-
Save the script.
Run the main script
Once you have finalized the main.py script, run the project and its agents:
poetry run python main.py
Look for the following line in the logs, where {top_destinations_agent.address} stands for the actual address printed for your agent:
Adding top destinations agent to Bureau: {top_destinations_agent.address}
Copy the {top_destinations_agent.address} value and paste it somewhere safe. You will need it in the following step.
Set up a client script
Now that we have set up the integrations, let's run a client script that will showcase the top destinations agent.
-
To do this, create a new Python file named top_dest_client.py in the src folder:
touch top_dest_client.py
-
You will need to paste the following code in it:
top_dest_client.py
from messages import TopDestinations, UAgentResponse
from uagents import Agent, Context
from uagents.setup import fund_agent_if_low
import os

TOP_DESTINATIONS_CLIENT_SEED = os.getenv("TOP_DESTINATIONS_CLIENT_SEED", "top_destinations_client really secret phrase :)")

top_dest_client = Agent(
    name="top_destinations_client",
    port=8008,
    seed=TOP_DESTINATIONS_CLIENT_SEED,
    endpoint=["http://127.0.0.1:8008/submit"],
)

fund_agent_if_low(top_dest_client.wallet.address())

top_dest_request = TopDestinations(preferences="new york")

@top_dest_client.on_interval(period=10.0)
async def send_message(ctx: Context):
    await ctx.send("{top_destinations_agent.address}", top_dest_request)

@top_dest_client.on_message(model=UAgentResponse)
async def message_handler(ctx: Context, _: str, msg: UAgentResponse):
    ctx.logger.info(f"Received top destination options from: {msg.options}")

if __name__ == "__main__":
    top_dest_client.run()
Remember to replace the address in the ctx.send() call with the value you copied in the previous step. This code requests the top destinations (in this example, for the preference "new york"). To do this, it sends a request to the top destinations agent every 10 seconds and displays the returned options in the console.
-
Save the script.
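It is easy to forget to replace the placeholder address in ctx.send(). One way to guard against that, sketched here as an assumption rather than as part of the project, is to read the address from an environment variable and refuse to start while it is empty or still holds the placeholder. Both the resolve_destination_address helper and the TOP_DESTINATIONS_AGENT_ADDRESS variable name are hypothetical.

```python
import os
from typing import Mapping, Optional

# The literal placeholder used in the client script above
PLACEHOLDER = "{top_destinations_agent.address}"


def resolve_destination_address(
    env: Optional[Mapping[str, str]] = None,
    name: str = "TOP_DESTINATIONS_AGENT_ADDRESS",  # hypothetical variable name
) -> str:
    """Return the destination agent address, or raise if it was never set."""
    env = os.environ if env is None else env
    address = env.get(name, "").strip()
    if not address or address == PLACEHOLDER:
        raise ValueError(
            f"Set {name} to the address printed by main.py before running the client"
        )
    return address


if __name__ == "__main__":
    try:
        print("Sending requests to:", resolve_destination_address())
    except ValueError as err:
        print("Configuration error:", err)
```

In the client script, the resolved value would then take the place of the string literal passed as the first argument of ctx.send().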
Run the client script
Open a new terminal (leave the previous one running), and navigate to the src folder to run the client script:
cd src
poetry run python top_dest_client.py
Once you hit enter, a request will be sent to the top destinations agent every 10 seconds, and you will be able to see your results in the console!