From 131d362f1e56b52f6e53f5dcf13ec7696cdf7b5a Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Sat, 26 Jul 2025 14:44:26 +0200
Subject: [PATCH] support Confluence pages indexing

---
 .../app/tasks/connectors_indexing_tasks.py | 369 ++++++++++++++++++
 1 file changed, 369 insertions(+)

diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index e028a47..732e55b 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -8,6 +8,7 @@
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 from app.config import config
+from app.connectors.confluence_connector import ConfluenceConnector
 from app.connectors.discord_connector import DiscordConnector
 from app.connectors.github_connector import GitHubConnector
 from app.connectors.jira_connector import JiraConnector
@@ -2329,3 +2330,371 @@ async def index_jira_issues(
         )
         logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True)
         return 0, f"Failed to index JIRA issues: {e!s}"
+
+
+async def index_confluence_pages(
+    session: AsyncSession,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: str | None = None,
+    end_date: str | None = None,
+    update_last_indexed: bool = True,
+) -> tuple[int, str | None]:
+    """
+    Index Confluence pages and comments.
+
+    Args:
+        session: Database session
+        connector_id: ID of the Confluence connector
+        search_space_id: ID of the search space to store documents in
+        user_id: User ID
+        start_date: Start date for indexing (YYYY-MM-DD format)
+        end_date: End date for indexing (YYYY-MM-DD format)
+        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
+
+    Returns:
+        Tuple containing (number of documents indexed, error message or None)
+    """
+    task_logger = TaskLoggingService(session, search_space_id)
+
+    # Log task start
+    log_entry = await task_logger.log_task_start(
+        task_name="confluence_pages_indexing",
+        source="connector_indexing_task",
+        message=f"Starting Confluence pages indexing for connector {connector_id}",
+        metadata={
+            "connector_id": connector_id,
+            "user_id": str(user_id),
+            "start_date": start_date,
+            "end_date": end_date,
+        },
+    )
+
+    try:
+        # Get the connector from the database
+        result = await session.execute(
+            select(SearchSourceConnector).filter(
+                SearchSourceConnector.id == connector_id,
+                SearchSourceConnector.connector_type
+                == SearchSourceConnectorType.CONFLUENCE_CONNECTOR,
+            )
+        )
+        connector = result.scalars().first()
+
+        if not connector:
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Connector with ID {connector_id} not found",
+                "Connector not found",
+                {"error_type": "ConnectorNotFound"},
+            )
+            return 0, f"Connector with ID {connector_id} not found"
+
+        # Get the Confluence credentials from the connector config
+        confluence_email = connector.config.get("CONFLUENCE_EMAIL")
+        confluence_api_token = connector.config.get("CONFLUENCE_API_TOKEN")
+        confluence_base_url = connector.config.get("CONFLUENCE_BASE_URL")
+
+        if not confluence_email or not confluence_api_token or not confluence_base_url:
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Confluence credentials not found in connector config for connector {connector_id}",
+                "Missing Confluence credentials",
+                {"error_type": "MissingCredentials"},
+            )
+            return 0, "Confluence credentials not found in connector config"
+
+        # Initialize Confluence client
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Initializing Confluence client for connector {connector_id}",
+            {"stage": "client_initialization"},
+        )
+
+        confluence_client = ConfluenceConnector(
+            base_url=confluence_base_url, email=confluence_email, api_token=confluence_api_token
+        )
+
+        # Calculate date range
+        if start_date is None or end_date is None:
+            # Fall back to calculating dates based on last_indexed_at
+            calculated_end_date = datetime.now()
+
+            # Use last_indexed_at as start date if available, otherwise use 365 days ago
+            if connector.last_indexed_at:
+                # Convert dates to be comparable (both timezone-naive)
+                last_indexed_naive = (
+                    connector.last_indexed_at.replace(tzinfo=None)
+                    if connector.last_indexed_at.tzinfo
+                    else connector.last_indexed_at
+                )
+
+                # Check if last_indexed_at is in the future or after end_date
+                if last_indexed_naive > calculated_end_date:
+                    logger.warning(
+                        f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead."
+                    )
+                    calculated_start_date = calculated_end_date - timedelta(days=365)
+                else:
+                    calculated_start_date = last_indexed_naive
+                    logger.info(
+                        f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
+                    )
+            else:
+                calculated_start_date = calculated_end_date - timedelta(
+                    days=365
+                )  # Use 365 days as default
+                logger.info(
+                    f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
+                )
+
+            # Use calculated dates if not provided
+            start_date_str = (
+                start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
+            )
+            end_date_str = (
+                end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
+            )
+        else:
+            # Use provided dates
+            start_date_str = start_date
+            end_date_str = end_date
+
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Fetching Confluence pages from {start_date_str} to {end_date_str}",
+            {
+                "stage": "fetching_pages",
+                "start_date": start_date_str,
+                "end_date": end_date_str,
+            },
+        )
+
+        # Get pages within date range
+        try:
+            pages, error = confluence_client.get_pages_by_date_range(
+                start_date=start_date_str, end_date=end_date_str, include_comments=True
+            )
+
+            if error:
+                logger.error(f"Failed to get Confluence pages: {error}")
+
+                # Don't treat "No pages found" as an error that should stop indexing
+                if "No pages found" in error:
+                    logger.info(
+                        "No pages found is not a critical error, continuing with update"
+                    )
+                    if update_last_indexed:
+                        connector.last_indexed_at = datetime.now()
+                        await session.commit()
+                        logger.info(
+                            f"Updated last_indexed_at to {connector.last_indexed_at} despite no pages found"
+                        )
+
+                    await task_logger.log_task_success(
+                        log_entry,
+                        f"No Confluence pages found in date range {start_date_str} to {end_date_str}",
+                        {"pages_found": 0},
+                    )
+                    return 0, None
+                else:
+                    await task_logger.log_task_failure(
+                        log_entry,
+                        f"Failed to get Confluence pages: {error}",
+                        "API Error",
+                        {"error_type": "APIError"},
+                    )
+                    return 0, f"Failed to get Confluence pages: {error}"
+
+            logger.info(f"Retrieved {len(pages)} pages from Confluence API")
+
+        except Exception as e:
+            logger.error(f"Error fetching Confluence pages: {e!s}", exc_info=True)
+            return 0, f"Error fetching Confluence pages: {e!s}"
+
+        # Process and index each page
+        documents_indexed = 0
+        skipped_pages = []
+        documents_skipped = 0
+
+        for page in pages:
+            try:
+                page_id = page.get("id")
+                page_title = page.get("title", "")
+                space_id = page.get("spaceId", "")
+
+                if not page_id or not page_title:
+                    logger.warning(
f"Skipping page with missing ID or title: {page_id or 'Unknown'}" + ) + skipped_pages.append( + f"{page_title or 'Unknown'} (missing data)" + ) + documents_skipped += 1 + continue + + # Extract page content + page_content = "" + if page.get("body") and page["body"].get("storage"): + page_content = page["body"]["storage"].get("value", "") + + # Add comments to content + comments = page.get("comments", []) + comments_content = "" + if comments: + comments_content = "\n\n## Comments\n\n" + for comment in comments: + comment_body = "" + if comment.get("body") and comment["body"].get("storage"): + comment_body = comment["body"]["storage"].get("value", "") + + comment_author = comment.get("version", {}).get("authorId", "Unknown") + comment_date = comment.get("version", {}).get("createdAt", "") + + comments_content += f"**Comment by {comment_author}** ({comment_date}):\n{comment_body}\n\n" + + # Combine page content with comments + full_content = f"# {page_title}\n\n{page_content}{comments_content}" + + if not full_content.strip(): + logger.warning( + f"Skipping page with no content: {page_title}" + ) + skipped_pages.append(f"{page_title} (no content)") + documents_skipped += 1 + continue + + # Create a simple summary + summary_content = f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n" + if page_content: + # Take first 500 characters of content for summary + content_preview = page_content[:500] + if len(page_content) > 500: + content_preview += "..." + summary_content += f"Content Preview: {content_preview}\n\n" + + # Add comment count + comment_count = len(comments) + summary_content += f"Comments: {comment_count}" + + # Generate content hash + content_hash = generate_content_hash(full_content, search_space_id) + + # Check if document already exists + existing_doc_by_hash_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + existing_document_by_hash = ( + existing_doc_by_hash_result.scalars().first() + ) + + if existing_document_by_hash: + logger.info( + f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing." 
+ ) + documents_skipped += 1 + continue + + # Generate embedding for the summary + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks - using the full page content with comments + chunks = [ + Chunk( + content=chunk.text, + embedding=config.embedding_model_instance.embed(chunk.text), + ) + for chunk in config.chunker_instance.chunk(full_content) + ] + + # Create and store new document + logger.info( + f"Creating new document for page {page_title}" + ) + document = Document( + search_space_id=search_space_id, + title=f"Confluence - {page_title}", + document_type=DocumentType.CONFLUENCE_CONNECTOR, + document_metadata={ + "page_id": page_id, + "page_title": page_title, + "space_id": space_id, + "comment_count": comment_count, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + }, + content=summary_content, + content_hash=content_hash, + embedding=summary_embedding, + chunks=chunks, + ) + + session.add(document) + documents_indexed += 1 + logger.info( + f"Successfully indexed new page {page_title}" + ) + + except Exception as e: + logger.error( + f"Error processing page {page.get('title', 'Unknown')}: {e!s}", + exc_info=True, + ) + skipped_pages.append( + f"{page.get('title', 'Unknown')} (processing error)" + ) + documents_skipped += 1 + continue # Skip this page and continue with others + + # Update the last_indexed_at timestamp for the connector only if requested + total_processed = documents_indexed + if update_last_indexed: + connector.last_indexed_at = datetime.now() + logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") + + # Commit all changes + await session.commit() + logger.info("Successfully committed all Confluence document changes to database") + + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Confluence indexing for connector {connector_id}", + { + "pages_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_pages_count": len(skipped_pages), + }, + ) + + logger.info( + f"Confluence indexing completed: {documents_indexed} new pages, {documents_skipped} skipped" + ) + return ( + total_processed, + None, + ) # Return None as the error message to indicate success + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Confluence indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Confluence pages for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Failed to index Confluence pages: {e!s}", exc_info=True) + return 0, f"Failed to index Confluence pages: {e!s}"