From 1eb072cc690cdbb399cc7df51e2f7581c1166675 Mon Sep 17 00:00:00 2001 From: "MSI\\ModSetter" Date: Wed, 16 Jul 2025 01:10:33 -0700 Subject: [PATCH] feat(BACKEND): Added Log Management System for better Bug's Tracking - Background tasks are now logged so non tech users can effectively track the failurte points easily. --- .../alembic/versions/12_add_logs_table.py | 71 ++++ surfsense_backend/app/db.py | 25 ++ surfsense_backend/app/routes/__init__.py | 2 + .../app/routes/documents_routes.py | 266 ++++++++++++++- surfsense_backend/app/routes/logs_routes.py | 280 ++++++++++++++++ surfsense_backend/app/schemas/__init__.py | 6 + surfsense_backend/app/schemas/logs.py | 44 +++ .../app/services/task_logging_service.py | 204 ++++++++++++ .../app/tasks/background_tasks.py | 305 +++++++++++++++++- 9 files changed, 1193 insertions(+), 10 deletions(-) create mode 100644 surfsense_backend/alembic/versions/12_add_logs_table.py create mode 100644 surfsense_backend/app/routes/logs_routes.py create mode 100644 surfsense_backend/app/schemas/logs.py create mode 100644 surfsense_backend/app/services/task_logging_service.py diff --git a/surfsense_backend/alembic/versions/12_add_logs_table.py b/surfsense_backend/alembic/versions/12_add_logs_table.py new file mode 100644 index 0000000..0b2cc13 --- /dev/null +++ b/surfsense_backend/alembic/versions/12_add_logs_table.py @@ -0,0 +1,71 @@ +"""Add LogLevel and LogStatus enums and logs table + +Revision ID: 12 +Revises: 11 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSON + + +# revision identifiers, used by Alembic. +revision: str = "12" +down_revision: Union[str, None] = "11" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema - add LogLevel and LogStatus enums and logs table.""" + + # Create LogLevel enum + op.execute(""" + CREATE TYPE loglevel AS ENUM ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') + """) + + # Create LogStatus enum + op.execute(""" + CREATE TYPE logstatus AS ENUM ('IN_PROGRESS', 'SUCCESS', 'FAILED') + """) + + # Create logs table + op.execute(""" + CREATE TABLE logs ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + level loglevel NOT NULL, + status logstatus NOT NULL, + message TEXT NOT NULL, + source VARCHAR(200), + log_metadata JSONB DEFAULT '{}', + search_space_id INTEGER NOT NULL REFERENCES searchspaces(id) ON DELETE CASCADE + ) + """) + + # Create indexes + op.create_index(op.f('ix_logs_id'), 'logs', ['id'], unique=False) + op.create_index(op.f('ix_logs_created_at'), 'logs', ['created_at'], unique=False) + op.create_index(op.f('ix_logs_level'), 'logs', ['level'], unique=False) + op.create_index(op.f('ix_logs_status'), 'logs', ['status'], unique=False) + op.create_index(op.f('ix_logs_source'), 'logs', ['source'], unique=False) + + +def downgrade() -> None: + """Downgrade schema - remove logs table and enums.""" + + # Drop indexes + op.drop_index(op.f('ix_logs_source'), table_name='logs') + op.drop_index(op.f('ix_logs_status'), table_name='logs') + op.drop_index(op.f('ix_logs_level'), table_name='logs') + op.drop_index(op.f('ix_logs_created_at'), table_name='logs') + op.drop_index(op.f('ix_logs_id'), table_name='logs') + + # Drop logs table + op.drop_table('logs') + + # Drop enums + op.execute("DROP TYPE IF EXISTS logstatus") + op.execute("DROP TYPE IF EXISTS loglevel") \ No newline at end of file diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 0c6311a..7caf365 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -91,6 +91,18 @@ class LiteLLMProvider(str, Enum): ALEPH_ALPHA = "ALEPH_ALPHA" PETALS = "PETALS" CUSTOM = "CUSTOM" + +class LogLevel(str, Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + +class LogStatus(str, Enum): + IN_PROGRESS = "IN_PROGRESS" + SUCCESS = "SUCCESS" + FAILED = "FAILED" class Base(DeclarativeBase): pass @@ -163,6 +175,7 @@ class SearchSpace(BaseModel, TimestampMixin): documents = relationship("Document", back_populates="search_space", order_by="Document.id", cascade="all, delete-orphan") podcasts = relationship("Podcast", back_populates="search_space", order_by="Podcast.id", cascade="all, delete-orphan") chats = relationship('Chat', back_populates='search_space', order_by='Chat.id', cascade="all, delete-orphan") + logs = relationship("Log", back_populates="search_space", order_by="Log.id", cascade="all, delete-orphan") class SearchSourceConnector(BaseModel, TimestampMixin): __tablename__ = "search_source_connectors" @@ -196,6 +209,18 @@ class LLMConfig(BaseModel, TimestampMixin): user_id = Column(UUID(as_uuid=True), ForeignKey("user.id", ondelete='CASCADE'), nullable=False) user = relationship("User", back_populates="llm_configs", foreign_keys=[user_id]) +class Log(BaseModel, TimestampMixin): + __tablename__ = "logs" + + level = Column(SQLAlchemyEnum(LogLevel), nullable=False, index=True) + status = Column(SQLAlchemyEnum(LogStatus), nullable=False, index=True) + message = Column(Text, nullable=False) + source = Column(String(200), nullable=True, index=True) # Service/component that generated the log + log_metadata = Column(JSON, nullable=True, default={}) # Additional context data + + search_space_id = Column(Integer, ForeignKey("searchspaces.id", ondelete='CASCADE'), nullable=False) + search_space = relationship("SearchSpace", back_populates="logs") + if config.AUTH_TYPE == "GOOGLE": class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base): pass diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index 3420f35..dd6be9b 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -5,6 +5,7 @@ from .podcasts_routes import router as podcasts_router from .chats_routes import router as chats_router from .search_source_connectors_routes import router as search_source_connectors_router from .llm_config_routes import router as llm_config_router +from .logs_routes import router as logs_router router = APIRouter() @@ -14,3 +15,4 @@ router.include_router(podcasts_router) router.include_router(chats_router) router.include_router(search_source_connectors_router) router.include_router(llm_config_router) +router.include_router(logs_router) diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 9ca59c0..7d30d9e 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -135,11 +135,19 @@ async def process_file_in_background( filename: str, search_space_id: int, user_id: str, - session: AsyncSession + session: AsyncSession, + task_logger: 'TaskLoggingService', + log_entry: 'Log' ): try: # Check if the file is a markdown or text file if filename.lower().endswith(('.md', '.markdown', '.txt')): + await task_logger.log_task_progress( + log_entry, + f"Processing markdown/text file: {filename}", + {"file_type": "markdown", "processing_stage": "reading_file"} + ) + # For markdown files, read the content directly with open(file_path, 'r', encoding='utf-8') as f: markdown_content = f.read() @@ -151,16 +159,42 @@ async def process_file_in_background( except: pass + await task_logger.log_task_progress( + log_entry, + f"Creating document from markdown content: {filename}", + {"processing_stage": "creating_document", "content_length": len(markdown_content)} + ) + # Process markdown directly through specialized function - await add_received_markdown_file_document( + result = await add_received_markdown_file_document( session, filename, markdown_content, search_space_id, user_id ) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully processed markdown file: {filename}", + {"document_id": result.id, "content_hash": result.content_hash, "file_type": "markdown"} + ) + else: + await task_logger.log_task_success( + log_entry, + f"Markdown file already exists (duplicate): {filename}", + {"duplicate_detected": True, "file_type": "markdown"} + ) + # Check if the file is an audio file elif filename.lower().endswith(('.mp3', '.mp4', '.mpeg', '.mpga', '.m4a', '.wav', '.webm')): + await task_logger.log_task_progress( + log_entry, + f"Processing audio file for transcription: {filename}", + {"file_type": "audio", "processing_stage": "starting_transcription"} + ) + # Open the audio file for transcription with open(file_path, "rb") as audio_file: # Use LiteLLM for audio transcription @@ -184,6 +218,12 @@ async def process_file_in_background( # Add metadata about the transcription transcribed_text = f"# Transcription of {filename}\n\n{transcribed_text}" + await task_logger.log_task_progress( + log_entry, + f"Transcription completed, creating document: {filename}", + {"processing_stage": "transcription_complete", "transcript_length": len(transcribed_text)} + ) + # Clean up the temp file try: os.unlink(file_path) @@ -191,15 +231,35 @@ async def process_file_in_background( pass # Process transcription as markdown document - await add_received_markdown_file_document( + result = await add_received_markdown_file_document( session, filename, transcribed_text, search_space_id, user_id ) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully transcribed and processed audio file: {filename}", + {"document_id": result.id, "content_hash": result.content_hash, "file_type": "audio", "transcript_length": len(transcribed_text)} + ) + else: + await task_logger.log_task_success( + log_entry, + f"Audio file transcript already exists (duplicate): {filename}", + {"duplicate_detected": True, "file_type": "audio"} + ) + else: if app_config.ETL_SERVICE == "UNSTRUCTURED": + await task_logger.log_task_progress( + log_entry, + f"Processing file with Unstructured ETL: {filename}", + {"file_type": "document", "etl_service": "UNSTRUCTURED", "processing_stage": "loading"} + ) + from langchain_unstructured import UnstructuredLoader # Process the file @@ -215,6 +275,12 @@ async def process_file_in_background( docs = await loader.aload() + await task_logger.log_task_progress( + log_entry, + f"Unstructured ETL completed, creating document: {filename}", + {"processing_stage": "etl_complete", "elements_count": len(docs)} + ) + # Clean up the temp file import os try: @@ -223,14 +289,34 @@ async def process_file_in_background( pass # Pass the documents to the existing background task - await add_received_file_document_using_unstructured( + result = await add_received_file_document_using_unstructured( session, filename, docs, search_space_id, user_id ) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully processed file with Unstructured: {filename}", + {"document_id": result.id, "content_hash": result.content_hash, "file_type": "document", "etl_service": "UNSTRUCTURED"} + ) + else: + await task_logger.log_task_success( + log_entry, + f"Document already exists (duplicate): {filename}", + {"duplicate_detected": True, "file_type": "document", "etl_service": "UNSTRUCTURED"} + ) + elif app_config.ETL_SERVICE == "LLAMACLOUD": + await task_logger.log_task_progress( + log_entry, + f"Processing file with LlamaCloud ETL: {filename}", + {"file_type": "document", "etl_service": "LLAMACLOUD", "processing_stage": "parsing"} + ) + from llama_cloud_services import LlamaParse from llama_cloud_services.parse.utils import ResultType @@ -257,21 +343,48 @@ async def process_file_in_background( # Get markdown documents from the result markdown_documents = await result.aget_markdown_documents(split_by_page=False) + await task_logger.log_task_progress( + log_entry, + f"LlamaCloud parsing completed, creating documents: {filename}", + {"processing_stage": "parsing_complete", "documents_count": len(markdown_documents)} + ) + for doc in markdown_documents: # Extract text content from the markdown documents markdown_content = doc.text # Process the documents using our LlamaCloud background task - await add_received_file_document_using_llamacloud( + doc_result = await add_received_file_document_using_llamacloud( session, filename, llamacloud_markdown_document=markdown_content, search_space_id=search_space_id, user_id=user_id ) + + if doc_result: + await task_logger.log_task_success( + log_entry, + f"Successfully processed file with LlamaCloud: {filename}", + {"document_id": doc_result.id, "content_hash": doc_result.content_hash, "file_type": "document", "etl_service": "LLAMACLOUD"} + ) + else: + await task_logger.log_task_success( + log_entry, + f"Document already exists (duplicate): {filename}", + {"duplicate_detected": True, "file_type": "document", "etl_service": "LLAMACLOUD"} + ) + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to process file: {filename}", + str(e), + {"error_type": type(e).__name__, "filename": filename} + ) import logging logging.error(f"Error processing file in background: {str(e)}") + raise # Re-raise so the wrapper can also handle it @router.get("/documents/", response_model=List[DocumentRead]) @@ -442,11 +555,47 @@ async def process_extension_document_with_new_session( ): """Create a new session and process extension document.""" from app.db import async_session_maker + from app.services.task_logging_service import TaskLoggingService async with async_session_maker() as session: + # Initialize task logging service + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="process_extension_document", + source="document_processor", + message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}", + metadata={ + "document_type": "EXTENSION", + "url": individual_document.metadata.VisitedWebPageURL, + "title": individual_document.metadata.VisitedWebPageTitle, + "user_id": user_id + } + ) + try: - await add_extension_received_document(session, individual_document, search_space_id, user_id) + result = await add_extension_received_document(session, individual_document, search_space_id, user_id) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}", + {"document_id": result.id, "content_hash": result.content_hash} + ) + else: + await task_logger.log_task_success( + log_entry, + f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}", + {"duplicate_detected": True} + ) except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}", + str(e), + {"error_type": type(e).__name__} + ) import logging logging.error(f"Error processing extension document: {str(e)}") @@ -458,11 +607,46 @@ async def process_crawled_url_with_new_session( ): """Create a new session and process crawled URL.""" from app.db import async_session_maker + from app.services.task_logging_service import TaskLoggingService async with async_session_maker() as session: + # Initialize task logging service + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="process_crawled_url", + source="document_processor", + message=f"Starting URL crawling and processing for: {url}", + metadata={ + "document_type": "CRAWLED_URL", + "url": url, + "user_id": user_id + } + ) + try: - await add_crawled_url_document(session, url, search_space_id, user_id) + result = await add_crawled_url_document(session, url, search_space_id, user_id) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully crawled and processed URL: {url}", + {"document_id": result.id, "title": result.title, "content_hash": result.content_hash} + ) + else: + await task_logger.log_task_success( + log_entry, + f"URL document already exists (duplicate): {url}", + {"duplicate_detected": True} + ) except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to crawl URL: {url}", + str(e), + {"error_type": type(e).__name__} + ) import logging logging.error(f"Error processing crawled URL: {str(e)}") @@ -475,9 +659,38 @@ async def process_file_in_background_with_new_session( ): """Create a new session and process file.""" from app.db import async_session_maker + from app.services.task_logging_service import TaskLoggingService async with async_session_maker() as session: - await process_file_in_background(file_path, filename, search_space_id, user_id, session) + # Initialize task logging service + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="process_file_upload", + source="document_processor", + message=f"Starting file processing for: {filename}", + metadata={ + "document_type": "FILE", + "filename": filename, + "file_path": file_path, + "user_id": user_id + } + ) + + try: + await process_file_in_background(file_path, filename, search_space_id, user_id, session, task_logger, log_entry) + + # Note: success/failure logging is handled within process_file_in_background + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to process file: {filename}", + str(e), + {"error_type": type(e).__name__} + ) + import logging + logging.error(f"Error processing file: {str(e)}") async def process_youtube_video_with_new_session( @@ -487,11 +700,46 @@ async def process_youtube_video_with_new_session( ): """Create a new session and process YouTube video.""" from app.db import async_session_maker + from app.services.task_logging_service import TaskLoggingService async with async_session_maker() as session: + # Initialize task logging service + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="process_youtube_video", + source="document_processor", + message=f"Starting YouTube video processing for: {url}", + metadata={ + "document_type": "YOUTUBE_VIDEO", + "url": url, + "user_id": user_id + } + ) + try: - await add_youtube_video_document(session, url, search_space_id, user_id) + result = await add_youtube_video_document(session, url, search_space_id, user_id) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully processed YouTube video: {result.title}", + {"document_id": result.id, "video_id": result.document_metadata.get("video_id"), "content_hash": result.content_hash} + ) + else: + await task_logger.log_task_success( + log_entry, + f"YouTube video document already exists (duplicate): {url}", + {"duplicate_detected": True} + ) except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to process YouTube video: {url}", + str(e), + {"error_type": type(e).__name__} + ) import logging logging.error(f"Error processing YouTube video: {str(e)}") diff --git a/surfsense_backend/app/routes/logs_routes.py b/surfsense_backend/app/routes/logs_routes.py new file mode 100644 index 0000000..65e33ec --- /dev/null +++ b/surfsense_backend/app/routes/logs_routes.py @@ -0,0 +1,280 @@ +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy import and_, desc +from typing import List, Optional +from datetime import datetime, timedelta + +from app.db import get_async_session, User, SearchSpace, Log, LogLevel, LogStatus +from app.schemas import LogCreate, LogUpdate, LogRead, LogFilter +from app.users import current_active_user +from app.utils.check_ownership import check_ownership + +router = APIRouter() + +@router.post("/logs/", response_model=LogRead) +async def create_log( + log: LogCreate, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user) +): + """Create a new log entry.""" + try: + # Check if the user owns the search space + await check_ownership(session, SearchSpace, log.search_space_id, user) + + db_log = Log(**log.model_dump()) + session.add(db_log) + await session.commit() + await session.refresh(db_log) + return db_log + except HTTPException: + raise + except Exception as e: + await session.rollback() + raise HTTPException( + status_code=500, + detail=f"Failed to create log: {str(e)}" + ) + +@router.get("/logs/", response_model=List[LogRead]) +async def read_logs( + skip: int = 0, + limit: int = 100, + search_space_id: Optional[int] = None, + level: Optional[LogLevel] = None, + status: Optional[LogStatus] = None, + source: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user) +): + """Get logs with optional filtering.""" + try: + # Build base query - only logs from user's search spaces + query = ( + select(Log) + .join(SearchSpace) + .filter(SearchSpace.user_id == user.id) + .order_by(desc(Log.created_at)) # Most recent first + ) + + # Apply filters + filters = [] + + if search_space_id is not None: + await check_ownership(session, SearchSpace, search_space_id, user) + filters.append(Log.search_space_id == search_space_id) + + if level is not None: + filters.append(Log.level == level) + + if status is not None: + filters.append(Log.status == status) + + if source is not None: + filters.append(Log.source.ilike(f"%{source}%")) + + if start_date is not None: + filters.append(Log.created_at >= start_date) + + if end_date is not None: + filters.append(Log.created_at <= end_date) + + if filters: + query = query.filter(and_(*filters)) + + # Apply pagination + result = await session.execute(query.offset(skip).limit(limit)) + return result.scalars().all() + + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to fetch logs: {str(e)}" + ) + +@router.get("/logs/{log_id}", response_model=LogRead) +async def read_log( + log_id: int, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user) +): + """Get a specific log by ID.""" + try: + # Get log and verify user owns the search space + result = await session.execute( + select(Log) + .join(SearchSpace) + .filter(Log.id == log_id, SearchSpace.user_id == user.id) + ) + log = result.scalars().first() + + if not log: + raise HTTPException(status_code=404, detail="Log not found") + + return log + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to fetch log: {str(e)}" + ) + +@router.put("/logs/{log_id}", response_model=LogRead) +async def update_log( + log_id: int, + log_update: LogUpdate, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user) +): + """Update a log entry.""" + try: + # Get log and verify user owns the search space + result = await session.execute( + select(Log) + .join(SearchSpace) + .filter(Log.id == log_id, SearchSpace.user_id == user.id) + ) + db_log = result.scalars().first() + + if not db_log: + raise HTTPException(status_code=404, detail="Log not found") + + # Update only provided fields + update_data = log_update.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(db_log, field, value) + + await session.commit() + await session.refresh(db_log) + return db_log + except HTTPException: + raise + except Exception as e: + await session.rollback() + raise HTTPException( + status_code=500, + detail=f"Failed to update log: {str(e)}" + ) + +@router.delete("/logs/{log_id}") +async def delete_log( + log_id: int, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user) +): + """Delete a log entry.""" + try: + # Get log and verify user owns the search space + result = await session.execute( + select(Log) + .join(SearchSpace) + .filter(Log.id == log_id, SearchSpace.user_id == user.id) + ) + db_log = result.scalars().first() + + if not db_log: + raise HTTPException(status_code=404, detail="Log not found") + + await session.delete(db_log) + await session.commit() + return {"message": "Log deleted successfully"} + except HTTPException: + raise + except Exception as e: + await session.rollback() + raise HTTPException( + status_code=500, + detail=f"Failed to delete log: {str(e)}" + ) + +@router.get("/logs/search-space/{search_space_id}/summary") +async def get_logs_summary( + search_space_id: int, + hours: int = 24, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user) +): + """Get a summary of logs for a search space in the last X hours.""" + try: + # Check ownership + await check_ownership(session, SearchSpace, search_space_id, user) + + # Calculate time window + since = datetime.utcnow().replace(microsecond=0) - timedelta(hours=hours) + + # Get logs from the time window + result = await session.execute( + select(Log) + .filter( + and_( + Log.search_space_id == search_space_id, + Log.created_at >= since + ) + ) + .order_by(desc(Log.created_at)) + ) + logs = result.scalars().all() + + # Create summary + summary = { + "total_logs": len(logs), + "time_window_hours": hours, + "by_status": {}, + "by_level": {}, + "by_source": {}, + "active_tasks": [], + "recent_failures": [] + } + + # Count by status and level + for log in logs: + # Status counts + status_str = log.status.value + summary["by_status"][status_str] = summary["by_status"].get(status_str, 0) + 1 + + # Level counts + level_str = log.level.value + summary["by_level"][level_str] = summary["by_level"].get(level_str, 0) + 1 + + # Source counts + if log.source: + summary["by_source"][log.source] = summary["by_source"].get(log.source, 0) + 1 + + # Active tasks (IN_PROGRESS) + if log.status == LogStatus.IN_PROGRESS: + task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown" + summary["active_tasks"].append({ + "id": log.id, + "task_name": task_name, + "message": log.message, + "started_at": log.created_at, + "source": log.source + }) + + # Recent failures + if log.status == LogStatus.FAILED and len(summary["recent_failures"]) < 10: + task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown" + summary["recent_failures"].append({ + "id": log.id, + "task_name": task_name, + "message": log.message, + "failed_at": log.created_at, + "source": log.source, + "error_details": log.log_metadata.get("error_details") if log.log_metadata else None + }) + + return summary + + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to generate logs summary: {str(e)}" + ) \ No newline at end of file diff --git a/surfsense_backend/app/schemas/__init__.py b/surfsense_backend/app/schemas/__init__.py index f62172a..89525c9 100644 --- a/surfsense_backend/app/schemas/__init__.py +++ b/surfsense_backend/app/schemas/__init__.py @@ -14,6 +14,7 @@ from .podcasts import PodcastBase, PodcastCreate, PodcastUpdate, PodcastRead, Po from .chats import ChatBase, ChatCreate, ChatUpdate, ChatRead, AISDKChatRequest from .search_source_connector import SearchSourceConnectorBase, SearchSourceConnectorCreate, SearchSourceConnectorUpdate, SearchSourceConnectorRead from .llm_config import LLMConfigBase, LLMConfigCreate, LLMConfigUpdate, LLMConfigRead +from .logs import LogBase, LogCreate, LogUpdate, LogRead, LogFilter __all__ = [ "AISDKChatRequest", @@ -53,4 +54,9 @@ __all__ = [ "LLMConfigCreate", "LLMConfigUpdate", "LLMConfigRead", + "LogBase", + "LogCreate", + "LogUpdate", + "LogRead", + "LogFilter", ] \ No newline at end of file diff --git a/surfsense_backend/app/schemas/logs.py b/surfsense_backend/app/schemas/logs.py new file mode 100644 index 0000000..1d9d7e7 --- /dev/null +++ b/surfsense_backend/app/schemas/logs.py @@ -0,0 +1,44 @@ +from datetime import datetime +from typing import Optional, Dict, Any +from pydantic import BaseModel, ConfigDict +from .base import IDModel, TimestampModel +from app.db import LogLevel, LogStatus + +class LogBase(BaseModel): + level: LogLevel + status: LogStatus + message: str + source: Optional[str] = None + log_metadata: Optional[Dict[str, Any]] = None + +class LogCreate(BaseModel): + level: LogLevel + status: LogStatus + message: str + source: Optional[str] = None + log_metadata: Optional[Dict[str, Any]] = None + search_space_id: int + +class LogUpdate(BaseModel): + level: Optional[LogLevel] = None + status: Optional[LogStatus] = None + message: Optional[str] = None + source: Optional[str] = None + log_metadata: Optional[Dict[str, Any]] = None + +class LogRead(LogBase, IDModel, TimestampModel): + id: int + created_at: datetime + search_space_id: int + + model_config = ConfigDict(from_attributes=True) + +class LogFilter(BaseModel): + level: Optional[LogLevel] = None + status: Optional[LogStatus] = None + source: Optional[str] = None + search_space_id: Optional[int] = None + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) \ No newline at end of file diff --git a/surfsense_backend/app/services/task_logging_service.py b/surfsense_backend/app/services/task_logging_service.py new file mode 100644 index 0000000..c50e420 --- /dev/null +++ b/surfsense_backend/app/services/task_logging_service.py @@ -0,0 +1,204 @@ +from typing import Optional, Dict, Any +from sqlalchemy.ext.asyncio import AsyncSession +from app.db import Log, LogLevel, LogStatus +import logging +import json +from datetime import datetime + +logger = logging.getLogger(__name__) + +class TaskLoggingService: + """Service for logging background tasks using the database Log model""" + + def __init__(self, session: AsyncSession, search_space_id: int): + self.session = session + self.search_space_id = search_space_id + + async def log_task_start( + self, + task_name: str, + source: str, + message: str, + metadata: Optional[Dict[str, Any]] = None + ) -> Log: + """ + Log the start of a task with IN_PROGRESS status + + Args: + task_name: Name/identifier of the task + source: Source service/component (e.g., 'document_processor', 'slack_indexer') + message: Human-readable message about the task + metadata: Additional context data + + Returns: + Log: The created log entry + """ + log_metadata = metadata or {} + log_metadata.update({ + "task_name": task_name, + "started_at": datetime.utcnow().isoformat() + }) + + log_entry = Log( + level=LogLevel.INFO, + status=LogStatus.IN_PROGRESS, + message=message, + source=source, + log_metadata=log_metadata, + search_space_id=self.search_space_id + ) + + self.session.add(log_entry) + await self.session.commit() + await self.session.refresh(log_entry) + + logger.info(f"Started task {task_name}: {message}") + return log_entry + + async def log_task_success( + self, + log_entry: Log, + message: str, + additional_metadata: Optional[Dict[str, Any]] = None + ) -> Log: + """ + Update a log entry to SUCCESS status + + Args: + log_entry: The original log entry to update + message: Success message + additional_metadata: Additional metadata to merge + + Returns: + Log: The updated log entry + """ + # Update the existing log entry + log_entry.status = LogStatus.SUCCESS + log_entry.message = message + + # Merge additional metadata + if additional_metadata: + if log_entry.log_metadata is None: + log_entry.log_metadata = {} + log_entry.log_metadata.update(additional_metadata) + log_entry.log_metadata["completed_at"] = datetime.utcnow().isoformat() + + await self.session.commit() + await self.session.refresh(log_entry) + + task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown" + logger.info(f"Completed task {task_name}: {message}") + return log_entry + + async def log_task_failure( + self, + log_entry: Log, + error_message: str, + error_details: Optional[str] = None, + additional_metadata: Optional[Dict[str, Any]] = None + ) -> Log: + """ + Update a log entry to FAILED status + + Args: + log_entry: The original log entry to update + error_message: Error message + error_details: Detailed error information + additional_metadata: Additional metadata to merge + + Returns: + Log: The updated log entry + """ + # Update the existing log entry + log_entry.status = LogStatus.FAILED + log_entry.level = LogLevel.ERROR + log_entry.message = error_message + + # Merge additional metadata + if log_entry.log_metadata is None: + log_entry.log_metadata = {} + + log_entry.log_metadata.update({ + "failed_at": datetime.utcnow().isoformat(), + "error_details": error_details + }) + + if additional_metadata: + log_entry.log_metadata.update(additional_metadata) + + await self.session.commit() + await self.session.refresh(log_entry) + + task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown" + logger.error(f"Failed task {task_name}: {error_message}") + if error_details: + logger.error(f"Error details: {error_details}") + + return log_entry + + async def log_task_progress( + self, + log_entry: Log, + progress_message: str, + progress_metadata: Optional[Dict[str, Any]] = None + ) -> Log: + """ + Update a log entry with progress information while keeping IN_PROGRESS status + + Args: + log_entry: The log entry to update + progress_message: Progress update message + progress_metadata: Additional progress metadata + + Returns: + Log: The updated log entry + """ + log_entry.message = progress_message + + if progress_metadata: + if log_entry.log_metadata is None: + log_entry.log_metadata = {} + log_entry.log_metadata.update(progress_metadata) + log_entry.log_metadata["last_progress_update"] = datetime.utcnow().isoformat() + + await self.session.commit() + await self.session.refresh(log_entry) + + task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown" + logger.info(f"Progress update for task {task_name}: {progress_message}") + return log_entry + + async def log_simple_event( + self, + level: LogLevel, + source: str, + message: str, + metadata: Optional[Dict[str, Any]] = None + ) -> Log: + """ + Log a simple event (not a long-running task) + + Args: + level: Log level + source: Source service/component + message: Log message + metadata: Additional context data + + Returns: + Log: The created log entry + """ + log_entry = Log( + level=level, + status=LogStatus.SUCCESS, # Simple events are immediately complete + message=message, + source=source, + log_metadata=metadata or {}, + search_space_id=self.search_space_id + ) + + self.session.add(log_entry) + await self.session.commit() + await self.session.refresh(log_entry) + + logger.info(f"Logged event from {source}: {message}") + return log_entry \ No newline at end of file diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py index 53347b2..f18890f 100644 --- a/surfsense_backend/app/tasks/background_tasks.py +++ b/surfsense_backend/app/tasks/background_tasks.py @@ -8,6 +8,7 @@ from app.config import config from app.prompts import SUMMARY_PROMPT_TEMPLATE from app.utils.document_converters import convert_document_to_markdown, generate_content_hash from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService from langchain_core.documents import Document as LangChainDocument from langchain_community.document_loaders import FireCrawlLoader, AsyncChromiumLoader from langchain_community.document_transformers import MarkdownifyTransformer @@ -22,10 +23,34 @@ md = MarkdownifyTransformer() async def add_crawled_url_document( session: AsyncSession, url: str, search_space_id: int, user_id: str ) -> Optional[Document]: + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="crawl_url_document", + source="background_task", + message=f"Starting URL crawling process for: {url}", + metadata={"url": url, "user_id": user_id} + ) + try: + # URL validation step + await task_logger.log_task_progress( + log_entry, + f"Validating URL: {url}", + {"stage": "validation"} + ) + if not validators.url(url): raise ValueError(f"Url {url} is not a valid URL address") + # Set up crawler + await task_logger.log_task_progress( + log_entry, + f"Setting up crawler for URL: {url}", + {"stage": "crawler_setup", "firecrawl_available": bool(config.FIRECRAWL_API_KEY)} + ) + if config.FIRECRAWL_API_KEY: crawl_loader = FireCrawlLoader( url=url, @@ -39,6 +64,13 @@ async def add_crawled_url_document( else: crawl_loader = AsyncChromiumLoader(urls=[url], headless=True) + # Perform crawling + await task_logger.log_task_progress( + log_entry, + f"Crawling URL content: {url}", + {"stage": "crawling", "crawler_type": type(crawl_loader).__name__} + ) + url_crawled = await crawl_loader.aload() if type(crawl_loader) == FireCrawlLoader: @@ -46,6 +78,13 @@ async def add_crawled_url_document( elif type(crawl_loader) == AsyncChromiumLoader: content_in_markdown = md.transform_documents(url_crawled)[0].page_content + # Format document + await task_logger.log_task_progress( + log_entry, + f"Processing crawled content from: {url}", + {"stage": "content_processing", "content_length": len(content_in_markdown)} + ) + # Format document metadata in a more maintainable way metadata_sections = [ ( @@ -74,6 +113,13 @@ async def add_crawled_url_document( combined_document_string = "\n".join(document_parts) content_hash = generate_content_hash(combined_document_string, search_space_id) + # Check for duplicates + await task_logger.log_task_progress( + log_entry, + f"Checking for duplicate content: {url}", + {"stage": "duplicate_check", "content_hash": content_hash} + ) + # Check if document with this content hash already exists existing_doc_result = await session.execute( select(Document).where(Document.content_hash == content_hash) @@ -81,15 +127,33 @@ async def add_crawled_url_document( existing_document = existing_doc_result.scalars().first() if existing_document: + await task_logger.log_task_success( + log_entry, + f"Document already exists for URL: {url}", + {"duplicate_detected": True, "existing_document_id": existing_document.id} + ) logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.") return existing_document + # Get LLM for summary generation + await task_logger.log_task_progress( + log_entry, + f"Preparing for summary generation: {url}", + {"stage": "llm_setup"} + ) + # Get user's long context LLM user_llm = await get_user_long_context_llm(session, user_id) if not user_llm: raise RuntimeError(f"No long context LLM configured for user {user_id}") # Generate summary + await task_logger.log_task_progress( + log_entry, + f"Generating summary for URL content: {url}", + {"stage": "summary_generation"} + ) + summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm summary_result = await summary_chain.ainvoke( {"document": combined_document_string} @@ -98,6 +162,12 @@ async def add_crawled_url_document( summary_embedding = config.embedding_model_instance.embed(summary_content) # Process chunks + await task_logger.log_task_progress( + log_entry, + f"Processing content chunks for URL: {url}", + {"stage": "chunk_processing"} + ) + chunks = [ Chunk( content=chunk.text, @@ -107,6 +177,12 @@ async def add_crawled_url_document( ] # Create and store document + await task_logger.log_task_progress( + log_entry, + f"Creating document in database for URL: {url}", + {"stage": "document_creation", "chunks_count": len(chunks)} + ) + document = Document( search_space_id=search_space_id, title=url_crawled[0].metadata["title"] @@ -124,13 +200,38 @@ async def add_crawled_url_document( await session.commit() await session.refresh(document) + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully crawled and processed URL: {url}", + { + "document_id": document.id, + "title": document.title, + "content_hash": content_hash, + "chunks_count": len(chunks), + "summary_length": len(summary_content) + } + ) + return document except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error while processing URL: {url}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) raise db_error except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to crawl URL: {url}", + str(e), + {"error_type": type(e).__name__} + ) raise RuntimeError(f"Failed to crawl URL: {str(e)}") @@ -148,6 +249,20 @@ async def add_extension_received_document( Returns: Document object if successful, None if failed """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="extension_document", + source="background_task", + message=f"Processing extension document: {content.metadata.VisitedWebPageTitle}", + metadata={ + "url": content.metadata.VisitedWebPageURL, + "title": content.metadata.VisitedWebPageTitle, + "user_id": user_id + } + ) + try: # Format document metadata in a more maintainable way metadata_sections = [ @@ -188,6 +303,11 @@ async def add_extension_received_document( existing_document = existing_doc_result.scalars().first() if existing_document: + await task_logger.log_task_success( + log_entry, + f"Extension document already exists: {content.metadata.VisitedWebPageTitle}", + {"duplicate_detected": True, "existing_document_id": existing_document.id} + ) logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.") return existing_document @@ -229,19 +349,52 @@ async def add_extension_received_document( await session.commit() await session.refresh(document) + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully processed extension document: {content.metadata.VisitedWebPageTitle}", + { + "document_id": document.id, + "content_hash": content_hash, + "url": content.metadata.VisitedWebPageURL + } + ) + return document except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error processing extension document: {content.metadata.VisitedWebPageTitle}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) raise db_error except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to process extension document: {content.metadata.VisitedWebPageTitle}", + str(e), + {"error_type": type(e).__name__} + ) raise RuntimeError(f"Failed to process extension document: {str(e)}") async def add_received_markdown_file_document( session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int, user_id: str ) -> Optional[Document]: + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="markdown_file_document", + source="background_task", + message=f"Processing markdown file: {file_name}", + metadata={"filename": file_name, "user_id": user_id, "content_length": len(file_in_markdown)} + ) + try: content_hash = generate_content_hash(file_in_markdown, search_space_id) @@ -252,6 +405,11 @@ async def add_received_markdown_file_document( existing_document = existing_doc_result.scalars().first() if existing_document: + await task_logger.log_task_success( + log_entry, + f"Markdown file document already exists: {file_name}", + {"duplicate_detected": True, "existing_document_id": existing_document.id} + ) logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.") return existing_document @@ -293,12 +451,36 @@ async def add_received_markdown_file_document( await session.commit() await session.refresh(document) + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully processed markdown file: {file_name}", + { + "document_id": document.id, + "content_hash": content_hash, + "chunks_count": len(chunks), + "summary_length": len(summary_content) + } + ) + return document except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error processing markdown file: {file_name}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) raise db_error except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to process markdown file: {file_name}", + str(e), + {"error_type": type(e).__name__} + ) raise RuntimeError(f"Failed to process file document: {str(e)}") @@ -478,8 +660,24 @@ async def add_youtube_video_document( SQLAlchemyError: If there's a database error RuntimeError: If the video processing fails """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="youtube_video_document", + source="background_task", + message=f"Starting YouTube video processing for: {url}", + metadata={"url": url, "user_id": user_id} + ) + try: # Extract video ID from URL + await task_logger.log_task_progress( + log_entry, + f"Extracting video ID from URL: {url}", + {"stage": "video_id_extraction"} + ) + def get_youtube_video_id(url: str): parsed_url = urlparse(url) hostname = parsed_url.hostname @@ -501,7 +699,19 @@ async def add_youtube_video_document( if not video_id: raise ValueError(f"Could not extract video ID from URL: {url}") - # Get video metadata using async HTTP client + await task_logger.log_task_progress( + log_entry, + f"Video ID extracted: {video_id}", + {"stage": "video_id_extracted", "video_id": video_id} + ) + + # Get video metadata + await task_logger.log_task_progress( + log_entry, + f"Fetching video metadata for: {video_id}", + {"stage": "metadata_fetch"} + ) + params = { "format": "json", "url": f"https://www.youtube.com/watch?v={video_id}", @@ -512,7 +722,19 @@ async def add_youtube_video_document( async with http_session.get(oembed_url, params=params) as response: video_data = await response.json() + await task_logger.log_task_progress( + log_entry, + f"Video metadata fetched: {video_data.get('title', 'Unknown')}", + {"stage": "metadata_fetched", "title": video_data.get('title'), "author": video_data.get('author_name')} + ) + # Get video transcript + await task_logger.log_task_progress( + log_entry, + f"Fetching transcript for video: {video_id}", + {"stage": "transcript_fetch"} + ) + try: captions = YouTubeTranscriptApi.get_transcript(video_id) # Include complete caption information with timestamps @@ -524,8 +746,26 @@ async def add_youtube_video_document( timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]" transcript_segments.append(f"{timestamp} {text}") transcript_text = "\n".join(transcript_segments) + + await task_logger.log_task_progress( + log_entry, + f"Transcript fetched successfully: {len(captions)} segments", + {"stage": "transcript_fetched", "segments_count": len(captions), "transcript_length": len(transcript_text)} + ) except Exception as e: transcript_text = f"No captions available for this video. Error: {str(e)}" + await task_logger.log_task_progress( + log_entry, + f"No transcript available for video: {video_id}", + {"stage": "transcript_unavailable", "error": str(e)} + ) + + # Format document + await task_logger.log_task_progress( + log_entry, + f"Processing video content: {video_data.get('title', 'YouTube Video')}", + {"stage": "content_processing"} + ) # Format document metadata in a more maintainable way metadata_sections = [ @@ -558,6 +798,13 @@ async def add_youtube_video_document( combined_document_string = "\n".join(document_parts) content_hash = generate_content_hash(combined_document_string, search_space_id) + # Check for duplicates + await task_logger.log_task_progress( + log_entry, + f"Checking for duplicate video content: {video_id}", + {"stage": "duplicate_check", "content_hash": content_hash} + ) + # Check if document with this content hash already exists existing_doc_result = await session.execute( select(Document).where(Document.content_hash == content_hash) @@ -565,15 +812,33 @@ async def add_youtube_video_document( existing_document = existing_doc_result.scalars().first() if existing_document: + await task_logger.log_task_success( + log_entry, + f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}", + {"duplicate_detected": True, "existing_document_id": existing_document.id, "video_id": video_id} + ) logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.") return existing_document + # Get LLM for summary generation + await task_logger.log_task_progress( + log_entry, + f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}", + {"stage": "llm_setup"} + ) + # Get user's long context LLM user_llm = await get_user_long_context_llm(session, user_id) if not user_llm: raise RuntimeError(f"No long context LLM configured for user {user_id}") # Generate summary + await task_logger.log_task_progress( + log_entry, + f"Generating summary for video: {video_data.get('title', 'YouTube Video')}", + {"stage": "summary_generation"} + ) + summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm summary_result = await summary_chain.ainvoke( {"document": combined_document_string} @@ -582,6 +847,12 @@ async def add_youtube_video_document( summary_embedding = config.embedding_model_instance.embed(summary_content) # Process chunks + await task_logger.log_task_progress( + log_entry, + f"Processing content chunks for video: {video_data.get('title', 'YouTube Video')}", + {"stage": "chunk_processing"} + ) + chunks = [ Chunk( content=chunk.text, @@ -591,6 +862,11 @@ async def add_youtube_video_document( ] # Create document + await task_logger.log_task_progress( + log_entry, + f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}", + {"stage": "document_creation", "chunks_count": len(chunks)} + ) document = Document( title=video_data.get("title", "YouTube Video"), @@ -613,11 +889,38 @@ async def add_youtube_video_document( await session.commit() await session.refresh(document) + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully processed YouTube video: {video_data.get('title', 'YouTube Video')}", + { + "document_id": document.id, + "video_id": video_id, + "title": document.title, + "content_hash": content_hash, + "chunks_count": len(chunks), + "summary_length": len(summary_content), + "has_transcript": "No captions available" not in transcript_text + } + ) + return document except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error while processing YouTube video: {url}", + str(db_error), + {"error_type": "SQLAlchemyError", "video_id": video_id if 'video_id' in locals() else None} + ) raise db_error except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to process YouTube video: {url}", + str(e), + {"error_type": type(e).__name__, "video_id": video_id if 'video_id' in locals() else None} + ) logging.error(f"Failed to process YouTube video: {str(e)}") raise