# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import hashlib import logging import os from pathlib import Path from camel.embeddings import BaseEmbedding, OpenAIEmbedding from camel.retrievers import AutoRetriever, VectorRetriever from camel.storages import BaseVectorStorage, QdrantStorage from camel.toolkits import RetrievalToolkit from camel.toolkits.function_tool import FunctionTool from camel.types import StorageType from app.agent.toolkit.abstract_toolkit import AbstractToolkit from app.component.environment import env from app.service.task import Agents logger = logging.getLogger("rag_toolkit") # Default paths and constants DEFAULT_RAG_STORAGE_PATH = "~/.eigent/rag_storage" DEFAULT_COLLECTION_NAME = "default" RAW_TEXT_SUBDIR = "raw_text" DEFAULT_STORAGE_TYPE = StorageType.QDRANT DEFAULT_EMBEDDING_DIM = 1536 # OpenAI text-embedding-ada-002 dimension class RAGToolkit(AbstractToolkit): """Generic RAG toolkit wrapping CAMEL's RetrievalToolkit. This toolkit provides RAG functionality with configurable storage: - Raw text document support via add_document + query_knowledge_base - File/URL retrieval via information_retrieval - Configurable collection_name and storage_path for flexibility Task isolation and other application-specific concerns should be handled at the orchestration layer by passing appropriate collection_name and storage_path values. """ agent_name: str = Agents.task_agent def __init__( self, api_task_id: str, agent_name: str | None = None, collection_name: str | None = None, storage_path: str | Path | None = None, storage_type: StorageType | None = None, embedding_model: BaseEmbedding | None = None, vector_dim: int | None = None, ): """Initialize RAGToolkit with configurable storage. Args: api_task_id (str): Task ID for eigent integration. agent_name (str | None): Optional agent name override. collection_name (str | None): Name for the vector collection. storage_path (str | Path | None): Path for vector storage. storage_type (StorageType | None): Vector storage type (default: QDRANT). embedding_model (BaseEmbedding | None): Custom embedding model. vector_dim (int | None): Embedding dimension (required if custom model). """ self.api_task_id = api_task_id if agent_name is not None: self.agent_name = agent_name # Use provided paths or defaults self._storage_path = ( Path(storage_path) if storage_path else Path(os.path.expanduser(DEFAULT_RAG_STORAGE_PATH)) ) self._storage_path.mkdir(parents=True, exist_ok=True) self._collection_name = collection_name or DEFAULT_COLLECTION_NAME self._storage_type = storage_type or DEFAULT_STORAGE_TYPE self._custom_embedding_model = embedding_model self._vector_dim = vector_dim or DEFAULT_EMBEDDING_DIM # Initialize CAMEL's AutoRetriever with configured storage auto_retriever = AutoRetriever( vector_storage_local_path=str(self._storage_path), storage_type=self._storage_type, ) # Wrap CAMEL's RetrievalToolkit using composition (for file/URL retrieval) self._retrieval_toolkit = RetrievalToolkit( auto_retriever=auto_retriever ) # Lazy-initialized components for raw text support self._embedding_model = None self._vector_retriever = None self._storage = None def _get_embedding_model(self): """Lazily initialize embedding model.""" if self._embedding_model is None: if self._custom_embedding_model is not None: self._embedding_model = self._custom_embedding_model else: api_key = env("OPENAI_API_KEY") if not api_key: raise ValueError( "OPENAI_API_KEY required (or provide embedding_model)" ) self._embedding_model = OpenAIEmbedding(api_key=api_key) return self._embedding_model def _get_storage(self): """Lazily initialize vector storage for raw text.""" if self._storage is None: self._storage = self._create_storage( vector_dim=self._vector_dim, path=str(self._storage_path / RAW_TEXT_SUBDIR), collection_name=self._collection_name, ) return self._storage def _create_storage( self, vector_dim: int, path: str, collection_name: str ) -> BaseVectorStorage: """Create vector storage based on configured storage type.""" if self._storage_type == StorageType.QDRANT: return QdrantStorage( vector_dim=vector_dim, path=path, collection_name=collection_name, ) raise ValueError(f"Unsupported storage type: {self._storage_type}") def _get_vector_retriever(self) -> VectorRetriever: """Lazily initialize vector retriever for raw text.""" if self._vector_retriever is None: self._vector_retriever = VectorRetriever( embedding_model=self._get_embedding_model(), storage=self._get_storage(), ) return self._vector_retriever def information_retrieval( self, query: str, contents: str | list[str], top_k: int = 5, similarity_threshold: float = 0.5, ) -> str: """Retrieves information from a local vector storage based on the query. This method connects to a task-isolated vector storage and retrieves relevant information. Content is automatically indexed on first use. Args: query (str): The question or query for which an answer is required. contents: Local file paths, remote URLs, or string contents to search. top_k: Number of top results to return (default: 5). similarity_threshold: Minimum similarity score for results (default: 0.5). Returns: The information retrieved in response to the query. Example: information_retrieval( query="What are the main features?", contents="/path/to/document.pdf" ) """ try: result = self._retrieval_toolkit.information_retrieval( query=query, contents=contents, top_k=top_k, similarity_threshold=similarity_threshold, ) logger.info( f"Retrieved information for query in collection {self._collection_name}" ) return result except Exception as e: logger.error(f"Failed to retrieve information: {e}", exc_info=True) return f"Error retrieving information: {str(e)}" def add_document( self, content: str, metadata: dict | None = None, doc_id: str | None = None, ) -> str: """Add a raw text document to the knowledge base. This method allows adding text content directly without requiring a file. Useful for adding API responses, conversation snippets, or any text data. Args: content: The text content to add to the knowledge base. metadata: Optional metadata to associate with the document (e.g., source, title, date). doc_id: Optional unique identifier for the document. If not provided, a hash of the content will be used. Returns: A confirmation message with the document ID. Example: add_document( content="Python is a programming language.", metadata={"source": "wiki"}, doc_id="doc-001" ) """ try: if not content or not content.strip(): return "Error: Cannot add empty document" # Generate document ID if not provided if doc_id is None: doc_id = hashlib.md5( # noqa: S324 content.encode(), usedforsecurity=False ).hexdigest()[:12] # Prepare metadata doc_metadata = metadata or {} doc_metadata["doc_id"] = doc_id doc_metadata["collection"] = self._collection_name # Get vector retriever and add content retriever = self._get_vector_retriever() retriever.process(content=content, extra_info=doc_metadata) logger.info( f"Added document {doc_id} to collection {self._collection_name}" ) return ( f"Successfully added document (ID: {doc_id}) to knowledge base" ) except Exception as e: logger.error(f"Failed to add document: {e}", exc_info=True) return f"Error adding document: {str(e)}" def query_knowledge_base( self, query: str, top_k: int = 5, similarity_threshold: float = 0.5, ) -> str: """Query the knowledge base for relevant information from added documents. This queries documents previously added via add_document(). For querying files/URLs, use information_retrieval() instead. Args: query (str): The question or search query to find relevant documents. top_k (int): Maximum number of relevant chunks to return (default: 5). similarity_threshold (float): Minimum similarity score (default: 0.5). Returns: Retrieved relevant text chunks from the knowledge base, or a message if no relevant information is found. Example: query_knowledge_base(query="What is Python?", top_k=3) """ try: if not query or not query.strip(): return "Error: Query cannot be empty" retriever = self._get_vector_retriever() results = retriever.query( query=query, top_k=top_k, similarity_threshold=similarity_threshold, ) # Format results as a simple numbered list formatted_results = [] for i, result in enumerate(results, 1): text = result.get("text", result.get("content", "")) metadata = result.get("metadata", {}) result_text = f"{i}. {text}" if metadata: source = metadata.get("source", metadata.get("doc_id", "")) if source: result_text += f" (Source: {source})" formatted_results.append(result_text) if not formatted_results: return f"No relevant information found for query: {query}" logger.info( f"Retrieved {len(results)} results for query in collection {self._collection_name}" ) return "\n\n".join(formatted_results) except Exception as e: logger.error(f"Failed to query knowledge base: {e}", exc_info=True) return f"Error querying knowledge base: {str(e)}" def list_knowledge_bases(self) -> str: """List all available knowledge bases. Returns: A list of available knowledge base collection names. """ try: collections = [] if self._storage_path.exists(): for item in self._storage_path.iterdir(): if item.is_dir(): collections.append(item.name) if not collections: return "No knowledge bases found. Use add_document or information_retrieval to create one." return "Available knowledge bases:\n" + "\n".join( f"- {c}" for c in sorted(collections) ) except Exception as e: logger.error(f"Failed to list knowledge bases: {e}", exc_info=True) return f"Error listing knowledge bases: {str(e)}" def get_tools(self) -> list[FunctionTool]: """Return the list of tools provided by this toolkit. Note: list_knowledge_bases is not exposed as a tool since with task isolation, each task has its own collection and listing all KBs is not useful for the agent. """ return [ FunctionTool(self.add_document), FunctionTool(self.query_knowledge_base), FunctionTool(self.information_retrieval), ] @classmethod def get_can_use_tools(cls, api_task_id: str) -> list[FunctionTool]: """Return tools that can be used based on available configuration. Args: api_task_id (str): Task ID for eigent integration. """ # Auto-derive collection name for task isolation collection_name = f"task_{api_task_id}" toolkit = RAGToolkit( api_task_id=api_task_id, collection_name=collection_name, ) return toolkit.get_tools()