fix issue indexing with the Jira connector

CREDO23 2025-07-25 00:17:06 +02:00
parent 4984aab3f1
commit 655352fc09
4 changed files with 1262 additions and 1056 deletions

View file

@@ -0,0 +1 @@
+{"2d0ec64d93969318101ee479b664221b32241665":{"files":{"surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/page.tsx":["k+EnkTKWK6V5ATYvtszJhNn43kI=",true]},"modified":1753395412751}}

View file

@@ -252,7 +252,7 @@ class JiraConnector:
             fields.append("comment")
 
         params = {
-            "jql": jql,
+            "jql": "",
             "fields": ",".join(fields),
             "maxResults": 100,
            "startAt": 0,

View file

@@ -1,5 +1,4 @@
 import asyncio
-import json
 import logging
 from datetime import datetime, timedelta, timezone
 from typing import Optional, Tuple
@@ -1376,9 +1375,9 @@ async def index_linear_issues(
         # Process each issue
         for issue in issues:
             try:
-                issue_id = issue.get("id")
-                issue_identifier = issue.get("identifier", "")
-                issue_title = issue.get("title", "")
+                issue_id = issue.get("key")
+                issue_identifier = issue.get("id", "")
+                issue_title = issue.get("key", "")
 
                 if not issue_id or not issue_title:
                     logger.warning(
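
The remapped lookups read Jira-style payloads, where the human-readable key (e.g. PROJ-123) and the internal numeric id both sit at the top level of each issue rather than under named title/identifier fields. An abridged sketch of that payload shape, with illustrative values only:

# Abridged shape of one issue as returned by Jira's search API;
# all values here are made up for illustration.
issue = {
    "id": "10001",      # Jira's internal numeric id
    "key": "PROJ-123",  # human-readable issue key
    "fields": {
        "summary": "Example summary text",
        "status": {"name": "In Progress"},
        "comment": {"comments": []},
    },
}

# With this shape, issue.get("key") yields the stable human-readable
# identifier, while the display title lives under issue["fields"]["summary"].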
@@ -2026,11 +2025,13 @@ async def index_jira_issues(
     try:
         # Get the connector from the database
         result = await session.execute(
-            select(SearchSourceConnector).where(
-                SearchSourceConnector.id == connector_id
+            select(SearchSourceConnector).filter(
+                SearchSourceConnector.id == connector_id,
+                SearchSourceConnector.connector_type
+                == SearchSourceConnectorType.JIRA_CONNECTOR,
             )
         )
-        connector = result.scalar_one_or_none()
+        connector = result.scalars().first()
 
         if not connector:
             await task_logger.log_task_failure(
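
The connector lookup now constrains on both id and connector type. The same async SQLAlchemy pattern in isolation, as a sketch; SearchSourceConnector and SearchSourceConnectorType are the project's own models, assumed to be imported:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_jira_connector(session: AsyncSession, connector_id: int):
    # Multiple criteria passed to .filter() are ANDed together, so a
    # connector with a matching id but a different type comes back as None.
    result = await session.execute(
        select(SearchSourceConnector).filter(
            SearchSourceConnector.id == connector_id,
            SearchSourceConnector.connector_type
            == SearchSourceConnectorType.JIRA_CONNECTOR,
        )
    )
    return result.scalars().first()

In SQLAlchemy 1.4+, .filter() is a synonym for .where() on select(), so either spelling behaves identically here.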
@@ -2071,15 +2072,43 @@ async def index_jira_issues(
             # Fall back to calculating dates based on last_indexed_at
             calculated_end_date = datetime.now()
 
-            # Use last_indexed_at as start date if available, otherwise use 365 days ago
             if connector.last_indexed_at:
-                calculated_start_date = connector.last_indexed_at
-            else:
-                # If never indexed, go back 30 days
-                calculated_start_date = calculated_end_date - timedelta(days=30)
-
-            start_date_str = calculated_start_date.strftime("%Y-%m-%d")
-            end_date_str = calculated_end_date.strftime("%Y-%m-%d")
+                # Convert dates to be comparable (both timezone-naive)
+                last_indexed_naive = (
+                    connector.last_indexed_at.replace(tzinfo=None)
+                    if connector.last_indexed_at.tzinfo
+                    else connector.last_indexed_at
+                )
+
+                # Check if last_indexed_at is in the future or after end_date
+                if last_indexed_naive > calculated_end_date:
+                    logger.warning(
+                        f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead."
+                    )
+                    calculated_start_date = calculated_end_date - timedelta(days=365)
+                else:
+                    calculated_start_date = last_indexed_naive
+                    logger.info(
+                        f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
+                    )
+            else:
+                calculated_start_date = calculated_end_date - timedelta(
+                    days=365
+                )  # Use 365 days as default
+                logger.info(
+                    f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
+                )
+
+            # Use calculated dates if not provided
+            start_date_str = (
+                start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
+            )
+            end_date_str = (
+                end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
+            )
         else:
+            # Use provided dates
             start_date_str = start_date
             end_date_str = end_date
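
A note on why the tzinfo stripping matters: datetime.now() is timezone-naive, and Python raises TypeError when a naive datetime is compared with an aware one, so the new branch normalizes last_indexed_at before comparing. A condensed sketch of that guard, assuming the same 365-day fallback:

from datetime import datetime, timedelta

def compute_start_date(last_indexed_at: datetime | None) -> datetime:
    """Pick an indexing start date, defaulting to 365 days back."""
    end = datetime.now()  # naive local time, matching the connector code
    if last_indexed_at is None:
        return end - timedelta(days=365)
    # Drop tzinfo so naive and aware values compare without TypeError.
    naive = (
        last_indexed_at.replace(tzinfo=None)
        if last_indexed_at.tzinfo
        else last_indexed_at
    )
    # A future timestamp (clock skew, bad data) would produce an empty
    # indexing window, so fall back to the 365-day default instead.
    return end - timedelta(days=365) if naive > end else naive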
@@ -2099,8 +2128,6 @@ async def index_jira_issues(
             start_date=start_date_str, end_date=end_date_str, include_comments=True
         )
 
-        print(json.dumps(issues, indent=2))
-
         if error:
             logger.error(f"Failed to get Jira issues: {error}")
@@ -2133,146 +2160,174 @@ async def index_jira_issues(
         logger.info(f"Retrieved {len(issues)} issues from Jira API")
 
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Retrieved {len(issues)} issues from Jira API",
+            {"stage": "processing_issues", "issues_found": len(issues)},
+        )
+
     except Exception as e:
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Error fetching Jira issues: {str(e)}",
+            "Fetch Error",
+            {"error_type": type(e).__name__},
+        )
         logger.error(f"Error fetching Jira issues: {str(e)}", exc_info=True)
         return 0, f"Error fetching Jira issues: {str(e)}"
 
     # Process and index each issue
-    indexed_count = 0
+    documents_indexed = 0
+    skipped_issues = []
+    documents_skipped = 0
 
     for issue in issues:
         try:
+            issue_id = issue.get("key")
+            issue_identifier = issue.get("key", "")
+            issue_title = issue.get("id", "")
+
+            if not issue_id or not issue_title:
+                logger.warning(
+                    f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}"
+                )
+                skipped_issues.append(
+                    f"{issue_identifier or 'Unknown'} (missing data)"
+                )
+                documents_skipped += 1
+                continue
+
             # Format the issue for better readability
             formatted_issue = jira_client.format_issue(issue)
 
             # Convert to markdown
-            issue_markdown = jira_client.format_issue_to_markdown(formatted_issue)
+            issue_content = jira_client.format_issue_to_markdown(formatted_issue)
 
-            # Create document metadata
-            metadata = {
-                "issue_key": formatted_issue.get("key", ""),
-                "issue_title": formatted_issue.get("title", ""),
-                "status": formatted_issue.get("status", ""),
-                "priority": formatted_issue.get("priority", ""),
-                "issue_type": formatted_issue.get("issue_type", ""),
-                "project": formatted_issue.get("project", ""),
-                "assignee": (
-                    formatted_issue.get("assignee", {}).get("display_name", "")
-                    if formatted_issue.get("assignee")
-                    else ""
-                ),
-                "reporter": formatted_issue.get("reporter", {}).get(
-                    "display_name", ""
-                ),
-                "created_at": formatted_issue.get("created_at", ""),
-                "updated_at": formatted_issue.get("updated_at", ""),
-                "comment_count": len(formatted_issue.get("comments", [])),
-                "connector_id": connector_id,
-                "source": "jira",
-                "base_url": jira_base_url,
-            }
-
-            # Generate content hash
-            content_hash = generate_content_hash(issue_markdown)
-
-            # Check if document already exists
-            existing_doc_result = await session.execute(
-                select(Document).where(Document.content_hash == content_hash)
-            )
-            existing_doc = existing_doc_result.scalar_one_or_none()
-
-            if existing_doc:
-                logger.debug(
-                    f"Document with hash {content_hash} already exists, skipping"
+            if not issue_content:
+                logger.warning(
+                    f"Skipping issue with no content: {issue_identifier} - {issue_title}"
                 )
+                skipped_issues.append(f"{issue_identifier} (no content)")
+                documents_skipped += 1
                 continue
 
-            # Create new document
-            document = Document(
-                title=f"Jira: {formatted_issue.get('key', 'Unknown')} - {formatted_issue.get('title', 'Untitled')}",
-                document_type=DocumentType.JIRA_CONNECTOR,
-                document_metadata=metadata,
-                content=issue_markdown,
-                content_hash=content_hash,
-                search_space_id=search_space_id,
-            )
+            # Create a simple summary
+            summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
+            if formatted_issue.get("description"):
+                summary_content += (
+                    f"Description: {formatted_issue.get('description')}\n\n"
+                )
 
-            # Generate embedding
-            embedding = await config.embedding_model_instance.get_embedding(
-                issue_markdown
-            )
-            document.embedding = embedding
+            # Add comment count
+            comment_count = len(formatted_issue.get("comments", []))
+            summary_content += f"Comments: {comment_count}"
 
-            session.add(document)
-            await session.flush()  # Flush to get the document ID
+            # Generate content hash
+            content_hash = generate_content_hash(issue_content, search_space_id)
 
-            # Create chunks for the document
-            chunks = await config.chunking_model_instance.chunk_document(
-                issue_markdown
-            )
+            # Check if document already exists
+            existing_doc_by_hash_result = await session.execute(
+                select(Document).where(Document.content_hash == content_hash)
+            )
+            existing_document_by_hash = (
+                existing_doc_by_hash_result.scalars().first()
+            )
 
-            for chunk_content in chunks:
-                chunk_embedding = (
-                    await config.embedding_model_instance.get_embedding(
-                        chunk_content
-                    )
-                )
-                chunk = Chunk(
-                    content=chunk_content,
-                    embedding=chunk_embedding,
-                    document_id=document.id,
-                )
-                session.add(chunk)
+            if existing_document_by_hash:
+                logger.info(
+                    f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing."
+                )
+                documents_skipped += 1
+                continue
 
-            indexed_count += 1
-            logger.debug(
-                f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}"
-            )
+            # Generate embedding for the summary
+            summary_embedding = config.embedding_model_instance.embed(
+                summary_content
+            )
+
+            # Process chunks - using the full issue content with comments
+            chunks = [
+                Chunk(
+                    content=chunk.text,
+                    embedding=config.embedding_model_instance.embed(chunk.text),
+                )
+                for chunk in config.chunker_instance.chunk(issue_content)
+            ]
+
+            # Create and store new document
+            logger.info(
+                f"Creating new document for issue {issue_identifier} - {issue_title}"
+            )
+            document = Document(
+                search_space_id=search_space_id,
+                title=f"Jira - {issue_identifier}: {issue_title}",
+                document_type=DocumentType.JIRA_CONNECTOR,
+                document_metadata={
+                    "issue_id": issue_id,
+                    "issue_identifier": issue_identifier,
+                    "issue_title": issue_title,
+                    "state": formatted_issue.get("status", "Unknown"),
+                    "comment_count": comment_count,
+                    "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+                },
+                content=summary_content,
+                content_hash=content_hash,
+                embedding=summary_embedding,
+                chunks=chunks,
+            )
+
+            session.add(document)
+            documents_indexed += 1
+            logger.info(
+                f"Successfully indexed new issue {issue_identifier} - {issue_title}"
+            )
 
         except Exception as e:
             logger.error(
-                f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}",
+                f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}",
                 exc_info=True,
             )
-            continue
+            skipped_issues.append(
+                f"{issue.get('identifier', 'Unknown')} (processing error)"
+            )
+            documents_skipped += 1
+            continue  # Skip this issue and continue with others
+
+    # Update the last_indexed_at timestamp for the connector only if requested
+    total_processed = documents_indexed
+    if update_last_indexed:
+        connector.last_indexed_at = datetime.now()
+        logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
 
     # Commit all changes
     await session.commit()
+    logger.info("Successfully committed all JIRA document changes to database")
 
-    # Update last_indexed_at timestamp
-    if update_last_indexed:
-        connector.last_indexed_at = datetime.now()
-        await session.commit()
-        logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
-
+    # Log success
     await task_logger.log_task_success(
         log_entry,
-        f"Successfully indexed {indexed_count} Jira issues",
-        {"issues_indexed": indexed_count},
+        f"Successfully completed JIRA indexing for connector {connector_id}",
+        {
+            "issues_processed": total_processed,
+            "documents_indexed": documents_indexed,
+            "documents_skipped": documents_skipped,
+            "skipped_issues_count": len(skipped_issues),
+        },
     )
 
-    logger.info(f"Successfully indexed {indexed_count} Jira issues")
-    return indexed_count, None
+    logger.info(
+        f"JIRA indexing completed: {documents_indexed} new issues, {documents_skipped} skipped"
+    )
+    return (
+        total_processed,
+        None,
+    )  # Return None as the error message to indicate success
 
-except Exception as e:
+except SQLAlchemyError as db_error:
+    await session.rollback()
+    await task_logger.log_task_failure(
+        log_entry,
+        f"Database error during JIRA indexing for connector {connector_id}",
+        str(db_error),
+        {"error_type": "SQLAlchemyError"},
+    )
+    logger.error(f"Database error: {str(db_error)}", exc_info=True)
+    return 0, f"Database error: {str(db_error)}"
+
+except Exception as e:
+    await session.rollback()
     await task_logger.log_task_failure(
         log_entry,
-        f"Failed to index Jira issues: {str(e)}",
+        f"Failed to index JIRA issues for connector {connector_id}",
        str(e),
        {"error_type": type(e).__name__},
    )
-    logger.error(f"Failed to index Jira issues: {str(e)}", exc_info=True)
-    return 0, f"Failed to index Jira issues: {str(e)}"
+    logger.error(f"Failed to index JIRA issues: {str(e)}", exc_info=True)
+    return 0, f"Failed to index JIRA issues: {str(e)}"