SurfSense/surfsense_backend/app/connectors/linear_connector.py
2025-04-15 23:10:35 -07:00

454 lines
16 KiB
Python

"""
Linear Connector Module
A module for retrieving issues and comments from Linear.
Allows fetching issue lists and their comments with date range filtering.
"""
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Union
class LinearConnector:
"""Class for retrieving issues and comments from Linear."""
def __init__(self, token: str = None):
"""
Initialize the LinearConnector class.
Args:
token: Linear API token (optional, can be set later with set_token)
"""
self.token = token
self.api_url = "https://api.linear.app/graphql"
def set_token(self, token: str) -> None:
"""
Set the Linear API token.
Args:
token: Linear API token
"""
self.token = token
def get_headers(self) -> Dict[str, str]:
"""
Get headers for Linear API requests.
Returns:
Dictionary of headers
Raises:
ValueError: If no Linear token has been set
"""
if not self.token:
raise ValueError("Linear token not initialized. Call set_token() first.")
return {
'Content-Type': 'application/json',
'Authorization': self.token
}
def execute_graphql_query(self, query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Execute a GraphQL query against the Linear API.
Args:
query: GraphQL query string
variables: Variables for the GraphQL query (optional)
Returns:
Response data from the API
Raises:
ValueError: If no Linear token has been set
Exception: If the API request fails
"""
if not self.token:
raise ValueError("Linear token not initialized. Call set_token() first.")
headers = self.get_headers()
payload = {'query': query}
if variables:
payload['variables'] = variables
response = requests.post(
self.api_url,
headers=headers,
json=payload
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Query failed with status code {response.status_code}: {response.text}")
def get_all_issues(self, include_comments: bool = True) -> List[Dict[str, Any]]:
"""
Fetch all issues from Linear.
Args:
include_comments: Whether to include comments in the response
Returns:
List of issue objects
Raises:
ValueError: If no Linear token has been set
Exception: If the API request fails
"""
comments_query = ""
if include_comments:
comments_query = """
comments {
nodes {
id
body
user {
id
name
email
}
createdAt
updatedAt
}
}
"""
query = f"""
query {{
issues {{
nodes {{
id
identifier
title
description
state {{
id
name
type
}}
assignee {{
id
name
email
}}
creator {{
id
name
email
}}
createdAt
updatedAt
{comments_query}
}}
}}
}}
"""
result = self.execute_graphql_query(query)
# Extract issues from the response
if "data" in result and "issues" in result["data"] and "nodes" in result["data"]["issues"]:
return result["data"]["issues"]["nodes"]
return []
def get_issues_by_date_range(
self,
start_date: str,
end_date: str,
include_comments: bool = True
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""
Fetch issues within a date range.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format (inclusive)
include_comments: Whether to include comments in the response
Returns:
Tuple containing (issues list, error message or None)
"""
# Convert date strings to ISO format
try:
# For Linear API: we need to use a more specific format for the filter
# Instead of DateTime, use a string in the filter for DateTimeOrDuration
comments_query = ""
if include_comments:
comments_query = """
comments {
nodes {
id
body
user {
id
name
email
}
createdAt
updatedAt
}
}
"""
# Query issues that were either created OR updated within the date range
# This ensures we catch both new issues and updated existing issues
query = f"""
query IssuesByDateRange($after: String) {{
issues(
first: 100,
after: $after,
filter: {{
or: [
{{
createdAt: {{
gte: "{start_date}T00:00:00Z"
lte: "{end_date}T23:59:59Z"
}}
}},
{{
updatedAt: {{
gte: "{start_date}T00:00:00Z"
lte: "{end_date}T23:59:59Z"
}}
}}
]
}}
) {{
nodes {{
id
identifier
title
description
state {{
id
name
type
}}
assignee {{
id
name
email
}}
creator {{
id
name
email
}}
createdAt
updatedAt
{comments_query}
}}
pageInfo {{
hasNextPage
endCursor
}}
}}
}}
"""
try:
all_issues = []
has_next_page = True
cursor = None
# Handle pagination to get all issues
while has_next_page:
variables = {"after": cursor} if cursor else {}
result = self.execute_graphql_query(query, variables)
# Check for errors
if "errors" in result:
error_message = "; ".join([error.get("message", "Unknown error") for error in result["errors"]])
return [], f"GraphQL errors: {error_message}"
# Extract issues from the response
if "data" in result and "issues" in result["data"]:
issues_page = result["data"]["issues"]
# Add issues from this page
if "nodes" in issues_page:
all_issues.extend(issues_page["nodes"])
# Check if there are more pages
if "pageInfo" in issues_page:
page_info = issues_page["pageInfo"]
has_next_page = page_info.get("hasNextPage", False)
cursor = page_info.get("endCursor") if has_next_page else None
else:
has_next_page = False
else:
has_next_page = False
if not all_issues:
return [], "No issues found in the specified date range."
return all_issues, None
except Exception as e:
return [], f"Error fetching issues: {str(e)}"
except ValueError as e:
return [], f"Invalid date format: {str(e)}. Please use YYYY-MM-DD."
def format_issue(self, issue: Dict[str, Any]) -> Dict[str, Any]:
"""
Format an issue for easier consumption.
Args:
issue: The issue object from Linear API
Returns:
Formatted issue dictionary
"""
# Extract basic issue details
formatted = {
"id": issue.get("id", ""),
"identifier": issue.get("identifier", ""),
"title": issue.get("title", ""),
"description": issue.get("description", ""),
"state": issue.get("state", {}).get("name", "Unknown") if issue.get("state") else "Unknown",
"state_type": issue.get("state", {}).get("type", "Unknown") if issue.get("state") else "Unknown",
"created_at": issue.get("createdAt", ""),
"updated_at": issue.get("updatedAt", ""),
"creator": {
"id": issue.get("creator", {}).get("id", "") if issue.get("creator") else "",
"name": issue.get("creator", {}).get("name", "Unknown") if issue.get("creator") else "Unknown",
"email": issue.get("creator", {}).get("email", "") if issue.get("creator") else ""
} if issue.get("creator") else {"id": "", "name": "Unknown", "email": ""},
"assignee": {
"id": issue.get("assignee", {}).get("id", ""),
"name": issue.get("assignee", {}).get("name", "Unknown"),
"email": issue.get("assignee", {}).get("email", "")
} if issue.get("assignee") else None,
"comments": []
}
# Extract comments if available
if "comments" in issue and "nodes" in issue["comments"]:
for comment in issue["comments"]["nodes"]:
formatted_comment = {
"id": comment.get("id", ""),
"body": comment.get("body", ""),
"created_at": comment.get("createdAt", ""),
"updated_at": comment.get("updatedAt", ""),
"user": {
"id": comment.get("user", {}).get("id", "") if comment.get("user") else "",
"name": comment.get("user", {}).get("name", "Unknown") if comment.get("user") else "Unknown",
"email": comment.get("user", {}).get("email", "") if comment.get("user") else ""
} if comment.get("user") else {"id": "", "name": "Unknown", "email": ""}
}
formatted["comments"].append(formatted_comment)
return formatted
def format_issue_to_markdown(self, issue: Dict[str, Any]) -> str:
"""
Convert an issue to markdown format.
Args:
issue: The issue object (either raw or formatted)
Returns:
Markdown string representation of the issue
"""
# Format the issue if it's not already formatted
if "identifier" not in issue:
issue = self.format_issue(issue)
# Build the markdown content
markdown = f"# {issue.get('identifier', 'No ID')}: {issue.get('title', 'No Title')}\n\n"
if issue.get('state'):
markdown += f"**Status:** {issue['state']}\n\n"
if issue.get('assignee') and issue['assignee'].get('name'):
markdown += f"**Assignee:** {issue['assignee']['name']}\n"
if issue.get('creator') and issue['creator'].get('name'):
markdown += f"**Created by:** {issue['creator']['name']}\n"
if issue.get('created_at'):
created_date = self.format_date(issue['created_at'])
markdown += f"**Created:** {created_date}\n"
if issue.get('updated_at'):
updated_date = self.format_date(issue['updated_at'])
markdown += f"**Updated:** {updated_date}\n\n"
if issue.get('description'):
markdown += f"## Description\n\n{issue['description']}\n\n"
if issue.get('comments'):
markdown += f"## Comments ({len(issue['comments'])})\n\n"
for comment in issue['comments']:
user_name = "Unknown"
if comment.get('user') and comment['user'].get('name'):
user_name = comment['user']['name']
comment_date = "Unknown date"
if comment.get('created_at'):
comment_date = self.format_date(comment['created_at'])
markdown += f"### {user_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n"
return markdown
@staticmethod
def format_date(iso_date: str) -> str:
"""
Format an ISO date string to a more readable format.
Args:
iso_date: ISO format date string
Returns:
Formatted date string
"""
if not iso_date or not isinstance(iso_date, str):
return "Unknown date"
try:
dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
return dt.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
return iso_date
# Example usage (uncomment to use):
"""
if __name__ == "__main__":
# Set your token here
token = "YOUR_LINEAR_API_KEY"
linear = LinearConnector(token)
try:
# Get all issues with comments
issues = linear.get_all_issues()
print(f"Retrieved {len(issues)} issues")
# Format and print the first issue as markdown
if issues:
issue_md = linear.format_issue_to_markdown(issues[0])
print("\nSample Issue in Markdown:\n")
print(issue_md)
# Get issues by date range
start_date = "2023-01-01"
end_date = "2023-01-31"
date_issues, error = linear.get_issues_by_date_range(start_date, end_date)
if error:
print(f"Error: {error}")
else:
print(f"\nRetrieved {len(date_issues)} issues from {start_date} to {end_date}")
except Exception as e:
print(f"Error: {e}")
"""