mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-28 09:35:48 +00:00
update
This commit is contained in:
parent
d1d49e4520
commit
39badf3e53
1 changed files with 10 additions and 81 deletions
|
|
@ -1,7 +1,3 @@
|
|||
"""
|
||||
Electron/Chrome Cookie Manager
|
||||
用于读取和管理Electron浏览器的cookies
|
||||
"""
|
||||
import sqlite3
|
||||
import os
|
||||
from typing import List, Dict, Optional
|
||||
|
|
@ -13,21 +9,14 @@ logger = traceroot.get_logger("cookie_manager")
|
|||
|
||||
|
||||
class CookieManager:
|
||||
"""Cookie管理器,用于读取和管理浏览器cookies"""
|
||||
"""Manager for reading and managing browser cookies
|
||||
from Electron/Chrome SQLite database"""
|
||||
|
||||
def __init__(self, user_data_dir: str):
|
||||
"""
|
||||
初始化Cookie管理器
|
||||
|
||||
Args:
|
||||
user_data_dir: 浏览器用户数据目录
|
||||
"""
|
||||
self.user_data_dir = user_data_dir
|
||||
self.cookies_db_path = os.path.join(user_data_dir, "Cookies")
|
||||
|
||||
# Check for alternative paths
|
||||
if not os.path.exists(self.cookies_db_path):
|
||||
# Try Network/Cookies path (some Electron versions)
|
||||
alt_path = os.path.join(user_data_dir, "Network", "Cookies")
|
||||
if os.path.exists(alt_path):
|
||||
self.cookies_db_path = alt_path
|
||||
|
|
@ -35,22 +24,14 @@ class CookieManager:
|
|||
logger.warning(f"Cookies database not found at {self.cookies_db_path}")
|
||||
|
||||
def _get_cookies_connection(self) -> Optional[sqlite3.Connection]:
|
||||
"""
|
||||
获取cookies数据库连接
|
||||
|
||||
Returns:
|
||||
数据库连接或None
|
||||
"""
|
||||
"""Get database connection using a temporary copy to avoid locks"""
|
||||
if not os.path.exists(self.cookies_db_path):
|
||||
logger.warning(f"Cookies database not found: {self.cookies_db_path}")
|
||||
return None
|
||||
|
||||
try:
|
||||
# Create a temporary copy since the database might be locked
|
||||
temp_db_path = self.cookies_db_path + ".tmp"
|
||||
shutil.copy2(self.cookies_db_path, temp_db_path)
|
||||
|
||||
# Open the temporary copy
|
||||
conn = sqlite3.connect(temp_db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
|
@ -59,7 +40,7 @@ class CookieManager:
|
|||
return None
|
||||
|
||||
def _cleanup_temp_db(self):
|
||||
"""清理临时数据库文件"""
|
||||
"""Clean up temporary database file"""
|
||||
temp_db_path = self.cookies_db_path + ".tmp"
|
||||
try:
|
||||
if os.path.exists(temp_db_path):
|
||||
|
|
@ -68,20 +49,13 @@ class CookieManager:
|
|||
logger.debug(f"Error cleaning up temp database: {e}")
|
||||
|
||||
def get_cookie_domains(self) -> List[Dict[str, any]]:
|
||||
"""
|
||||
获取所有有cookies的域名列表
|
||||
|
||||
Returns:
|
||||
域名列表,包含域名和cookie数量
|
||||
"""
|
||||
"""Get list of all domains with cookies"""
|
||||
conn = self._get_cookies_connection()
|
||||
if not conn:
|
||||
return []
|
||||
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Group by host_key (domain) and count cookies
|
||||
query = """
|
||||
SELECT
|
||||
host_key as domain,
|
||||
|
|
@ -91,19 +65,14 @@ class CookieManager:
|
|||
GROUP BY host_key
|
||||
ORDER BY last_access DESC
|
||||
"""
|
||||
|
||||
cursor.execute(query)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
domains = []
|
||||
for row in rows:
|
||||
# Convert Chrome timestamp (microseconds since 1601-01-01) to readable format
|
||||
try:
|
||||
# Chrome timestamp is microseconds since 1601-01-01 UTC
|
||||
chrome_timestamp = row['last_access']
|
||||
if chrome_timestamp:
|
||||
# Convert to seconds since epoch (1970-01-01)
|
||||
# 11644473600 seconds between 1601-01-01 and 1970-01-01
|
||||
seconds_since_epoch = (chrome_timestamp / 1000000.0) - 11644473600
|
||||
last_access = datetime.fromtimestamp(seconds_since_epoch).strftime('%Y-%m-%d %H:%M:%S')
|
||||
else:
|
||||
|
|
@ -129,22 +98,13 @@ class CookieManager:
|
|||
self._cleanup_temp_db()
|
||||
|
||||
def get_cookies_for_domain(self, domain: str) -> List[Dict[str, str]]:
|
||||
"""
|
||||
获取指定域名的所有cookies
|
||||
|
||||
Args:
|
||||
domain: 域名
|
||||
|
||||
Returns:
|
||||
Cookie列表
|
||||
"""
|
||||
"""Get all cookies for a specific domain"""
|
||||
conn = self._get_cookies_connection()
|
||||
if not conn:
|
||||
return []
|
||||
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = """
|
||||
SELECT
|
||||
host_key,
|
||||
|
|
@ -158,8 +118,6 @@ class CookieManager:
|
|||
WHERE host_key = ? OR host_key LIKE ?
|
||||
ORDER BY name
|
||||
"""
|
||||
|
||||
# Match exact domain or subdomain pattern
|
||||
cursor.execute(query, (domain, f'%.{domain}'))
|
||||
rows = cursor.fetchall()
|
||||
|
||||
|
|
@ -168,7 +126,7 @@ class CookieManager:
|
|||
cookies.append({
|
||||
'domain': row['host_key'],
|
||||
'name': row['name'],
|
||||
'value': row['value'][:50] + '...' if len(row['value']) > 50 else row['value'], # Truncate long values
|
||||
'value': row['value'][:50] + '...' if len(row['value']) > 50 else row['value'],
|
||||
'path': row['path'],
|
||||
'secure': bool(row['is_secure']),
|
||||
'httponly': bool(row['is_httponly'])
|
||||
|
|
@ -184,33 +142,20 @@ class CookieManager:
|
|||
self._cleanup_temp_db()
|
||||
|
||||
def delete_cookies_for_domain(self, domain: str) -> bool:
|
||||
"""
|
||||
删除指定域名的所有cookies
|
||||
|
||||
Args:
|
||||
domain: 域名
|
||||
|
||||
Returns:
|
||||
是否删除成功
|
||||
"""
|
||||
"""Delete all cookies for a specific domain"""
|
||||
if not os.path.exists(self.cookies_db_path):
|
||||
logger.warning(f"Cookies database not found: {self.cookies_db_path}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Direct connection to the actual database (not a copy)
|
||||
conn = sqlite3.connect(self.cookies_db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Delete cookies for exact domain and subdomains
|
||||
delete_query = """
|
||||
DELETE FROM cookies
|
||||
WHERE host_key = ? OR host_key LIKE ?
|
||||
"""
|
||||
|
||||
cursor.execute(delete_query, (domain, f'%.{domain}'))
|
||||
deleted_count = cursor.rowcount
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
|
@ -222,12 +167,7 @@ class CookieManager:
|
|||
return False
|
||||
|
||||
def delete_all_cookies(self) -> bool:
|
||||
"""
|
||||
删除所有cookies
|
||||
|
||||
Returns:
|
||||
是否删除成功
|
||||
"""
|
||||
"""Delete all cookies"""
|
||||
if not os.path.exists(self.cookies_db_path):
|
||||
logger.warning(f"Cookies database not found: {self.cookies_db_path}")
|
||||
return False
|
||||
|
|
@ -235,10 +175,8 @@ class CookieManager:
|
|||
try:
|
||||
conn = sqlite3.connect(self.cookies_db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("DELETE FROM cookies")
|
||||
deleted_count = cursor.rowcount
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
|
@ -250,18 +188,9 @@ class CookieManager:
|
|||
return False
|
||||
|
||||
def search_cookies(self, keyword: str) -> List[Dict[str, any]]:
|
||||
"""
|
||||
搜索包含关键词的cookies
|
||||
|
||||
Args:
|
||||
keyword: 搜索关键词
|
||||
|
||||
Returns:
|
||||
匹配的域名列表
|
||||
"""
|
||||
"""Search cookies by domain keyword"""
|
||||
domains = self.get_cookie_domains()
|
||||
keyword_lower = keyword.lower()
|
||||
|
||||
return [
|
||||
domain for domain in domains
|
||||
if keyword_lower in domain['domain'].lower()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue