Langchain RAG Agent
Introduction
This example demonstrates how LangChain's RAG (Retrieval-Augmented Generation) technology integrates with uAgents. It shows how to create a RAG application that can answer questions based on a document.
Supporting documentation
- Creating an agent
- Creating an interval task
- Communicating with other agents
- Register in Almanac
- Almanac Contract
- Utilising the Agentverse Mailroom service
- Protocols
- Agentverse Functions
- Register an Agent Function on the Agentverse
Pre-requisites
- Python : Download and install from Python official website (opens in a new tab).
Steps to Obtain API Keys
To run this example, you will need two API keys: one from OpenAI and one from Cohere. Follow the steps below to obtain these keys.
OpenAI API Key
- Visit the OpenAI website (opens in a new tab).
- Sign up or log in to your account.
- Navigate to the API section.
- Generate or retrieve your API key.
Cohere API Key
- Visit the Cohere website (opens in a new tab).
- Sign up or log in to your account.
- Go to the API Keys section.
- Copy an existing key or create a new one.
Project Structure
Outline of basic structure of the project:
langchain-rag/
.
├── poetry.lock
├── pyproject.toml
└── src
├── agents
│ ├── langchain_rag_agent.py
│ └── langchain_rag_user.py
├── main.py
└── messages
└── requests.py
Setting Up Environment Variables
To load the environment variables, use the following command:
-
Navigate to the
src
directory and source the.env
file:cd src source .env
Example .env
File
Here is an example of what your .env
file might look like:
export COHERE_API_KEY="YOUR_COHERE_API_KEY"
export OPENAI_API_KEY="YOUR_OPENAI_API_KEY"
Langchain RAG Setup
Poetry Dependencies:
[tool.poetry.dependencies]
python = ">=3.10,<3.12"
uagents = "*"
requests = "^2.31.0"
langchain = "^0.2.11"
openai = "^1.12.0"
langchain-openai = "^0.1.19"
tiktoken = "^0.7.0"
cohere = "^4.47"
faiss-cpu = "^1.7.4"
validators = "^0.22.0"
uagents-ai-engine = "^0.4.0"
unstructured = "^0.12.4"
RagRequest Model
The RagRequest
model represents a request for retrieving an answer to a user's question from a specified document URL. It includes an optional parameter to determine if nested pages should also be read.
from typing import Optional
from uagents import Model, Field
class RagRequest(Model):
question: str = Field(
description="The question that the user wants to have an answer for."
)
url: str = Field(description="The url of the docs where the answer is.")
deep_read: Optional[str] = Field(
description="Specifies weather all nested pages referenced from the starting URL should be read or not. The value should be yes or no.",
default="no",
)
Langchain RAG agent
This agent answers questions by fetching and summarizing information from a given website. It checks and scrapes URLs, uses LangChain to find important documents, and generates answers with OpenAI's GPT-4o-mini model.
import traceback
from uagents import Agent, Context, Protocol
import validators
from messages.requests import RagRequest
import os
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_community.document_loaders import UnstructuredURLLoader
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
from ai_engine import UAgentResponse, UAgentResponseType
import nltk
from uagents.setup import fund_agent_if_low
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")
LANGCHAIN_RAG_SEED = "YOUR_LANGCHAIN_RAG_SEED"
AGENT_MAILBOX_KEY = "YOUR_MAILBOX_KEY"
agent = Agent(
name="langchain_rag_agent",
seed=LANGCHAIN_RAG_SEED,
mailbox=f"{AGENT_MAILBOX_KEY}@https://agentverse.ai",
)
fund_agent_if_low(agent.wallet.address())
docs_bot_protocol = Protocol("DocsBot")
PROMPT_TEMPLATE = """
Answer the question based only on the following context:
{context}
---
Answer the question based on the above context: {question}
"""
def create_retriever(
ctx: Context, url: str, deep_read: bool
) -> ContextualCompressionRetriever:
def scrape(site: str):
if not validators.url(site):
ctx.logger.info(f"Url {site} is not valid")
return
r = requests.get(site)
soup = BeautifulSoup(r.text, "html.parser")
parsed_url = urlparse(url)
base_domain = parsed_url.scheme + "://" + parsed_url.netloc
link_array = soup.find_all("a")
for link in link_array:
href: str = link.get("href", "")
if len(href) == 0:
continue
current_site = f"{base_domain}{href}" if href.startswith("/") else href
if (
".php" in current_site
or "#" in current_site
or not current_site.startswith(url)
or current_site in urls
):
continue
urls.append(current_site)
scrape(current_site)
urls = [url]
if deep_read:
scrape(url)
ctx.logger.info(f"After deep scraping - urls to parse: {urls}")
try:
loader = UnstructuredURLLoader(urls=urls)
docs = loader.load_and_split()
db = FAISS.from_documents(docs, OpenAIEmbeddings())
compression_retriever = ContextualCompressionRetriever(
base_compressor=CohereRerank(), base_retriever=db.as_retriever()
)
return compression_retriever
except Exception as exc:
ctx.logger.error(f"Error happened: {exc}")
traceback.format_exception(exc)
@docs_bot_protocol.on_message(model=RagRequest, replies={UAgentResponse})
async def answer_question(ctx: Context, sender: str, msg: RagRequest):
ctx.logger.info(f"Received message from {sender}, session: {ctx.session}")
ctx.logger.info(
f"input url: {msg.url}, question: {msg.question}, is deep scraping: {msg.deep_read}"
)
parsed_url = urlparse(msg.url)
if not parsed_url.scheme or not parsed_url.netloc:
ctx.logger.error("invalid input url")
await ctx.send(
sender,
UAgentResponse(
message="Input url is not valid",
type=UAgentResponseType.FINAL,
),
)
return
base_domain = parsed_url.scheme + "://" + parsed_url.netloc
ctx.logger.info(f"Base domain: {base_domain}")
retriever = create_retriever(ctx, url=msg.url, deep_read=msg.deep_read == "yes")
compressed_docs = retriever.get_relevant_documents(msg.question)
context_text = "\n\n---\n\n".join([doc.page_content for doc in compressed_docs])
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=msg.question)
model = ChatOpenAI(model="gpt-4o-mini")
response = model.predict(prompt)
ctx.logger.info(f"Response: {response}")
await ctx.send(
sender, UAgentResponse(message=response, type=UAgentResponseType.FINAL)
)
agent.include(docs_bot_protocol, publish_manifest=True)
if __name__ == "__main__":
agent.run()
Langchain user agent
The agent is designed to ask a predefined question to a RAG agent at regular intervals and handle the responses.
from uagents import Agent, Context, Protocol
from messages.requests import RagRequest
from ai_engine import UAgentResponse
from uagents.setup import fund_agent_if_low
QUESTION = "How to install uagents using pip"
URL = "https://fetch.ai/docs/guides/agents/installing-uagent"
DEEP_READ = (
"no"
)
RAG_AGENT_ADDRESS = "YOUR_LANGCHAIN_RAG_AGENT_ADDRESS"
user = Agent(
name="langchain_rag_user",
port=8000,
endpoint=["http://127.0.0.1:8000/submit"],
)
fund_agent_if_low(user.wallet.address())
rag_user = Protocol("LangChain RAG user")
@rag_user.on_interval(60, messages=RagRequest)
async def ask_question(ctx: Context):
ctx.logger.info(
f"Asking RAG agent to answer {QUESTION} based on document located at {URL}, readin nested pages too: {DEEP_READ}"
)
await ctx.send(
RAG_AGENT_ADDRESS, RagRequest(question=QUESTION, url=URL, deep_read=DEEP_READ)
)
@rag_user.on_message(model=UAgentResponse)
async def handle_data(ctx: Context, sender: str, data: UAgentResponse):
ctx.logger.info(f"Got response from RAG agent: {data.message}")
user.include(rag_user)
if __name__ == "__main__":
rag_user.run()
from uagents import Bureau
from agents.langchain_rag_agent import agent
from agents.langchain_rag_user import user
if __name__ == "__main__":
bureau = Bureau(endpoint="http://127.0.0.1:8000/submit", port=8000)
print(f"Adding RAG agent to Bureau: {agent.address}")
bureau.add(agent)
print(f"Adding user agent to Bureau: {user.address}")
bureau.add(user)
bureau.run()