Memory extensions, UI updates

Auto memory extensions for main and solutions areas
UI updates, error toasts, progress bar
This commit is contained in:
frdel 2024-09-26 17:11:48 +02:00
parent 0992d52768
commit d233e0c94f
41 changed files with 1132 additions and 933 deletions

View file

@ -0,0 +1,97 @@
from agent import Agent
from python.helpers.extension import Extension
from python.helpers.files import read_file
from python.helpers.vector_db import Area
import json
from python.helpers import errors, files
from python.tools.memory_tool import get_db
class RecallMemories(Extension):
INTERVAL = 3
HISTORY = 5
RESULTS = 3
THRESHOLD = 0.1
async def execute(self, loop_data={}, **kwargs):
iter = loop_data.get("iteration", 0)
if (
iter % RecallMemories.INTERVAL == 0
): # every 3 iterations (or the first one) recall memories
await self.search_memories(loop_data=loop_data, **kwargs)
async def search_memories(self, loop_data={}, **kwargs):
# try:
# show temp info message
self.agent.context.log.log(
type="info", content="Searching memories...", temp=True
)
# show full util message, this will hide temp message immediately if turned on
log_item = self.agent.context.log.log(
type="util",
heading="Searching memories...",
)
# get system message and chat history for util llm
msgs_text = self.agent.concat_messages(
self.agent.history[-RecallMemories.HISTORY :]
) # only last X messages
system = self.agent.read_prompt(
"memory.memories_query.sys.md", history=msgs_text
)
# log query streamed by LLM
def log_callback(content):
log_item.stream(query=content)
# call util llm to summarize conversation
query = await self.agent.call_utility_llm(
system=system, msg=loop_data["message"], callback=log_callback
)
# get memory database
vdb = get_db(self.agent)
memories = vdb.search_similarity_threshold(
query=query,
results=RecallMemories.RESULTS,
threshold=RecallMemories.THRESHOLD,
filter=f"area != '{Area.SOLUTIONS.value}'" # exclude solutions
)
# log the short result
if not isinstance(memories, list) or len(memories) == 0:
log_item.update(
heading="No useful memories found.",
)
return
else:
log_item.update(
heading=f"\n\n{len(memories)} memories found.",
)
# concatenate memory.page_content in memories:
memories_text = ""
for memory in memories:
memories_text += memory.page_content + "\n\n"
memories_text = memories_text.strip()
# log the full results
log_item.update(memories=memories_text)
# place to prompt
memories_prompt = self.agent.read_prompt(
"agent.system.memories.md", memories=memories_text
)
# append to system message
loop_data["system"] += memories_prompt
# except Exception as e:
# err = errors.format_error(e)
# self.agent.context.log.log(
# type="error", heading="Recall memories extension error:", content=err
# )
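A minimal sketch of the filter convention introduced above (illustrative, not part of the commit): the string is built from the Area enum and later evaluated against each stored document's metadata, so this extension recalls everything except the solutions area.

from python.helpers.vector_db import Area

filter_str = f"area != '{Area.SOLUTIONS.value}'"
print(filter_str)  # area != 'solutions'
# a document with metadata {"area": "solutions"} is skipped,
# one with {"area": "main"} is returned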

View file

@ -1,8 +1,10 @@
from agent import Agent
from python.helpers.extension import Extension
from python.helpers.files import read_file
from python.helpers.vector_db import get_or_create
from python.helpers.vector_db import Area
import json
from python.helpers import errors, files
from python.tools.memory_tool import get_db
class RecallSolutions(Extension):
@ -15,59 +17,81 @@ class RecallSolutions(Extension):
async def execute(self, loop_data={}, **kwargs):
iter = loop_data.get("iteration", 0)
if iter % RecallSolutions.INTERVAL == 0: # every 3 iterations (or the first one) recall solution memories
if (
iter % RecallSolutions.INTERVAL == 0
): # every 3 iterations (or the first one) recall solution memories
await self.search_solutions(loop_data=loop_data, **kwargs)
async def search_solutions(self, loop_data={}, **kwargs):
self.agent.context.log.log(
type="info", content="Searching memory for solutions...", temp=False
)
# get system message and chat history for util llm
msgs_text = self.agent.concat_messages(
self.agent.history[-RecallSolutions.HISTORY:]
) # only last X messages
system = self.agent.read_prompt(
"memory.solutions_query.sys.md", history=msgs_text
)
# call util llm to summarize conversation
query = await self.agent.call_utility_llm(
system=system, msg=loop_data["message"]
)
# get solutions database
vdb = get_or_create(
logger=self.agent.context.log,
embeddings_model=self.agent.config.embeddings_model,
memory_dir="./memory/solutions",
knowledge_dir="",
)
solutions = vdb.search_similarity_threshold(
query=query, results=RecallSolutions.RESULTS, threshold=RecallSolutions.THRESHOLD
)
if not isinstance(solutions, list) or len(solutions) == 0:
# try:
# show temp info message
self.agent.context.log.log(
type="info", content="No successful solution memories found.", temp=False
)
return
else:
self.agent.context.log.log(
type="info",
content=f"{len(solutions)} successful solution memories found.",
temp=False,
type="info", content="Searching memory for solutions...", temp=True
)
# concatenate solution.page_content in solutions:
solutions_text = ""
for solution in solutions:
solutions_text += solution.page_content + "\n\n"
# show full util message, this will hide temp message immediately if turned on
log_item = self.agent.context.log.log(
type="util",
heading="Searching memory for solutions...",
)
# place to prompt
solutions_prompt = self.agent.read_prompt("agent.system.solutions.md", solutions=solutions_text)
# get system message and chat history for util llm
msgs_text = self.agent.concat_messages(
self.agent.history[-RecallSolutions.HISTORY :]
) # only last X messages
system = self.agent.read_prompt(
"memory.solutions_query.sys.md", history=msgs_text
)
# append to system message
loop_data["system"] += solutions_prompt
# log query streamed by LLM
def log_callback(content):
log_item.stream(query=content)
# call util llm to summarize conversation
query = await self.agent.call_utility_llm(
system=system, msg=loop_data["message"], callback=log_callback
)
# get solutions database
vdb = get_db(self.agent)
solutions = vdb.search_similarity_threshold(
query=query,
results=RecallSolutions.RESULTS,
threshold=RecallSolutions.THRESHOLD,
filter=f"area == '{Area.SOLUTIONS.value}'"
)
# log the short result
if not isinstance(solutions, list) or len(solutions) == 0:
log_item.update(
heading="No successful solution memories found.",
)
return
else:
log_item.update(
heading=f"\n\n{len(solutions)} successful solution memories found.",
)
# concatenate solution.page_content in solutions:
solutions_text = ""
for solution in solutions:
solutions_text += solution.page_content + "\n\n"
solutions_text = solutions_text.strip()
# log the full results
log_item.update(solutions=solutions_text)
# place to prompt
solutions_prompt = self.agent.read_prompt(
"agent.system.solutions.md", solutions=solutions_text
)
# append to system message
loop_data["system"] += solutions_prompt
# except Exception as e:
# err = errors.format_error(e)
# self.agent.context.log.log(
# type="error", heading="Recall solutions extension error:", content=err
# )
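Both recall extensions share the same gating; a standalone sketch of the iteration check, assuming loop_data carries an "iteration" counter as in the code above:

INTERVAL = 3  # matches RecallSolutions.INTERVAL and RecallMemories.INTERVAL

for iteration in range(7):
    loop_data = {"iteration": iteration}
    if loop_data.get("iteration", 0) % INTERVAL == 0:
        print(f"recall runs on iteration {iteration}")  # 0, 3 and 6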

View file

@ -0,0 +1,74 @@
from agent import Agent
from python.helpers.extension import Extension
import python.helpers.files as files
from python.helpers.vector_db import Area
import json
from python.helpers.dirty_json import DirtyJson
from python.helpers import errors
from python.tools.memory_tool import get_db
class MemorizeMemories(Extension):
async def execute(self, loop_data={}, **kwargs):
# try:
# show temp info message
self.agent.context.log.log(
type="info", content="Memorizing new information...", temp=True
)
# show full util message, this will hide temp message immediately if turned on
log_item = self.agent.context.log.log(
type="util",
heading="Memorizing new information...",
)
# get system message and chat history for util llm
system = self.agent.read_prompt("memory.memories_sum.sys.md")
msgs_text = self.agent.concat_messages(self.agent.history)
# log query streamed by LLM
def log_callback(content):
log_item.stream(content=content)
# call util llm to find info in history
memories_json = await self.agent.call_utility_llm(
system=system,
msg=msgs_text,
callback=log_callback,
)
memories = DirtyJson.parse_string(memories_json)
if not isinstance(memories, list) or len(memories) == 0:
log_item.update(heading="No useful information to memorize.")
return
else:
log_item.update(
heading=f"{len(memories)} entries to memorize."
)
# save memories to the vector DB
vdb = get_db(self.agent)
memories_txt = ""
for memory in memories:
# memory entry to plain text:
txt = f"{memory}"
memories_txt += txt + "\n\n"
vdb.insert_text(
text=txt, metadata={"area": Area.MAIN.value}
)
memories_txt = memories_txt.strip()
log_item.update(memories=memories_txt)
log_item.update(
result=f"{len(memories)} entries memorized.",
heading=f"{len(memories)} entries memorized.",
)
# except Exception as e:
# err = errors.format_error(e)
# self.agent.context.log.log(
# type="error", heading="Memorize memories extension error:", content=err
# )
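The utility model is expected to return a JSON array of memory entries; a hedged sketch of the parsing and flattening step above (the sample payload is invented for illustration):

from python.helpers.dirty_json import DirtyJson

memories_json = '["User prefers Docker over venv", "Project root is /opt/app"]'
memories = DirtyJson.parse_string(memories_json)
for memory in memories:
    txt = f"{memory}"  # each entry flattened to plain text
    # inserted with metadata={"area": Area.MAIN.value}, as in the extension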

View file

@ -0,0 +1,74 @@
from agent import Agent
from python.helpers.extension import Extension
import python.helpers.files as files
from python.helpers.vector_db import Area
import json
from python.helpers.dirty_json import DirtyJson
from python.helpers import errors
from python.tools.memory_tool import get_db
class MemorizeSolutions(Extension):
async def execute(self, loop_data={}, **kwargs):
# try:
# show temp info message
self.agent.context.log.log(
type="info", content="Memorizing succesful solutions...", temp=True
)
# show full util message, this will hide temp message immediately if turned on
log_item = self.agent.context.log.log(
type="util",
heading="Memorizing succesful solutions...",
)
# get system message and chat history for util llm
system = self.agent.read_prompt("memory.solutions_sum.sys.md")
msgs_text = self.agent.concat_messages(self.agent.history)
# log query streamed by LLM
def log_callback(content):
log_item.stream(content=content)
# call util llm to find solutions in history
solutions_json = await self.agent.call_utility_llm(
system=system,
msg=msgs_text,
callback=log_callback,
)
solutions = DirtyJson.parse_string(solutions_json)
if not isinstance(solutions, list) or len(solutions) == 0:
log_item.update(heading="No successful solutions to memorize.")
return
else:
log_item.update(
heading=f"{len(solutions)} successful solutions to memorize."
)
# save solutions to the vector DB
vdb = get_db(self.agent)
solutions_txt = ""
for solution in solutions:
# solution to plain text:
txt = f"# Problem\n {solution['problem']}\n# Solution\n {solution['solution']}"
solutions_txt += txt + "\n\n"
vdb.insert_text(
text=txt, metadata={"area": Area.SOLUTIONS.value}
)
solutions_txt = solutions_txt.strip()
log_item.update(solutions=solutions_txt)
log_item.update(
result=f"{len(solutions)} solutions memorized.",
heading=f"{len(solutions)} solutions memorized.",
)
# except Exception as e:
# err = errors.format_error(e)
# self.agent.context.log.log(
# type="error", heading="Memorize solutions extension error:", content=err
# )
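For solutions, each parsed entry is expected to carry "problem" and "solution" keys; an illustrative sketch of the text that lands in the solutions area (sample values invented):

solution = {"problem": "Port 8080 already in use", "solution": "Stop the old container first"}
txt = f"# Problem\n {solution['problem']}\n# Solution\n {solution['solution']}"
# inserted with metadata={"area": Area.SOLUTIONS.value}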

View file

@ -0,0 +1,12 @@
from agent import Agent
from python.helpers.extension import Extension
class WaitingForInputMsg(Extension):
async def execute(self, loop_data={}, **kwargs):
# show temp info message
if self.agent.number == 0:
self.agent.context.log.log(
type="util", heading="Waiting for input", temp=True
)

View file

@ -1,70 +0,0 @@
from agent import Agent
from python.helpers.extension import Extension
from python.helpers.files import read_file
from python.helpers.vector_db import get_or_create
import json
from python.helpers.dirty_json import DirtyJson
from python.helpers import errors
class MemorizeSolutions(Extension):
async def execute(self, loop_data={}, **kwargs):
try:
self.agent.context.log.log(
type="info", content="Memorizing succesful solutions...", temp=True
)
# get system message and chat history for util llm
system = self.agent.read_prompt("memory.solutions_sum.sys.md")
msgs_text = self.agent.concat_messages(self.agent.history)
# call util llm to find solutions in history
solutions_json = await self.agent.call_utility_llm(
system=system,
msg=msgs_text,
log_type="util",
)
solutions = DirtyJson.parse_string(solutions_json)
if not isinstance(solutions, list) or len(solutions) == 0:
self.agent.context.log.log(
type="info", content="No succesful solutions found.", temp=False
)
return
else:
self.agent.context.log.log(
type="info",
content=f"{len(solutions)} succesful solutions found.",
temp=True,
)
# save chat history
vdb = get_or_create(
logger=self.agent.context.log,
embeddings_model=self.agent.config.embeddings_model,
memory_dir="./memory/solutions",
knowledge_dir="",
)
for solution in solutions:
# solution to plain text:
txt = (
f"Problem: {solution['problem']}\nSolution: {solution['solution']}"
)
vdb.insert_text(
text=txt,
) # metadata={"full": msgs_text})
self.agent.context.log.log(
type="info",
content=f"{len(solutions)} solutions memorized.",
temp=False,
)
except Exception as e:
err = errors.format_error(e)
self.agent.context.log.log(
type="error", heading="Memorize solutions extension error:", content=err
)

View file

@ -1,35 +0,0 @@
from agent import Agent
from python.helpers.extension import Extension
from python.helpers.files import read_file
from python.helpers.vector_db import VectorDB
import json
class MemorizeHistory(Extension):
async def execute(self, **kwargs):
if self.agent.number != 0: return #only agent 0 will memorize chat history with user
self.agent.context.log.log(type="info", content="Memorizing chat history...", temp=True)
#get system message and chat history for util llm
system = self.agent.read_prompt("fw.memory.hist_sum.sys")
msgs = []
for msg in self.agent.history:
content = msg.get("content", "")
if content:
msgs.append(content)
msgs_json = json.dumps(msgs)
#call util llm to summarize conversation
summary = await self.agent.call_utility_llm(system=system,msg=msgs_json,output_label="")
#save chat history
vdb = VectorDB(
logger=self.agent.context.log,
embeddings_model=self.agent.config.embeddings_model,
memory_dir="./memory/history",
knowledge_dir=""
)
self.agent.context.log.log(type="info", content="Chat history memorized.", temp=True)

View file

@ -4,13 +4,17 @@ import hashlib
import json
from typing import Any, Dict, Literal, TypedDict
from langchain_community.document_loaders import (
CSVLoader, JSONLoader, PyPDFLoader, TextLoader, UnstructuredHTMLLoader,
UnstructuredMarkdownLoader
CSVLoader,
JSONLoader,
PyPDFLoader,
TextLoader,
UnstructuredHTMLLoader,
UnstructuredMarkdownLoader,
)
from python.helpers import files
from python.helpers.log import Log
from python.helpers.log import LogItem
text_loader_kwargs = {'autodetect_encoding': True}
text_loader_kwargs = {"autodetect_encoding": True}
class KnowledgeImport(TypedDict):
@ -23,65 +27,91 @@ class KnowledgeImport(TypedDict):
def calculate_checksum(file_path: str) -> str:
hasher = hashlib.md5()
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
buf = f.read()
hasher.update(buf)
return hasher.hexdigest()
def load_knowledge(logger: Log, knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dict[str, KnowledgeImport]:
def load_knowledge(
log_item: LogItem | None, knowledge_dir: str, index: Dict[str, KnowledgeImport]
) -> Dict[str, KnowledgeImport]:
knowledge_dir = files.get_abs_path(knowledge_dir)
from python.helpers.vector_db import Area
# Mapping file extensions to corresponding loader classes
file_types_loaders = {
'txt': TextLoader,
'pdf': PyPDFLoader,
'csv': CSVLoader,
'html': UnstructuredHTMLLoader,
'json': JSONLoader,
'md': UnstructuredMarkdownLoader
"txt": TextLoader,
"pdf": PyPDFLoader,
"csv": CSVLoader,
"html": UnstructuredHTMLLoader,
"json": JSONLoader,
"md": UnstructuredMarkdownLoader,
}
cnt_files = 0
cnt_docs = 0
# Fetch all files in the directory with specified extensions
kn_files = glob.glob(knowledge_dir + '/**/*', recursive=True)
if kn_files:
print(f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...")
logger.log(type="info", content=f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...")
for area in Area:
subdir = files.get_abs_path(knowledge_dir, area.value)
for file_path in kn_files:
ext = file_path.split('.')[-1].lower()
if ext in file_types_loaders:
checksum = calculate_checksum(file_path)
file_key = os.path.relpath(file_path, knowledge_dir)
# Load existing data from the index or create a new entry
file_data = index.get(file_key, {})
if file_data.get('checksum') == checksum:
file_data['state'] = 'original'
else:
file_data['state'] = 'changed'
if file_data['state'] == 'changed':
file_data['checksum'] = checksum
loader_cls = file_types_loaders[ext]
loader = loader_cls(file_path, **(text_loader_kwargs if ext in ['txt', 'csv', 'html', 'md'] else {}))
file_data['documents'] = loader.load_and_split()
cnt_files += 1
cnt_docs += len(file_data['documents'])
# print(f"Imported {len(file_data['documents'])} documents from {file_path}")
# Update the index
index[file_key] = file_data # type: ignore
if not os.path.exists(subdir):
os.makedirs(subdir)
continue
# Fetch all files in the directory with specified extensions
kn_files = glob.glob(subdir + "/**/*", recursive=True)
if kn_files:
print(f"Found {len(kn_files)} knowledge files in {subdir}, processing...")
if log_item:
log_item.stream(
progress=f"\nFound {len(kn_files)} knowledge files in {subdir}, processing...",
)
for file_path in kn_files:
ext = file_path.split(".")[-1].lower()
if ext in file_types_loaders:
checksum = calculate_checksum(file_path)
file_key = file_path # os.path.relpath(file_path, knowledge_dir)
# Load existing data from the index or create a new entry
file_data = index.get(file_key, {})
if file_data.get("checksum") == checksum:
file_data["state"] = "original"
else:
file_data["state"] = "changed"
if file_data["state"] == "changed":
file_data["checksum"] = checksum
loader_cls = file_types_loaders[ext]
loader = loader_cls(
file_path,
**(
text_loader_kwargs
if ext in ["txt", "csv", "html", "md"]
else {}
),
)
file_data["documents"] = loader.load_and_split()
for doc in file_data["documents"]:
doc.metadata["area"] = area.value
cnt_files += 1
cnt_docs += len(file_data["documents"])
# print(f"Imported {len(file_data['documents'])} documents from {file_path}")
# Update the index
index[file_key] = file_data # type: ignore
# loop index where state is not set and mark it as removed
for file_key, file_data in index.items():
if not file_data.get('state', ''):
index[file_key]['state'] = 'removed'
if not file_data.get("state", ""):
index[file_key]["state"] = "removed"
print(f"Processed {cnt_docs} documents from {cnt_files} files.")
logger.log(type="info", content=f"Processed {cnt_docs} documents from {cnt_files} files.")
if log_item:
log_item.stream(
progress=f"\nProcessed {cnt_docs} documents from {cnt_files} files."
)
return index
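load_knowledge now walks one subdirectory per Area value and stamps every loaded document with that area; a sketch of the expected layout and call (paths illustrative):

# knowledge/default/
#   main/notes.md        -> doc.metadata["area"] == "main"
#   solutions/fixes.md   -> doc.metadata["area"] == "solutions"
# missing area subdirectories are created and skipped on the first pass

index = load_knowledge(log_item=None, knowledge_dir="knowledge/default", index={})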

View file

@ -5,20 +5,23 @@ import uuid
type Type = Literal[
'agent',
'code_exe',
'error',
'hint',
'info',
'tool',
'user',
'util',
'warning',
]
"agent",
"code_exe",
"error",
"hint",
"info",
"progress",
"response",
"tool",
"user",
"util",
"warning",
]
@dataclass
class LogItem:
log: 'Log'
log: "Log"
no: int
type: str
heading: str
@ -30,9 +33,35 @@ class LogItem:
def __post_init__(self):
self.guid = self.log.guid
def update(self, type: Type | None = None, heading: str | None = None, content: str | None = None, kvps: dict | None = None, temp: bool | None = None):
def update(
self,
type: Type | None = None,
heading: str | None = None,
content: str | None = None,
kvps: dict | None = None,
temp: bool | None = None,
**kwargs,
):
if self.guid == self.log.guid:
self.log.update_item(self.no, type=type, heading=heading, content=content, kvps=kvps, temp=temp)
self.log.update_item(
self.no,
type=type,
heading=heading,
content=content,
kvps=kvps,
temp=temp,
**kwargs,
)
def stream(self, heading: str | None = None, content: str | None = None, **kwargs):
if heading is not None:
self.update(heading=self.heading + heading)
if content is not None:
self.update(content=self.content + content)
for k, v in kwargs.items():
prev = self.kvps.get(k, "") if self.kvps else ""
self.update(**{k: prev + v})
def output(self):
return {
@ -41,34 +70,70 @@ class LogItem:
"heading": self.heading,
"content": self.content,
"temp": self.temp,
"kvps": self.kvps
"kvps": self.kvps,
}
class Log:
def __init__(self):
self.guid: str = str(uuid.uuid4())
self.updates: list[int] = []
self.logs: list[LogItem] = []
self.progress = ""
def log(self, type: Type, heading: str | None = None, content: str | None = None, kvps: dict | None = None, temp: bool | None = None) -> LogItem:
item = LogItem(log=self,no=len(self.logs), type=type, heading=heading or "", content=content or "", kvps=kvps, temp=temp or False)
def log(
self,
type: Type,
heading: str | None = None,
content: str | None = None,
kvps: dict | None = None,
temp: bool | None = None,
) -> LogItem:
item = LogItem(
log=self,
no=len(self.logs),
type=type,
heading=heading or "",
content=content or "",
kvps=kvps,
temp=temp or False,
)
self.logs.append(item)
self.updates += [item.no]
if heading:
self.progress = heading
return item
def update_item(self, no: int, type: str | None = None, heading: str | None = None, content: str | None = None, kvps: dict | None = None, temp: bool | None = None):
def update_item(
self,
no: int,
type: str | None = None,
heading: str | None = None,
content: str | None = None,
kvps: dict | None = None,
temp: bool | None = None,
**kwargs,
):
item = self.logs[no]
if type is not None:
item.type = type
if heading is not None:
item.heading = heading
self.progress = heading
if content is not None:
item.content = content
if kvps is not None:
item.kvps = kvps
if temp is not None:
item.temp = temp
if kwargs:
if item.kvps is None:
item.kvps = {}
for k, v in kwargs.items():
item.kvps[k] = v
self.updates += [item.no]
def output(self, start=None, end=None):
@ -76,19 +141,18 @@ class Log:
start = 0
if end is None:
end = len(self.updates)
out = []
seen = set()
for update in self.updates[start:end]:
if update not in seen:
out.append(self.logs[update].output())
seen.add(update)
return out
return out
def reset(self):
self.guid = str(uuid.uuid4())
self.updates = []
self.logs = []
self.progress = ""
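A short usage sketch of the new streaming behaviour (values illustrative): stream() appends to an existing heading or content, any extra keyword argument accumulates into the item's kvps, and headings also update the new Log.progress field.

log = Log()
item = log.log(type="util", heading="Searching memories...")
item.stream(query="docker ")              # kvps == {"query": "docker "}
item.stream(query="setup")                # kvps == {"query": "docker setup"}
item.update(heading="2 memories found.")  # also sets log.progress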

View file

@ -22,8 +22,8 @@ class Tool:
pass
async def before_execution(self, **kwargs):
PrintStyle(font_color="#1B4F72", padding=True, background_color="white", bold=True).print(f"{self.agent.agent_name}: Using tool '{self.name}':")
self.log = self.agent.context.log.log(type="tool", heading=f"{self.agent.agent_name}: Using tool '{self.name}':", content="", kvps=self.args)
PrintStyle(font_color="#1B4F72", padding=True, background_color="white", bold=True).print(f"{self.agent.agent_name}: Using tool '{self.name}'")
self.log = self.agent.context.log.log(type="tool", heading=f"{self.agent.agent_name}: Using tool '{self.name}'", content="", kvps=self.args)
if self.args and isinstance(self.args, dict):
for key, value in self.args.items():
PrintStyle(font_color="#85C1E9", bold=True).stream(self.nice_key(key)+": ")
@ -34,7 +34,7 @@ class Tool:
text = messages.truncate_text(self.agent, response.message.strip(), self.agent.config.max_tool_response_length)
msg_response = self.agent.read_prompt("fw.tool_response.md", tool_name=self.name, tool_response=text)
await self.agent.append_message(msg_response, human=True)
PrintStyle(font_color="#1B4F72", background_color="white", padding=True, bold=True).print(f"{self.agent.agent_name}: Response from tool '{self.name}':")
PrintStyle(font_color="#1B4F72", background_color="white", padding=True, bold=True).print(f"{self.agent.agent_name}: Response from tool '{self.name}'")
PrintStyle(font_color="#85C1E9").print(response.message)
self.log.update(content=response.message)

View file

@ -1,3 +1,4 @@
from typing import Any
from langchain.storage import InMemoryByteStore, LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
@ -11,21 +12,31 @@ from . import files
from langchain_core.documents import Document
import uuid
from python.helpers import knowledge_import
from python.helpers.log import Log
from python.helpers.log import Log, LogItem
import pandas as pd
from enum import Enum
class Area(Enum):
MAIN = "main"
SOLUTIONS = "solutions"
index: dict[str, "VectorDB"] = {}
def get_or_create(
logger: Log,
def get_or_create_db(
logger: Log | None,
embeddings_model,
memory_dir: str,
in_memory=False,
memory_dir="./memory",
knowledge_dir=None,
knowledge_dirs: list[str] = [],
):
if index.get(memory_dir) is None:
log_item = None
if(logger): log_item = logger.log(type="util", heading=f"Initializing VectorDB in {memory_dir}")
index[memory_dir] = VectorDB(
logger, embeddings_model, in_memory, memory_dir, knowledge_dir
log_item, embeddings_model, memory_dir, knowledge_dirs, in_memory
)
return index[memory_dir]
@ -34,22 +45,23 @@ class VectorDB:
def __init__(
self,
logger: Log,
log_item: LogItem | None,
embeddings_model,
memory_dir: str,
knowledge_dirs: list[str] = [],
in_memory=False,
memory_dir="./memory/default",
knowledge_dir=None,
):
self.logger = logger
self.log_item = log_item
print("Initializing VectorDB...")
self.logger.log("info", content="Initializing VectorDB...", temp=True)
if(self.log_item): self.log_item.stream(progress="\nInitializing VectorDB")
self.embeddings_model = embeddings_model
self.em_dir = files.get_abs_path("./memory/embeddings") # just caching, no need to parameterize
self.em_dir = files.get_abs_path(
"./memory/embeddings"
) # just caching, no need to parameterize
self.db_dir = files.get_abs_path("./memory", memory_dir, "database")
self.kn_dir = files.get_abs_path(knowledge_dir) if knowledge_dir else ""
# make sure embeddings and database directories exist
os.makedirs(self.db_dir, exist_ok=True)
@ -93,10 +105,10 @@ class VectorDB:
)
# preload knowledge files
if self.kn_dir:
self.preload_knowledge(self.kn_dir, self.db_dir)
if knowledge_dirs:
self.preload_knowledge(knowledge_dirs, self.db_dir)
def preload_knowledge(self, kn_dir: str, db_dir: str):
def preload_knowledge(self, kn_dirs: list[str], db_dir: str):
# Load the index file if it exists
index_path = files.get_abs_path(db_dir, "knowledge_import.json")
@ -110,7 +122,8 @@ class VectorDB:
with open(index_path, "r") as f:
index = json.load(f)
index = knowledge_import.load_knowledge(self.logger, kn_dir, index)
for kn_dir in kn_dirs:
index = knowledge_import.load_knowledge(self.log_item, kn_dir, index)
for file in index:
if index[file]["state"] in ["changed", "removed"] and index[file].get(
@ -139,12 +152,16 @@ class VectorDB:
def search_similarity(self, query, results=3):
return self.db.similarity_search(query, results)
def search_similarity_threshold(self, query, results=3, threshold=0.5):
def search_similarity_threshold(
self, query: str, results=3, threshold=0.5, filter: str = ""
):
comparator = VectorDB.get_comparator(filter) if filter else None
return self.db.search(
query,
search_type="similarity_score_threshold",
k=results,
score_threshold=threshold,
filter=comparator,
)
def search_max_rel(self, query, results=3):
@ -198,8 +215,20 @@ class VectorDB:
def insert_documents(self, docs: list[Document]):
ids = [str(uuid.uuid4()) for _ in range(len(docs))]
for doc, id in zip(docs, ids):
doc.metadata["id"] = id # add ids to documents metadata
self.db.add_documents(documents=docs, ids=ids)
self.db.save_local(folder_path=self.db_dir) # persist
if ids:
for doc, id in zip(docs, ids):
doc.metadata["id"] = id # add ids to documents metadata
self.db.add_documents(documents=docs, ids=ids)
self.db.save_local(folder_path=self.db_dir) # persist
return ids
@staticmethod
def get_comparator(condition: str):
def comparator(data: dict[str, Any]):
try:
return eval(condition, {}, data)
except Exception as e:
print(f"Error evaluating condition: {e}")
return False
return comparator
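The filter strings passed in from the recall extensions end up here; an illustrative check of the comparator semantics (the condition is evaluated against each document's metadata dict, and any evaluation error falls back to False):

match_solutions = VectorDB.get_comparator("area == 'solutions'")
print(match_solutions({"area": "solutions"}))  # True
print(match_solutions({"area": "main"}))       # False
print(match_solutions({}))                     # False, NameError is caught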

View file

@ -53,10 +53,10 @@ class CodeExecution(Tool):
await self.agent.handle_intervention() # wait for intervention and handle it, if paused
PrintStyle(
font_color="#1B4F72", padding=True, background_color="white", bold=True
).print(f"{self.agent.agent_name}: Using tool '{self.name}':")
).print(f"{self.agent.agent_name}: Using tool '{self.name}'")
self.log = self.agent.context.log.log(
type="code_exe",
heading=f"{self.agent.agent_name}: Using tool '{self.name}':",
heading=f"{self.agent.agent_name}: Using tool '{self.name}'",
content="",
kvps=self.args,
)
@ -132,7 +132,7 @@ class CodeExecution(Tool):
self.state.shell.send_command(command)
PrintStyle(background_color="white", font_color="#1B4F72", bold=True).print(
f"{self.agent.agent_name} code execution output:"
f"{self.agent.agent_name} code execution output"
)
return await self.get_terminal_output()

View file

@ -25,9 +25,7 @@ class Knowledge(Tool):
duckduckgo = executor.submit(duckduckgo_search.search, question)
# manual memory search
future_memory_man = executor.submit(memory_tool.search, self.agent, "manual", question)
# history memory search
# future_memory_man = executor.submit(memory_tool.search, self.agent, "history", question)
future_memory_man = executor.submit(memory_tool.search, self.agent, question)
# Wait for both functions to complete
try:

View file

@ -1,14 +1,11 @@
import re
from typing import Literal
from agent import Agent
from python.helpers.vector_db import get_or_create
from python.helpers.vector_db import get_or_create_db
import os
from python.helpers.tool import Tool, Response
from python.helpers.print_style import PrintStyle
from python.helpers.errors import handle_error
type Area = Literal['manual', 'history']
from python.helpers import files
class Memory(Tool):
async def execute(self,**kwargs):
@ -20,13 +17,13 @@ class Memory(Tool):
if "query" in kwargs:
threshold = float(kwargs.get("threshold", 0.1))
count = int(kwargs.get("count", 5))
result = search(self.agent, area, kwargs["query"], count, threshold)
result = search(self.agent, kwargs["query"], count, threshold)
elif "memorize" in kwargs:
result = save(self.agent, area, kwargs["memorize"])
result = save(self.agent, kwargs["memorize"])
elif "forget" in kwargs:
result = forget(self.agent, area, kwargs["forget"])
result = forget(self.agent, kwargs["forget"])
# elif "delete" in kwargs
result = delete(self.agent, area, kwargs["delete"])
result = delete(self.agent, kwargs["delete"])
except Exception as e:
handle_error(e)
# hint about embedding change with existing database
@ -37,39 +34,39 @@ class Memory(Tool):
# result = process_query(self.agent, self.args["memory"],self.args["action"], result_count=self.agent.config.auto_memory_count)
return Response(message=result, break_loop=False)
def search(agent:Agent, area: Area, query:str, count:int=5, threshold:float=0.1):
db = get_db(agent, area)
def search(agent:Agent, query:str, count:int=5, threshold:float=0.1):
db = get_db(agent)
# docs = db.search_similarity(query,count) # type: ignore
docs = db.search_similarity_threshold(query,count,threshold) # type: ignore
if len(docs)==0: return agent.read_prompt("fw.memories_not_found.md", query=query)
else: return str(docs)
def save(agent:Agent, area: Area, text:str):
db = get_db(agent, area)
def save(agent:Agent, text:str):
db = get_db(agent)
id = db.insert_text(text) # type: ignore
return agent.read_prompt("fw.memory_saved.md", memory_id=id)
def delete(agent:Agent, area: Area, ids_str:str):
db = get_db(agent, area)
def delete(agent:Agent, ids_str:str):
db = get_db(agent)
ids = extract_guids(ids_str)
deleted = db.delete_documents_by_ids(ids) # type: ignore
return agent.read_prompt("fw.memories_deleted.md", memory_count=deleted)
def forget(agent:Agent, area: Area, query:str):
db = get_db(agent, area)
def forget(agent:Agent, query:str):
db = get_db(agent)
deleted = db.delete_documents_by_query(query) # type: ignore
return agent.read_prompt("fw.memories_deleted.md", memory_count=deleted)
def get_db(agent: Agent, area: Area):
mem_dir = os.path.join("memory", agent.config.memory_subdir or "default", area)
kn_dir = os.path.join("knowledge", agent.config.knowledge_subdir)
def get_db(agent: Agent):
mem_dir = files.get_abs_path("memory", agent.config.memory_subdir or "default")
kn_dirs = [files.get_abs_path("knowledge", d) for d in agent.config.knowledge_subdirs or []]
db = get_or_create(
db = get_or_create_db(
agent.context.log,
embeddings_model=agent.config.embeddings_model,
in_memory=False,
memory_dir=mem_dir,
knowledge_dir=kn_dir)
knowledge_dirs=kn_dirs)
return db
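A hedged usage sketch of the simplified tool API now that the area parameter is gone (agent is an existing Agent instance, values illustrative); every call goes through the single per-agent database returned by get_db:

db = get_db(agent)  # memory dir and knowledge dirs come from agent.config
found = search(agent, "docker setup", count=5, threshold=0.1)
saved = save(agent, "User prefers Docker over venv")
removed = forget(agent, "outdated Docker notes")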

View file

@ -7,7 +7,7 @@ class ResponseTool(Tool):
return Response(message=self.args["text"], break_loop=True)
async def before_execution(self, **kwargs):
self.log = self.agent.context.log.log(type="response", heading=f"{self.agent.agent_name}: Responding:", content=self.args.get("text", ""))
self.log = self.agent.context.log.log(type="response", heading=f"{self.agent.agent_name}: Responding", content=self.args.get("text", ""))
async def after_execution(self, response, **kwargs):

View file

@ -7,7 +7,7 @@ class TaskDone(Tool):
return Response(message=self.args["text"], break_loop=True)
async def before_execution(self, **kwargs):
self.log = self.agent.context.log.log(type="response", heading=f"{self.agent.agent_name}: Task done:", content=self.args.get("text", ""))
self.log = self.agent.context.log.log(type="response", heading=f"{self.agent.agent_name}: Task done", content=self.args.get("text", ""))
async def after_execution(self, response, **kwargs):
pass # do not add anything to the history or output