File size: 6,044 Bytes
e020ac8
 
 
 
 
 
3f44a73
 
 
e020ac8
3f44a73
 
e020ac8
3f44a73
e020ac8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
ML suggestions - anomaly detection and insights
"""
from typing import List, Dict, Any
import logging
import statistics


class MLSuggester:
    """Generate heuristic data-quality and anomaly suggestions from row data.

    Input rows are plain dictionaries mapping a column name to a cell value.
    Rows may be ragged: a key that is absent from a row is treated like a
    ``None`` cell. Every suggestion is a dictionary with the keys ``title``,
    ``description``, ``confidence``, ``action`` and ``category``.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def generate(
        self,
        data: List[Dict[str, Any]],
        context: str = None,
        max_suggestions: int = 10,
    ) -> List[Dict[str, Any]]:
        """Run every detector and return the highest-confidence suggestions.

        Args:
            data: Rows to analyse. Empty or ``None`` input yields ``[]``.
            context: Accepted for interface compatibility; not used by any
                detector at present.
            max_suggestions: Maximum number of suggestions to return
                (default 10). ``None`` returns all of them.

        Returns:
            Suggestions sorted by descending ``confidence``. The sort is
            stable, so ties keep detector order.
        """
        if not data:
            return []

        suggestions = []
        suggestions.extend(self._check_missing_values(data))
        suggestions.extend(self._detect_outliers(data))
        suggestions.extend(self._detect_imbalances(data))
        suggestions.extend(self._detect_quality_issues(data))

        suggestions.sort(key=lambda item: item["confidence"], reverse=True)
        return suggestions[:max_suggestions]

    @staticmethod
    def _columns(data: List[Dict[str, Any]]) -> List[str]:
        """Return every column name seen in any row, in first-seen order."""
        seen = {}
        for row in data:
            for key in row:
                seen.setdefault(key, None)
        return list(seen)

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for the two cell values treated as missing."""
        return value is None or value == ""

    @staticmethod
    def _row_fingerprint(row: Dict[str, Any]) -> str:
        """Return a string identifying a row regardless of key order."""
        # Sorting (str, repr) pairs never fails on mixed value types and makes
        # {"a": 1, "b": 2} and {"b": 2, "a": 1} compare as the same row.
        return repr(sorted((str(key), repr(value)) for key, value in row.items()))

    def _check_missing_values(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Flag columns where more than 10% of the cells are missing.

        A cell is missing when it is ``None``, an empty string, or the key is
        absent from the row. Above 50% the stronger "impute or remove"
        suggestion is emitted instead of the generic one.
        """
        suggestions = []
        if not data:
            return suggestions

        total = len(data)
        for col in self._columns(data):
            missing_count = sum(1 for row in data if self._is_missing(row.get(col)))
            if missing_count == 0:
                continue

            percentage = (missing_count / total) * 100
            if percentage > 50:
                suggestions.append({
                    "title": f"High Missing Values in {col}",
                    "description": f"{percentage:.1f}% of {col} is missing. Consider data imputation or removal.",
                    "confidence": min(0.95, percentage / 100),
                    "action": "impute_or_remove",
                    "category": "data_quality"
                })
            elif percentage > 10:
                suggestions.append({
                    "title": f"Missing Values in {col}",
                    "description": f"{percentage:.1f}% of {col} is missing. Consider handling these values.",
                    "confidence": min(0.8, percentage / 100),
                    "action": "handle_missing",
                    "category": "data_quality"
                })

        return suggestions

    def _detect_outliers(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Flag numeric columns where more than 5% of values lie outside the IQR fences.

        Values are converted to ``float`` first so that numeric strings are
        compared numerically. Quartiles are taken by index from the sorted
        values (positions ``n // 4`` and ``3 * n // 4``); the fences are
        ``q1 - 1.5 * iqr`` and ``q3 + 1.5 * iqr``. Columns with four or fewer
        usable values are skipped.
        """
        suggestions = []

        for col in self._get_numeric_columns(data):
            try:
                values = [
                    float(row[col]) for row in data if row.get(col) is not None
                ]
            except (TypeError, ValueError):
                # Not expected for columns vetted by _get_numeric_columns, but
                # keep the detector best-effort instead of failing the run.
                self.logger.debug("Skipping outlier check for column %r", col)
                continue

            # NaN is the only float unequal to itself; it would corrupt sorting.
            values = sorted(value for value in values if value == value)
            count = len(values)
            if count <= 4:
                continue

            q1 = values[count // 4]
            q3 = values[3 * count // 4]
            iqr = q3 - q1
            lower = q1 - 1.5 * iqr
            upper = q3 + 1.5 * iqr
            outliers = [value for value in values if value < lower or value > upper]
            if not outliers:
                continue

            outlier_percentage = (len(outliers) / count) * 100
            if outlier_percentage > 5:
                suggestions.append({
                    "title": f"Outliers Detected in {col}",
                    "description": f"{len(outliers)} outlier(s) ({outlier_percentage:.1f}%) found. Review and handle appropriately.",
                    "confidence": min(0.85, outlier_percentage / 100),
                    "action": "review_outliers",
                    "category": "anomaly"
                })

        return suggestions

    def _detect_imbalances(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Flag low-cardinality columns dominated by a single value.

        A column is examined when its number of distinct non-``None`` values
        is below 10% of the number of such values. It is flagged when the
        most frequent value accounts for more than 80% of them. Columns
        holding unhashable values are skipped.
        """
        from collections import Counter

        suggestions = []

        for col in self._columns(data):
            values = [row[col] for row in data if row.get(col) is not None]
            if not values:
                continue

            try:
                counts = Counter(values)
            except TypeError:
                # Unhashable cells (lists, dicts, ...) cannot be counted.
                continue

            if len(counts) >= len(values) * 0.1:
                continue

            # most_common(1) returns [(value, count)]; only the count matters.
            most_common_count = counts.most_common(1)[0][1]
            imbalance_ratio = most_common_count / len(values)

            if imbalance_ratio > 0.8:
                suggestions.append({
                    "title": f"Class Imbalance in {col}",
                    "description": f"One value appears {imbalance_ratio*100:.1f}% of the time. Data is highly imbalanced.",
                    "confidence": min(0.9, imbalance_ratio),
                    "action": "rebalance_data",
                    "category": "pattern"
                })

        return suggestions

    def _detect_quality_issues(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Report how many rows duplicate an earlier row.

        Rows are compared by a key-order-independent fingerprint built from
        the ``repr`` of their values.
        """
        suggestions = []

        unique_rows = {self._row_fingerprint(row) for row in data}
        duplicate_count = len(data) - len(unique_rows)
        if duplicate_count > 0:
            suggestions.append({
                "title": "Duplicate Records Found",
                "description": f"{duplicate_count} duplicate row(s) detected. Consider deduplication.",
                "confidence": 0.9,
                "action": "deduplicate",
                "category": "data_quality"
            })

        return suggestions

    @staticmethod
    def _get_numeric_columns(data: List[Dict[str, Any]]) -> List[str]:
        """Return the columns whose non-``None`` values all convert with ``float()``.

        A column needs at least one non-``None`` value to qualify, so columns
        that are entirely empty are not reported as numeric.
        """
        if not data:
            return []

        numeric = []
        for key in MLSuggester._columns(data):
            present = [row[key] for row in data if row.get(key) is not None]
            if not present:
                continue
            try:
                for value in present:
                    float(value)
            except (ValueError, TypeError):
                continue
            numeric.append(key)
        return numeric