diff --git a/services/autonomous_listing/Dockerfile b/services/autonomous_listing/Dockerfile new file mode 100644 index 000000000..b9fdbfb21 --- /dev/null +++ b/services/autonomous_listing/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app ./app + +EXPOSE 8000 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/services/autonomous_listing/README.md b/services/autonomous_listing/README.md new file mode 100644 index 000000000..0b0a40f96 --- /dev/null +++ b/services/autonomous_listing/README.md @@ -0,0 +1,36 @@ +# Autonomous Listing Service (MVP Scaffold) + +This folder contains a FastAPI-based microservice that simulates the AI pipeline defined in `docs/autonomous_listing_service.md`. It is not production-ready yet, but it provides a runnable skeleton that: + +1. Accepts listing requests with seller notes, assets, preferences, and target platforms. +2. Runs through placeholder pipelines for image enhancement, marketing copy, and multi-platform publishing. +3. Returns a structured preview containing the listing status, suggested price, generated description, and enhanced asset URIs. + +## Quick Start + +```bash +cd services/autonomous_listing +python -m venv .venv && source .venv/bin/activate +pip install -r requirements.txt +uvicorn app.main:app --reload +``` + +POST a sample request: + +```bash +curl -X POST http://localhost:8000/listings \ + -H "Content-Type: application/json" \ + -d '{ + "raw_description": "Mid-century walnut coffee table, gentle wear, includes glass top.", + "category": "furniture", + "location": "Austin, TX", + "assets": [{"source_uri": "https://example.com/photo1.jpg"}], + "target_platforms": ["craigslist", "mercari"], + "preferences": {"tone": "premium", "target_price": 350} + }' +``` + +## Next Steps +- Replace the stubbed pipelines (`image_enhancer`, `description_generator`, `publisher`) with real AI agents and marketplace adapters. +- Connect to object storage for asset handling and to the knowledge/memory layers described in the main blueprint. +- Embed telemetry + mission diary hooks so the service feeds the agency’s iterative improvement loop. diff --git a/services/autonomous_listing/app/__init__.py b/services/autonomous_listing/app/__init__.py new file mode 100644 index 000000000..c9f1f8219 --- /dev/null +++ b/services/autonomous_listing/app/__init__.py @@ -0,0 +1,3 @@ +from . import schemas + +__all__ = ["schemas"] diff --git a/services/autonomous_listing/app/main.py b/services/autonomous_listing/app/main.py new file mode 100644 index 000000000..477bd5c14 --- /dev/null +++ b/services/autonomous_listing/app/main.py @@ -0,0 +1,34 @@ +from fastapi import FastAPI + +from . import schemas +from .services.orchestrator import ListingOrchestrator +from .services.pipelines.description_generator import DescriptionGenerator +from .services.pipelines.image_enhancer import ImageEnhancer +from .services.pipelines.publisher import ChannelPublisher + +app = FastAPI( + title="Autonomous Listing Service", + description="Transforms seller inputs into multi-channel listings via AI pipelines.", + version="0.1.0", +) + +orchestrator = ListingOrchestrator( + enhancer=ImageEnhancer(), + copywriter=DescriptionGenerator(), + publisher=ChannelPublisher(), +) + + +@app.get("/health") +async def health_check() -> dict: + return {"status": "ok"} + + +@app.post("/listings", response_model=schemas.ListingResponse) +async def create_listing(payload: schemas.ListingRequest) -> schemas.ListingResponse: + """ + Entry point for creating a new autonomous listing. + Downstream pipelines are mocked for now and should be replaced with actual AI services. + """ + + return await orchestrator.create_listing(payload) diff --git a/services/autonomous_listing/app/schemas.py b/services/autonomous_listing/app/schemas.py new file mode 100644 index 000000000..f40d2fba2 --- /dev/null +++ b/services/autonomous_listing/app/schemas.py @@ -0,0 +1,93 @@ +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Field, HttpUrl + + +class PlatformEnum(str, Enum): + craigslist = "craigslist" + mercari = "mercari" + nextdoor = "nextdoor" + offerup = "offerup" + custom = "custom" + + +class SellerPreference(BaseModel): + tone: Optional[str] = Field( + None, + description="Preferred copy tone (e.g., premium, playful, concise)", + ) + min_price: Optional[float] = Field( + None, + description="Lowest acceptable price for auto-negotiation guardrails", + ) + target_price: Optional[float] = Field( + None, + description="Ideal listing price suggested to pricing agent", + ) + pickup_only: Optional[bool] = Field( + False, description="If true, restrict listings to local pickup" + ) + + +class ListingAsset(BaseModel): + source_uri: Optional[HttpUrl] = Field( + None, description="Publicly accessible URL for the uploaded asset." + ) + base64_payload: Optional[str] = Field( + None, + description="Optional base64 encoded asset body when direct upload is used.", + ) + caption: Optional[str] = Field(None, description="User-provided caption or note.") + + +class ListingRequest(BaseModel): + title_hint: Optional[str] = Field( + None, description="Optional working title supplied by the seller." + ) + raw_description: str = Field( + ..., description="Free-form notes describing the item, condition, and story." + ) + category: Optional[str] = Field( + None, description="High-level category to help routing (e.g., furniture)." + ) + location: Optional[str] = Field( + None, description="City/region for localized marketplaces." + ) + assets: List[ListingAsset] = Field( + default_factory=list, description="Collection of reference photos/videos." + ) + target_platforms: List[PlatformEnum] = Field( + default_factory=lambda: [PlatformEnum.craigslist], + description="Marketplaces that should receive this listing.", + ) + preferences: SellerPreference = Field( + default_factory=SellerPreference, + description="Preferences controlling tone, pricing, negotiations.", + ) + + +class ListingStatus(BaseModel): + listing_id: str = Field(..., description="Internal tracking identifier.") + state: str = Field( + ..., + description="State machine stage (ingesting, enhancing, drafting, publishing, live, closed).", + ) + platforms_live: List[PlatformEnum] = Field( + default_factory=list, description="Platforms with confirmed publication." + ) + notes: Optional[str] = Field(None, description="Additional context for the seller.") + + +class ListingResponse(BaseModel): + status: ListingStatus + recommended_price: Optional[float] = Field( + None, description="Initial suggestion from valuation pipeline." + ) + preview_description: Optional[str] = Field( + None, description="First-pass marketing copy preview." + ) + enhanced_assets: List[str] = Field( + default_factory=list, + description="URIs for enhanced images stored in object storage.", + ) diff --git a/services/autonomous_listing/app/services/orchestrator.py b/services/autonomous_listing/app/services/orchestrator.py new file mode 100644 index 000000000..c256cd3f8 --- /dev/null +++ b/services/autonomous_listing/app/services/orchestrator.py @@ -0,0 +1,67 @@ +import uuid +from typing import List, Tuple + +from .pipelines.description_generator import DescriptionGenerator +from .pipelines.image_enhancer import ImageEnhancer +from .pipelines.publisher import ChannelPublisher +from .. import schemas + + +class ListingOrchestrator: + """Thin coordination layer that chains the enhancement, copywriting, pricing and publishing steps.""" + + def __init__( + self, + enhancer: ImageEnhancer, + copywriter: DescriptionGenerator, + publisher: ChannelPublisher, + ) -> None: + self._enhancer = enhancer + self._copywriter = copywriter + self._publisher = publisher + + async def create_listing( + self, payload: schemas.ListingRequest + ) -> schemas.ListingResponse: + listing_id = str(uuid.uuid4()) + enhanced_assets = await self._enhancer.process(listing_id, payload.assets) + + preview_description, suggested_price = await self._copywriter.generate( + listing_id=listing_id, + request=payload, + enhanced_assets=enhanced_assets, + ) + + publish_results = await self._publisher.schedule_publication( + listing_id=listing_id, + request=payload, + enhanced_assets=enhanced_assets, + description=preview_description, + recommended_price=suggested_price, + ) + + status = schemas.ListingStatus( + listing_id=listing_id, + state="publishing" if publish_results.pending else "live", + platforms_live=publish_results.confirmed_platforms, + notes=publish_results.notes, + ) + + return schemas.ListingResponse( + status=status, + recommended_price=suggested_price, + preview_description=preview_description, + enhanced_assets=enhanced_assets, + ) + + +class PublishResult: + def __init__( + self, + pending: bool, + confirmed_platforms: List[schemas.PlatformEnum], + notes: str = "", + ) -> None: + self.pending = pending + self.confirmed_platforms = confirmed_platforms + self.notes = notes diff --git a/services/autonomous_listing/app/services/pipelines/description_generator.py b/services/autonomous_listing/app/services/pipelines/description_generator.py new file mode 100644 index 000000000..530bb27be --- /dev/null +++ b/services/autonomous_listing/app/services/pipelines/description_generator.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +import random +from typing import List, Tuple + +from ... import schemas + + +class DescriptionGenerator: + """ + Simplified marketing copy generator. + Replace with actual LLM/RAG pipeline wired to provider SDKs. + """ + + async def generate( + self, + listing_id: str, + request: schemas.ListingRequest, + enhanced_assets: List[str], + ) -> Tuple[str, float]: + await asyncio.sleep(0.1) + hero_line = request.title_hint or "Stunning find ready for a new home" + detail = request.raw_description.strip() + asset_note = ( + f"Includes {len(enhanced_assets)} professionally enhanced photos." + if enhanced_assets + else "Image enhancement pending." + ) + + narrative = ( + f"{hero_line}\n\n" + f"{detail}\n\n" + f"{asset_note} Curated for {', '.join([p.value for p in request.target_platforms])}." + ) + + suggested_price = request.preferences.target_price or round( + random.uniform(20, 200), 2 + ) + + return narrative, suggested_price diff --git a/services/autonomous_listing/app/services/pipelines/image_enhancer.py b/services/autonomous_listing/app/services/pipelines/image_enhancer.py new file mode 100644 index 000000000..b5b9499a7 --- /dev/null +++ b/services/autonomous_listing/app/services/pipelines/image_enhancer.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import asyncio +from typing import List + +from ... import schemas + + +class ImageEnhancer: + """ + Placeholder enhancer that emulates an asynchronous vision pipeline. + In production, this would call GPU-backed services (Real-ESRGAN, ControlNet, etc.). + """ + + async def process( + self, listing_id: str, assets: List[schemas.ListingAsset] + ) -> List[str]: + if not assets: + return [] + + await asyncio.sleep(0.1) # simulate async workload + enhanced_uris = [ + asset.source_uri or f"s3://placeholder/{listing_id}/{idx}.jpg" + for idx, asset in enumerate(assets) + ] + return enhanced_uris diff --git a/services/autonomous_listing/app/services/pipelines/publisher.py b/services/autonomous_listing/app/services/pipelines/publisher.py new file mode 100644 index 000000000..fa10082b3 --- /dev/null +++ b/services/autonomous_listing/app/services/pipelines/publisher.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from typing import List + +from ... import schemas +from ..orchestrator import PublishResult + + +@dataclass +class PublicationTask: + platform: schemas.PlatformEnum + status: str + reference_id: str + + +class ChannelPublisher: + """ + Stubbed marketplace publisher. + Replace with real adapters (Craigslist headless automation, Mercari API, etc.). + """ + + async def schedule_publication( + self, + listing_id: str, + request: schemas.ListingRequest, + enhanced_assets: List[str], + description: str, + recommended_price: float, + ) -> PublishResult: + await asyncio.sleep(0.1) + confirmed: List[schemas.PlatformEnum] = [] + pending = False + + for platform in request.target_platforms: + if platform in (schemas.PlatformEnum.craigslist, schemas.PlatformEnum.nextdoor): + # emulate asynchronous approval queues for certain platforms + pending = True + else: + confirmed.append(platform) + + notes = ( + "Some platforms require manual review before going live." + if pending + else "All requested platforms confirmed." + ) + + return PublishResult( + pending=pending, + confirmed_platforms=confirmed, + notes=notes, + ) diff --git a/services/autonomous_listing/requirements.txt b/services/autonomous_listing/requirements.txt new file mode 100644 index 000000000..155672749 --- /dev/null +++ b/services/autonomous_listing/requirements.txt @@ -0,0 +1,3 @@ +fastapi==0.115.0 +uvicorn[standard]==0.30.6 +pydantic==1.10.18