Using News API to build network of Primary and Secondary functions in Agentverse
Bookmark

Using News API to build network of Primary and Secondary functions

Introduction

This file can be run on any platform supporting Python, with the necessary install permissions. This system helps users to read news of different types including Categorical, Country based or Keyword related news. The system operates based on a multiple layers structure of primary and secondary functions, each one of them playing a vital role in delivering personalized news content to users.

Supporting documentation

The agents

News Reading Agent

agent.py
 
# Here we demonstrate how we can create a news reading system agent that is compatible with DeltaV
 
# After running this agent, it can be registered to DeltaV on Agentverse My Agents tab. For registration you will have to use the agent's address
 
# Import required libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
 
# Define News Reading Model
class News(Model):
    news : str
 
# Define Protocol for news reading system
news_protocol = Protocol("News System")
 
# Define a handler for the News system protocol
@news_protocol.on_message(model=News, replies = UAgentResponse)
async def on_news_request(ctx: Context, sender: str, msg: News):
    #splitting the news titles with nnn and enumerating them with line break for visually better results
    result_list = msg.news.split(" nnn ")
    final_news = '\n'.join([f"{i + 1}. {title}" for i, title in enumerate(result_list)])
    #Printing the news response on logger
    ctx.logger.info(f"Received news request from {sender} with prompt: {final_news}")
    #sending final response to the DeltaV GUI
    await ctx.send(sender, UAgentResponse(message = final_news, type = UAgentResponseType.FINAL))
 
 
# Include the Generate News protocol in your agent
agent.include(news_protocol)

News Generating Agent

agent.py
# Here we demonstrate how we can create a news generating agent that is compatible with DeltaV.
# After running this agent, it can be registered to DeltaV on Agentverse My Agents tab. For registration you will have to use the agent's address.
 
# Import required libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
 
# Define News Generating Model.
class GenerateNews(Model):
news_type: str
news : str
 
# Define Generate news protocol.
generate_news_protocol = Protocol("Generate News")
 
 
# Define a handler for the News generation protocol
@generate_news_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    try:
    # Generate news based on the requested category.
        ctx.logger.info('Generating News')
        ctx.logger.info(f'User have selected {msg.news_type} category')
 
        ctx.logger.info(f'Generate News \n {msg.news}')
        message = msg.news
 
        # Send a successful response with the generated news.
        await ctx.send(
            sender,
            UAgentResponse(
                message= message,
                type=UAgentResponseType.FINAL
                )
            )
 
    # Handle any exceptions that occur during news generation.
    except Exception as exc:
        ctx.logger.error(f"Error in generating  news: {exc}")
 
        # Send an error response with details of the encountered error.
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating news: {exc}",
                type=UAgentResponseType.ERROR
                )
                )
 
# Include the Generate News protocol in your agent.
agent.include(generate_news_protocol)

Generate Categorical News

agent.py
# Here we demonstrate how we can create a categorical news generating agent that is compatible with DeltaV.
# After running this agent, it can be registered to DeltaV on Agentverse My Agents tab. For registration you will have to use the agent's address.
 
# Importing libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
 
# Define the Generate News model.
class GenerateNews(Model):
    category: str
 
# Define protocol for categorical news generation.
generate_cat_news_protocol = Protocol("Generate Categorical News")
 
# Define function to generate news according to category in great britain - GB.
async def generate_news(category):
    api_key = 'YOUR_NEWS_API_KEY'
    main_url = f"https://newsapi.org/v2/top-headlines?country=gb&category={category}&apiKey={api_key}"
    news = requests.get(main_url).json()
    #strip the source, get top 10 news and join the list with ' nnn ' to return the news as string and not list (DeltaV compatible type)
    titles = [article['title'].split(' - ')[0].strip() for article in news['articles']]
    titles = titles[:10]
    results = ' nnn '.join([f"{title}" for title in titles])
 
    return results
 
# Define a handler for the Categorical News generation protocol.
@generate_cat_news_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    #Logging category of news user wants to read
    ctx.logger.info(f"Received ticket request from {sender} with prompt: {msg.category}")
    try:
        # Generate news based on the requested category.
        news = generate_news(msg.category)
        #logging news
        ctx.logger.info(news)
        message = str(news)
        # Send a successful response with the generated news.
        await ctx.send(sender, UAgentResponse(message = message, type = UAgentResponseType.FINAL))
 
    # Handle any exceptions that occur during news generation.
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")
        # Send an error response with details of the encountered error.
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR
            )
        )
 
 
