Utilize the PostgreSQL database with the Agent
Introduction
In this example, we demonstrate how to use the uAgents library to create agents that interact with a PostgreSQL database in a Docker Compose setup. One agent inserts employee data into the database, while another retrieves it. The example shows how agents, PostgreSQL, and Docker fit together to manage data flow and communication in a distributed system.
Please check out the example code in our examples repo to run this locally.
Supporting Documents
- Almanac contract overview
- How to create an agent
- Registering in the Almanac Contract
- Creating an interval task
- Communicating with other agents
Pre-requisites
- Python: Download and install from the official Python website.
- Poetry: Install by following the instructions on Poetry's official website.
- Docker: Download and install from the official Docker website.
- Docker Compose: Download and install by following the official Docker Compose documentation.
Project Structure
.postgres-database-with-an-agent
├── docker-compose.yml
├── Dockerfile
├── README.md
└── src
    ├── constants.py
    ├── db
    │   ├── db_connection.py
    │   ├── __init__.py
    │   ├── models
    │   │   └── models.py
    │   └── schemas
    │       └── employees.sql
    └── main.py
Agent with PostgreSQL database
Set up the PostgreSQL connection with Docker using Docker Compose
This section details the files involved in setting up the PostgreSQL connection within your project.
db_connection.py: Contains the functions that establish and close the connection to the PostgreSQL database using the psycopg2 library. The create_connection function connects to the database with the provided credentials, while the close_connection function ensures the connection is safely terminated.
db_connection.py
import psycopg2
from psycopg2 import OperationalError


def create_connection(dbname, user, password, host="localhost", port="5432"):
    """
    Create a connection to the PostgreSQL database.

    :param dbname: Name of the database
    :param user: Database user
    :param password: User's password
    :param host: Database host
    :param port: Database port
    :return: Connection object or None if connection fails
    """
    try:
        conn = psycopg2.connect(
            dbname=dbname, user=user, password=password, host=host, port=port
        )
        print("Connection successful")
        return conn
    except OperationalError as error:
        print(f"Error connecting to PostgreSQL database: {error}")
        return None


def close_connection(conn):
    if conn:
        conn.close()
        print("Connection closed")
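Because callers have to remember to close what create_connection returns, one option is to wrap the open/close pair in a context manager. The following is a minimal sketch, not part of the example repository: managed_connection and FakeConnection are hypothetical names, and the helper takes any zero-argument callable so that it can run without a database. In the project, that callable could be something like lambda: create_connection(**db_params).

```python
from contextlib import contextmanager


@contextmanager
def managed_connection(connect):
    """Yield the connection returned by `connect` and always close it.

    `connect` is any zero-argument callable returning a connection-like
    object, or None on failure (hypothetical helper, not in the repo).
    """
    conn = connect()
    try:
        yield conn
    finally:
        # Mirror close_connection: only close when a connection was obtained.
        if conn is not None:
            conn.close()


class FakeConnection:
    """Stand-in for a psycopg2 connection so the sketch runs without a database."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake = FakeConnection()
with managed_connection(lambda: fake) as conn:
    assert conn is fake  # queries would run here
print(fake.closed)  # True
```

The connection is closed even if the body of the with block raises, which is the main reason to prefer this shape over a manual close call.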
docker-compose.yml: Configures the Docker services, namely the PostgreSQL database and the application. It defines the environment variables for the database connection and mounts the schema file from the host into the container's /docker-entrypoint-initdb.d directory, where the postgres image runs it when the database is first initialized.
version: "3.8"

services:
  db:
    container_name: postgres_container
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: ${DB_USER}
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: ${DB_NAME}
    volumes:
      - "postgres:/var/lib/postgresql/data"
      - ./src/db/schemas/employees.sql:/docker-entrypoint-initdb.d/employees.sql
    ports:
      - "5432:5432"
    networks:
      - agent_network

  app:
    build: .
    container_name: poetry_app
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - db
    networks:
      - agent_network
    environment:
      DB_USER: ${DB_USER}
      DB_PASSWORD: ${DB_PASSWORD}
      DB_NAME: ${DB_NAME}
    command: poetry run python ./src/main.py

volumes:
  postgres:

networks:
  agent_network:
    driver: bridge
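Note that the short form of depends_on only controls the order in which containers start; it does not wait until PostgreSQL is ready to accept connections. Since create_connection returns None on failure, the application could retry for a short while. The sketch below illustrates that idea under stated assumptions: connect_with_retry is a hypothetical helper that is not in the example repository, and a fake connector stands in for create_connection so the snippet runs on its own.

```python
import time


def connect_with_retry(connect, attempts=5, delay=1.0, sleep=time.sleep):
    """Call `connect` until it returns a connection or attempts run out.

    `connect` is a zero-argument callable that returns None on failure,
    matching the behaviour of create_connection in db_connection.py.
    """
    for attempt in range(1, attempts + 1):
        conn = connect()
        if conn is not None:
            return conn
        if attempt < attempts:
            sleep(delay)  # give the database container time to start
    return None


# Demonstration with a fake connector that fails twice, then succeeds.
calls = {"count": 0}


def flaky_connect():
    calls["count"] += 1
    return "connection" if calls["count"] >= 3 else None


result = connect_with_retry(flaky_connect, attempts=5, delay=0.0)
print(result, calls["count"])  # connection 3
```

In the project this might be called as connect_with_retry(lambda: create_connection(**db_params)); a Compose healthcheck on the db service is another way to address the same problem.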
Dockerfile: Builds the application container, installing dependencies via Poetry and setting up the environment to run the application. The container exposes port 8000 for the application service.
FROM python:3.12-slim

ENV PATH="$PATH:/root/.local/bin"

RUN apt-get update && \
    apt-get install -y curl gcc && \
    curl -sSL https://install.python-poetry.org/ | python3 -

WORKDIR /app

ADD pyproject.toml poetry.lock /app/
RUN poetry install

ADD . /app

EXPOSE 8000

ENTRYPOINT ["poetry", "run"]
CMD ["python", "main.py"]
Defining the Employees Database Schema and Model
The Employees class defines a model that carries employee data as a dictionary. The GetEmployees class is the model used to request employee information; its reply_back flag indicates whether a response is expected.
The employees.sql script creates an Employees table in the PostgreSQL database if it doesn't already exist. The table has columns for employee ID, first name, last name, birth date, and salary.
models.py
from uagents import Model


class Employees(Model):
    employees_data: dict


class GetEmployees(Model):
    reply_back: bool
employees.sql
CREATE TABLE IF NOT EXISTS Employees (
    EmployeeID INT PRIMARY KEY,
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    BirthDate DATE,
    Salary DECIMAL(10, 2)
);
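Since employees_data is an untyped dict, nothing checks that it matches the table before the INSERT runs. As a rough illustration, the sketch below checks that a dictionary has the five column names and that BirthDate uses the day-month-year layout that main.py passes to TO_DATE. validate_employee and REQUIRED_FIELDS are hypothetical names that are not part of the example repository; the sample values are taken from the expected output of this example.

```python
from datetime import datetime

REQUIRED_FIELDS = ("EmployeeID", "FirstName", "LastName", "BirthDate", "Salary")


def validate_employee(data):
    """Return a list of problems found in an employee dict (empty if none)."""
    problems = [
        f"missing field: {name}" for name in REQUIRED_FIELDS if name not in data
    ]
    if "BirthDate" in data:
        try:
            # Same day-month-year layout that the INSERT passes to TO_DATE.
            datetime.strptime(data["BirthDate"], "%d-%m-%Y")
        except (TypeError, ValueError):
            problems.append("BirthDate must look like DD-MM-YYYY")
    return problems


employee = {
    "EmployeeID": "0",
    "FirstName": "john",
    "LastName": "wick",
    "BirthDate": "29-08-1999",
    "Salary": 50000,
}
print(validate_employee(employee))  # []
print(validate_employee({"EmployeeID": "1", "BirthDate": "1999-08-29"}))
```

The second call reports the three missing fields and the wrongly ordered date, so a sender could log the problems instead of sending a message that would fail at insert time.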
Postgres data with agent
This script sets up and runs two agents, db_insert_agent and db_fetch_agent, which interact with a PostgreSQL database to manage employee data. The agents use asynchronous event handlers to insert and fetch employee information.
Database Connection
- The create_connection function establishes a connection to the PostgreSQL database using the parameters defined in db_params.
Agents
- db_insert_agent: Responsible for inserting employee data into the database.
- db_fetch_agent: Responsible for fetching and reporting employee data from the database.
Event Handlers
- on_startup (db_fetch_agent):
  - Triggered when db_fetch_agent starts.
  - Retrieves and logs the PostgreSQL database version.
  - Sends employee data to db_insert_agent if the version retrieval is successful.
- handle_employee_data (db_insert_agent):
  - Handles messages with employee data.
  - Inserts the received employee data into the Employees table in the database.
- fetch_all_employee_details (db_fetch_agent):
  - Handles messages requesting all employee details.
  - Retrieves all employee records from the Employees table and logs the data.
Database Operations
- Fetching Database Version: Uses the query SELECT version(); to get the PostgreSQL version.
- Inserting Employee Data: Executes an INSERT query to add employee records to the Employees table.
- Fetching Employee Details: Executes a SELECT * FROM Employees query to retrieve all employee records.
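To try the insert and fetch operations without starting the containers, the sketch below runs the same table definition and queries against an in-memory SQLite database from Python's standard library. This is only a stand-in for PostgreSQL and not how the example itself works: sqlite3 uses ? placeholders where psycopg2 uses %s, and the date is stored as plain text here instead of being converted with TO_DATE.

```python
import sqlite3

# In-memory SQLite is used here only as a stand-in for PostgreSQL.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()

cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS Employees (
        EmployeeID INT PRIMARY KEY,
        FirstName VARCHAR(50),
        LastName VARCHAR(50),
        BirthDate DATE,
        Salary DECIMAL(10, 2)
    )
    """
)

# Parameterized insert: the values are passed separately from the SQL text,
# so the driver handles quoting. psycopg2 uses %s placeholders; sqlite3 uses ?.
cursor.execute(
    "INSERT INTO Employees (EmployeeID, FirstName, LastName, BirthDate, Salary) "
    "VALUES (?, ?, ?, ?, ?)",
    (0, "john", "wick", "1999-08-29", 50000),
)
conn.commit()

# Fetch every record, as the fetch handler does.
cursor.execute("SELECT * FROM Employees")
rows = cursor.fetchall()
print(rows)

cursor.close()
conn.close()
```

Passing values as a separate tuple, as main.py also does, keeps user-supplied data out of the SQL string.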
Execution
- Initializes a Bureau instance.
- Adds both agents (db_insert_agent and db_fetch_agent) to the bureau.
- Runs the bureau, which starts the agents and their event handlers.
Usage
- Startup: On startup, db_fetch_agent will log the database version and send employee data to db_insert_agent.
- Inserting Data: db_insert_agent will insert received employee data into the database.
- Fetching Data: db_fetch_agent will fetch and log all employee details upon request.
main.py
from db.db_connection import create_connection
from uagents import Agent, Context, Bureau
from db.models.models import Employees, GetEmployees
from constants import employees_data
from constants import db_params, DB_FETCH_AGENT_ADDRESS


def get_db_version():
    """
    Retrieves the PostgreSQL database version.

    :return: Database version row or None if retrieval fails
    """
    conn = create_connection(**db_params)
    if conn:
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT version();")
            db_version = cursor.fetchone()
            cursor.close()
            return db_version
        except Exception as error:
            print(f"Error executing query: {error}")
        finally:
            # Release the connection whether or not the query succeeded.
            conn.close()
    return None


db_insert_agent = Agent(name="db_inserter", seed="db_inserter_seed_phrase")
db_fetch_agent = Agent(name="db_fetcher", seed="db_fetcher_seed_phrase")


@db_fetch_agent.on_event("startup")
async def on_startup(ctx: Context):
    """
    Event handler triggered on agent startup to fetch database version and send employee data.

    :param ctx: Context object for handling agent events
    """
    db_version = get_db_version()
    if db_version:
        # The greeting reports db_insert_agent, the recipient of the data sent below.
        ctx.logger.info(
            f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. PostgreSQL database version: {db_version[0]}"
        )
        # DB_FETCH_AGENT_ADDRESS is the address the employee data is sent to;
        # the Employees handler that receives it is registered on db_insert_agent.
        await ctx.send(DB_FETCH_AGENT_ADDRESS, Employees(employees_data=employees_data))
    else:
        ctx.logger.info(
            f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. Could not retrieve the database version."
        )


@db_insert_agent.on_message(model=Employees, replies=GetEmployees)
async def handle_employee_data(ctx: Context, sender: str, msg: Employees):
    """
    Handler for inserting employee data into the database.

    :param ctx: Context object for handling agent events
    :param sender: Sender of the message
    :param msg: Message containing employee data
    """
    ctx.logger.info(f"Received request from {sender} {msg.dict()}")
    employee_data = msg.employees_data
    conn = create_connection(**db_params)
    if conn:
        try:
            cursor = conn.cursor()
            # Values are passed separately so the driver handles quoting.
            insert_query = """
                INSERT INTO Employees (EmployeeID, FirstName, LastName, BirthDate, Salary)
                VALUES (%s, %s, %s, TO_DATE(%s, 'DD-MM-YYYY'), %s)
            """
            cursor.execute(
                insert_query,
                (
                    employee_data["EmployeeID"],
                    employee_data["FirstName"],
                    employee_data["LastName"],
                    employee_data["BirthDate"],
                    employee_data["Salary"],
                ),
            )
            conn.commit()
            cursor.close()
            ctx.logger.info(f"Inserted employee data: {employee_data}")
            # Ask the sender to fetch and log all employee records.
            await ctx.send(sender, GetEmployees(reply_back=True))
        except Exception as error:
            ctx.logger.error(f"Error inserting employee data: {error}")
        finally:
            conn.close()
    else:
        ctx.logger.error("Could not connect to the database.")


@db_fetch_agent.on_message(model=GetEmployees)
async def fetch_all_employee_details(ctx: Context, sender: str, msg: GetEmployees):
    """
    Handler for fetching all employee details from the database.

    :param ctx: Context object for handling agent events
    :param sender: Sender of the message
    :param msg: Message triggering the fetch operation
    """
    if msg.reply_back:
        conn = create_connection(**db_params)
        if conn:
            try:
                cursor = conn.cursor()
                query = "SELECT * FROM Employees"
                cursor.execute(query)
                all_employees = cursor.fetchall()
                cursor.close()
                # Map each row tuple to a dictionary keyed by column name.
                employees_list = []
                for employee in all_employees:
                    employee_info = {
                        "EmployeeID": employee[0],
                        "FirstName": employee[1],
                        "LastName": employee[2],
                        "BirthDate": employee[3].strftime("%d-%m-%Y"),
                        "Salary": employee[4],
                    }
                    employees_list.append(employee_info)
                ctx.logger.info(f"Retrieved all employee data: {employees_list}")
            except Exception as error:
                ctx.logger.error(f"Error retrieving employee data: {error}")
            finally:
                conn.close()
        else:
            ctx.logger.error("Could not connect to the database.")


bureau = Bureau()
bureau.add(db_insert_agent)
bureau.add(db_fetch_agent)

if __name__ == "__main__":
    bureau.run()
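The fetch handler maps each row tuple to a dictionary by position. The same conversion can be sketched with zip over a tuple of column names, which avoids repeating the indexes. rows_to_dicts and COLUMNS are hypothetical names that do not appear in the example repository, and the sample row is hard-coded to mirror the record shown in the expected output so that the snippet runs without a database.

```python
from datetime import date
from decimal import Decimal

COLUMNS = ("EmployeeID", "FirstName", "LastName", "BirthDate", "Salary")


def rows_to_dicts(rows):
    """Convert fetched tuples into dicts keyed by column name."""
    employees = []
    for row in rows:
        record = dict(zip(COLUMNS, row))
        # Format the date the same way the fetch handler does.
        record["BirthDate"] = record["BirthDate"].strftime("%d-%m-%Y")
        employees.append(record)
    return employees


# psycopg2 returns DATE columns as datetime.date and DECIMAL columns as
# decimal.Decimal, so the sample row uses those types.
sample_rows = [(0, "john", "wick", date(1999, 8, 29), Decimal("50000.00"))]
print(rows_to_dicts(sample_rows))
```

The column order in COLUMNS has to match the order of the columns in the table, because SELECT * returns them in that order.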
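The constants.py file initializes a dictionary for the employee data and configures the database connection parameters from environment variables. It also defines DB_FETCH_AGENT_ADDRESS, the address to which db_fetch_agent sends the employee data on startup; in the expected output at the end of this page, this is the address of db_insert_agent.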
constants.py
import os

employees_data = {
    "EmployeeID": "",
    "FirstName": "",
    "LastName": "",
    "BirthDate": "",
    "Salary": 0,
}

db_params = {
    "dbname": os.getenv("DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": "db",
    "port": "5432",
}

DB_FETCH_AGENT_ADDRESS = (
    "agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu"
)
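os.getenv returns None when a variable is not set, so a missing DB_NAME, DB_USER, or DB_PASSWORD only shows up later as a connection error. One possible way to fail earlier is sketched below. load_db_params is a hypothetical helper that is not in the example repository, and the values in example_env are placeholders chosen purely for illustration.

```python
import os


def load_db_params(env=os.environ):
    """Build connection parameters, failing early if a variable is unset."""
    required = {"dbname": "DB_NAME", "user": "DB_USER", "password": "DB_PASSWORD"}
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    params = {key: env[var] for key, var in required.items()}
    # Host and port match the values hard-coded in constants.py.
    params.update({"host": "db", "port": "5432"})
    return params


# Placeholder values for illustration only.
example_env = {"DB_NAME": "employees", "DB_USER": "agent", "DB_PASSWORD": "secret"}
print(load_db_params(example_env))
```

Passing the environment as an argument keeps the helper easy to exercise with a plain dictionary, while the default still reads the real process environment.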
Poetry Dependencies
pyproject.toml
[tool.poetry.dependencies]
python = "^3.10"
psycopg2-binary = "^2.9.9"
uagents = { version = "^0.13.0", python = ">=3.10,<3.13" }
How to Run This Example
Update the required environment variables
.env.example
DB_USER=
DB_PASSWORD=
DB_NAME=
Instructions to execute the example:
- Navigate to the root folder of the example.
- Update the constants file with the new entries to store in the database.
- Run docker-compose build
- Run docker-compose up
Expected Output
poetry_app | Connection successful
poetry_app | INFO: [db_fetcher]: Hello, I'm agent db_inserter and my address is agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu. PostgreSQL database version: PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
poetry_app | INFO: [db_inserter]: Received request from agent1qv470qn3vfgn3dwe5z90m8u6qvtn6chrgm4urfzdg2v9qyagln6sgnh4wwg {'employees_data': {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}}
poetry_app | Connection successful
poetry_app | INFO: [db_inserter]: Inserted employee data: {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}
poetry_app | Connection successful
poetry_app | INFO: [db_fetcher]: Retrieved all employee data: [{'EmployeeID': 0, 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': Decimal('50000.00')}]
poetry_app | INFO: [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)