Using the News API to build a network of Primary and Secondary functions
Introduction
This file can be run on any platform supporting Python, with the necessary install permissions. This system helps users read different kinds of news: news by category, news by country, or news matching a keyword. The system is built as a multi-layered structure of primary and secondary functions, each of which plays a vital role in delivering personalized news content to users.
Supporting documentation
- Creating an agent
- Register in Almanac
- Almanac Contract
- Protocols
- Agentverse Functions
- Register an Agent Function on the Agentverse
The agents
News Reading Agent
Self hosted
# agent.py
# Here we demonstrate how we can create a news reading system agent that is compatible with DeltaV

# Import required libraries
import requests
import json
from uagents import Agent, Context, Model, Field, Protocol
from uagents.setup import fund_agent_if_low
from ai_engine import UAgentResponse, UAgentResponseType


# Define News Reading Model
class News(Model):
    # Headline titles packed into one string, separated by " nnn "
    news: str


# First generate a secure seed phrase (e.g. https://pypi.org/project/mnemonic/)
SEED_PHRASE = "put_your_seed_phrase_here"

# Now go to https://agentverse.ai, register your agent in the Mailroom by providing the address you just copied.
# Then, copy the agent's mailbox key and insert it here below inline
AGENT_MAILBOX_KEY = "put_your_AGENT_MAILBOX_KEY_here"

# Now your agent is ready to join the agentverse!
news_agent = Agent(
    name="news_agent",
    seed=SEED_PHRASE,
    mailbox=f"{AGENT_MAILBOX_KEY}@https://agentverse.ai",
)

fund_agent_if_low(news_agent.wallet.address())

# Copy the address shown below
print(f"Your agent's address is: {news_agent.address}")

# Define Protocol for news reading system
news_protocol = Protocol("News System")


# Define a handler for the News system protocol
@news_protocol.on_message(model=News, replies=UAgentResponse)
async def on_news_request(ctx: Context, sender: str, msg: News):
    """Number the " nnn "-separated titles in ``msg.news`` and send them back.

    The titles are split on " nnn ", prefixed with a 1-based index, joined
    with line breaks, logged, and sent to ``sender`` as a FINAL response.
    """
    # Split the packed titles and number them, one per line, for a more readable result
    headlines = msg.news.split(" nnn ")
    numbered = []
    for position, headline in enumerate(headlines, start=1):
        numbered.append(f"{position}. {headline}")
    final_news = '\n'.join(numbered)

    # Print the news response on the logger
    ctx.logger.info(f"Received news request from {sender} with prompt: {final_news}")

    # Send the final response to the DeltaV GUI
    reply = UAgentResponse(message=final_news, type=UAgentResponseType.FINAL)
    await ctx.send(sender, reply)


# Include the News System protocol in your agent
news_agent.include(news_protocol)

news_agent.run()
News Generating Agent
Self hosted
# agent.py
# Here we demonstrate how we can create a news generating agent that is compatible with DeltaV.

# Import required libraries
import requests
import json
from uagents import Agent, Context, Model, Field, Protocol
from uagents.setup import fund_agent_if_low
from ai_engine import UAgentResponse, UAgentResponseType


# Define News Generating Model.
class GenerateNews(Model):
    # Kind of news the user selected (only logged by the handler below)
    news_type: str
    # News text that is forwarded back to the sender
    news: str


# First generate a secure seed phrase (e.g. https://pypi.org/project/mnemonic/)
SEED_PHRASE = "put_your_seed_phrase_here"

# Now go to https://agentverse.ai, register your agent in the Mailroom by providing the address you just copied.
# Then, copy the agent's mailbox key and insert it here below inline
AGENT_MAILBOX_KEY = "put_your_AGENT_MAILBOX_KEY_here"

# Now your agent is ready to join the agentverse!
generate_news_agent = Agent(
    name="generate_news_agent",
    seed=SEED_PHRASE,
    mailbox=f"{AGENT_MAILBOX_KEY}@https://agentverse.ai",
)

fund_agent_if_low(generate_news_agent.wallet.address())

# Copy the address shown below
print(f"Your agent's address is: {generate_news_agent.address}")

# Define Generate news protocol.
generate_news_protocol = Protocol("Generate News")


# Define a handler for the News generation protocol
@generate_news_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Log the incoming request and send ``msg.news`` back as a FINAL response.

    Any exception raised while logging or sending is logged and reported
    to ``sender`` as an ERROR response.
    """
    try:
        # Record what was requested before replying.
        ctx.logger.info('Generating News')
        ctx.logger.info(f'User have selected {msg.news_type} category')
        ctx.logger.info(f'Generate News \n {msg.news}')

        # Send a successful response with the generated news.
        reply = UAgentResponse(message=msg.news, type=UAgentResponseType.FINAL)
        await ctx.send(sender, reply)

    # Handle any exceptions that occur during news generation.
    except Exception as exc:
        ctx.logger.error(f"Error in generating news: {exc}")

        # Send an error response with details of the encountered error.
        failure = UAgentResponse(
            message=f"Error in generating news: {exc}",
            type=UAgentResponseType.ERROR,
        )
        await ctx.send(sender, failure)


# Include the Generate News protocol in your agent.
generate_news_agent.include(generate_news_protocol)

generate_news_agent.run()
Generate Categorical News
Self hosted
# agent.py
# Here we demonstrate how we can create a categorical news generating agent that is compatible with DeltaV.

# Importing libraries
import requests
import json
from uagents import Agent, Context, Model, Field, Protocol
from uagents.setup import fund_agent_if_low
from ai_engine import UAgentResponse, UAgentResponseType


# Define the Generate News model.
class GenerateNews(Model):
    # News category requested by the user; passed to the News API as `category`
    category: str


# First generate a secure seed phrase (e.g. https://pypi.org/project/mnemonic/)
SEED_PHRASE = "put_your_seed_phrase_here"

# Now go to https://agentverse.ai, register your agent in the Mailroom by providing the address you just copied.
# Then, copy the agent's mailbox key and insert it here below inline
AGENT_MAILBOX_KEY = "put_your_AGENT_MAILBOX_KEY_here"

# Now your agent is ready to join the agentverse!
generate_cat_news_agent = Agent(
    name="generate_cat_news_agent",
    seed=SEED_PHRASE,
    mailbox=f"{AGENT_MAILBOX_KEY}@https://agentverse.ai",
)

fund_agent_if_low(generate_cat_news_agent.wallet.address())

# Copy the address shown below
print(f"Your agent's address is: {generate_cat_news_agent.address}")

# Define protocol for categorical news generation.
generate_cat_news_protocol = Protocol("Generate Categorical News")


# Define function to generate news according to category in great britain - GB.
async def generate_news(category):
    """Fetch up to 10 top GB headlines for ``category`` from the News API.

    Args:
        category: News category name, sent as the ``category`` query parameter.

    Returns:
        The headline titles (with the trailing " - <source>" part stripped)
        joined into one string with ' nnn ', so the result is a plain string
        rather than a list (DeltaV compatible type).

    Raises:
        RuntimeError: If the API response does not report ``status == 'ok'``.
        requests.RequestException: If the HTTP request itself fails.
    """
    api_key = 'YOUR_NEWS_API_KEY'

    # NOTE: requests is synchronous, so this call blocks while waiting for the API.
    # Passing the query via `params` lets requests URL-encode the values, and the
    # timeout keeps a stalled connection from hanging the handler forever.
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"country": "gb", "category": category, "apiKey": api_key},
        timeout=10,
    )
    news = response.json()

    # Surface the API's own error text instead of failing later with KeyError('articles').
    if news.get('status') != 'ok':
        raise RuntimeError(news.get('message', 'News API request failed'))

    # Strip the source and keep the top 10; skip articles without a title
    # rather than failing the whole request.
    titles = [
        article['title'].split(' - ')[0].strip()
        for article in news.get('articles', [])
        if article.get('title')
    ]
    return ' nnn '.join(titles[:10])


# Define a handler for the Categorical News generation protocol.
@generate_cat_news_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Reply to ``sender`` with headlines for ``msg.category``.

    Sends a FINAL response containing the ' nnn '-joined headlines, or an
    ERROR response carrying the exception text if anything fails.
    """
    # Logging category of news user wants to read
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.category}")
    try:
        # generate_news is a coroutine function: it must be awaited, otherwise
        # the reply would contain the repr of a coroutine object, not the news.
        news = await generate_news(msg.category)

        # Logging news
        ctx.logger.info(news)

        # Send a successful response with the generated news.
        await ctx.send(
            sender,
            UAgentResponse(message=news, type=UAgentResponseType.FINAL),
        )

    # Handle any exceptions that occur during news generation.
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")

        # Send an error response with details of the encountered error.
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR,
            ),
        )


# Include the Generate Categorical News protocol in your agent.
generate_cat_news_agent.include(generate_cat_news_protocol)