# Include the Generate News protocol in your agent.
agent.include(generate_cat_news_protocol)

Generate Regional News

agent.py
# Import libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
 
# Define dictionary with country codes
country_codes = {
    "argentina": "ar", "australia": "au", "austria": "at", "belgium": "be",
    "bulgaria": "bg", "brazil": "br", "canada": "ca", "china": "cn",
    "colombia": "co", "cuba": "cu", "czech republic": "cz", "germany": "de",
    "egypt": "eg", "france": "fr", "united kingdom": "gb", "greece": "gr",
    "hong kong": "hk", "hungary": "hu", "indonesia": "id", "ireland": "ie",
    "israel": "il", "india": "in", "italy": "it", "japan": "jp",
    "south korea": "kr", "lithuania": "lt", "latvia": "lv", "morocco": "ma",
    "mexico": "mx", "malaysia": "my", "nigeria": "ng", "netherlands": "nl",
    "norway": "no", "new zealand": "nz", "philippines": "ph", "poland": "pl",
    "portugal": "pt", "romania": "ro", "serbia": "rs", "russia": "ru",
    "saudi arabia": "sa", "sweden": "se", "singapore": "sg", "slovenia": "si",
    "slovakia": "sk", "thailand": "th", "turkey": "tr", "taiwan": "tw",
    "ukraine": "ua", "united states": "us", "venezuela": "ve", "south africa": "za"
}
 
# Define the Generate News model
class GenerateNews(Model):
    country: str
 
# Define function to generate regional news according to country
async def get_regional_news(country):
    api_key = 'YOUR_API_KEY'
    main_url = f"https://newsapi.org/v2/top-headlines?country={country_codes.get(country.lower())}&apiKey={api_key}"
    news = requests.get(main_url).json()
    # Strip the source, get top 10 news and join the list with nnn to return the news as string and not list - DeltaV compatible type
    titles = [article['title'].split(' - ')[0].strip()for article in news['articles']]
    titles = titles[:10]
    results = ' nnn '.join([f"{title}" for title in titles])
 
    return results
 
# Define protocol for regional news generation Protocol
generate_news_reg_protocol = Protocol("Generate Regional News")
 
# Define a handler for the Regional News generation protocol
@generate_news_reg_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
 
    ctx.logger.info(f"Received ticket request from {sender} with prompt: {msg.country}")
    try:
        # Get the country code from the country_code dictionary
        country_code = country_codes.get(msg.country.lower())
        # Generate news based on the requested country and log it on agentverse
        message = await get_regional_news(msg.country)
        ctx.logger.info(f"Message from endpoint: {message}")
        # Send a successful response with the generated news
        await ctx.send(sender, UAgentResponse(message=message, type=UAgentResponseType.FINAL))
    # Handle any exceptions that occur during news generation
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")
        # Send an error response with details of the encountered error
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR
            )
        )
 
 
# Include the Generate Regional News protocol in your agent
agent.include(generate_news_protocol)

Generate Keyword News

agent.py
# Import libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
 
# Define the Generate News model
class GenerateNews(Model):
    keyword: str
 
# Define protocol for keyword news generation
generate_news_keyw_protocol = Protocol("Generate Keyword News")
 
# Define function to generate news according to keyword
async def get_keyword_news(keyword):
 
    api_key = 'YOUR_API_KEY'
    main_url = f"https://newsapi.org/v2/top-headlines?q={keyword}&apiKey={api_key}"
    news = requests.get(main_url).json()
    # Strip the source, get top 10 news and join the list with nnn to return the news as string and not list - DeltaV compatible type
    titles = [article['title'].split(' - ')[0].strip() for article in news['articles']]
    titles = titles[:10]
    results = ' nnn '.join([f"{title}" for title in titles])
 
# Define a handler for the Keyword News generation protocol
@generate_news_keyw_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
 
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.keyword}")
    # Generate news based on the requested keyword
    try:
        news = get_keyword_news(msg.keyword)
        # Send a successful response with the generated news
        await ctx.send(
            sender,
            UAgentResponse(
                message=news,
                type=UAgentResponseType.FINAL
            )
        )
 
    # Handle any exceptions that occur during news generation
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")
        # Send an error response with details of the encountered error
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR
            )
        )
 
# Include the Generate Keyword News protocol in your agent
agent.include(generate_news_keyw_protocol)

Was this page helpful?

Bookmark