Mirror of https://github.com/MODSetter/SurfSense.git (synced 2025-09-10 14:28:57 +00:00)
add github connector, add alembic for db migrations, fix bug updating connectors
This commit is contained in:
parent fa5dbb786f
commit bb198e38c0

18 changed files with 1232 additions and 184 deletions
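
The Alembic side of this commit ("add alembic for db migrations") is not visible in the hunks below. As a generic, hypothetical sketch only (the alembic.ini location and surrounding setup are assumptions, not taken from this diff), migrations added this way are typically applied through Alembic's standard Python API or the equivalent `alembic upgrade head` command:

# Hypothetical sketch: config path and invocation are assumptions, not part of this commit's diff.
from alembic import command
from alembic.config import Config

def apply_migrations(config_path: str = "alembic.ini") -> None:
    cfg = Config(config_path)      # reads script_location, sqlalchemy.url, etc.
    command.upgrade(cfg, "head")   # upgrade the database schema to the latest revision

if __name__ == "__main__":
    apply_migrations()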
@@ -3,12 +3,13 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.future import select
 from sqlalchemy import delete
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchSourceConnectorType
 from app.config import config
 from app.prompts import SUMMARY_PROMPT_TEMPLATE
 from app.connectors.slack_history import SlackHistory
 from app.connectors.notion_history import NotionHistoryConnector
+from app.connectors.github_connector import GitHubConnector
 from slack_sdk.errors import SlackApiError
 import logging
 
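
One detail in this hunk worth noting: the datetime import now also pulls in timezone, which is what lets the new GitHub indexing code below record a timezone-aware indexed_at value. A quick standalone illustration of the difference (plain Python, nothing repo-specific assumed):

from datetime import datetime, timezone

naive = datetime.now()              # no tzinfo; ambiguous once stored or compared across systems
aware = datetime.now(timezone.utc)  # explicit UTC, as used for doc_metadata["indexed_at"] below

print(naive.isoformat())  # e.g. 2025-04-01T12:00:00.123456
print(aware.isoformat())  # e.g. 2025-04-01T12:00:00.123456+00:00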
@@ -589,3 +590,195 @@ async def index_notion_pages(
         await session.rollback()
         logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True)
         return 0, f"Failed to index Notion pages: {str(e)}"
+
+async def index_github_repos(
+    session: AsyncSession,
+    connector_id: int,
+    search_space_id: int,
+    update_last_indexed: bool = True
+) -> Tuple[int, Optional[str]]:
+    """
+    Index code and documentation files from accessible GitHub repositories.
+
+    Args:
+        session: Database session
+        connector_id: ID of the GitHub connector
+        search_space_id: ID of the search space to store documents in
+        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
+
+    Returns:
+        Tuple containing (number of documents indexed, error message or None)
+    """
+    documents_processed = 0
+    errors = []
+
+    try:
+        # 1. Get the GitHub connector from the database
+        result = await session.execute(
+            select(SearchSourceConnector)
+            .filter(
+                SearchSourceConnector.id == connector_id,
+                SearchSourceConnector.connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR
+            )
+        )
+        connector = result.scalars().first()
+
+        if not connector:
+            return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector"
+
+        # 2. Get the GitHub PAT from the connector config
+        github_pat = connector.config.get("GITHUB_PAT")
+        if not github_pat:
+            return 0, "GitHub Personal Access Token (PAT) not found in connector config"
+
+        # 3. Initialize GitHub connector client
+        try:
+            github_client = GitHubConnector(token=github_pat)
+        except ValueError as e:
+            return 0, f"Failed to initialize GitHub client: {str(e)}"
+
+        # 4. Get list of accessible repositories
+        repositories = github_client.get_user_repositories()
+        if not repositories:
+            logger.info("No accessible GitHub repositories found for the provided token.")
+            return 0, "No accessible GitHub repositories found."
+
+        logger.info(f"Found {len(repositories)} repositories to potentially index.")
+
+        # 5. Get existing documents for this search space and connector type to prevent duplicates
+        existing_docs_result = await session.execute(
+            select(Document)
+            .filter(
+                Document.search_space_id == search_space_id,
+                Document.document_type == DocumentType.GITHUB_CONNECTOR
+            )
+        )
+        existing_docs = existing_docs_result.scalars().all()
+        # Create a lookup dict: key=repo_fullname/file_path, value=Document object
+        existing_docs_lookup = {doc.document_metadata.get("full_path"): doc for doc in existing_docs if doc.document_metadata.get("full_path")}
+        logger.info(f"Found {len(existing_docs_lookup)} existing GitHub documents in database for search space {search_space_id}")
+
+        # 6. Iterate through repositories and index files
+        for repo_info in repositories:
+            repo_full_name = repo_info.get("full_name")
+            if not repo_full_name:
+                logger.warning(f"Skipping repository with missing full_name: {repo_info.get('name')}")
+                continue
+
+            logger.info(f"Processing repository: {repo_full_name}")
+            try:
+                files_to_index = github_client.get_repository_files(repo_full_name)
+                if not files_to_index:
+                    logger.info(f"No indexable files found in repository: {repo_full_name}")
+                    continue
+
+                logger.info(f"Found {len(files_to_index)} files to process in {repo_full_name}")
+
+                for file_info in files_to_index:
+                    file_path = file_info.get("path")
+                    file_url = file_info.get("url")
+                    file_sha = file_info.get("sha")
+                    file_type = file_info.get("type")  # 'code' or 'doc'
+                    full_path_key = f"{repo_full_name}/{file_path}"
+
+                    if not file_path or not file_url or not file_sha:
+                        logger.warning(f"Skipping file with missing info in {repo_full_name}: {file_info}")
+                        continue
+
+                    # Check if document already exists and if content hash matches
+                    existing_doc = existing_docs_lookup.get(full_path_key)
+                    if existing_doc and existing_doc.document_metadata.get("sha") == file_sha:
+                        logger.debug(f"Skipping unchanged file: {full_path_key}")
+                        continue  # Skip if SHA matches (content hasn't changed)
+
+                    # Get file content
+                    file_content = github_client.get_file_content(repo_full_name, file_path)
+
+                    if file_content is None:
+                        logger.warning(f"Could not retrieve content for {full_path_key}. Skipping.")
+                        continue  # Skip if content fetch failed
+
+                    # Use file_content directly for chunking, maybe summary for main content?
+                    # For now, let's use the full content for both, might need refinement
+                    summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."  # Simple summary
+                    summary_embedding = config.embedding_model_instance.embed(summary_content)
+
+                    # Chunk the content
+                    try:
+                        chunks_data = [
+                            Chunk(content=chunk.text, embedding=chunk.embedding)
+                            for chunk in config.chunker_instance.chunk(file_content)
+                        ]
+                    except Exception as chunk_err:
+                        logger.error(f"Failed to chunk file {full_path_key}: {chunk_err}")
+                        errors.append(f"Chunking failed for {full_path_key}: {chunk_err}")
+                        continue  # Skip this file if chunking fails
+
+                    doc_metadata = {
+                        "repository_full_name": repo_full_name,
+                        "file_path": file_path,
+                        "full_path": full_path_key,  # For easier lookup
+                        "url": file_url,
+                        "sha": file_sha,
+                        "type": file_type,
+                        "indexed_at": datetime.now(timezone.utc).isoformat()
+                    }
+
+                    if existing_doc:
+                        # Update existing document
+                        logger.info(f"Updating document for file: {full_path_key}")
+                        existing_doc.title = f"GitHub - {file_path}"
+                        existing_doc.document_metadata = doc_metadata
+                        existing_doc.content = summary_content  # Update summary
+                        existing_doc.embedding = summary_embedding  # Update embedding
+
+                        # Delete old chunks
+                        await session.execute(
+                            delete(Chunk)
+                            .where(Chunk.document_id == existing_doc.id)
+                        )
+                        # Add new chunks
+                        for chunk_obj in chunks_data:
+                            chunk_obj.document_id = existing_doc.id
+                            session.add(chunk_obj)
+
+                        documents_processed += 1
+                    else:
+                        # Create new document
+                        logger.info(f"Creating new document for file: {full_path_key}")
+                        document = Document(
+                            title=f"GitHub - {file_path}",
+                            document_type=DocumentType.GITHUB_CONNECTOR,
+                            document_metadata=doc_metadata,
+                            content=summary_content,  # Store summary
+                            embedding=summary_embedding,
+                            search_space_id=search_space_id,
+                            chunks=chunks_data  # Associate chunks directly
+                        )
+                        session.add(document)
+                        documents_processed += 1
+
+                # Commit periodically or at the end? For now, commit per repo
+                # await session.commit()
+
+            except Exception as repo_err:
+                logger.error(f"Failed to process repository {repo_full_name}: {repo_err}")
+                errors.append(f"Failed processing {repo_full_name}: {repo_err}")
+
+        # Commit all changes at the end
+        await session.commit()
+        logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.")
+
+    except SQLAlchemyError as db_err:
+        await session.rollback()
+        logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}")
+        errors.append(f"Database error: {db_err}")
+        return documents_processed, "; ".join(errors) if errors else str(db_err)
+    except Exception as e:
+        await session.rollback()
+        logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True)
+        errors.append(f"Unexpected error: {e}")
+        return documents_processed, "; ".join(errors) if errors else str(e)
+
+    error_message = "; ".join(errors) if errors else None
+    return documents_processed, error_message
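
For context on how the new task might be driven, here is a minimal, hypothetical usage sketch. Only the index_github_repos signature comes from the diff above; the module path, database URL, and session factory are illustrative assumptions (SQLAlchemy 2.x async API):

import asyncio

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Module path is an assumption for illustration; import from wherever the task actually lives.
from app.tasks.connectors_indexing_tasks import index_github_repos

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/surfsense")  # placeholder DSN
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def run_github_indexing(connector_id: int, search_space_id: int) -> None:
    async with SessionLocal() as session:
        indexed, error = await index_github_repos(
            session=session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            update_last_indexed=True,
        )
        if error:
            print(f"GitHub indexing finished with errors ({indexed} files indexed): {error}")
        else:
            print(f"GitHub indexing complete: {indexed} files indexed")

if __name__ == "__main__":
    asyncio.run(run_github_indexing(connector_id=1, search_space_id=1))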