mirror of
https://github.com/Fade78/Fileshed.git
synced 2026-04-28 11:30:20 +00:00
Add mono-instance concurrency protections (C3-C6)
- C3: FS/DB atomicity with rollback on shed_rename, orphan cleanup in shed_maintenance
- C4: SQLite WAL mode for better concurrent read/write performance
- C5: Git lock (fcntl.flock) for group operations to prevent index corruption
- C6: SQLite retry with exponential backoff (3 retries, 0.1/0.2/0.4s delays)

Also: Change xxd to od in README features (xxd not in container)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
0fc7cb3bc2
commit
de4fa161a7
3 changed files with 107 additions and 28 deletions
127
Fileshed.py
127
Fileshed.py
|
|
@ -3131,10 +3131,28 @@ shed_exec(zone="storage", cmd="some_cmd", args=["..."],
|
|||
self._git_run(["commit", "-m", message, "--allow-empty-message"], repo_path)
|
||||
|
||||
def _git_commit_as_user(self, repo_path: Path, message: str, user_id: str) -> None:
    """Performs a Git commit with user as author (used for group operations).

    Uses a lock to prevent concurrent Git operations on the same repository.
    This is important for group spaces where multiple users may commit simultaneously.

    The lock is an exclusive ``fcntl.flock`` taken on
    ``<repo_path>/.git/fileshed_git.lock``; the call blocks until it is granted.

    :param repo_path: Directory in which the Git commands are run
    :param message: Commit message (``--allow-empty-message`` is passed, so it may be empty)
    :param user_id: Used to build the author string ``"<user_id> <<user_id>@fileshed>"``
    """
    import fcntl

    git_lock_path = repo_path / ".git" / "fileshed_git.lock"

    # Ensure .git directory exists so the lock file can be created
    git_lock_path.parent.mkdir(parents=True, exist_ok=True)

    # The context manager owns the file handle: it is closed (which also drops
    # the flock) even if acquiring the lock, a Git command, or the explicit
    # unlock below raises.
    with open(git_lock_path, "w") as lock_fd:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)  # Blocking exclusive lock
        try:
            self._git_run(["add", "-A"], repo_path)
            author = f"{user_id} <{user_id}@fileshed>"
            self._git_run(["commit", "--author", author, "-m", message, "--allow-empty-message"], repo_path)
        finally:
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
|
||||
|
||||
# =========================================================================
|
||||
# GROUP HELPERS
|
||||
|
|
@ -3165,6 +3183,8 @@ shed_exec(zone="storage", cmd="some_cmd", args=["..."],
|
|||
conn.execute("CREATE INDEX IF NOT EXISTS idx_ownership_group ON file_ownership(group_id)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_ownership_owner ON file_ownership(owner_id)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_ownership_path ON file_ownership(group_id, file_path)")
|
||||
# Enable WAL mode for better concurrent read/write performance
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
|
@ -3173,22 +3193,44 @@ shed_exec(zone="storage", cmd="some_cmd", args=["..."],
|
|||
|
||||
def _db_execute(self, query: str, params: tuple = ()) -> tuple:
    """
    Execute a database query with automatic retry on transient errors.

    Returns (rows, rowcount) tuple:
    - rows: list of Row objects for SELECT, empty list for others
    - rowcount: number of affected rows for INSERT/UPDATE/DELETE

    Retries automatically on SQLite busy/locked errors to minimize
    round-trips with the LLM (each failed call is expensive).

    :param query: SQL statement, with ``?`` placeholders for values
    :param params: Values bound to the placeholders
    :raises sqlite3.OperationalError: On a non-transient error, or when the
        database is still busy/locked after the last retry
    """
    import time

    self._init_db()

    max_retries = 3
    base_delay = 0.1  # 100ms initial delay

    for attempt in range(max_retries + 1):
        conn = sqlite3.connect(str(self._get_db_path()), timeout=10.0, isolation_level="IMMEDIATE")
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(query, params)
            result = cursor.fetchall()
            rowcount = cursor.rowcount
            conn.commit()
            return result, rowcount
        except sqlite3.OperationalError as e:
            reason = str(e).lower()
            transient = "locked" in reason or "busy" in reason
            # Give up on anything that is not a busy/locked condition, and on
            # the final attempt even if it is.
            if attempt >= max_retries or not transient:
                raise
        finally:
            # Single close path for success, retry and failure. A close error
            # must not mask the query's own result or exception.
            try:
                conn.close()
            except sqlite3.Error:
                pass

        # Only reached after a transient failure; the connection is already
        # closed, so nothing is held open during the back-off.
        time.sleep(base_delay * (2 ** attempt))  # Exponential backoff: 0.1, 0.2, 0.4s
|
||||
|
||||
def _get_user_groups(self, user_id: str) -> list:
|
||||
"""Get groups the user belongs to via Open WebUI API."""
|
||||
|
|
@ -4754,7 +4796,9 @@ class Tools:
|
|||
:param match_all: Replace all pattern matches (default: first only)
|
||||
:param overwrite: True=replace entire file content, False=patch at position (default: False).
|
||||
Note: overwrite=False on existing file APPENDS/PATCHES, does NOT fail!
|
||||
:param safe: Lock file during edit
|
||||
:param safe: Lock file during edit (default: True). Set False for slightly better
|
||||
performance but risk of data loss if multiple conversations edit the
|
||||
same file simultaneously (not recommended)
|
||||
:param group: Group name/ID (required if zone="group")
|
||||
:param message: Git commit message (documents/group only, ignored for storage)
|
||||
:param mode: Ownership mode for new files in group: "owner", "group", "owner_ro"
|
||||
|
|
@ -4811,7 +4855,9 @@ class Tools:
|
|||
:param offset: Byte offset for "at"/"replace"
|
||||
:param length: Bytes to replace for "replace"
|
||||
:param overwrite: True=replace entire file, False=patch at position (default: False)
|
||||
:param safe: Lock file during edit
|
||||
:param safe: Lock file during edit (default: True). Set False for slightly better
|
||||
performance but risk of data loss if multiple conversations edit the
|
||||
same file simultaneously (not recommended)
|
||||
:param group: Group name/ID (required if zone="group")
|
||||
:param message: Git commit message (documents/group only)
|
||||
:param mode: Ownership mode for new files in group
|
||||
|
|
@ -4992,14 +5038,19 @@ class Tools:
|
|||
|
||||
# Create parent directories
|
||||
new_target.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Rename
|
||||
|
||||
# Rename with DB rollback protection
|
||||
old_target.rename(new_target)
|
||||
|
||||
# Update ownership records
|
||||
|
||||
# Update ownership records (rollback FS if DB fails)
|
||||
if ctx.group_id:
|
||||
self._core._update_file_ownership_paths(ctx.group_id, old_path, new_path)
|
||||
|
||||
try:
|
||||
self._core._update_file_ownership_paths(ctx.group_id, old_path, new_path)
|
||||
except Exception:
|
||||
# Rollback filesystem rename to maintain FS/DB consistency
|
||||
new_target.rename(old_target)
|
||||
raise
|
||||
|
||||
# Git commit
|
||||
if ctx.git_commit:
|
||||
self._core._git_run(["add", "-A"], ctx.zone_root)
|
||||
|
|
@ -8518,6 +8569,7 @@ shed_tree(zone="storage") # Directory tree
|
|||
"expired_locks": [],
|
||||
"corrupted_locks": [],
|
||||
"orphan_editzones": [],
|
||||
"orphan_ownerships": [],
|
||||
}
|
||||
|
||||
def clean_zone(zone_root: Path, zone_name: str):
|
||||
|
|
@ -8592,10 +8644,31 @@ shed_tree(zone="storage") # Directory tree
|
|||
group_path = groups_root / group.id
|
||||
if group_path.exists():
|
||||
clean_zone(group_path, f"Group:{group.id}")
|
||||
|
||||
total = (len(cleaned["expired_locks"]) +
|
||||
len(cleaned["corrupted_locks"]) +
|
||||
len(cleaned["orphan_editzones"]))
|
||||
|
||||
# Clean orphan ownerships (DB records for files that no longer exist)
|
||||
for group in user_groups:
|
||||
group_path = groups_root / group.id
|
||||
data_path = group_path / "data"
|
||||
if data_path.exists():
|
||||
try:
|
||||
all_ownership, _ = self._core._db_execute(
|
||||
"SELECT file_path FROM file_ownership WHERE group_id = ?",
|
||||
(group.id,)
|
||||
)
|
||||
for row in all_ownership:
|
||||
file_path = data_path / row["file_path"]
|
||||
if not file_path.exists():
|
||||
self._core._delete_file_ownership(group.id, row["file_path"])
|
||||
cleaned["orphan_ownerships"].append(
|
||||
f"Group:{group.id}/{row['file_path']}"
|
||||
)
|
||||
except Exception:
|
||||
pass # Skip group on error
|
||||
|
||||
total = (len(cleaned["expired_locks"]) +
|
||||
len(cleaned["corrupted_locks"]) +
|
||||
len(cleaned["orphan_editzones"]) +
|
||||
len(cleaned["orphan_ownerships"]))
|
||||
|
||||
return self._core._format_response(
|
||||
True,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue