fix: Fixed Slack Reindexing

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-03-26 17:44:38 -07:00
parent 23da404177
commit 24fd873ca7
2 changed files with 91 additions and 34 deletions

View file

@ -370,19 +370,19 @@ async def run_slack_indexing(
""" """
try: try:
# Index Slack messages without updating last_indexed_at (we'll do it separately) # Index Slack messages without updating last_indexed_at (we'll do it separately)
documents_indexed, error_or_warning = await index_slack_messages( documents_processed, error_or_warning = await index_slack_messages(
session=session, session=session,
connector_id=connector_id, connector_id=connector_id,
search_space_id=search_space_id, search_space_id=search_space_id,
update_last_indexed=False # Don't update timestamp in the indexing function update_last_indexed=False # Don't update timestamp in the indexing function
) )
# Only update last_indexed_at if indexing was successful # Only update last_indexed_at if indexing was successful (either new docs or updated docs)
if documents_indexed > 0 and (error_or_warning is None or "Indexed" in error_or_warning): if documents_processed > 0:
await update_connector_last_indexed(session, connector_id) await update_connector_last_indexed(session, connector_id)
logger.info(f"Slack indexing completed successfully: {documents_indexed} documents indexed") logger.info(f"Slack indexing completed successfully: {documents_processed} documents processed")
else: else:
logger.error(f"Slack indexing failed or no documents indexed: {error_or_warning}") logger.error(f"Slack indexing failed or no documents processed: {error_or_warning}")
except Exception as e: except Exception as e:
logger.error(f"Error in background Slack indexing task: {str(e)}") logger.error(f"Error in background Slack indexing task: {str(e)}")

View file

@ -59,15 +59,7 @@ async def index_slack_messages(
end_date = datetime.now() end_date = datetime.now()
# Use last_indexed_at as start date if available, otherwise use 365 days ago # Use last_indexed_at as start date if available, otherwise use 365 days ago
if connector.last_indexed_at:
# Check if last_indexed_at is today
today = datetime.now().date()
if connector.last_indexed_at.date() == today:
# If last indexed today, go back 7 day to ensure we don't miss anything
start_date = end_date - timedelta(days=7)
else:
start_date = connector.last_indexed_at
else:
start_date = end_date - timedelta(days=365) start_date = end_date - timedelta(days=365)
# Format dates for Slack API # Format dates for Slack API
@ -83,8 +75,28 @@ async def index_slack_messages(
if not channels: if not channels:
return 0, "No Slack channels found" return 0, "No Slack channels found"
# Get existing documents for this search space and connector type to prevent duplicates
existing_docs_result = await session.execute(
select(Document)
.filter(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.SLACK_CONNECTOR
)
)
existing_docs = existing_docs_result.scalars().all()
# Create a lookup dictionary of existing documents by channel_id
existing_docs_by_channel_id = {}
for doc in existing_docs:
if "channel_id" in doc.document_metadata:
existing_docs_by_channel_id[doc.document_metadata["channel_id"]] = doc
logger.info(f"Found {len(existing_docs_by_channel_id)} existing Slack documents in database")
# Track the number of documents indexed # Track the number of documents indexed
documents_indexed = 0 documents_indexed = 0
documents_updated = 0
documents_skipped = 0
skipped_channels = [] skipped_channels = []
# Process each channel # Process each channel
@ -102,11 +114,13 @@ async def index_slack_messages(
if not is_member: if not is_member:
logger.warning(f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping.") logger.warning(f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping.")
skipped_channels.append(f"{channel_name} (private, bot not a member)") skipped_channels.append(f"{channel_name} (private, bot not a member)")
documents_skipped += 1
continue continue
except SlackApiError as e: except SlackApiError as e:
if "not_in_channel" in str(e) or "channel_not_found" in str(e): if "not_in_channel" in str(e) or "channel_not_found" in str(e):
logger.warning(f"Bot cannot access channel {channel_name} ({channel_id}). Skipping.") logger.warning(f"Bot cannot access channel {channel_name} ({channel_id}). Skipping.")
skipped_channels.append(f"{channel_name} (access error)") skipped_channels.append(f"{channel_name} (access error)")
documents_skipped += 1
continue continue
else: else:
# Re-raise if it's a different error # Re-raise if it's a different error
@ -123,10 +137,12 @@ async def index_slack_messages(
if error: if error:
logger.warning(f"Error getting messages from channel {channel_name}: {error}") logger.warning(f"Error getting messages from channel {channel_name}: {error}")
skipped_channels.append(f"{channel_name} (error: {error})") skipped_channels.append(f"{channel_name} (error: {error})")
documents_skipped += 1
continue # Skip this channel if there's an error continue # Skip this channel if there's an error
if not messages: if not messages:
logger.info(f"No messages found in channel {channel_name} for the specified date range.") logger.info(f"No messages found in channel {channel_name} for the specified date range.")
documents_skipped += 1
continue # Skip if no messages continue # Skip if no messages
# Format messages with user info # Format messages with user info
@ -141,6 +157,7 @@ async def index_slack_messages(
if not formatted_messages: if not formatted_messages:
logger.info(f"No valid messages found in channel {channel_name} after filtering.") logger.info(f"No valid messages found in channel {channel_name} after filtering.")
documents_skipped += 1
continue # Skip if no valid messages after filtering continue # Skip if no valid messages after filtering
# Convert messages to markdown format # Convert messages to markdown format
@ -195,7 +212,41 @@ async def index_slack_messages(
for chunk in config.chunker_instance.chunk(channel_content) for chunk in config.chunker_instance.chunk(channel_content)
] ]
# Create and store document # Check if this channel already exists in our database
existing_document = existing_docs_by_channel_id.get(channel_id)
if existing_document:
# Update existing document instead of creating a new one
logger.info(f"Updating existing document for channel {channel_name}")
# Update document fields
existing_document.title = f"Slack - {channel_name}"
existing_document.document_metadata = {
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
existing_document.content = summary_content
existing_document.embedding = summary_embedding
# Delete existing chunks and add new ones
await session.execute(
delete(Chunk)
.where(Chunk.document_id == existing_document.id)
)
# Assign new chunks to existing document
for chunk in chunks:
chunk.document_id = existing_document.id
session.add(chunk)
documents_updated += 1
else:
# Create and store new document
document = Document( document = Document(
search_space_id=search_space_id, search_space_id=search_space_id,
title=f"Slack - {channel_name}", title=f"Slack - {channel_name}",
@ -215,20 +266,23 @@ async def index_slack_messages(
session.add(document) session.add(document)
documents_indexed += 1 documents_indexed += 1
logger.info(f"Successfully indexed channel {channel_name} with {len(formatted_messages)} messages") logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")
except SlackApiError as slack_error: except SlackApiError as slack_error:
logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}") logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}")
skipped_channels.append(f"{channel_name} (Slack API error)") skipped_channels.append(f"{channel_name} (Slack API error)")
documents_skipped += 1
continue # Skip this channel and continue with others continue # Skip this channel and continue with others
except Exception as e: except Exception as e:
logger.error(f"Error processing channel {channel_name}: {str(e)}") logger.error(f"Error processing channel {channel_name}: {str(e)}")
skipped_channels.append(f"{channel_name} (processing error)") skipped_channels.append(f"{channel_name} (processing error)")
documents_skipped += 1
continue # Skip this channel and continue with others continue # Skip this channel and continue with others
# Update the last_indexed_at timestamp for the connector only if requested # Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one channel # and if we successfully indexed at least one channel
if update_last_indexed and documents_indexed > 0: total_processed = documents_indexed + documents_updated
if update_last_indexed and total_processed > 0:
connector.last_indexed_at = datetime.now() connector.last_indexed_at = datetime.now()
# Commit all changes # Commit all changes
@ -237,9 +291,12 @@ async def index_slack_messages(
# Prepare result message # Prepare result message
result_message = None result_message = None
if skipped_channels: if skipped_channels:
result_message = f"Indexed {documents_indexed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
else:
result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated)."
return documents_indexed, result_message logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_updated} updated, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error: except SQLAlchemyError as db_error:
await session.rollback() await session.rollback()