25.2 C
New York
Monday, June 30, 2025

Construct Customized AI Instruments for Your AI Brokers that Mix Machine Studying and Statistical Evaluation


class IntelligentDataAnalyzer(BaseTool):
   identify: str = "intelligent_data_analyzer"
   description: str = "Superior information evaluation instrument that performs statistical evaluation, machine studying clustering, outlier detection, correlation evaluation, and generates visualizations with actionable insights."
   args_schema: kind[BaseModel] = DataAnalysisInput
   response_format: str = "content_and_artifact"
  
   def _run(self, information: Record[Dict], analysis_type: str = "complete", target_column: Optionally available[str] = None, max_clusters: int = 5) -> Tuple[str, Dict]:
       strive:
           df = pd.DataFrame(information)
           if df.empty:
               elevate ToolException("Dataset is empty")
          
           insights = {"dataset_info": self._get_dataset_info(df)}
          
           if analysis_type in ["comprehensive", "correlation"]:
               insights["correlation_analysis"] = self._correlation_analysis(df)
           if analysis_type in ["comprehensive", "clustering"]:
               insights["clustering_analysis"] = self._clustering_analysis(df, max_clusters)
           if analysis_type in ["comprehensive", "outlier"]:
               insights["outlier_detection"] = self._outlier_detection(df)
          
           if target_column and target_column in df.columns:
               insights["target_analysis"] = self._target_analysis(df, target_column)
          
           suggestions = self._generate_recommendations(df, insights)
           abstract = self._create_analysis_summary(insights, suggestions)
          
           artifact = {
               "insights": insights,
               "suggestions": suggestions,
               "data_shape": df.form,
               "analysis_type": analysis_type,
               "numeric_columns": df.select_dtypes(embody=[np.number]).columns.tolist(),
               "categorical_columns": df.select_dtypes(embody=['object']).columns.tolist()
           }
          
           return abstract, artifact
          
       besides Exception as e:
           elevate ToolException(f"Evaluation failed: {str(e)}")
  
   def _get_dataset_info(self, df: pd.DataFrame) -> Dict:
       return {
           "form": df.form,
           "columns": df.columns.tolist(),
           "dtypes": df.dtypes.astype(str).to_dict(),
           "missing_values": df.isnull().sum().to_dict(),
           "memory_usage": df.memory_usage(deep=True).sum()
       }
  
   def _correlation_analysis(self, df: pd.DataFrame) -> Dict:
       numeric_df = df.select_dtypes(embody=[np.number])
       if numeric_df.empty:
           return {"message": "No numeric columns for correlation evaluation"}
      
       corr_matrix = numeric_df.corr()
       strong_corr = []
       for i in vary(len(corr_matrix.columns)):
           for j in vary(i+1, len(corr_matrix.columns)):
               corr_val = corr_matrix.iloc[i, j]
               if abs(corr_val) > 0.7:
                   strong_corr.append({"var1": corr_matrix.columns[i], "var2": corr_matrix.columns[j], "correlation": spherical(corr_val, 3)})
      
       return {
           "correlation_matrix": corr_matrix.spherical(3).to_dict(),
           "strong_correlations": strong_corr,
           "avg_correlation": spherical(corr_matrix.values[np.triu_indices_from(corr_matrix.values, k=1)].imply(), 3)
       }
  
   def _clustering_analysis(self, df: pd.DataFrame, max_clusters: int) -> Dict:
       numeric_df = df.select_dtypes(embody=[np.number]).dropna()
       if numeric_df.form[0] < 2 or numeric_df.form[1] < 2:
           return {"message": "Inadequate numeric information for clustering"}
      
       scaler = StandardScaler()
       scaled_data = scaler.fit_transform(numeric_df)
      
       inertias = []
       K_range = vary(1, min(max_clusters + 1, len(numeric_df) // 2 + 1))
      
       for okay in K_range:
           kmeans = KMeans(n_clusters=okay, random_state=42, n_init=10)
           kmeans.match(scaled_data)
           inertias.append(kmeans.inertia_)
      
       optimal_k = self._find_elbow_point(inertias, K_range)
       kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
       cluster_labels = kmeans.fit_predict(scaled_data)
      
       cluster_stats = {}
       for i in vary(optimal_k):
           cluster_data = numeric_df[cluster_labels == i]
           cluster_stats[f"cluster_{i}"] = {
               "measurement": len(cluster_data),
               "proportion": spherical(len(cluster_data) / len(numeric_df) * 100, 1),
               "means": cluster_data.imply().spherical(3).to_dict()
           }
      
       return {
           "optimal_clusters": optimal_k,
           "cluster_stats": cluster_stats,
           "silhouette_score": spherical(silhouette_score(scaled_data, cluster_labels), 3) if len(set(cluster_labels)) > 1 else 0.0,
           "inertias": inertias
       }
  
   def _outlier_detection(self, df: pd.DataFrame) -> Dict:
       numeric_df = df.select_dtypes(embody=[np.number])
       if numeric_df.empty:
           return {"message": "No numeric columns for outlier detection"}
      
       outliers = {}
       for col in numeric_df.columns:
           information = numeric_df[col].dropna()
           Q1, Q3 = information.quantile(0.25), information.quantile(0.75)
           IQR = Q3 - Q1
           iqr_outliers = information[(data < Q1 - 1.5 * IQR) | (data > Q3 + 1.5 * IQR)]
           z_scores = np.abs((information - information.imply()) / information.std())
           z_outliers = information[z_scores > 3]
          
           outliers[col] = {
               "iqr_outliers": len(iqr_outliers),
               "z_score_outliers": len(z_outliers),
               "outlier_percentage": spherical(len(iqr_outliers) / len(information) * 100, 2)
           }
      
       return outliers
  
   def _target_analysis(self, df: pd.DataFrame, target_col: str) -> Dict:
       if target_col not in df.columns:
           return {"error": f"Column {target_col} not discovered"}
      
       target_data = df[target_col].dropna()
      
       if pd.api.sorts.is_numeric_dtype(target_data):
           return {
               "kind": "numeric",
               "stats": {
                   "imply": spherical(target_data.imply(), 3),
                   "median": spherical(target_data.median(), 3),
                   "std": spherical(target_data.std(), 3),
                   "skewness": spherical(target_data.skew(), 3),
                   "kurtosis": spherical(target_data.kurtosis(), 3)
               },
               "distribution": "regular" if abs(target_data.skew()) < 0.5 else "skewed"
           }
       else:
           value_counts = target_data.value_counts()
           return {
               "kind": "categorical",
               "unique_values": len(value_counts),
               "most_common": value_counts.head(5).to_dict(),
               "entropy": spherical(-sum((p := value_counts / len(target_data)) * np.log2(p + 1e-10)), 3)
           }
  
   def _generate_recommendations(self, df: pd.DataFrame, insights: Dict) -> Record[str]:
       suggestions = []
      
       missing_pct = sum(insights["dataset_info"]["missing_values"].values()) / (df.form[0] * df.form[1]) * 100
       if missing_pct > 10:
           suggestions.append(f"Take into account information imputation - {missing_pct:.1f}% lacking values detected")
      
       if "correlation_analysis" in insights and insights["correlation_analysis"].get("strong_correlations"):
           suggestions.append("Robust correlations detected - take into account characteristic choice or dimensionality discount")
      
       if "clustering_analysis" in insights:
           cluster_info = insights["clustering_analysis"]
           if isinstance(cluster_info, dict) and "optimal_clusters" in cluster_info:
               suggestions.append(f"Information segments into {cluster_info['optimal_clusters']} distinct teams - helpful for focused methods")
      
       if "outlier_detection" in insights:
           high_outlier_cols = [col for col, info in insights["outlier_detection"].gadgets() if isinstance(information, dict) and information.get("outlier_percentage", 0) > 5]
           if high_outlier_cols:
               suggestions.append(f"Excessive outlier proportion in: {', '.be a part of(high_outlier_cols)} - examine information high quality")
      
       return suggestions if suggestions else ["Data appears well-structured with no immediate concerns"]
  
   def _create_analysis_summary(self, insights: Dict, suggestions: Record[str]) -> str:
       dataset_info = insights["dataset_info"]
       abstract = f"""📊 INTELLIGENT DATA ANALYSIS COMPLETE


Dataset Overview: {dataset_info['shape'][0]} rows × {dataset_info['shape'][1]} columns
Numeric Options: {len([c for c, t in dataset_info['dtypes'].gadgets() if 'int' in t or 'float' in t])}
Categorical Options: {len([c for c, t in dataset_info['dtypes'].gadgets() if 'object' in t])}


Key Insights Generated:
• Statistical correlations and relationships recognized
• Clustering patterns found for segmentation
• Outlier detection accomplished for information high quality evaluation
• Characteristic significance and distribution evaluation carried out


Prime Suggestions:
{chr(10).be a part of('• ' + rec for rec in suggestions[:3])}


Evaluation contains ML-powered clustering, statistical correlations, and actionable enterprise insights."""
      
       return abstract
  
   def _find_elbow_point(self, inertias: Record[float], k_range: vary) -> int:
       if len(inertias) < 3:
           return listing(k_range)[0]
       diffs = [inertias[i-1] - inertias[i] for i in vary(1, len(inertias))]
       return listing(k_range)[diffs.index(max(diffs)) + 1] if diffs else listing(k_range)[0]

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles