Fixed all ruff lint and formatting errors

This commit is contained in:
Utkarsh-Patel-13 2025-07-24 14:43:48 -07:00
parent 0a03c42cc5
commit d359a59f6d
85 changed files with 5520 additions and 3870 deletions

View file

@ -5,96 +5,94 @@ A module for retrieving issues and comments from Linear.
Allows fetching issue lists and their comments with date range filtering.
"""
import requests
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any, Union
from typing import Any
import requests
class LinearConnector:
"""Class for retrieving issues and comments from Linear."""
def __init__(self, token: str = None):
def __init__(self, token: str | None = None):
"""
Initialize the LinearConnector class.
Args:
token: Linear API token (optional, can be set later with set_token)
"""
self.token = token
self.api_url = "https://api.linear.app/graphql"
def set_token(self, token: str) -> None:
"""
Set the Linear API token.
Args:
token: Linear API token
"""
self.token = token
def get_headers(self) -> Dict[str, str]:
def get_headers(self) -> dict[str, str]:
"""
Get headers for Linear API requests.
Returns:
Dictionary of headers
Raises:
ValueError: If no Linear token has been set
"""
if not self.token:
raise ValueError("Linear token not initialized. Call set_token() first.")
return {
'Content-Type': 'application/json',
'Authorization': self.token
}
def execute_graphql_query(self, query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]:
return {"Content-Type": "application/json", "Authorization": self.token}
def execute_graphql_query(
self, query: str, variables: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
Execute a GraphQL query against the Linear API.
Args:
query: GraphQL query string
variables: Variables for the GraphQL query (optional)
Returns:
Response data from the API
Raises:
ValueError: If no Linear token has been set
Exception: If the API request fails
"""
if not self.token:
raise ValueError("Linear token not initialized. Call set_token() first.")
headers = self.get_headers()
payload = {'query': query}
payload = {"query": query}
if variables:
payload['variables'] = variables
response = requests.post(
self.api_url,
headers=headers,
json=payload
)
payload["variables"] = variables
response = requests.post(self.api_url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Query failed with status code {response.status_code}: {response.text}")
def get_all_issues(self, include_comments: bool = True) -> List[Dict[str, Any]]:
raise Exception(
f"Query failed with status code {response.status_code}: {response.text}"
)
def get_all_issues(self, include_comments: bool = True) -> list[dict[str, Any]]:
"""
Fetch all issues from Linear.
Args:
include_comments: Whether to include comments in the response
Returns:
List of issue objects
Raises:
ValueError: If no Linear token has been set
Exception: If the API request fails
@ -116,7 +114,7 @@ class LinearConnector:
}
}
"""
query = f"""
query {{
issues {{
@ -147,29 +145,30 @@ class LinearConnector:
}}
}}
"""
result = self.execute_graphql_query(query)
# Extract issues from the response
if "data" in result and "issues" in result["data"] and "nodes" in result["data"]["issues"]:
if (
"data" in result
and "issues" in result["data"]
and "nodes" in result["data"]["issues"]
):
return result["data"]["issues"]["nodes"]
return []
def get_issues_by_date_range(
self,
start_date: str,
end_date: str,
include_comments: bool = True
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
self, start_date: str, end_date: str, include_comments: bool = True
) -> tuple[list[dict[str, Any]], str | None]:
"""
Fetch issues within a date range.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format (inclusive)
include_comments: Whether to include comments in the response
Returns:
Tuple containing (issues list, error message or None)
"""
@ -194,7 +193,7 @@ class LinearConnector:
}
}
"""
# Query issues that were either created OR updated within the date range
# This ensures we catch both new issues and updated existing issues
query = f"""
@ -250,58 +249,65 @@ class LinearConnector:
}}
}}
"""
try:
all_issues = []
has_next_page = True
cursor = None
# Handle pagination to get all issues
while has_next_page:
variables = {"after": cursor} if cursor else {}
result = self.execute_graphql_query(query, variables)
# Check for errors
if "errors" in result:
error_message = "; ".join([error.get("message", "Unknown error") for error in result["errors"]])
error_message = "; ".join(
[
error.get("message", "Unknown error")
for error in result["errors"]
]
)
return [], f"GraphQL errors: {error_message}"
# Extract issues from the response
if "data" in result and "issues" in result["data"]:
issues_page = result["data"]["issues"]
# Add issues from this page
if "nodes" in issues_page:
all_issues.extend(issues_page["nodes"])
# Check if there are more pages
if "pageInfo" in issues_page:
page_info = issues_page["pageInfo"]
has_next_page = page_info.get("hasNextPage", False)
cursor = page_info.get("endCursor") if has_next_page else None
cursor = (
page_info.get("endCursor") if has_next_page else None
)
else:
has_next_page = False
else:
has_next_page = False
if not all_issues:
return [], "No issues found in the specified date range."
return all_issues, None
except Exception as e:
return [], f"Error fetching issues: {str(e)}"
return [], f"Error fetching issues: {e!s}"
except ValueError as e:
return [], f"Invalid date format: {str(e)}. Please use YYYY-MM-DD."
def format_issue(self, issue: Dict[str, Any]) -> Dict[str, Any]:
return [], f"Invalid date format: {e!s}. Please use YYYY-MM-DD."
def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]:
"""
Format an issue for easier consumption.
Args:
issue: The issue object from Linear API
Returns:
Formatted issue dictionary
"""
@ -311,23 +317,37 @@ class LinearConnector:
"identifier": issue.get("identifier", ""),
"title": issue.get("title", ""),
"description": issue.get("description", ""),
"state": issue.get("state", {}).get("name", "Unknown") if issue.get("state") else "Unknown",
"state_type": issue.get("state", {}).get("type", "Unknown") if issue.get("state") else "Unknown",
"state": issue.get("state", {}).get("name", "Unknown")
if issue.get("state")
else "Unknown",
"state_type": issue.get("state", {}).get("type", "Unknown")
if issue.get("state")
else "Unknown",
"created_at": issue.get("createdAt", ""),
"updated_at": issue.get("updatedAt", ""),
"creator": {
"id": issue.get("creator", {}).get("id", "") if issue.get("creator") else "",
"name": issue.get("creator", {}).get("name", "Unknown") if issue.get("creator") else "Unknown",
"email": issue.get("creator", {}).get("email", "") if issue.get("creator") else ""
} if issue.get("creator") else {"id": "", "name": "Unknown", "email": ""},
"id": issue.get("creator", {}).get("id", "")
if issue.get("creator")
else "",
"name": issue.get("creator", {}).get("name", "Unknown")
if issue.get("creator")
else "Unknown",
"email": issue.get("creator", {}).get("email", "")
if issue.get("creator")
else "",
}
if issue.get("creator")
else {"id": "", "name": "Unknown", "email": ""},
"assignee": {
"id": issue.get("assignee", {}).get("id", ""),
"name": issue.get("assignee", {}).get("name", "Unknown"),
"email": issue.get("assignee", {}).get("email", "")
} if issue.get("assignee") else None,
"comments": []
"email": issue.get("assignee", {}).get("email", ""),
}
if issue.get("assignee")
else None,
"comments": [],
}
# Extract comments if available
if "comments" in issue and "nodes" in issue["comments"]:
for comment in issue["comments"]["nodes"]:
@ -337,85 +357,93 @@ class LinearConnector:
"created_at": comment.get("createdAt", ""),
"updated_at": comment.get("updatedAt", ""),
"user": {
"id": comment.get("user", {}).get("id", "") if comment.get("user") else "",
"name": comment.get("user", {}).get("name", "Unknown") if comment.get("user") else "Unknown",
"email": comment.get("user", {}).get("email", "") if comment.get("user") else ""
} if comment.get("user") else {"id": "", "name": "Unknown", "email": ""}
"id": comment.get("user", {}).get("id", "")
if comment.get("user")
else "",
"name": comment.get("user", {}).get("name", "Unknown")
if comment.get("user")
else "Unknown",
"email": comment.get("user", {}).get("email", "")
if comment.get("user")
else "",
}
if comment.get("user")
else {"id": "", "name": "Unknown", "email": ""},
}
formatted["comments"].append(formatted_comment)
return formatted
def format_issue_to_markdown(self, issue: Dict[str, Any]) -> str:
def format_issue_to_markdown(self, issue: dict[str, Any]) -> str:
"""
Convert an issue to markdown format.
Args:
issue: The issue object (either raw or formatted)
Returns:
Markdown string representation of the issue
"""
# Format the issue if it's not already formatted
if "identifier" not in issue:
issue = self.format_issue(issue)
# Build the markdown content
markdown = f"# {issue.get('identifier', 'No ID')}: {issue.get('title', 'No Title')}\n\n"
if issue.get('state'):
if issue.get("state"):
markdown += f"**Status:** {issue['state']}\n\n"
if issue.get('assignee') and issue['assignee'].get('name'):
if issue.get("assignee") and issue["assignee"].get("name"):
markdown += f"**Assignee:** {issue['assignee']['name']}\n"
if issue.get('creator') and issue['creator'].get('name'):
if issue.get("creator") and issue["creator"].get("name"):
markdown += f"**Created by:** {issue['creator']['name']}\n"
if issue.get('created_at'):
created_date = self.format_date(issue['created_at'])
if issue.get("created_at"):
created_date = self.format_date(issue["created_at"])
markdown += f"**Created:** {created_date}\n"
if issue.get('updated_at'):
updated_date = self.format_date(issue['updated_at'])
if issue.get("updated_at"):
updated_date = self.format_date(issue["updated_at"])
markdown += f"**Updated:** {updated_date}\n\n"
if issue.get('description'):
if issue.get("description"):
markdown += f"## Description\n\n{issue['description']}\n\n"
if issue.get('comments'):
if issue.get("comments"):
markdown += f"## Comments ({len(issue['comments'])})\n\n"
for comment in issue['comments']:
for comment in issue["comments"]:
user_name = "Unknown"
if comment.get('user') and comment['user'].get('name'):
user_name = comment['user']['name']
if comment.get("user") and comment["user"].get("name"):
user_name = comment["user"]["name"]
comment_date = "Unknown date"
if comment.get('created_at'):
comment_date = self.format_date(comment['created_at'])
if comment.get("created_at"):
comment_date = self.format_date(comment["created_at"])
markdown += f"### {user_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n"
return markdown
@staticmethod
def format_date(iso_date: str) -> str:
"""
Format an ISO date string to a more readable format.
Args:
iso_date: ISO format date string
Returns:
Formatted date string
"""
if not iso_date or not isinstance(iso_date, str):
return "Unknown date"
try:
dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
return dt.strftime('%Y-%m-%d %H:%M:%S')
dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return iso_date