diff --git a/surfsense_backend/app/connectors/notion_history.py b/surfsense_backend/app/connectors/notion_history.py index 87948c6..1f29a59 100644 --- a/surfsense_backend/app/connectors/notion_history.py +++ b/surfsense_backend/app/connectors/notion_history.py @@ -1,4 +1,4 @@ -from notion_client import Client +from notion_client import AsyncClient class NotionHistoryConnector: @@ -9,9 +9,21 @@ class NotionHistoryConnector: Args: token (str): Notion integration token """ - self.notion = Client(auth=token) + self.notion = AsyncClient(auth=token) - def get_all_pages(self, start_date=None, end_date=None): + async def close(self): + """Close the async client connection.""" + await self.notion.aclose() + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + + async def get_all_pages(self, start_date=None, end_date=None): """ Fetches all pages shared with your integration and their content. @@ -47,7 +59,7 @@ class NotionHistoryConnector: } # First, get a list of all pages the integration has access to - search_results = self.notion.search(**search_params) + search_results = await self.notion.search(**search_params) pages = search_results["results"] all_page_data = [] @@ -56,7 +68,7 @@ class NotionHistoryConnector: page_id = page["id"] # Get detailed page information - page_content = self.get_page_content(page_id) + page_content = await self.get_page_content(page_id) all_page_data.append( { @@ -90,7 +102,7 @@ class NotionHistoryConnector: # If no title found, return the page ID as fallback return f"Untitled page ({page['id']})" - def get_page_content(self, page_id): + async def get_page_content(self, page_id): """ Fetches the content (blocks) of a specific page. @@ -107,11 +119,11 @@ class NotionHistoryConnector: # Paginate through all blocks while has_more: if cursor: - response = self.notion.blocks.children.list( + response = await self.notion.blocks.children.list( block_id=page_id, start_cursor=cursor ) else: - response = self.notion.blocks.children.list(block_id=page_id) + response = await self.notion.blocks.children.list(block_id=page_id) blocks.extend(response["results"]) has_more = response["has_more"] @@ -122,12 +134,12 @@ class NotionHistoryConnector: # Process nested blocks recursively processed_blocks = [] for block in blocks: - processed_block = self.process_block(block) + processed_block = await self.process_block(block) processed_blocks.append(processed_block) return processed_blocks - def process_block(self, block): + async def process_block(self, block): """ Processes a block and recursively fetches any child blocks. @@ -149,9 +161,11 @@ class NotionHistoryConnector: if has_children: # Fetch and process child blocks - children_response = self.notion.blocks.children.list(block_id=block_id) + children_response = await self.notion.blocks.children.list( + block_id=block_id + ) for child_block in children_response["results"]: - child_blocks.append(self.process_block(child_block)) + child_blocks.append(await self.process_block(child_block)) return { "id": block_id, @@ -206,29 +220,3 @@ class NotionHistoryConnector: # Return empty string for unsupported block types return "" - - -# Example usage -# if __name__ == "__main__": -# # Simple example of how to use this module -# import argparse - -# parser = argparse.ArgumentParser(description="Fetch Notion pages using an integration token") -# parser.add_argument("--token", help="Your Notion integration token") -# parser.add_argument("--start-date", help="Start date in ISO format (e.g., 2023-01-01T00:00:00Z)") -# parser.add_argument("--end-date", help="End date in ISO format (e.g., 2023-12-31T23:59:59Z)") -# args = parser.parse_args() - -# token = args.token -# if not token: -# token = input("Enter your Notion integration token: ") - -# fetcher = NotionPageFetcher(token) - -# try: -# pages = fetcher.get_all_pages(args.start_date, args.end_date) -# print(f"Fetched {len(pages)} pages from Notion") -# for page in pages: -# print(f"- {page['title']}") -# except Exception as e: -# print(f"Error: {str(e)}") diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 6267ad9..a6c8853 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -108,7 +108,6 @@ async def index_notion_pages( ) logger.info(f"Initializing Notion client for connector {connector_id}") - notion_client = NotionHistoryConnector(token=notion_token) # Calculate date range if start_date is None or end_date is None: @@ -143,6 +142,8 @@ async def index_notion_pages( "%Y-%m-%dT%H:%M:%SZ" ) + notion_client = NotionHistoryConnector(token=notion_token) + logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}") await task_logger.log_task_progress( @@ -157,7 +158,7 @@ async def index_notion_pages( # Get all pages try: - pages = notion_client.get_all_pages( + pages = await notion_client.get_all_pages( start_date=start_date_iso, end_date=end_date_iso ) logger.info(f"Found {len(pages)} Notion pages") @@ -169,6 +170,7 @@ async def index_notion_pages( {"error_type": "PageFetchError"}, ) logger.error(f"Error fetching Notion pages: {e!s}", exc_info=True) + await notion_client.close() return 0, f"Failed to get Notion pages: {e!s}" if not pages: @@ -178,6 +180,7 @@ async def index_notion_pages( {"pages_found": 0}, ) logger.info("No Notion pages found to index") + await notion_client.close() return 0, "No Notion pages found" # Track the number of documents indexed @@ -382,6 +385,10 @@ async def index_notion_pages( logger.info( f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped" ) + + # Clean up the async client + await notion_client.close() + return total_processed, result_message except SQLAlchemyError as db_error: @@ -395,6 +402,9 @@ async def index_notion_pages( logger.error( f"Database error during Notion indexing: {db_error!s}", exc_info=True ) + # Clean up the async client in case of error + if "notion_client" in locals(): + await notion_client.close() return 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() @@ -405,4 +415,7 @@ async def index_notion_pages( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Notion pages: {e!s}", exc_info=True) + # Clean up the async client in case of error + if "notion_client" in locals(): + await notion_client.close() return 0, f"Failed to index Notion pages: {e!s}"