From 6201fa7ca1f0659fe573f4a7343c3b4a1320af2e Mon Sep 17 00:00:00 2001 From: James Date: Tue, 26 Nov 2024 12:17:35 +1000 Subject: [PATCH] Delete strategic_analysis_parser.py --- strategic_analysis_parser.py | 219 ----------------------------------- 1 file changed, 219 deletions(-) delete mode 100644 strategic_analysis_parser.py diff --git a/strategic_analysis_parser.py b/strategic_analysis_parser.py deleted file mode 100644 index 58e57b2..0000000 --- a/strategic_analysis_parser.py +++ /dev/null @@ -1,219 +0,0 @@ -from typing import List, Dict, Optional, Union -import re -import logging -from dataclasses import dataclass -from datetime import datetime - -@dataclass -class ResearchFocus: - """Represents a specific area of research focus""" - area: str - priority: int - source_query: str = "" - timestamp: str = "" - search_queries: List[str] = None - - def __post_init__(self): - if not self.timestamp: - self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if self.search_queries is None: - self.search_queries = [] - -@dataclass -class AnalysisResult: - """Contains the complete analysis result""" - original_question: str - focus_areas: List[ResearchFocus] - raw_response: str - timestamp: str = "" - confidence_score: float = 0.0 - - def __post_init__(self): - if not self.timestamp: - self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - -# Set up logging -logger = logging.getLogger(__name__) - -class StrategicAnalysisParser: - """Enhanced parser with improved pattern matching and validation""" - def __init__(self): - self.patterns = { - 'original_question': [ - r"(?i)original question analysis:\s*(.*?)(?=research gap|$)", - r"(?i)original query:\s*(.*?)(?=research gap|$)", - r"(?i)research question:\s*(.*?)(?=research gap|$)", - r"(?i)topic analysis:\s*(.*?)(?=research gap|$)" - ], - 'research_gaps': [ - r"(?i)research gaps?:\s*", - r"(?i)gaps identified:\s*", - r"(?i)areas for research:\s*", - r"(?i)investigation areas:\s*" - ], - 'priority': [ - r"(?i)priority:\s*(\d+)", - r"(?i)priority level:\s*(\d+)", - r"(?i)\(priority:\s*(\d+)\)", - r"(?i)importance:\s*(\d+)" - ] - } - self.logger = logging.getLogger(__name__) - - def parse_analysis(self, llm_response: str) -> Optional[AnalysisResult]: - """Main parsing method with improved validation""" - try: - # Clean and normalize the response - cleaned_response = self._clean_text(llm_response) - - # Extract original question with validation - original_question = self._extract_original_question(cleaned_response) - if not original_question: - self.logger.warning("Failed to extract original question") - original_question = "Original question extraction failed" - - # Extract and validate research areas - focus_areas = self._extract_research_areas(cleaned_response) - focus_areas = self._normalize_focus_areas(focus_areas) - - # Calculate confidence score - confidence_score = self._calculate_confidence_score(original_question, focus_areas) - - return AnalysisResult( - original_question=original_question, - focus_areas=focus_areas, - raw_response=llm_response, - confidence_score=confidence_score - ) - - except Exception as e: - self.logger.error(f"Error in parse_analysis: {str(e)}") - return None - - def _clean_text(self, text: str) -> str: - """Clean and normalize text for parsing""" - text = re.sub(r'\n{3,}', '\n\n', text) - text = re.sub(r'\s{2,}', ' ', text) - text = re.sub(r'(\d+\))', r'\1.', text) - return text.strip() - - def _extract_original_question(self, text: str) -> str: - """Extract original question with improved matching""" - for pattern in self.patterns['original_question']: - match = re.search(pattern, text, re.DOTALL) - if match: - return self._clean_text(match.group(1)) - return "" - - def _extract_research_areas(self, text: str) -> List[ResearchFocus]: - """Extract research areas with enhanced validation""" - areas = [] - for pattern in self.patterns['research_gaps']: - gap_match = re.search(pattern, text) - if gap_match: - sections = re.split(r'\n\s*\d+[\.)]\s+', text[gap_match.end():]) - sections = [s for s in sections if s.strip()] - - for section in sections: - focus = self._parse_research_focus(section) - if focus and self._is_valid_focus(focus): - areas.append(focus) - break - return areas - - def _parse_research_focus(self, text: str) -> Optional[ResearchFocus]: - """Parse research focus with improved validation without reasoning.""" - try: - # Extract area - area = text.split('\n')[0].strip() - - # Extract and validate priority - priority = self._extract_priority(text) - - # Return ResearchFocus without reasoning - return ResearchFocus( - area=area, - priority=priority - ) - - except Exception as e: - self.logger.error(f"Error parsing research focus: {str(e)}") - return None - - def _extract_priority(self, text: str) -> int: - """Extract priority with validation""" - for pattern in self.patterns['priority']: - priority_match = re.search(pattern, text) - if priority_match: - try: - priority = int(priority_match.group(1)) - return max(1, min(5, priority)) - except ValueError: - continue - return 3 # Default priority - - def _is_valid_focus(self, focus: ResearchFocus) -> bool: - """Validate research focus completeness and quality""" - if not focus.area: # Only check if area exists and isn't empty - return False - if focus.priority < 1 or focus.priority > 5: - return False - return True - - def _normalize_focus_areas(self, areas: List[ResearchFocus]) -> List[ResearchFocus]: - """Normalize and validate focus areas""" - normalized = [] - for area in areas: - if not area.area.strip(): - continue - - area.priority = max(1, min(5, area.priority)) - - if self._is_valid_focus(area): - normalized.append(area) - - # Sort by priority (highest first) but don't add any filler areas - normalized.sort(key=lambda x: x.priority, reverse=True) - - return normalized - - def _calculate_confidence_score(self, question: str, areas: List[ResearchFocus]) -> float: - """Calculate confidence score for analysis quality""" - score = 0.0 - - # Question quality (0.3) - if question and len(question.split()) >= 3: - score += 0.3 - - # Areas quality (0.7) - if areas: - # Valid areas ratio (0.35) - now based on proportion that are valid vs total - num_areas = len(areas) - if num_areas > 0: # Avoid division by zero - valid_areas = sum(1 for a in areas if self._is_valid_focus(a)) - score += 0.35 * (valid_areas / num_areas) - - # Priority distribution (0.35) - now based on having different priorities - if num_areas > 0: # Avoid division by zero - unique_priorities = len(set(a.priority for a in areas)) - score += 0.35 * (unique_priorities / num_areas) - - return round(score, 2) - - def format_analysis_result(self, result: AnalysisResult) -> str: - """Format analysis result for display without reasoning.""" - formatted = [ - "Strategic Analysis Result", - "=" * 80, - f"\nOriginal Question Analysis:\n{result.original_question}\n", - f"Analysis Confidence Score: {result.confidence_score}", - "\nResearch Focus Areas:" - ] - - for i, focus in enumerate(result.focus_areas, 1): - formatted.extend([ - f"\n{i}. {focus.area}", - f" Priority: {focus.priority}" - ]) - - return "\n".join(formatted)