update connector indexing / update connector service

CREDO23 2025-07-24 11:52:06 +02:00
parent a6fe7e583b
commit ca98693005
2 changed files with 370 additions and 0 deletions


@@ -857,6 +857,120 @@ class ConnectorService:
        return result_object, linear_chunks

    async def search_jira(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
        """
        Search for Jira issues and comments and return both the source information and langchain documents

        Args:
            user_query: The user's query
            user_id: The user's ID
            search_space_id: The search space ID to search in
            top_k: Maximum number of results to return
            search_mode: Search mode (CHUNKS or DOCUMENTS)

        Returns:
            tuple: (sources_info, langchain_documents)
        """
        if search_mode == SearchMode.CHUNKS:
            jira_chunks = await self.chunk_retriever.hybrid_search(
                query_text=user_query,
                top_k=top_k,
                user_id=user_id,
                search_space_id=search_space_id,
                document_type="JIRA_CONNECTOR"
            )
        elif search_mode == SearchMode.DOCUMENTS:
            jira_chunks = await self.document_retriever.hybrid_search(
                query_text=user_query,
                top_k=top_k,
                user_id=user_id,
                search_space_id=search_space_id,
                document_type="JIRA_CONNECTOR"
            )
            # Transform document retriever results to match expected format
            jira_chunks = self._transform_document_results(jira_chunks)

        # Early return if no results
        if not jira_chunks:
            return {
                "id": 10,
                "name": "Jira Issues",
                "type": "JIRA_CONNECTOR",
                "sources": [],
            }, []

        # Process each chunk and create sources directly without deduplication
        sources_list = []
        async with self.counter_lock:
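            # The counter lock keeps fallback source IDs unique when searches run concurrently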
            for _i, chunk in enumerate(jira_chunks):
                # Extract document metadata
                document = chunk.get('document', {})
                metadata = document.get('metadata', {})

                # Extract Jira-specific metadata
                issue_key = metadata.get('issue_key', '')
                issue_title = metadata.get('issue_title', 'Untitled Issue')
                status = metadata.get('status', '')
                priority = metadata.get('priority', '')
                issue_type = metadata.get('issue_type', '')
                comment_count = metadata.get('comment_count', 0)

                # Create a more descriptive title for Jira issues
                title = f"Jira: {issue_key} - {issue_title}"
                if status:
                    title += f" ({status})"

                # Create a more descriptive description for Jira issues
                description = chunk.get('content', '')[:100]
                if len(description) == 100:
                    description += "..."

                # Add priority and type info to description
                info_parts = []
                if priority:
                    info_parts.append(f"Priority: {priority}")
                if issue_type:
                    info_parts.append(f"Type: {issue_type}")
                if comment_count:
                    info_parts.append(f"Comments: {comment_count}")

                if info_parts:
                    if description:
                        description += f" | {' | '.join(info_parts)}"
                    else:
                        description = ' | '.join(info_parts)
                # Construct a URL to the Jira issue when the base URL is available;
                # otherwise fall back to an empty placeholder
                url = ""
                if issue_key and metadata.get('base_url'):
                    url = f"{metadata.get('base_url')}/browse/{issue_key}"
                source = {
                    "id": document.get('id', self.source_id_counter),
                    "title": title,
                    "description": description,
                    "url": url,
                    "issue_key": issue_key,
                    "status": status,
                    "priority": priority,
                    "issue_type": issue_type,
                    "comment_count": comment_count
                }
                self.source_id_counter += 1
                sources_list.append(source)

        # Create result object
        result_object = {
            "id": 10,  # Assign a unique ID for the Jira connector
            "name": "Jira Issues",
            "type": "JIRA_CONNECTOR",
            "sources": sources_list,
        }

        return result_object, jira_chunks

    async def search_linkup(self, user_query: str, user_id: str, mode: str = "standard") -> tuple:
        """
        Search using Linkup API and return both the source information and documents


@@ -13,6 +13,7 @@ from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.linear_connector import LinearConnector
from app.connectors.discord_connector import DiscordConnector
from app.connectors.jira_connector import JiraConnector
from slack_sdk.errors import SlackApiError
import logging
import asyncio
@@ -1651,3 +1652,258 @@ async def index_discord_messages(
        )
        logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
        return 0, f"Failed to index Discord messages: {str(e)}"


async def index_jira_issues(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str = None,
    end_date: str = None,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index Jira issues and comments.

    Args:
        session: Database session
        connector_id: ID of the Jira connector
        search_space_id: ID of the search space to store documents in
        user_id: User ID
        start_date: Start date for indexing (YYYY-MM-DD format)
        end_date: End date for indexing (YYYY-MM-DD format)
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="jira_issues_indexing",
        source="connector_indexing_task",
        message=f"Starting Jira issues indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
    )
    try:
        # Get the connector from the database
        result = await session.execute(
            select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
        )
        connector = result.scalar_one_or_none()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found",
                "Connector not found",
                {"error_type": "ConnectorNotFound"}
            )
            return 0, f"Connector with ID {connector_id} not found"

        # Get the Jira credentials from the connector config
        jira_token = connector.config.get("JIRA_PERSONAL_ACCESS_TOKEN")
        jira_base_url = connector.config.get("JIRA_BASE_URL")
        if not jira_token or not jira_base_url:
            await task_logger.log_task_failure(
                log_entry,
                f"Jira credentials not found in connector config for connector {connector_id}",
                "Missing Jira credentials",
                {"error_type": "MissingCredentials"}
            )
            return 0, "Jira credentials not found in connector config"

        # Initialize Jira client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Jira client for connector {connector_id}",
            {"stage": "client_initialization"}
        )
        jira_client = JiraConnector(base_url=jira_base_url, personal_access_token=jira_token)

        # Calculate date range
        if start_date is None or end_date is None:
            # Fall back to calculating dates based on last_indexed_at
            calculated_end_date = datetime.now()
            if connector.last_indexed_at:
                calculated_start_date = connector.last_indexed_at
            else:
                # If never indexed, go back 30 days
                calculated_start_date = calculated_end_date - timedelta(days=30)
            start_date_str = calculated_start_date.strftime('%Y-%m-%d')
            end_date_str = calculated_end_date.strftime('%Y-%m-%d')
        else:
            start_date_str = start_date
            end_date_str = end_date
        await task_logger.log_task_progress(
            log_entry,
            f"Fetching Jira issues from {start_date_str} to {end_date_str}",
            {"stage": "fetching_issues", "start_date": start_date_str, "end_date": end_date_str}
        )

        # Get issues within date range
        try:
            issues, error = jira_client.get_issues_by_date_range(
                start_date=start_date_str,
                end_date=end_date_str,
                include_comments=True
            )

            if error:
                logger.error(f"Failed to get Jira issues: {error}")

                # Don't treat "No issues found" as an error that should stop indexing
                if "No issues found" in error:
                    logger.info("No issues found is not a critical error, continuing with update")
                    if update_last_indexed:
                        connector.last_indexed_at = datetime.now()
                        await session.commit()
                        logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")

                    await task_logger.log_task_completion(
                        log_entry,
                        f"No Jira issues found in date range {start_date_str} to {end_date_str}",
                        {"indexed_count": 0}
                    )
                    return 0, None
                else:
                    await task_logger.log_task_failure(
                        log_entry,
                        f"Failed to get Jira issues: {error}",
                        "API Error",
                        {"error_type": "APIError"}
                    )
                    return 0, f"Failed to get Jira issues: {error}"

            logger.info(f"Retrieved {len(issues)} issues from Jira API")
            await task_logger.log_task_progress(
                log_entry,
                f"Retrieved {len(issues)} issues from Jira API",
                {"stage": "processing_issues", "issue_count": len(issues)}
            )
        except Exception as e:
            await task_logger.log_task_failure(
                log_entry,
                f"Error fetching Jira issues: {str(e)}",
                "Fetch Error",
                {"error_type": type(e).__name__}
            )
            logger.error(f"Error fetching Jira issues: {str(e)}", exc_info=True)
            return 0, f"Error fetching Jira issues: {str(e)}"
        # Process and index each issue
        indexed_count = 0
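        # Each issue is formatted, converted to markdown, embedded, and stored as a Document with chunk rows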
        for issue in issues:
            try:
                # Format the issue for better readability
                formatted_issue = jira_client.format_issue(issue)

                # Convert to markdown
                issue_markdown = jira_client.format_issue_to_markdown(formatted_issue)

                # Create document metadata
                metadata = {
                    "issue_key": formatted_issue.get("key", ""),
                    "issue_title": formatted_issue.get("title", ""),
                    "status": formatted_issue.get("status", ""),
                    "priority": formatted_issue.get("priority", ""),
                    "issue_type": formatted_issue.get("issue_type", ""),
                    "project": formatted_issue.get("project", ""),
                    "assignee": formatted_issue.get("assignee", {}).get("display_name", "") if formatted_issue.get("assignee") else "",
"reporter": formatted_issue.get("reporter", {}).get("display_name", ""),
"created_at": formatted_issue.get("created_at", ""),
"updated_at": formatted_issue.get("updated_at", ""),
"comment_count": len(formatted_issue.get("comments", [])),
"connector_id": connector_id,
"source": "jira",
"base_url": jira_base_url
}
# Generate content hash
content_hash = generate_content_hash(issue_markdown)
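                # The content hash acts as a fingerprint so unchanged issues are not re-indexed on later runs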
                # Check if document already exists
                existing_doc_result = await session.execute(
                    select(Document).where(Document.content_hash == content_hash)
                )
                existing_doc = existing_doc_result.scalar_one_or_none()

                if existing_doc:
                    logger.debug(f"Document with hash {content_hash} already exists, skipping")
                    continue

                # Create new document
                document = Document(
                    title=f"Jira: {formatted_issue.get('key', 'Unknown')} - {formatted_issue.get('title', 'Untitled')}",
                    document_type=DocumentType.JIRA_CONNECTOR,
                    document_metadata=metadata,
                    content=issue_markdown,
                    content_hash=content_hash,
                    search_space_id=search_space_id
                )

                # Generate embedding
                embedding = await config.embedding_model_instance.get_embedding(issue_markdown)
                document.embedding = embedding

                session.add(document)
                await session.flush()  # Flush to get the document ID

                # Create chunks for the document
                chunks = await config.chunking_model_instance.chunk_document(issue_markdown)
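                # Store one embedded Chunk row per chunk so retrieval can match at chunk granularity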
                for chunk_content in chunks:
                    chunk_embedding = await config.embedding_model_instance.get_embedding(chunk_content)
                    chunk = Chunk(
                        content=chunk_content,
                        embedding=chunk_embedding,
                        document_id=document.id
                    )
                    session.add(chunk)

                indexed_count += 1
                logger.debug(f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}")

            except Exception as e:
                logger.error(f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}", exc_info=True)
                continue
        # Commit all changes
        await session.commit()

        # Update last_indexed_at timestamp
        if update_last_indexed:
            connector.last_indexed_at = datetime.now()
            await session.commit()
            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

        await task_logger.log_task_completion(
            log_entry,
            f"Successfully indexed {indexed_count} Jira issues",
            {"indexed_count": indexed_count}
        )
        logger.info(f"Successfully indexed {indexed_count} Jira issues")
        return indexed_count, None

    except Exception as e:
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Jira issues: {str(e)}",
            str(e),
            {"error_type": type(e).__name__}
        )
        logger.error(f"Failed to index Jira issues: {str(e)}", exc_info=True)
        return 0, f"Failed to index Jira issues: {str(e)}"