Integrations
Fetch Holidays Integration ✈️
Walk-through 📝

Before running this example, make sure you satisfy the requirements in the Getting Started ↗️ section.

Organize the project

Inside the src folder of the fetch-holiday project directory, you will need to create three directories and a Python script:

cd src
mkdir agents
mkdir messages
mkdir utils
touch main.py

These directories will contain the different files you will need to correctly run the project.
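Equivalently, the same layout can be created in one step with `mkdir -p` (a shell sketch of the commands above):

```shell
# From inside src/: create the three directories and the entry script in one go
mkdir -p agents messages utils
touch main.py
```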

Define the scripts

Agents

Let's start by defining the agents we need, along with their respective behaviors and functions. Within the agents folder created above, create three sub-folders, one for each agent we plan to define:

mkdir activities
mkdir destinations
mkdir flights

Top activities agent

Let's start by defining the script for a top activities agent which provides tourist activities based on specified city and date criteria.

  1. Enter the activities folder and create a script for this agent: touch top_activities.py

  2. Within this script, import the necessary modules, create an agent named top_activities using a secret seed, and make sure it has enough funds to register in the Almanac contract ↗:

    from langchain.chat_models import ChatOpenAI
    from langchain.agents import load_tools
    from langchain.agents import initialize_agent
    from langchain.output_parsers import CommaSeparatedListOutputParser
    from langchain.prompts import PromptTemplate
    from messages import UAgentResponse, UAgentResponseType, TopActivities, KeyValue
    from uagents import Agent, Context, Protocol
    from uagents.setup import fund_agent_if_low
    import os
     
    TOP_ACTIVITIES_SEED = os.getenv("TOP_ACTIVITIES_SEED", "top_activities really secret phrase :)")
     
    agent = Agent(
        name="top_activities",
        seed=TOP_ACTIVITIES_SEED
    )
     
    fund_agent_if_low(agent.wallet.address())
  3. Let's then define an output parser using CommaSeparatedListOutputParser to help parse the results from the agent's response, and a prompt template built with a PromptTemplate instance, which provides a structured message explaining the agent's role and expectations. The template includes placeholders for the variables city, date, and preferred_activities_str:

    output_parser = CommaSeparatedListOutputParser()
    format_instructions = output_parser.get_format_instructions()
    prompt = PromptTemplate(
        template="""
            You are an expert AI in suggesting travel, holiday activities based on the date and city specified in user input.\n
            The question that SerpAPI has to answer: What are the top 5 tourist activities in {city} on {date}?\n
            {preferred_activities_str}\n
            You should find tourist attractions and programs which are available exactly on the specified date.\n
            {format_instructions}""",
        input_variables=["city", "date", "preferred_activities_str"],
        partial_variables={"format_instructions": format_instructions}
    )
  4. Let's create an instance of the ChatOpenAI class (referred to as llm) to interact with a large language model that can understand and generate human-like text. Next, we load tools for the agent and initialize it, connecting the loaded tools and the large language model to it. Finally, we define the communication protocol, called TopActivities:

    llm = ChatOpenAI(temperature=0.1)
    tools = load_tools(["serpapi"], llm=llm)
    langchain_agent = initialize_agent(tools, llm, agent="chat-zero-shot-react-description", verbose=True)
     
    top_activities_protocol = Protocol("TopActivities")
  5. We then need to define how the top_activities agent responds when it receives a TopActivities message. The handler takes the user's message, fills the prompt template, sends it to the language model, parses the model's answer into a list of options, and sends these options back to the user. If something goes wrong, it lets the user know there was an issue:

    @top_activities_protocol.on_message(model=TopActivities, replies=UAgentResponse)
    async def get_top_activity(ctx: Context, sender: str, msg: TopActivities):
        ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
     
        preferred_activities_str = f"You should only offer programs and activities related to {msg.preferred_activities}" if msg.preferred_activities else ""
        _input = prompt.format(city=msg.city, date=msg.date, preferred_activities_str = preferred_activities_str)
        try:
            output = await langchain_agent.arun(_input)
            result = output_parser.parse(output)
            options = list(map(lambda x: KeyValue(key=x, value=x), result))
            ctx.logger.info(f"Agent executed and got following result: {result}. Mapped to options: {options}")
            await ctx.send(
                sender,
                UAgentResponse(
                    options=options,
                    type=UAgentResponseType.FINAL_OPTIONS,
                )
            )
        except Exception as exc:
            ctx.logger.warning(exc)
            await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))
     
    agent.include(top_activities_protocol)

    We defined a function named get_top_activity() that handles TopActivities messages, as defined by the .on_message() decorator. The function first logs information about the received message, sender, and session using the ctx.logger.info() method. It then checks whether msg.preferred_activities is provided; if so, a sentence about preferred activities is added to the prompt. Next, the filled prompt is passed to the language model to generate a response, and the output is parsed with the output parser defined earlier. The parsed result is used to build a list of options, each represented as a KeyValue object. The result and the corresponding options are logged, and a response is sent back to the sender using the ctx.send() method. The response includes the options generated from the parsed output and specifies the response type as UAgentResponseType.FINAL_OPTIONS.

    Any exceptions that may occur during this process are logged as warnings, and an error response is sent back to the sender. We finally use the .include() method to include the TopActivities protocol in the agent, ensuring it's ready to handle messages of this type.

  6. Save the script.

The overall script should look as follows:

top_activities.py
from langchain.chat_models import ChatOpenAI
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
from messages import UAgentResponse, UAgentResponseType, TopActivities, KeyValue
from uagents import Agent, Context, Protocol
from uagents.setup import fund_agent_if_low
import os
 
TOP_ACTIVITIES_SEED = os.getenv("TOP_ACTIVITIES_SEED", "top_activities really secret phrase :)")
 
agent = Agent(
    name="top_activities",
    seed=TOP_ACTIVITIES_SEED
)
 
fund_agent_if_low(agent.wallet.address())
 
output_parser = CommaSeparatedListOutputParser()
format_instructions = output_parser.get_format_instructions()
prompt = PromptTemplate(
    template="""
        You are an expert AI in suggesting travel, holiday activities based on the date and city specified in user input.\n
        The question that SerpAPI has to answer: What are the top 5 tourist activities in {city} on {date}?\n
        {preferred_activities_str}\n
        You should find tourist attractions and programs which are available exactly on the specified date.\n
        {format_instructions}""",
    input_variables=["city", "date", "preferred_activities_str"],
    partial_variables={"format_instructions": format_instructions}
)
 
llm = ChatOpenAI(temperature=0.1)
tools = load_tools(["serpapi"], llm=llm)
langchain_agent = initialize_agent(tools, llm, agent="chat-zero-shot-react-description", verbose=True)
 
top_activities_protocol = Protocol("TopActivities")
 
@top_activities_protocol.on_message(model=TopActivities, replies=UAgentResponse)
async def get_top_activity(ctx: Context, sender: str, msg: TopActivities):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
 
    preferred_activities_str = f"You should only offer programs and activities related to {msg.preferred_activities}" if msg.preferred_activities else ""
    _input = prompt.format(city=msg.city, date=msg.date, preferred_activities_str = preferred_activities_str)
    try:
        output = await langchain_agent.arun(_input)
        result = output_parser.parse(output)
        options = list(map(lambda x: KeyValue(key=x, value=x), result))
        ctx.logger.info(f"Agent executed and got following result: {result}. Mapped to options: {options}")
        await ctx.send(
            sender,
            UAgentResponse(
                options=options,
                type=UAgentResponseType.FINAL_OPTIONS,
            )
        )
    except Exception as exc:
        ctx.logger.warning(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))
 
agent.include(top_activities_protocol)
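The parsing and option-mapping steps in the handler above can be sketched in isolation. Here, parse_comma_separated mimics what CommaSeparatedListOutputParser.parse does with its default format, and the KeyValue dataclass is a stand-in for messages.KeyValue (both are assumptions for illustration only):

```python
from dataclasses import dataclass


@dataclass
class KeyValue:
    # Stand-in for messages.KeyValue, used only for this sketch
    key: str
    value: str


def parse_comma_separated(text: str) -> list[str]:
    # Mimics the comma-separated output parser: split on commas, trim whitespace
    return [item.strip() for item in text.split(",")]


raw_output = "Museum tour, River cruise, Street-food market"
result = parse_comma_separated(raw_output)
options = [KeyValue(key=x, value=x) for x in result]
# options[0] → KeyValue(key='Museum tour', value='Museum tour')
```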

Top destinations agent

Let's now define the script for a top destinations agent which provides tourist destinations based on specified city and date criteria.

  1. Within the destinations directory previously created, create a Python script for this agent and name it: touch top_destinations.py.

  2. Let's import the needed modules and initialize an agent named top_destinations using a secret seed, ensuring the agent has sufficient funds in its wallet. We then set up the large language model and the communication protocol, which defines the rules for handling messages about top destinations:

    from uagents import Agent, Context, Protocol
    from messages import TopDestinations, UAgentResponse, UAgentResponseType, KeyValue
    from uagents.setup import fund_agent_if_low
    from utils.llm import get_llm
    import os
     
    TOP_DESTINATIONS_SEED = os.getenv("TOP_DESTINATIONS_SEED", "top_destinations really secret phrase :)")
     
    agent = Agent(
        name="top_destinations",
        seed=TOP_DESTINATIONS_SEED
    )
     
    fund_agent_if_low(agent.wallet.address())
     
    llm = get_llm()
    top_destinations_protocol = Protocol("TopDestinations")
  3. Let's then define a get_top_destinations() function which is triggered whenever a TopDestinations message is received. This function will reply with a message of type UAgentResponse:

    @top_destinations_protocol.on_message(model=TopDestinations, replies=UAgentResponse)
    async def get_top_destinations(ctx: Context, sender: str, msg: TopDestinations):
        ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
        prompt = f"""You are an expert AI in suggesting travel, holiday destination based on some user input.
    User input might not be provided, in which case suggest popular destinations.
    If user input is present, then suggest destinations based on user input.
    The response should be a list of destinations, each destination should have information about why it is a good destination.
    After listing all the suggestions say END. Every destination should be separated by a new line.
     
    Example:
    User input: I want to go to a place with good weather and beaches.
    Response:
    1. Goa, India. Goa is a popular destination for tourists. It has good weather and beaches.
    2. Malé, Maldives. Maldives is a popular destination for tourists. It has good weather and beaches.
    END
     
    User preferences: {msg.preferences}
    """
        try:
            response = await llm.complete("", prompt, "Response:", max_tokens=500, stop=["END"])
            result = response.strip()
            result = result.split("\n")
            await ctx.send(
                sender,
                UAgentResponse(
                    options=list(map(lambda x: KeyValue(key=x, value=x), result)),
                    type=UAgentResponseType.FINAL_OPTIONS
                )
            )
        except Exception as exc:
            ctx.logger.warning(exc)
            await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))
     
    agent.include(top_destinations_protocol)

    The get_top_destinations() function logs the sender and session of the received message using the ctx.logger.info() method. It then builds a prompt explaining what the AI does and providing an example, uses the large language model to generate a response based on that prompt, and processes the response into a list of suggested destinations. This list is then sent back to the sender. Any exceptions that occur during this process are logged as warnings, and an error response is sent back to the sender. Finally, we use the .include() method to add the TopDestinations protocol to the agent, enabling it to handle messages of this type.

  4. Save the script.

The overall script should look as follows:

top_destinations.py
from uagents import Agent, Context, Protocol
from messages import TopDestinations, UAgentResponse, UAgentResponseType, KeyValue
from uagents.setup import fund_agent_if_low
from utils.llm import get_llm
import os
 
TOP_DESTINATIONS_SEED = os.getenv("TOP_DESTINATIONS_SEED", "top_destinations really secret phrase :)")
 
agent = Agent(
    name="top_destinations",
    seed=TOP_DESTINATIONS_SEED
)
 
fund_agent_if_low(agent.wallet.address())
 
llm = get_llm()
top_destinations_protocol = Protocol("TopDestinations")
 
@top_destinations_protocol.on_message(model=TopDestinations, replies=UAgentResponse)
async def get_top_destinations(ctx: Context, sender: str, msg: TopDestinations):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
    prompt = f"""You are an expert AI in suggesting travel, holiday destination based on some user input.
User input might not be provided, in which case suggest popular destinations.
If user input is present, then suggest destinations based on user input.
The response should be a list of destinations, each destination should have information about why it is a good destination.
After listing all the suggestions say END. Every destination should be separated by a new line.
 
Example:
User input: I want to go to a place with good weather and beaches.
Response:
1. Goa, India. Goa is a popular destination for tourists. It has good weather and beaches.
2. Malé, Maldives. Maldives is a popular destination for tourists. It has good weather and beaches.
END
 
User preferences: {msg.preferences}
"""
    try:
        response = await llm.complete("", prompt, "Response:", max_tokens=500, stop=["END"])
        result = response.strip()
        result = result.split("\n")
        await ctx.send(
            sender,
            UAgentResponse(
                options=list(map(lambda x: KeyValue(key=x, value=x), result)),
                type=UAgentResponseType.FINAL_OPTIONS
            )
        )
    except Exception as exc:
        ctx.logger.warning(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))
 
agent.include(top_destinations_protocol)
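The post-processing in the handler above (the LLM stops at the END marker, so the reply is just trimmed and split into lines) can be sketched standalone; the sample reply below is fabricated for illustration:

```python
def parse_destinations(raw: str) -> list[str]:
    # Trim the reply and split it into one suggestion per line,
    # dropping any blank lines left after the END stop token
    return [line for line in raw.strip().split("\n") if line.strip()]


raw_reply = (
    "1. Goa, India. Goa is a popular destination for tourists.\n"
    "2. Malé, Maldives. It has good weather and beaches.\n"
)
destinations = parse_destinations(raw_reply)
# → two entries, one per numbered line
```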

Flights agent

Let's now define the script for a flights agent which provides flights information based on specified city and date criteria.

  1. Within the flights directory previously created, create a Python script for this agent and name it: touch flights.py.

  2. Let's import the needed modules and initialize an agent named flights_adaptor using a secret seed. Let's ensure the agent has sufficient funds in its wallet:

    from uagents import Agent, Context, Protocol
    from uagents.setup import fund_agent_if_low
    import requests
    from messages import Flights, UAgentResponse, UAgentResponseType, KeyValue
    import os
    import uuid
     
    FLIGHTS_SEED = os.getenv("FLIGHTS_SEED", "flights really secret phrase :)")
     
    agent = Agent(
        name="flights_adaptor",
        seed=FLIGHTS_SEED
    )
     
    fund_agent_if_low(agent.wallet.address())
  3. We then need to retrieve the RapidAPI key from the environment variables and assert its presence. We proceed by setting the URL for the Skyscanner API and defining the headers for requests sent to it, including the RapidAPI key:

    RAPIDAPI_API_KEY = os.environ.get("RAPIDAPI_API_KEY", "")
     
    assert RAPIDAPI_API_KEY, "RAPIDAPI_API_KEY environment variable is missing from .env"
     
    SKY_SCANNER_URL = "https://skyscanner-api.p.rapidapi.com/v3e/flights/live/search/synced"
    headers = {
        "content-type": "application/json",
        "X-RapidAPI-Key": RAPIDAPI_API_KEY,
        "X-RapidAPI-Host": "skyscanner-api.p.rapidapi.com"
    }
  4. Let's then define a skyscanner_format_data() function to process data from the Skyscanner API response to extract essential flight details:

    def skyscanner_format_data(data):
      r = data["content"]["results"]
      carriers = r["carriers"]
      itineraries = r["itineraries"]
      segments = r["segments"]
      sorted_itineraries = data["content"]["sortingOptions"]["cheapest"]
      results = []
      for o in sorted_itineraries:
        _id = o["itineraryId"]
        it = itineraries[_id]
        for option in it["pricingOptions"]:
          price = option["price"]["amount"]
          if len(option["items"]) != 1:
            continue
          fares = option["items"][0]["fares"]
          if len(fares) != 1:
            continue
          segment_id = fares[0]["segmentId"]
          seg = segments[segment_id]
          carrier = carriers[seg["marketingCarrierId"]]
          duration = seg["durationInMinutes"]
          departure = seg["departureDateTime"]
          arrival = seg["arrivalDateTime"]
          results.append({
            "price": price,
            "duration": duration,
            "departure": departure,
            "arrival": arrival,
            "carrier": carrier
          })
      return results

    This function iterates through the sorted itineraries, extracts pricing options, and captures information such as price, carrier, duration, departure, and arrival times. This information is organized into a list of formatted flight options. The function then returns this list for further use, providing a clear overview of the available flight choices.

  5. We proceed by defining a function named skyscanner_top5() which takes a list of flight offers (fares) and returns the first five options. We then initialize a protocol named Flights which will guide the agent's interactions related to flight offers:

    def skyscanner_top5(fares):
      return fares[:5]
     
    flights_protocol = Protocol("Flights")
  6. Let's now create a function named flight_offers() to handle messages related to flight offers within the Flights protocol:

    @flights_protocol.on_message(model=Flights, replies={UAgentResponse})
    async def flight_offers(ctx: Context, sender: str, msg: Flights):
        ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
     
        dept_year, dept_month, dept_day = msg.date.split("-")
        payload = {
            "query": {
                "market": "UK",
                "locale": "en-GB",
                "currency": "GBP",
                "queryLegs": [
                    {
                        "originPlaceId": {"iata": msg.from_},
                        "destinationPlaceId": {"iata": msg.to},
                        "date": {
                            "year": int(dept_year),
                            "month": int(dept_month),
                            "day": int(dept_day)
                        }
                    }
                ],
                "cabinClass": "CABIN_CLASS_ECONOMY",
                "adults": msg.persons
            }
        }
        print("CALLING SKYSCANNER, payload ", payload)
        try:
            response = requests.request("POST", SKY_SCANNER_URL, json=payload, headers=headers)
            if response.status_code != 200:
                print("SKYSCANNER STATUS CODE not 200: ", response.json())
                await ctx.send(sender, UAgentResponse(message=response.text, type=UAgentResponseType.ERROR))
                return
            formatted = skyscanner_format_data(response.json())
            top5 = skyscanner_top5(formatted)
            print("SKYSCANNER TOP5 RESPONSE: ", top5)
            request_id = str(uuid.uuid4())
            options = []
            for idx, o in enumerate(top5):
                dep = f"{o['departure']['hour']:02}:{o['departure']['minute']:02}"
                arr = f"{o['arrival']['hour']:02}:{o['arrival']['minute']:02}"
            option = f"""{o["carrier"]["name"]} for £{o['price']}, {dep} - {arr}, flight time {o['duration']} min"""
                options.append(KeyValue(key=idx, value=option))
            await ctx.send(sender, UAgentResponse(options=options, type=UAgentResponseType.SELECT_FROM_OPTIONS, request_id=request_id))
        except Exception as exc:
            ctx.logger.error(exc)
            await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))
     
    agent.include(flights_protocol)

    Whenever a message of Flights model type is received, this function extracts the necessary information (origin, destination, date, and number of persons) from the user's input and constructs the payload for a Skyscanner API request, which is then sent. It formats the API response to extract essential flight details, selects the top 5 flight options from the data, and creates a unique request ID for tracking. It then constructs a response message containing the top flight options and sends it to the user. Any exception that occurs during this process is handled by the function, which communicates errors back to the user. Finally, we include the Flights protocol using the .include() method, enabling the agent to use the flight_offers() function to manage and respond to flight offer-related messages.

  7. Save the script.

The overall script should look as follows:

flights.py
from uagents import Agent, Context, Protocol
from uagents.setup import fund_agent_if_low
import requests
from messages import Flights, UAgentResponse, UAgentResponseType, KeyValue
import os
import uuid
 
FLIGHTS_SEED = os.getenv("FLIGHTS_SEED", "flights really secret phrase :)")
 
agent = Agent(
    name="flights_adaptor",
    seed=FLIGHTS_SEED
)
 
fund_agent_if_low(agent.wallet.address())
 
RAPIDAPI_API_KEY = os.environ.get("RAPIDAPI_API_KEY", "")
 
assert RAPIDAPI_API_KEY, "RAPIDAPI_API_KEY environment variable is missing from .env"
 
SKY_SCANNER_URL = "https://skyscanner-api.p.rapidapi.com/v3e/flights/live/search/synced"
headers = {
    "content-type": "application/json",
    "X-RapidAPI-Key": RAPIDAPI_API_KEY,
    "X-RapidAPI-Host": "skyscanner-api.p.rapidapi.com"
}
 
def skyscanner_format_data(data):
  r = data["content"]["results"]
  carriers = r["carriers"]
  itineraries = r["itineraries"]
  segments = r["segments"]
  sorted_itineraries = data["content"]["sortingOptions"]["cheapest"]
  results = []
  for o in sorted_itineraries:
    _id = o["itineraryId"]
    it = itineraries[_id]
    for option in it["pricingOptions"]:
      price = option["price"]["amount"]
      if len(option["items"]) != 1:
        continue
      fares = option["items"][0]["fares"]
      if len(fares) != 1:
        continue
      segment_id = fares[0]["segmentId"]
      seg = segments[segment_id]
      carrier = carriers[seg["marketingCarrierId"]]
      duration = seg["durationInMinutes"]
      departure = seg["departureDateTime"]
      arrival = seg["arrivalDateTime"]
      results.append({
        "price": price,
        "duration": duration,
        "departure": departure,
        "arrival": arrival,
        "carrier": carrier
      })
  return results
 
def skyscanner_top5(fares):
  return fares[:5]
 
flights_protocol = Protocol("Flights")
 
@flights_protocol.on_message(model=Flights, replies={UAgentResponse})
async def flight_offers(ctx: Context, sender: str, msg: Flights):
    ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
 
    dept_year, dept_month, dept_day = msg.date.split("-")
    payload = {
        "query": {
            "market": "UK",
            "locale": "en-GB",
            "currency": "GBP",
            "queryLegs": [
                {
                    "originPlaceId": {"iata": msg.from_},
                    "destinationPlaceId": {"iata": msg.to},
                    "date": {
                        "year": int(dept_year),
                        "month": int(dept_month),
                        "day": int(dept_day)
                    }
                }
            ],
            "cabinClass": "CABIN_CLASS_ECONOMY",
            "adults": msg.persons
        }
    }
    print("CALLING SKYSCANNER, payload ", payload)
    try:
        response = requests.request("POST", SKY_SCANNER_URL, json=payload, headers=headers)
        if response.status_code != 200:
            print("SKYSCANNER STATUS CODE not 200: ", response.json())
            await ctx.send(sender, UAgentResponse(message=response.text, type=UAgentResponseType.ERROR))
            return
        formatted = skyscanner_format_data(response.json())
        top5 = skyscanner_top5(formatted)
        print("SKYSCANNER TOP5 RESPONSE: ", top5)
        request_id = str(uuid.uuid4())
        options = []
        for idx, o in enumerate(top5):
            dep = f"{o['departure']['hour']:02}:{o['departure']['minute']:02}"
            arr = f"{o['arrival']['hour']:02}:{o['arrival']['minute']:02}"
        option = f"""{o["carrier"]["name"]} for £{o['price']}, {dep} - {arr}, flight time {o['duration']} min"""
            options.append(KeyValue(key=idx, value=option))
        await ctx.send(sender, UAgentResponse(options=options, type=UAgentResponseType.SELECT_FROM_OPTIONS, request_id=request_id))
    except Exception as exc:
        ctx.logger.error(exc)
        await ctx.send(sender, UAgentResponse(message=str(exc), type=UAgentResponseType.ERROR))
 
agent.include(flights_protocol)
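The payload construction at the top of the handler can be sketched as a small helper; the field names follow the Skyscanner request body shown above, while the helper function itself is hypothetical:

```python
def build_flight_query(from_iata: str, to_iata: str, date: str, persons: int) -> dict:
    # date arrives as "YYYY-MM-DD"; Skyscanner expects separate integer fields
    year, month, day = (int(part) for part in date.split("-"))
    return {
        "query": {
            "market": "UK",
            "locale": "en-GB",
            "currency": "GBP",
            "queryLegs": [
                {
                    "originPlaceId": {"iata": from_iata},
                    "destinationPlaceId": {"iata": to_iata},
                    "date": {"year": year, "month": month, "day": day},
                }
            ],
            "cabinClass": "CABIN_CLASS_ECONOMY",
            "adults": persons,
        }
    }


payload = build_flight_query("LHR", "JFK", "2024-07-14", 2)
```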

Messages

Now that we have defined our agents, we need to define the structured data formats these agents will use to communicate with each other or with external systems. There are four structured data models to define within the messages directory previously created inside the src folder:

Flight

The Flight model is designed to hold flight-related information with various fields, each annotated with descriptions and optional constraints. It's used to standardize and validate data exchanged between agents or systems within the uAgents integration.

  1. Within the messages folder, let's create a Python script and name it: touch flight.py.

  2. In here, we need to import the necessary modules and define a model to represent structured data for flight-related information:

    flight.py
    from uagents import Model
    from pydantic import Field
    from typing import Optional
     
    class Flights(Model):
      from_: str = Field(alias="from", description="The IATA code of the airport the user wants to fly from. An IATA airport code is a three-character alphanumeric geocode.")
      to: str = Field(description="The IATA code of the destination airport. An IATA airport code is a three-character alphanumeric geocode.")
      trip: Optional[str] = Field(description="This can be oneway or return")
      date: str = Field(description="Contains the date of flying out.")
      back_date: Optional[str] = Field(description="Optional field only for return flight. This is the date when the user wants to fly back")
      route: Optional[int] = Field(description="Selects the maximum number of stops, 0 means direct flight, 1 means with maximum 1 stop.")
      persons: int = Field(description="Describes how many persons are going to fly.")
     
      class Config:
        allow_population_by_field_name = True

    Here, we define a class named Flights which inherits from the Model class. Inside this class, we define a field named from_ representing the IATA code of the departure airport; it uses the Field class to provide a description and additional metadata, including the alias "from". We then define a field named to representing the IATA code of the destination airport. An optional field named trip can hold values like "oneway" or "return" to indicate the type of trip. We also define a date field containing the date of the outbound flight and a back_date field for return flights. route indicates the maximum number of stops for the flight route, and persons represents the number of people flying.
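The field descriptions above note that IATA airport codes are three-character alphanumeric geocodes; a minimal validation helper (hypothetical, not part of the integration) might look like:

```python
def looks_like_iata_code(code: str) -> bool:
    # IATA airport codes are three-character alphanumeric geocodes,
    # conventionally written in upper case (e.g. "LHR", "JFK")
    return len(code) == 3 and code.isalnum() and code == code.upper()
```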

General

  1. Within the messages folder, let's create another Python script and name it: touch general.py.

  2. In here, we need to import the necessary modules and define the different data models:

    general.py
    from uagents import Model
    from enum import Enum
    from typing import Optional, List
     
    class UAgentResponseType(Enum):
      ERROR = "error"
      SELECT_FROM_OPTIONS = "select_from_options"
      FINAL_OPTIONS = "final_options"
     
    class KeyValue(Model):
      key: str
      value: str
     
    class UAgentResponse(Model):
      type: UAgentResponseType
      agent_address: Optional[str]
      message: Optional[str]
      options: Optional[List[KeyValue]]
      request_id: Optional[str]
     
    class BookingRequest(Model):
      request_id: str
      user_response: str
      user_email: str
      user_full_name: str

    Here, we defined structured data models to represent different types of data within the uAgents framework, making communication between agents and systems more organized and standardized:

    • UAgentResponseType: This is an enumeration that defines different response types an agent can use.
    • KeyValue: A model representing a key-value pair, often used to display options or details.
    • UAgentResponse: This model represents a response from an agent. It includes the response type, an optional agent address, an optional message, a list of optional key-value options, and an optional request ID.
    • BookingRequest: This model represents a booking request. It includes a request ID, user response, user email, and user full name.

Top activities

  1. Within the messages folder, let's create an additional Python script and name it: touch top_activities.py.

  2. In here, we need to import the necessary modules and define the data model through which agents can exchange data about users' preferences for top activities in a specific city, helping them to make more informed and tailored suggestions to users:

    top_activities.py
    from uagents import Model
    from typing import Optional
    from pydantic import Field
     
    class TopActivities(Model):
      city: str = Field(description="Describes the city where we want to go and find activity, for example: London. This shouldn't be based on user current location, but the city where user wants to go.")
      date: str = Field(description="Describes which date user wants to find top activities in given city")
      preferred_activities: Optional[str] = Field(description="Describes what activities the user prefers. It is optional")

    This data model represents information for finding top activities in a city. It takes in different parameters:

    • city: A required field that describes the city where users want to find activities.
    • date: A required field that specifies the date when the user wants to find top activities in the given city.
    • preferred_activities: An optional field that describes the activities the user prefers.
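As a quick sketch of how this model behaves, plain pydantic (standing in here for `uagents.Model`, with shortened field descriptions) shows the required/optional split:

```python
from typing import Optional
from pydantic import BaseModel, Field

# Stand-in for the TopActivities model above; descriptions abbreviated.
class TopActivities(BaseModel):
    city: str = Field(description="City where the user wants to find activities")
    date: str = Field(description="Date for the top activities")
    preferred_activities: Optional[str] = Field(default=None, description="Optional user preferences")

# city and date are required; preferred_activities may be omitted.
req = TopActivities(city="London", date="2024-06-01")
print(req.city, req.preferred_activities)  # → London None
```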

Top destinations

Next, we define a model through which agents can communicate and understand user preferences for holiday destinations, allowing them to provide more tailored suggestions and recommendations based on the user's interests.

  1. Within the messages folder, let's create a final Python script and name it: touch top_destinations.py.

  2. In here, we need to import the necessary modules and define the data model through which agents can exchange data about users' preferences for top destinations:

    top_destinations.py
    from uagents import Model
    from pydantic import Field
     
    class TopDestinations(Model):
      preferences: str = Field(description="The field expresses what the user prefer. Can be left empty. For example: 'beach', 'mountains, rivers', 'city', etc.")

    Here, we defined a TopDestinations model which represents user preferences for top holiday destinations. The class takes a single preferences parameter: a field that expresses what the user prefers for their holiday destination. It can be left empty or contain preferences such as beach, mountains, rivers, city, and so on.

LLM - Large Language Model

We need to define a convenient way to interact with OpenAI's large language model (LLM) for text generation, embedding, and completions.

  1. To start, enter the utils folder created at the beginning within the src folder.

  2. In here, let's create a Python file and name it: touch llm.py.

  3. First, we import the needed modules. Then we define an OpenAILLM class, which encapsulates methods for interacting with OpenAI's large language model:

    import openai
    import time
    from typing import List
    import os
     
    class OpenAILLM:
      def __init__(self,
          completion_model: str = "text-davinci-003",
          embedding_model: str = "text-embedding-ada-002"
        ):
        self.completion_model = completion_model
        self.embedding_model = embedding_model
        self.usage_counter = None
     
      async def embedding(self, text: str) -> List[float]:
        text = text.replace("\n", " ")
        emb = await openai.Embedding.acreate(input=[text], model=self.embedding_model)
        if self.usage_counter:
            await self.usage_counter.add_embedding_usage(emb["usage"])
        return emb["data"][0]["embedding"]
     
      async def complete(
        self,
        header: str,
        prompt: str,
        complete: str,
        temperature: float = 0.5,
        max_tokens: int = 100,
        stop: List[str] = None,
      ) -> str:
        try:
          if self.completion_model != "gpt-4" and self.completion_model.find("gpt-3.5") == -1:
              #Call GPT-3 DaVinci model
              response = await openai.Completion.acreate(
                  engine=self.completion_model,
                  prompt=header+prompt+complete,
                  temperature=temperature,
                  max_tokens=max_tokens,
                  top_p=1,
                  frequency_penalty=0,
                  presence_penalty=0,
                  stop=stop
              )
              if self.usage_counter:
                await self.usage_counter.add_completion_usage(response.usage)
              return response.choices[0].text.strip()
          else:
              #Call GPT-4/gpt-3.5 chat model
              messages=[{"role": "system", "content": header}, {"role": "user", "content": prompt}, {"role": "assistant", "content": complete}]
              response = await openai.ChatCompletion.acreate(
                  model=self.completion_model,
                  messages = messages,
                  temperature=temperature,
                  max_tokens=max_tokens,
                  n=1,
                  stop=stop,
              )
              if self.usage_counter:
                await self.usage_counter.add_completion_usage(response.usage)
              return response.choices[0].message.content.strip()
        except openai.InvalidRequestError as err:
          print("openai_call InvalidRequestError: ", err)
          print("\n\n§§§§§§§§§§§§§§§§§§§§§§§§§§§§PROMPT§§§§§§§§§§§§§§§§§§§§§§§§§§§§")
          print(prompt)
          print("§§§§§§§§§§§§§§§§§§§§§§§§§§§§\n\n")
          raise err
        except Exception as err:
            print("openai_call Exception: ", err)
            print("Retry...")
            time.sleep(2)
            return await self.complete(header, prompt, complete, temperature, max_tokens, stop)

    The OpenAILLM class provides methods for interacting with OpenAI's LLM. It accepts two arguments during initialization:

    • completion_model: The model to use for completions (default: "text-davinci-003").
    • embedding_model: The model to use for text embeddings (default: "text-embedding-ada-002").

    The class has different methods:

    • embedding(): This takes a text input, creates an embedding using the specified embedding model, and returns the embedding.
    • complete(): This generates completions based on the input prompt using the specified completion model. It supports both the legacy GPT-3 completion models (such as text-davinci-003) and the GPT-4/GPT-3.5 chat models.
  4. We then need to define a get_llm() function which returns an instance of the OpenAILLM class with a specific completion model ("gpt-3.5-turbo").

    def get_llm():
        OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
        if len(OPENAI_API_KEY) > 0:
            openai.api_key = OPENAI_API_KEY
        return OpenAILLM(completion_model="gpt-3.5-turbo")

    We defined a get_llm() function which retrieves the OpenAI API key from the environment variable OPENAI_API_KEY. If the API key is available, it sets the API key for the OpenAI library. It then returns an instance of OpenAILLM with the specified completion model ("gpt-3.5-turbo").

  5. Save the script.

The overall script should look as follows:

llm.py
import openai
import time
from typing import List
import os
 
 
class OpenAILLM:
  def __init__(self,
      completion_model: str = "text-davinci-003",
      embedding_model: str = "text-embedding-ada-002"
    ):
    self.completion_model = completion_model
    self.embedding_model = embedding_model
    self.usage_counter = None
 
  async def embedding(self, text: str) -> List[float]:
    text = text.replace("\n", " ")
    emb = await openai.Embedding.acreate(input=[text], model=self.embedding_model)
    if self.usage_counter:
        await self.usage_counter.add_embedding_usage(emb["usage"])
    return emb["data"][0]["embedding"]
 
  async def complete(
    self,
    header: str,
    prompt: str,
    complete: str,
    temperature: float = 0.5,
    max_tokens: int = 100,
    stop: List[str] = None,
  ) -> str:
    try:
      if self.completion_model != "gpt-4" and self.completion_model.find("gpt-3.5") == -1:
          #Call GPT-3 DaVinci model
          response = await openai.Completion.acreate(
              engine=self.completion_model,
              prompt=header+prompt+complete,
              temperature=temperature,
              max_tokens=max_tokens,
              top_p=1,
              frequency_penalty=0,
              presence_penalty=0,
              stop=stop
          )
          if self.usage_counter:
            await self.usage_counter.add_completion_usage(response.usage)
          return response.choices[0].text.strip()
      else:
          #Call GPT-4/gpt-3.5 chat model
          messages=[{"role": "system", "content": header}, {"role": "user", "content": prompt}, {"role": "assistant", "content": complete}]
          response = await openai.ChatCompletion.acreate(
              model=self.completion_model,
              messages = messages,
              temperature=temperature,
              max_tokens=max_tokens,
              n=1,
              stop=stop,
          )
          if self.usage_counter:
            await self.usage_counter.add_completion_usage(response.usage)
          return response.choices[0].message.content.strip()
    except openai.InvalidRequestError as err:
      print("openai_call InvalidRequestError: ", err)
      print("\n\n§§§§§§§§§§§§§§§§§§§§§§§§§§§§PROMPT§§§§§§§§§§§§§§§§§§§§§§§§§§§§")
      print(prompt)
      print("§§§§§§§§§§§§§§§§§§§§§§§§§§§§\n\n")
      raise err
    except Exception as err:
        print("openai_call Exception: ", err)
        print("Retry...")
        time.sleep(2)
        return await self.complete(header, prompt, complete, temperature, max_tokens, stop)
 
 
def get_llm():
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
    if len(OPENAI_API_KEY) > 0:
        openai.api_key = OPENAI_API_KEY
    return OpenAILLM(completion_model="gpt-3.5-turbo")
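One detail worth noting in complete() is the model-routing branch: anything that is neither "gpt-4" nor a "gpt-3.5" variant goes to the legacy Completion endpoint, while the chat models go through ChatCompletion. That predicate can be isolated and sanity-checked on its own (a small extraction for illustration; `uses_chat_api` is not a function in the original script):

```python
def uses_chat_api(completion_model: str) -> bool:
    # Negation of the branch condition in OpenAILLM.complete(): the legacy
    # Completion endpoint is used only when the model is neither "gpt-4"
    # nor a "gpt-3.5" variant.
    return completion_model == "gpt-4" or "gpt-3.5" in completion_model

print(uses_chat_api("text-davinci-003"))  # → False: legacy Completion endpoint
print(uses_chat_api("gpt-3.5-turbo"))     # → True: ChatCompletion endpoint
print(uses_chat_api("gpt-4"))             # → True: ChatCompletion endpoint
```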

Main script

We are now ready to write the code for our main script. In the src folder of the fetch-holiday integration, open the main.py script you created at the beginning.

  1. Let's import the needed class and let's create a Bureau object as an instance of the Bureau class. We need to add the three different agents to it, and then run the Bureau to manage and coordinate their execution:

    main.py
    from uagents import Bureau
     
    from agents.activities.top_activities import agent as top_activities_agent
    from agents.destinations.top_destinations import agent as top_destinations_agent
    from agents.flights.flights import agent as flights_agent
     
     
    if __name__ == "__main__":
        bureau = Bureau(endpoint="http://127.0.0.1:8000/submit", port=8000)
        print(f"Adding top activities agent to Bureau: {top_activities_agent.address}")
        bureau.add(top_activities_agent)
        print(f"Adding top destinations agent to Bureau: {top_destinations_agent.address}")
        bureau.add(top_destinations_agent)
        print(f"Adding flights agent to Bureau: {flights_agent.address}")
        bureau.add(flights_agent)
        bureau.run()
  2. Save the script.
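Conceptually, the Bureau just collects agents and runs them from a single process behind one shared endpoint. The add-then-run flow above can be sketched with a toy stand-in (this is not the real `uagents.Bureau` API, only an illustration of the pattern):

```python
# Toy illustration of the Bureau's add-then-run pattern; not the real uagents API.
class ToyBureau:
    def __init__(self, endpoint: str, port: int):
        self.endpoint = endpoint
        self.port = port
        self.agents = []

    def add(self, agent: dict):
        # Each added agent will be served from the shared endpoint.
        self.agents.append(agent)

    def run(self):
        # The real Bureau starts an event loop serving every added agent;
        # here we just report which agents would be coordinated.
        return [agent["name"] for agent in self.agents]

bureau = ToyBureau(endpoint="http://127.0.0.1:8000/submit", port=8000)
for name in ("top_activities", "top_destinations", "flights"):
    bureau.add({"name": name})
print(bureau.run())  # → ['top_activities', 'top_destinations', 'flights']
```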

Run the main script

Once you have finalized the main.py script, run the project and its agents: poetry run python main.py

You need to look for the following output in the logs:

Adding top destinations agent to Bureau: {top_destinations_agent.address}

Copy the actual agent address printed in place of {top_destinations_agent.address} and paste it somewhere safe. You will need it in the following step.

Set up a client script

Now that we have set up the integrations, let's run a client script that will showcase the top destinations.

  1. To do this, create a new Python file in the src folder and name it: touch top_dest_client.py.

  2. You will need to paste the following code in it:

    top_dest_client.py
    from messages import TopDestinations, UAgentResponse
    from uagents import Agent, Context
    from uagents.setup import fund_agent_if_low
    import os
     
    TOP_DESTINATIONS_CLIENT_SEED = os.getenv("TOP_DESTINATIONS_CLIENT_SEED", "top_destinations_client really secret phrase :)")
     
    top_dest_client = Agent(
        name="top_destinations_client",
        port=8008,
        seed=TOP_DESTINATIONS_CLIENT_SEED,
        endpoint=["http://127.0.0.1:8008/submit"],
    )
    fund_agent_if_low(top_dest_client.wallet.address())
     
    top_dest_request = TopDestinations(preferences="new york")
     
    @top_dest_client.on_interval(period=10.0)
    async def send_message(ctx: Context):
        await ctx.send("{top_destinations_agent.address}", top_dest_request)
     
    @top_dest_client.on_message(model=UAgentResponse)
    async def message_handler(ctx: Context, _: str, msg: UAgentResponse):
        ctx.logger.info(f"Received top destination options from: {msg.options}")
     
    if __name__ == "__main__":
        top_dest_client.run()

    Remember to replace the placeholder address in the ctx.send() call with the value you copied in the previous step. This code sends a request to get the top destinations (in this example, from New York): it sends a request to the top destinations agent every 10 seconds and displays the options in the console.

  3. Save the script.
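Under the hood, on_interval simply reschedules the decorated coroutine on a fixed period. A minimal asyncio sketch of that pattern (with a shortened period and a capped iteration count so it terminates, unlike the real forever-running handler):

```python
import asyncio

async def send_message(sent: list):
    # Stand-in for the ctx.send(address, top_dest_request) call above.
    sent.append("TopDestinations(preferences='new york')")

async def run_on_interval(period: float, repetitions: int, sent: list):
    # The real decorator repeats indefinitely; we cap it so the sketch exits.
    for _ in range(repetitions):
        await send_message(sent)
        await asyncio.sleep(period)

sent: list = []
asyncio.run(run_on_interval(period=0.01, repetitions=3, sent=sent))
print(len(sent))  # → 3
```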

Run the client script

Open a new terminal (leave the previous one running), and navigate to the src folder to run the client script.

cd src
poetry run python top_dest_client.py

Once you hit enter, a request will be sent to the top destinations agent every 10 seconds, and you will be able to see your results in the console!