feat: Added content-based hashing to prevent duplicates and fix resync issues

DESKTOP-RTLN3BA\$punk 2025-05-28 23:52:00 -07:00
parent 38516e74f9
commit 5411bac8e0
17 changed files with 297 additions and 334 deletions
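The deduplication below hinges on generate_content_hash from app.utils.document_converters, whose implementation is not part of this excerpt. A minimal sketch of what such a helper could look like, assuming it is simply a SHA-256 hex digest over the UTF-8 bytes of the assembled document string:

import hashlib

def generate_content_hash(content: str) -> str:
    # Hypothetical sketch only; the real helper may normalize whitespace
    # or include additional fields before digesting.
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

Because the digest is deterministic, re-running a connector over unchanged channels, pages, files, or issues yields the same hash, and the new lookup-by-hash path skips them instead of re-inserting or rewriting documents.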


@@ -14,6 +14,8 @@ from app.connectors.linear_connector import LinearConnector
from slack_sdk.errors import SlackApiError
import logging
+from app.utils.document_converters import generate_content_hash
# Set up logging
logger = logging.getLogger(__name__)
@@ -67,13 +69,13 @@ async def index_slack_messages(
# Check if last_indexed_at is in the future or after end_date
if last_indexed_naive > end_date:
-logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead.")
-start_date = end_date - timedelta(days=30)
+logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
+start_date = end_date - timedelta(days=365)
else:
start_date = last_indexed_naive
logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
else:
-start_date = end_date - timedelta(days=30) # Use 30 days instead of 365 to catch recent issues
+start_date = end_date - timedelta(days=365) # Use 365 days as default
logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (30 days ago) as start date")
# Format dates for Slack API
@@ -89,27 +91,8 @@ async def index_slack_messages(
if not channels:
return 0, "No Slack channels found"
-# Get existing documents for this search space and connector type to prevent duplicates
-existing_docs_result = await session.execute(
-select(Document)
-.filter(
-Document.search_space_id == search_space_id,
-Document.document_type == DocumentType.SLACK_CONNECTOR
-)
-)
-existing_docs = existing_docs_result.scalars().all()
-# Create a lookup dictionary of existing documents by channel_id
-existing_docs_by_channel_id = {}
-for doc in existing_docs:
-if "channel_id" in doc.document_metadata:
-existing_docs_by_channel_id[doc.document_metadata["channel_id"]] = doc
-logger.info(f"Found {len(existing_docs_by_channel_id)} existing Slack documents in database")
# Track the number of documents indexed
documents_indexed = 0
-documents_updated = 0
documents_skipped = 0
skipped_channels = []
@@ -189,10 +172,9 @@ async def index_slack_messages(
("METADATA", [
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
-f"START_DATE: {start_date_str}",
-f"END_DATE: {end_date_str}",
-f"MESSAGE_COUNT: {len(formatted_messages)}",
-f"INDEXED_AT: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+# f"START_DATE: {start_date_str}",
+# f"END_DATE: {end_date_str}",
+f"MESSAGE_COUNT: {len(formatted_messages)}"
]),
("CONTENT", [
"FORMAT: markdown",
@@ -213,6 +195,18 @@ async def index_slack_messages(
document_parts.append("</DOCUMENT>")
combined_document_string = '\n'.join(document_parts)
+content_hash = generate_content_hash(combined_document_string)
+# Check if document with this content hash already exists
+existing_doc_by_hash_result = await session.execute(
+select(Document).where(Document.content_hash == content_hash)
+)
+existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+if existing_document_by_hash:
+logger.info(f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing.")
+documents_skipped += 1
+continue
# Generate summary
summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
@@ -226,61 +220,28 @@ async def index_slack_messages(
for chunk in config.chunker_instance.chunk(channel_content)
]
-# Check if this channel already exists in our database
-existing_document = existing_docs_by_channel_id.get(channel_id)
-if existing_document:
-# Update existing document instead of creating a new one
-logger.info(f"Updating existing document for channel {channel_name}")
-# Update document fields
-existing_document.title = f"Slack - {channel_name}"
-existing_document.document_metadata = {
+# Create and store new document
+document = Document(
+search_space_id=search_space_id,
+title=f"Slack - {channel_name}",
+document_type=DocumentType.SLACK_CONNECTOR,
+document_metadata={
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
-"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-}
-existing_document.content = summary_content
-existing_document.embedding = summary_embedding
-# Delete existing chunks and add new ones
-await session.execute(
-delete(Chunk)
-.where(Chunk.document_id == existing_document.id)
-)
-# Assign new chunks to existing document
-for chunk in chunks:
-chunk.document_id = existing_document.id
-session.add(chunk)
-documents_updated += 1
-else:
-# Create and store new document
-document = Document(
-search_space_id=search_space_id,
-title=f"Slack - {channel_name}",
-document_type=DocumentType.SLACK_CONNECTOR,
-document_metadata={
-"channel_name": channel_name,
-"channel_id": channel_id,
-"start_date": start_date_str,
-"end_date": end_date_str,
-"message_count": len(formatted_messages),
-"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-},
-content=summary_content,
-embedding=summary_embedding,
-chunks=chunks
-)
-session.add(document)
-documents_indexed += 1
-logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")
+"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+},
+content=summary_content,
+embedding=summary_embedding,
+chunks=chunks,
+content_hash=content_hash,
+)
+session.add(document)
+documents_indexed += 1
+logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")
except SlackApiError as slack_error:
logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}")
@@ -295,7 +256,7 @@ async def index_slack_messages(
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one channel
-total_processed = documents_indexed + documents_updated
+total_processed = documents_indexed
if update_last_indexed and total_processed > 0:
connector.last_indexed_at = datetime.now()
@@ -305,11 +266,11 @@ async def index_slack_messages(
# Prepare result message
result_message = None
if skipped_channels:
-result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
+result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
else:
-result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated)."
+result_message = f"Processed {total_processed} channels."
-logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_updated} updated, {documents_skipped} skipped")
+logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
@@ -386,27 +347,8 @@ async def index_notion_pages(
logger.info("No Notion pages found to index")
return 0, "No Notion pages found"
-# Get existing documents for this search space and connector type to prevent duplicates
-existing_docs_result = await session.execute(
-select(Document)
-.filter(
-Document.search_space_id == search_space_id,
-Document.document_type == DocumentType.NOTION_CONNECTOR
-)
-)
-existing_docs = existing_docs_result.scalars().all()
-# Create a lookup dictionary of existing documents by page_id
-existing_docs_by_page_id = {}
-for doc in existing_docs:
-if "page_id" in doc.document_metadata:
-existing_docs_by_page_id[doc.document_metadata["page_id"]] = doc
-logger.info(f"Found {len(existing_docs_by_page_id)} existing Notion documents in database")
# Track the number of documents indexed
documents_indexed = 0
-documents_updated = 0
documents_skipped = 0
skipped_pages = []
@@ -482,8 +424,7 @@ async def index_notion_pages(
metadata_sections = [
("METADATA", [
f"PAGE_TITLE: {page_title}",
-f"PAGE_ID: {page_id}",
-f"INDEXED_AT: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+f"PAGE_ID: {page_id}"
]),
("CONTENT", [
"FORMAT: markdown",
@@ -504,6 +445,18 @@ async def index_notion_pages(
document_parts.append("</DOCUMENT>")
combined_document_string = '\n'.join(document_parts)
+content_hash = generate_content_hash(combined_document_string)
+# Check if document with this content hash already exists
+existing_doc_by_hash_result = await session.execute(
+select(Document).where(Document.content_hash == content_hash)
+)
+existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+if existing_document_by_hash:
+logger.info(f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing.")
+documents_skipped += 1
+continue
# Generate summary
logger.debug(f"Generating summary for page {page_title}")
@@ -519,55 +472,25 @@ async def index_notion_pages(
for chunk in config.chunker_instance.chunk(markdown_content)
]
-# Check if this page already exists in our database
-existing_document = existing_docs_by_page_id.get(page_id)
-if existing_document:
-# Update existing document instead of creating a new one
-logger.info(f"Updating existing document for page {page_title}")
-# Update document fields
-existing_document.title = f"Notion - {page_title}"
-existing_document.document_metadata = {
+# Create and store new document
+document = Document(
+search_space_id=search_space_id,
+title=f"Notion - {page_title}",
+document_type=DocumentType.NOTION_CONNECTOR,
+document_metadata={
"page_title": page_title,
"page_id": page_id,
-"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-}
-existing_document.content = summary_content
-existing_document.embedding = summary_embedding
-# Delete existing chunks and add new ones
-await session.execute(
-delete(Chunk)
-.where(Chunk.document_id == existing_document.id)
-)
-# Assign new chunks to existing document
-for chunk in chunks:
-chunk.document_id = existing_document.id
-session.add(chunk)
-documents_updated += 1
-else:
-# Create and store new document
-document = Document(
-search_space_id=search_space_id,
-title=f"Notion - {page_title}",
-document_type=DocumentType.NOTION_CONNECTOR,
-document_metadata={
-"page_title": page_title,
-"page_id": page_id,
-"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-},
-content=summary_content,
-embedding=summary_embedding,
-chunks=chunks
-)
-session.add(document)
-documents_indexed += 1
-logger.info(f"Successfully indexed new Notion page: {page_title}")
+"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+},
+content=summary_content,
+content_hash=content_hash,
+embedding=summary_embedding,
+chunks=chunks
+)
+session.add(document)
+documents_indexed += 1
+logger.info(f"Successfully indexed new Notion page: {page_title}")
except Exception as e:
logger.error(f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", exc_info=True)
@@ -577,7 +500,7 @@ async def index_notion_pages(
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one page
-total_processed = documents_indexed + documents_updated
+total_processed = documents_indexed
if update_last_indexed and total_processed > 0:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at for connector {connector_id}")
@@ -588,11 +511,11 @@ async def index_notion_pages(
# Prepare result message
result_message = None
if skipped_pages:
-result_message = f"Processed {total_processed} pages ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
+result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
else:
-result_message = f"Processed {total_processed} pages ({documents_indexed} new, {documents_updated} updated)."
+result_message = f"Processed {total_processed} pages."
-logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_updated} updated, {documents_skipped} skipped")
+logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
@@ -660,19 +583,6 @@ async def index_github_repos(
# If a repo is inaccessible, get_repository_files will likely fail gracefully later.
logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
-# 5. Get existing documents for this search space and connector type to prevent duplicates
-existing_docs_result = await session.execute(
-select(Document)
-.filter(
-Document.search_space_id == search_space_id,
-Document.document_type == DocumentType.GITHUB_CONNECTOR
-)
-)
-existing_docs = existing_docs_result.scalars().all()
-# Create a lookup dict: key=repo_fullname/file_path, value=Document object
-existing_docs_lookup = {doc.document_metadata.get("full_path"): doc for doc in existing_docs if doc.document_metadata.get("full_path")}
-logger.info(f"Found {len(existing_docs_lookup)} existing GitHub documents in database for search space {search_space_id}")
# 6. Iterate through selected repositories and index files
for repo_full_name in repo_full_names_to_index:
if not repo_full_name or not isinstance(repo_full_name, str):
@@ -699,12 +609,6 @@ async def index_github_repos(
logger.warning(f"Skipping file with missing info in {repo_full_name}: {file_info}")
continue
-# Check if document already exists and if content hash matches
-existing_doc = existing_docs_lookup.get(full_path_key)
-if existing_doc and existing_doc.document_metadata.get("sha") == file_sha:
-logger.debug(f"Skipping unchanged file: {full_path_key}")
-continue # Skip if SHA matches (content hasn't changed)
# Get file content
file_content = github_client.get_file_content(repo_full_name, file_path)
@@ -712,6 +616,18 @@ async def index_github_repos(
logger.warning(f"Could not retrieve content for {full_path_key}. Skipping.")
continue # Skip if content fetch failed
+content_hash = generate_content_hash(file_content)
+# Check if document with this content hash already exists
+existing_doc_by_hash_result = await session.execute(
+select(Document).where(Document.content_hash == content_hash)
+)
+existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+if existing_document_by_hash:
+logger.info(f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing.")
+continue
# Use file_content directly for chunking, maybe summary for main content?
# For now, let's use the full content for both, might need refinement
summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." # Simple summary
@@ -738,42 +654,20 @@ async def index_github_repos(
"indexed_at": datetime.now(timezone.utc).isoformat()
}
-if existing_doc:
-# Update existing document
-logger.info(f"Updating document for file: {full_path_key}")
-existing_doc.title = f"GitHub - {file_path}"
-existing_doc.document_metadata = doc_metadata
-existing_doc.content = summary_content # Update summary
-existing_doc.embedding = summary_embedding # Update embedding
-# Delete old chunks
-await session.execute(
-delete(Chunk)
-.where(Chunk.document_id == existing_doc.id)
-)
-# Add new chunks
-for chunk_obj in chunks_data:
-chunk_obj.document_id = existing_doc.id
-session.add(chunk_obj)
-documents_processed += 1
-else:
-# Create new document
-logger.info(f"Creating new document for file: {full_path_key}")
-document = Document(
-title=f"GitHub - {file_path}",
-document_type=DocumentType.GITHUB_CONNECTOR,
-document_metadata=doc_metadata,
-content=summary_content, # Store summary
-embedding=summary_embedding,
-search_space_id=search_space_id,
-chunks=chunks_data # Associate chunks directly
-)
-session.add(document)
-documents_processed += 1
-# Commit periodically or at the end? For now, commit per repo
-# await session.commit()
+# Create new document
+logger.info(f"Creating new document for file: {full_path_key}")
+document = Document(
+title=f"GitHub - {file_path}",
+document_type=DocumentType.GITHUB_CONNECTOR,
+document_metadata=doc_metadata,
+content=summary_content, # Store summary
+content_hash=content_hash,
+embedding=summary_embedding,
+search_space_id=search_space_id,
+chunks=chunks_data # Associate chunks directly
+)
+session.add(document)
+documents_processed += 1
except Exception as repo_err:
logger.error(f"Failed to process repository {repo_full_name}: {repo_err}")
@@ -847,14 +741,14 @@ async def index_linear_issues(
# Check if last_indexed_at is in the future or after end_date
if last_indexed_naive > end_date:
-logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead.")
-start_date = end_date - timedelta(days=30)
+logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
+start_date = end_date - timedelta(days=365)
else:
start_date = last_indexed_naive
logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
else:
-start_date = end_date - timedelta(days=30) # Use 30 days instead of 365 to catch recent issues
-logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (30 days ago) as start date")
+start_date = end_date - timedelta(days=365) # Use 365 days as default
+logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")
# Format dates for Linear API
start_date_str = start_date.strftime("%Y-%m-%d")
@@ -905,35 +799,8 @@ async def index_linear_issues(
if len(issues) > 10:
logger.info(f" ...and {len(issues) - 10} more issues")
-# Get existing documents for this search space and connector type to prevent duplicates
-existing_docs_result = await session.execute(
-select(Document)
-.filter(
-Document.search_space_id == search_space_id,
-Document.document_type == DocumentType.LINEAR_CONNECTOR
-)
-)
-existing_docs = existing_docs_result.scalars().all()
-# Create a lookup dictionary of existing documents by issue_id
-existing_docs_by_issue_id = {}
-for doc in existing_docs:
-if "issue_id" in doc.document_metadata:
-existing_docs_by_issue_id[doc.document_metadata["issue_id"]] = doc
-logger.info(f"Found {len(existing_docs_by_issue_id)} existing Linear documents in database")
-# Log existing document IDs for debugging
-if existing_docs_by_issue_id:
-logger.info("Existing Linear document issue IDs in database:")
-for idx, (issue_id, doc) in enumerate(list(existing_docs_by_issue_id.items())[:10]): # Log first 10
-logger.info(f" {idx+1}. {issue_id} - {doc.document_metadata.get('issue_identifier', 'Unknown')} - {doc.document_metadata.get('issue_title', 'Unknown')}")
-if len(existing_docs_by_issue_id) > 10:
-logger.info(f" ...and {len(existing_docs_by_issue_id) - 10} more existing documents")
# Track the number of documents indexed
documents_indexed = 0
-documents_updated = 0
documents_skipped = 0
skipped_issues = []
@@ -979,6 +846,19 @@ async def index_linear_issues(
comment_count = len(formatted_issue.get("comments", []))
summary_content += f"Comments: {comment_count}"
+content_hash = generate_content_hash(issue_content)
+# Check if document with this content hash already exists
+existing_doc_by_hash_result = await session.execute(
+select(Document).where(Document.content_hash == content_hash)
+)
+existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+if existing_document_by_hash:
+logger.info(f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing.")
+documents_skipped += 1
+continue
# Generate embedding for the summary
summary_embedding = config.embedding_model_instance.embed(summary_content)
@@ -988,62 +868,29 @@ async def index_linear_issues(
for chunk in config.chunker_instance.chunk(issue_content)
]
-# Check if this issue already exists in our database
-existing_document = existing_docs_by_issue_id.get(issue_id)
-if existing_document:
-# Update existing document instead of creating a new one
-logger.info(f"Updating existing document for issue {issue_identifier} - {issue_title}")
-# Update document fields
-existing_document.title = f"Linear - {issue_identifier}: {issue_title}"
-existing_document.document_metadata = {
+# Create and store new document
+logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
+document = Document(
+search_space_id=search_space_id,
+title=f"Linear - {issue_identifier}: {issue_title}",
+document_type=DocumentType.LINEAR_CONNECTOR,
+document_metadata={
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
-"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-}
-existing_document.content = summary_content
-existing_document.embedding = summary_embedding
-# Delete existing chunks and add new ones
-await session.execute(
-delete(Chunk)
-.where(Chunk.document_id == existing_document.id)
-)
-# Assign new chunks to existing document
-for chunk in chunks:
-chunk.document_id = existing_document.id
-session.add(chunk)
-documents_updated += 1
-else:
-# Create and store new document
-logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
-document = Document(
-search_space_id=search_space_id,
-title=f"Linear - {issue_identifier}: {issue_title}",
-document_type=DocumentType.LINEAR_CONNECTOR,
-document_metadata={
-"issue_id": issue_id,
-"issue_identifier": issue_identifier,
-"issue_title": issue_title,
-"state": state,
-"comment_count": comment_count,
-"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-},
-content=summary_content,
-embedding=summary_embedding,
-chunks=chunks
-)
-session.add(document)
-documents_indexed += 1
-logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")
+"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+},
+content=summary_content,
+content_hash=content_hash,
+embedding=summary_embedding,
+chunks=chunks
+)
+session.add(document)
+documents_indexed += 1
+logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")
except Exception as e:
logger.error(f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", exc_info=True)
@@ -1052,7 +899,7 @@ async def index_linear_issues(
continue # Skip this issue and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
-total_processed = documents_indexed + documents_updated
+total_processed = documents_indexed
if update_last_indexed:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
@@ -1062,7 +909,7 @@ async def index_linear_issues(
logger.info(f"Successfully committed all Linear document changes to database")
-logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_updated} updated, {documents_skipped} skipped")
+logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
return total_processed, None # Return None as the error message to indicate success
except SQLAlchemyError as db_error:
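The lookups on Document.content_hash above assume the Document model now carries that column; the model and migration changes are among the other files in this commit and are not shown in this excerpt. A minimal sketch of the column declaration, assuming the hash is stored as a 64-character SHA-256 hex digest:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Document(Base):
    # Hypothetical sketch; the real model defines many more columns
    # (title, document_type, document_metadata, content, embedding, ...).
    __tablename__ = "documents"
    id = Column(Integer, primary_key=True)
    content_hash = Column(String(64), nullable=False, unique=True, index=True)

In this sketch the unique index keeps the per-item existence checks cheap and would also reject a duplicate insert at the database level; whether the actual model enforces uniqueness is not visible from this diff.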