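"""GitHub connector for fetching repository metadata and file contents via the GitHub API.

Uses the ``github3.py`` library with a Personal Access Token (PAT) for
authentication. Only files with recognised code/documentation extensions and a
size below ``MAX_FILE_SIZE`` are returned.
"""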
import base64
import logging
from typing import Any

from github3 import exceptions as github_exceptions, login as github_login
from github3.exceptions import ForbiddenError, NotFoundError
from github3.repos.contents import Contents

logger = logging.getLogger(__name__)

# List of common code file extensions to target
CODE_EXTENSIONS = {
    ".py",
    ".js",
    ".jsx",
    ".ts",
    ".tsx",
    ".java",
    ".c",
    ".cpp",
    ".h",
    ".hpp",
    ".cs",
    ".go",
    ".rb",
    ".php",
    ".swift",
    ".kt",
    ".scala",
    ".rs",
    ".m",
    ".sh",
    ".bash",
    ".ps1",
    ".lua",
    ".pl",
    ".pm",
    ".r",
    ".dart",
    ".sql",
}

# List of common documentation/text file extensions
DOC_EXTENSIONS = {
    ".md",
    ".txt",
    ".rst",
    ".adoc",
    ".html",
    ".htm",
    ".xml",
    ".json",
    ".yaml",
    ".yml",
    ".toml",
}

# Maximum file size in bytes (e.g., 1MB)
MAX_FILE_SIZE = 1 * 1024 * 1024


class GitHubConnector:
    """Connector for interacting with the GitHub API."""

    # Directories to skip during file traversal
    SKIPPED_DIRS = {
        # Version control
        ".git",
        # Dependencies
        "node_modules",
        "vendor",
        # Build artifacts / Caches
        "build",
        "dist",
        "target",
        "__pycache__",
        # Virtual environments
        "venv",
        ".venv",
        "env",
        # IDE/Editor config
        ".vscode",
        ".idea",
        ".project",
        ".settings",
        # Temporary / Logs
        "tmp",
        "logs",
        # Add other project-specific irrelevant directories if needed
    }

    def __init__(self, token: str):
        """
        Initializes the GitHub connector.

        Args:
            token: GitHub Personal Access Token (PAT).
        """
        if not token:
            raise ValueError("GitHub token cannot be empty.")
        try:
            self.gh = github_login(token=token)
            # Try a simple authenticated call to check token validity
            self.gh.me()
            logger.info("Successfully authenticated with GitHub API.")
        except (github_exceptions.AuthenticationFailed, ForbiddenError) as e:
            logger.error(f"GitHub authentication failed: {e}")
            raise ValueError("Invalid GitHub token or insufficient permissions.") from e
        except Exception as e:
            logger.error(f"Failed to initialize GitHub client: {e}")
            raise e

    def get_user_repositories(self) -> list[dict[str, Any]]:
        """Fetches repositories accessible by the authenticated user."""
        repos_data = []
        try:
            # type='owner' fetches repos owned by the user
            # type='member' fetches repos the user is a collaborator on (including orgs)
            # type='all' fetches both
            for repo in self.gh.repositories(type="all", sort="updated"):
                repos_data.append(
                    {
                        "id": repo.id,
                        "name": repo.name,
                        "full_name": repo.full_name,
                        "private": repo.private,
                        "url": repo.html_url,
                        "description": repo.description or "",
                        "last_updated": repo.updated_at if repo.updated_at else None,
                    }
                )
            logger.info(f"Fetched {len(repos_data)} repositories.")
            return repos_data
        except Exception as e:
            logger.error(f"Failed to fetch GitHub repositories: {e}")
            return []  # Return empty list on error

    def get_repository_files(
        self, repo_full_name: str, path: str = ""
    ) -> list[dict[str, Any]]:
        """
        Recursively fetches details of relevant files (code, docs) within a repository path.

        Args:
            repo_full_name: The full name of the repository (e.g., 'owner/repo').
            path: The starting path within the repository (default is root).

        Returns:
            A list of dictionaries, each containing file details (path, sha, url, size).
            Returns an empty list if the repository or path is not found or on error.
        """
        files_list = []
        try:
            owner, repo_name = repo_full_name.split("/")
            repo = self.gh.repository(owner, repo_name)
            if not repo:
                logger.warning(f"Repository '{repo_full_name}' not found.")
                return []

            contents = repo.directory_contents(
                directory_path=path
            )  # Use directory_contents for clarity

            # contents returns a list of tuples (name, content_obj)
            for _item_name, content_item in contents:
                if not isinstance(content_item, Contents):
                    continue

                if content_item.type == "dir":
                    # Check if the directory name is in the skipped list
                    if content_item.name in self.SKIPPED_DIRS:
                        logger.debug(f"Skipping directory: {content_item.path}")
                        continue  # Skip recursion for this directory

                    # Recursively fetch contents of subdirectory
                    files_list.extend(
                        self.get_repository_files(
                            repo_full_name, path=content_item.path
                        )
                    )
                elif content_item.type == "file":
                    # Check if the file extension is relevant and size is within limits
                    file_extension = (
                        "." + content_item.name.split(".")[-1].lower()
                        if "." in content_item.name
                        else ""
                    )
                    is_code = file_extension in CODE_EXTENSIONS
                    is_doc = file_extension in DOC_EXTENSIONS

                    if (is_code or is_doc) and content_item.size <= MAX_FILE_SIZE:
                        files_list.append(
                            {
                                "path": content_item.path,
                                "sha": content_item.sha,
                                "url": content_item.html_url,
                                "size": content_item.size,
                                "type": "code" if is_code else "doc",
                            }
                        )
                    elif content_item.size > MAX_FILE_SIZE:
                        logger.debug(
                            f"Skipping large file: {content_item.path} ({content_item.size} bytes)"
                        )
                    else:
                        logger.debug(
                            f"Skipping irrelevant file type: {content_item.path}"
                        )

        except (NotFoundError, ForbiddenError) as e:
            logger.warning(f"Cannot access path '{path}' in '{repo_full_name}': {e}")
        except Exception as e:
            logger.error(
                f"Failed to get files for {repo_full_name} at path '{path}': {e}"
            )
        # Return what we have collected so far in case of partial failure
        return files_list

    def get_file_content(self, repo_full_name: str, file_path: str) -> str | None:
        """
        Fetches the decoded content of a specific file.

        Args:
            repo_full_name: The full name of the repository (e.g., 'owner/repo').
            file_path: The path to the file within the repository.

        Returns:
            The decoded file content as a string, or None if fetching fails or the file is too large.
        """
        try:
            owner, repo_name = repo_full_name.split("/")
            repo = self.gh.repository(owner, repo_name)
            if not repo:
                logger.warning(
                    f"Repository '{repo_full_name}' not found when fetching file '{file_path}'."
                )
                return None

            content_item = repo.file_contents(
                path=file_path
            )  # Use file_contents for clarity

            if (
                not content_item
                or not isinstance(content_item, Contents)
                or content_item.type != "file"
            ):
                logger.warning(
                    f"File '{file_path}' not found or is not a file in '{repo_full_name}'."
                )
                return None

            if content_item.size > MAX_FILE_SIZE:
                logger.warning(
                    f"File '{file_path}' in '{repo_full_name}' exceeds max size ({content_item.size} > {MAX_FILE_SIZE}). Skipping content fetch."
                )
                return None

            # Content is base64 encoded
            if content_item.content:
                try:
                    decoded_content = base64.b64decode(content_item.content).decode(
                        "utf-8"
                    )
                    return decoded_content
                except UnicodeDecodeError:
                    logger.warning(
                        f"Could not decode file '{file_path}' in '{repo_full_name}' as UTF-8. Trying with 'latin-1'."
                    )
                    try:
                        # Try a fallback encoding
                        decoded_content = base64.b64decode(content_item.content).decode(
                            "latin-1"
                        )
                        return decoded_content
                    except Exception as decode_err:
                        logger.error(
                            f"Failed to decode file '{file_path}' with fallback encoding: {decode_err}"
                        )
                        return None  # Give up if fallback fails
            else:
                logger.warning(
                    f"No content returned for file '{file_path}' in '{repo_full_name}'. It might be empty."
                )
                return ""  # Return empty string for empty files

        except (NotFoundError, ForbiddenError) as e:
            logger.warning(
                f"Cannot access file '{file_path}' in '{repo_full_name}': {e}"
            )
            return None
        except Exception as e:
            logger.error(
                f"Failed to get content for file '{file_path}' in '{repo_full_name}': {e}"
            )
            return None
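

# --- Usage sketch (illustrative only) ---
# A minimal example of how this connector might be driven end to end: list the
# authenticated user's repositories, walk one for code/doc files, and fetch a
# file's content. The GITHUB_PAT environment variable is a hypothetical name
# used only for this sketch; it is not part of the connector's API.
if __name__ == "__main__":
    import os

    token = os.environ.get("GITHUB_PAT")  # hypothetical env var for this sketch
    if not token:
        raise SystemExit("Set GITHUB_PAT to run this example.")

    connector = GitHubConnector(token=token)

    # List accessible repositories and walk the first one for relevant files.
    repos = connector.get_user_repositories()
    if repos:
        full_name = repos[0]["full_name"]
        files = connector.get_repository_files(full_name)
        print(f"{full_name}: {len(files)} code/doc files found")

        # Fetch and preview the content of the first matching file, if any.
        if files:
            content = connector.get_file_content(full_name, files[0]["path"])
            if content is not None:
                print(content[:200])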