File size: 8,409 Bytes
267ad85
e020ac8
267ad85
 
 
e020ac8
267ad85
3f44a73
 
 
267ad85
3f44a73
 
267ad85
 
 
 
 
 
e020ac8
267ad85
 
 
 
 
 
 
e020ac8
267ad85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e020ac8
267ad85
 
 
 
 
 
 
 
 
 
 
 
 
e020ac8
267ad85
 
e020ac8
267ad85
e020ac8
267ad85
 
 
 
 
 
 
 
 
 
e020ac8
267ad85
e020ac8
267ad85
 
 
e020ac8
 
267ad85
e020ac8
 
267ad85
 
e020ac8
 
 
267ad85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e020ac8
267ad85
e020ac8
267ad85
 
 
e020ac8
 
267ad85
e020ac8
 
267ad85
 
e020ac8
 
 
267ad85
 
 
 
 
e020ac8
267ad85
 
 
 
 
 
 
 
 
 
 
 
 
 
e020ac8
267ad85
e020ac8
267ad85
 
 
e020ac8
 
267ad85
e020ac8
 
267ad85
 
e020ac8
267ad85
 
e020ac8
267ad85
 
 
 
 
 
 
e020ac8
267ad85
 
e020ac8
267ad85
 
 
 
 
 
 
 
 
 
 
 
 
 
e020ac8
267ad85
e020ac8
267ad85
 
 
 
 
 
e020ac8
 
