Merge pull request #281 from MODSetter/dev
Some checks are pending
pre-commit / pre-commit (push) Waiting to run

fix: made notion indexing async
This commit is contained in:
Rohan Verma 2025-08-21 14:45:28 -07:00 committed by GitHub
commit 4cfecc8543
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 41 additions and 40 deletions

View file

@@ -1,4 +1,4 @@
from notion_client import Client
from notion_client import AsyncClient
class NotionHistoryConnector:
@@ -9,9 +9,21 @@ class NotionHistoryConnector:
Args:
token (str): Notion integration token
"""
self.notion = Client(auth=token)
self.notion = AsyncClient(auth=token)
def get_all_pages(self, start_date=None, end_date=None):
async def close(self):
"""Close the async client connection."""
await self.notion.aclose()
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def get_all_pages(self, start_date=None, end_date=None):
"""
Fetches all pages shared with your integration and their content.
@@ -47,7 +59,7 @@ class NotionHistoryConnector:
}
# First, get a list of all pages the integration has access to
search_results = self.notion.search(**search_params)
search_results = await self.notion.search(**search_params)
pages = search_results["results"]
all_page_data = []
@@ -56,7 +68,7 @@ class NotionHistoryConnector:
page_id = page["id"]
# Get detailed page information
page_content = self.get_page_content(page_id)
page_content = await self.get_page_content(page_id)
all_page_data.append(
{
@@ -90,7 +102,7 @@ class NotionHistoryConnector:
# If no title found, return the page ID as fallback
return f"Untitled page ({page['id']})"
def get_page_content(self, page_id):
async def get_page_content(self, page_id):
"""
Fetches the content (blocks) of a specific page.
@@ -107,11 +119,11 @@ class NotionHistoryConnector:
# Paginate through all blocks
while has_more:
if cursor:
response = self.notion.blocks.children.list(
response = await self.notion.blocks.children.list(
block_id=page_id, start_cursor=cursor
)
else:
response = self.notion.blocks.children.list(block_id=page_id)
response = await self.notion.blocks.children.list(block_id=page_id)
blocks.extend(response["results"])
has_more = response["has_more"]
@@ -122,12 +134,12 @@ class NotionHistoryConnector:
# Process nested blocks recursively
processed_blocks = []
for block in blocks:
processed_block = self.process_block(block)
processed_block = await self.process_block(block)
processed_blocks.append(processed_block)
return processed_blocks
def process_block(self, block):
async def process_block(self, block):
"""
Processes a block and recursively fetches any child blocks.
@@ -149,9 +161,11 @@ class NotionHistoryConnector:
if has_children:
# Fetch and process child blocks
children_response = self.notion.blocks.children.list(block_id=block_id)
children_response = await self.notion.blocks.children.list(
block_id=block_id
)
for child_block in children_response["results"]:
child_blocks.append(self.process_block(child_block))
child_blocks.append(await self.process_block(child_block))
return {
"id": block_id,
@@ -206,29 +220,3 @@ class NotionHistoryConnector:
# Return empty string for unsupported block types
return ""
# Example usage
# if __name__ == "__main__":
# # Simple example of how to use this module
# import argparse
# parser = argparse.ArgumentParser(description="Fetch Notion pages using an integration token")
# parser.add_argument("--token", help="Your Notion integration token")
# parser.add_argument("--start-date", help="Start date in ISO format (e.g., 2023-01-01T00:00:00Z)")
# parser.add_argument("--end-date", help="End date in ISO format (e.g., 2023-12-31T23:59:59Z)")
# args = parser.parse_args()
# token = args.token
# if not token:
# token = input("Enter your Notion integration token: ")
# fetcher = NotionPageFetcher(token)
# try:
# pages = fetcher.get_all_pages(args.start_date, args.end_date)
# print(f"Fetched {len(pages)} pages from Notion")
# for page in pages:
# print(f"- {page['title']}")
# except Exception as e:
# print(f"Error: {str(e)}")

View file

@@ -108,7 +108,6 @@ async def index_notion_pages(
)
logger.info(f"Initializing Notion client for connector {connector_id}")
notion_client = NotionHistoryConnector(token=notion_token)
# Calculate date range
if start_date is None or end_date is None:
@@ -143,6 +142,8 @@ async def index_notion_pages(
"%Y-%m-%dT%H:%M:%SZ"
)
notion_client = NotionHistoryConnector(token=notion_token)
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
await task_logger.log_task_progress(
@@ -157,7 +158,7 @@ async def index_notion_pages(
# Get all pages
try:
pages = notion_client.get_all_pages(
pages = await notion_client.get_all_pages(
start_date=start_date_iso, end_date=end_date_iso
)
logger.info(f"Found {len(pages)} Notion pages")
@@ -169,6 +170,7 @@ async def index_notion_pages(
{"error_type": "PageFetchError"},
)
logger.error(f"Error fetching Notion pages: {e!s}", exc_info=True)
await notion_client.close()
return 0, f"Failed to get Notion pages: {e!s}"
if not pages:
@@ -178,6 +180,7 @@ async def index_notion_pages(
{"pages_found": 0},
)
logger.info("No Notion pages found to index")
await notion_client.close()
return 0, "No Notion pages found"
# Track the number of documents indexed
@@ -382,6 +385,10 @@ async def index_notion_pages(
logger.info(
f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
)
# Clean up the async client
await notion_client.close()
return total_processed, result_message
except SQLAlchemyError as db_error:
@@ -395,6 +402,9 @@ async def index_notion_pages(
logger.error(
f"Database error during Notion indexing: {db_error!s}", exc_info=True
)
# Clean up the async client in case of error
if "notion_client" in locals():
await notion_client.close()
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
@@ -405,4 +415,7 @@ async def index_notion_pages(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Notion pages: {e!s}", exc_info=True)
# Clean up the async client in case of error
if "notion_client" in locals():
await notion_client.close()
return 0, f"Failed to index Notion pages: {e!s}"