mirror of
https://github.com/MODSetter/SurfSense.git
synced 2025-09-01 18:19:08 +00:00
support confluence pages indexing
This commit is contained in:
parent
e5bb26ea13
commit
131d362f1e
1 changed file with 369 additions and 0 deletions
@@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from app.config import config
from app.connectors.confluence_connector import ConfluenceConnector
from app.connectors.discord_connector import DiscordConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.jira_connector import JiraConnector
@@ -2329,3 +2330,371 @@ async def index_jira_issues(
        )
        logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True)
        return 0, f"Failed to index JIRA issues: {e!s}"


async def index_confluence_pages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    update_last_indexed: bool = True,
) -> tuple[int, str | None]:
    """
    Index Confluence pages and comments.

    Args:
        session: Database session
        connector_id: ID of the Confluence connector
        search_space_id: ID of the search space to store documents in
        user_id: User ID
        start_date: Start date for indexing (YYYY-MM-DD format)
        end_date: End date for indexing (YYYY-MM-DD format)
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="confluence_pages_indexing",
        source="connector_indexing_task",
        message=f"Starting Confluence pages indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "start_date": start_date,
            "end_date": end_date,
        },
    )

    try:
        # Get the connector from the database
        result = await session.execute(
            select(SearchSourceConnector).filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type
                == SearchSourceConnectorType.CONFLUENCE_CONNECTOR,
            )
        )
        connector = result.scalars().first()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found",
                "Connector not found",
                {"error_type": "ConnectorNotFound"},
            )
            return 0, f"Connector with ID {connector_id} not found"

        # Get the Confluence credentials from the connector config
        confluence_email = connector.config.get("CONFLUENCE_EMAIL")
        confluence_api_token = connector.config.get("CONFLUENCE_API_TOKEN")
        confluence_base_url = connector.config.get("CONFLUENCE_BASE_URL")

        if not confluence_email or not confluence_api_token or not confluence_base_url:
            await task_logger.log_task_failure(
                log_entry,
                f"Confluence credentials not found in connector config for connector {connector_id}",
                "Missing Confluence credentials",
                {"error_type": "MissingCredentials"},
            )
            return 0, "Confluence credentials not found in connector config"

        # Initialize Confluence client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Confluence client for connector {connector_id}",
            {"stage": "client_initialization"},
        )

        confluence_client = ConfluenceConnector(
            base_url=confluence_base_url, email=confluence_email, api_token=confluence_api_token
        )

        # Calculate date range
        if start_date is None or end_date is None:
            # Fall back to calculating dates based on last_indexed_at
            calculated_end_date = datetime.now()

            # Use last_indexed_at as start date if available, otherwise use 365 days ago
            if connector.last_indexed_at:
                # Convert dates to be comparable (both timezone-naive)
                last_indexed_naive = (
                    connector.last_indexed_at.replace(tzinfo=None)
                    if connector.last_indexed_at.tzinfo
                    else connector.last_indexed_at
                )

                # Check if last_indexed_at is in the future or after end_date
                if last_indexed_naive > calculated_end_date:
                    logger.warning(
                        f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead."
                    )
                    calculated_start_date = calculated_end_date - timedelta(days=365)
                else:
                    calculated_start_date = last_indexed_naive
                    logger.info(
                        f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
                    )
            else:
                calculated_start_date = calculated_end_date - timedelta(
                    days=365
                )  # Use 365 days as default
                logger.info(
                    f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
                )

            # Use calculated dates if not provided
            start_date_str = (
                start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
            )
            end_date_str = (
                end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
            )
        else:
            # Use provided dates
            start_date_str = start_date
            end_date_str = end_date

        await task_logger.log_task_progress(
            log_entry,
            f"Fetching Confluence pages from {start_date_str} to {end_date_str}",
            {
                "stage": "fetching_pages",
                "start_date": start_date_str,
                "end_date": end_date_str,
            },
        )

        # Get pages within date range
        try:
            pages, error = confluence_client.get_pages_by_date_range(
                start_date=start_date_str, end_date=end_date_str, include_comments=True
            )

            if error:
                logger.error(f"Failed to get Confluence pages: {error}")

                # Don't treat "No pages found" as an error that should stop indexing
                if "No pages found" in error:
                    logger.info(
                        "No pages found is not a critical error, continuing with update"
                    )
                    if update_last_indexed:
                        connector.last_indexed_at = datetime.now()
                        await session.commit()
                        logger.info(
                            f"Updated last_indexed_at to {connector.last_indexed_at} despite no pages found"
                        )

                    await task_logger.log_task_success(
                        log_entry,
                        f"No Confluence pages found in date range {start_date_str} to {end_date_str}",
                        {"pages_found": 0},
                    )
                    return 0, None
                else:
                    await task_logger.log_task_failure(
                        log_entry,
                        f"Failed to get Confluence pages: {error}",
                        "API Error",
                        {"error_type": "APIError"},
                    )
                    return 0, f"Failed to get Confluence pages: {error}"

            logger.info(f"Retrieved {len(pages)} pages from Confluence API")

        except Exception as e:
            logger.error(f"Error fetching Confluence pages: {e!s}", exc_info=True)
            return 0, f"Error fetching Confluence pages: {e!s}"

        # Process and index each page
        documents_indexed = 0
        skipped_pages = []
        documents_skipped = 0

        for page in pages:
            try:
                page_id = page.get("id")
                page_title = page.get("title", "")
                space_id = page.get("spaceId", "")

                if not page_id or not page_title:
                    logger.warning(
                        f"Skipping page with missing ID or title: {page_id or 'Unknown'}"
                    )
                    skipped_pages.append(
                        f"{page_title or 'Unknown'} (missing data)"
                    )
                    documents_skipped += 1
                    continue

                # Extract page content
                page_content = ""
                if page.get("body") and page["body"].get("storage"):
                    page_content = page["body"]["storage"].get("value", "")

                # Add comments to content
                comments = page.get("comments", [])
                comments_content = ""
                if comments:
                    comments_content = "\n\n## Comments\n\n"
                    for comment in comments:
                        comment_body = ""
                        if comment.get("body") and comment["body"].get("storage"):
                            comment_body = comment["body"]["storage"].get("value", "")

                        comment_author = comment.get("version", {}).get("authorId", "Unknown")
                        comment_date = comment.get("version", {}).get("createdAt", "")

                        comments_content += f"**Comment by {comment_author}** ({comment_date}):\n{comment_body}\n\n"

                # Combine page content with comments
                full_content = f"# {page_title}\n\n{page_content}{comments_content}"

                if not full_content.strip():
                    logger.warning(
                        f"Skipping page with no content: {page_title}"
                    )
                    skipped_pages.append(f"{page_title} (no content)")
                    documents_skipped += 1
                    continue

                # Create a simple summary
                summary_content = f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n"
                if page_content:
                    # Take first 500 characters of content for summary
                    content_preview = page_content[:500]
                    if len(page_content) > 500:
                        content_preview += "..."
                    summary_content += f"Content Preview: {content_preview}\n\n"

                # Add comment count
                comment_count = len(comments)
                summary_content += f"Comments: {comment_count}"

                # Generate content hash
                content_hash = generate_content_hash(full_content, search_space_id)

                # Check if document already exists
                existing_doc_by_hash_result = await session.execute(
                    select(Document).where(Document.content_hash == content_hash)
                )
                existing_document_by_hash = (
                    existing_doc_by_hash_result.scalars().first()
                )

                if existing_document_by_hash:
                    logger.info(
                        f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing."
                    )
                    documents_skipped += 1
                    continue

                # Generate embedding for the summary
                summary_embedding = config.embedding_model_instance.embed(
                    summary_content
                )

                # Process chunks - using the full page content with comments
                chunks = [
                    Chunk(
                        content=chunk.text,
                        embedding=config.embedding_model_instance.embed(chunk.text),
                    )
                    for chunk in config.chunker_instance.chunk(full_content)
                ]

                # Create and store new document
                logger.info(
                    f"Creating new document for page {page_title}"
                )
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Confluence - {page_title}",
                    document_type=DocumentType.CONFLUENCE_CONNECTOR,
                    document_metadata={
                        "page_id": page_id,
                        "page_title": page_title,
                        "space_id": space_id,
                        "comment_count": comment_count,
                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    },
                    content=summary_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks,
                )

                session.add(document)
                documents_indexed += 1
                logger.info(
                    f"Successfully indexed new page {page_title}"
                )

            except Exception as e:
                logger.error(
                    f"Error processing page {page.get('title', 'Unknown')}: {e!s}",
                    exc_info=True,
                )
                skipped_pages.append(
                    f"{page.get('title', 'Unknown')} (processing error)"
                )
                documents_skipped += 1
                continue  # Skip this page and continue with others

        # Update the last_indexed_at timestamp for the connector only if requested
        total_processed = documents_indexed
        if update_last_indexed:
            connector.last_indexed_at = datetime.now()
            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

        # Commit all changes
        await session.commit()
        logger.info("Successfully committed all Confluence document changes to database")

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Confluence indexing for connector {connector_id}",
            {
                "pages_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_pages_count": len(skipped_pages),
            },
        )

        logger.info(
            f"Confluence indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
        )
        return (
            total_processed,
            None,
        )  # Return None as the error message to indicate success

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Confluence indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, f"Database error: {db_error!s}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Confluence pages for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Confluence pages: {e!s}", exc_info=True)
        return 0, f"Failed to index Confluence pages: {e!s}"
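
For reference, a minimal sketch of how the new task might be driven from an async entry point. The session-factory import path, the module path for index_confluence_pages, and the concrete IDs below are illustrative assumptions, not part of this commit.

# Hypothetical driver for the new Confluence indexing task.
# The session-factory location, the task module path, and the IDs below
# are assumptions for illustration only.
import asyncio

from app.db import async_session_maker  # assumed async session factory location
from app.tasks.connectors_indexing_tasks import index_confluence_pages  # assumed module path


async def run_confluence_indexing() -> None:
    async with async_session_maker() as session:
        indexed, error = await index_confluence_pages(
            session=session,
            connector_id=42,  # assumed Confluence connector ID
            search_space_id=7,  # assumed search space ID
            user_id="00000000-0000-0000-0000-000000000000",  # assumed user ID
            start_date="2025-01-01",  # optional; omitted dates fall back to last_indexed_at
            end_date="2025-03-31",
        )
        if error:
            print(f"Confluence indexing failed: {error}")
        else:
            print(f"Indexed {indexed} Confluence pages")


if __name__ == "__main__":
    asyncio.run(run_confluence_indexing())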