generate_cat_news_agent.run()
Generate Regional News
Self hosted
# agent.py
# Import libraries
import requests
import json
from uagents import Agent, Context, Model, Field, Protocol
from uagents.setup import fund_agent_if_low
from ai_engine import UAgentResponse, UAgentResponseType

# Define dictionary with country codes (lower-case country name -> News API country code)
country_codes = {
    "argentina": "ar",
    "australia": "au",
    "austria": "at",
    "belgium": "be",
    "bulgaria": "bg",
    "brazil": "br",
    "canada": "ca",
    "china": "cn",
    "colombia": "co",
    "cuba": "cu",
    "czech republic": "cz",
    "germany": "de",
    "egypt": "eg",
    "france": "fr",
    "united kingdom": "gb",
    "greece": "gr",
    "hong kong": "hk",
    "hungary": "hu",
    "indonesia": "id",
    "ireland": "ie",
    "israel": "il",
    "india": "in",
    "italy": "it",
    "japan": "jp",
    "south korea": "kr",
    "lithuania": "lt",
    "latvia": "lv",
    "morocco": "ma",
    "mexico": "mx",
    "malaysia": "my",
    "nigeria": "ng",
    "netherlands": "nl",
    "norway": "no",
    "new zealand": "nz",
    "philippines": "ph",
    "poland": "pl",
    "portugal": "pt",
    "romania": "ro",
    "serbia": "rs",
    "russia": "ru",
    "saudi arabia": "sa",
    "sweden": "se",
    "singapore": "sg",
    "slovenia": "si",
    "slovakia": "sk",
    "thailand": "th",
    "turkey": "tr",
    "taiwan": "tw",
    "ukraine": "ua",
    "united states": "us",
    "venezuela": "ve",
    "south africa": "za",
}


# Define the Generate News model
class GenerateNews(Model):
    # Country name as typed by the user; looked up case-insensitively in country_codes
    country: str


# First generate a secure seed phrase (e.g. https://pypi.org/project/mnemonic/)
SEED_PHRASE = "put_your_seed_phrase_here"

# Now go to https://agentverse.ai, register your agent in the Mailroom by providing the address you just copied.
# Then, copy the agent's mailbox key and insert it here below inline
AGENT_MAILBOX_KEY = "put_your_AGENT_MAILBOX_KEY_here"

# Now your agent is ready to join the agentverse!
generate_news_reg_agent = Agent(
    name="generate_news_reg_agent",
    seed=SEED_PHRASE,
    mailbox=f"{AGENT_MAILBOX_KEY}@https://agentverse.ai",
)

fund_agent_if_low(generate_news_reg_agent.wallet.address())

# Copy the address shown below
print(f"Your agent's address is: {generate_news_reg_agent.address}")


# Define function to generate regional news according to country
async def get_regional_news(country):
    """Fetch up to 10 top headlines for ``country`` from the News API.

    Args:
        country: Country name; matched case-insensitively (surrounding
            whitespace ignored) against the keys of ``country_codes``.

    Returns:
        The headline titles (with the trailing " - <source>" part stripped)
        joined into one string with ' nnn ', so the result is a plain string
        rather than a list (DeltaV compatible type).

    Raises:
        ValueError: If ``country`` is not a key of ``country_codes``.
        RuntimeError: If the API response does not report ``status == 'ok'``.
        requests.RequestException: If the HTTP request itself fails.
    """
    api_key = 'YOUR_API_KEY'

    # Reject unknown countries up front; otherwise the request would be sent
    # with the literal text "None" as the country code.
    country_code = country_codes.get(country.strip().lower())
    if country_code is None:
        raise ValueError(f"Unsupported country: {country}")

    # NOTE: requests is synchronous, so this call blocks while waiting for the API.
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"country": country_code, "apiKey": api_key},
        timeout=10,
    )
    news = response.json()

    # Surface the API's own error text instead of failing later with KeyError('articles').
    if news.get('status') != 'ok':
        raise RuntimeError(news.get('message', 'News API request failed'))

    # Strip the source and keep the top 10; skip articles without a title
    # rather than failing the whole request.
    titles = [
        article['title'].split(' - ')[0].strip()
        for article in news.get('articles', [])
        if article.get('title')
    ]
    return ' nnn '.join(titles[:10])


# Define protocol for regional news generation
generate_news_reg_protocol = Protocol("Generate Regional News")


