mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-18 06:11:17 +00:00
309 lines
11 KiB
Python
309 lines
11 KiB
Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
|
|
from camel.toolkits import LinkedInToolkit as BaseLinkedInToolkit
|
|
from camel.toolkits.function_tool import FunctionTool
|
|
|
|
from app.agent.toolkit.abstract_toolkit import AbstractToolkit
|
|
from app.component.environment import env
|
|
from app.service.task import Agents
|
|
from app.utils.listen.toolkit_listen import auto_listen_toolkit
|
|
|
|
logger = logging.getLogger("linkedin_toolkit")
|
|
|
|
# LinkedIn access tokens expire after 60 days (in seconds)
|
|
LINKEDIN_TOKEN_LIFETIME_SECONDS = 60 * 24 * 60 * 60
|
|
# Refresh token when less than 7 days remaining
|
|
LINKEDIN_TOKEN_REFRESH_THRESHOLD_SECONDS = 7 * 24 * 60 * 60
|
|
|
|
|
|
@auto_listen_toolkit(BaseLinkedInToolkit)
|
|
class LinkedInToolkit(BaseLinkedInToolkit, AbstractToolkit):
|
|
r"""LinkedIn toolkit for social media automation.
|
|
|
|
This toolkit wraps CAMEL-AI's LinkedInToolkit and provides:
|
|
- Post creation and deletion
|
|
- Profile information retrieval
|
|
- Token file persistence
|
|
- Environment variable fallback
|
|
"""
|
|
|
|
agent_name: str = Agents.social_media_agent
|
|
|
|
def __init__(self, api_task_id: str, timeout: float | None = None):
|
|
self.api_task_id = api_task_id
|
|
self._token_path = self._build_canonical_token_path()
|
|
self._load_credentials()
|
|
super().__init__(timeout)
|
|
|
|
@classmethod
|
|
def _build_canonical_token_path(cls) -> str:
|
|
r"""Build the canonical path for storing LinkedIn tokens."""
|
|
return env("LINKEDIN_TOKEN_PATH") or os.path.join(
|
|
os.path.expanduser("~"),
|
|
".eigent",
|
|
"tokens",
|
|
"linkedin",
|
|
"linkedin_token.json",
|
|
)
|
|
|
|
def _load_credentials(self):
|
|
r"""Load credentials from token file or environment variables."""
|
|
from dotenv import load_dotenv
|
|
|
|
# Force reload environment variables from default .env file
|
|
default_env_path = os.path.join(
|
|
os.path.expanduser("~"), ".eigent", ".env"
|
|
)
|
|
if os.path.exists(default_env_path):
|
|
load_dotenv(dotenv_path=default_env_path, override=True)
|
|
|
|
# Try to load from token file first
|
|
if os.path.exists(self._token_path):
|
|
try:
|
|
with open(self._token_path) as f:
|
|
token_data = json.load(f)
|
|
access_token = token_data.get("access_token")
|
|
if access_token:
|
|
# Check if token is expired before loading
|
|
expires_at = token_data.get("expires_at")
|
|
if expires_at and int(time.time()) >= expires_at:
|
|
logger.warning(
|
|
"LinkedIn token file contains "
|
|
"expired token, skipping load"
|
|
)
|
|
else:
|
|
os.environ["LINKEDIN_ACCESS_TOKEN"] = access_token
|
|
logger.info(
|
|
"Loaded LinkedIn credentials "
|
|
"from token file: "
|
|
f"{self._token_path}"
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Could not load LinkedIn token file: {e}")
|
|
|
|
@classmethod
|
|
def get_can_use_tools(cls, api_task_id: str) -> list[FunctionTool]:
|
|
r"""Return tools only if LinkedIn credentials are configured."""
|
|
from dotenv import load_dotenv
|
|
|
|
# Force reload environment variables
|
|
default_env_path = os.path.join(
|
|
os.path.expanduser("~"), ".eigent", ".env"
|
|
)
|
|
if os.path.exists(default_env_path):
|
|
load_dotenv(dotenv_path=default_env_path, override=True)
|
|
|
|
# Check for token file
|
|
token_path = cls._build_canonical_token_path()
|
|
if os.path.exists(token_path):
|
|
try:
|
|
with open(token_path) as f:
|
|
token_data = json.load(f)
|
|
access_token = token_data.get("access_token")
|
|
if access_token:
|
|
# Check if token is expired before loading
|
|
expires_at = token_data.get("expires_at")
|
|
if expires_at and int(time.time()) >= expires_at:
|
|
logger.warning(
|
|
"LinkedIn token file contains "
|
|
"expired token, skipping load"
|
|
)
|
|
else:
|
|
os.environ["LINKEDIN_ACCESS_TOKEN"] = access_token
|
|
except Exception:
|
|
pass
|
|
|
|
if env("LINKEDIN_ACCESS_TOKEN"):
|
|
return LinkedInToolkit(api_task_id).get_tools()
|
|
else:
|
|
return []
|
|
|
|
@classmethod
|
|
def save_token(cls, token_data: dict) -> bool:
|
|
r"""Save OAuth token to file.
|
|
|
|
Args:
|
|
token_data: Dictionary containing access_token
|
|
and optionally refresh_token
|
|
|
|
Returns:
|
|
True if saved successfully, False otherwise
|
|
"""
|
|
token_path = cls._build_canonical_token_path()
|
|
try:
|
|
# Add timestamp for expiration tracking if not present
|
|
if "saved_at" not in token_data:
|
|
token_data["saved_at"] = int(time.time())
|
|
|
|
# Calculate expiration time if expires_in is provided
|
|
if "expires_in" in token_data and "expires_at" not in token_data:
|
|
token_data["expires_at"] = (
|
|
token_data["saved_at"] + token_data["expires_in"]
|
|
)
|
|
elif "expires_at" not in token_data:
|
|
# Default to 60 days if no expiration info provided
|
|
token_data["expires_at"] = (
|
|
token_data["saved_at"] + LINKEDIN_TOKEN_LIFETIME_SECONDS
|
|
)
|
|
|
|
os.makedirs(os.path.dirname(token_path), exist_ok=True)
|
|
with open(token_path, "w") as f:
|
|
json.dump(token_data, f, indent=2)
|
|
logger.info(f"Saved LinkedIn token to {token_path}")
|
|
|
|
# Also update environment variable
|
|
if token_data.get("access_token"):
|
|
os.environ["LINKEDIN_ACCESS_TOKEN"] = token_data[
|
|
"access_token"
|
|
]
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to save LinkedIn token: {e}")
|
|
return False
|
|
|
|
@classmethod
|
|
def clear_token(cls) -> bool:
|
|
r"""Remove stored token file and clear environment variable.
|
|
|
|
Returns:
|
|
True if cleared successfully, False otherwise
|
|
"""
|
|
token_path = cls._build_canonical_token_path()
|
|
try:
|
|
if os.path.exists(token_path):
|
|
os.remove(token_path)
|
|
logger.info(f"Removed LinkedIn token file: {token_path}")
|
|
|
|
# Also try to remove the parent directory if empty
|
|
token_dir = os.path.dirname(token_path)
|
|
if os.path.exists(token_dir) and not os.listdir(token_dir):
|
|
os.rmdir(token_dir)
|
|
logger.info(
|
|
f"Removed empty LinkedIn token directory: {token_dir}"
|
|
)
|
|
|
|
# Clear environment variable
|
|
if "LINKEDIN_ACCESS_TOKEN" in os.environ:
|
|
del os.environ["LINKEDIN_ACCESS_TOKEN"]
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to clear LinkedIn token: {e}")
|
|
return False
|
|
|
|
@classmethod
|
|
def is_authenticated(cls) -> bool:
|
|
r"""Check if user has valid LinkedIn credentials.
|
|
|
|
Returns:
|
|
True if credentials are available, False otherwise
|
|
"""
|
|
from dotenv import load_dotenv
|
|
|
|
# Force reload environment variables
|
|
default_env_path = os.path.join(
|
|
os.path.expanduser("~"), ".eigent", ".env"
|
|
)
|
|
if os.path.exists(default_env_path):
|
|
load_dotenv(dotenv_path=default_env_path, override=True)
|
|
|
|
# Check token file
|
|
token_path = cls._build_canonical_token_path()
|
|
if os.path.exists(token_path):
|
|
try:
|
|
with open(token_path) as f:
|
|
token_data = json.load(f)
|
|
if token_data.get("access_token"):
|
|
return True
|
|
except Exception:
|
|
pass
|
|
|
|
# Check environment variable
|
|
return bool(env("LINKEDIN_ACCESS_TOKEN"))
|
|
|
|
@classmethod
|
|
def get_token_info(cls) -> dict | None:
|
|
r"""Get stored token information including expiration.
|
|
|
|
Returns:
|
|
Token data dictionary or None if not available
|
|
"""
|
|
token_path = cls._build_canonical_token_path()
|
|
if os.path.exists(token_path):
|
|
try:
|
|
with open(token_path) as f:
|
|
return json.load(f)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
@classmethod
|
|
def is_token_expiring_soon(cls) -> bool:
|
|
r"""Check if token is expiring within the refresh threshold.
|
|
|
|
Returns:
|
|
True if token is expiring soon or already expired, False otherwise.
|
|
Returns False if no token file exists (e.g. env var auth only).
|
|
"""
|
|
token_info = cls.get_token_info()
|
|
if not token_info:
|
|
# No token file; cannot determine expiry
|
|
return False
|
|
|
|
expires_at = token_info.get("expires_at")
|
|
if not expires_at:
|
|
return False # No expiration info, assume valid
|
|
|
|
current_time = int(time.time())
|
|
time_remaining = expires_at - current_time
|
|
|
|
return time_remaining < LINKEDIN_TOKEN_REFRESH_THRESHOLD_SECONDS
|
|
|
|
@classmethod
|
|
def is_token_expired(cls) -> bool:
|
|
r"""Check if token has expired.
|
|
|
|
Returns:
|
|
True if token is expired, False otherwise.
|
|
Returns False if no token file exists (e.g. env var auth only).
|
|
"""
|
|
token_info = cls.get_token_info()
|
|
if not token_info:
|
|
# No token file; cannot determine expiry
|
|
return False
|
|
|
|
expires_at = token_info.get("expires_at")
|
|
if not expires_at:
|
|
return False # No expiration info, assume valid
|
|
|
|
return int(time.time()) >= expires_at
|
|
|
|
def get_profile_safe(self) -> dict:
|
|
r"""Get LinkedIn profile with error handling.
|
|
|
|
Returns:
|
|
Profile dictionary or error information
|
|
"""
|
|
try:
|
|
return self.get_profile(include_id=True)
|
|
except Exception as e:
|
|
logger.error(f"Failed to get LinkedIn profile: {e}")
|
|
return {"error": str(e)}
|