refactor: centralize PAT validation, streamline repo checks & misc cleanup (#349)
Some checks failed
CI / test (macos-latest, 3.10) (push) Has been cancelled
CI / test (macos-latest, 3.11) (push) Has been cancelled
CI / test (macos-latest, 3.12) (push) Has been cancelled
CI / test (macos-latest, 3.13) (push) Has been cancelled
CI / test (macos-latest, 3.8) (push) Has been cancelled
CI / test (macos-latest, 3.9) (push) Has been cancelled
CI / test (ubuntu-latest, 3.10) (push) Has been cancelled
CI / test (ubuntu-latest, 3.11) (push) Has been cancelled
CI / test (ubuntu-latest, 3.12) (push) Has been cancelled
CI / test (ubuntu-latest, 3.13) (push) Has been cancelled
CI / test (ubuntu-latest, 3.8) (push) Has been cancelled
CI / test (ubuntu-latest, 3.9) (push) Has been cancelled
CI / test (windows-latest, 3.10) (push) Has been cancelled
CI / test (windows-latest, 3.11) (push) Has been cancelled
CI / test (windows-latest, 3.12) (push) Has been cancelled
CI / test (windows-latest, 3.13) (push) Has been cancelled
CI / test (windows-latest, 3.8) (push) Has been cancelled
CI / test (windows-latest, 3.9) (push) Has been cancelled
OSSF Scorecard / Scorecard analysis (push) Has been cancelled

* refactor: centralize PAT validation, streamline repo checks & housekeeping

* Add `.venv*` to `.gitignore`
* Add `# type: ignore[attr-defined]` hints in `compat_typing.py` for IDE-agnostic imports
* Add a helpful PAT-generation link to the `InvalidGitHubTokenError` message for easier debugging

* Bump **ruff-pre-commit** hook → `v0.12.1`
* CONTRIBUTING:
  * Require **Python 3.9+**
  * Recommend signed (`-S`) commits
* PAT validation now happens **only** in entry points
  (`utils.auth.resolve_token` for CLI/lib, `server.process_query` for Web UI)
* Unified `_check_github_repo_exists` into `check_repo_exists`, replacing
  `curl -I` with `curl --silent --location --write-out %{http_code} -o /dev/null`
* Rename `GITHUB_PAT_PATTERN` to the private `_GITHUB_PAT_PATTERN` (regex unchanged) and match it with `re.fullmatch`
* `create_git_auth_header` raises `ValueError` when hostname is missing
* Tests updated to expect raw HTTP-code output

* Remove superfluous “token can be set via `GITHUB_TOKEN`” notes from docstrings
* Remove `.gitingestignore` & `.terraform` from `DEFAULT_IGNORE_PATTERNS`
* Remove token validation inside `create_git_command`
* Remove obsolete `test_create_git_command_invalid_token`

* Adjust `test_clone.py` and `test_git_utils.py` for new status-code handling
* Consolidate mocks after token-validation relocation

BREAKING CHANGE:
`create_git_command` no longer validates GitHub tokens; callers must ensure
tokens are valid (via `validate_github_token`) before invoking lower-level
git helpers.


---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Filip Christiansen 2025-07-01 14:21:13 +02:00 committed by GitHub
parent 25923037ea
commit f8d397e66e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 110 additions and 144 deletions

1
.gitignore vendored
View file

@ -126,6 +126,7 @@ celerybeat.pid
# Environments
.env
.venv
.venv*
env/
venv/
ENV/

View file

@ -75,7 +75,7 @@ repos:
args: ["--disable=line-length"]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.0
rev: v0.12.1
hooks:
- id: ruff-check
- id: ruff-format

View file

@ -19,6 +19,8 @@ Thanks for your interest in contributing to Gitingest! 🚀 Gitingest aims to be
cd gitingest
```
**Note**: To contribute, ensure you have **Python 3.9 or newer** installed, as some of the `pre-commit` hooks (e.g. `pyupgrade`) require Python 3.9+.
3. Set up the development environment and install dependencies:
```bash
@ -31,7 +33,7 @@ Thanks for your interest in contributing to Gitingest! 🚀 Gitingest aims to be
4. Create a new branch for your changes:
```bash
git checkout -b your-branch
git checkout -b your-branch
```
5. Make your changes. Make sure to add corresponding tests for your changes.
@ -66,10 +68,18 @@ Thanks for your interest in contributing to Gitingest! 🚀 Gitingest aims to be
9. Confirm that everything is working as expected. If you encounter any issues, fix them and repeat steps 6 to 8.
10. Commit your changes:
10. Commit your changes (signed):
All commits to Gitingest must be [GPG-signed](https://docs.github.com/en/authentication/managing-commit-signature-verification) so that the project can verify the authorship of every contribution. You can either configure Git globally with:
```bash
git commit -m "Your commit message"
git config --global commit.gpgSign true
```
or pass the `-S` flag as shown below.
```bash
git commit -S -m "Your commit message"
```
If `pre-commit` raises any issues, fix them and repeat steps 6 to 9.

View file

@ -13,7 +13,6 @@ from gitingest.utils.git_utils import (
ensure_git_installed,
is_github_host,
run_command,
validate_github_token,
)
from gitingest.utils.os_utils import ensure_directory
from gitingest.utils.timeout_wrapper import async_timeout
@ -23,7 +22,7 @@ if TYPE_CHECKING:
@async_timeout(DEFAULT_TIMEOUT)
async def clone_repo(config: CloneConfig, token: str | None = None) -> None:
async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
"""Clone a repository to a local path based on the provided configuration.
This function handles the process of cloning a Git repository to the local file system.
@ -36,7 +35,6 @@ async def clone_repo(config: CloneConfig, token: str | None = None) -> None:
The configuration for cloning the repository.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Raises
------
@ -51,10 +49,6 @@ async def clone_repo(config: CloneConfig, token: str | None = None) -> None:
branch: str | None = config.branch
partial_clone: bool = config.subpath != "/"
# Validate token if provided
if token and is_github_host(url):
validate_github_token(token)
# Create parent directory if it doesn't exist
await ensure_directory(Path(local_path).parent)

View file

@ -49,7 +49,6 @@ async def parse_query(
Patterns to ignore. Can be a set of strings or a single string.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Returns
-------
@ -109,7 +108,6 @@ async def _parse_remote_repo(source: str, token: str | None = None) -> Ingestion
The URL or domain-less slug to parse.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Returns
-------
@ -301,7 +299,6 @@ async def try_domains_for_user_and_repo(user_name: str, repo_name: str, token: s
The name of the repository.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Returns
-------
@ -316,7 +313,7 @@ async def try_domains_for_user_and_repo(user_name: str, repo_name: str, token: s
"""
for domain in KNOWN_GIT_HOSTS:
candidate = f"https://{domain}/{user_name}/{repo_name}"
if await check_repo_exists(candidate, token=token if domain == "github.com" else None):
if await check_repo_exists(candidate, token=token if domain.startswith("github.") else None):
return domain
msg = f"Could not find a valid repository host for '{user_name}/{repo_name}'."

View file

@ -4,6 +4,8 @@ from __future__ import annotations
import os
from gitingest.utils.git_utils import validate_github_token
def resolve_token(token: str | None) -> str | None:
"""Resolve the token to use for the query.
@ -19,4 +21,7 @@ def resolve_token(token: str | None) -> str | None:
The resolved token.
"""
return token or os.getenv("GITHUB_TOKEN")
token = token or os.getenv("GITHUB_TOKEN")
if token:
validate_github_token(token)
return token

View file

@ -1,13 +1,13 @@
"""Compatibility layer for typing."""
try:
from typing import ParamSpec, TypeAlias # Py ≥ 3.10
from typing import ParamSpec, TypeAlias # type: ignore[attr-defined] # Py ≥ 3.10
except ImportError:
from typing_extensions import ParamSpec, TypeAlias # Py 3.8 / 3.9
from typing_extensions import ParamSpec, TypeAlias # type: ignore[attr-defined] # Py 3.8 / 3.9
try:
from typing import Annotated # Py ≥ 3.9
from typing import Annotated # type: ignore[attr-defined] # Py ≥ 3.9
except ImportError:
from typing_extensions import Annotated # Py 3.8
from typing_extensions import Annotated # type: ignore[attr-defined] # Py 3.8
__all__ = ["Annotated", "ParamSpec", "TypeAlias"]

View file

@ -42,7 +42,8 @@ class InvalidGitHubTokenError(ValueError):
"""Exception raised when a GitHub Personal Access Token is malformed."""
def __init__(self) -> None:
super().__init__(
"Invalid GitHub token format. Token should start with 'github_pat_' or 'ghp_' "
"followed by at least 36 characters of letters, numbers, and underscores.",
msg = (
"Invalid GitHub token format. To generate a token, go to "
"https://github.com/settings/tokens/new?description=gitingest&scopes=repo."
)
super().__init__(msg)

View file

@ -4,12 +4,26 @@ from __future__ import annotations
import asyncio
import base64
import os
import re
from typing import Final
from urllib.parse import urlparse
from starlette.status import (
HTTP_200_OK,
HTTP_301_MOVED_PERMANENTLY,
HTTP_302_FOUND,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
from gitingest.utils.exceptions import InvalidGitHubTokenError
GITHUB_PAT_PATTERN = r"^(?:gh[pousr]_[A-Za-z0-9]{36}|github_pat_[A-Za-z0-9]{22}_[A-Za-z0-9]{59})$"
# GitHub Personal-Access tokens (classic + fine-grained).
# - ghp_ / gho_ / ghu_ / ghs_ / ghr_ → 36 alphanumerics
# - github_pat_ → 22 alphanumerics + "_" + 59 alphanumerics
_GITHUB_PAT_PATTERN: Final[str] = r"^(?:gh[pousr]_[A-Za-z0-9]{36}|github_pat_[A-Za-z0-9]{22}_[A-Za-z0-9]{59})$"
def is_github_host(url: str) -> bool:
@ -27,7 +41,7 @@ def is_github_host(url: str) -> bool:
"""
hostname = urlparse(url).hostname or ""
return hostname == "github.com" or hostname.startswith("github.")
return hostname.startswith("github.")
async def run_command(*args: str) -> tuple[bytes, bytes]:
@ -57,8 +71,7 @@ async def run_command(*args: str) -> tuple[bytes, bytes]:
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error_message = stderr.decode().strip()
msg = f"Command failed: {' '.join(args)}\nError: {error_message}"
msg = f"Command failed: {' '.join(args)}\nError: {stderr.decode().strip()}"
raise RuntimeError(msg)
return stdout, stderr
@ -81,100 +94,46 @@ async def ensure_git_installed() -> None:
async def check_repo_exists(url: str, token: str | None = None) -> bool:
"""Check if a Git repository exists at the provided URL.
"""Check whether a remote Git repository is reachable.
Parameters
----------
url : str
The URL of the Git repository to check.
URL of the Git repository to check.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Returns
-------
bool
True if the repository exists, False otherwise.
``True`` if the repository exists, ``False`` otherwise.
Raises
------
RuntimeError
If the curl command returns an unexpected status code.
If the host returns an unrecognised status code.
"""
if token and is_github_host(url):
return await _check_github_repo_exists(url, token=token)
proc = await asyncio.create_subprocess_exec(
"curl",
"-I",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
if proc.returncode != 0:
return False # likely unreachable or private
response = stdout.decode()
status_line = response.splitlines()[0].strip()
parts = status_line.split(" ")
expected_path_length = 2
if len(parts) >= expected_path_length:
status = parts[1]
if status in ("200", "301"):
return True
if status in ("302", "404"):
return False
msg = f"Unexpected status line: {status_line}"
raise RuntimeError(msg)
async def _check_github_repo_exists(url: str, token: str | None = None) -> bool:
"""Return True iff the authenticated user can see ``url``.
Parameters
----------
url : str
The URL of the GitHub repository to check.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Returns
-------
bool
True if the repository exists, False otherwise.
Raises
------
RuntimeError
If the repository is not found, if the provided URL is invalid, or if the token format is invalid.
"""
host, owner, repo = _parse_github_url(url)
if host == "github.com":
api = f"https://api.github.com/repos/{owner}/{repo}"
else: # GitHub Enterprise
api = f"https://{host}/api/v3/repos/{owner}/{repo}"
cmd = [
# TODO: use `requests` instead of `curl`
cmd: list[str] = [
"curl",
"--silent",
"--location",
"--head",
"--write-out",
"%{http_code}",
"-o",
"/dev/null",
"-H",
"Accept: application/vnd.github+json",
os.devnull,
]
if token:
cmd += ["-H", f"Authorization: Bearer {token}"]
cmd.append(api)
if token and is_github_host(url):
host, owner, repo = _parse_github_url(url)
# Public GitHub vs. GitHub Enterprise
base_api = "https://api.github.com" if host == "github.com" else f"https://{host}/api/v3"
url = f"{base_api}/repos/{owner}/{repo}"
cmd += [f"Authorization: Bearer {token}"]
cmd.append(url)
proc = await asyncio.create_subprocess_exec(
*cmd,
@ -182,16 +141,19 @@ async def _check_github_repo_exists(url: str, token: str | None = None) -> bool:
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
status = stdout.decode()[-3:] # just the %{http_code}
if status == "200":
return True
if status == "404":
if proc.returncode != 0:
return False
if status in ("401", "403"):
msg = "Token invalid or lacks permissions"
raise RuntimeError(msg)
msg = f"GitHub API returned unexpected HTTP {status}"
status = int(stdout.decode().strip())
if status in {HTTP_200_OK, HTTP_301_MOVED_PERMANENTLY}:
return True
# TODO: handle 302 redirects
if status in {HTTP_404_NOT_FOUND, HTTP_302_FOUND}:
return False
if status in {HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN}:
return False
msg = f"Unexpected HTTP status {status} for {url}"
raise RuntimeError(msg)
@ -214,7 +176,6 @@ def _parse_github_url(url: str) -> tuple[str, str, str]:
If the URL is not a valid GitHub repository URL.
"""
expected_path_length = 2
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"}:
msg = f"URL must start with http:// or https://: {url!r}"
@ -225,6 +186,7 @@ def _parse_github_url(url: str) -> tuple[str, str, str]:
raise ValueError(msg)
parts = parsed.path.strip("/").removesuffix(".git").split("/")
expected_path_length = 2
if len(parts) != expected_path_length:
msg = f"Path must look like /<owner>/<repo>: {parsed.path!r}"
raise ValueError(msg)
@ -242,7 +204,6 @@ async def fetch_remote_branch_list(url: str, token: str | None = None) -> list[s
The URL of the Git repository to fetch branches from.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Can also be set via the ``GITHUB_TOKEN`` environment variable.
Returns
-------
@ -250,21 +211,20 @@ async def fetch_remote_branch_list(url: str, token: str | None = None) -> list[s
A list of branch names available in the remote repository.
"""
fetch_branches_command = ["git"]
cmd = ["git"]
# Add authentication if needed
if token and is_github_host(url):
fetch_branches_command += ["-c", create_git_auth_header(token, url=url)]
cmd += ["-c", create_git_auth_header(token, url=url)]
fetch_branches_command += ["ls-remote", "--heads", url]
cmd += ["ls-remote", "--heads", url]
await ensure_git_installed()
stdout, _ = await run_command(*fetch_branches_command)
stdout_decoded = stdout.decode()
stdout, _ = await run_command(*cmd)
return [
line.split("refs/heads/", 1)[1]
for line in stdout_decoded.splitlines()
for line in stdout.decode().splitlines()
if line.strip() and "refs/heads/" in line
]
@ -291,7 +251,6 @@ def create_git_command(base_cmd: list[str], local_path: str, url: str, token: st
"""
cmd = [*base_cmd, "-C", local_path]
if token and is_github_host(url):
validate_github_token(token)
cmd += ["-c", create_git_auth_header(token, url=url)]
return cmd
@ -312,8 +271,17 @@ def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
str
The git config command for setting the authentication header.
Raises
------
ValueError
If the URL is not a valid GitHub repository URL.
"""
hostname = urlparse(url).hostname
if not hostname:
msg = f"Invalid GitHub URL: {url!r}"
raise ValueError(msg)
basic = base64.b64encode(f"x-oauth-basic:{token}".encode()).decode()
return f"http.https://{hostname}/.extraheader=Authorization: Basic {basic}"
@ -332,5 +300,5 @@ def validate_github_token(token: str) -> None:
If the token format is invalid.
"""
if not re.match(GITHUB_PAT_PATTERN, token):
if not re.fullmatch(_GITHUB_PAT_PATTERN, token):
raise InvalidGitHubTokenError

View file

@ -93,7 +93,6 @@ DEFAULT_IGNORE_PATTERNS: set[str] = {
".svn",
".hg",
".gitignore",
".gitingestignore", # Ignore rules specific to Gitingest
".gitattributes",
".gitmodules",
# Images and media
@ -155,7 +154,6 @@ DEFAULT_IGNORE_PATTERNS: set[str] = {
## Source maps
"*.map",
## Terraform
".terraform",
"*.tfstate*",
## Dependencies in various languages
"vendor/",

View file

@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, cast
from gitingest.clone import clone_repo
from gitingest.ingestion import ingest_query
from gitingest.query_parser import IngestionQuery, parse_query
from gitingest.utils.git_utils import validate_github_token
from server.server_config import (
DEFAULT_FILE_SIZE_KB,
EXAMPLE_REPOS,
@ -75,6 +76,9 @@ async def process_query(
msg = f"Invalid pattern type: {pattern_type}"
raise ValueError(msg)
if token:
validate_github_token(token)
template = "index.jinja" if is_index else "git.jinja"
template_response = partial(templates.TemplateResponse, name=template)
max_file_size = log_slider_to_size(slider_position)
@ -124,9 +128,7 @@ async def process_query(
context["error_message"] = f"Error: {exc}"
if "405" in str(exc):
context["error_message"] = (
"Repository not found. Please make sure it is public (private repositories will be supported soon)"
)
context["error_message"] = "Repository not found. Please make sure it is public."
return template_response(context=context)
if len(content) > MAX_DISPLAY_SIZE:

View file

@ -2,6 +2,7 @@
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse
from starlette.status import HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND
from gitingest.config import TMP_BASE_PATH
@ -32,14 +33,17 @@ async def download_ingest(digest_id: str) -> FileResponse:
directory = TMP_BASE_PATH / digest_id
if not directory.is_dir():
raise HTTPException(status_code=404, detail=f"Digest {digest_id!r} not found")
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Digest {digest_id!r} not found")
try:
first_txt_file = next(directory.glob("*.txt"))
except StopIteration as exc:
raise HTTPException(status_code=404, detail=f"No .txt file found for digest {digest_id!r}") from exc
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f"No .txt file found for digest {digest_id!r}",
) from exc
try:
return FileResponse(path=first_txt_file, media_type="text/plain", filename=first_txt_file.name)
except PermissionError as exc:
raise HTTPException(status_code=403, detail=f"Permission denied for {first_txt_file}") from exc
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail=f"Permission denied for {first_txt_file}") from exc

View file

@ -89,9 +89,9 @@ async def test_clone_nonexistent_repository(repo_exists_true: AsyncMock) -> None
@pytest.mark.parametrize(
("mock_stdout", "return_code", "expected"),
[
(b"HTTP/1.1 200 OK\n", 0, True), # Existing repo
(b"HTTP/1.1 404 Not Found\n", 0, False), # Non-existing repo
(b"HTTP/1.1 200 OK\n", 1, False), # Failed request
(b"200\n", 0, True), # Existing repo
(b"404\n", 0, False), # Non-existing repo
(b"200\n", 1, False), # Failed request
],
)
async def test_check_repo_exists(
@ -209,7 +209,7 @@ async def test_check_repo_exists_with_redirect(mocker: MockerFixture) -> None:
"""
mock_exec = mocker.patch("asyncio.create_subprocess_exec", new_callable=AsyncMock)
mock_process = AsyncMock()
mock_process.communicate.return_value = (b"HTTP/1.1 302 Found\n", b"")
mock_process.communicate.return_value = (b"302\n", b"")
mock_process.returncode = 0 # Simulate successful request
mock_exec.return_value = mock_process
@ -228,7 +228,7 @@ async def test_check_repo_exists_with_permanent_redirect(mocker: MockerFixture)
"""
mock_exec = mocker.patch("asyncio.create_subprocess_exec", new_callable=AsyncMock)
mock_process = AsyncMock()
mock_process.communicate.return_value = (b"HTTP/1.1 301 Found\n", b"")
mock_process.communicate.return_value = (b"301\n", b"")
mock_process.returncode = 0 # Simulate successful request
mock_exec.return_value = mock_process

View file

@ -107,17 +107,6 @@ def test_create_git_command(
assert cmd[len(expected_prefix) :] == expected_suffix
def test_create_git_command_invalid_token() -> None:
"""Test that supplying an invalid token for a GitHub URL raises ``InvalidGitHubTokenError``."""
with pytest.raises(InvalidGitHubTokenError):
create_git_command(
["git", "clone"],
"/some/path",
"https://github.com/owner/repo.git",
"invalid_token",
)
@pytest.mark.parametrize(
"token",
[
@ -149,19 +138,16 @@ def test_create_git_command_helper_calls(
token: str | None,
should_call: bool,
) -> None:
"""Test that ``validate_github_token`` and ``create_git_auth_header`` are invoked only when appropriate."""
"""Test that ``create_git_auth_header`` is invoked only when appropriate."""
work_dir = tmp_path / "repo"
validate_mock = mocker.patch("gitingest.utils.git_utils.validate_github_token")
header_mock = mocker.patch("gitingest.utils.git_utils.create_git_auth_header", return_value="HEADER")
cmd = create_git_command(["git", "clone"], str(work_dir), url, token)
if should_call:
validate_mock.assert_called_once_with(token)
header_mock.assert_called_once_with(token, url=url)
assert "HEADER" in cmd
else:
validate_mock.assert_not_called()
header_mock.assert_not_called()
assert "HEADER" not in cmd