How to use Agents to set up a cleaning service
Bookmark

Cleaning service with Agents

Introduction

This file can be run on any platform supporting Python, with the necessary install permissions. This example shows how to set up a cleaning service using Agents ann uAgents library tools.

Guide

Supporting documentation

The agents

The Protocol

__init__.py
from datetime import datetime, timedelta
from typing import List
 
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
 
from uagents import Context, Model, Protocol
from .models import Provider, Availability, User
 
PROTOCOL_NAME = "cleaning"
PROTOCOL_VERSION = "0.1.0"
 
class ServiceRequest(Model):
    user: str
    location: str
    time_start: datetime
    duration: timedelta
    services: List[int]
    max_price: float
 
class ServiceResponse(Model):
    accept: bool
    price: float
 
class ServiceBooking(Model):
    location: str
    time_start: datetime
    duration: timedelta
    services: List[int]
    price: float
 
class BookingResponse(Model):
    success: bool
 
cleaning_proto = Protocol(name=PROTOCOL_NAME, version=PROTOCOL_VERSION)
 
def in_service_region(
    location: str, availability: Availability, provider: Provider
) -> bool:
    geolocator = Nominatim(user_agent="micro_agents")
 
    user_location = geolocator.geocode(location)
    cleaner_location = geolocator.geocode(provider.location)
 
    if user_location is None:
        raise RuntimeError(f"user location {location} not found")
 
    if cleaner_location is None:
        raise RuntimeError(f"provider location {provider.location} not found")
 
    cleaner_coordinates = (cleaner_location.latitude, cleaner_location.longitude)
    user_coordinates = (user_location.latitude, user_location.longitude)
 
    service_distance = geodesic(user_coordinates, cleaner_coordinates).miles
    in_range = service_distance <= availability.max_distance
 
    return in_range
 
@cleaning_proto.on_message(model=ServiceRequest, replies=ServiceResponse)
async def handle_query_request(ctx: Context, sender: str, msg: ServiceRequest):
    provider = await Provider.filter(name=ctx.name).first()
    availability = await Availability.get(provider=provider)
    services = [int(service.type) for service in await provider.services]
    markup = provider.markup
 
    user, _ = await User.get_or_create(name=msg.user, address=sender)
    msg_duration_hours: float = msg.duration.total_seconds() / 3600
    ctx.logger.info(f"Received service request from user `{user.name}`")
 
    if (
        set(msg.services) <= set(services)
        and in_service_region(msg.location, availability, provider)
        and availability.time_start <= msg.time_start
        and availability.time_end >= msg.time_start + msg.duration
        and availability.min_hourly_price * msg_duration_hours < msg.max_price
    ):
        accept = True
        price = markup * availability.min_hourly_price * msg_duration_hours
        ctx.logger.info(f"I am available! Proposing price: {price}.")
    else:
        accept = False
        price = 0
        ctx.logger.info("I am not available. Declining request.")
 
    await ctx.send(sender, ServiceResponse(accept=accept, price=price))
 
@cleaning_proto.on_message(model=ServiceBooking, replies=BookingResponse)
async def handle_book_request(ctx: Context, sender: str, msg: ServiceBooking):
    provider = await Provider.filter(name=ctx.name).first()
    availability = await Availability.get(provider=provider)
    services = [int(service.type) for service in await provider.services]
 
    user = await User.get(address=sender)
    msg_duration_hours: float = msg.duration.total_seconds() / 3600
    ctx.logger.info(f"Received booking request from user `{user.name}`")
 
    success = (
        set(msg.services) <= set(services)
        and availability.time_start <= msg.time_start
        and availability.time_end >= msg.time_start + msg.duration
        and msg.price <= availability.min_hourly_price * msg_duration_hours
    )
 
    if success:
        availability.time_start = msg.time_start + msg.duration
        await availability.save()
        ctx.logger.info("Accepted task and updated availability.")
 
    # send the response
    await ctx.send(sender, BookingResponse(success=success))

The Cleaner Agent

cleaner.py
from datetime import datetime
from pytz import utc
 
from tortoise import Tortoise
 
from protocols.cleaning import cleaning_proto
from protocols.cleaning.models import Availability, Provider, Service, ServiceType
 
from uagents import Agent, Context
from uagents.setup import fund_agent_if_low
 
cleaner = Agent(
    name="cleaner",
    port=8001,
    seed="cleaner secret phrase",
    endpoint={
        "http://127.0.0.1:8001/submit": {},
    },
)
 
fund_agent_if_low(cleaner.wallet.address())
 
# build the cleaning service agent from the cleaning protocol
cleaner.include(cleaning_proto)
 
@cleaner.on_event("startup")
async def startup(_ctx: Context):
    await Tortoise.init(
        db_url="sqlite://db.sqlite3", modules={"models": ["protocols.cleaning.models"]}
    )
    await Tortoise.generate_schemas()
 
    provider = await Provider.create(name=cleaner.name, location="London Kings Cross")
 
    floor = await Service.create(type=ServiceType.FLOOR)
    window = await Service.create(type=ServiceType.WINDOW)
    laundry = await Service.create(type=ServiceType.LAUNDRY)
 
    await provider.services.add(floor)
    await provider.services.add(window)
    await provider.services.add(laundry)
 
    await Availability.create(
        provider=provider,
        time_start=utc.localize(datetime.fromisoformat("2022-01-31 00:00:00")),
        time_end=utc.localize(datetime.fromisoformat("2023-05-01 00:00:00")),
        max_distance=10,
        min_hourly_price=5,
    )
 
@cleaner.on_event("shutdown")
async def shutdown(_ctx: Context):
    await Tortoise.close_connections()
 
if __name__ == "__main__":
    cleaner.run()

The User Agent

user.py
from datetime import datetime, timedelta
from pytz import utc
 
from protocols.cleaning import (
    ServiceBooking,
    BookingResponse,
    ServiceRequest,
    ServiceResponse,
)
from protocols.cleaning.models import ServiceType
from uagents import Agent, Context
from uagents.setup import fund_agent_if_low
 
CLEANER_ADDRESS = "agent1qdfdx6952trs028fxyug7elgcktam9f896ays6u9art4uaf75hwy2j9m87w"
 
user = Agent(
    name="user",
    port=8000,
    seed="cleaning user recovery phrase",
    endpoint={
        "http://127.0.0.1:8000/submit": {},
    },
)
 
fund_agent_if_low(user.wallet.address())
 
request = ServiceRequest(
    user=user.name,
    location="London Kings Cross",
    time_start=utc.localize(datetime.fromisoformat("2023-04-10 16:00:00")),
    duration=timedelta(hours=4),
    services=[ServiceType.WINDOW, ServiceType.LAUNDRY],
    max_price=60,
)
 
MARKDOWN = 0.8
 
@user.on_interval(period=3.0, messages=ServiceRequest)
async def interval(ctx: Context):
    ctx.storage.set("markdown", MARKDOWN)
    completed = ctx.storage.get("completed")
 
    if not completed:
        ctx.logger.info(f"Requesting cleaning service: {request}")
        await ctx.send(CLEANER_ADDRESS, request)
 
@user.on_message(ServiceResponse, replies=ServiceBooking)
async def handle_query_response(ctx: Context, sender: str, msg: ServiceResponse):
    markdown = ctx.storage.get("markdown")
    if msg.accept:
        ctx.logger.info("Cleaner is available, attempting to book now")
        booking = ServiceBooking(
            location=request.location,
            time_start=request.time_start,
            duration=request.duration,
            services=request.services,
            price=markdown * msg.price,
        )
        await ctx.send(sender, booking)
    else:
        ctx.logger.info("Cleaner is not available - nothing more to do")
        ctx.storage.set("completed", True)
 
@user.on_message(BookingResponse, replies=set())
async def handle_book_response(ctx: Context, _sender: str, msg: BookingResponse):
    if msg.success:
        ctx.logger.info("Booking was successful")
    else:
        ctx.logger.info("Booking was UNSUCCESSFUL")
 
    ctx.storage.set("completed", True)
 
if __name__ == "__main__":
    user.run()

Was this page helpful?

Bookmark