| | """ |
| | Code Analyzer Service |
| | |
| | This module provides functionality for analyzing code quality across different languages. |
| | """ |
| |
|
import os
import subprocess
import logging
import json
import tempfile
import concurrent.futures
from collections import defaultdict

logger = logging.getLogger(__name__)

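
# The per-language analyzers shell out to external linting tools (pylint,
# npx/eslint, tsc, pmd, golangci-lint, and cargo clippy); each tool must be
# installed and available on PATH for its language to be analyzed.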
class CodeAnalyzer:
    """
    Service for analyzing code quality across different languages.
    """

    def __init__(self):
        """
        Initialize the CodeAnalyzer.
        """
        logger.info("Initialized CodeAnalyzer")
        self.analyzers = {
            'Python': self._analyze_python,
            'JavaScript': self._analyze_javascript,
            'TypeScript': self._analyze_typescript,
            'Java': self._analyze_java,
            'Go': self._analyze_go,
            'Rust': self._analyze_rust,
        }

    def analyze_repository(self, repo_path, languages):
        """
        Analyze code quality in a repository for the specified languages using parallel processing.

        Args:
            repo_path (str): The path to the repository.
            languages (list): A list of programming languages to analyze.

        Returns:
            dict: A dictionary containing analysis results for each language.
        """
        logger.info(f"Analyzing repository at {repo_path} for languages: {languages}")

        results = {}

        if not languages:
            return results

        def analyze_language(language):
            if language in self.analyzers:
                try:
                    logger.info(f"Analyzing {language} code in {repo_path}")
                    return language, self.analyzers[language](repo_path)
                except Exception as e:
                    logger.error(f"Error analyzing {language} code: {e}")
                    return language, {
                        'status': 'error',
                        'error': str(e),
                        'issues': [],
                    }
            else:
                logger.warning(f"No analyzer available for {language}")
                return language, {
                    'status': 'not_supported',
                    'message': f"Analysis for {language} is not supported yet.",
                    'issues': [],
                }

        # Run the per-language analyzers in parallel; ThreadPoolExecutor requires
        # max_workers >= 1, hence the early return above for an empty language list.
        with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(languages), 5)) as executor:
            future_to_language = {
                executor.submit(analyze_language, language): language
                for language in languages
            }

            for future in concurrent.futures.as_completed(future_to_language):
                language = future_to_language[future]
                try:
                    lang, result = future.result()
                    results[lang] = result
                    logger.info(f"Completed analysis for {lang}")
                except Exception as e:
                    logger.error(f"Exception occurred during analysis of {language}: {e}")
                    results[language] = {
                        'status': 'error',
                        'error': str(e),
                        'issues': [],
                    }

        return results

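    # Each per-language analyzer below returns a dict containing at least a
    # 'status' key and an 'issues' list; successful runs also include tool
    # specific groupings (e.g. 'issues_by_type', 'issues_by_severity') plus
    # 'issue_count' and 'files_analyzed'.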
    def _analyze_python(self, repo_path):
        """
        Analyze Python code using pylint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Python code in {repo_path}")

        # Collect all Python files in the repository.
        python_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.py'):
                    python_files.append(os.path.join(root, file))

        if not python_files:
            return {
                'status': 'no_files',
                'message': 'No Python files found in the repository.',
                'issues': [],
            }

        try:
            # Run pylint with JSON output; the report is read from stdout.
            cmd = [
                'python',
                '-m',
                'pylint',
                '--output-format=json',
                '--reports=n',
            ] + python_files

            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )

            if process.stdout.strip():
                try:
                    issues = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing pylint output: {process.stdout}")
                    issues = []
            else:
                issues = []

            # Group issues by pylint message type (convention, warning, error, ...).
            issues_by_type = defaultdict(list)
            for issue in issues:
                issue_type = issue.get('type', 'unknown')
                issues_by_type[issue_type].append(issue)

            return {
                'status': 'success',
                'issues': issues,
                'issues_by_type': dict(issues_by_type),
                'issue_count': len(issues),
                'files_analyzed': len(python_files),
            }

        except Exception as e:
            logger.error(f"Error running pylint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

    def _analyze_javascript(self, repo_path):
        """
        Analyze JavaScript code using ESLint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing JavaScript code in {repo_path}")

        # Collect JavaScript/JSX files, skipping anything under node_modules.
        js_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith(('.js', '.jsx')) and 'node_modules' not in root:
                    js_files.append(os.path.join(root, file))

        if not js_files:
            return {
                'status': 'no_files',
                'message': 'No JavaScript files found in the repository.',
                'issues': [],
            }

        # Write a minimal ESLint configuration to a temporary file.
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": "eslint:recommended",
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "rules": {}
        }

        # Open in text mode so json.dump can write the configuration.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name

        try:
            cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
            ] + js_files

            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )

            if process.stdout.strip():
                try:
                    eslint_results = json.loads(process.stdout)

                    # Flatten ESLint's per-file results into a single issue list.
                    issues = []
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            issues.append({
                                'path': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'message': message.get('message', ''),
                                'severity': message.get('severity', 0),
                                'ruleId': message.get('ruleId', ''),
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {process.stdout}")
                    issues = []
            else:
                issues = []

            # Group issues by ESLint severity (1 = warning, 2 = error).
            issues_by_severity = defaultdict(list)
            for issue in issues:
                severity = issue.get('severity', 0)
                severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown')
                issues_by_severity[severity_name].append(issue)

            return {
                'status': 'success',
                'issues': issues,
                'issues_by_severity': dict(issues_by_severity),
                'issue_count': len(issues),
                'files_analyzed': len(js_files),
            }

        except Exception as e:
            logger.error(f"Error running ESLint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

        finally:
            # Remove the temporary ESLint configuration file.
            if os.path.exists(temp_config_path):
                os.unlink(temp_config_path)

    def _analyze_typescript(self, repo_path):
        """
        Analyze TypeScript code using ESLint and the TypeScript compiler (tsc).

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing TypeScript code in {repo_path}")

        # Collect TypeScript/TSX files, skipping anything under node_modules.
        ts_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith(('.ts', '.tsx')) and 'node_modules' not in root:
                    ts_files.append(os.path.join(root, file))

        if not ts_files:
            return {
                'status': 'no_files',
                'message': 'No TypeScript files found in the repository.',
                'issues': [],
            }

        # Write a minimal ESLint configuration with the TypeScript plugin.
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": [
                "eslint:recommended",
                "plugin:@typescript-eslint/recommended"
            ],
            "parser": "@typescript-eslint/parser",
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "plugins": [
                "@typescript-eslint"
            ],
            "rules": {}
        }

        # Open in text mode so json.dump can write the configuration.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name

        # Write a minimal tsconfig.json limited to the discovered files.
        tsconfig = {
            "compilerOptions": {
                "target": "es2020",
                "module": "commonjs",
                "strict": True,
                "esModuleInterop": True,
                "skipLibCheck": True,
                "forceConsistentCasingInFileNames": True,
                "noEmit": True
            },
            "include": ts_files
        }

        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_tsconfig:
            json.dump(tsconfig, temp_tsconfig)
            temp_tsconfig_path = temp_tsconfig.name

        try:
            # Run ESLint over the TypeScript files.
            eslint_cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
                '--ext', '.ts,.tsx',
            ] + ts_files

            eslint_process = subprocess.run(
                eslint_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )

            eslint_issues = []
            if eslint_process.stdout.strip():
                try:
                    eslint_results = json.loads(eslint_process.stdout)

                    # Flatten ESLint's per-file results into a single issue list.
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            eslint_issues.append({
                                'path': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'message': message.get('message', ''),
                                'severity': message.get('severity', 0),
                                'ruleId': message.get('ruleId', ''),
                                'source': 'eslint',
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {eslint_process.stdout}")

            # Type-check the files with tsc (no JavaScript output is emitted).
            tsc_cmd = [
                'npx',
                'tsc',
                '--project', temp_tsconfig_path,
                '--noEmit',
            ]

            tsc_process = subprocess.run(
                tsc_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )

            # tsc normally reports diagnostics on stdout; fall back to stderr in
            # case they were redirected. Diagnostic lines look like:
            #   src/foo.ts(12,5): error TS2345: Argument of type ...
            tsc_issues = []
            tsc_output = tsc_process.stdout if tsc_process.stdout.strip() else tsc_process.stderr
            if tsc_output.strip():
                for line in tsc_output.splitlines():
                    if ': error ' in line or ': warning ' in line:
                        try:
                            file_info, error_info = line.split(':', 1)
                            file_path, line_col = file_info.rsplit('(', 1)
                            line_num, col_num = line_col.rstrip(')').split(',')

                            error_type, error_message = error_info.split(':', 1)
                            error_type = error_type.strip()
                            error_message = error_message.strip()

                            tsc_issues.append({
                                'path': file_path,
                                'line': int(line_num),
                                'column': int(col_num),
                                'message': error_message,
                                'severity': 2 if 'error' in error_type else 1,
                                'ruleId': error_type,
                                'source': 'tsc',
                            })
                        except Exception as e:
                            logger.warning(f"Error parsing TSC output line: {line}, error: {e}")

            # Combine the issues from both tools and group them.
            all_issues = eslint_issues + tsc_issues

            issues_by_source = defaultdict(list)
            issues_by_severity = defaultdict(list)

            for issue in all_issues:
                source = issue.get('source', 'unknown')
                issues_by_source[source].append(issue)

                severity = issue.get('severity', 0)
                severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown')
                issues_by_severity[severity_name].append(issue)

            return {
                'status': 'success',
                'issues': all_issues,
                'issues_by_source': dict(issues_by_source),
                'issues_by_severity': dict(issues_by_severity),
                'issue_count': len(all_issues),
                'files_analyzed': len(ts_files),
            }

        except Exception as e:
            logger.error(f"Error analyzing TypeScript code: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

        finally:
            # Remove the temporary ESLint and tsconfig files.
            for temp_file in [temp_config_path, temp_tsconfig_path]:
                if os.path.exists(temp_file):
                    os.unlink(temp_file)

    def _analyze_java(self, repo_path):
        """
        Analyze Java code using PMD.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Java code in {repo_path}")

        # Collect all Java files in the repository.
        java_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.java'):
                    java_files.append(os.path.join(root, file))

        if not java_files:
            return {
                'status': 'no_files',
                'message': 'No Java files found in the repository.',
                'issues': [],
            }

        try:
            # Run PMD against the repository with the standard Java rulesets;
            # the JSON report is read from stdout.
            cmd = [
                'pmd',
                'check',
                '--dir', repo_path,
                '--format', 'json',
                '--rulesets', 'category/java/bestpractices.xml,category/java/codestyle.xml,category/java/design.xml,category/java/errorprone.xml,category/java/multithreading.xml,category/java/performance.xml,category/java/security.xml',
            ]

            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )

            if process.stdout.strip():
                try:
                    pmd_results = json.loads(process.stdout)

                    # Flatten PMD's per-file violations into a single issue list.
                    issues = []
                    for file_result in pmd_results.get('files', []):
                        file_path = file_result.get('filename', '')
                        for violation in file_result.get('violations', []):
                            issues.append({
                                'path': file_path,
                                'line': violation.get('beginline', 0),
                                'endLine': violation.get('endline', 0),
                                'column': violation.get('begincolumn', 0),
                                'endColumn': violation.get('endcolumn', 0),
                                'message': violation.get('description', ''),
                                'rule': violation.get('rule', ''),
                                'ruleset': violation.get('ruleset', ''),
                                'priority': violation.get('priority', 0),
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing PMD output: {process.stdout}")
                    issues = []
            else:
                issues = []

            # Group issues by the ruleset that produced them.
            issues_by_ruleset = defaultdict(list)
            for issue in issues:
                ruleset = issue.get('ruleset', 'unknown')
                issues_by_ruleset[ruleset].append(issue)

            return {
                'status': 'success',
                'issues': issues,
                'issues_by_ruleset': dict(issues_by_ruleset),
                'issue_count': len(issues),
                'files_analyzed': len(java_files),
            }

        except Exception as e:
            logger.error(f"Error running PMD: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

    def _analyze_go(self, repo_path):
        """
        Analyze Go code using golangci-lint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Go code in {repo_path}")

        # Collect all Go files in the repository.
        go_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.go'):
                    go_files.append(os.path.join(root, file))

        if not go_files:
            return {
                'status': 'no_files',
                'message': 'No Go files found in the repository.',
                'issues': [],
            }

        try:
            # Run golangci-lint from the repository root against all packages.
            cmd = [
                'golangci-lint',
                'run',
                '--out-format=json',
                './...',
            ]

            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
                cwd=repo_path,
            )

            if process.stdout.strip():
                try:
                    lint_results = json.loads(process.stdout)

                    # Normalize golangci-lint issues into the common issue shape.
                    # 'Issues' may be null in the JSON, so guard before iterating.
                    issues = []
                    for issue in lint_results.get('Issues') or []:
                        issues.append({
                            'path': issue.get('Pos', {}).get('Filename', ''),
                            'line': issue.get('Pos', {}).get('Line', 0),
                            'column': issue.get('Pos', {}).get('Column', 0),
                            'message': issue.get('Text', ''),
                            'linter': issue.get('FromLinter', ''),
                            'severity': 'error' if issue.get('Severity', '') == 'error' else 'warning',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing golangci-lint output: {process.stdout}")
                    issues = []
            else:
                issues = []

            # Group issues by the linter that reported them.
            issues_by_linter = defaultdict(list)
            for issue in issues:
                linter = issue.get('linter', 'unknown')
                issues_by_linter[linter].append(issue)

            return {
                'status': 'success',
                'issues': issues,
                'issues_by_linter': dict(issues_by_linter),
                'issue_count': len(issues),
                'files_analyzed': len(go_files),
            }

        except Exception as e:
            logger.error(f"Error running golangci-lint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

    def _analyze_rust(self, repo_path):
        """
        Analyze Rust code using clippy.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Rust code in {repo_path}")

        # Collect all Rust files in the repository.
        rust_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.rs'):
                    rust_files.append(os.path.join(root, file))

        if not rust_files:
            return {
                'status': 'no_files',
                'message': 'No Rust files found in the repository.',
                'issues': [],
            }

        try:
            # Run clippy from the repository root; diagnostics are emitted as
            # one JSON object per line on stdout.
            cmd = [
                'cargo',
                'clippy',
                '--message-format=json',
            ]

            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
                cwd=repo_path,
            )

            issues = []
            if process.stdout.strip():
                for line in process.stdout.splitlines():
                    try:
                        message = json.loads(line)
                        if message.get('reason') == 'compiler-message':
                            msg = message.get('message', {})
                            spans = msg.get('spans', [])

                            if spans:
                                # Prefer the primary span for the issue location.
                                primary_span = next((s for s in spans if s.get('is_primary')), spans[0])
                                file_path = primary_span.get('file_name', '')
                                line_num = primary_span.get('line_start', 0)
                                column = primary_span.get('column_start', 0)

                                issues.append({
                                    'path': file_path,
                                    'line': line_num,
                                    'column': column,
                                    'message': msg.get('message', ''),
                                    'level': msg.get('level', ''),
                                    # 'code' may be null in the JSON, so guard before reading it.
                                    'code': (msg.get('code') or {}).get('code', ''),
                                })
                    except json.JSONDecodeError:
                        continue

            # Group issues by diagnostic level (warning, error, ...).
            issues_by_level = defaultdict(list)
            for issue in issues:
                level = issue.get('level', 'unknown')
                issues_by_level[level].append(issue)

            return {
                'status': 'success',
                'issues': issues,
                'issues_by_level': dict(issues_by_level),
                'issue_count': len(issues),
                'files_analyzed': len(rust_files),
            }

        except Exception as e:
            logger.error(f"Error running clippy: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
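

# ---------------------------------------------------------------------------
# Example usage: a minimal sketch of how the service might be driven. The
# repository path and language list below are hypothetical placeholders, and
# each analyzer still requires its external tool to be installed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    analyzer = CodeAnalyzer()
    report = analyzer.analyze_repository("/path/to/checkout", ["Python", "Go"])

    for language, result in report.items():
        print(f"{language}: {result['status']} ({result.get('issue_count', 0)} issues)")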