# Source file: eigent/backend/app/utils/cookie_manager.py
# (repository viewer listing: 289 lines, 9.9 KiB, Python)

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import logging
import os
import shutil
import sqlite3
from datetime import datetime
from typing import Any
logger = logging.getLogger("cookie_manager")
class CookieManager:
"""Manager for reading and managing browser cookies
from Electron/Chrome SQLite database"""
def __init__(self, user_data_dir: str):
self.user_data_dir = user_data_dir
# Check for cookies in partition directory first (for persist:user_login)
partition_cookies_path = os.path.join(
user_data_dir, "Partitions", "user_login", "Cookies"
)
if os.path.exists(partition_cookies_path):
self.cookies_db_path = partition_cookies_path
logger.info(
f"Using partition cookies at: {partition_cookies_path}"
)
else:
# Fallback to default location
self.cookies_db_path = os.path.join(user_data_dir, "Cookies")
if not os.path.exists(self.cookies_db_path):
alt_path = os.path.join(user_data_dir, "Network", "Cookies")
if os.path.exists(alt_path):
self.cookies_db_path = alt_path
else:
logger.warning(
f"Cookies database not found at {self.cookies_db_path} or {partition_cookies_path}"
)
def _get_cookies_connection(self) -> sqlite3.Connection | None:
"""Get database connection using a temporary copy to avoid locks"""
if not os.path.exists(self.cookies_db_path):
logger.warning(
f"Cookies database not found: {self.cookies_db_path}"
)
return None
temp_db_path = self.cookies_db_path + ".tmp"
conn = None
try:
shutil.copy2(self.cookies_db_path, temp_db_path)
conn = sqlite3.connect(temp_db_path)
conn.row_factory = sqlite3.Row
return conn
except Exception as e:
logger.error(f"Error connecting to cookies database: {e}")
if conn is not None:
try:
conn.close()
except Exception:
pass
try:
if os.path.exists(temp_db_path):
os.remove(temp_db_path)
except OSError:
pass
return None
def _cleanup_temp_db(self):
"""Clean up temporary database file"""
temp_db_path = self.cookies_db_path + ".tmp"
try:
if os.path.exists(temp_db_path):
os.remove(temp_db_path)
except Exception as e:
logger.debug(f"Error cleaning up temp database: {e}")
def get_cookie_domains(self) -> list[dict[str, Any]]:
"""Get list of all domains with cookies"""
conn = self._get_cookies_connection()
if not conn:
return []
try:
cursor = conn.cursor()
query = """
SELECT
host_key as domain,
COUNT(*) as cookie_count,
MAX(last_access_utc) as last_access
FROM cookies
GROUP BY host_key
ORDER BY last_access DESC
"""
cursor.execute(query)
rows = cursor.fetchall()
domains = []
for row in rows:
try:
chrome_timestamp = row["last_access"]
if chrome_timestamp:
seconds_since_epoch = (
chrome_timestamp / 1000000.0
) - 11644473600
last_access = datetime.fromtimestamp(
seconds_since_epoch
).strftime("%Y-%m-%d %H:%M:%S")
else:
last_access = "Never"
except Exception as e:
logger.debug(f"Error converting timestamp: {e}")
last_access = "Unknown"
domains.append(
{
"domain": row["domain"],
"cookie_count": row["cookie_count"],
"last_access": last_access,
}
)
logger.info(f"Found {len(domains)} domains with cookies")
return domains
except Exception as e:
logger.error(f"Error reading cookies: {e}")
return []
finally:
conn.close()
self._cleanup_temp_db()
def get_cookies_for_domain(self, domain: str) -> list[dict[str, str]]:
"""Get all cookies for a specific domain"""
conn = self._get_cookies_connection()
if not conn:
return []
try:
cursor = conn.cursor()
query = """
SELECT
host_key,
name,
value,
path,
expires_utc,
is_secure,
is_httponly
FROM cookies
WHERE host_key = ? OR host_key LIKE ?
ORDER BY name
"""
cursor.execute(query, (domain, f"%.{domain}"))
rows = cursor.fetchall()
cookies = []
for row in rows:
raw_value = row["value"]
if raw_value is None:
value_str = ""
elif len(raw_value) > 50:
value_str = raw_value[:50] + "..."
else:
value_str = raw_value
cookies.append(
{
"domain": row["host_key"],
"name": row["name"],
"value": value_str,
"path": row["path"],
"secure": bool(row["is_secure"]),
"httponly": bool(row["is_httponly"]),
}
)
return cookies
except Exception as e:
logger.error(f"Error reading cookies for domain {domain}: {e}")
return []
finally:
conn.close()
self._cleanup_temp_db()
def delete_cookies_for_domain(self, domain: str) -> bool:
"""Delete all cookies for a specific domain"""
if not os.path.exists(self.cookies_db_path):
logger.warning(
f"Cookies database not found: {self.cookies_db_path}"
)
return False
try:
conn = sqlite3.connect(self.cookies_db_path)
cursor = conn.cursor()
delete_query = """
DELETE FROM cookies
WHERE host_key = ? OR host_key LIKE ?
"""
cursor.execute(delete_query, (domain, f"%.{domain}"))
deleted_count = cursor.rowcount
conn.commit()
# IMPORTANT: Execute VACUUM to remove deleted data and compact database
# This prevents recovery from WAL files
cursor.execute("VACUUM")
conn.commit()
conn.close()
# Also remove WAL and SHM files to ensure clean state
self._cleanup_wal_files()
logger.info(f"Deleted {deleted_count} cookies for domain {domain}")
return True
except Exception as e:
logger.error(f"Error deleting cookies for domain {domain}: {e}")
return False
def _cleanup_wal_files(self):
"""Remove SQLite WAL and SHM files"""
try:
wal_path = self.cookies_db_path + "-wal"
shm_path = self.cookies_db_path + "-shm"
journal_path = self.cookies_db_path + "-journal"
for path in [wal_path, shm_path, journal_path]:
if os.path.exists(path):
os.remove(path)
logger.info(f"Removed temporary file: {path}")
except Exception as e:
logger.warning(f"Error cleaning up WAL files: {e}")
def delete_all_cookies(self) -> bool:
"""Delete all cookies"""
if not os.path.exists(self.cookies_db_path):
logger.warning(
f"Cookies database not found: {self.cookies_db_path}"
)
return False
try:
conn = sqlite3.connect(self.cookies_db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM cookies")
deleted_count = cursor.rowcount
conn.commit()
# IMPORTANT: Execute VACUUM to remove deleted data and compact database
# This prevents recovery from WAL files
cursor.execute("VACUUM")
conn.commit()
conn.close()
# Also remove WAL and SHM files to ensure clean state
self._cleanup_wal_files()
logger.info(f"Deleted all {deleted_count} cookies")
return True
except Exception as e:
logger.error(f"Error deleting all cookies: {e}")
return False
def search_cookies(self, keyword: str) -> list[dict[str, Any]]:
"""Search cookies by domain keyword"""
domains = self.get_cookie_domains()
keyword_lower = keyword.lower()
return [
domain
for domain in domains
if keyword_lower in (domain["domain"] or "").lower()
]