eigent/backend/app/agent/toolkit/linkedin_toolkit.py
Dream ba47db8a84
refactor: move toolkit from utils to agent module (#1045) (#1171)
Co-authored-by: bytecii <994513625@qq.com>
2026-02-06 15:22:21 -08:00

309 lines
11 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import json
import logging
import os
import time
from camel.toolkits import LinkedInToolkit as BaseLinkedInToolkit
from camel.toolkits.function_tool import FunctionTool
from app.agent.toolkit.abstract_toolkit import AbstractToolkit
from app.component.environment import env
from app.service.task import Agents
from app.utils.listen.toolkit_listen import auto_listen_toolkit
# Module-level logger. The name is a fixed string (not derived from
# __name__), so logging configuration can target "linkedin_toolkit".
logger = logging.getLogger("linkedin_toolkit")

# Shared factor so both durations below read as "<days> * one day".
_SECONDS_PER_DAY = 24 * 60 * 60

# LinkedIn access tokens expire after 60 days; value is in seconds.
LINKEDIN_TOKEN_LIFETIME_SECONDS = 60 * _SECONDS_PER_DAY
# A token with fewer than 7 days (in seconds) remaining should be refreshed.
LINKEDIN_TOKEN_REFRESH_THRESHOLD_SECONDS = 7 * _SECONDS_PER_DAY
@auto_listen_toolkit(BaseLinkedInToolkit)
class LinkedInToolkit(BaseLinkedInToolkit, AbstractToolkit):
    r"""LinkedIn toolkit for social media automation.

    This toolkit wraps CAMEL-AI's LinkedInToolkit and provides:
    - Post creation and deletion
    - Profile information retrieval
    - Token file persistence
    - Environment variable fallback

    Credential resolution: ``~/.eigent/.env`` is (re)loaded into the
    process environment, then a non-expired ``access_token`` found in the
    token file overrides ``LINKEDIN_ACCESS_TOKEN``.
    """

    agent_name: str = Agents.social_media_agent

    def __init__(self, api_task_id: str, timeout: float | None = None):
        r"""Load credentials and initialize the wrapped toolkit.

        Args:
            api_task_id: Identifier of the task this toolkit instance
                belongs to; stored on the instance.
            timeout: Forwarded unchanged to the base toolkit constructor.
        """
        self.api_task_id = api_task_id
        self._token_path = self._build_canonical_token_path()
        # Credentials are exported to the environment *before* the base
        # initializer runs.
        # NOTE(review): presumably the base class reads
        # LINKEDIN_ACCESS_TOKEN during __init__ - confirm against CAMEL.
        self._load_credentials()
        super().__init__(timeout)

    @classmethod
    def _build_canonical_token_path(cls) -> str:
        r"""Build the canonical path for storing LinkedIn tokens.

        Returns:
            The value of ``LINKEDIN_TOKEN_PATH`` when it is set and
            non-empty, otherwise
            ``~/.eigent/tokens/linkedin/linkedin_token.json``.
        """
        return env("LINKEDIN_TOKEN_PATH") or os.path.join(
            os.path.expanduser("~"),
            ".eigent",
            "tokens",
            "linkedin",
            "linkedin_token.json",
        )

    @classmethod
    def _reload_default_env(cls) -> None:
        r"""Force-reload ``~/.eigent/.env`` into the process environment.

        Values from the file override variables that are already set.
        Nothing happens when the file does not exist.
        """
        from dotenv import load_dotenv

        default_env_path = os.path.join(
            os.path.expanduser("~"), ".eigent", ".env"
        )
        if os.path.exists(default_env_path):
            load_dotenv(dotenv_path=default_env_path, override=True)

    @classmethod
    def _read_token_file(cls, token_path: str) -> dict | None:
        r"""Read and parse the token file.

        Args:
            token_path: Path of the JSON token file.

        Returns:
            The parsed JSON object, or None when the file does not exist.

        Raises:
            OSError: If the file exists but cannot be read.
            ValueError: If the content is not valid JSON or is not a JSON
                object.
        """
        try:
            with open(token_path, encoding="utf-8") as f:
                token_data = json.load(f)
        except FileNotFoundError:
            return None
        if not isinstance(token_data, dict):
            # Every consumer calls ``.get`` on the result; reject other
            # JSON shapes here instead of failing later.
            raise ValueError("token file does not contain a JSON object")
        return token_data

    @classmethod
    def _seconds_until_expiry(cls, token_data: dict) -> float | None:
        r"""Compute the time left before the stored token expires.

        Args:
            token_data: Parsed token file content.

        Returns:
            Seconds remaining (zero or negative when already expired), or
            None when ``expires_at`` is missing, falsy, or not numeric.
        """
        expires_at = token_data.get("expires_at")
        if not expires_at:
            return None
        if isinstance(expires_at, str):
            # Tolerate timestamps that were serialized as strings.
            try:
                expires_at = float(expires_at)
            except ValueError:
                return None
        if not isinstance(expires_at, (int, float)):
            return None
        return expires_at - int(time.time())

    @classmethod
    def _export_token_from_file(cls, token_path: str) -> bool:
        r"""Copy a usable token from the token file into the environment.

        Sets ``LINKEDIN_ACCESS_TOKEN`` when the file holds a non-empty
        string ``access_token`` that has not expired.

        Args:
            token_path: Path of the JSON token file.

        Returns:
            True if the environment variable was set, False otherwise.

        Raises:
            OSError: If the file exists but cannot be read.
            ValueError: If the file content is not a valid JSON object.
        """
        token_data = cls._read_token_file(token_path)
        if not token_data:
            return False
        access_token = token_data.get("access_token")
        # os.environ only accepts strings; skip anything else.
        if not access_token or not isinstance(access_token, str):
            return False
        remaining = cls._seconds_until_expiry(token_data)
        if remaining is not None and remaining <= 0:
            logger.warning(
                "LinkedIn token file contains "
                "expired token, skipping load"
            )
            return False
        os.environ["LINKEDIN_ACCESS_TOKEN"] = access_token
        return True

    def _load_credentials(self):
        r"""Load credentials from token file or environment variables.

        Failures while reading the token file are logged and otherwise
        ignored, so construction can still fall back to the environment.
        """
        self._reload_default_env()
        try:
            if self._export_token_from_file(self._token_path):
                logger.info(
                    "Loaded LinkedIn credentials "
                    "from token file: "
                    f"{self._token_path}"
                )
        except (OSError, ValueError) as e:
            logger.warning(f"Could not load LinkedIn token file: {e}")

    @classmethod
    def get_can_use_tools(cls, api_task_id: str) -> list[FunctionTool]:
        r"""Return tools only if LinkedIn credentials are configured.

        Args:
            api_task_id: Task identifier passed to the toolkit instance.

        Returns:
            The toolkit's tools when ``LINKEDIN_ACCESS_TOKEN`` is available
            after reloading the environment and the token file, otherwise
            an empty list.
        """
        cls._reload_default_env()
        # Resolve the path after the reload so a LINKEDIN_TOKEN_PATH
        # defined in the .env file is honored.
        token_path = cls._build_canonical_token_path()
        try:
            cls._export_token_from_file(token_path)
        except (OSError, ValueError) as e:
            logger.warning(f"Could not load LinkedIn token file: {e}")
        if env("LINKEDIN_ACCESS_TOKEN"):
            return LinkedInToolkit(api_task_id).get_tools()
        return []

    @classmethod
    def save_token(cls, token_data: dict) -> bool:
        r"""Save OAuth token to file.

        ``token_data`` is updated in place: ``saved_at`` is added when
        missing, and ``expires_at`` is derived from ``expires_in`` (or a
        60-day default) when missing. The file is written with owner-only
        permissions because it holds a credential.

        Args:
            token_data: Dictionary containing access_token
                and optionally refresh_token

        Returns:
            True if saved successfully, False otherwise
        """
        token_path = cls._build_canonical_token_path()
        try:
            # Add timestamp for expiration tracking if not present
            if "saved_at" not in token_data:
                token_data["saved_at"] = int(time.time())
            if "expires_at" not in token_data:
                # Prefer the provider-supplied lifetime; default to 60 days
                lifetime = token_data.get(
                    "expires_in", LINKEDIN_TOKEN_LIFETIME_SECONDS
                )
                token_data["expires_at"] = token_data["saved_at"] + lifetime
            # A bare filename has an empty dirname, and os.makedirs("")
            # raises, so only create the directory when there is one.
            token_dir = os.path.dirname(token_path)
            if token_dir:
                os.makedirs(token_dir, exist_ok=True)
            # Create the file as 0600 so the token is never exposed through
            # a permissive umask.
            fd = os.open(
                token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
            )
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(token_data, f, indent=2)
            try:
                # The creation mode is ignored for a pre-existing file.
                os.chmod(token_path, 0o600)
            except OSError as e:
                logger.debug(
                    f"Could not restrict LinkedIn token permissions: {e}"
                )
            logger.info(f"Saved LinkedIn token to {token_path}")
            # Also update environment variable
            if token_data.get("access_token"):
                os.environ["LINKEDIN_ACCESS_TOKEN"] = token_data[
                    "access_token"
                ]
            return True
        except Exception as e:
            # token_data is caller-supplied, so failures here are varied
            # (missing keys, bad types, I/O); report them via the result.
            logger.error(f"Failed to save LinkedIn token: {e}")
            return False

    @classmethod
    def clear_token(cls) -> bool:
        r"""Remove stored token file and clear environment variable.

        The token's parent directory is removed as well when it is left
        empty.

        Returns:
            True if cleared successfully, False otherwise
        """
        token_path = cls._build_canonical_token_path()
        try:
            if os.path.exists(token_path):
                os.remove(token_path)
                logger.info(f"Removed LinkedIn token file: {token_path}")
            # Also try to remove the parent directory if empty
            token_dir = os.path.dirname(token_path)
            if os.path.exists(token_dir) and not os.listdir(token_dir):
                os.rmdir(token_dir)
                logger.info(
                    f"Removed empty LinkedIn token directory: {token_dir}"
                )
            # Clear environment variable
            os.environ.pop("LINKEDIN_ACCESS_TOKEN", None)
            return True
        except OSError as e:
            logger.error(f"Failed to clear LinkedIn token: {e}")
            return False

    @classmethod
    def is_authenticated(cls) -> bool:
        r"""Check if user has valid LinkedIn credentials.

        Expiration is not considered here; use ``is_token_expired`` for
        that.

        Returns:
            True if credentials are available, False otherwise
        """
        cls._reload_default_env()
        try:
            token_data = cls._read_token_file(
                cls._build_canonical_token_path()
            )
        except (OSError, ValueError) as e:
            logger.debug(f"Could not read LinkedIn token file: {e}")
            token_data = None
        if token_data and token_data.get("access_token"):
            return True
        # Fall back to the environment variable
        return bool(env("LINKEDIN_ACCESS_TOKEN"))

    @classmethod
    def get_token_info(cls) -> dict | None:
        r"""Get stored token information including expiration.

        Returns:
            Token data dictionary or None if not available (file missing,
            unreadable, or not a JSON object)
        """
        try:
            return cls._read_token_file(cls._build_canonical_token_path())
        except (OSError, ValueError) as e:
            logger.debug(f"Could not read LinkedIn token file: {e}")
            return None

    @classmethod
    def is_token_expiring_soon(cls) -> bool:
        r"""Check if token is expiring within the refresh threshold.

        Returns:
            True if token is expiring soon or already expired, False
            otherwise. Returns False if no token file exists (e.g. env var
            auth only) or if it carries no usable expiration info.
        """
        token_info = cls.get_token_info()
        if not token_info:
            # No token file; cannot determine expiry
            return False
        remaining = cls._seconds_until_expiry(token_info)
        if remaining is None:
            return False  # No expiration info, assume valid
        return remaining < LINKEDIN_TOKEN_REFRESH_THRESHOLD_SECONDS

    @classmethod
    def is_token_expired(cls) -> bool:
        r"""Check if token has expired.

        Returns:
            True if token is expired, False otherwise. Returns False if no
            token file exists (e.g. env var auth only) or if it carries no
            usable expiration info.
        """
        token_info = cls.get_token_info()
        if not token_info:
            # No token file; cannot determine expiry
            return False
        remaining = cls._seconds_until_expiry(token_info)
        if remaining is None:
            return False  # No expiration info, assume valid
        return remaining <= 0

    def get_profile_safe(self) -> dict:
        r"""Get LinkedIn profile with error handling.

        Returns:
            Profile dictionary, or ``{"error": <message>}`` if the profile
            request raised
        """
        try:
            return self.get_profile(include_id=True)
        except Exception as e:
            logger.error(f"Failed to get LinkedIn profile: {e}")
            return {"error": str(e)}