diff --git a/surfsense_backend/app/connectors/confluence_connector.py b/surfsense_backend/app/connectors/confluence_connector.py
new file mode 100644
index 0000000..7816b9b
--- /dev/null
+++ b/surfsense_backend/app/connectors/confluence_connector.py
@@ -0,0 +1,379 @@
+"""
+Confluence Connector Module
+
+A module for retrieving data from Confluence.
+Allows fetching pages and their comments from specified spaces.
+"""
+
+import base64
+from datetime import datetime, timezone
+from typing import Any
+from urllib.parse import parse_qs, urlparse
+
+import requests
+
+
+class ConfluenceConnector:
+    """Class for retrieving data from Confluence."""
+
+    def __init__(
+        self,
+        base_url: str | None = None,
+        email: str | None = None,
+        api_token: str | None = None,
+    ):
+        """
+        Initialize the ConfluenceConnector class.
+
+        Args:
+            base_url: Confluence instance base URL (e.g., 'https://yourcompany.atlassian.net') (optional)
+            email: Confluence account email address (optional)
+            api_token: Confluence API token (optional)
+        """
+        self.base_url = base_url.rstrip("/") if base_url else None
+        self.email = email
+        self.api_token = api_token
+        self.api_version = "v2"  # Confluence Cloud API version
+
+    def set_credentials(self, base_url: str, email: str, api_token: str) -> None:
+        """
+        Set the Confluence credentials.
+
+        Args:
+            base_url: Confluence instance base URL
+            email: Confluence account email address
+            api_token: Confluence API token
+        """
+        self.base_url = base_url.rstrip("/")
+        self.email = email
+        self.api_token = api_token
+
+    def set_email(self, email: str) -> None:
+        """
+        Set the Confluence account email.
+
+        Args:
+            email: Confluence account email address
+        """
+        self.email = email
+
+    def set_api_token(self, api_token: str) -> None:
+        """
+        Set the Confluence API token.
+
+        Args:
+            api_token: Confluence API token
+        """
+        self.api_token = api_token
+
+    def get_headers(self) -> dict[str, str]:
+        """
+        Get headers for Confluence API requests using Basic Authentication.
+
+        Returns:
+            Dictionary of headers
+
+        Raises:
+            ValueError: If email, api_token, or base_url have not been set
+        """
+        if not all([self.base_url, self.email, self.api_token]):
+            raise ValueError(
+                "Confluence credentials not initialized. Call set_credentials() first."
+            )
+
+        # Create Basic Auth header using email:api_token
+        auth_str = f"{self.email}:{self.api_token}"
+        auth_bytes = auth_str.encode("utf-8")
+        auth_header = "Basic " + base64.b64encode(auth_bytes).decode("ascii")
+
+        return {
+            "Content-Type": "application/json",
+            "Authorization": auth_header,
+            "Accept": "application/json",
+        }
+
+    def make_api_request(
+        self, endpoint: str, params: dict[str, Any] | None = None
+    ) -> dict[str, Any]:
+        """
+        Make a request to the Confluence API.
+
+        Args:
+            endpoint: API endpoint (without base URL)
+            params: Query parameters for the request (optional)
+
+        Returns:
+            Response data from the API
+
+        Raises:
+            ValueError: If email, api_token, or base_url have not been set
+            Exception: If the API request fails
+        """
+        if not all([self.base_url, self.email, self.api_token]):
+            raise ValueError(
+                "Confluence credentials not initialized. Call set_credentials() first."
+            )
+
+        url = f"{self.base_url}/wiki/api/{self.api_version}/{endpoint}"
+        headers = self.get_headers()
+
+        try:
+            response = requests.get(url, headers=headers, params=params, timeout=30)
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            raise Exception(f"Confluence API request failed: {e!s}") from e
+
+    def _extract_next_cursor(self, result: dict[str, Any]) -> str | None:
+        """
+        Extract the pagination cursor from a response's "_links.next" URL.
+
+        Parsing the query string with urllib URL-decodes the cursor before
+        it is sent back as a query parameter; a naive string split would
+        pass it through still percent-encoded.
+
+        Args:
+            result: Response data from a previous API request
+
+        Returns:
+            Cursor for the next page, or None if there are no more results
+        """
+        next_link = result.get("_links", {}).get("next")
+        if not next_link:
+            return None
+        cursors = parse_qs(urlparse(next_link).query).get("cursor")
+        return cursors[0] if cursors else None
+
+    def get_all_spaces(self) -> list[dict[str, Any]]:
+        """
+        Fetch all spaces from Confluence.
+
+        Returns:
+            List of space objects
+
+        Raises:
+            ValueError: If credentials have not been set
+            Exception: If the API request fails
+        """
+        params = {
+            "limit": 100,
+        }
+
+        all_spaces = []
+        cursor = None
+
+        while True:
+            if cursor:
+                params["cursor"] = cursor
+
+            result = self.make_api_request("spaces", params)
+
+            if not isinstance(result, dict) or "results" not in result:
+                raise Exception("Invalid response from Confluence API")
+
+            spaces = result["results"]
+            all_spaces.extend(spaces)
+
+            # Follow the pagination cursor until no next link remains
+            cursor = self._extract_next_cursor(result)
+            if cursor is None:
+                break
+
+        return all_spaces
+
+    def get_pages_in_space(
+        self, space_id: str, include_body: bool = True
+    ) -> list[dict[str, Any]]:
+        """
+        Fetch all pages in a specific space.
+
+        Args:
+            space_id: The ID of the space to fetch pages from
+            include_body: Whether to include page body content
+
+        Returns:
+            List of page objects
+
+        Raises:
+            ValueError: If credentials have not been set
+            Exception: If the API request fails
+        """
+        params = {
+            "limit": 100,
+        }
+
+        if include_body:
+            params["body-format"] = "storage"
+
+        all_pages = []
+        cursor = None
+
+        while True:
+            if cursor:
+                params["cursor"] = cursor
+
+            result = self.make_api_request(f"spaces/{space_id}/pages", params)
+
+            if not isinstance(result, dict) or "results" not in result:
+                raise Exception("Invalid response from Confluence API")
+
+            pages = result["results"]
+            all_pages.extend(pages)
+
+            # Follow the pagination cursor until no next link remains
+            cursor = self._extract_next_cursor(result)
+            if cursor is None:
+                break
+
+        return all_pages
+
+    def get_page_comments(self, page_id: str) -> list[dict[str, Any]]:
+        """
+        Fetch all comments for a specific page (both footer and inline comments).
+
+        Args:
+            page_id: The ID of the page to fetch comments from
+
+        Returns:
+            List of comment objects
+
+        Raises:
+            ValueError: If credentials have not been set
+            Exception: If the API request fails
+        """
+        all_comments = []
+
+        # Get footer comments
+        footer_comments = self._get_comments_for_page(page_id, "footer-comments")
+        all_comments.extend(footer_comments)
+
+        # Get inline comments
+        inline_comments = self._get_comments_for_page(page_id, "inline-comments")
+        all_comments.extend(inline_comments)
+
+        return all_comments
+
+    def _get_comments_for_page(
+        self, page_id: str, comment_type: str
+    ) -> list[dict[str, Any]]:
+        """
+        Helper method to fetch comments of a specific type for a page.
+
+        Args:
+            page_id: The ID of the page
+            comment_type: Type of comments ('footer-comments' or 'inline-comments')
+
+        Returns:
+            List of comment objects
+        """
+        params = {
+            "limit": 100,
+            "body-format": "storage",
+        }
+
+        all_comments = []
+        cursor = None
+
+        while True:
+            if cursor:
+                params["cursor"] = cursor
+
+            result = self.make_api_request(f"pages/{page_id}/{comment_type}", params)
+
+            if not isinstance(result, dict) or "results" not in result:
+                break  # No comments or invalid response
+
+            comments = result["results"]
+            all_comments.extend(comments)
+
+            # Follow the pagination cursor until no next link remains
+            cursor = self._extract_next_cursor(result)
+            if cursor is None:
+                break
+
+        return all_comments
+
+    def get_pages_by_date_range(
+        self,
+        start_date: str,
+        end_date: str,
+        space_ids: list[str] | None = None,
+        include_comments: bool = True,
+    ) -> tuple[list[dict[str, Any]], str | None]:
+        """
+        Fetch pages created within a date range, optionally filtered by spaces.
+
+        Pages are filtered client-side on their "createdAt" timestamp.
+
+        Args:
+            start_date: Start date in YYYY-MM-DD format
+            end_date: End date in YYYY-MM-DD format (inclusive)
+            space_ids: Optional list of space IDs to filter pages
+            include_comments: Whether to include comments for each page
+
+        Returns:
+            Tuple containing (pages list with comments, error message or None)
+        """
+        try:
+            all_pages = []
+
+            if space_ids:
+                # Fetch pages from specific spaces
+                for space_id in space_ids:
+                    pages = self.get_pages_in_space(space_id, include_body=True)
+                    all_pages.extend(pages)
+            else:
+                # Fetch all pages (this might be expensive for large instances)
+                params = {
+                    "limit": 100,
+                    "body-format": "storage",
+                }
+
+                cursor = None
+                while True:
+                    if cursor:
+                        params["cursor"] = cursor
+
+                    result = self.make_api_request("pages", params)
+                    if not isinstance(result, dict) or "results" not in result:
+                        break
+
+                    pages = result["results"]
+                    all_pages.extend(pages)
+
+                    # Follow the pagination cursor until no next link remains
+                    cursor = self._extract_next_cursor(result)
+                    if cursor is None:
+                        break
+
+            # Filter pages by creation date
+            filtered_pages = []
+            start_datetime = datetime.fromisoformat(start_date).replace(
+                tzinfo=timezone.utc
+            )
+            end_datetime = datetime.fromisoformat(end_date + "T23:59:59").replace(
+                tzinfo=timezone.utc
+            )
+
+            for page in all_pages:
+                created_at = page.get("createdAt")
+                if created_at:
+                    try:
+                        page_date = datetime.fromisoformat(
+                            created_at.replace("Z", "+00:00")
+                        )
+                        if start_datetime <= page_date <= end_datetime:
+                            # Add comments if requested
+                            if include_comments:
+                                page["comments"] = self.get_page_comments(page["id"])
+                            filtered_pages.append(page)
+                    except ValueError:
+                        # Skip pages with invalid date format
+                        continue
+
+            if not filtered_pages:
+                return [], "No pages found in the specified date range."
+
+            return filtered_pages, None
+
+        except Exception as e:
+            return [], f"Error fetching pages: {e!s}"
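
A minimal usage sketch for reviewers (illustrative only, not part of the diff). It assumes the package is importable as "app.connectors.confluence_connector", matching the file path above; the instance URL, email, token, and date range are placeholder values:

    from app.connectors.confluence_connector import ConfluenceConnector

    connector = ConfluenceConnector()
    connector.set_credentials(
        base_url="https://yourcompany.atlassian.net",  # placeholder instance
        email="you@yourcompany.com",  # placeholder account
        api_token="YOUR_API_TOKEN",  # placeholder token
    )

    # Enumerate spaces, then fetch pages (with comments) created in March 2024.
    spaces = connector.get_all_spaces()
    space_ids = [space["id"] for space in spaces]
    pages, error = connector.get_pages_by_date_range(
        start_date="2024-03-01",
        end_date="2024-03-31",
        space_ids=space_ids,
        include_comments=True,
    )
    if error:
        print(error)
    else:
        print(f"Fetched {len(pages)} pages")

Note that get_pages_by_date_range reports failures through its second return value rather than raising, so callers should check the error string before using the page list.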