267ad85
e020ac8
267ad85
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""Data analysis service"""
import logging
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class Analyzer:
    """Service for analyzing tabular data supplied as a list of record dicts.

    Supports four analysis types (see ``supported_types``). All calls go
    through :meth:`analyze`, which validates the requested type, builds a
    DataFrame, selects the columns to work on, and dispatches to the
    matching private helper. Each helper returns a dict with the keys
    ``"type"``, ``"results"`` and ``"rows_analyzed"``.
    """

    def __init__(self):
        # Public list of values accepted for analyze(analysis_type=...).
        self.supported_types = [
            "statistical_summary",
            "trend_detection",
            "outlier_detection",
            "correlation_analysis"
        ]

    async def analyze(
        self,
        data: List[Dict],
        analysis_type: str,
        columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Perform data analysis.

        Args:
            data: Rows as dicts, one dict per record.
            analysis_type: One of ``self.supported_types``.
            columns: Optional subset of column names to analyze. Unknown
                names are silently dropped; when omitted, every numeric
                column is analyzed.

        Returns:
            Result dict with ``"type"``, ``"results"`` and ``"rows_analyzed"``.

        Raises:
            ValueError: If ``analysis_type`` is not supported.
        """
        logger.info(f"πŸ“Š Starting analysis: {analysis_type}")

        # Validate the analysis type before doing any conversion work.
        if analysis_type not in self.supported_types:
            raise ValueError(
                f"Unknown analysis type: {analysis_type}. "
                f"Supported types: {', '.join(self.supported_types)}"
            )

        df = pd.DataFrame(data)

        # Honour an explicit column selection (dropping names that are not
        # present); otherwise fall back to all numeric columns.
        if columns:
            numeric_columns = [col for col in columns if col in df.columns]
        else:
            numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()

        logger.info(f"Analyzing columns: {numeric_columns}")

        # Dispatch table instead of an if/elif chain: every supported type
        # has an entry here, so a validated type can never fall through
        # and implicitly return None.
        handlers = {
            "statistical_summary": self._statistical_summary,
            "trend_detection": self._trend_detection,
            "outlier_detection": self._outlier_detection,
            "correlation_analysis": self._correlation_analysis,
        }
        return await handlers[analysis_type](df, numeric_columns)

    async def _statistical_summary(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]:
        """Per-column descriptive statistics (mean/median/std/min/max/count).

        Non-numeric columns in ``columns`` are skipped. Note: for a column
        with fewer than 2 non-null values, ``std`` is NaN (pandas default).
        """
        try:
            results = {}

            for col in columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    series = df[col]
                    results[col] = {
                        "mean": float(series.mean()),
                        "median": float(series.median()),
                        "std": float(series.std()),
                        "min": float(series.min()),
                        "max": float(series.max()),
                        # count() excludes NaN, so this may be < len(df)
                        "count": int(series.count())
                    }

            logger.info(f"βœ… Statistical summary complete for {len(results)} columns")
            return {
                "type": "statistical_summary",
                "results": results,
                "rows_analyzed": len(df)
            }
        except Exception as e:
            logger.error(f"❌ Statistical summary failed: {e}")
            raise

    async def _trend_detection(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]:
        """Detect simple trends by comparing first-half vs second-half means.

        ``strength`` is the percent change between the two halves relative
        to the first half. Columns with fewer than 2 non-null values are
        skipped.
        """
        try:
            trends = {}

            for col in columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    values = df[col].dropna().values
                    if len(values) > 1:
                        # Simple trend heuristic: compare the mean of the
                        # first half of the series against the second half.
                        mid = len(values) // 2
                        first_half_mean = np.mean(values[:mid])
                        second_half_mean = np.mean(values[mid:])
                        delta = second_half_mean - first_half_mean

                        if delta > 0:
                            trend = "increasing"
                        elif delta < 0:
                            trend = "decreasing"
                        else:
                            trend = "stable"

                        # abs() in the denominator keeps strength a
                        # non-negative magnitude even for negative-valued
                        # series (previously e.g. -10 -> -5 reported -50%).
                        # A zero baseline yields 0 (undefined % change).
                        if first_half_mean != 0:
                            trend_strength = abs(delta) / abs(first_half_mean) * 100
                        else:
                            trend_strength = 0

                        trends[col] = {
                            "trend": trend,
                            "strength": float(trend_strength),
                            "first_half_avg": float(first_half_mean),
                            "second_half_avg": float(second_half_mean)
                        }

            logger.info(f"βœ… Trend detection complete for {len(trends)} columns")
            return {
                "type": "trend_detection",
                "results": trends,
                "rows_analyzed": len(df)
            }
        except Exception as e:
            logger.error(f"❌ Trend detection failed: {e}")
            raise

    async def _outlier_detection(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]:
        """Detect outliers per column using the 1.5*IQR (Tukey fence) rule."""
        try:
            outliers = {}

            for col in columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    Q1 = df[col].quantile(0.25)
                    Q3 = df[col].quantile(0.75)
                    IQR = Q3 - Q1

                    lower_bound = Q1 - 1.5 * IQR
                    upper_bound = Q3 + 1.5 * IQR

                    outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound)
                    outlier_count = outlier_mask.sum()
                    outlier_indices = df[outlier_mask].index.tolist()

                    outliers[col] = {
                        "count": int(outlier_count),
                        # Guard against an empty frame (division by zero).
                        "percentage": float(outlier_count / len(df) * 100) if len(df) else 0.0,
                        "lower_bound": float(lower_bound),
                        "upper_bound": float(upper_bound),
                        "outlier_indices": outlier_indices[:10]  # First 10
                    }

            logger.info(f"βœ… Outlier detection complete for {len(outliers)} columns")
            return {
                "type": "outlier_detection",
                "results": outliers,
                "rows_analyzed": len(df)
            }
        except Exception as e:
            logger.error(f"❌ Outlier detection failed: {e}")
            raise

    async def _correlation_analysis(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]:
        """Pairwise Pearson correlations, reporting pairs with |r| > 0.5."""
        try:
            # Restrict to numeric columns; corr() would drop the rest anyway,
            # but this lets us report the "too few columns" case precisely.
            numeric_df = df[columns].select_dtypes(include=[np.number])

            if len(numeric_df.columns) < 2:
                return {
                    "type": "correlation_analysis",
                    "results": {},
                    "message": "Need at least 2 numeric columns for correlation analysis",
                    "rows_analyzed": len(df)
                }

            corr_matrix = numeric_df.corr()

            # Walk the upper triangle only: each unordered pair once,
            # no self-correlations.
            strong_correlations = []
            for i in range(len(corr_matrix.columns)):
                for j in range(i + 1, len(corr_matrix.columns)):
                    corr_value = corr_matrix.iloc[i, j]
                    if abs(corr_value) > 0.5:  # Strong correlation threshold
                        strong_correlations.append({
                            "column_1": corr_matrix.columns[i],
                            "column_2": corr_matrix.columns[j],
                            "correlation": float(corr_value)
                        })

            logger.info(f"βœ… Correlation analysis complete with {len(strong_correlations)} strong correlations")
            return {
                "type": "correlation_analysis",
                "results": {
                    "strong_correlations": strong_correlations,
                    "correlation_matrix": corr_matrix.to_dict()
                },
                "rows_analyzed": len(df)
            }
        except Exception as e:
            logger.error(f"❌ Correlation analysis failed: {e}")
            raise


# Module-level singleton: importers share this one Analyzer instance
analyzer = Analyzer()