mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-19 07:59:39 +00:00
fix(cookie_manager): robustness — None-safe value, search_cookies domain, temp DB cleanup (#1098)
Co-authored-by: Contributor <contributor@example.com> Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com> Co-authored-by: Wendong-Fan <w3ndong.fan@gmail.com>
This commit is contained in:
parent
d619c9967e
commit
f256272eed
1 changed files with 24 additions and 6 deletions
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
import sqlite3
|
||||
import os
|
||||
from typing import List, Dict, Optional
|
||||
from typing import Any, List, Dict, Optional
|
||||
import logging
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
|
|
@ -52,14 +52,25 @@ class CookieManager:
|
|||
logger.warning(f"Cookies database not found: {self.cookies_db_path}")
|
||||
return None
|
||||
|
||||
temp_db_path = self.cookies_db_path + ".tmp"
|
||||
conn = None
|
||||
try:
|
||||
temp_db_path = self.cookies_db_path + ".tmp"
|
||||
shutil.copy2(self.cookies_db_path, temp_db_path)
|
||||
conn = sqlite3.connect(temp_db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
except Exception as e:
|
||||
logger.error(f"Error connecting to cookies database: {e}")
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if os.path.exists(temp_db_path):
|
||||
os.remove(temp_db_path)
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _cleanup_temp_db(self):
|
||||
|
|
@ -71,7 +82,7 @@ class CookieManager:
|
|||
except Exception as e:
|
||||
logger.debug(f"Error cleaning up temp database: {e}")
|
||||
|
||||
def get_cookie_domains(self) -> List[Dict[str, any]]:
|
||||
def get_cookie_domains(self) -> List[Dict[str, Any]]:
|
||||
"""Get list of all domains with cookies"""
|
||||
conn = self._get_cookies_connection()
|
||||
if not conn:
|
||||
|
|
@ -146,10 +157,17 @@ class CookieManager:
|
|||
|
||||
cookies = []
|
||||
for row in rows:
|
||||
raw_value = row['value']
|
||||
if raw_value is None:
|
||||
value_str = ""
|
||||
elif len(raw_value) > 50:
|
||||
value_str = raw_value[:50] + "..."
|
||||
else:
|
||||
value_str = raw_value
|
||||
cookies.append({
|
||||
'domain': row['host_key'],
|
||||
'name': row['name'],
|
||||
'value': row['value'][:50] + '...' if len(row['value']) > 50 else row['value'],
|
||||
'value': value_str,
|
||||
'path': row['path'],
|
||||
'secure': bool(row['is_secure']),
|
||||
'httponly': bool(row['is_httponly'])
|
||||
|
|
@ -240,11 +258,11 @@ class CookieManager:
|
|||
logger.error(f"Error deleting all cookies: {e}")
|
||||
return False
|
||||
|
||||
def search_cookies(self, keyword: str) -> List[Dict[str, any]]:
|
||||
def search_cookies(self, keyword: str) -> List[Dict[str, Any]]:
|
||||
"""Search cookies by domain keyword"""
|
||||
domains = self.get_cookie_domains()
|
||||
keyword_lower = keyword.lower()
|
||||
return [
|
||||
domain for domain in domains
|
||||
if keyword_lower in domain['domain'].lower()
|
||||
if keyword_lower in (domain['domain'] or '').lower()
|
||||
]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue