mirror of
https://github.com/cyclotruc/gitingest.git
synced 2026-04-26 15:40:40 +00:00
feat: use gitpython for git stuff (#504)
Some checks failed
CI / test (macos-latest, 3.8) (push) Has been cancelled
CI / test (true, ubuntu-latest, 3.13) (push) Has been cancelled
CI / test (ubuntu-latest, 3.8) (push) Has been cancelled
CI / test (macos-latest, 3.13) (push) Has been cancelled
CI / test (windows-latest, 3.13) (push) Has been cancelled
CI / test (windows-latest, 3.8) (push) Has been cancelled
CodeQL / Analyze (push) Has been cancelled
Build & Push Container / ECR (push) Has been cancelled
Build & Push Container / GHCR (push) Has been cancelled
release-please / release (push) Has been cancelled
OSSF Scorecard / Scorecard analysis (push) Has been cancelled
Some checks failed
CI / test (macos-latest, 3.8) (push) Has been cancelled
CI / test (true, ubuntu-latest, 3.13) (push) Has been cancelled
CI / test (ubuntu-latest, 3.8) (push) Has been cancelled
CI / test (macos-latest, 3.13) (push) Has been cancelled
CI / test (windows-latest, 3.13) (push) Has been cancelled
CI / test (windows-latest, 3.8) (push) Has been cancelled
CodeQL / Analyze (push) Has been cancelled
Build & Push Container / ECR (push) Has been cancelled
Build & Push Container / GHCR (push) Has been cancelled
release-please / release (push) Has been cancelled
OSSF Scorecard / Scorecard analysis (push) Has been cancelled
Co-authored-by: Iwan Burel <iwan.burel@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
parent
c9fff75cc1
commit
c057f6e062
9 changed files with 457 additions and 396 deletions
|
|
@ -117,6 +117,7 @@ repos:
|
|||
boto3>=1.28.0,
|
||||
click>=8.0.0,
|
||||
'fastapi[standard]>=0.109.1',
|
||||
gitpython>=3.1.0,
|
||||
httpx,
|
||||
loguru>=0.7.0,
|
||||
pathspec>=0.12.1,
|
||||
|
|
@ -144,6 +145,7 @@ repos:
|
|||
boto3>=1.28.0,
|
||||
click>=8.0.0,
|
||||
'fastapi[standard]>=0.109.1',
|
||||
gitpython>=3.1.0,
|
||||
httpx,
|
||||
loguru>=0.7.0,
|
||||
pathspec>=0.12.1,
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ COPY src/ ./src/
|
|||
|
||||
RUN set -eux; \
|
||||
pip install --no-cache-dir --upgrade pip; \
|
||||
pip install --no-cache-dir --timeout 1000 .[server]
|
||||
pip install --no-cache-dir --timeout 1000 .[server,mcp]
|
||||
|
||||
# Stage 2: Runtime image
|
||||
FROM python:3.13.5-slim@sha256:4c2cf9917bd1cbacc5e9b07320025bdb7cdf2df7b0ceaccb55e9dd7e30987419
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ readme = {file = "README.md", content-type = "text/markdown" }
|
|||
requires-python = ">= 3.8"
|
||||
dependencies = [
|
||||
"click>=8.0.0",
|
||||
"gitpython>=3.1.0",
|
||||
"httpx",
|
||||
"loguru>=0.7.0",
|
||||
"pathspec>=0.12.1",
|
||||
|
|
|
|||
|
|
@ -5,16 +5,17 @@ from __future__ import annotations
|
|||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import git
|
||||
|
||||
from gitingest.config import DEFAULT_TIMEOUT
|
||||
from gitingest.utils.git_utils import (
|
||||
check_repo_exists,
|
||||
checkout_partial_clone,
|
||||
create_git_auth_header,
|
||||
create_git_command,
|
||||
create_git_repo,
|
||||
ensure_git_installed,
|
||||
git_auth_context,
|
||||
is_github_host,
|
||||
resolve_commit,
|
||||
run_command,
|
||||
)
|
||||
from gitingest.utils.logging_config import get_logger
|
||||
from gitingest.utils.os_utils import ensure_directory_exists_or_create
|
||||
|
|
@ -46,6 +47,8 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
|
|||
------
|
||||
ValueError
|
||||
If the repository is not found, if the provided URL is invalid, or if the token format is invalid.
|
||||
RuntimeError
|
||||
If Git operations fail during the cloning process.
|
||||
|
||||
"""
|
||||
# Extract and validate query parameters
|
||||
|
|
@ -83,20 +86,34 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
|
|||
commit = await resolve_commit(config, token=token)
|
||||
logger.debug("Resolved commit", extra={"commit": commit})
|
||||
|
||||
clone_cmd = ["git"]
|
||||
if token and is_github_host(url):
|
||||
clone_cmd += ["-c", create_git_auth_header(token, url=url)]
|
||||
# Clone the repository using GitPython with proper authentication
|
||||
logger.info("Executing git clone operation", extra={"url": "<redacted>", "local_path": local_path})
|
||||
try:
|
||||
clone_kwargs = {
|
||||
"single_branch": True,
|
||||
"no_checkout": True,
|
||||
"depth": 1,
|
||||
}
|
||||
|
||||
clone_cmd += ["clone", "--single-branch", "--no-checkout", "--depth=1"]
|
||||
if partial_clone:
|
||||
clone_cmd += ["--filter=blob:none", "--sparse"]
|
||||
with git_auth_context(url, token) as (git_cmd, auth_url):
|
||||
if partial_clone:
|
||||
# For partial clones, use git.Git() with filter and sparse options
|
||||
cmd_args = ["--single-branch", "--no-checkout", "--depth=1"]
|
||||
cmd_args.extend(["--filter=blob:none", "--sparse"])
|
||||
cmd_args.extend([auth_url, local_path])
|
||||
git_cmd.clone(*cmd_args)
|
||||
elif token and is_github_host(url):
|
||||
# For authenticated GitHub repos, use git_cmd with auth URL
|
||||
cmd_args = ["--single-branch", "--no-checkout", "--depth=1", auth_url, local_path]
|
||||
git_cmd.clone(*cmd_args)
|
||||
else:
|
||||
# For non-authenticated repos, use the standard GitPython method
|
||||
git.Repo.clone_from(url, local_path, **clone_kwargs)
|
||||
|
||||
clone_cmd += [url, local_path]
|
||||
|
||||
# Clone the repository
|
||||
logger.info("Executing git clone command", extra={"command": " ".join([*clone_cmd[:-1], "<url>", local_path])})
|
||||
await run_command(*clone_cmd)
|
||||
logger.info("Git clone completed successfully")
|
||||
logger.info("Git clone completed successfully")
|
||||
except git.GitCommandError as exc:
|
||||
msg = f"Git clone failed: {exc}"
|
||||
raise RuntimeError(msg) from exc
|
||||
|
||||
# Checkout the subpath if it is a partial clone
|
||||
if partial_clone:
|
||||
|
|
@ -104,20 +121,56 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
|
|||
await checkout_partial_clone(config, token=token)
|
||||
logger.debug("Partial clone setup completed")
|
||||
|
||||
git = create_git_command(["git"], local_path, url, token)
|
||||
|
||||
# Ensure the commit is locally available
|
||||
logger.debug("Fetching specific commit", extra={"commit": commit})
|
||||
await run_command(*git, "fetch", "--depth=1", "origin", commit)
|
||||
|
||||
# Write the work-tree at that commit
|
||||
logger.info("Checking out commit", extra={"commit": commit})
|
||||
await run_command(*git, "checkout", commit)
|
||||
|
||||
# Update submodules
|
||||
if config.include_submodules:
|
||||
logger.info("Updating submodules")
|
||||
await run_command(*git, "submodule", "update", "--init", "--recursive", "--depth=1")
|
||||
logger.debug("Submodules updated successfully")
|
||||
# Perform post-clone operations
|
||||
await _perform_post_clone_operations(config, local_path, url, token, commit)
|
||||
|
||||
logger.info("Git clone operation completed successfully", extra={"local_path": local_path})
|
||||
|
||||
|
||||
async def _perform_post_clone_operations(
    config: CloneConfig,
    local_path: str,
    url: str,
    token: str | None,
    commit: str,
) -> None:
    """Bring a freshly cloned repository to the requested commit.

    Fetches ``commit`` from ``origin`` with depth 1, checks it out and, when
    ``config.include_submodules`` is set, initialises the submodules recursively.

    Parameters
    ----------
    config : CloneConfig
        The configuration for cloning the repository; only ``include_submodules`` is read here.
    local_path : str
        The local path where the repository was cloned.
    url : str
        The repository URL, forwarded to ``create_git_repo``.
    token : str | None
        GitHub personal access token (PAT) for accessing private repositories.
    commit : str
        The commit SHA to fetch and check out.

    Raises
    ------
    RuntimeError
        If any Git operation fails (wraps ``git.GitCommandError``).

    """
    try:
        # Bind the command interface once; every step below runs against the same repository.
        git_cli = create_git_repo(local_path, url, token).git

        # The clone was shallow and without checkout, so the commit may not be present locally yet.
        logger.debug("Fetching specific commit", extra={"commit": commit})
        git_cli.fetch("--depth=1", "origin", commit)

        # Populate the work-tree at that commit.
        logger.info("Checking out commit", extra={"commit": commit})
        git_cli.checkout(commit)

        if not config.include_submodules:
            return

        logger.info("Updating submodules")
        git_cli.submodule("update", "--init", "--recursive", "--depth=1")
        logger.debug("Submodules updated successfully")
    except git.GitCommandError as exc:
        failure = f"Git operation failed: {exc}"
        raise RuntimeError(failure) from exc
|
||||
|
|
|
|||
|
|
@ -6,12 +6,12 @@ import asyncio
|
|||
import base64
|
||||
import re
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Final, Iterable
|
||||
from urllib.parse import urlparse
|
||||
from typing import TYPE_CHECKING, Final, Generator, Iterable
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
import httpx
|
||||
from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND
|
||||
import git
|
||||
|
||||
from gitingest.utils.compat_func import removesuffix
|
||||
from gitingest.utils.exceptions import InvalidGitHubTokenError
|
||||
|
|
@ -50,6 +50,9 @@ def is_github_host(url: str) -> bool:
|
|||
async def run_command(*args: str) -> tuple[bytes, bytes]:
|
||||
"""Execute a shell command asynchronously and return (stdout, stderr) bytes.
|
||||
|
||||
This function is kept for backward compatibility with non-git commands.
|
||||
Git operations should use GitPython directly.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*args : str
|
||||
|
|
@ -92,21 +95,27 @@ async def ensure_git_installed() -> None:
|
|||
|
||||
"""
|
||||
try:
|
||||
await run_command("git", "--version")
|
||||
except RuntimeError as exc:
|
||||
# Use GitPython to check git availability
|
||||
git_cmd = git.Git()
|
||||
git_cmd.version()
|
||||
except git.GitCommandError as exc:
|
||||
msg = "Git is not installed or not accessible. Please install Git first."
|
||||
raise RuntimeError(msg) from exc
|
||||
except Exception as exc:
|
||||
msg = "Git is not installed or not accessible. Please install Git first."
|
||||
raise RuntimeError(msg) from exc
|
||||
|
||||
if sys.platform == "win32":
|
||||
try:
|
||||
stdout, _ = await run_command("git", "config", "core.longpaths")
|
||||
if stdout.decode().strip().lower() != "true":
|
||||
longpaths_value = git_cmd.config("core.longpaths")
|
||||
if longpaths_value.lower() != "true":
|
||||
logger.warning(
|
||||
"Git clone may fail on Windows due to long file paths. "
|
||||
"Consider enabling long path support with: 'git config --global core.longpaths true'. "
|
||||
"Note: This command may require administrator privileges.",
|
||||
extra={"platform": "windows", "longpaths_enabled": False},
|
||||
)
|
||||
except RuntimeError:
|
||||
except git.GitCommandError:
|
||||
# Ignore if checking 'core.longpaths' fails.
|
||||
pass
|
||||
|
||||
|
|
@ -126,35 +135,15 @@ async def check_repo_exists(url: str, token: str | None = None) -> bool:
|
|||
bool
|
||||
``True`` if the repository exists, ``False`` otherwise.
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
If the host returns an unrecognised status code.
|
||||
|
||||
"""
|
||||
headers = {}
|
||||
|
||||
if token and is_github_host(url):
|
||||
host, owner, repo = _parse_github_url(url)
|
||||
# Public GitHub vs. GitHub Enterprise
|
||||
base_api = "https://api.github.com" if host == "github.com" else f"https://{host}/api/v3"
|
||||
url = f"{base_api}/repos/{owner}/{repo}"
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||
try:
|
||||
response = await client.head(url, headers=headers)
|
||||
except httpx.RequestError:
|
||||
return False
|
||||
|
||||
status_code = response.status_code
|
||||
|
||||
if status_code == HTTP_200_OK:
|
||||
return True
|
||||
if status_code in {HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND}:
|
||||
try:
|
||||
# Try to resolve HEAD - if repo exists, this will work
|
||||
await _resolve_ref_to_sha(url, "HEAD", token=token)
|
||||
except (ValueError, Exception):
|
||||
# Repository doesn't exist, is private without proper auth, or other error
|
||||
return False
|
||||
msg = f"Unexpected HTTP status {status_code} for {url}"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _parse_github_url(url: str) -> tuple[str, str, str]:
|
||||
|
|
@ -216,52 +205,51 @@ async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str |
|
|||
------
|
||||
ValueError
|
||||
If the ``ref_type`` parameter is not "branches" or "tags".
|
||||
RuntimeError
|
||||
If fetching branches or tags from the remote repository fails.
|
||||
|
||||
"""
|
||||
if ref_type not in ("branches", "tags"):
|
||||
msg = f"Invalid fetch type: {ref_type}"
|
||||
raise ValueError(msg)
|
||||
|
||||
cmd = ["git"]
|
||||
|
||||
# Add authentication if needed
|
||||
if token and is_github_host(url):
|
||||
cmd += ["-c", create_git_auth_header(token, url=url)]
|
||||
|
||||
cmd += ["ls-remote"]
|
||||
|
||||
fetch_tags = ref_type == "tags"
|
||||
to_fetch = "tags" if fetch_tags else "heads"
|
||||
|
||||
cmd += [f"--{to_fetch}"]
|
||||
|
||||
# `--refs` filters out the peeled tag objects (those ending with "^{}") (for tags)
|
||||
if fetch_tags:
|
||||
cmd += ["--refs"]
|
||||
|
||||
cmd += [url]
|
||||
|
||||
await ensure_git_installed()
|
||||
stdout, _ = await run_command(*cmd)
|
||||
# For each line in the output:
|
||||
# - Skip empty lines and lines that don't contain "refs/{to_fetch}/"
|
||||
# - Extract the branch or tag name after "refs/{to_fetch}/"
|
||||
return [
|
||||
line.split(f"refs/{to_fetch}/", 1)[1]
|
||||
for line in stdout.decode().splitlines()
|
||||
if line.strip() and f"refs/{to_fetch}/" in line
|
||||
]
|
||||
|
||||
# Use GitPython to get remote references
|
||||
try:
|
||||
fetch_tags = ref_type == "tags"
|
||||
to_fetch = "tags" if fetch_tags else "heads"
|
||||
|
||||
# Build ls-remote command
|
||||
cmd_args = [f"--{to_fetch}"]
|
||||
if fetch_tags:
|
||||
cmd_args.append("--refs") # Filter out peeled tag objects
|
||||
cmd_args.append(url)
|
||||
|
||||
# Run the command with proper authentication
|
||||
with git_auth_context(url, token) as (git_cmd, auth_url):
|
||||
# Replace the URL in cmd_args with the authenticated URL
|
||||
cmd_args[-1] = auth_url # URL is the last argument
|
||||
output = git_cmd.ls_remote(*cmd_args)
|
||||
|
||||
# Parse output
|
||||
return [
|
||||
line.split(f"refs/{to_fetch}/", 1)[1]
|
||||
for line in output.splitlines()
|
||||
if line.strip() and f"refs/{to_fetch}/" in line
|
||||
]
|
||||
except git.GitCommandError as exc:
|
||||
msg = f"Failed to fetch {ref_type} from {url}: {exc}"
|
||||
raise RuntimeError(msg) from exc
|
||||
|
||||
|
||||
def create_git_command(base_cmd: list[str], local_path: str, url: str, token: str | None = None) -> list[str]:
|
||||
"""Create a git command with authentication if needed.
|
||||
def create_git_repo(local_path: str, url: str, token: str | None = None) -> git.Repo:
|
||||
"""Create a GitPython Repo object with authentication if needed.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
base_cmd : list[str]
|
||||
The base git command to start with.
|
||||
local_path : str
|
||||
The local path where the git command should be executed.
|
||||
The local path where the git repository is located.
|
||||
url : str
|
||||
The repository URL to check if it's a GitHub repository.
|
||||
token : str | None
|
||||
|
|
@ -269,14 +257,30 @@ def create_git_command(base_cmd: list[str], local_path: str, url: str, token: st
|
|||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
The git command with authentication if needed.
|
||||
git.Repo
|
||||
A GitPython Repo object configured with authentication.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the local path is not a valid git repository.
|
||||
|
||||
"""
|
||||
cmd = [*base_cmd, "-C", local_path]
|
||||
if token and is_github_host(url):
|
||||
cmd += ["-c", create_git_auth_header(token, url=url)]
|
||||
return cmd
|
||||
try:
|
||||
repo = git.Repo(local_path)
|
||||
|
||||
# Configure authentication if needed
|
||||
if token and is_github_host(url):
|
||||
auth_header = create_git_auth_header(token, url=url)
|
||||
# Set the auth header in git config for this repo
|
||||
key, value = auth_header.split("=", 1)
|
||||
repo.git.config(key, value)
|
||||
|
||||
except git.InvalidGitRepositoryError as exc:
|
||||
msg = f"Invalid git repository at {local_path}"
|
||||
raise ValueError(msg) from exc
|
||||
|
||||
return repo
|
||||
|
||||
|
||||
def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
|
||||
|
|
@ -310,6 +314,70 @@ def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
|
|||
return f"http.https://{hostname}/.extraheader=Authorization: Basic {basic}"
|
||||
|
||||
|
||||
def create_authenticated_url(url: str, token: str | None = None) -> str:
    """Create an authenticated URL for Git operations.

    The credentials travel inside the returned URL, so no git configuration or other
    process-wide state has to be modified.

    Parameters
    ----------
    url : str
        The repository URL.
    token : str | None
        GitHub personal access token (PAT) for accessing private repositories.

    Returns
    -------
    str
        ``url`` with ``x-oauth-basic:<token>@`` credentials embedded when a token is given and
        ``is_github_host(url)`` is true; otherwise ``url`` unchanged.

    Notes
    -----
    The returned string contains the secret token. Do not log it or include it in messages
    shown to users.

    """
    if not (token and is_github_host(url)):
        return url

    # Imported locally so this function does not depend on an edit to the module import block.
    from urllib.parse import quote

    parsed = urlparse(url)

    # Percent-encode the token: a raw "@", ":", "/" or "#" would otherwise terminate the
    # userinfo part early and produce a URL that points at the wrong host or path.
    encoded_token = quote(token, safe="")
    netloc = f"x-oauth-basic:{encoded_token}@{parsed.hostname}"
    if parsed.port:
        netloc += f":{parsed.port}"

    # Only the network location changes; scheme, path, params, query and fragment are kept.
    return urlunparse(parsed._replace(netloc=netloc))
|
||||
|
||||
|
||||
@contextmanager
def git_auth_context(url: str, token: str | None = None) -> Generator[tuple[git.Git, str], None, None]:
    """Context manager that provides a Git command object and the URL to use with it.

    A new ``git.Git()`` object is created on every entry and the URL is produced by
    ``create_authenticated_url``. Nothing is configured on entry and nothing is cleaned up
    on exit.

    Parameters
    ----------
    url : str
        The repository URL to check if authentication is needed.
    token : str | None
        GitHub personal access token (PAT) for accessing private repositories.

    Yields
    ------
    tuple[git.Git, str]
        Tuple of (Git command object, authenticated URL to use).

    """
    # The annotation spells out all three Generator parameters: the one-argument form is
    # only valid from Python 3.13 on.
    git_cmd = git.Git()
    auth_url = create_authenticated_url(url, token)
    yield git_cmd, auth_url
|
||||
|
||||
|
||||
def validate_github_token(token: str) -> None:
|
||||
"""Validate the format of a GitHub Personal Access Token.
|
||||
|
||||
|
|
@ -338,13 +406,23 @@ async def checkout_partial_clone(config: CloneConfig, token: str | None) -> None
|
|||
token : str | None
|
||||
GitHub personal access token (PAT) for accessing private repositories.
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
If the sparse-checkout configuration fails.
|
||||
|
||||
"""
|
||||
subpath = config.subpath.lstrip("/")
|
||||
if config.blob:
|
||||
# Remove the file name from the subpath when ingesting from a file url (e.g. blob/branch/path/file.txt)
|
||||
subpath = str(Path(subpath).parent.as_posix())
|
||||
checkout_cmd = create_git_command(["git"], config.local_path, config.url, token)
|
||||
await run_command(*checkout_cmd, "sparse-checkout", "set", subpath)
|
||||
|
||||
try:
|
||||
repo = create_git_repo(config.local_path, config.url, token)
|
||||
repo.git.sparse_checkout("set", subpath)
|
||||
except git.GitCommandError as exc:
|
||||
msg = f"Failed to configure sparse-checkout: {exc}"
|
||||
raise RuntimeError(msg) from exc
|
||||
|
||||
|
||||
async def resolve_commit(config: CloneConfig, token: str | None) -> str:
|
||||
|
|
@ -400,18 +478,20 @@ async def _resolve_ref_to_sha(url: str, pattern: str, token: str | None = None)
|
|||
If the ref does not exist in the remote repository.
|
||||
|
||||
"""
|
||||
# Build: git [-c http.<host>/.extraheader=Auth...] ls-remote <url> <pattern>
|
||||
cmd: list[str] = ["git"]
|
||||
if token and is_github_host(url):
|
||||
cmd += ["-c", create_git_auth_header(token, url=url)]
|
||||
try:
|
||||
# Execute ls-remote command with proper authentication
|
||||
with git_auth_context(url, token) as (git_cmd, auth_url):
|
||||
output = git_cmd.ls_remote(auth_url, pattern)
|
||||
lines = output.splitlines()
|
||||
|
||||
cmd += ["ls-remote", url, pattern]
|
||||
stdout, _ = await run_command(*cmd)
|
||||
lines = stdout.decode().splitlines()
|
||||
sha = _pick_commit_sha(lines)
|
||||
if not sha:
|
||||
msg = f"{pattern!r} not found in {url}"
|
||||
raise ValueError(msg)
|
||||
sha = _pick_commit_sha(lines)
|
||||
if not sha:
|
||||
msg = f"{pattern!r} not found in {url}"
|
||||
raise ValueError(msg)
|
||||
|
||||
except git.GitCommandError as exc:
|
||||
msg = f"Failed to resolve {pattern} in {url}:\n{exc}"
|
||||
raise ValueError(msg) from exc
|
||||
|
||||
return sha
|
||||
|
||||
|
|
|
|||
|
|
@ -308,7 +308,7 @@ async def process_query(
|
|||
_print_error(query.url, exc, max_file_size, pattern_type, pattern)
|
||||
# Clean up repository even if processing failed
|
||||
_cleanup_repository(clone_config)
|
||||
return IngestErrorResponse(error=str(exc))
|
||||
return IngestErrorResponse(error=f"{exc!s}")
|
||||
|
||||
if len(content) > MAX_DISPLAY_SIZE:
|
||||
content = (
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import sys
|
|||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict
|
||||
from unittest.mock import AsyncMock
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -183,20 +183,21 @@ def stub_branches(mocker: MockerFixture) -> Callable[[list[str]], None]:
|
|||
"""Return a function that stubs git branch discovery to *branches*."""
|
||||
|
||||
def _factory(branches: list[str]) -> None:
|
||||
stdout = (
|
||||
"\n".join(f"{DEMO_COMMIT[:12]}{i:02d}\trefs/heads/{b}" for i, b in enumerate(branches)).encode() + b"\n"
|
||||
)
|
||||
mocker.patch(
|
||||
"gitingest.utils.git_utils.run_command",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(stdout, b""),
|
||||
)
|
||||
# Patch the GitPython fetch function
|
||||
mocker.patch(
|
||||
"gitingest.utils.git_utils.fetch_remote_branches_or_tags",
|
||||
new_callable=AsyncMock,
|
||||
return_value=branches,
|
||||
)
|
||||
|
||||
# Patch GitPython's ls_remote method to return the mocked output
|
||||
ls_remote_output = "\n".join(f"{DEMO_COMMIT[:12]}{i:02d}\trefs/heads/{b}" for i, b in enumerate(branches))
|
||||
mock_git_cmd = mocker.patch("git.Git")
|
||||
mock_git_cmd.return_value.ls_remote.return_value = ls_remote_output
|
||||
|
||||
# Also patch the git module imports in our utils
|
||||
mocker.patch("gitingest.utils.git_utils.git.Git", return_value=mock_git_cmd.return_value)
|
||||
|
||||
return _factory
|
||||
|
||||
|
||||
|
|
@ -215,10 +216,62 @@ def run_command_mock(mocker: MockerFixture) -> AsyncMock:
|
|||
"""
|
||||
mock = AsyncMock(side_effect=_fake_run_command)
|
||||
mocker.patch("gitingest.utils.git_utils.run_command", mock)
|
||||
mocker.patch("gitingest.clone.run_command", mock)
|
||||
|
||||
# Mock GitPython components
|
||||
_setup_gitpython_mocks(mocker)
|
||||
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
def gitpython_mocks(mocker: MockerFixture) -> dict[str, MagicMock]:
    """Install the GitPython mocks for a test and expose them by name.

    Delegates to ``_setup_gitpython_mocks`` and returns its mapping unchanged.
    """
    installed = _setup_gitpython_mocks(mocker)
    return installed
|
||||
|
||||
|
||||
def _setup_gitpython_mocks(mocker: MockerFixture) -> dict[str, MagicMock]:
    """Replace ``git.Git`` and ``git.Repo`` with mocks and return them.

    Parameters
    ----------
    mocker : MockerFixture
        The pytest-mock fixture used to install (and later undo) the patches.

    Returns
    -------
    dict[str, MagicMock]
        ``git_cmd``: the object returned by ``git.Git()``;
        ``repo``: the object returned by ``git.Repo(...)`` and by ``git.Repo.clone_from(...)``;
        ``clone_from``: the mock installed as ``git.Repo.clone_from``;
        ``git_git_mock`` / ``git_repo_mock``: the mocks standing in for the ``git.Git`` and
        ``git.Repo`` classes themselves.

    """
    # Stand-in for a ``git.Git()`` instance.
    mock_git_cmd = MagicMock()
    mock_git_cmd.version.return_value = "git version 2.34.1"
    mock_git_cmd.config.return_value = "true"
    mock_git_cmd.execute.return_value = f"{DEMO_COMMIT}\trefs/heads/main\n"
    mock_git_cmd.ls_remote.return_value = f"{DEMO_COMMIT}\trefs/heads/main\n"
    mock_git_cmd.clone.return_value = ""

    # Stand-in for a ``git.Repo`` instance.
    mock_repo = MagicMock()
    mock_repo.git = MagicMock()
    mock_repo.git.fetch = MagicMock()
    mock_repo.git.checkout = MagicMock()
    mock_repo.git.submodule = MagicMock()
    mock_repo.git.execute = MagicMock()
    mock_repo.git.config = MagicMock()
    mock_repo.git.sparse_checkout = MagicMock()

    # Stand-in for ``git.Repo.clone_from``.
    mock_clone_from = MagicMock(return_value=mock_repo)

    # ``gitingest.clone`` and ``gitingest.utils.git_utils`` both use ``import git``, so they
    # share one module object and a single patch per attribute covers both. Patching the same
    # attribute again through ``gitingest.*.git`` would install a fresh mock each time and
    # leave the mocks returned below disconnected from the ones the code under test calls.
    git_git_mock = mocker.patch("git.Git", return_value=mock_git_cmd)
    git_repo_mock = mocker.patch("git.Repo", return_value=mock_repo)
    git_repo_mock.clone_from = mock_clone_from

    return {
        "git_cmd": mock_git_cmd,
        "repo": mock_repo,
        "clone_from": mock_clone_from,
        "git_git_mock": git_git_mock,
        "git_repo_mock": git_repo_mock,
    }
|
||||
|
||||
|
||||
async def _fake_run_command(*args: str) -> tuple[bytes, bytes]:
|
||||
if "ls-remote" in args:
|
||||
# single match: <sha> <tab>refs/heads/main
|
||||
|
|
|
|||
|
|
@ -6,23 +6,19 @@ and handling edge cases such as nonexistent URLs, timeouts, redirects, and speci
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND
|
||||
|
||||
from gitingest.clone import clone_repo
|
||||
from gitingest.schemas import CloneConfig
|
||||
from gitingest.utils.exceptions import AsyncTimeoutError
|
||||
from gitingest.utils.git_utils import check_repo_exists
|
||||
from tests.conftest import DEMO_COMMIT, DEMO_URL, LOCAL_REPO_PATH
|
||||
from tests.conftest import DEMO_URL, LOCAL_REPO_PATH
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
|
|
@ -35,14 +31,13 @@ GIT_INSTALLED_CALLS = 2 if sys.platform == "win32" else 1
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_commit(repo_exists_true: AsyncMock, run_command_mock: AsyncMock) -> None:
|
||||
async def test_clone_with_commit(repo_exists_true: AsyncMock, gitpython_mocks: dict) -> None:
|
||||
"""Test cloning a repository with a specific commit hash.
|
||||
|
||||
Given a valid URL and a commit hash:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned and checked out at that commit.
|
||||
"""
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 3 # ensure_git_installed + clone + fetch + checkout
|
||||
commit_hash = "a" * 40 # Simulating a valid commit hash
|
||||
clone_config = CloneConfig(
|
||||
url=DEMO_URL,
|
||||
|
|
@ -54,26 +49,21 @@ async def test_clone_with_commit(repo_exists_true: AsyncMock, run_command_mock:
|
|||
await clone_repo(clone_config)
|
||||
|
||||
repo_exists_true.assert_any_call(clone_config.url, token=None)
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=commit_hash)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
|
||||
# Verify GitPython calls were made
|
||||
mock_git_cmd = gitpython_mocks["git_cmd"]
|
||||
mock_repo = gitpython_mocks["repo"]
|
||||
mock_clone_from = gitpython_mocks["clone_from"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_without_commit(repo_exists_true: AsyncMock, run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning a repository when no commit hash is provided.
|
||||
# Should have called version (for ensure_git_installed)
|
||||
mock_git_cmd.version.assert_called()
|
||||
|
||||
Given a valid URL and no commit hash:
|
||||
When ``clone_repo`` is called,
|
||||
Then only the clone_repo operation should be performed (no checkout).
|
||||
"""
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 4 # ensure_git_installed + resolve_commit + clone + fetch + checkout
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit=None, branch="main")
|
||||
# Should have called clone_from (since partial_clone=False)
|
||||
mock_clone_from.assert_called_once()
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
repo_exists_true.assert_any_call(clone_config.url, token=None)
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
# Should have called fetch and checkout on the repo
|
||||
mock_repo.git.fetch.assert_called()
|
||||
mock_repo.git.checkout.assert_called_with(commit_hash)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -101,249 +91,133 @@ async def test_clone_nonexistent_repository(repo_exists_true: AsyncMock) -> None
|
|||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
("status_code", "expected"),
|
||||
("git_command_succeeds", "expected"),
|
||||
[
|
||||
(HTTP_200_OK, True),
|
||||
(HTTP_401_UNAUTHORIZED, False),
|
||||
(HTTP_403_FORBIDDEN, False),
|
||||
(HTTP_404_NOT_FOUND, False),
|
||||
(True, True), # git ls-remote succeeds -> repo exists
|
||||
(False, False), # git ls-remote fails -> repo doesn't exist or no access
|
||||
],
|
||||
)
|
||||
async def test_check_repo_exists(status_code: int, *, expected: bool, mocker: MockerFixture) -> None:
|
||||
"""Verify that ``check_repo_exists`` interprets httpx results correctly."""
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value = mock_client # context-manager protocol
|
||||
mock_client.head.return_value = httpx.Response(status_code=status_code)
|
||||
mocker.patch("httpx.AsyncClient", return_value=mock_client)
|
||||
async def test_check_repo_exists(
|
||||
git_command_succeeds: bool, # noqa: FBT001
|
||||
*,
|
||||
expected: bool,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Verify that ``check_repo_exists`` works by using _resolve_ref_to_sha."""
|
||||
mock_resolve = mocker.patch("gitingest.utils.git_utils._resolve_ref_to_sha")
|
||||
|
||||
if git_command_succeeds:
|
||||
mock_resolve.return_value = "abc123def456" # Mock SHA
|
||||
else:
|
||||
mock_resolve.side_effect = ValueError("Repository not found")
|
||||
|
||||
result = await check_repo_exists(DEMO_URL)
|
||||
|
||||
assert result is expected
|
||||
mock_resolve.assert_called_once_with(DEMO_URL, "HEAD", token=None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_custom_branch(run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning a repository with a specified custom branch.
|
||||
async def test_clone_without_commit(repo_exists_true: AsyncMock, gitpython_mocks: dict) -> None:
|
||||
"""Test cloning a repository when no commit hash is provided.
|
||||
|
||||
Given a valid URL and a branch:
|
||||
Given a valid URL and no commit hash:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned shallowly to that branch.
|
||||
Then the repository should be cloned and checked out at the resolved commit.
|
||||
"""
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 4 # ensure_git_installed + resolve_commit + clone + fetch + checkout
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, branch="feature-branch")
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit=None, branch="main")
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
repo_exists_true.assert_any_call(clone_config.url, token=None)
|
||||
|
||||
# Verify GitPython calls were made
|
||||
mock_git_cmd = gitpython_mocks["git_cmd"]
|
||||
mock_repo = gitpython_mocks["repo"]
|
||||
mock_clone_from = gitpython_mocks["clone_from"]
|
||||
|
||||
# Should have resolved the commit via ls_remote
|
||||
mock_git_cmd.ls_remote.assert_called()
|
||||
# Should have cloned the repo
|
||||
mock_clone_from.assert_called_once()
|
||||
# Should have fetched and checked out
|
||||
mock_repo.git.fetch.assert_called()
|
||||
mock_repo.git.checkout.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_git_command_failure(run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning when the Git command fails during execution.
|
||||
|
||||
Given a valid URL, but ``run_command`` raises a RuntimeError:
|
||||
When ``clone_repo`` is called,
|
||||
Then a RuntimeError should be raised with the correct message.
|
||||
"""
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH)
|
||||
|
||||
run_command_mock.side_effect = RuntimeError("Git is not installed or not accessible. Please install Git first.")
|
||||
|
||||
with pytest.raises(RuntimeError, match="Git is not installed or not accessible"):
|
||||
await clone_repo(clone_config)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_default_shallow_clone(run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning a repository with the default shallow clone options.
|
||||
|
||||
Given a valid URL and no branch or commit:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned with ``--depth=1`` and ``--single-branch``.
|
||||
"""
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 4 # ensure_git_installed + resolve_commit + clone + fetch + checkout
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_commit(run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning when a commit hash is provided.
|
||||
|
||||
Given a valid URL and a commit hash:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned and checked out at that commit.
|
||||
"""
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 3 # ensure_git_installed + clone + fetch + checkout
|
||||
commit_hash = "a" * 40 # Simulating a valid commit hash
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit=commit_hash)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=commit_hash)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_repo_exists_with_redirect(mocker: MockerFixture) -> None:
|
||||
"""Test ``check_repo_exists`` when a redirect (302) is returned.
|
||||
|
||||
Given a URL that responds with "302 Found":
|
||||
When ``check_repo_exists`` is called,
|
||||
Then it should return ``False``, indicating the repo is inaccessible.
|
||||
"""
|
||||
mock_exec = mocker.patch("asyncio.create_subprocess_exec", new_callable=AsyncMock)
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"302\n", b"")
|
||||
mock_process.returncode = 0 # Simulate successful request
|
||||
mock_exec.return_value = mock_process
|
||||
|
||||
repo_exists = await check_repo_exists(DEMO_URL)
|
||||
|
||||
assert repo_exists is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_timeout(run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning a repository when a timeout occurs.
|
||||
|
||||
Given a valid URL, but ``run_command`` times out:
|
||||
When ``clone_repo`` is called,
|
||||
Then an ``AsyncTimeoutError`` should be raised to indicate the operation exceeded time limits.
|
||||
"""
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH)
|
||||
|
||||
run_command_mock.side_effect = asyncio.TimeoutError
|
||||
|
||||
with pytest.raises(AsyncTimeoutError, match="Operation timed out after"):
|
||||
await clone_repo(clone_config)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_branch_with_slashes(tmp_path: Path, run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning a branch with slashes in the name.
|
||||
|
||||
Given a valid repository URL and a branch name with slashes:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned and checked out at that branch.
|
||||
"""
|
||||
branch_name = "fix/in-operator"
|
||||
local_path = tmp_path / "gitingest"
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 4 # ensure_git_installed + resolve_commit + clone + fetch + checkout
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=str(local_path), branch=branch_name)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_creates_parent_directory(tmp_path: Path, run_command_mock: AsyncMock) -> None:
|
||||
async def test_clone_creates_parent_directory(tmp_path: Path, gitpython_mocks: dict) -> None:
|
||||
"""Test that ``clone_repo`` creates parent directories if they don't exist.
|
||||
|
||||
Given a local path with non-existent parent directories:
|
||||
When ``clone_repo`` is called,
|
||||
Then it should create the parent directories before attempting to clone.
|
||||
"""
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 4 # ensure_git_installed + resolve_commit + clone + fetch + checkout
|
||||
nested_path = tmp_path / "deep" / "nested" / "path" / "repo"
|
||||
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=str(nested_path))
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
# Verify parent directories were created
|
||||
assert nested_path.parent.exists()
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
|
||||
# Verify clone operation happened
|
||||
mock_clone_from = gitpython_mocks["clone_from"]
|
||||
mock_clone_from.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_specific_subpath(run_command_mock: AsyncMock) -> None:
|
||||
async def test_clone_with_specific_subpath(gitpython_mocks: dict) -> None:
|
||||
"""Test cloning a repository with a specific subpath.
|
||||
|
||||
Given a valid repository URL and a specific subpath:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned with sparse checkout enabled and the specified subpath.
|
||||
Then the repository should be cloned with sparse checkout enabled.
|
||||
"""
|
||||
# ensure_git_installed + resolve_commit + clone + sparse-checkout + fetch + checkout
|
||||
subpath = "src/docs"
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 5
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, subpath=subpath)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
# Verify the clone command includes sparse checkout flags
|
||||
assert_partial_clone_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
# Verify partial clone (using git.clone instead of Repo.clone_from)
|
||||
mock_git_cmd = gitpython_mocks["git_cmd"]
|
||||
mock_git_cmd.clone.assert_called()
|
||||
|
||||
# Verify sparse checkout was configured
|
||||
mock_repo = gitpython_mocks["repo"]
|
||||
mock_repo.git.sparse_checkout.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_commit_and_subpath(run_command_mock: AsyncMock) -> None:
|
||||
"""Test cloning a repository with both a specific commit and subpath.
|
||||
|
||||
Given a valid repository URL, commit hash, and subpath:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned with sparse checkout enabled,
|
||||
checked out at the specific commit, and only include the specified subpath.
|
||||
"""
|
||||
subpath = "src/docs"
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 4 # ensure_git_installed + clone + sparse-checkout + fetch + checkout
|
||||
commit_hash = "a" * 40 # Simulating a valid commit hash
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit=commit_hash, subpath=subpath)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert_partial_clone_calls(run_command_mock, clone_config, commit=commit_hash)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_include_submodules(run_command_mock: AsyncMock) -> None:
|
||||
async def test_clone_with_include_submodules(gitpython_mocks: dict) -> None:
|
||||
"""Test cloning a repository with submodules included.
|
||||
|
||||
Given a valid URL and ``include_submodules=True``:
|
||||
When ``clone_repo`` is called,
|
||||
Then the repository should be cloned with ``--recurse-submodules`` in the git command.
|
||||
Then the repository should update submodules after cloning.
|
||||
"""
|
||||
# ensure_git_installed + resolve_commit + clone + fetch + checkout + checkout submodules
|
||||
expected_call_count = GIT_INSTALLED_CALLS + 5
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, branch="main", include_submodules=True)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert_standard_calls(run_command_mock, clone_config, commit=DEMO_COMMIT)
|
||||
assert_submodule_calls(run_command_mock, clone_config)
|
||||
assert run_command_mock.call_count == expected_call_count
|
||||
# Verify submodule update was called
|
||||
mock_repo = gitpython_mocks["repo"]
|
||||
mock_repo.git.submodule.assert_called_with("update", "--init", "--recursive", "--depth=1")
|
||||
|
||||
|
||||
def assert_standard_calls(mock: AsyncMock, cfg: CloneConfig, commit: str, *, partial_clone: bool = False) -> None:
|
||||
"""Assert that the standard clone sequence of git commands was called."""
|
||||
mock.assert_any_call("git", "--version")
|
||||
if sys.platform == "win32":
|
||||
mock.assert_any_call("git", "config", "core.longpaths")
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_repo_exists_with_auth_token(mocker: MockerFixture) -> None:
|
||||
"""Test ``check_repo_exists`` with authentication token.
|
||||
|
||||
# Clone
|
||||
clone_cmd = ["git", "clone", "--single-branch", "--no-checkout", "--depth=1"]
|
||||
if partial_clone:
|
||||
clone_cmd += ["--filter=blob:none", "--sparse"]
|
||||
mock.assert_any_call(*clone_cmd, cfg.url, cfg.local_path)
|
||||
Given a GitHub URL and a token:
|
||||
When ``check_repo_exists`` is called,
|
||||
Then it should pass the token to _resolve_ref_to_sha.
|
||||
"""
|
||||
mock_resolve = mocker.patch("gitingest.utils.git_utils._resolve_ref_to_sha")
|
||||
mock_resolve.return_value = "abc123def456" # Mock SHA
|
||||
|
||||
mock.assert_any_call("git", "-C", cfg.local_path, "fetch", "--depth=1", "origin", commit)
|
||||
mock.assert_any_call("git", "-C", cfg.local_path, "checkout", commit)
|
||||
test_token = "token123" # noqa: S105
|
||||
result = await check_repo_exists("https://github.com/test/repo", token=test_token)
|
||||
|
||||
|
||||
def assert_partial_clone_calls(mock: AsyncMock, cfg: CloneConfig, commit: str) -> None:
|
||||
"""Assert that the partial clone sequence of git commands was called."""
|
||||
assert_standard_calls(mock, cfg, commit=commit, partial_clone=True)
|
||||
mock.assert_any_call("git", "-C", cfg.local_path, "sparse-checkout", "set", cfg.subpath)
|
||||
|
||||
|
||||
def assert_submodule_calls(mock: AsyncMock, cfg: CloneConfig) -> None:
|
||||
"""Assert that submodule update commands were called."""
|
||||
mock.assert_any_call("git", "-C", cfg.local_path, "submodule", "update", "--init", "--recursive", "--depth=1")
|
||||
assert result is True
|
||||
mock_resolve.assert_called_once_with("https://github.com/test/repo", "HEAD", token=test_token)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ from typing import TYPE_CHECKING
|
|||
import pytest
|
||||
|
||||
from gitingest.utils.exceptions import InvalidGitHubTokenError
|
||||
from gitingest.utils.git_utils import create_git_auth_header, create_git_command, is_github_host, validate_github_token
|
||||
from gitingest.utils.git_utils import create_git_auth_header, create_git_repo, is_github_host, validate_github_token
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
|
@ -56,50 +56,51 @@ def test_validate_github_token_invalid(token: str) -> None:
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("base_cmd", "local_path", "url", "token", "expected_suffix"),
|
||||
("local_path", "url", "token", "should_configure_auth"),
|
||||
[
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"https://github.com/owner/repo.git",
|
||||
None,
|
||||
[], # No auth header expected when token is None
|
||||
False, # No auth configuration expected when token is None
|
||||
),
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"https://github.com/owner/repo.git",
|
||||
"ghp_" + "d" * 36,
|
||||
[
|
||||
"-c",
|
||||
create_git_auth_header("ghp_" + "d" * 36),
|
||||
], # Auth header expected for GitHub URL + token
|
||||
True, # Auth configuration expected for GitHub URL + token
|
||||
),
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"https://gitlab.com/owner/repo.git",
|
||||
"ghp_" + "e" * 36,
|
||||
[], # No auth header for non-GitHub URL even if token provided
|
||||
False, # No auth configuration for non-GitHub URL even if token provided
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_create_git_command(
|
||||
base_cmd: list[str],
|
||||
def test_create_git_repo(
|
||||
local_path: str,
|
||||
url: str,
|
||||
token: str | None,
|
||||
expected_suffix: list[str],
|
||||
should_configure_auth: bool, # noqa: FBT001
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Test that ``create_git_command`` builds the correct command list based on inputs."""
|
||||
cmd = create_git_command(base_cmd, local_path, url, token)
|
||||
"""Test that ``create_git_repo`` creates a proper Git repo object."""
|
||||
# Mock git.Repo to avoid actual filesystem operations
|
||||
mock_repo = mocker.MagicMock()
|
||||
mock_repo_class = mocker.patch("git.Repo", return_value=mock_repo)
|
||||
|
||||
# The command should start with base_cmd and the -C option
|
||||
expected_prefix = [*base_cmd, "-C", local_path]
|
||||
assert cmd[: len(expected_prefix)] == expected_prefix
|
||||
repo = create_git_repo(local_path, url, token)
|
||||
|
||||
# The suffix (anything after prefix) should match expected
|
||||
assert cmd[len(expected_prefix) :] == expected_suffix
|
||||
# Should create repo with correct path
|
||||
mock_repo_class.assert_called_once_with(local_path)
|
||||
assert repo == mock_repo
|
||||
|
||||
# Check auth configuration
|
||||
if should_configure_auth:
|
||||
mock_repo.git.config.assert_called_once()
|
||||
else:
|
||||
mock_repo.git.config.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -125,7 +126,7 @@ def test_create_git_auth_header(token: str) -> None:
|
|||
("https://gitlab.com/foo/bar.git", "ghp_" + "g" * 36, False),
|
||||
],
|
||||
)
|
||||
def test_create_git_command_helper_calls(
|
||||
def test_create_git_repo_helper_calls(
|
||||
mocker: MockerFixture,
|
||||
tmp_path: Path,
|
||||
*,
|
||||
|
|
@ -135,16 +136,18 @@ def test_create_git_command_helper_calls(
|
|||
) -> None:
|
||||
"""Test that ``create_git_auth_header`` is invoked only when appropriate."""
|
||||
work_dir = tmp_path / "repo"
|
||||
header_mock = mocker.patch("gitingest.utils.git_utils.create_git_auth_header", return_value="HEADER")
|
||||
header_mock = mocker.patch("gitingest.utils.git_utils.create_git_auth_header", return_value="key=value")
|
||||
mock_repo = mocker.MagicMock()
|
||||
mocker.patch("git.Repo", return_value=mock_repo)
|
||||
|
||||
cmd = create_git_command(["git", "clone"], str(work_dir), url, token)
|
||||
create_git_repo(str(work_dir), url, token)
|
||||
|
||||
if should_call:
|
||||
header_mock.assert_called_once_with(token, url=url)
|
||||
assert "HEADER" in cmd
|
||||
mock_repo.git.config.assert_called_once_with("key", "value")
|
||||
else:
|
||||
header_mock.assert_not_called()
|
||||
assert "HEADER" not in cmd
|
||||
mock_repo.git.config.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -198,11 +201,10 @@ def test_create_git_auth_header_with_ghe_url(token: str, url: str, expected_host
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("base_cmd", "local_path", "url", "token", "expected_auth_hostname"),
|
||||
("local_path", "url", "token", "expected_auth_hostname"),
|
||||
[
|
||||
# GitHub.com URLs - should use default hostname
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"https://github.com/owner/repo.git",
|
||||
"ghp_" + "a" * 36,
|
||||
|
|
@ -210,21 +212,18 @@ def test_create_git_auth_header_with_ghe_url(token: str, url: str, expected_host
|
|||
),
|
||||
# GitHub Enterprise URLs - should use custom hostname
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"https://github.company.com/owner/repo.git",
|
||||
"ghp_" + "b" * 36,
|
||||
"github.company.com",
|
||||
),
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"https://github.enterprise.org/owner/repo.git",
|
||||
"ghp_" + "c" * 36,
|
||||
"github.enterprise.org",
|
||||
),
|
||||
(
|
||||
["git", "clone"],
|
||||
"/some/path",
|
||||
"http://github.internal/owner/repo.git",
|
||||
"ghp_" + "d" * 36,
|
||||
|
|
@ -232,48 +231,47 @@ def test_create_git_auth_header_with_ghe_url(token: str, url: str, expected_host
|
|||
),
|
||||
],
|
||||
)
|
||||
def test_create_git_command_with_ghe_urls(
|
||||
base_cmd: list[str],
|
||||
def test_create_git_repo_with_ghe_urls(
|
||||
local_path: str,
|
||||
url: str,
|
||||
token: str,
|
||||
expected_auth_hostname: str,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Test that ``create_git_command`` handles GitHub Enterprise URLs correctly."""
|
||||
cmd = create_git_command(base_cmd, local_path, url, token)
|
||||
"""Test that ``create_git_repo`` handles GitHub Enterprise URLs correctly."""
|
||||
mock_repo = mocker.MagicMock()
|
||||
mocker.patch("git.Repo", return_value=mock_repo)
|
||||
|
||||
# Should have base command and -C option
|
||||
expected_prefix = [*base_cmd, "-C", local_path]
|
||||
assert cmd[: len(expected_prefix)] == expected_prefix
|
||||
create_git_repo(local_path, url, token)
|
||||
|
||||
# Should have -c and auth header
|
||||
assert "-c" in cmd
|
||||
auth_header_index = cmd.index("-c") + 1
|
||||
auth_header = cmd[auth_header_index]
|
||||
# Should configure auth with the correct hostname
|
||||
mock_repo.git.config.assert_called_once()
|
||||
auth_config_call = mock_repo.git.config.call_args[0]
|
||||
|
||||
# Verify the auth header contains the expected hostname
|
||||
assert f"http.https://{expected_auth_hostname}/" in auth_header
|
||||
assert "Authorization: Basic" in auth_header
|
||||
# The first argument should contain the hostname
|
||||
assert expected_auth_hostname in auth_config_call[0]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("base_cmd", "local_path", "url", "token"),
|
||||
("local_path", "url", "token"),
|
||||
[
|
||||
# Should NOT add auth headers for non-GitHub URLs
|
||||
(["git", "clone"], "/some/path", "https://gitlab.com/owner/repo.git", "ghp_" + "a" * 36),
|
||||
(["git", "clone"], "/some/path", "https://bitbucket.org/owner/repo.git", "ghp_" + "b" * 36),
|
||||
(["git", "clone"], "/some/path", "https://git.example.com/owner/repo.git", "ghp_" + "c" * 36),
|
||||
# Should NOT configure auth for non-GitHub URLs
|
||||
("/some/path", "https://gitlab.com/owner/repo.git", "ghp_" + "a" * 36),
|
||||
("/some/path", "https://bitbucket.org/owner/repo.git", "ghp_" + "b" * 36),
|
||||
("/some/path", "https://git.example.com/owner/repo.git", "ghp_" + "c" * 36),
|
||||
],
|
||||
)
|
||||
def test_create_git_command_ignores_non_github_urls(
|
||||
base_cmd: list[str],
|
||||
def test_create_git_repo_ignores_non_github_urls(
|
||||
local_path: str,
|
||||
url: str,
|
||||
token: str,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Test that ``create_git_command`` does not add auth headers for non-GitHub URLs."""
|
||||
cmd = create_git_command(base_cmd, local_path, url, token)
|
||||
"""Test that ``create_git_repo`` does not configure auth for non-GitHub URLs."""
|
||||
mock_repo = mocker.MagicMock()
|
||||
mocker.patch("git.Repo", return_value=mock_repo)
|
||||
|
||||
# Should only have base command and -C option, no auth headers
|
||||
expected = [*base_cmd, "-C", local_path]
|
||||
assert cmd == expected
|
||||
create_git_repo(local_path, url, token)
|
||||
|
||||
# Should not configure auth for non-GitHub URLs
|
||||
mock_repo.git.config.assert_not_called()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue