Vectara RAG Agent
Introduction
This example shows how to use uAgents with Vectara to build a Retrieval-Augmented Generation (RAG) system that answers queries from the documents in a corpus. The guide walks through setting up a Vectara corpus, configuring two agents for a multi-turn conversation, and writing helper functions that wrap the Vectara chat API.
Please check out the example code in our examples repo to run this locally.
Supporting documentation
- Creating an agent
- Creating an interval task
- Communicating with other agents
- Register in Almanac
- Almanac Contract
Pre-requisites
Creating an Account on Vectara and Setting Up a Corpus
- Vectara Account:
- Sign up at Vectara.
- Access the Vectara console.
- Python Development Environment:
- Python: Download and install from the official Python website.
- Poetry: Install by following the instructions on Poetry's official website.
- Dependencies: Install `uagents` with `poetry add uagents`, along with the other required libraries.
- API Keys and Corpus Details:
- Create a new corpus in the Vectara console.
- Go to your corpus and click the Data tab.
- Click `Load data into corpus` and add data by clicking the `upload files` button.
- Retrieve the `API key` from the Authorization section.
- Note that the `Corpus key` is the name of the corpus.
Project Structure
The project has the following basic structure:
vectara-rag-agent/
.
├── poetry.lock
├── pyproject.toml
├── rag-functions.py
├── README.md
└── vectara-agent.py
Setting Up Environment Variables
To load the environment variables, navigate to the root directory of the project and run `source .env`.
Example .env File
Here is an example of what your `.env` file might look like:
export CORPUS_KEY="YOUR_CORPUS_KEY"
export API_KEY="YOUR_API_KEY"
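Once the variables are exported, the helper module has to read them. The guide does not show this step, so the following is only a minimal sketch, assuming the values are read from the process environment. The function name `load_vectara_config` is hypothetical and not part of the example repo; the variable names match the example `.env` file.

```python
import os
from typing import Mapping, Tuple


def load_vectara_config(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (corpus_key, api_key), failing early if either is missing.

    Hypothetical helper for illustration only.
    """
    # Collect every variable that is absent or empty so the error names them all.
    missing = [name for name in ("CORPUS_KEY", "API_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing environment variables: " + ", ".join(missing)
            + ". Did you run `source .env`?"
        )
    return env["CORPUS_KEY"], env["API_KEY"]
```

Failing at startup with a clear message is usually easier to debug than a 401 response from the API later in the conversation.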
Vectara RAG Agent Setup
Overview of the create_chat_session and add_chat_turn functions
The `create_chat_session` function starts a new chat session with the Vectara API. It takes the user's query and sends a POST request to the `/v2/chats` endpoint. The payload includes the query, corpus details such as `corpus_key` and `semantics`, and the pagination parameters `offset` and `limit`. On success, it returns the unique `chat_id` and the API's answer, which are then used to manage the chat session. If the request fails or raises an exception, it logs the error and returns `(None, None)`.
import os

import requests

# Assumption: the credentials are read from the variables exported by `source .env`.
CORPUS_KEY = os.getenv("CORPUS_KEY")
API_KEY = os.getenv("API_KEY")


async def create_chat_session(query, ctx):
    """Start a new chat; return (chat_id, answer), or (None, None) on failure."""
    url = "https://api.vectara.io/v2/chats"
    payload = {
        "query": query,
        "search": {
            "corpora": [
                {
                    "corpus_key": CORPUS_KEY,
                    "semantics": "default"
                }
            ],
            "offset": 0,
            "limit": 5
        },
        "chat": {"store": True}
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'x-api-key': API_KEY
    }
    try:
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            chat_data = response.json()
            return chat_data['chat_id'], chat_data['answer']
        else:
            ctx.logger.error(f"Failed to create chat session: {response.status_code}, {response.text}")
            return None, None
    except Exception as e:
        ctx.logger.error(f"Error creating chat session: {e}")
        return None, None
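Both helper functions send almost the same request body, so the payload construction can be factored out and checked without calling the API. The sketch below is one possible way to do that; the name `build_chat_payload` is hypothetical, and the field names are copied from the code in this guide rather than verified against the Vectara API reference.

```python
from typing import Any, Dict


def build_chat_payload(query: str, corpus_key: str, limit: int = 5,
                       store: bool = True) -> Dict[str, Any]:
    """Build the JSON body used for /v2/chats (hypothetical helper)."""
    if not query.strip():
        raise ValueError("query must not be empty")
    return {
        "query": query,
        "search": {
            # One corpus, searched with the default semantics.
            "corpora": [{"corpus_key": corpus_key, "semantics": "default"}],
            "offset": 0,
            "limit": limit,
        },
        # Ask Vectara to keep the chat so follow-up turns can reference it.
        "chat": {"store": store},
    }
```

For a follow-up turn, the same function could be reused and the `chat` key dropped, since the turn payload in this guide contains only `query` and `search`.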
The `add_chat_turn` function extends an existing chat session with a follow-up query. It sends a POST request to the `/v2/chats/{chat_id}/turns` endpoint, passing the `chat_id` of the ongoing session in the URL and the follow-up query in the payload. Reusing the chat ID keeps the conversation in the same session, so the context of earlier turns is preserved. The function returns the chat ID and the answer to the follow-up query; on failure it logs the error and returns `(None, "Error")`. Together, these two functions enable multi-turn interactions with the Vectara API.
async def add_chat_turn(chat_id, query, ctx):
    """Add a follow-up turn; return (chat_id, answer), or (None, "Error") on failure."""
    url = f"https://api.vectara.io/v2/chats/{chat_id}/turns"
    payload = {
        "query": query,
        "search": {
            "corpora": [
                {
                    "corpus_key": CORPUS_KEY,
                    "semantics": "default"
                }
            ],
            "offset": 0,
            "limit": 5
        }
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'x-api-key': API_KEY
    }
    try:
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            turn_data = response.json()
            return turn_data['chat_id'], turn_data.get('answer', "No answer returned")
        else:
            ctx.logger.error(f"Failed to add chat turn: {response.status_code}, {response.text}")
            return None, "Error"
    except Exception as e:
        ctx.logger.error(f"Error adding chat turn: {e}")
        return None, "Error"
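Both helpers are declared `async` but call `requests.post`, which is a blocking call: while it waits for the HTTP response, the event loop cannot run any other handler. One possible way around this, sketched here with only the standard library, is `asyncio.to_thread` (available from Python 3.9). The names `run_blocking` and `fake_post` are hypothetical; `fake_post` is a stand-in so the sketch runs without network access.

```python
import asyncio
import time
from typing import Any, Callable


async def run_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking callable in a worker thread and await its result."""
    return await asyncio.to_thread(func, *args, **kwargs)


def fake_post(url: str, json: dict) -> dict:
    """Stand-in for requests.post: sleeps briefly, then returns a canned reply."""
    time.sleep(0.05)
    return {"chat_id": "cht_demo", "answer": f"echo: {json['query']}"}


async def demo() -> dict:
    # In the helpers above, the equivalent call would be:
    #   await run_blocking(requests.post, url, headers=headers, json=payload)
    return await run_blocking(
        fake_post, "https://example.invalid/v2/chats", json={"query": "hello"}
    )
```

Calling `asyncio.run(demo())` returns the canned dictionary. The trade-off is one worker thread per in-flight request, which is usually acceptable for a chat example of this size.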
You can obtain the `CORPUS_KEY` and `API_KEY` from the Vectara console. Add these two functions to the `rag-functions.py` file.
Vectara agent mechanism
The code uses two agents (`user_agent` and `vectara_agent`) to facilitate a structured chatbot mechanism. Here's how the important parts of the agent mechanism are implemented:
user_agent
The `user_agent` handles user interactions, captures input, and forwards it to `vectara_agent`.
- Startup Event
The `initiate_query` function is triggered on application startup. It sends the user's initial query to the `vectara_agent`.
@user_agent.on_event("startup")
async def initiate_query(ctx: Context):
    ctx.logger.info("[user_agent] : Sending initial query.")
    await ctx.send(vectara_agent.address, QueryRequest(user_query=initial_query))
- Handling Responses
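When `vectara_agent` sends a response, the handler logs it and prompts the user for a follow-up query. Typing `exit` or `quit` ends the conversation; any other input is sent back as a follow-up query with the same chat ID.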
@user_agent.on_message(model=ResponseData, replies={FollowUpQuery, ExitHandlerMessage})
async def handle_response(ctx: Context, sender: str, msg: ResponseData):
    ctx.logger.info(f"[user_agent] : Received response: {msg.response}")
    follow_up_query = input("Ask your question: ").strip()
    if follow_up_query.lower() in {"exit", "quit"}:
        await ctx.send(sender, ExitHandlerMessage(message="Exiting chat. Goodbye!"))
    else:
        await ctx.send(sender, FollowUpQuery(follow_up_query=follow_up_query, chat_id=msg.chat_id))
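The branching in `handle_response` can be separated from the `input()` call so that it can be checked without running any agents. This is only an illustrative sketch; `classify_input` and `EXIT_COMMANDS` are hypothetical names that do not appear in the example code.

```python
from typing import Tuple

# The two commands that end the conversation in the handler above.
EXIT_COMMANDS = {"exit", "quit"}


def classify_input(raw: str) -> Tuple[str, str]:
    """Return ("exit", "") for an exit command, else ("follow_up", cleaned_text)."""
    text = raw.strip()
    if text.lower() in EXIT_COMMANDS:
        return "exit", ""
    return "follow_up", text
```

The handler could then send an `ExitHandlerMessage` when the first element is `"exit"` and a `FollowUpQuery` carrying the cleaned text otherwise.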
vectara_agent
The `vectara_agent` processes user queries and communicates with the Vectara API.
- Processing Initial Queries:
When it receives a `QueryRequest`, it uses the `create_chat_session` function to initiate a chat session and sends the response back to `user_agent`.
@vectara_agent.on_message(model=QueryRequest, replies=ResponseData)
async def process_initial_query(ctx: Context, sender: str, msg: QueryRequest):
    ctx.logger.info(f"[vectara_agent] : Processing initial query: {msg.user_query}")
    chat_id, response = await create_chat_session(msg.user_query, ctx)
    if chat_id and response:
        await ctx.send(sender, ResponseData(response=response, chat_id=chat_id))
    else:
        ctx.logger.error("Failed to process initial query.")
- Handling Follow-Up Queries:
For follow-up questions, it uses `add_chat_turn` to continue the conversation in the same chat session and strips citation markers such as [1] from the response before sending it back.
@vectara_agent.on_message(model=FollowUpQuery, replies={ResponseData, ExitHandlerMessage})
async def process_follow_up_query(ctx: Context, sender: str, msg: FollowUpQuery):
    ctx.logger.info(f"[vectara_agent]: Processing follow-up query for chat_id {msg.chat_id}: {msg.follow_up_query}")
    chat_id, response = await add_chat_turn(msg.chat_id, msg.follow_up_query, ctx)
    if chat_id and response:
        # Remove numeric citation markers such as [1] from the answer.
        cleaned_response = re.sub(r'\[\d+\]', '', response)
        await ctx.send(sender, ResponseData(response=cleaned_response, chat_id=chat_id))
    else:
        await ctx.send(sender, ExitHandlerMessage(message="Failed to process follow-up query."))
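The pattern `\[\d+\]` removes the markers themselves but leaves the space that preceded them, which is why the expected output in this guide contains fragments such as `method .`. A small variation, sketched below under the assumption that markers always look like `[n]`, also consumes the whitespace in front of each marker. The function name `clean_citations` is hypothetical.

```python
import re


def clean_citations(text: str) -> str:
    """Remove [n] citation markers together with the whitespace before them."""
    # \s* swallows the space in front of the marker, so "method [1]." becomes "method."
    return re.sub(r"\s*\[\d+\]", "", text).strip()
```

Like the original pattern, this also strips bracketed numbers that are not citations, for example the index in `items[0]`, so it is only suitable when answers are not expected to contain code.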
Whole Script
This section presents the entire script, allowing users to easily copy and paste it for testing or deployment.
vectara-agent.py
import re

from uagents import Field, Model, Context, Agent, Bureau

# The helper module must be importable as `ragfunctions`; a hyphenated file
# name such as rag-functions.py cannot be imported with this statement.
from ragfunctions import create_chat_session, add_chat_turn


class QueryRequest(Model):
    user_query: str = Field(description="The user's initial query to start the chat session.")


class ResponseData(Model):
    response: str = Field(description="The response text returned by the Vectara API.")
    chat_id: str = Field(description="The unique identifier for the chat session.")


class FollowUpQuery(Model):
    chat_id: str = Field(description="The unique identifier for the ongoing chat session.")
    follow_up_query: str = Field(description="The user's follow-up question for the ongoing chat.")


class ExitHandlerMessage(Model):
    message: str = Field(description="The exit message sent to the user when the chat session ends.")


user_agent = Agent(name="user_agent", seed="user_agent_recovery")
vectara_agent = Agent(name="vectara_agent", seed="vectara_agent_recovery")

initial_query = input("Ask your question: ").strip()


@user_agent.on_event("startup")
async def initiate_query(ctx: Context):
    ctx.logger.info("[user_agent] : Sending initial query.")
    await ctx.send(vectara_agent.address, QueryRequest(user_query=initial_query))


@user_agent.on_message(model=ResponseData, replies={FollowUpQuery, ExitHandlerMessage})
async def handle_response(ctx: Context, sender: str, msg: ResponseData):
    ctx.logger.info(f"[user_agent] : Received response: {msg.response}")
    follow_up_query = input("Ask your question: ").strip()
    if follow_up_query.lower() in {"exit", "quit"}:
        await ctx.send(sender, ExitHandlerMessage(message="Exiting chat. Goodbye!"))
    else:
        await ctx.send(sender, FollowUpQuery(follow_up_query=follow_up_query, chat_id=msg.chat_id))


@vectara_agent.on_message(model=QueryRequest, replies=ResponseData)
async def process_initial_query(ctx: Context, sender: str, msg: QueryRequest):
    ctx.logger.info(f"[vectara_agent] : Processing initial query: {msg.user_query}")
    chat_id, response = await create_chat_session(msg.user_query, ctx)
    if chat_id and response:
        await ctx.send(sender, ResponseData(response=response, chat_id=chat_id))
    else:
        ctx.logger.error("Failed to process initial query.")


@vectara_agent.on_message(model=FollowUpQuery, replies={ResponseData, ExitHandlerMessage})
async def process_follow_up_query(ctx: Context, sender: str, msg: FollowUpQuery):
    ctx.logger.info(f"[vectara_agent]: Processing follow-up query for chat_id {msg.chat_id}: {msg.follow_up_query}")
    chat_id, response = await add_chat_turn(msg.chat_id, msg.follow_up_query, ctx)
    if chat_id and response:
        # Remove numeric citation markers such as [1] from the answer.
        cleaned_response = re.sub(r'\[\d+\]', '', response)
        await ctx.send(sender, ResponseData(response=cleaned_response, chat_id=chat_id))
    else:
        await ctx.send(sender, ExitHandlerMessage(message="Failed to process follow-up query."))


@vectara_agent.on_message(model=ExitHandlerMessage)
async def handle_exit_message(ctx: Context, sender: str, msg: ExitHandlerMessage):
    ctx.logger.info(f"[vectara_agent] : {msg.message}")


bureau = Bureau(endpoint=["http://localhost:8000/submit"])
bureau.add(user_agent)
bureau.add(vectara_agent)

if __name__ == "__main__":
    bureau.run()
Poetry Dependencies:
[tool.poetry.dependencies]
python = ">=3.9,<3.13"
uagents = "^0.18.1"
python-dotenv = "^1.0.1"
Instructions to execute the example.
- Navigate to the root folder of the example.
- Update the `.env` file with the required variables.
- Install dependencies by running `poetry install`.
- Execute the script with `python vectara-agent.py`.
Expected output
Ask your question: what is uagents?
INFO: [user_agent]: Starting agent with address: agent1qf4gycjj5vxdsgt4svl72mtu2cy5js8jay0857arfuxm676h2eeq52v2vwg
INFO: [user_agent]: [user_agent] : Sending initial query.
INFO: [vectara_agent]: Starting agent with address: agent1qtv9gdupwpz4t6e4fklvzhkcwxfemgky76wxwtskgj43ecqtrzcw2wq6gkf
INFO: [vectara_agent]: [vectara_agent] : Processing initial query: what is uagents?
INFO: [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: [user_agent]: [user_agent] : Received response: The uAgents framework is a lightweight Python library designed for developing decentralized agents. It simplifies the process of creating agents and is meant to be used within a Python environment. To use the uAgents library, Python must be installed on your system. Installing the uagents framework on a Windows machine is a straightforward process. This library serves as a tool to aid in building decentralized agents efficiently and effectively.
Ask your question: what is agent address?
INFO: [vectara_agent]: [vectara_agent]: Processing follow-up query for chat_id cht_ebd5dd8a-9168-4aeb-96e0-90680dbdc27a: what is agent address?
INFO: [user_agent]: [user_agent] : Received response: The agent address is a unique identifier in the form of "agent1" . It is obtained from the attribute "address" and can be retrieved using the agent.address() method . This address is crucial for registering an agent to the Almanac contract . Additionally, the agent's address can be accessed through methods like .wallet.address() using the agent object .
Ask your question: