feat(logging): implement loguru (#473)

This commit is contained in:
Mickael 2025-07-30 18:42:48 +02:00 committed by GitHub
parent 5fbb445cd8
commit d061b4877a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 606 additions and 156 deletions

View file

@ -100,8 +100,9 @@ jobs:
comment-tag: 'pr-preview'
create-if-not-exists: 'true'
message: |
⚙️ Preview environment for PR #${{ env.PR_ID }} is available at:
https://pr-${{ env.PR_ID }}.${{ env.APP_NAME }}.coderamp.dev/
🌐 [Preview environment](https://pr-${{ env.PR_ID }}.${{ env.APP_NAME }}.coderamp.dev/) for PR #${{ env.PR_ID }}
📊 [Log viewer](https://app.datadoghq.eu/logs?query=kube_namespace%3Aprs-gitingest%20version%3Apr-${{ env.PR_ID }})
remove-pr-env:
if: >-

View file

@ -118,6 +118,7 @@ repos:
click>=8.0.0,
'fastapi[standard]>=0.109.1',
httpx,
loguru>=0.7.0,
pathspec>=0.12.1,
prometheus-client,
pydantic,
@ -144,6 +145,7 @@ repos:
click>=8.0.0,
'fastapi[standard]>=0.109.1',
httpx,
loguru>=0.7.0,
pathspec>=0.12.1,
prometheus-client,
pydantic,

4
.vscode/launch.json vendored
View file

@ -4,8 +4,8 @@
"name": "Python Debugger: Module",
"type": "debugpy",
"request": "launch",
"module": "uvicorn",
"args": ["server.main:app", "--host", "0.0.0.0", "--port", "8000"],
"module": "server",
"args": [],
"cwd": "${workspaceFolder}/src"
}
]

View file

@ -65,8 +65,7 @@ If you ever get stuck, reach out on [Discord](https://discord.com/invite/zerRaGK
9. **Run the local server** to sanity-check:
```bash
cd src
uvicorn server.main:app
python -m server
```
Open [http://localhost:8000](http://localhost:8000) to confirm everything works.

View file

@ -44,4 +44,4 @@ USER appuser
EXPOSE 8000
EXPOSE 9090
CMD ["python", "-m", "uvicorn", "server.main:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["python", "-m", "server"]

View file

@ -1,27 +1,45 @@
# Common base configuration for all services
x-base-environment: &base-environment
# Python Configuration
PYTHONUNBUFFERED: "1"
PYTHONDONTWRITEBYTECODE: "1"
# Host Configuration
ALLOWED_HOSTS: ${ALLOWED_HOSTS:-gitingest.com,*.gitingest.com,localhost,127.0.0.1}
# Metrics Configuration
GITINGEST_METRICS_ENABLED: ${GITINGEST_METRICS_ENABLED:-true}
GITINGEST_METRICS_HOST: ${GITINGEST_METRICS_HOST:-0.0.0.0}
GITINGEST_METRICS_PORT: ${GITINGEST_METRICS_PORT:-9090}
# Sentry Configuration
GITINGEST_SENTRY_ENABLED: ${GITINGEST_SENTRY_ENABLED:-false}
GITINGEST_SENTRY_DSN: ${GITINGEST_SENTRY_DSN:-}
GITINGEST_SENTRY_TRACES_SAMPLE_RATE: ${GITINGEST_SENTRY_TRACES_SAMPLE_RATE:-1.0}
GITINGEST_SENTRY_PROFILE_SESSION_SAMPLE_RATE: ${GITINGEST_SENTRY_PROFILE_SESSION_SAMPLE_RATE:-1.0}
GITINGEST_SENTRY_PROFILE_LIFECYCLE: ${GITINGEST_SENTRY_PROFILE_LIFECYCLE:-trace}
GITINGEST_SENTRY_SEND_DEFAULT_PII: ${GITINGEST_SENTRY_SEND_DEFAULT_PII:-true}
x-prod-environment: &prod-environment
GITINGEST_SENTRY_ENVIRONMENT: ${GITINGEST_SENTRY_ENVIRONMENT:-production}
x-dev-environment: &dev-environment
DEBUG: "true"
LOG_LEVEL: "debug"
RELOAD: "true"
GITINGEST_SENTRY_ENVIRONMENT: ${GITINGEST_SENTRY_ENVIRONMENT:-development}
# S3 Configuration for development
S3_ENABLED: "true"
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY: ${S3_ACCESS_KEY:-gitingest}
S3_SECRET_KEY: ${S3_SECRET_KEY:-gitingest123}
S3_BUCKET_NAME: ${S3_BUCKET_NAME:-gitingest-bucket}
S3_REGION: ${S3_REGION:-us-east-1}
S3_DIRECTORY_PREFIX: ${S3_DIRECTORY_PREFIX:-dev}
S3_ALIAS_HOST: ${S3_ALIAS_HOST:-http://127.0.0.1:9000/${S3_BUCKET_NAME:-gitingest-bucket}}
x-app-base: &app-base
ports:
- "${APP_WEB_BIND:-8000}:8000" # Main application port
- "${GITINGEST_METRICS_HOST:-127.0.0.1}:${GITINGEST_METRICS_PORT:-9090}:9090" # Metrics port
environment:
# Python Configuration
- PYTHONUNBUFFERED=1
- PYTHONDONTWRITEBYTECODE=1
# Host Configuration
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-gitingest.com,*.gitingest.com,localhost,127.0.0.1}
# Metrics Configuration
- GITINGEST_METRICS_ENABLED=${GITINGEST_METRICS_ENABLED:-true}
- GITINGEST_METRICS_HOST=${GITINGEST_METRICS_HOST:-127.0.0.1}
- GITINGEST_METRICS_PORT=${GITINGEST_METRICS_PORT:-9090}
# Sentry Configuration
- GITINGEST_SENTRY_ENABLED=${GITINGEST_SENTRY_ENABLED:-false}
- GITINGEST_SENTRY_DSN=${GITINGEST_SENTRY_DSN:-}
- GITINGEST_SENTRY_TRACES_SAMPLE_RATE=${GITINGEST_SENTRY_TRACES_SAMPLE_RATE:-1.0}
- GITINGEST_SENTRY_PROFILE_SESSION_SAMPLE_RATE=${GITINGEST_SENTRY_PROFILE_SESSION_SAMPLE_RATE:-1.0}
- GITINGEST_SENTRY_PROFILE_LIFECYCLE=${GITINGEST_SENTRY_PROFILE_LIFECYCLE:-trace}
- GITINGEST_SENTRY_SEND_DEFAULT_PII=${GITINGEST_SENTRY_SEND_DEFAULT_PII:-true}
user: "1000:1000"
command: ["python", "-m", "uvicorn", "server.main:app", "--host", "0.0.0.0", "--port", "8000"]
command: ["python", "-m", "server"]
services:
# Production service configuration
@ -31,7 +49,7 @@ services:
profiles:
- prod
environment:
- GITINGEST_SENTRY_ENVIRONMENT=${GITINGEST_SENTRY_ENVIRONMENT:-production}
<<: [*base-environment, *prod-environment]
restart: unless-stopped
# Development service configuration
@ -43,24 +61,12 @@ services:
profiles:
- dev
environment:
- DEBUG=true
- GITINGEST_SENTRY_ENVIRONMENT=${GITINGEST_SENTRY_ENVIRONMENT:-development}
# S3 Configuration
- S3_ENABLED=true
- S3_ENDPOINT=http://minio:9000
- S3_ACCESS_KEY=${S3_ACCESS_KEY:-gitingest}
- S3_SECRET_KEY=${S3_SECRET_KEY:-gitingest123}
# Use lowercase bucket name to ensure compatibility with MinIO
- S3_BUCKET_NAME=${S3_BUCKET_NAME:-gitingest-bucket}
- S3_REGION=${S3_REGION:-us-east-1}
- S3_DIRECTORY_PREFIX=${S3_DIRECTORY_PREFIX:-dev}
# Public URL for S3 resources
- S3_ALIAS_HOST=${S3_ALIAS_HOST:-http://127.0.0.1:9000/${S3_BUCKET_NAME:-gitingest-bucket}}
<<: [*base-environment, *dev-environment]
volumes:
# Mount source code for live development
- ./src:/app:ro
# Use --reload flag for hot reloading during development
command: ["python", "-m", "uvicorn", "server.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
command: ["python", "-m", "server"]
depends_on:
minio-setup:
condition: service_completed_successfully
@ -73,9 +79,9 @@ services:
ports:
- "9000:9000" # API port
- "9001:9001" # Console port
environment:
- MINIO_ROOT_USER=${MINIO_ROOT_USER:-minioadmin}
- MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD:-minioadmin}
environment: &minio-environment
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
volumes:
- minio-data:/data
command: server /data --console-address ":9001"
@ -96,11 +102,10 @@ services:
minio:
condition: service_healthy
environment:
- MINIO_ROOT_USER=${MINIO_ROOT_USER:-minioadmin}
- MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD:-minioadmin}
- S3_ACCESS_KEY=${S3_ACCESS_KEY:-gitingest}
- S3_SECRET_KEY=${S3_SECRET_KEY:-gitingest123}
- S3_BUCKET_NAME=${S3_BUCKET_NAME:-gitingest-bucket}
<<: *minio-environment
S3_ACCESS_KEY: ${S3_ACCESS_KEY:-gitingest}
S3_SECRET_KEY: ${S3_SECRET_KEY:-gitingest123}
S3_BUCKET_NAME: ${S3_BUCKET_NAME:-gitingest-bucket}
volumes:
- ./.docker/minio/setup.sh:/setup.sh:ro
entrypoint: sh

View file

@ -7,6 +7,7 @@ requires-python = ">= 3.8"
dependencies = [
"click>=8.0.0",
"httpx",
"loguru>=0.7.0",
"pathspec>=0.12.1",
"pydantic",
"python-dotenv",
@ -96,7 +97,6 @@ ignore = [ # https://docs.astral.sh/ruff/rules/...
# TODO: fix the following issues:
"TD003", # missing-todo-link, TODO: add issue links
"T201", # print, TODO: replace with logging
"S108", # hardcoded-temp-file, TODO: replace with tempfile
"BLE001", # blind-except, TODO: replace with specific exceptions
"FAST003", # fast-api-unused-path-parameter, TODO: fix

View file

@ -2,6 +2,7 @@ boto3>=1.28.0 # AWS SDK for S3 support
click>=8.0.0
fastapi[standard]>=0.109.1 # Vulnerable to https://osv.dev/vulnerability/PYSEC-2024-38
httpx
loguru>=0.7.0
pathspec>=0.12.1
prometheus-client
pydantic

View file

@ -12,6 +12,12 @@ from typing_extensions import Unpack
from gitingest.config import MAX_FILE_SIZE, OUTPUT_FILE_NAME
from gitingest.entrypoint import ingest_async
# Import logging configuration first to intercept all logging
from gitingest.utils.logging_config import get_logger
# Initialize logger for this module
logger = get_logger(__name__)
class _CLIArgs(TypedDict):
source: str

View file

@ -16,12 +16,16 @@ from gitingest.utils.git_utils import (
resolve_commit,
run_command,
)
from gitingest.utils.logging_config import get_logger
from gitingest.utils.os_utils import ensure_directory_exists_or_create
from gitingest.utils.timeout_wrapper import async_timeout
if TYPE_CHECKING:
from gitingest.schemas import CloneConfig
# Initialize logger for this module
logger = get_logger(__name__)
@async_timeout(DEFAULT_TIMEOUT)
async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
@ -49,14 +53,35 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
local_path: str = config.local_path
partial_clone: bool = config.subpath != "/"
logger.info(
"Starting git clone operation",
extra={
"url": url,
"local_path": local_path,
"partial_clone": partial_clone,
"subpath": config.subpath,
"branch": config.branch,
"tag": config.tag,
"commit": config.commit,
"include_submodules": config.include_submodules,
},
)
logger.debug("Ensuring git is installed")
await ensure_git_installed()
logger.debug("Creating local directory", extra={"parent_path": str(Path(local_path).parent)})
await ensure_directory_exists_or_create(Path(local_path).parent)
logger.debug("Checking if repository exists", extra={"url": url})
if not await check_repo_exists(url, token=token):
logger.error("Repository not found", extra={"url": url})
msg = "Repository not found. Make sure it is public or that you have provided a valid token."
raise ValueError(msg)
logger.debug("Resolving commit reference")
commit = await resolve_commit(config, token=token)
logger.debug("Resolved commit", extra={"commit": commit})
clone_cmd = ["git"]
if token and is_github_host(url):
@ -69,20 +94,30 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
clone_cmd += [url, local_path]
# Clone the repository
logger.info("Executing git clone command", extra={"command": " ".join([*clone_cmd[:-1], "<url>", local_path])})
await run_command(*clone_cmd)
logger.info("Git clone completed successfully")
# Checkout the subpath if it is a partial clone
if partial_clone:
logger.info("Setting up partial clone for subpath", extra={"subpath": config.subpath})
await checkout_partial_clone(config, token=token)
logger.debug("Partial clone setup completed")
git = create_git_command(["git"], local_path, url, token)
# Ensure the commit is locally available
logger.debug("Fetching specific commit", extra={"commit": commit})
await run_command(*git, "fetch", "--depth=1", "origin", commit)
# Write the work-tree at that commit
logger.info("Checking out commit", extra={"commit": commit})
await run_command(*git, "checkout", commit)
# Update submodules
if config.include_submodules:
logger.info("Updating submodules")
await run_command(*git, "submodule", "update", "--init", "--recursive", "--depth=1")
logger.debug("Submodules updated successfully")
logger.info("Git clone operation completed successfully", extra={"local_path": local_path})

View file

@ -7,7 +7,6 @@ import errno
import shutil
import stat
import sys
import warnings
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator, Callable
@ -20,6 +19,7 @@ from gitingest.query_parser import parse_local_dir_path, parse_remote_repo
from gitingest.utils.auth import resolve_token
from gitingest.utils.compat_func import removesuffix
from gitingest.utils.ignore_patterns import load_ignore_patterns
from gitingest.utils.logging_config import get_logger
from gitingest.utils.pattern_utils import process_patterns
from gitingest.utils.query_parser_utils import KNOWN_GIT_HOSTS
@ -28,6 +28,9 @@ if TYPE_CHECKING:
from gitingest.schemas import IngestionQuery
# Initialize logger for this module
logger = get_logger(__name__)
async def ingest_async(
source: str,
@ -83,6 +86,8 @@ async def ingest_async(
- The content of the files in the repository or directory.
"""
logger.info("Starting ingestion process", extra={"source": source})
token = resolve_token(token)
source = removesuffix(source.strip(), ".git")
@ -90,12 +95,14 @@ async def ingest_async(
# Determine the parsing method based on the source type
if urlparse(source).scheme in ("https", "http") or any(h in source for h in KNOWN_GIT_HOSTS):
# We either have a full URL or a domain-less slug
logger.info("Parsing remote repository", extra={"source": source})
query = await parse_remote_repo(source, token=token)
query.include_submodules = include_submodules
_override_branch_and_tag(query, branch=branch, tag=tag)
else:
# Local path scenario
logger.info("Processing local directory", extra={"source": source})
query = parse_local_dir_path(source)
query.max_file_size = max_file_size
@ -109,11 +116,35 @@ async def ingest_async(
query.include_submodules = include_submodules
logger.debug(
"Configuration completed",
extra={
"max_file_size": query.max_file_size,
"include_submodules": query.include_submodules,
"include_gitignored": include_gitignored,
"has_include_patterns": bool(query.include_patterns),
"has_exclude_patterns": bool(query.ignore_patterns),
},
)
async with _clone_repo_if_remote(query, token=token):
if query.url:
logger.info("Repository cloned, starting file processing")
else:
logger.info("Starting local directory processing")
if not include_gitignored:
logger.debug("Applying gitignore patterns")
_apply_gitignores(query)
logger.info("Processing files and generating output")
summary, tree, content = ingest_query(query)
if output:
logger.debug("Writing output to file", extra={"output_path": output})
await _write_output(tree, content=content, target=output)
logger.info("Ingestion completed successfully")
return summary, tree, content
@ -209,19 +240,19 @@ def _override_branch_and_tag(query: IngestionQuery, branch: str | None, tag: str
"""
if tag and query.tag and tag != query.tag:
msg = f"Warning: The specified tag '{tag}' overrides the tag found in the URL '{query.tag}'."
warnings.warn(msg, RuntimeWarning, stacklevel=3)
logger.warning(msg)
query.tag = tag or query.tag
if branch and query.branch and branch != query.branch:
msg = f"Warning: The specified branch '{branch}' overrides the branch found in the URL '{query.branch}'."
warnings.warn(msg, RuntimeWarning, stacklevel=3)
logger.warning(msg)
query.branch = branch or query.branch
if tag and branch:
msg = "Warning: Both tag and branch are specified. The tag will be used."
warnings.warn(msg, RuntimeWarning, stacklevel=3)
logger.warning(msg)
# Tag wins over branch if both supplied
if query.tag:

View file

@ -9,10 +9,14 @@ from gitingest.config import MAX_DIRECTORY_DEPTH, MAX_FILES, MAX_TOTAL_SIZE_BYTE
from gitingest.output_formatter import format_node
from gitingest.schemas import FileSystemNode, FileSystemNodeType, FileSystemStats
from gitingest.utils.ingestion_utils import _should_exclude, _should_include
from gitingest.utils.logging_config import get_logger
if TYPE_CHECKING:
from gitingest.schemas import IngestionQuery
# Initialize logger for this module
logger = get_logger(__name__)
def ingest_query(query: IngestionQuery) -> tuple[str, str, str]:
"""Run the ingestion process for a parsed query.
@ -37,16 +41,30 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]:
If the path cannot be found, is not a file, or the file has no content.
"""
logger.info(
"Starting file ingestion",
extra={
"slug": query.slug,
"subpath": query.subpath,
"local_path": str(query.local_path),
"max_file_size": query.max_file_size,
},
)
subpath = Path(query.subpath.strip("/")).as_posix()
path = query.local_path / subpath
if not path.exists():
logger.error("Path not found", extra={"path": str(path), "slug": query.slug})
msg = f"{query.slug} cannot be found"
raise ValueError(msg)
if (query.type and query.type == "blob") or query.local_path.is_file():
# TODO: We do this wrong! We should still check the branch and commit!
logger.info("Processing single file", extra={"file_path": str(path)})
if not path.is_file():
logger.error("Expected file but found non-file", extra={"path": str(path)})
msg = f"Path {path} is not a file"
raise ValueError(msg)
@ -62,11 +80,21 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]:
)
if not file_node.content:
logger.error("File has no content", extra={"file_name": file_node.name})
msg = f"File {file_node.name} has no content"
raise ValueError(msg)
logger.info(
"Single file processing completed",
extra={
"file_name": file_node.name,
"file_size": file_node.size,
},
)
return format_node(file_node, query=query)
logger.info("Processing directory", extra={"directory_path": str(path)})
root_node = FileSystemNode(
name=path.name,
type=FileSystemNodeType.DIRECTORY,
@ -78,6 +106,17 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]:
_process_node(node=root_node, query=query, stats=stats)
logger.info(
"Directory processing completed",
extra={
"total_files": root_node.file_count,
"total_directories": root_node.dir_count,
"total_size_bytes": root_node.size,
"stats_total_files": stats.total_files,
"stats_total_size": stats.total_size,
},
)
return format_node(root_node, query=query)
@ -111,7 +150,14 @@ def _process_node(node: FileSystemNode, query: IngestionQuery, stats: FileSystem
_process_symlink(path=sub_path, parent_node=node, stats=stats, local_path=query.local_path)
elif sub_path.is_file():
if sub_path.stat().st_size > query.max_file_size:
print(f"Skipping file {sub_path}: would exceed max file size limit")
logger.debug(
"Skipping file: would exceed max file size limit",
extra={
"file_path": str(sub_path),
"file_size": sub_path.stat().st_size,
"max_file_size": query.max_file_size,
},
)
continue
_process_file(path=sub_path, parent_node=node, stats=stats, local_path=query.local_path)
elif sub_path.is_dir():
@ -133,7 +179,7 @@ def _process_node(node: FileSystemNode, query: IngestionQuery, stats: FileSystem
node.file_count += child_directory_node.file_count
node.dir_count += 1 + child_directory_node.dir_count
else:
print(f"Warning: {sub_path} is an unknown file type, skipping")
logger.warning("Unknown file type, skipping", extra={"file_path": str(sub_path)})
node.sort_children()
@ -186,12 +232,27 @@ def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStat
"""
if stats.total_files + 1 > MAX_FILES:
print(f"Maximum file limit ({MAX_FILES}) reached")
logger.warning(
"Maximum file limit reached",
extra={
"current_files": stats.total_files,
"max_files": MAX_FILES,
"file_path": str(path),
},
)
return
file_size = path.stat().st_size
if stats.total_size + file_size > MAX_TOTAL_SIZE_BYTES:
print(f"Skipping file {path}: would exceed total size limit")
logger.warning(
"Skipping file: would exceed total size limit",
extra={
"file_path": str(path),
"file_size": file_size,
"current_total_size": stats.total_size,
"max_total_size": MAX_TOTAL_SIZE_BYTES,
},
)
return
stats.total_files += 1
@ -232,15 +293,33 @@ def limit_exceeded(stats: FileSystemStats, depth: int) -> bool:
"""
if depth > MAX_DIRECTORY_DEPTH:
print(f"Maximum depth limit ({MAX_DIRECTORY_DEPTH}) reached")
logger.warning(
"Maximum directory depth limit reached",
extra={
"current_depth": depth,
"max_depth": MAX_DIRECTORY_DEPTH,
},
)
return True
if stats.total_files >= MAX_FILES:
print(f"Maximum file limit ({MAX_FILES}) reached")
logger.warning(
"Maximum file limit reached",
extra={
"current_files": stats.total_files,
"max_files": MAX_FILES,
},
)
return True # TODO: end recursion
if stats.total_size >= MAX_TOTAL_SIZE_BYTES:
print(f"Maxumum total size limit ({MAX_TOTAL_SIZE_BYTES / 1024 / 1024:.1f}MB) reached")
logger.warning(
"Maximum total size limit reached",
extra={
"current_size_mb": stats.total_size / 1024 / 1024,
"max_size_mb": MAX_TOTAL_SIZE_BYTES / 1024 / 1024,
},
)
return True # TODO: end recursion
return False

View file

@ -3,7 +3,6 @@
from __future__ import annotations
import ssl
import warnings
from typing import TYPE_CHECKING
import requests.exceptions
@ -11,10 +10,14 @@ import tiktoken
from gitingest.schemas import FileSystemNode, FileSystemNodeType
from gitingest.utils.compat_func import readlink
from gitingest.utils.logging_config import get_logger
if TYPE_CHECKING:
from gitingest.schemas import IngestionQuery
# Initialize logger for this module
logger = get_logger(__name__)
_TOKEN_THRESHOLDS: list[tuple[int, str]] = [
(1_000_000, "M"),
(1_000, "k"),
@ -193,11 +196,11 @@ def _format_token_count(text: str) -> str | None:
encoding = tiktoken.get_encoding("o200k_base") # gpt-4o, gpt-4o-mini
total_tokens = len(encoding.encode(text, disallowed_special=()))
except (ValueError, UnicodeEncodeError) as exc:
warnings.warn(f"Failed to estimate token size: {exc}", RuntimeWarning, stacklevel=3)
logger.warning("Failed to estimate token size", extra={"error": str(exc)})
return None
except (requests.exceptions.RequestException, ssl.SSLError) as exc:
# If network errors, skip token count estimation instead of erroring out
warnings.warn(f"Failed to download tiktoken model: {exc}", RuntimeWarning, stacklevel=3)
logger.warning("Failed to download tiktoken model", extra={"error": str(exc)})
return None
for threshold, suffix in _TOKEN_THRESHOLDS:

View file

@ -3,13 +3,13 @@
from __future__ import annotations
import uuid
import warnings
from pathlib import Path
from typing import Literal
from gitingest.config import TMP_BASE_PATH
from gitingest.schemas import IngestionQuery
from gitingest.utils.git_utils import fetch_remote_branches_or_tags, resolve_commit
from gitingest.utils.logging_config import get_logger
from gitingest.utils.query_parser_utils import (
PathKind,
_fallback_to_root,
@ -18,6 +18,9 @@ from gitingest.utils.query_parser_utils import (
_normalise_source,
)
# Initialize logger for this module
logger = get_logger(__name__)
async def parse_remote_repo(source: str, token: str | None = None) -> IngestionQuery:
"""Parse a repository URL and return an ``IngestionQuery`` object.
@ -169,7 +172,7 @@ async def _configure_branch_or_tag(
except RuntimeError as exc:
# If remote discovery fails, we optimistically treat the first path segment as the branch/tag.
msg = f"Warning: Failed to fetch {_ref_type}: {exc}"
warnings.warn(msg, RuntimeWarning, stacklevel=2)
logger.warning(msg)
return path_parts.pop(0) if path_parts else None
# Iterate over the path components and try to find a matching branch/tag

View file

@ -15,11 +15,14 @@ from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBID
from gitingest.utils.compat_func import removesuffix
from gitingest.utils.exceptions import InvalidGitHubTokenError
from server.server_utils import Colors
from gitingest.utils.logging_config import get_logger
if TYPE_CHECKING:
from gitingest.schemas import CloneConfig
# Initialize logger for this module
logger = get_logger(__name__)
# GitHub Personal-Access tokens (classic + fine-grained).
# - ghp_ / gho_ / ghu_ / ghs_ / ghr_ → 36 alphanumerics
# - github_pat_ → 22 alphanumerics + "_" + 59 alphanumerics
@ -97,13 +100,12 @@ async def ensure_git_installed() -> None:
try:
stdout, _ = await run_command("git", "config", "core.longpaths")
if stdout.decode().strip().lower() != "true":
print(
f"{Colors.BROWN}WARN{Colors.END}: {Colors.RED}Git clone may fail on Windows "
f"due to long file paths:{Colors.END}",
logger.warning(
"Git clone may fail on Windows due to long file paths. "
"Consider enabling long path support with: 'git config --global core.longpaths true'. "
"Note: This command may require administrator privileges.",
extra={"platform": "windows", "longpaths_enabled": False},
)
print(f"{Colors.RED}To avoid this issue, consider enabling long path support with:{Colors.END}")
print(f"{Colors.RED} git config --global core.longpaths true{Colors.END}")
print(f"{Colors.RED}Note: This command may require administrator privileges.{Colors.END}")
except RuntimeError:
# Ignore if checking 'core.longpaths' fails.
pass

View file

@ -0,0 +1,200 @@
"""Logging configuration for gitingest using loguru.
This module provides structured JSON logging suitable for Kubernetes deployments
while also supporting human-readable logging for development.
"""
from __future__ import annotations
import json
import logging
import os
import sys
from typing import Any
from loguru import logger
def json_sink(message: Any) -> None: # noqa: ANN401
"""Create JSON formatted log output.
Parameters
----------
message : Any
The loguru message record
"""
record = message.record
log_entry = {
"timestamp": record["time"].isoformat(),
"level": record["level"].name.upper(),
"logger": record["name"],
"module": record["module"],
"function": record["function"],
"line": record["line"],
"message": record["message"],
}
# Add exception info if present
if record["exception"]:
log_entry["exception"] = {
"type": record["exception"].type.__name__,
"value": str(record["exception"].value),
"traceback": record["exception"].traceback,
}
# Add extra fields if present
if record["extra"]:
log_entry.update(record["extra"])
sys.stdout.write(json.dumps(log_entry, ensure_ascii=False, separators=(",", ":")) + "\n")
def format_extra_fields(record: dict) -> str:
"""Format extra fields as JSON string.
Parameters
----------
record : dict
The loguru record dictionary
Returns
-------
str
JSON formatted extra fields or empty string
"""
if not record.get("extra"):
return ""
# Filter out loguru's internal extra fields
filtered_extra = {k: v for k, v in record["extra"].items() if not k.startswith("_") and k not in ["name"]}
# Handle nested extra structure - if there's an 'extra' key, use its contents
if "extra" in filtered_extra and isinstance(filtered_extra["extra"], dict):
filtered_extra = filtered_extra["extra"]
if filtered_extra:
extra_json = json.dumps(filtered_extra, ensure_ascii=False, separators=(",", ":"))
return f" | {extra_json}"
return ""
def extra_filter(record: dict) -> dict:
"""Filter function to add extra fields to the message.
Parameters
----------
record : dict
The loguru record dictionary
Returns
-------
dict
Modified record with extra fields appended to message
"""
extra_str = format_extra_fields(record)
if extra_str:
record["message"] = record["message"] + extra_str
return record
class InterceptHandler(logging.Handler):
"""Intercept standard library logging and redirect to loguru."""
def emit(self, record: logging.LogRecord) -> None:
"""Emit a record to loguru."""
# Get corresponding loguru level
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# Find caller from where originated the logged message
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(
level,
record.getMessage(),
)
def configure_logging() -> None:
"""Configure loguru for the application.
Sets up JSON logging for production/Kubernetes environments
or human-readable logging for development.
Intercepts all standard library logging including uvicorn.
"""
# Remove default handler
logger.remove()
# Check if we're in Kubernetes or production environment
is_k8s = os.getenv("KUBERNETES_SERVICE_HOST") is not None
log_format = os.getenv("LOG_FORMAT", "json" if is_k8s else "human")
log_level = os.getenv("LOG_LEVEL", "INFO")
if log_format.lower() == "json":
# JSON format for structured logging (Kubernetes/production)
logger.add(
json_sink,
level=log_level,
enqueue=True, # Async logging for better performance
diagnose=False, # Don't include variable values in exceptions (security)
backtrace=True, # Include full traceback
serialize=True, # Ensure proper serialization
)
else:
# Human-readable format for development
logger_format = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"{message}"
)
logger.add(
sys.stderr,
format=logger_format,
filter=extra_filter,
level=log_level,
enqueue=True,
diagnose=True, # Include variable values in development
backtrace=True,
)
# Intercept all standard library logging
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
# Intercept specific loggers that might bypass basicConfig
for name in logging.root.manager.loggerDict: # pylint: disable=no-member
logging.getLogger(name).handlers = []
logging.getLogger(name).propagate = True
def get_logger(name: str | None = None) -> logger.__class__:
"""Get a configured logger instance.
Parameters
----------
name : str | None, optional
Logger name, defaults to the calling module name
Returns
-------
logger.__class__
Configured logger instance
"""
if name:
return logger.bind(name=name)
return logger
# Initialize logging when module is imported
configure_logging()

View file

@ -3,15 +3,18 @@
from __future__ import annotations
import json
import warnings
from itertools import chain
from typing import TYPE_CHECKING, Any
from gitingest.utils.exceptions import InvalidNotebookError
from gitingest.utils.logging_config import get_logger
if TYPE_CHECKING:
from pathlib import Path
# Initialize logger for this module
logger = get_logger(__name__)
def process_notebook(file: Path, *, include_output: bool = True) -> str:
"""Process a Jupyter notebook file and return an executable Python script as a string.
@ -44,20 +47,16 @@ def process_notebook(file: Path, *, include_output: bool = True) -> str:
# Check if the notebook contains worksheets
worksheets = notebook.get("worksheets")
if worksheets:
warnings.warn(
logger.warning(
"Worksheets are deprecated as of IPEP-17. Consider updating the notebook. "
"(See: https://github.com/jupyter/nbformat and "
"https://github.com/ipython/ipython/wiki/IPEP-17:-Notebook-Format-4#remove-multiple-worksheets "
"for more information.)",
DeprecationWarning,
stacklevel=2,
)
if len(worksheets) > 1:
warnings.warn(
logger.warning(
"Multiple worksheets detected. Combining all worksheets into a single script.",
UserWarning,
stacklevel=2,
)
cells = list(chain.from_iterable(ws["cells"] for ws in worksheets))

View file

@ -3,16 +3,18 @@
from __future__ import annotations
import string
import warnings
from typing import TYPE_CHECKING, cast
from urllib.parse import ParseResult, unquote, urlparse
from gitingest.utils.compat_typing import StrEnum
from gitingest.utils.git_utils import _resolve_ref_to_sha, check_repo_exists
from gitingest.utils.logging_config import get_logger
if TYPE_CHECKING:
from gitingest.schemas import IngestionQuery
# Initialize logger for this module
logger = get_logger(__name__)
HEX_DIGITS: set[str] = set(string.hexdigits)
@ -56,7 +58,7 @@ async def _fallback_to_root(query: IngestionQuery, token: str | None, warn_msg:
url = cast("str", query.url)
query.commit = await _resolve_ref_to_sha(url, pattern="HEAD", token=token)
if warn_msg:
warnings.warn(warn_msg, RuntimeWarning, stacklevel=3)
logger.warning(warn_msg)
return query

32
src/server/__main__.py Normal file
View file

@ -0,0 +1,32 @@
"""Server module entry point for running with python -m server."""
import os
import uvicorn
# Import logging configuration first to intercept all logging
from gitingest.utils.logging_config import get_logger
logger = get_logger(__name__)
if __name__ == "__main__":
# Get configuration from environment variables
host = os.getenv("HOST", "0.0.0.0") # noqa: S104
port = int(os.getenv("PORT", "8000"))
reload = os.getenv("RELOAD", "false").lower() == "true"
logger.info(
"Starting Gitingest server",
extra={
"host": host,
"port": port,
},
)
uvicorn.run(
"server.main:app",
host=host,
port=port,
reload=reload,
log_config=None, # Disable uvicorn's default logging config
)

View file

@ -14,6 +14,8 @@ from fastapi.staticfiles import StaticFiles
from slowapi.errors import RateLimitExceeded
from starlette.middleware.trustedhost import TrustedHostMiddleware
# Import logging configuration first to intercept all logging
from gitingest.utils.logging_config import get_logger
from server.metrics_server import start_metrics_server
from server.routers import dynamic, index, ingest
from server.server_config import templates
@ -22,6 +24,9 @@ from server.server_utils import lifespan, limiter, rate_limit_exception_handler
# Load environment variables from .env file
load_dotenv()
# Initialize logger for this module
logger = get_logger(__name__)
# Initialize Sentry SDK if enabled
if os.getenv("GITINGEST_SENTRY_ENABLED") is not None:
sentry_dsn = os.getenv("GITINGEST_SENTRY_DSN")

View file

@ -1,14 +1,14 @@
"""Prometheus metrics server running on a separate port."""
import logging
import uvicorn
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from prometheus_client import REGISTRY, generate_latest
from gitingest.utils.logging_config import get_logger
# Create a logger for this module
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
# Create a separate FastAPI app for metrics
metrics_app = FastAPI(
@ -53,5 +53,16 @@ def start_metrics_server(host: str = "127.0.0.1", port: int = 9090) -> None:
None
"""
logger.info("Starting metrics server on %s:%s", host, port)
uvicorn.run(metrics_app, host=host, port=port)
logger.info("Starting metrics server", extra={"host": host, "port": port})
# Configure uvicorn to suppress startup messages to avoid duplicates
# since the main server already shows similar messages
uvicorn.run(
metrics_app,
host=host,
port=port,
log_config=None, # Disable uvicorn's default logging config
access_log=False, # Disable access logging for metrics server
# Suppress uvicorn's startup messages by setting log level higher
log_level="warning",
)

View file

@ -2,7 +2,6 @@
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING, cast
@ -10,6 +9,7 @@ from gitingest.clone import clone_repo
from gitingest.ingestion import ingest_query
from gitingest.query_parser import parse_remote_repo
from gitingest.utils.git_utils import resolve_commit, validate_github_token
from gitingest.utils.logging_config import get_logger
from gitingest.utils.pattern_utils import process_patterns
from server.models import IngestErrorResponse, IngestResponse, IngestSuccessResponse, PatternType, S3Metadata
from server.s3_utils import (
@ -22,14 +22,14 @@ from server.s3_utils import (
upload_to_s3,
)
from server.server_config import MAX_DISPLAY_SIZE
from server.server_utils import Colors
# Initialize logger for this module
logger = get_logger(__name__)
if TYPE_CHECKING:
from gitingest.schemas.cloning import CloneConfig
from gitingest.schemas.ingestion import IngestionQuery
logger = logging.getLogger(__name__)
async def _check_s3_cache(
query: IngestionQuery,
@ -68,7 +68,10 @@ async def _check_s3_cache(
try:
# Use git ls-remote to get commit SHA without cloning
clone_config = query.extract_clone_config()
logger.info("Resolving commit for S3 cache check", extra={"repo_url": query.url})
query.commit = await resolve_commit(clone_config, token=token)
logger.info("Commit resolved successfully", extra={"repo_url": query.url, "commit": query.commit})
# Generate S3 file path using the resolved commit
s3_file_path = generate_s3_file_path(
source=query.url,
@ -114,8 +117,9 @@ async def _check_s3_cache(
)
except Exception as exc:
# Log the exception but don't fail the entire request
logger.warning("S3 cache check failed, falling back to normal cloning: %s", exc)
logger.warning("S3 cache check failed, falling back to normal cloning", extra={"error": str(exc)})
logger.info("Digest not found in S3 cache, proceeding with normal cloning", extra={"repo_url": query.url})
return None
@ -165,10 +169,10 @@ def _store_digest_content(
)
try:
upload_metadata_to_s3(metadata=metadata, s3_file_path=s3_file_path, ingest_id=query.id)
logger.debug("Successfully uploaded metadata to S3")
logger.info("Successfully uploaded metadata to S3")
except Exception as metadata_exc:
# Log the error but don't fail the entire request
logger.warning("Failed to upload metadata to S3: %s", metadata_exc)
logger.warning("Failed to upload metadata to S3", extra={"error": str(metadata_exc)})
# Store S3 URL in query for later use
query.s3_url = s3_url
@ -250,8 +254,7 @@ async def process_query(
try:
query = await parse_remote_repo(input_text, token=token)
except Exception as exc:
print(f"{Colors.BROWN}WARN{Colors.END}: {Colors.RED}<- {Colors.END}", end="")
print(f"{Colors.RED}{exc}{Colors.END}")
logger.warning("Failed to parse remote repository", extra={"input_text": input_text, "error": str(exc)})
return IngestErrorResponse(error=str(exc))
query.url = cast("str", query.url)
@ -336,16 +339,16 @@ def _print_query(url: str, max_file_size: int, pattern_type: str, pattern: str)
"""
default_max_file_kb = 50
print(f"{Colors.WHITE}{url:<20}{Colors.END}", end="")
if int(max_file_size / 1024) != default_max_file_kb:
print(
f" | {Colors.YELLOW}Size: {int(max_file_size / 1024)}kB{Colors.END}",
end="",
logger.info(
"Processing query",
extra={
"url": url,
"max_file_size_kb": int(max_file_size / 1024),
"pattern_type": pattern_type,
"pattern": pattern,
"custom_size": int(max_file_size / 1024) != default_max_file_kb,
},
)
if pattern_type == "include" and pattern != "":
print(f" | {Colors.YELLOW}Include {pattern}{Colors.END}", end="")
elif pattern_type == "exclude" and pattern != "":
print(f" | {Colors.YELLOW}Exclude {pattern}{Colors.END}", end="")
def _print_error(url: str, exc: Exception, max_file_size: int, pattern_type: str, pattern: str) -> None:
@ -365,9 +368,16 @@ def _print_error(url: str, exc: Exception, max_file_size: int, pattern_type: str
The actual pattern string to include or exclude in the query.
"""
print(f"{Colors.BROWN}WARN{Colors.END}: {Colors.RED}<- {Colors.END}", end="")
_print_query(url, max_file_size, pattern_type, pattern)
print(f" | {Colors.RED}{exc}{Colors.END}")
logger.error(
"Query processing failed",
extra={
"url": url,
"max_file_size_kb": int(max_file_size / 1024),
"pattern_type": pattern_type,
"pattern": pattern,
"error": str(exc),
},
)
def _print_success(url: str, max_file_size: int, pattern_type: str, pattern: str, summary: str) -> None:
@ -388,6 +398,13 @@ def _print_success(url: str, max_file_size: int, pattern_type: str, pattern: str
"""
estimated_tokens = summary[summary.index("Estimated tokens:") + len("Estimated ") :]
print(f"{Colors.GREEN}INFO{Colors.END}: {Colors.GREEN}<- {Colors.END}", end="")
_print_query(url, max_file_size, pattern_type, pattern)
print(f" | {Colors.PURPLE}{estimated_tokens}{Colors.END}")
logger.info(
"Query processing completed successfully",
extra={
"url": url,
"max_file_size_kb": int(max_file_size / 1024),
"pattern_type": pattern_type,
"pattern": pattern,
"estimated_tokens": estimated_tokens,
},
)

View file

@ -3,7 +3,6 @@
from __future__ import annotations
import hashlib
import logging
import os
from typing import TYPE_CHECKING
from urllib.parse import urlparse
@ -13,6 +12,7 @@ import boto3
from botocore.exceptions import ClientError
from prometheus_client import Counter
from gitingest.utils.logging_config import get_logger
from server.models import S3Metadata
if TYPE_CHECKING:
@ -20,7 +20,7 @@ if TYPE_CHECKING:
# Initialize logger for this module
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
_cache_lookup_counter = Counter("gitingest_cache_lookup", "Number of cache lookups", ["url"])
_cache_hit_counter = Counter("gitingest_cache_hit", "Number of cache hits", ["url"])
@ -133,7 +133,7 @@ def create_s3_client() -> BaseClient:
log_config = config.copy()
has_credentials = bool(log_config.pop("aws_access_key_id", None) or log_config.pop("aws_secret_access_key", None))
logger.debug(
msg="Creating S3 client",
"Creating S3 client",
extra={
"s3_config": log_config,
"has_credentials": has_credentials,
@ -186,7 +186,7 @@ def upload_to_s3(content: str, s3_file_path: str, ingest_id: UUID) -> str:
}
# Log upload attempt
logger.debug("Starting S3 upload", extra=extra_fields)
logger.info("Starting S3 upload", extra=extra_fields)
try:
# Upload the content with ingest_id as tag
@ -226,7 +226,7 @@ def upload_to_s3(content: str, s3_file_path: str, ingest_id: UUID) -> str:
public_url = f"https://{bucket_name}.s3.{get_s3_config()['region_name']}.amazonaws.com/{s3_file_path}"
# Log successful upload
logger.debug(
logger.info(
"S3 upload completed successfully",
extra={
"bucket_name": bucket_name,
@ -283,7 +283,7 @@ def upload_metadata_to_s3(metadata: S3Metadata, s3_file_path: str, ingest_id: UU
}
# Log upload attempt
logger.debug("Starting S3 metadata upload", extra=extra_fields)
logger.info("Starting S3 metadata upload", extra=extra_fields)
try:
# Upload the metadata with ingest_id as tag
@ -325,7 +325,7 @@ def upload_metadata_to_s3(metadata: S3Metadata, s3_file_path: str, ingest_id: UU
)
# Log successful upload
logger.debug(
logger.info(
"S3 metadata upload completed successfully",
extra={
"bucket_name": bucket_name,
@ -371,14 +371,14 @@ def get_metadata_from_s3(s3_file_path: str) -> S3Metadata | None:
# Object doesn't exist if we get a 404 error
error_code = err.response.get("Error", {}).get("Code")
if error_code == "404":
logger.debug("Metadata file not found: %s", metadata_file_path)
logger.info("Metadata file not found", extra={"metadata_file_path": metadata_file_path})
return None
# Log other errors but don't fail
logger.warning("Failed to retrieve metadata from S3: %s", err)
logger.warning("Failed to retrieve metadata from S3", extra={"error": str(err)})
return None
except Exception as exc:
# For any other exception, log and return None
logger.warning("Unexpected error retrieving metadata from S3: %s", exc)
logger.warning("Unexpected error retrieving metadata from S3", extra={"error": str(exc)})
return None
@ -428,7 +428,10 @@ def check_s3_object_exists(s3_file_path: str) -> bool:
"""
if not is_s3_enabled():
logger.info("S3 not enabled, skipping object existence check", extra={"s3_file_path": s3_file_path})
return False
logger.info("Checking S3 object existence", extra={"s3_file_path": s3_file_path})
_cache_lookup_counter.labels(url=s3_file_path).inc()
try:
s3_client = create_s3_client()
@ -440,15 +443,38 @@ def check_s3_object_exists(s3_file_path: str) -> bool:
# Object doesn't exist if we get a 404 error
error_code = err.response.get("Error", {}).get("Code")
if error_code == "404":
logger.info(
"S3 object not found",
extra={
"s3_file_path": s3_file_path,
"bucket_name": get_s3_bucket_name(),
"error_code": error_code,
},
)
_cache_miss_counter.labels(url=s3_file_path).inc()
return False
# Re-raise other errors (permissions, etc.)
raise
except Exception:
except Exception as exc:
# For any other exception, assume object doesn't exist
logger.info(
"S3 object check failed with exception, assuming not found",
extra={
"s3_file_path": s3_file_path,
"bucket_name": get_s3_bucket_name(),
"exception": str(exc),
},
)
_cache_miss_counter.labels(url=s3_file_path).inc()
return False
else:
logger.info(
"S3 object found",
extra={
"s3_file_path": s3_file_path,
"bucket_name": get_s3_bucket_name(),
},
)
_cache_hit_counter.labels(url=s3_file_path).inc()
return True
@ -471,10 +497,10 @@ def get_s3_url_for_ingest_id(ingest_id: UUID) -> str | None:
"""
if not is_s3_enabled():
logger.debug("S3 not enabled, skipping URL lookup for ingest_id: %s", ingest_id)
logger.debug("S3 not enabled, skipping URL lookup", extra={"ingest_id": str(ingest_id)})
return None
logger.debug(msg="Starting S3 URL lookup for ingest ID", extra={"ingest_id": str(ingest_id)})
logger.info("Starting S3 URL lookup for ingest ID", extra={"ingest_id": str(ingest_id)})
try:
s3_client = create_s3_client()
@ -499,8 +525,8 @@ def get_s3_url_for_ingest_id(ingest_id: UUID) -> str | None:
target_ingest_id=ingest_id,
):
s3_url = _build_s3_url(key)
logger.debug(
msg="Found S3 object for ingest ID",
logger.info(
"Found S3 object for ingest ID",
extra={
"ingest_id": str(ingest_id),
"s3_key": key,
@ -510,8 +536,8 @@ def get_s3_url_for_ingest_id(ingest_id: UUID) -> str | None:
)
return s3_url
logger.debug(
msg="No S3 object found for ingest ID",
logger.info(
"No S3 object found for ingest ID",
extra={
"ingest_id": str(ingest_id),
"objects_checked": objects_checked,
@ -520,7 +546,7 @@ def get_s3_url_for_ingest_id(ingest_id: UUID) -> str | None:
except ClientError as err:
logger.exception(
msg="Error during S3 URL lookup",
"Error during S3 URL lookup",
extra={
"ingest_id": str(ingest_id),
"error_code": err.response.get("Error", {}).get("Code"),

View file

@ -14,8 +14,12 @@ from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from gitingest.config import TMP_BASE_PATH
from gitingest.utils.logging_config import get_logger
from server.server_config import DELETE_REPO_AFTER
# Initialize logger for this module
logger = get_logger(__name__)
# Initialize a rate limiter
limiter = Limiter(key_func=get_remote_address)
@ -103,8 +107,8 @@ async def _remove_old_repositories(
await _process_folder(folder)
except (OSError, PermissionError) as exc:
print(f"Error in _remove_old_repositories: {exc}")
except (OSError, PermissionError):
logger.exception("Error in repository cleanup", extra={"base_path": str(base_path)})
await asyncio.sleep(scan_interval)
@ -133,16 +137,16 @@ async def _process_folder(folder: Path) -> None:
owner, repo = filename.split("-", 1)
repo_url = f"{owner}/{repo}"
await loop.run_in_executor(None, _append_line, history_file, repo_url)
except (OSError, PermissionError) as exc:
print(f"Error logging repository URL for {folder}: {exc}")
except (OSError, PermissionError):
logger.exception("Error logging repository URL", extra={"folder": str(folder)})
# Delete the cloned repo
try:
await loop.run_in_executor(None, shutil.rmtree, folder)
except PermissionError as exc:
print(f"No permission to delete {folder}: {exc}")
except OSError as exc:
print(f"Could not delete {folder}: {exc}")
except PermissionError:
logger.exception("No permission to delete folder", extra={"folder": str(folder)})
except OSError:
logger.exception("Could not delete folder", extra={"folder": str(folder)})
def _append_line(path: Path, line: str) -> None:

View file

@ -1,6 +1,7 @@
"""Integration tests covering core functionalities, edge cases, and concurrency handling."""
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Generator
@ -41,7 +42,7 @@ def cleanup_tmp_dir() -> Generator[None, None, None]:
try:
shutil.rmtree(temp_dir)
except PermissionError as exc:
print(f"Error cleaning up {temp_dir}: {exc}")
sys.stderr.write(f"Error cleaning up {temp_dir}: {exc}\n")
@pytest.mark.asyncio

View file

@ -69,7 +69,6 @@ def test_process_notebook_with_worksheets(write_notebook: WriteNotebookFunc) ->
nb_with = write_notebook("with_worksheets.ipynb", with_worksheets)
nb_without = write_notebook("without_worksheets.ipynb", without_worksheets)
with pytest.warns(DeprecationWarning, match="Worksheets are deprecated as of IPEP-17."):
result_with = process_notebook(nb_with)
# Should not raise a warning
@ -104,21 +103,8 @@ def test_process_notebook_multiple_worksheets(write_notebook: WriteNotebookFunc)
nb_multi = write_notebook("multiple_worksheets.ipynb", multi_worksheets)
nb_single = write_notebook("single_worksheet.ipynb", single_worksheet)
# Expect DeprecationWarning + UserWarning
with pytest.warns(
DeprecationWarning,
match="Worksheets are deprecated as of IPEP-17. Consider updating the notebook.",
), pytest.warns(
UserWarning,
match="Multiple worksheets detected. Combining all worksheets into a single script.",
):
result_multi = process_notebook(nb_multi)
# Expect DeprecationWarning only
with pytest.warns(
DeprecationWarning,
match="Worksheets are deprecated as of IPEP-17. Consider updating the notebook.",
):
result_single = process_notebook(nb_single)
assert result_multi != result_single, "Two worksheets should produce more content than one."