mirror of
https://github.com/LostRuins/koboldcpp.git
synced 2025-09-10 17:14:36 +00:00
improve websearch api
This commit is contained in:
parent
baaecd1c65
commit
2de1975ca2
1 changed files with 74 additions and 17 deletions
91
koboldcpp.py
91
koboldcpp.py
|
@ -1276,16 +1276,47 @@ def websearch(query):
|
|||
return []
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import difflib
|
||||
from html.parser import HTMLParser
|
||||
num_results = 3
|
||||
searchresults = []
|
||||
|
||||
def fetch_searched_webpage(url):
|
||||
try:
|
||||
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
|
||||
with urllib.request.urlopen(req) as response:
|
||||
html_content = response.read().decode('utf-8', errors='ignore')
|
||||
return html_content
|
||||
except Exception as e:
|
||||
print(f"Error fetching text from URL {url}: {e}")
|
||||
return ""
|
||||
|
||||
class VisibleTextParser(HTMLParser):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.texts = []
|
||||
self.is_script_or_style = False
|
||||
def handle_starttag(self, tag, attrs):
|
||||
if tag in {'script', 'style'}:
|
||||
self.is_script_or_style = True
|
||||
def handle_endtag(self, tag):
|
||||
if tag in {'script', 'style'}:
|
||||
self.is_script_or_style = False
|
||||
def handle_data(self, data):
|
||||
if not self.is_script_or_style and data.strip():
|
||||
self.texts.append(data.strip())
|
||||
def get_text(self):
|
||||
return ' '.join(self.texts)
|
||||
|
||||
class ExtractResultsParser(HTMLParser):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.results = []
|
||||
self.titles = []
|
||||
self.urls = []
|
||||
self.descs = []
|
||||
self.recordingTitle = False
|
||||
self.recordingUrl = False
|
||||
self.recordingDesc = False
|
||||
self.currentrytxt = ""
|
||||
self.currsegmenttxt = ""
|
||||
|
||||
def handle_starttag(self, tag, attrs):
|
||||
|
@ -1294,10 +1325,9 @@ def websearch(query):
|
|||
for attr_name, attr_value in attrs:
|
||||
if not self.recordingTitle and attr_name == "class" and "result__a" in attr_value.split():
|
||||
self.recordingTitle = True
|
||||
self.currentrytxt = ""
|
||||
self.currsegmenttxt = ""
|
||||
if not self.recordingTitle and attr_name == "class" and "result__url" in attr_value.split():
|
||||
self.recordingTitle = True
|
||||
if not self.recordingUrl and attr_name == "class" and "result__url" in attr_value.split():
|
||||
self.recordingUrl = True
|
||||
self.currsegmenttxt = ""
|
||||
if not self.recordingDesc and attr_name == "class" and "result__snippet" in attr_value.split():
|
||||
self.recordingDesc = True
|
||||
|
@ -1306,30 +1336,57 @@ def websearch(query):
|
|||
def handle_endtag(self, tag):
|
||||
if tag == "a" and self.recordingTitle:
|
||||
self.recordingTitle = False
|
||||
self.currentrytxt += self.currsegmenttxt.strip() + "\n"
|
||||
self.titles.append(self.currsegmenttxt.strip())
|
||||
self.currsegmenttxt = ""
|
||||
if tag == "a" and self.recordingUrl:
|
||||
self.recordingUrl = False
|
||||
self.urls.append(f"https://{self.currsegmenttxt.strip()}")
|
||||
self.currsegmenttxt = ""
|
||||
if tag == "a" and self.recordingDesc:
|
||||
self.recordingDesc = False
|
||||
self.currentrytxt += self.currsegmenttxt.strip()
|
||||
self.descs.append(self.currsegmenttxt.strip())
|
||||
self.currsegmenttxt = ""
|
||||
if self.currentrytxt != "":
|
||||
self.results.append(self.currentrytxt.strip())
|
||||
self.currentrytxt = ""
|
||||
|
||||
def handle_data(self, data):
|
||||
if self.recordingTitle or self.recordingDesc:
|
||||
if self.recordingTitle or self.recordingDesc or self.recordingUrl:
|
||||
self.currsegmenttxt += data
|
||||
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
search_url = f"https://html.duckduckgo.com/html/?q={encoded_query}"
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(search_url, headers={'User-Agent': 'Mozilla/5.0'})
|
||||
with urllib.request.urlopen(req) as response:
|
||||
search_html = response.read().decode('utf-8', errors='ignore')
|
||||
parser = ExtractResultsParser()
|
||||
parser.feed(search_html)
|
||||
searchresults = parser.results[:num_results]
|
||||
search_html = fetch_searched_webpage(search_url)
|
||||
parser = ExtractResultsParser()
|
||||
parser.feed(search_html)
|
||||
titles = parser.titles[:num_results]
|
||||
searchurls = parser.urls[:num_results]
|
||||
descs = parser.descs[:num_results]
|
||||
for i in range(len(descs)):
|
||||
# dive into the results to try and get even more details
|
||||
title = titles[i]
|
||||
url = searchurls[i]
|
||||
desc = descs[i]
|
||||
pagedesc = ""
|
||||
try:
|
||||
desclen = len(desc)
|
||||
html_content = fetch_searched_webpage(url)
|
||||
parser2 = VisibleTextParser()
|
||||
parser2.feed(html_content)
|
||||
scraped = parser2.get_text().strip()
|
||||
s = difflib.SequenceMatcher(None, scraped.lower(), desc.lower())
|
||||
matches = s.find_longest_match(0, len(scraped), 0, desclen)
|
||||
if matches.size > 100 and desclen-matches.size < 50: #good enough match
|
||||
# expand description by some chars both sides
|
||||
expandamtbefore = 250
|
||||
expandamtafter = 600
|
||||
startpt = matches.a - expandamtbefore
|
||||
startpt = 0 if startpt < 0 else startpt
|
||||
endpt = matches.a + expandamtafter + desclen
|
||||
pagedesc = scraped[startpt:endpt]
|
||||
except Exception:
|
||||
pass
|
||||
searchresults.append({"title":title,"url":url,"desc":desc,"content":pagedesc})
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error fetching URL {search_url}: {e}")
|
||||
return ""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue