# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import logging import os import shutil import sqlite3 from datetime import datetime from typing import Any logger = logging.getLogger("cookie_manager") class CookieManager: """Manager for reading and managing browser cookies from Electron/Chrome SQLite database""" def __init__(self, user_data_dir: str): self.user_data_dir = user_data_dir # Check for cookies in partition directory first (for persist:user_login) partition_cookies_path = os.path.join( user_data_dir, "Partitions", "user_login", "Cookies" ) if os.path.exists(partition_cookies_path): self.cookies_db_path = partition_cookies_path logger.info( f"Using partition cookies at: {partition_cookies_path}" ) else: # Fallback to default location self.cookies_db_path = os.path.join(user_data_dir, "Cookies") if not os.path.exists(self.cookies_db_path): alt_path = os.path.join(user_data_dir, "Network", "Cookies") if os.path.exists(alt_path): self.cookies_db_path = alt_path else: logger.warning( f"Cookies database not found at {self.cookies_db_path} or {partition_cookies_path}" ) def _get_cookies_connection(self) -> sqlite3.Connection | None: """Get database connection using a temporary copy to avoid locks""" if not os.path.exists(self.cookies_db_path): logger.warning( f"Cookies database not found: {self.cookies_db_path}" ) return None temp_db_path = self.cookies_db_path + ".tmp" conn = None try: shutil.copy2(self.cookies_db_path, temp_db_path) conn = sqlite3.connect(temp_db_path) conn.row_factory = sqlite3.Row return conn except Exception as e: logger.error(f"Error connecting to cookies database: {e}") if conn is not None: try: conn.close() except Exception: pass try: if os.path.exists(temp_db_path): os.remove(temp_db_path) except OSError: pass return None def _cleanup_temp_db(self): """Clean up temporary database file""" temp_db_path = self.cookies_db_path + ".tmp" try: if os.path.exists(temp_db_path): os.remove(temp_db_path) except Exception as e: logger.debug(f"Error cleaning up temp database: {e}") def get_cookie_domains(self) -> list[dict[str, Any]]: """Get list of all domains with cookies""" conn = self._get_cookies_connection() if not conn: return [] try: cursor = conn.cursor() query = """ SELECT host_key as domain, COUNT(*) as cookie_count, MAX(last_access_utc) as last_access FROM cookies GROUP BY host_key ORDER BY last_access DESC """ cursor.execute(query) rows = cursor.fetchall() domains = [] for row in rows: try: chrome_timestamp = row["last_access"] if chrome_timestamp: seconds_since_epoch = ( chrome_timestamp / 1000000.0 ) - 11644473600 last_access = datetime.fromtimestamp( seconds_since_epoch ).strftime("%Y-%m-%d %H:%M:%S") else: last_access = "Never" except Exception as e: logger.debug(f"Error converting timestamp: {e}") last_access = "Unknown" domains.append( { "domain": row["domain"], "cookie_count": row["cookie_count"], "last_access": last_access, } ) logger.info(f"Found {len(domains)} domains with cookies") return domains except Exception as e: logger.error(f"Error reading cookies: {e}") return [] finally: conn.close() self._cleanup_temp_db() def get_cookies_for_domain(self, domain: str) -> list[dict[str, str]]: """Get all cookies for a specific domain""" conn = self._get_cookies_connection() if not conn: return [] try: cursor = conn.cursor() query = """ SELECT host_key, name, value, path, expires_utc, is_secure, is_httponly FROM cookies WHERE host_key = ? OR host_key LIKE ? ORDER BY name """ cursor.execute(query, (domain, f"%.{domain}")) rows = cursor.fetchall() cookies = [] for row in rows: raw_value = row["value"] if raw_value is None: value_str = "" elif len(raw_value) > 50: value_str = raw_value[:50] + "..." else: value_str = raw_value cookies.append( { "domain": row["host_key"], "name": row["name"], "value": value_str, "path": row["path"], "secure": bool(row["is_secure"]), "httponly": bool(row["is_httponly"]), } ) return cookies except Exception as e: logger.error(f"Error reading cookies for domain {domain}: {e}") return [] finally: conn.close() self._cleanup_temp_db() def delete_cookies_for_domain(self, domain: str) -> bool: """Delete all cookies for a specific domain""" if not os.path.exists(self.cookies_db_path): logger.warning( f"Cookies database not found: {self.cookies_db_path}" ) return False try: conn = sqlite3.connect(self.cookies_db_path) cursor = conn.cursor() delete_query = """ DELETE FROM cookies WHERE host_key = ? OR host_key LIKE ? """ cursor.execute(delete_query, (domain, f"%.{domain}")) deleted_count = cursor.rowcount conn.commit() # IMPORTANT: Execute VACUUM to remove deleted data and compact database # This prevents recovery from WAL files cursor.execute("VACUUM") conn.commit() conn.close() # Also remove WAL and SHM files to ensure clean state self._cleanup_wal_files() logger.info(f"Deleted {deleted_count} cookies for domain {domain}") return True except Exception as e: logger.error(f"Error deleting cookies for domain {domain}: {e}") return False def _cleanup_wal_files(self): """Remove SQLite WAL and SHM files""" try: wal_path = self.cookies_db_path + "-wal" shm_path = self.cookies_db_path + "-shm" journal_path = self.cookies_db_path + "-journal" for path in [wal_path, shm_path, journal_path]: if os.path.exists(path): os.remove(path) logger.info(f"Removed temporary file: {path}") except Exception as e: logger.warning(f"Error cleaning up WAL files: {e}") def delete_all_cookies(self) -> bool: """Delete all cookies""" if not os.path.exists(self.cookies_db_path): logger.warning( f"Cookies database not found: {self.cookies_db_path}" ) return False try: conn = sqlite3.connect(self.cookies_db_path) cursor = conn.cursor() cursor.execute("DELETE FROM cookies") deleted_count = cursor.rowcount conn.commit() # IMPORTANT: Execute VACUUM to remove deleted data and compact database # This prevents recovery from WAL files cursor.execute("VACUUM") conn.commit() conn.close() # Also remove WAL and SHM files to ensure clean state self._cleanup_wal_files() logger.info(f"Deleted all {deleted_count} cookies") return True except Exception as e: logger.error(f"Error deleting all cookies: {e}") return False def search_cookies(self, keyword: str) -> list[dict[str, Any]]: """Search cookies by domain keyword""" domains = self.get_cookie_domains() keyword_lower = keyword.lower() return [ domain for domain in domains if keyword_lower in (domain["domain"] or "").lower() ]