mirror of
https://github.com/MODSetter/SurfSense.git
synced 2025-09-01 10:09:08 +00:00
fix: made notion indexing async
This commit is contained in:
parent
95f70ad0fd
commit
3b87ecc3c5
2 changed files with 41 additions and 40 deletions
|
@ -1,4 +1,4 @@
|
|||
from notion_client import Client
|
||||
from notion_client import AsyncClient
|
||||
|
||||
|
||||
class NotionHistoryConnector:
|
||||
|
@ -9,9 +9,21 @@ class NotionHistoryConnector:
|
|||
Args:
|
||||
token (str): Notion integration token
|
||||
"""
|
||||
self.notion = Client(auth=token)
|
||||
self.notion = AsyncClient(auth=token)
|
||||
|
||||
def get_all_pages(self, start_date=None, end_date=None):
|
||||
async def close(self):
|
||||
"""Close the async client connection."""
|
||||
await self.notion.aclose()
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Async context manager entry."""
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Async context manager exit."""
|
||||
await self.close()
|
||||
|
||||
async def get_all_pages(self, start_date=None, end_date=None):
|
||||
"""
|
||||
Fetches all pages shared with your integration and their content.
|
||||
|
||||
|
@ -47,7 +59,7 @@ class NotionHistoryConnector:
|
|||
}
|
||||
|
||||
# First, get a list of all pages the integration has access to
|
||||
search_results = self.notion.search(**search_params)
|
||||
search_results = await self.notion.search(**search_params)
|
||||
|
||||
pages = search_results["results"]
|
||||
all_page_data = []
|
||||
|
@ -56,7 +68,7 @@ class NotionHistoryConnector:
|
|||
page_id = page["id"]
|
||||
|
||||
# Get detailed page information
|
||||
page_content = self.get_page_content(page_id)
|
||||
page_content = await self.get_page_content(page_id)
|
||||
|
||||
all_page_data.append(
|
||||
{
|
||||
|
@ -90,7 +102,7 @@ class NotionHistoryConnector:
|
|||
# If no title found, return the page ID as fallback
|
||||
return f"Untitled page ({page['id']})"
|
||||
|
||||
def get_page_content(self, page_id):
|
||||
async def get_page_content(self, page_id):
|
||||
"""
|
||||
Fetches the content (blocks) of a specific page.
|
||||
|
||||
|
@ -107,11 +119,11 @@ class NotionHistoryConnector:
|
|||
# Paginate through all blocks
|
||||
while has_more:
|
||||
if cursor:
|
||||
response = self.notion.blocks.children.list(
|
||||
response = await self.notion.blocks.children.list(
|
||||
block_id=page_id, start_cursor=cursor
|
||||
)
|
||||
else:
|
||||
response = self.notion.blocks.children.list(block_id=page_id)
|
||||
response = await self.notion.blocks.children.list(block_id=page_id)
|
||||
|
||||
blocks.extend(response["results"])
|
||||
has_more = response["has_more"]
|
||||
|
@ -122,12 +134,12 @@ class NotionHistoryConnector:
|
|||
# Process nested blocks recursively
|
||||
processed_blocks = []
|
||||
for block in blocks:
|
||||
processed_block = self.process_block(block)
|
||||
processed_block = await self.process_block(block)
|
||||
processed_blocks.append(processed_block)
|
||||
|
||||
return processed_blocks
|
||||
|
||||
def process_block(self, block):
|
||||
async def process_block(self, block):
|
||||
"""
|
||||
Processes a block and recursively fetches any child blocks.
|
||||
|
||||
|
@ -149,9 +161,11 @@ class NotionHistoryConnector:
|
|||
|
||||
if has_children:
|
||||
# Fetch and process child blocks
|
||||
children_response = self.notion.blocks.children.list(block_id=block_id)
|
||||
children_response = await self.notion.blocks.children.list(
|
||||
block_id=block_id
|
||||
)
|
||||
for child_block in children_response["results"]:
|
||||
child_blocks.append(self.process_block(child_block))
|
||||
child_blocks.append(await self.process_block(child_block))
|
||||
|
||||
return {
|
||||
"id": block_id,
|
||||
|
@ -206,29 +220,3 @@ class NotionHistoryConnector:
|
|||
|
||||
# Return empty string for unsupported block types
|
||||
return ""
|
||||
|
||||
|
||||
# Example usage
|
||||
# if __name__ == "__main__":
|
||||
# # Simple example of how to use this module
|
||||
# import argparse
|
||||
|
||||
# parser = argparse.ArgumentParser(description="Fetch Notion pages using an integration token")
|
||||
# parser.add_argument("--token", help="Your Notion integration token")
|
||||
# parser.add_argument("--start-date", help="Start date in ISO format (e.g., 2023-01-01T00:00:00Z)")
|
||||
# parser.add_argument("--end-date", help="End date in ISO format (e.g., 2023-12-31T23:59:59Z)")
|
||||
# args = parser.parse_args()
|
||||
|
||||
# token = args.token
|
||||
# if not token:
|
||||
# token = input("Enter your Notion integration token: ")
|
||||
|
||||
# fetcher = NotionPageFetcher(token)
|
||||
|
||||
# try:
|
||||
# pages = fetcher.get_all_pages(args.start_date, args.end_date)
|
||||
# print(f"Fetched {len(pages)} pages from Notion")
|
||||
# for page in pages:
|
||||
# print(f"- {page['title']}")
|
||||
# except Exception as e:
|
||||
# print(f"Error: {str(e)}")
|
||||
|
|
|
@ -108,7 +108,6 @@ async def index_notion_pages(
|
|||
)
|
||||
|
||||
logger.info(f"Initializing Notion client for connector {connector_id}")
|
||||
notion_client = NotionHistoryConnector(token=notion_token)
|
||||
|
||||
# Calculate date range
|
||||
if start_date is None or end_date is None:
|
||||
|
@ -143,6 +142,8 @@ async def index_notion_pages(
|
|||
"%Y-%m-%dT%H:%M:%SZ"
|
||||
)
|
||||
|
||||
notion_client = NotionHistoryConnector(token=notion_token)
|
||||
|
||||
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
|
@ -157,7 +158,7 @@ async def index_notion_pages(
|
|||
|
||||
# Get all pages
|
||||
try:
|
||||
pages = notion_client.get_all_pages(
|
||||
pages = await notion_client.get_all_pages(
|
||||
start_date=start_date_iso, end_date=end_date_iso
|
||||
)
|
||||
logger.info(f"Found {len(pages)} Notion pages")
|
||||
|
@ -169,6 +170,7 @@ async def index_notion_pages(
|
|||
{"error_type": "PageFetchError"},
|
||||
)
|
||||
logger.error(f"Error fetching Notion pages: {e!s}", exc_info=True)
|
||||
await notion_client.close()
|
||||
return 0, f"Failed to get Notion pages: {e!s}"
|
||||
|
||||
if not pages:
|
||||
|
@ -178,6 +180,7 @@ async def index_notion_pages(
|
|||
{"pages_found": 0},
|
||||
)
|
||||
logger.info("No Notion pages found to index")
|
||||
await notion_client.close()
|
||||
return 0, "No Notion pages found"
|
||||
|
||||
# Track the number of documents indexed
|
||||
|
@ -382,6 +385,10 @@ async def index_notion_pages(
|
|||
logger.info(
|
||||
f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
|
||||
)
|
||||
|
||||
# Clean up the async client
|
||||
await notion_client.close()
|
||||
|
||||
return total_processed, result_message
|
||||
|
||||
except SQLAlchemyError as db_error:
|
||||
|
@ -395,6 +402,9 @@ async def index_notion_pages(
|
|||
logger.error(
|
||||
f"Database error during Notion indexing: {db_error!s}", exc_info=True
|
||||
)
|
||||
# Clean up the async client in case of error
|
||||
if "notion_client" in locals():
|
||||
await notion_client.close()
|
||||
return 0, f"Database error: {db_error!s}"
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
|
@ -405,4 +415,7 @@ async def index_notion_pages(
|
|||
{"error_type": type(e).__name__},
|
||||
)
|
||||
logger.error(f"Failed to index Notion pages: {e!s}", exc_info=True)
|
||||
# Clean up the async client in case of error
|
||||
if "notion_client" in locals():
|
||||
await notion_client.close()
|
||||
return 0, f"Failed to index Notion pages: {e!s}"
|
||||
|
|
Loading…
Add table
Reference in a new issue