mirror of
https://github.com/cyclotruc/gitingest.git
synced 2026-04-28 08:09:31 +00:00
test: add pytest-mock, introduce fixtures & type hints (#290)
* Added pytest-mock to dev dependencies and pre-commit hooks
* Introduced InvalidGitHubTokenError for clearer token-validation failures
* Refactored tests:
  * Replaced ad-hoc mocks with reusable fixtures
  * Parametrised URL/branch matrices to cut duplication
  * Added type hints throughout
* New coverage:
  * validate_github_token (happy & error paths)
  * create_git_command / create_git_auth_header
This commit is contained in:
parent
3869aa32e3
commit
95009bdf15
10 changed files with 578 additions and 461 deletions
|
|
@ -99,6 +99,7 @@ repos:
|
|||
"fastapi[standard]>=0.109.1",
|
||||
pydantic,
|
||||
pytest-asyncio,
|
||||
pytest-mock,
|
||||
python-dotenv,
|
||||
slowapi,
|
||||
starlette>=0.40.0,
|
||||
|
|
@ -117,6 +118,7 @@ repos:
|
|||
"fastapi[standard]>=0.109.1",
|
||||
pydantic,
|
||||
pytest-asyncio,
|
||||
pytest-mock,
|
||||
python-dotenv,
|
||||
slowapi,
|
||||
starlette>=0.40.0,
|
||||
|
|
|
|||
|
|
@ -5,3 +5,4 @@ pre-commit
|
|||
pylint
|
||||
pytest
|
||||
pytest-asyncio
|
||||
pytest-mock
|
||||
|
|
|
|||
|
|
@ -35,3 +35,13 @@ class InvalidNotebookError(Exception):
|
|||
|
||||
def __init__(self, message: str) -> None:
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class InvalidGitHubTokenError(ValueError):
|
||||
"""Exception raised when a GitHub Personal Access Token is malformed."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
"Invalid GitHub token format. Token should start with 'github_pat_' or 'ghp_' "
|
||||
"followed by at least 36 characters of letters, numbers, and underscores."
|
||||
)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ import base64
|
|||
import re
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from gitingest.utils.exceptions import InvalidGitHubTokenError
|
||||
|
||||
GITHUB_PAT_PATTERN = r"^(?:github_pat_|ghp_)[A-Za-z0-9_]{36,}$"
|
||||
|
||||
|
||||
|
|
@ -256,11 +258,8 @@ def validate_github_token(token: str) -> None:
|
|||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
InvalidGitHubTokenError
|
||||
If the token format is invalid
|
||||
"""
|
||||
if not re.match(GITHUB_PAT_PATTERN, token):
|
||||
raise ValueError(
|
||||
"Invalid GitHub token format. Token should start with 'github_pat_' or 'ghp_' "
|
||||
"followed by at least 36 characters of letters, numbers, and underscores."
|
||||
)
|
||||
raise InvalidGitHubTokenError()
|
||||
|
|
|
|||
|
|
@ -7,14 +7,19 @@ to write `.ipynb` notebooks for testing notebook utilities.
|
|||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict
|
||||
from typing import Any, Callable, Dict, List
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from gitingest.query_parsing import IngestionQuery
|
||||
|
||||
WriteNotebookFunc = Callable[[str, Dict[str, Any]], Path]
|
||||
|
||||
DEMO_URL = "https://github.com/user/repo"
|
||||
LOCAL_REPO_PATH = "/tmp/repo"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_query() -> IngestionQuery:
|
||||
|
|
@ -129,3 +134,51 @@ def write_notebook(tmp_path: Path) -> WriteNotebookFunc:
|
|||
return notebook_path
|
||||
|
||||
return _write_notebook
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def stub_branches(mocker: MockerFixture) -> Callable[[List[str]], None]:
|
||||
"""Return a function that stubs git branch discovery to *branches*."""
|
||||
|
||||
def _factory(branches: List[str]) -> None:
|
||||
mocker.patch(
|
||||
"gitingest.utils.git_utils.run_command",
|
||||
new_callable=AsyncMock,
|
||||
return_value=("\n".join(f"refs/heads/{b}" for b in branches).encode() + b"\n", b""),
|
||||
)
|
||||
mocker.patch(
|
||||
"gitingest.utils.git_utils.fetch_remote_branch_list",
|
||||
new_callable=AsyncMock,
|
||||
return_value=branches,
|
||||
)
|
||||
|
||||
return _factory
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def repo_exists_true(mocker: MockerFixture) -> AsyncMock:
|
||||
"""Patch `gitingest.cloning.check_repo_exists` to always return ``True``.
|
||||
|
||||
Many cloning-related tests assume that the remote repository exists. This fixture centralises
|
||||
that behaviour so individual tests no longer need to repeat the same ``mocker.patch`` call.
|
||||
The mock object is returned so that tests can make assertions on how it was used or override
|
||||
its behaviour when needed.
|
||||
"""
|
||||
return mocker.patch("gitingest.cloning.check_repo_exists", return_value=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run_command_mock(mocker: MockerFixture) -> AsyncMock:
|
||||
"""Patch `gitingest.cloning.run_command` with an ``AsyncMock``.
|
||||
|
||||
The mocked function returns a dummy process whose ``communicate`` method yields generic
|
||||
*stdout* / *stderr* bytes. Tests can still access / tweak the mock via the fixture argument.
|
||||
"""
|
||||
mock_exec = mocker.patch("gitingest.cloning.run_command", new_callable=AsyncMock)
|
||||
|
||||
# Provide a default dummy process so most tests don't have to create one.
|
||||
dummy_process = AsyncMock()
|
||||
dummy_process.communicate.return_value = (b"output", b"error")
|
||||
mock_exec.return_value = dummy_process
|
||||
|
||||
return mock_exec
|
||||
|
|
|
|||
|
|
@ -5,91 +5,60 @@ These tests confirm that `parse_query` correctly identifies user/repo pairs and
|
|||
Bitbucket, Gitea, and Codeberg, even if the host is omitted.
|
||||
"""
|
||||
|
||||
from typing import List
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from gitingest.query_parsing import parse_query
|
||||
|
||||
# Repository matrix: (host, user, repo)
|
||||
_REPOS: List[Tuple[str, str, str]] = [
|
||||
("github.com", "tiangolo", "fastapi"),
|
||||
("gitlab.com", "gitlab-org", "gitlab-runner"),
|
||||
("bitbucket.org", "na-dna", "llm-knowledge-share"),
|
||||
("gitea.com", "xorm", "xorm"),
|
||||
("codeberg.org", "forgejo", "forgejo"),
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"urls, expected_user, expected_repo, expected_url",
|
||||
[
|
||||
(
|
||||
[
|
||||
"https://github.com/tiangolo/fastapi",
|
||||
"github.com/tiangolo/fastapi",
|
||||
"tiangolo/fastapi",
|
||||
],
|
||||
"tiangolo",
|
||||
"fastapi",
|
||||
"https://github.com/tiangolo/fastapi",
|
||||
),
|
||||
(
|
||||
[
|
||||
"https://gitlab.com/gitlab-org/gitlab-runner",
|
||||
"gitlab.com/gitlab-org/gitlab-runner",
|
||||
"gitlab-org/gitlab-runner",
|
||||
],
|
||||
"gitlab-org",
|
||||
"gitlab-runner",
|
||||
"https://gitlab.com/gitlab-org/gitlab-runner",
|
||||
),
|
||||
(
|
||||
[
|
||||
"https://bitbucket.org/na-dna/llm-knowledge-share",
|
||||
"bitbucket.org/na-dna/llm-knowledge-share",
|
||||
"na-dna/llm-knowledge-share",
|
||||
],
|
||||
"na-dna",
|
||||
"llm-knowledge-share",
|
||||
"https://bitbucket.org/na-dna/llm-knowledge-share",
|
||||
),
|
||||
(
|
||||
[
|
||||
"https://gitea.com/xorm/xorm",
|
||||
"gitea.com/xorm/xorm",
|
||||
"xorm/xorm",
|
||||
],
|
||||
"xorm",
|
||||
"xorm",
|
||||
"https://gitea.com/xorm/xorm",
|
||||
),
|
||||
(
|
||||
[
|
||||
"https://codeberg.org/forgejo/forgejo",
|
||||
"codeberg.org/forgejo/forgejo",
|
||||
"forgejo/forgejo",
|
||||
],
|
||||
"forgejo",
|
||||
"forgejo",
|
||||
"https://codeberg.org/forgejo/forgejo",
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
# Generate cartesian product of repository tuples with URL variants.
|
||||
@pytest.mark.parametrize("host, user, repo", _REPOS, ids=[f"{h}:{u}/{r}" for h, u, r in _REPOS])
|
||||
@pytest.mark.parametrize("variant", ["full", "noscheme", "slug"])
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_query_without_host(
|
||||
urls: List[str],
|
||||
expected_user: str,
|
||||
expected_repo: str,
|
||||
expected_url: str,
|
||||
host: str,
|
||||
user: str,
|
||||
repo: str,
|
||||
variant: str,
|
||||
) -> None:
|
||||
"""
|
||||
Test `parse_query` for Git host agnosticism.
|
||||
"""Verify that `parse_query` handles URLs, host-omitted URLs and raw slugs."""
|
||||
|
||||
Given multiple URL variations for the same user/repo on different Git hosts (with or without host names):
|
||||
When `parse_query` is called with each variation,
|
||||
Then the parser should correctly identify the user, repo, canonical URL, and other default fields.
|
||||
"""
|
||||
for url in urls:
|
||||
query = await parse_query(url, max_file_size=50, from_web=True)
|
||||
# Build the input URL based on the selected variant
|
||||
if variant == "full":
|
||||
url = f"https://{host}/{user}/{repo}"
|
||||
elif variant == "noscheme":
|
||||
url = f"{host}/{user}/{repo}"
|
||||
else: # "slug"
|
||||
url = f"{user}/{repo}"
|
||||
|
||||
assert query.user_name == expected_user
|
||||
assert query.repo_name == expected_repo
|
||||
assert query.url == expected_url
|
||||
assert query.slug == f"{expected_user}-{expected_repo}"
|
||||
assert query.id is not None
|
||||
assert query.subpath == "/"
|
||||
assert query.branch is None
|
||||
assert query.commit is None
|
||||
assert query.type is None
|
||||
expected_url = f"https://{host}/{user}/{repo}"
|
||||
|
||||
query = await parse_query(url, max_file_size=50, from_web=True)
|
||||
|
||||
# Compare against the canonical dict while ignoring unpredictable fields.
|
||||
actual = query.model_dump(exclude={"id", "local_path", "ignore_patterns"})
|
||||
|
||||
expected = {
|
||||
"user_name": user,
|
||||
"repo_name": repo,
|
||||
"url": expected_url,
|
||||
"slug": f"{user}-{repo}",
|
||||
"subpath": "/",
|
||||
"type": None,
|
||||
"branch": None,
|
||||
"commit": None,
|
||||
"max_file_size": 50,
|
||||
"include_patterns": None,
|
||||
}
|
||||
|
||||
assert actual == expected
|
||||
|
|
|
|||
|
|
@ -6,62 +6,43 @@ paths.
|
|||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from typing import Callable, List, Optional
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from gitingest.query_parsing import _parse_patterns, _parse_remote_repo, parse_query
|
||||
from gitingest.schemas.ingestion_schema import IngestionQuery
|
||||
from gitingest.utils.ignore_patterns import DEFAULT_IGNORE_PATTERNS
|
||||
from tests.conftest import DEMO_URL
|
||||
|
||||
URLS_HTTPS: List[str] = [
|
||||
DEMO_URL,
|
||||
"https://gitlab.com/user/repo",
|
||||
"https://bitbucket.org/user/repo",
|
||||
"https://gitea.com/user/repo",
|
||||
"https://codeberg.org/user/repo",
|
||||
"https://gist.github.com/user/repo",
|
||||
]
|
||||
|
||||
URLS_HTTP: List[str] = [url.replace("https://", "http://") for url in URLS_HTTPS]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url", URLS_HTTPS, ids=lambda u: u)
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_url_valid_https() -> None:
|
||||
"""
|
||||
Test `_parse_remote_repo` with valid HTTPS URLs.
|
||||
async def test_parse_url_valid_https(url: str) -> None:
|
||||
"""Valid HTTPS URLs parse correctly and `query.url` equals the input."""
|
||||
query = await _assert_basic_repo_fields(url)
|
||||
|
||||
Given various HTTPS URLs on supported platforms:
|
||||
When `_parse_remote_repo` is called,
|
||||
Then user name, repo name, and the URL should be extracted correctly.
|
||||
"""
|
||||
test_cases = [
|
||||
"https://github.com/user/repo",
|
||||
"https://gitlab.com/user/repo",
|
||||
"https://bitbucket.org/user/repo",
|
||||
"https://gitea.com/user/repo",
|
||||
"https://codeberg.org/user/repo",
|
||||
"https://gist.github.com/user/repo",
|
||||
]
|
||||
for url in test_cases:
|
||||
query = await _parse_remote_repo(url)
|
||||
|
||||
assert query.user_name == "user"
|
||||
assert query.repo_name == "repo"
|
||||
assert query.url == url
|
||||
assert query.url == url # HTTPS: canonical URL should equal input
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url", URLS_HTTP, ids=lambda u: u)
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_url_valid_http() -> None:
|
||||
"""
|
||||
Test `_parse_remote_repo` with valid HTTP URLs.
|
||||
|
||||
Given various HTTP URLs on supported platforms:
|
||||
When `_parse_remote_repo` is called,
|
||||
Then user name, repo name, and the slug should be extracted correctly.
|
||||
"""
|
||||
test_cases = [
|
||||
"http://github.com/user/repo",
|
||||
"http://gitlab.com/user/repo",
|
||||
"http://bitbucket.org/user/repo",
|
||||
"http://gitea.com/user/repo",
|
||||
"http://codeberg.org/user/repo",
|
||||
"http://gist.github.com/user/repo",
|
||||
]
|
||||
for url in test_cases:
|
||||
query = await _parse_remote_repo(url)
|
||||
|
||||
assert query.user_name == "user"
|
||||
assert query.repo_name == "repo"
|
||||
assert query.slug == "user-repo"
|
||||
async def test_parse_url_valid_http(url: str) -> None:
|
||||
"""Valid HTTP URLs parse correctly (slug check only)."""
|
||||
await _assert_basic_repo_fields(url)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -74,13 +55,14 @@ async def test_parse_url_invalid() -> None:
|
|||
Then a ValueError should be raised indicating an invalid repository URL.
|
||||
"""
|
||||
url = "https://github.com"
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid repository URL"):
|
||||
await _parse_remote_repo(url)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("url", ["https://github.com/user/repo", "https://gitlab.com/user/repo"])
|
||||
async def test_parse_query_basic(url):
|
||||
@pytest.mark.parametrize("url", [DEMO_URL, "https://gitlab.com/user/repo"])
|
||||
async def test_parse_query_basic(url: str) -> None:
|
||||
"""
|
||||
Test `parse_query` with a basic valid repository URL.
|
||||
|
||||
|
|
@ -122,8 +104,7 @@ async def test_parse_query_include_pattern() -> None:
|
|||
When `parse_query` is called,
|
||||
Then the include pattern should be set, and default ignore patterns remain applied.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
query = await parse_query(url, max_file_size=50, from_web=True, include_patterns="*.py")
|
||||
query = await parse_query(DEMO_URL, max_file_size=50, from_web=True, include_patterns="*.py")
|
||||
|
||||
assert query.include_patterns == {"*.py"}
|
||||
assert query.ignore_patterns == DEFAULT_IGNORE_PATTERNS
|
||||
|
|
@ -138,13 +119,12 @@ async def test_parse_query_invalid_pattern() -> None:
|
|||
When `parse_query` is called,
|
||||
Then a ValueError should be raised indicating invalid characters.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
with pytest.raises(ValueError, match="Pattern.*contains invalid characters"):
|
||||
await parse_query(url, max_file_size=50, from_web=True, include_patterns="*.py;rm -rf")
|
||||
await parse_query(DEMO_URL, max_file_size=50, from_web=True, include_patterns="*.py;rm -rf")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_url_with_subpaths() -> None:
|
||||
async def test_parse_url_with_subpaths(stub_branches: Callable[[List[str]], None]) -> None:
|
||||
"""
|
||||
Test `_parse_remote_repo` with a URL containing branch and subpath.
|
||||
|
||||
|
|
@ -152,19 +132,16 @@ async def test_parse_url_with_subpaths() -> None:
|
|||
When `_parse_remote_repo` is called with remote branch fetching,
|
||||
Then user, repo, branch, and subpath should be identified correctly.
|
||||
"""
|
||||
url = "https://github.com/user/repo/tree/main/subdir/file"
|
||||
with patch("gitingest.utils.git_utils.run_command", new_callable=AsyncMock) as mock_run_command:
|
||||
mock_run_command.return_value = (b"refs/heads/main\nrefs/heads/dev\nrefs/heads/feature-branch\n", b"")
|
||||
with patch(
|
||||
"gitingest.utils.git_utils.fetch_remote_branch_list", new_callable=AsyncMock
|
||||
) as mock_fetch_branches:
|
||||
mock_fetch_branches.return_value = ["main", "dev", "feature-branch"]
|
||||
query = await _parse_remote_repo(url)
|
||||
url = DEMO_URL + "/tree/main/subdir/file"
|
||||
|
||||
assert query.user_name == "user"
|
||||
assert query.repo_name == "repo"
|
||||
assert query.branch == "main"
|
||||
assert query.subpath == "/subdir/file"
|
||||
stub_branches(["main", "dev", "feature-branch"])
|
||||
|
||||
query = await _assert_basic_repo_fields(url)
|
||||
|
||||
assert query.user_name == "user"
|
||||
assert query.repo_name == "repo"
|
||||
assert query.branch == "main"
|
||||
assert query.subpath == "/subdir/file"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -177,6 +154,7 @@ async def test_parse_url_invalid_repo_structure() -> None:
|
|||
Then a ValueError should be raised indicating an invalid repository URL.
|
||||
"""
|
||||
url = "https://github.com/user"
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid repository URL"):
|
||||
await _parse_remote_repo(url)
|
||||
|
||||
|
|
@ -204,6 +182,7 @@ def test_parse_patterns_invalid_characters() -> None:
|
|||
Then a ValueError should be raised indicating invalid pattern syntax.
|
||||
"""
|
||||
patterns = "*.py;rm -rf"
|
||||
|
||||
with pytest.raises(ValueError, match="Pattern.*contains invalid characters"):
|
||||
_parse_patterns(patterns)
|
||||
|
||||
|
|
@ -217,8 +196,7 @@ async def test_parse_query_with_large_file_size() -> None:
|
|||
When `parse_query` is called,
|
||||
Then `max_file_size` should be set correctly and default ignore patterns remain unchanged.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
query = await parse_query(url, max_file_size=10**9, from_web=True)
|
||||
query = await parse_query(DEMO_URL, max_file_size=10**9, from_web=True)
|
||||
|
||||
assert query.max_file_size == 10**9
|
||||
assert query.ignore_patterns == DEFAULT_IGNORE_PATTERNS
|
||||
|
|
@ -233,8 +211,7 @@ async def test_parse_query_empty_patterns() -> None:
|
|||
When `parse_query` is called,
|
||||
Then include_patterns becomes None and default ignore patterns apply.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
query = await parse_query(url, max_file_size=50, from_web=True, include_patterns="", ignore_patterns="")
|
||||
query = await parse_query(DEMO_URL, max_file_size=50, from_web=True, include_patterns="", ignore_patterns="")
|
||||
|
||||
assert query.include_patterns is None
|
||||
assert query.ignore_patterns == DEFAULT_IGNORE_PATTERNS
|
||||
|
|
@ -249,9 +226,8 @@ async def test_parse_query_include_and_ignore_overlap() -> None:
|
|||
When `parse_query` is called,
|
||||
Then "*.py" should be removed from ignore patterns.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
query = await parse_query(
|
||||
url,
|
||||
DEMO_URL,
|
||||
max_file_size=50,
|
||||
from_web=True,
|
||||
include_patterns="*.py",
|
||||
|
|
@ -308,23 +284,26 @@ async def test_parse_query_empty_source() -> None:
|
|||
When `parse_query` is called,
|
||||
Then a ValueError should be raised indicating an invalid repository URL.
|
||||
"""
|
||||
url = ""
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid repository URL"):
|
||||
await parse_query("", max_file_size=100, from_web=True)
|
||||
await parse_query(url, max_file_size=100, from_web=True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"url, expected_branch, expected_commit",
|
||||
"path, expected_branch, expected_commit",
|
||||
[
|
||||
("https://github.com/user/repo/tree/main", "main", None),
|
||||
(
|
||||
"https://github.com/user/repo/tree/abcd1234abcd1234abcd1234abcd1234abcd1234",
|
||||
None,
|
||||
"abcd1234abcd1234abcd1234abcd1234abcd1234",
|
||||
),
|
||||
("/tree/main", "main", None),
|
||||
("/tree/abcd1234abcd1234abcd1234abcd1234abcd1234", None, "abcd1234abcd1234abcd1234abcd1234abcd1234"),
|
||||
],
|
||||
)
|
||||
async def test_parse_url_branch_and_commit_distinction(url: str, expected_branch: str, expected_commit: str) -> None:
|
||||
async def test_parse_url_branch_and_commit_distinction(
|
||||
path: str,
|
||||
expected_branch: str,
|
||||
expected_commit: str,
|
||||
stub_branches: Callable[[List[str]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test `_parse_remote_repo` distinguishing branch vs. commit hash.
|
||||
|
||||
|
|
@ -332,19 +311,13 @@ async def test_parse_url_branch_and_commit_distinction(url: str, expected_branch
|
|||
When `_parse_remote_repo` is called with branch fetching,
|
||||
Then the function should correctly set `branch` or `commit` based on the URL content.
|
||||
"""
|
||||
with patch("gitingest.utils.git_utils.run_command", new_callable=AsyncMock) as mock_run_command:
|
||||
# Mocking the return value to include 'main' and some additional branches
|
||||
mock_run_command.return_value = (b"refs/heads/main\nrefs/heads/dev\nrefs/heads/feature-branch\n", b"")
|
||||
with patch(
|
||||
"gitingest.utils.git_utils.fetch_remote_branch_list", new_callable=AsyncMock
|
||||
) as mock_fetch_branches:
|
||||
mock_fetch_branches.return_value = ["main", "dev", "feature-branch"]
|
||||
stub_branches(["main", "dev", "feature-branch"])
|
||||
|
||||
query = await _parse_remote_repo(url)
|
||||
url = DEMO_URL + path
|
||||
query = await _assert_basic_repo_fields(url)
|
||||
|
||||
# Verify that `branch` and `commit` match our expectations
|
||||
assert query.branch == expected_branch
|
||||
assert query.commit == expected_commit
|
||||
assert query.branch == expected_branch
|
||||
assert query.commit == expected_commit
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -372,12 +345,12 @@ async def test_parse_url_with_query_and_fragment() -> None:
|
|||
When `_parse_remote_repo` is called,
|
||||
Then those parts should be stripped, leaving a clean user/repo URL.
|
||||
"""
|
||||
url = "https://github.com/user/repo?arg=value#fragment"
|
||||
url = DEMO_URL + "?arg=value#fragment"
|
||||
query = await _parse_remote_repo(url)
|
||||
|
||||
assert query.user_name == "user"
|
||||
assert query.repo_name == "repo"
|
||||
assert query.url == "https://github.com/user/repo" # URL should be cleaned
|
||||
assert query.url == DEMO_URL # URL should be cleaned
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -390,6 +363,7 @@ async def test_parse_url_unsupported_host() -> None:
|
|||
Then a ValueError should be raised for the unknown domain.
|
||||
"""
|
||||
url = "https://only-domain.com"
|
||||
|
||||
with pytest.raises(ValueError, match="Unknown domain 'only-domain.com' in URL"):
|
||||
await _parse_remote_repo(url)
|
||||
|
||||
|
|
@ -419,14 +393,19 @@ async def test_parse_query_with_branch() -> None:
|
|||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"url, expected_branch, expected_subpath",
|
||||
"path, expected_branch, expected_subpath",
|
||||
[
|
||||
("https://github.com/user/repo/tree/main/src", "main", "/src"),
|
||||
("https://github.com/user/repo/tree/fix1", "fix1", "/"),
|
||||
("https://github.com/user/repo/tree/nonexistent-branch/src", "nonexistent-branch", "/src"),
|
||||
("/tree/main/src", "main", "/src"),
|
||||
("/tree/fix1", "fix1", "/"),
|
||||
("/tree/nonexistent-branch/src", "nonexistent-branch", "/src"),
|
||||
],
|
||||
)
|
||||
async def test_parse_repo_source_with_failed_git_command(url, expected_branch, expected_subpath):
|
||||
async def test_parse_repo_source_with_failed_git_command(
|
||||
path: str,
|
||||
expected_branch: str,
|
||||
expected_subpath: str,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""
|
||||
Test `_parse_remote_repo` when git fetch fails.
|
||||
|
||||
|
|
@ -434,52 +413,62 @@ async def test_parse_repo_source_with_failed_git_command(url, expected_branch, e
|
|||
When `_parse_remote_repo` is called,
|
||||
Then it should fall back to path components for branch identification.
|
||||
"""
|
||||
with patch("gitingest.utils.git_utils.fetch_remote_branch_list", new_callable=AsyncMock) as mock_fetch_branches:
|
||||
mock_fetch_branches.side_effect = Exception("Failed to fetch branch list")
|
||||
url = DEMO_URL + path
|
||||
|
||||
with pytest.warns(
|
||||
RuntimeWarning,
|
||||
match="Warning: Failed to fetch branch list: Command failed: "
|
||||
"git ls-remote --heads https://github.com/user/repo",
|
||||
):
|
||||
mock_fetch_branches = mocker.patch("gitingest.utils.git_utils.fetch_remote_branch_list", new_callable=AsyncMock)
|
||||
mock_fetch_branches.side_effect = Exception("Failed to fetch branch list")
|
||||
|
||||
query = await _parse_remote_repo(url)
|
||||
with pytest.warns(
|
||||
RuntimeWarning,
|
||||
match="Warning: Failed to fetch branch list: Command failed: "
|
||||
"git ls-remote --heads https://github.com/user/repo",
|
||||
):
|
||||
query = await _parse_remote_repo(url)
|
||||
|
||||
assert query.branch == expected_branch
|
||||
assert query.subpath == expected_subpath
|
||||
assert query.branch == expected_branch
|
||||
assert query.subpath == expected_subpath
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"url, expected_branch, expected_subpath",
|
||||
("path", "expected_branch", "expected_subpath"),
|
||||
[
|
||||
("https://github.com/user/repo/tree/feature/fix1/src", "feature/fix1", "/src"),
|
||||
("https://github.com/user/repo/tree/main/src", "main", "/src"),
|
||||
("https://github.com/user/repo", None, "/"), # No
|
||||
("https://github.com/user/repo/tree/nonexistent-branch/src", None, "/"), # Non-existent branch
|
||||
("https://github.com/user/repo/tree/fix", "fix", "/"),
|
||||
("https://github.com/user/repo/blob/fix/page.html", "fix", "/page.html"),
|
||||
("/tree/feature/fix1/src", "feature/fix1", "/src"),
|
||||
("/tree/main/src", "main", "/src"),
|
||||
("", None, "/"),
|
||||
("/tree/nonexistent-branch/src", None, "/"),
|
||||
("/tree/fix", "fix", "/"),
|
||||
("/blob/fix/page.html", "fix", "/page.html"),
|
||||
],
|
||||
)
|
||||
async def test_parse_repo_source_with_various_url_patterns(url, expected_branch, expected_subpath):
|
||||
async def test_parse_repo_source_with_various_url_patterns(
|
||||
path: str,
|
||||
expected_branch: Optional[str],
|
||||
expected_subpath: str,
|
||||
stub_branches: Callable[[List[str]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test `_parse_remote_repo` with various URL patterns.
|
||||
`_parse_remote_repo` should detect (or reject) a branch and resolve the
|
||||
sub-path for various GitHub-style URL permutations.
|
||||
|
||||
Given multiple branch/blob patterns (including nonexistent branches):
|
||||
When `_parse_remote_repo` is called with remote branch fetching,
|
||||
Then the correct branch/subpath should be set or None if unmatched.
|
||||
Branch discovery is stubbed so that only names passed to `stub_branches` are considered "remote".
|
||||
"""
|
||||
with patch("gitingest.utils.git_utils.run_command", new_callable=AsyncMock) as mock_run_command:
|
||||
with patch(
|
||||
"gitingest.utils.git_utils.fetch_remote_branch_list", new_callable=AsyncMock
|
||||
) as mock_fetch_branches:
|
||||
mock_run_command.return_value = (
|
||||
b"refs/heads/feature/fix1\nrefs/heads/main\nrefs/heads/feature-branch\nrefs/heads/fix\n",
|
||||
b"",
|
||||
)
|
||||
mock_fetch_branches.return_value = ["feature/fix1", "main", "feature-branch"]
|
||||
stub_branches(["feature/fix1", "main", "feature-branch", "fix"])
|
||||
|
||||
query = await _parse_remote_repo(url)
|
||||
url = DEMO_URL + path
|
||||
query = await _assert_basic_repo_fields(url)
|
||||
|
||||
assert query.branch == expected_branch
|
||||
assert query.subpath == expected_subpath
|
||||
assert query.branch == expected_branch
|
||||
assert query.subpath == expected_subpath
|
||||
|
||||
|
||||
async def _assert_basic_repo_fields(url: str) -> IngestionQuery:
|
||||
"""Run _parse_remote_repo and assert user, repo and slug are parsed."""
|
||||
|
||||
query = await _parse_remote_repo(url)
|
||||
|
||||
assert query.user_name == "user"
|
||||
assert query.repo_name == "repo"
|
||||
assert query.slug == "user-repo"
|
||||
|
||||
return query
|
||||
|
|
|
|||
|
|
@ -3,10 +3,12 @@
|
|||
import shutil
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
from typing import Generator
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from pytest import FixtureRequest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from src.server.main import app
|
||||
|
||||
|
|
@ -15,30 +17,33 @@ TEMPLATE_DIR = BASE_DIR / "src" / "templates"
|
|||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_client():
|
||||
def test_client() -> Generator[TestClient, None, None]:
|
||||
"""Create a test client fixture."""
|
||||
with TestClient(app) as client_instance:
|
||||
client_instance.headers.update({"Host": "localhost"})
|
||||
yield client_instance
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def mock_static_files():
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_static_files(mocker: MockerFixture) -> Generator[None, None, None]:
|
||||
"""Mock the static file mount to avoid directory errors."""
|
||||
with patch("src.server.main.StaticFiles") as mock_static:
|
||||
mock_static.return_value = None # Mocks the StaticFiles response
|
||||
yield mock_static
|
||||
mock_static = mocker.patch("src.server.main.StaticFiles", autospec=True)
|
||||
mock_static.return_value = None
|
||||
yield mock_static
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_templates(mocker: MockerFixture) -> Generator[None, None, None]:
|
||||
"""Mock Jinja2 template rendering to bypass actual file loading."""
|
||||
mock_template = mocker.patch("starlette.templating.Jinja2Templates.TemplateResponse", autospec=True)
|
||||
mock_template.return_value = "Mocked Template Response"
|
||||
yield mock_template
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def mock_templates():
|
||||
"""Mock Jinja2 template rendering to bypass actual file loading."""
|
||||
with patch("starlette.templating.Jinja2Templates.TemplateResponse") as mock_template:
|
||||
mock_template.return_value = "Mocked Template Response"
|
||||
yield mock_template
|
||||
|
||||
|
||||
def cleanup_temp_directories():
|
||||
def cleanup_tmp_dir() -> Generator[None, None, None]:
|
||||
"""Remove /tmp/gitingest after this test-module is done."""
|
||||
yield # run tests
|
||||
temp_dir = Path("/tmp/gitingest")
|
||||
if temp_dir.exists():
|
||||
try:
|
||||
|
|
@ -47,15 +52,8 @@ def cleanup_temp_directories():
|
|||
print(f"Error cleaning up {temp_dir}: {exc}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def cleanup():
|
||||
"""Cleanup temporary directories after tests."""
|
||||
yield
|
||||
cleanup_temp_directories()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_remote_repository_analysis(request):
|
||||
async def test_remote_repository_analysis(request: FixtureRequest) -> None:
|
||||
"""Test the complete flow of analyzing a remote repository."""
|
||||
client = request.getfixturevalue("test_client")
|
||||
form_data = {
|
||||
|
|
@ -72,7 +70,7 @@ async def test_remote_repository_analysis(request):
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_repository_url(request):
|
||||
async def test_invalid_repository_url(request: FixtureRequest) -> None:
|
||||
"""Test handling of an invalid repository URL."""
|
||||
client = request.getfixturevalue("test_client")
|
||||
form_data = {
|
||||
|
|
@ -89,7 +87,7 @@ async def test_invalid_repository_url(request):
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_large_repository(request):
|
||||
async def test_large_repository(request: FixtureRequest) -> None:
|
||||
"""Simulate analysis of a large repository with nested folders."""
|
||||
client = request.getfixturevalue("test_client")
|
||||
form_data = {
|
||||
|
|
@ -106,7 +104,7 @@ async def test_large_repository(request):
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_requests(request):
|
||||
async def test_concurrent_requests(request: FixtureRequest) -> None:
|
||||
"""Test handling of multiple concurrent requests."""
|
||||
client = request.getfixturevalue("test_client")
|
||||
|
||||
|
|
@ -129,7 +127,7 @@ async def test_concurrent_requests(request):
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_large_file_handling(request):
|
||||
async def test_large_file_handling(request: FixtureRequest) -> None:
|
||||
"""Test handling of repositories with large files."""
|
||||
client = request.getfixturevalue("test_client")
|
||||
form_data = {
|
||||
|
|
@ -146,7 +144,7 @@ async def test_large_file_handling(request):
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_repository_with_patterns(request):
|
||||
async def test_repository_with_patterns(request: FixtureRequest) -> None:
|
||||
"""Test repository analysis with include/exclude patterns."""
|
||||
client = request.getfixturevalue("test_client")
|
||||
form_data = {
|
||||
|
|
|
|||
142
tests/test_git_utils.py
Normal file
142
tests/test_git_utils.py
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
"""
|
||||
Tests for the `git_utils` module.
|
||||
|
||||
These tests cover `validate_github_token`, which ensures that GitHub personal access
|
||||
tokens (PATs) are properly formatted, plus `create_git_command` and `create_git_auth_header`.
|
||||
"""
|
||||
|
||||
import base64
|
||||
|
||||
import pytest
|
||||
|
||||
from gitingest.utils.exceptions import InvalidGitHubTokenError
|
||||
from gitingest.utils.git_utils import (
|
||||
create_git_auth_header,
|
||||
create_git_command,
|
||||
validate_github_token,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "token",
    [
        # Valid tokens: an accepted prefix followed by at least 36 allowed characters
        "github_pat_" + "a" * 36,
        "ghp_" + "A" * 36,
        "github_pat_1234567890abcdef1234567890abcdef1234",
    ],
)
def test_validate_github_token_valid(token: str) -> None:
    """
    Check that `validate_github_token` accepts well-formed tokens.

    Each parametrized token uses the `github_pat_` or `ghp_` prefix followed by
    36 characters drawn from letters, digits and underscores. The test passes
    when the call returns without raising.
    """
    # No assertion needed: any exception raised here fails the test.
    validate_github_token(token)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "token",
    [
        "github_pat_short",  # Too short after prefix
        "ghp_" + "b" * 35,  # One character short of the 36-character minimum
        "invalidprefix_" + "c" * 36,  # Wrong prefix
        "github_pat_" + "!" * 36,  # Disallowed characters
        "",  # Empty string
    ],
)
def test_validate_github_token_invalid(token: str) -> None:
    """
    Check that `validate_github_token` rejects malformed tokens.

    Every parametrized token violates the expected format in one way (length,
    prefix, character set, or emptiness) and must raise
    `InvalidGitHubTokenError`, which is a `ValueError` subclass.
    """
    with pytest.raises(InvalidGitHubTokenError):
        validate_github_token(token)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "base_cmd, local_path, url, token, expected_suffix",
    [
        # No token supplied: nothing is appended after the `-C <path>` prefix.
        (["git", "clone"], "/some/path", "https://github.com/owner/repo.git", None, []),
        # GitHub URL with a token: a `-c <auth header>` pair is appended.
        (
            ["git", "clone"],
            "/some/path",
            "https://github.com/owner/repo.git",
            "ghp_" + "d" * 36,
            ["-c", create_git_auth_header("ghp_" + "d" * 36)],
        ),
        # Non-GitHub URL: the token is ignored and no auth header is appended.
        (["git", "clone"], "/some/path", "https://gitlab.com/owner/repo.git", "ghp_" + "e" * 36, []),
    ],
)
def test_create_git_command(base_cmd, local_path, url, token, expected_suffix):
    """
    Check the command list returned by `create_git_command`.

    The result must begin with `base_cmd` followed by `-C <local_path>`, and
    whatever comes after that prefix must equal `expected_suffix` for the
    given URL/token combination.
    """
    built = create_git_command(base_cmd, local_path, url, token)

    prefix = [*base_cmd, "-C", local_path]
    split_at = len(prefix)
    head, tail = built[:split_at], built[split_at:]

    # Leading part: the base command plus the -C option
    assert head == prefix

    # Trailing part: the optional auth configuration
    assert tail == expected_suffix
|
||||
|
||||
|
||||
def test_create_git_command_invalid_token() -> None:
    """
    Check that `create_git_command` rejects a malformed token for a GitHub URL.

    The token `"invalid_token"` has neither an accepted prefix nor the required
    length, so the call must raise `InvalidGitHubTokenError` (a `ValueError`
    subclass).
    """
    with pytest.raises(InvalidGitHubTokenError):
        create_git_command(
            ["git", "clone"],
            "/some/path",
            "https://github.com/owner/repo.git",
            "invalid_token",
        )
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "token",
    [
        # 36 characters after the prefix, so the sample matches the accepted PAT format
        "ghp_abcdefghijklmnopqrstuvwxyz0123456789",
        "github_pat_1234567890abcdef1234567890abcdef1234",
    ],
)
def test_create_git_auth_header(token: str) -> None:
    """
    Check that `create_git_auth_header` returns the expected git config entry.

    The expected value is the `http.https://github.com/.extraheader` setting
    carrying a Basic `Authorization` header whose payload is the base64
    encoding of `x-oauth-basic:<token>`.
    """
    header = create_git_auth_header(token)

    # Rebuild the header independently so the comparison does not reuse the helper.
    expected_basic = base64.b64encode(f"x-oauth-basic:{token}".encode()).decode()
    expected = f"http.https://github.com/.extraheader=Authorization: Basic {expected_basic}"

    assert header == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "url, token, should_call",
    [
        ("https://github.com/foo/bar.git", "ghp_" + "f" * 36, True),
        ("https://github.com/foo/bar.git", None, False),
        ("https://gitlab.com/foo/bar.git", "ghp_" + "g" * 36, False),
    ],
)
def test_create_git_command_helper_calls(mocker, url, token, should_call):
    """
    Check when `create_git_command` calls its token helpers.

    `validate_github_token` and `create_git_auth_header` are both patched in
    `gitingest.utils.git_utils`. They must each be called exactly once with the
    token when `should_call` is true, and not at all otherwise.
    """
    token_check = mocker.patch("gitingest.utils.git_utils.validate_github_token")
    auth_header = mocker.patch("gitingest.utils.git_utils.create_git_auth_header", return_value="HEADER")

    built = create_git_command(["git", "clone"], "/tmp", url, token)

    if not should_call:
        # Neither helper may run, and the stub header must be absent.
        token_check.assert_not_called()
        auth_header.assert_not_called()
        assert "HEADER" not in built
        return

    # Both helpers run once with the token, and the stub header is in the command.
    token_check.assert_called_once_with(token)
    auth_header.assert_called_once_with(token)
    assert "HEADER" in built
|
||||
|
|
@ -8,18 +8,24 @@ and handling edge cases such as nonexistent URLs, timeouts, redirects, and speci
|
|||
import asyncio
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from gitingest.cloning import clone_repo
|
||||
from gitingest.schemas import CloneConfig
|
||||
from gitingest.utils.exceptions import AsyncTimeoutError
|
||||
from gitingest.utils.git_utils import check_repo_exists
|
||||
from tests.conftest import DEMO_URL, LOCAL_REPO_PATH
|
||||
|
||||
# All cloning-related tests assume (unless explicitly overridden) that the repository exists.
|
||||
# Apply the check-repo patch automatically so individual tests don't need to repeat it.
|
||||
pytestmark = pytest.mark.usefixtures("repo_exists_true")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_commit() -> None:
|
||||
async def test_clone_with_commit(repo_exists_true: AsyncMock, run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository with a specific commit hash.
|
||||
|
||||
|
|
@ -28,26 +34,20 @@ async def test_clone_with_commit() -> None:
|
|||
Then the repository should be cloned and checked out at that commit.
|
||||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/repo",
|
||||
local_path="/tmp/repo",
|
||||
url=DEMO_URL,
|
||||
local_path=LOCAL_REPO_PATH,
|
||||
commit="a" * 40, # Simulating a valid commit hash
|
||||
branch="main",
|
||||
)
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True) as mock_check:
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"output", b"error")
|
||||
mock_exec.return_value = mock_process
|
||||
await clone_repo(clone_config)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
mock_check.assert_called_once_with(clone_config.url, token=None)
|
||||
assert mock_exec.call_count == 2 # Clone and checkout calls
|
||||
repo_exists_true.assert_called_once_with(clone_config.url, token=None)
|
||||
assert run_command_mock.call_count == 2 # Clone and checkout calls
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_without_commit() -> None:
|
||||
async def test_clone_without_commit(repo_exists_true: AsyncMock, run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository when no commit hash is provided.
|
||||
|
||||
|
|
@ -55,27 +55,16 @@ async def test_clone_without_commit() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then only the clone_repo operation should be performed (no checkout).
|
||||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/repo",
|
||||
local_path="/tmp/repo",
|
||||
commit=None,
|
||||
branch="main",
|
||||
)
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit=None, branch="main")
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True) as mock_check:
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"output", b"error")
|
||||
mock_exec.return_value = mock_process
|
||||
await clone_repo(clone_config)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
mock_check.assert_called_once_with(clone_config.url, token=None)
|
||||
assert mock_exec.call_count == 1 # Only clone call
|
||||
repo_exists_true.assert_called_once_with(clone_config.url, token=None)
|
||||
assert run_command_mock.call_count == 1 # Only clone call
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_nonexistent_repository() -> None:
|
||||
async def test_clone_nonexistent_repository(repo_exists_true: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a nonexistent repository URL.
|
||||
|
||||
|
|
@ -85,15 +74,17 @@ async def test_clone_nonexistent_repository() -> None:
|
|||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/nonexistent-repo",
|
||||
local_path="/tmp/repo",
|
||||
local_path=LOCAL_REPO_PATH,
|
||||
commit=None,
|
||||
branch="main",
|
||||
)
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=False) as mock_check:
|
||||
with pytest.raises(ValueError, match="Repository not found"):
|
||||
await clone_repo(clone_config)
|
||||
# Override the default fixture behaviour for this test
|
||||
repo_exists_true.return_value = False
|
||||
|
||||
mock_check.assert_called_once_with(clone_config.url)
|
||||
with pytest.raises(ValueError, match="Repository not found"):
|
||||
await clone_repo(clone_config)
|
||||
|
||||
repo_exists_true.assert_called_once_with(clone_config.url, token=None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -105,7 +96,7 @@ async def test_clone_nonexistent_repository() -> None:
|
|||
(b"HTTP/1.1 200 OK\n", 1, False), # Failed request
|
||||
],
|
||||
)
|
||||
async def test_check_repo_exists(mock_stdout: bytes, return_code: int, expected: bool) -> None:
|
||||
async def test_check_repo_exists(mock_stdout: bytes, return_code: int, expected: bool, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
Test the `check_repo_exists` function with different Git HTTP responses.
|
||||
|
||||
|
|
@ -113,22 +104,19 @@ async def test_check_repo_exists(mock_stdout: bytes, return_code: int, expected:
|
|||
When `check_repo_exists` is called,
|
||||
Then it should correctly indicate whether the repository exists.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
mock_exec = mocker.patch("asyncio.create_subprocess_exec", new_callable=AsyncMock)
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (mock_stdout, b"")
|
||||
mock_process.returncode = return_code
|
||||
mock_exec.return_value = mock_process
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_exec:
|
||||
mock_process = AsyncMock()
|
||||
# Mock the subprocess output
|
||||
mock_process.communicate.return_value = (mock_stdout, b"")
|
||||
mock_process.returncode = return_code
|
||||
mock_exec.return_value = mock_process
|
||||
repo_exists = await check_repo_exists(DEMO_URL)
|
||||
|
||||
repo_exists = await check_repo_exists(url)
|
||||
|
||||
assert repo_exists is expected
|
||||
assert repo_exists is expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_custom_branch() -> None:
|
||||
async def test_clone_with_custom_branch(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository with a specified custom branch.
|
||||
|
||||
|
|
@ -136,25 +124,24 @@ async def test_clone_with_custom_branch() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then the repository should be cloned shallowly to that branch.
|
||||
"""
|
||||
clone_config = CloneConfig(url="https://github.com/user/repo", local_path="/tmp/repo", branch="feature-branch")
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, branch="feature-branch")
|
||||
|
||||
mock_exec.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
"--branch",
|
||||
"feature-branch",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
await clone_repo(clone_config)
|
||||
|
||||
run_command_mock.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
"--branch",
|
||||
"feature-branch",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_git_command_failure() -> None:
|
||||
async def test_git_command_failure(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning when the Git command fails during execution.
|
||||
|
||||
|
|
@ -162,18 +149,16 @@ async def test_git_command_failure() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then a RuntimeError should be raised with the correct message.
|
||||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/repo",
|
||||
local_path="/tmp/repo",
|
||||
)
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", side_effect=RuntimeError("Git command failed")):
|
||||
with pytest.raises(RuntimeError, match="Git command failed"):
|
||||
await clone_repo(clone_config)
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH)
|
||||
|
||||
run_command_mock.side_effect = RuntimeError("Git command failed")
|
||||
|
||||
with pytest.raises(RuntimeError, match="Git command failed"):
|
||||
await clone_repo(clone_config)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_default_shallow_clone() -> None:
|
||||
async def test_clone_default_shallow_clone(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository with the default shallow clone options.
|
||||
|
||||
|
|
@ -181,27 +166,22 @@ async def test_clone_default_shallow_clone() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then the repository should be cloned with `--depth=1` and `--single-branch`.
|
||||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/repo",
|
||||
local_path="/tmp/repo",
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
run_command_mock.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
|
||||
mock_exec.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_commit_without_branch() -> None:
|
||||
async def test_clone_commit_without_branch(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning when a commit hash is provided but no branch is specified.
|
||||
|
||||
|
|
@ -209,22 +189,18 @@ async def test_clone_commit_without_branch() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then the repository should be cloned and checked out at that commit.
|
||||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/repo",
|
||||
local_path="/tmp/repo",
|
||||
commit="a" * 40, # Simulating a valid commit hash
|
||||
)
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
# Simulating a valid commit hash
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit="a" * 40)
|
||||
|
||||
assert mock_exec.call_count == 2 # Clone and checkout calls
|
||||
mock_exec.assert_any_call("git", "clone", "--single-branch", clone_config.url, clone_config.local_path)
|
||||
mock_exec.assert_any_call("git", "-C", clone_config.local_path, "checkout", clone_config.commit)
|
||||
await clone_repo(clone_config)
|
||||
|
||||
assert run_command_mock.call_count == 2 # Clone and checkout calls
|
||||
run_command_mock.assert_any_call("git", "clone", "--single-branch", clone_config.url, clone_config.local_path)
|
||||
run_command_mock.assert_any_call("git", "-C", clone_config.local_path, "checkout", clone_config.commit)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_repo_exists_with_redirect() -> None:
|
||||
async def test_check_repo_exists_with_redirect(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
Test `check_repo_exists` when a redirect (302) is returned.
|
||||
|
||||
|
|
@ -232,20 +208,19 @@ async def test_check_repo_exists_with_redirect() -> None:
|
|||
When `check_repo_exists` is called,
|
||||
Then it should return `False`, indicating the repo is inaccessible.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_exec:
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"HTTP/1.1 302 Found\n", b"")
|
||||
mock_process.returncode = 0 # Simulate successful request
|
||||
mock_exec.return_value = mock_process
|
||||
mock_exec = mocker.patch("asyncio.create_subprocess_exec", new_callable=AsyncMock)
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"HTTP/1.1 302 Found\n", b"")
|
||||
mock_process.returncode = 0 # Simulate successful request
|
||||
mock_exec.return_value = mock_process
|
||||
|
||||
repo_exists = await check_repo_exists(url)
|
||||
repo_exists = await check_repo_exists(DEMO_URL)
|
||||
|
||||
assert repo_exists is False
|
||||
assert repo_exists is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_repo_exists_with_permanent_redirect() -> None:
|
||||
async def test_check_repo_exists_with_permanent_redirect(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
Test `check_repo_exists` when a permanent redirect (301) is returned.
|
||||
|
||||
|
|
@ -253,20 +228,19 @@ async def test_check_repo_exists_with_permanent_redirect() -> None:
|
|||
When `check_repo_exists` is called,
|
||||
Then it should return `True`, indicating the repo may exist at the new location.
|
||||
"""
|
||||
url = "https://github.com/user/repo"
|
||||
with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_exec:
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"HTTP/1.1 301 Found\n", b"")
|
||||
mock_process.returncode = 0 # Simulate successful request
|
||||
mock_exec.return_value = mock_process
|
||||
mock_exec = mocker.patch("asyncio.create_subprocess_exec", new_callable=AsyncMock)
|
||||
mock_process = AsyncMock()
|
||||
mock_process.communicate.return_value = (b"HTTP/1.1 301 Found\n", b"")
|
||||
mock_process.returncode = 0 # Simulate successful request
|
||||
mock_exec.return_value = mock_process
|
||||
|
||||
repo_exists = await check_repo_exists(url)
|
||||
repo_exists = await check_repo_exists(DEMO_URL)
|
||||
|
||||
assert repo_exists
|
||||
assert repo_exists
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_timeout() -> None:
|
||||
async def test_clone_with_timeout(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository when a timeout occurs.
|
||||
|
||||
|
|
@ -274,17 +248,16 @@ async def test_clone_with_timeout() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then an `AsyncTimeoutError` should be raised to indicate the operation exceeded time limits.
|
||||
"""
|
||||
clone_config = CloneConfig(url="https://github.com/user/repo", local_path="/tmp/repo")
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH)
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
mock_exec.side_effect = asyncio.TimeoutError
|
||||
with pytest.raises(AsyncTimeoutError, match="Operation timed out after"):
|
||||
await clone_repo(clone_config)
|
||||
run_command_mock.side_effect = asyncio.TimeoutError
|
||||
|
||||
with pytest.raises(AsyncTimeoutError, match="Operation timed out after"):
|
||||
await clone_repo(clone_config)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_specific_branch(tmp_path):
|
||||
async def test_clone_specific_branch(tmp_path: Path) -> None:
|
||||
"""
|
||||
Test cloning a specific branch of a repository.
|
||||
|
||||
|
|
@ -295,21 +268,18 @@ async def test_clone_specific_branch(tmp_path):
|
|||
repo_url = "https://github.com/cyclotruc/gitingest.git"
|
||||
branch_name = "main"
|
||||
local_path = tmp_path / "gitingest"
|
||||
|
||||
clone_config = CloneConfig(url=repo_url, local_path=str(local_path), branch=branch_name)
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
# Assertions
|
||||
assert local_path.exists(), "The repository was not cloned successfully."
|
||||
assert local_path.is_dir(), "The cloned repository path is not a directory."
|
||||
|
||||
# Check the current branch
|
||||
current_branch = os.popen(f"git -C {local_path} branch --show-current").read().strip()
|
||||
assert current_branch == branch_name, f"Expected branch '{branch_name}', got '{current_branch}'."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_branch_with_slashes(tmp_path):
|
||||
async def test_clone_branch_with_slashes(tmp_path: Path, run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a branch with slashes in the name.
|
||||
|
||||
|
|
@ -317,29 +287,26 @@ async def test_clone_branch_with_slashes(tmp_path):
|
|||
When `clone_repo` is called,
|
||||
Then the repository should be cloned and checked out at that branch.
|
||||
"""
|
||||
repo_url = "https://github.com/user/repo"
|
||||
branch_name = "fix/in-operator"
|
||||
local_path = tmp_path / "gitingest"
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=str(local_path), branch=branch_name)
|
||||
|
||||
clone_config = CloneConfig(url=repo_url, local_path=str(local_path), branch=branch_name)
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
await clone_repo(clone_config)
|
||||
|
||||
mock_exec.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
"--branch",
|
||||
"fix/in-operator",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
run_command_mock.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
"--branch",
|
||||
"fix/in-operator",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_creates_parent_directory(tmp_path: Path) -> None:
|
||||
async def test_clone_creates_parent_directory(tmp_path: Path, run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test that clone_repo creates parent directories if they don't exist.
|
||||
|
||||
|
|
@ -348,28 +315,23 @@ async def test_clone_creates_parent_directory(tmp_path: Path) -> None:
|
|||
Then it should create the parent directories before attempting to clone.
|
||||
"""
|
||||
nested_path = tmp_path / "deep" / "nested" / "path" / "repo"
|
||||
clone_config = CloneConfig(url="https://github.com/user/repo", local_path=str(nested_path))
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=str(nested_path))
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
await clone_repo(clone_config)
|
||||
|
||||
# Verify parent directory was created
|
||||
assert nested_path.parent.exists()
|
||||
|
||||
# Verify git clone was called with correct parameters
|
||||
mock_exec.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
clone_config.url,
|
||||
str(nested_path),
|
||||
)
|
||||
assert nested_path.parent.exists()
|
||||
run_command_mock.assert_called_once_with(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--depth=1",
|
||||
clone_config.url,
|
||||
str(nested_path),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_specific_subpath() -> None:
|
||||
async def test_clone_with_specific_subpath(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository with a specific subpath.
|
||||
|
||||
|
|
@ -377,32 +339,30 @@ async def test_clone_with_specific_subpath() -> None:
|
|||
When `clone_repo` is called,
|
||||
Then the repository should be cloned with sparse checkout enabled and the specified subpath.
|
||||
"""
|
||||
clone_config = CloneConfig(url="https://github.com/user/repo", local_path="/tmp/repo", subpath="src/docs")
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, subpath="src/docs")
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
await clone_repo(clone_config)
|
||||
|
||||
# Verify the clone command includes sparse checkout flags
|
||||
mock_exec.assert_any_call(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--filter=blob:none",
|
||||
"--sparse",
|
||||
"--depth=1",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
# Verify the clone command includes sparse checkout flags
|
||||
run_command_mock.assert_any_call(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--filter=blob:none",
|
||||
"--sparse",
|
||||
"--depth=1",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
|
||||
# Verify the sparse-checkout command sets the correct path
|
||||
mock_exec.assert_any_call("git", "-C", clone_config.local_path, "sparse-checkout", "set", "src/docs")
|
||||
# Verify the sparse-checkout command sets the correct path
|
||||
run_command_mock.assert_any_call("git", "-C", clone_config.local_path, "sparse-checkout", "set", "src/docs")
|
||||
|
||||
assert mock_exec.call_count == 2
|
||||
assert run_command_mock.call_count == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clone_with_commit_and_subpath() -> None:
|
||||
async def test_clone_with_commit_and_subpath(run_command_mock: AsyncMock) -> None:
|
||||
"""
|
||||
Test cloning a repository with both a specific commit and subpath.
|
||||
|
||||
|
|
@ -411,45 +371,39 @@ async def test_clone_with_commit_and_subpath() -> None:
|
|||
Then the repository should be cloned with sparse checkout enabled,
|
||||
checked out at the specific commit, and only include the specified subpath.
|
||||
"""
|
||||
clone_config = CloneConfig(
|
||||
url="https://github.com/user/repo",
|
||||
local_path="/tmp/repo",
|
||||
commit="a" * 40, # Simulating a valid commit hash
|
||||
subpath="src/docs",
|
||||
# Simulating a valid commit hash
|
||||
clone_config = CloneConfig(url=DEMO_URL, local_path=LOCAL_REPO_PATH, commit="a" * 40, subpath="src/docs")
|
||||
|
||||
await clone_repo(clone_config)
|
||||
|
||||
# Verify the clone command includes sparse checkout flags
|
||||
run_command_mock.assert_any_call(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--filter=blob:none",
|
||||
"--sparse",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
|
||||
with patch("gitingest.cloning.check_repo_exists", return_value=True):
|
||||
with patch("gitingest.cloning.run_command", new_callable=AsyncMock) as mock_exec:
|
||||
await clone_repo(clone_config)
|
||||
# Verify sparse-checkout set
|
||||
run_command_mock.assert_any_call(
|
||||
"git",
|
||||
"-C",
|
||||
clone_config.local_path,
|
||||
"sparse-checkout",
|
||||
"set",
|
||||
"src/docs",
|
||||
)
|
||||
|
||||
# Verify the clone command includes sparse checkout flags
|
||||
mock_exec.assert_any_call(
|
||||
"git",
|
||||
"clone",
|
||||
"--single-branch",
|
||||
"--filter=blob:none",
|
||||
"--sparse",
|
||||
clone_config.url,
|
||||
clone_config.local_path,
|
||||
)
|
||||
# Verify checkout commit
|
||||
run_command_mock.assert_any_call(
|
||||
"git",
|
||||
"-C",
|
||||
clone_config.local_path,
|
||||
"checkout",
|
||||
clone_config.commit,
|
||||
)
|
||||
|
||||
# Verify sparse-checkout set
|
||||
mock_exec.assert_any_call(
|
||||
"git",
|
||||
"-C",
|
||||
clone_config.local_path,
|
||||
"sparse-checkout",
|
||||
"set",
|
||||
"src/docs",
|
||||
)
|
||||
|
||||
# Verify checkout commit
|
||||
mock_exec.assert_any_call(
|
||||
"git",
|
||||
"-C",
|
||||
clone_config.local_path,
|
||||
"checkout",
|
||||
clone_config.commit,
|
||||
)
|
||||
|
||||
assert mock_exec.call_count == 3
|
||||
assert run_command_mock.call_count == 3
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue