""" Jira Connector Module A module for retrieving data from Jira. Allows fetching issue lists and their comments, projects and more. """ from typing import Any, Dict, List, Optional import requests class JiraConnector: """Class for retrieving data from Jira.""" def __init__( self, base_url: Optional[str] = None, personal_access_token: Optional[str] = None, ): """ Initialize the JiraConnector class. Args: base_url: Jira instance base URL (e.g., 'https://yourcompany.atlassian.net') (optional) personal_access_token: Jira personal access token (optional) """ self.base_url = base_url.rstrip("/") if base_url else None self.personal_access_token = personal_access_token self.api_version = "3" # Jira Cloud API version def set_credentials(self, base_url: str, personal_access_token: str) -> None: """ Set the Jira credentials. Args: base_url: Jira instance base URL personal_access_token: Jira personal access token """ self.base_url = base_url.rstrip("/") self.personal_access_token = personal_access_token def set_personal_access_token(self, personal_access_token: str) -> None: """ Set the Jira personal access token. Args: personal_access_token: Jira personal access token """ self.personal_access_token = personal_access_token def get_headers(self) -> Dict[str, str]: """ Get headers for Jira API requests. Returns: Dictionary of headers Raises: ValueError: If personal_access_token or base_url have not been set """ if not all([self.base_url, self.personal_access_token]): raise ValueError("Jira personal access token or base URL not initialized.") return { "Content-Type": "application/json", "Authorization": f"Bearer {self.personal_access_token}", "Accept": "application/json", } def make_api_request( self, endpoint: str, params: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Make a request to the Jira API. Args: endpoint: API endpoint (without base URL) params: Query parameters for the request (optional) Returns: Response data from the API Raises: ValueError: If personal_access_token or base_url have not been set Exception: If the API request fails """ if not all([self.base_url, self.personal_access_token]): raise ValueError("Jira personal access token or base URL not initialized.") url = f"{self.base_url}/rest/api/{self.api_version}/{endpoint}" headers = self.get_headers() response = requests.get(url, headers=headers, params=params, timeout=500) if response.status_code == 200: return response.json() else: raise Exception( f"API request failed with status code {response.status_code}: {response.text}" ) def get_all_projects(self) -> dict[str, Any]: """ Fetch all projects from Jira. Returns: List of project objects Raises: ValueError: If credentials have not been set Exception: If the API request fails """ return self.make_api_request("project/search") def get_all_issues(self, project_key: Optional[str] = None) -> List[Dict[str, Any]]: """ Fetch all issues from Jira. Args: project_key: Optional project key to filter issues (e.g., 'PROJ') Returns: List of issue objects Raises: ValueError: If credentials have not been set Exception: If the API request fails """ jql = "ORDER BY created DESC" if project_key: jql = f'project = "{project_key}" ' + jql fields = [ "summary", "description", "status", "assignee", "reporter", "created", "updated", "priority", "issuetype", "project", ] params = { "jql": jql, "fields": ",".join(fields), "maxResults": 100, "startAt": 0, } all_issues = [] start_at = 0 while True: params["startAt"] = start_at result = self.make_api_request("search", params) if not isinstance(result, dict) or "issues" not in result: raise Exception("Invalid response from Jira API") issues = result["issues"] all_issues.extend(issues) print(f"Fetched {len(issues)} issues (startAt={start_at})") total = result.get("total", 0) if start_at + len(issues) >= total: break start_at += len(issues) return all_issues def get_issues_by_date_range( self, start_date: str, end_date: str, include_comments: bool = True, project_key: Optional[str] = None, ) -> tuple[List[Dict[str, Any]], Optional[str]]: """ Fetch issues within a date range. Args: start_date: Start date in YYYY-MM-DD format end_date: End date in YYYY-MM-DD format (inclusive) include_comments: Whether to include comments in the response project_key: Optional project key to filter issues Returns: Tuple containing (issues list, error message or None) """ try: # Build JQL query for date range # Query issues that were either created OR updated within the date range date_filter = f"(created >= '{start_date}' AND created <= '{end_date}') OR (updated >= '{start_date}' AND updated <= '{end_date}')" jql = f"{date_filter} ORDER BY created DESC" if project_key: jql = ( f'project = "{project_key}" AND {date_filter} ORDER BY created DESC' ) # Define fields to retrieve fields = [ "summary", "description", "status", "assignee", "reporter", "created", "updated", "priority", "issuetype", "project", ] if include_comments: fields.append("comment") params = { "jql": jql, "fields": ",".join(fields), "maxResults": 100, "startAt": 0, } all_issues = [] start_at = 0 while True: params["startAt"] = start_at result = self.make_api_request("search", params) if not isinstance(result, dict) or "issues" not in result: return [], "Invalid response from Jira API" issues = result["issues"] all_issues.extend(issues) # Check if there are more issues to fetch total = result.get("total", 0) if start_at + len(issues) >= total: break start_at += len(issues) if not all_issues: return [], "No issues found in the specified date range." return all_issues, None except Exception as e: return [], f"Error fetching issues: {str(e)}" def format_issue(self, issue: Dict[str, Any]) -> Dict[str, Any]: """ Format an issue for easier consumption. Args: issue: The issue object from Jira API Returns: Formatted issue dictionary """ fields = issue.get("fields", {}) # Extract basic issue details formatted = { "id": issue.get("id", ""), "key": issue.get("key", ""), "title": fields.get("summary", ""), "description": fields.get("description", ""), "status": ( fields.get("status", {}).get("name", "Unknown") if fields.get("status") else "Unknown" ), "status_category": ( fields.get("status", {}) .get("statusCategory", {}) .get("name", "Unknown") if fields.get("status") else "Unknown" ), "priority": ( fields.get("priority", {}).get("name", "Unknown") if fields.get("priority") else "Unknown" ), "issue_type": ( fields.get("issuetype", {}).get("name", "Unknown") if fields.get("issuetype") else "Unknown" ), "project": ( fields.get("project", {}).get("key", "Unknown") if fields.get("project") else "Unknown" ), "created_at": fields.get("created", ""), "updated_at": fields.get("updated", ""), "reporter": ( { "account_id": ( fields.get("reporter", {}).get("accountId", "") if fields.get("reporter") else "" ), "display_name": ( fields.get("reporter", {}).get("displayName", "Unknown") if fields.get("reporter") else "Unknown" ), "email": ( fields.get("reporter", {}).get("emailAddress", "") if fields.get("reporter") else "" ), } if fields.get("reporter") else {"account_id": "", "display_name": "Unknown", "email": ""} ), "assignee": ( { "account_id": fields.get("assignee", {}).get("accountId", ""), "display_name": fields.get("assignee", {}).get( "displayName", "Unknown" ), "email": fields.get("assignee", {}).get("emailAddress", ""), } if fields.get("assignee") else None ), "comments": [], } # Extract comments if available if "comment" in fields and "comments" in fields["comment"]: for comment in fields["comment"]["comments"]: formatted_comment = { "id": comment.get("id", ""), "body": comment.get("body", ""), "created_at": comment.get("created", ""), "updated_at": comment.get("updated", ""), "author": ( { "account_id": ( comment.get("author", {}).get("accountId", "") if comment.get("author") else "" ), "display_name": ( comment.get("author", {}).get("displayName", "Unknown") if comment.get("author") else "Unknown" ), "email": ( comment.get("author", {}).get("emailAddress", "") if comment.get("author") else "" ), } if comment.get("author") else {"account_id": "", "display_name": "Unknown", "email": ""} ), } formatted["comments"].append(formatted_comment) return formatted def format_issue_to_markdown(self, issue: Dict[str, Any]) -> str: """ Convert an issue to markdown format. Args: issue: The issue object (either raw or formatted) Returns: Markdown string representation of the issue """ # Format the issue if it's not already formatted if "key" not in issue: issue = self.format_issue(issue) # Build the markdown content markdown = ( f"# {issue.get('key', 'No Key')}: {issue.get('title', 'No Title')}\n\n" ) if issue.get("status"): markdown += f"**Status:** {issue['status']}\n" if issue.get("priority"): markdown += f"**Priority:** {issue['priority']}\n" if issue.get("issue_type"): markdown += f"**Type:** {issue['issue_type']}\n" if issue.get("project"): markdown += f"**Project:** {issue['project']}\n\n" if issue.get("assignee") and issue["assignee"].get("display_name"): markdown += f"**Assignee:** {issue['assignee']['display_name']}\n" if issue.get("reporter") and issue["reporter"].get("display_name"): markdown += f"**Reporter:** {issue['reporter']['display_name']}\n" if issue.get("created_at"): created_date = self.format_date(issue["created_at"]) markdown += f"**Created:** {created_date}\n" if issue.get("updated_at"): updated_date = self.format_date(issue["updated_at"]) markdown += f"**Updated:** {updated_date}\n\n" if issue.get("description"): markdown += f"## Description\n\n{issue['description']}\n\n" if issue.get("comments"): markdown += f"## Comments ({len(issue['comments'])})\n\n" for comment in issue["comments"]: author_name = "Unknown" if comment.get("author") and comment["author"].get("display_name"): author_name = comment["author"]["display_name"] comment_date = "Unknown date" if comment.get("created_at"): comment_date = self.format_date(comment["created_at"]) markdown += f"### {author_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n" return markdown @staticmethod def format_date(iso_date: str) -> str: """ Format an ISO date string to a more readable format. Args: iso_date: ISO format date string Returns: Formatted date string """ if not iso_date or not isinstance(iso_date, str): return "Unknown date" try: from datetime import datetime # Jira dates are typically in format: 2023-01-01T12:00:00.000+0000 dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00")) return dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: return iso_date