# Define a handler for the Regional News generation protocol
@generate_news_reg_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Reply to ``sender`` with headlines for ``msg.country``.

    Sends a FINAL response containing the ' nnn '-joined headlines, or an
    ERROR response carrying the exception text if anything fails (including
    an unsupported country name).
    """
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.country}")
    try:
        # Generate news based on the requested country and log it on agentverse
        message = await get_regional_news(msg.country)
        ctx.logger.info(f"Message from endpoint: {message}")

        # Send a successful response with the generated news
        await ctx.send(
            sender,
            UAgentResponse(message=message, type=UAgentResponseType.FINAL),
        )

    # Handle any exceptions that occur during news generation
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")

        # Send an error response with details of the encountered error
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR,
            ),
        )


# Include the Generate Regional News protocol in your agent
# (must be the protocol object defined above, not an undefined name)
generate_news_reg_agent.include(generate_news_reg_protocol)

generate_news_reg_agent.run()
Generate Keyword News
Self hosted
# agent.py
# Import libraries
import requests
from uagents import Agent, Context, Model, Field, Protocol
from uagents.setup import fund_agent_if_low
import json
from ai_engine import UAgentResponse, UAgentResponseType


# Define the Generate News model
class GenerateNews(Model):
    # Search keyword; passed to the News API as the `q` query parameter
    keyword: str


# First generate a secure seed phrase (e.g. https://pypi.org/project/mnemonic/)
SEED_PHRASE = "put_your_seed_phrase_here"

# Now go to https://agentverse.ai, register your agent in the Mailroom by providing the address you just copied.
# Then, copy the agent's mailbox key and insert it here below inline
AGENT_MAILBOX_KEY = "put_your_AGENT_MAILBOX_KEY_here"

# Now your agent is ready to join the agentverse!
generate_news_keyw_agent = Agent(
    name="generate_news_keyw_agent",
    seed=SEED_PHRASE,
    mailbox=f"{AGENT_MAILBOX_KEY}@https://agentverse.ai",
)

fund_agent_if_low(generate_news_keyw_agent.wallet.address())

# Copy the address shown below
print(f"Your agent's address is: {generate_news_keyw_agent.address}")

# Define protocol for keyword news generation
generate_news_keyw_protocol = Protocol("Generate Keyword News")


# Define function to generate news according to keyword
async def get_keyword_news(keyword):
    """Fetch up to 10 top headlines matching ``keyword`` from the News API.

    Args:
        keyword: Search text, sent as the ``q`` query parameter.

    Returns:
        The headline titles (with the trailing " - <source>" part stripped)
        joined into one string with ' nnn ', so the result is a plain string
        rather than a list (DeltaV compatible type).

    Raises:
        RuntimeError: If the API response does not report ``status == 'ok'``.
        requests.RequestException: If the HTTP request itself fails.
    """
    api_key = 'YOUR_API_KEY'

    # NOTE: requests is synchronous, so this call blocks while waiting for the API.
    # `params` URL-encodes the keyword, so spaces and characters such as '&'
    # in user input cannot corrupt the query string.
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"q": keyword, "apiKey": api_key},
        timeout=10,
    )
    news = response.json()

    # Surface the API's own error text instead of failing later with KeyError('articles').
    if news.get('status') != 'ok':
        raise RuntimeError(news.get('message', 'News API request failed'))

    # Strip the source and keep the top 10; skip articles without a title
    # rather than failing the whole request.
    titles = [
        article['title'].split(' - ')[0].strip()
        for article in news.get('articles', [])
        if article.get('title')
    ]

    # The joined string must be returned; without it callers would receive None.
    return ' nnn '.join(titles[:10])


# Define a handler for the Keyword News generation protocol
@generate_news_keyw_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Reply to ``sender`` with headlines matching ``msg.keyword``.

    Sends a FINAL response containing the ' nnn '-joined headlines, or an
    ERROR response carrying the exception text if anything fails.
    """
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.keyword}")

    # Generate news based on the requested keyword
    try:
        # get_keyword_news is a coroutine function, so it must be awaited to get the string.
        news = await get_keyword_news(msg.keyword)

        # Send a successful response with the generated news
        await ctx.send(
            sender,
            UAgentResponse(message=news, type=UAgentResponseType.FINAL),
        )

    # Handle any exceptions that occur during news generation
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")

        # Send an error response with details of the encountered error
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR,
            ),
        )


# Include the Generate Keyword News protocol in your agent
generate_news_keyw_agent.include(generate_news_keyw_protocol)

generate_news_keyw_agent.run()