"""Core analytics for recipe interactions (popularity & rating relationships).

This module was recreated after a previous refactor inadvertently removed the
original implementation. It provides a focused analytical surface used by the
Streamlit pages:

- Popularity vs Rating
- Rating vs structural features (minutes, n_steps, n_ingredients)
- Popularity vs structural features

Design goals:

- Pure computation (no Streamlit / plotting here)
- Accept pre-loaded DataFrames (recipes, interactions) or a unified df
- Graceful handling of missing columns
- Light, dependency-minimal (pandas/NumPy only)

Public class: InteractionsAnalyzer
Backward-compatibility alias: InteractionsExplorer
"""
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Optional
import numpy as np
import pandas as pd
from .cacheable_mixin import CacheableMixin
from .logger import get_logger
# Column name mappings for compatibility
RECIPE_ID_COL = "recipe_id"
RATING_COL = "rating"
@dataclass
class PreprocessingConfig:
"""Configuration for data preprocessing options."""
enable_preprocessing: bool = True
outlier_method: str = "iqr" # "iqr", "zscore", "none"
    outlier_threshold: float = 5.0  # IQR multiplier (or Z-score cutoff); 5.0 tuned for ~95.1% data retention
def get_hash(self) -> str:
"""Generate hash for cache validation."""
config_str = f"{self.enable_preprocessing}_{self.outlier_method}_{self.outlier_threshold}"
return hashlib.md5(config_str.encode()).hexdigest()[:8]
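# Illustrative use of the config hash (a sketch; the option values below are
# made up for the example): get_hash() keys the CSV caches, so any change to
# the preprocessing options yields a new 8-char key and invalidates previously
# written cache files.
#
#   default_cfg = PreprocessingConfig()
#   strict_cfg = PreprocessingConfig(outlier_method="zscore", outlier_threshold=3.0)
#   default_cfg.get_hash() != strict_cfg.get_hash()
#   # different config strings -> different keys (8-hex collisions are negligible)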
@dataclass
class InteractionsAnalyzer(CacheableMixin):
"""Compute relational aggregates between interactions & recipe metadata.
Parameters
----------
interactions : pd.DataFrame
Raw interactions dataframe (expects at least recipe_id; may include rating)
recipes : pd.DataFrame | None
Recipes dataframe providing features (minutes, n_steps, ingredients...)
merged : pd.DataFrame | None
Pre‑merged dataframe (bypasses merge step if provided). If given it
supersedes interactions/recipes arguments.
"""
interactions: Optional[pd.DataFrame] = None
recipes: Optional[pd.DataFrame] = None
merged: Optional[pd.DataFrame] = None
preprocessing: PreprocessingConfig = field(default_factory=PreprocessingConfig)
cache_enabled: bool = True # Cache control parameter
def __post_init__(self) -> None:
# Initialize mixin first
CacheableMixin.__init__(self)
self.logger = get_logger()
# Define fixed CSV cache paths (IQR 5.0 optimal)
self.data_dir = Path("data")
self.merged_csv_path = self.data_dir / "merged_interactions_recipes_optimized.csv"
self.aggregated_csv_path = self.data_dir / "aggregated_popularity_metrics_optimized.csv"
self.config_cache_path = self.data_dir / ".preprocessing_config_optimized.txt"
# Enable/disable cache based on parameter
self.enable_cache(self.cache_enabled)
# Choose data source: provided DataFrames have priority over CSV cache
if self.merged is not None:
# Pre-merged data provided - use it directly
self._df = self.merged
self.logger.info("Using provided pre-merged DataFrame")
elif self.interactions is not None and self.recipes is not None:
# Merge provided DataFrames
self._df = self._merge_data()
self.logger.info("Using provided interactions and recipes DataFrames")
elif self.interactions is not None:
# Only interactions provided - no merge needed
self._df = self.interactions
self.logger.info("Using provided interactions DataFrame only")
else:
# No data provided - use optimized CSV cache system
self._df = self._load_or_compute_merged_data()
# Cache for aggregated data in memory
self._aggregated_cache = None
def _get_default_cache_params(self) -> dict:
"""Generate cache parameters for the current configuration."""
return {
"preprocessing_config": self.preprocessing.get_hash(),
"has_merged": self.merged is not None,
"interactions_shape": (self.interactions.shape if self.interactions is not None else None),
"recipes_shape": self.recipes.shape if self.recipes is not None else None,
"merged_shape": self.merged.shape if self.merged is not None else None,
}
def get_cache_info(self) -> dict:
"""Get cache information compatible with the old interface."""
from .cache_manager import get_cache_manager
cache_manager = get_cache_manager()
cache_info = cache_manager.get_info()
# Check if cache exists for this analyzer
analyzer_name = "interactions"
analyzer_files = 0
cache_exists = False
if analyzer_name in cache_info.get("analyzers", {}):
analyzer_files = cache_info["analyzers"][analyzer_name].get("files", 0)
cache_exists = analyzer_files > 0
        # Return format compatible with the old interface
        return {
            "cache_enabled": self.cache_enabled,  # reflect the constructor flag instead of hard-coding True
            "cache_exists": cache_exists,
            "cache_files_count": analyzer_files,  # kept for old-interface compatibility
            "cache_info": cache_info,  # full cache info for advanced usage
            "total_files": cache_info.get("total_files", 0),
            "total_size_mb": cache_info.get("total_size_mb", 0.0),
        }
def _load_or_compute_merged_data(self) -> pd.DataFrame:
"""Load merged data from CSV cache or compute if not available."""
# Check if CSV cache exists and is valid
if self.merged_csv_path.exists() and self.config_cache_path.exists():
try:
# Validate config
if self._is_csv_cache_valid():
self.logger.info("Loading merged data from optimized CSV cache (IQR 5.0)")
return pd.read_csv(self.merged_csv_path)
else:
self.logger.info("CSV cache invalid, recomputing with optimal IQR 5.0 settings")
except Exception as e:
self.logger.warning(f"Error loading CSV cache: {e}")
# Cache miss or invalid - compute data
self.logger.info("Computing optimized merged data (IQR 5.0 for 95.1% data retention)")
data = self._compute_preprocessed_data()
# Save to cache
self._save_merged_csv_cache(data)
return data
def _is_csv_cache_valid(self) -> bool:
"""Check if CSV cache is valid based on configuration."""
if not self.config_cache_path.exists():
return False
try:
with open(self.config_cache_path, 'r') as f:
cached_config = f.read().strip()
current_config = self._get_current_config_string()
return cached_config == current_config
except Exception:
return False
def _merge_data(self) -> pd.DataFrame:
"""Merge provided interactions and recipes DataFrames."""
if self.interactions is None:
raise ValueError("interactions DataFrame is required for merging")
inter = self._standardize_cols(self.interactions.copy())
if self.recipes is not None:
rec = self._standardize_cols(self.recipes.copy())
# Handle common alternate primary key naming ('id' -> 'recipe_id')
if RECIPE_ID_COL not in rec.columns and "id" in rec.columns:
rec = rec.rename(columns={"id": RECIPE_ID_COL})
            # Left join: keep every interaction row and attach recipe metadata
            # where available (recipes without interactions are dropped)
if RECIPE_ID_COL in rec.columns:
df = inter.merge(rec, on=RECIPE_ID_COL, how="left", suffixes=("", "_r"))
else:
df = inter
else:
df = inter
# Apply preprocessing if enabled
if self.preprocessing.enable_preprocessing:
self.logger.info("Starting data preprocessing (outlier removal)")
df, self.preprocessing_stats = self._preprocess_data(df)
else:
self.preprocessing_stats = None
return df
def _save_merged_csv_cache(self, data: pd.DataFrame) -> None:
"""Save merged data to CSV cache."""
try:
# Ensure directory exists
self.merged_csv_path.parent.mkdir(parents=True, exist_ok=True)
# Save data
data.to_csv(self.merged_csv_path, index=False)
# Save config for validation
current_config = self._get_current_config_string()
with open(self.config_cache_path, 'w') as f:
f.write(current_config)
size_mb = self.merged_csv_path.stat().st_size / (1024 * 1024)
self.logger.info(f"Saved optimized merged data to CSV cache ({size_mb:.1f} MB)")
except Exception as e:
self.logger.error(f"Error saving CSV cache: {e}")
def _get_current_config_string(self) -> str:
"""Get current configuration as string for cache validation."""
return f"optimized_iqr_5.0_{self.preprocessing.get_hash()}"
def _save_aggregated_csv_cache(self, data: pd.DataFrame) -> None:
"""Save aggregated data to CSV cache."""
try:
# Ensure directory exists
self.aggregated_csv_path.parent.mkdir(parents=True, exist_ok=True)
# Save data
data.to_csv(self.aggregated_csv_path, index=False)
size_mb = self.aggregated_csv_path.stat().st_size / (1024 * 1024)
self.logger.info(f"Saved optimized aggregated data to CSV cache ({size_mb:.1f} MB)")
except Exception as e:
self.logger.error(f"Error saving aggregated CSV cache: {e}")
def _compute_preprocessed_data(self) -> pd.DataFrame:
"""Compute the preprocessed data (called when not in cache)."""
self.logger.info("Computing preprocessed data from scratch")
# Step 1: Initial data merging and standardization
if self.merged is not None:
df = self._standardize_cols(self.merged.copy())
else:
if self.interactions is None:
raise ValueError("Provide either 'merged' or 'interactions' DataFrame")
inter = self._standardize_cols(self.interactions.copy())
if self.recipes is not None:
rec = self._standardize_cols(self.recipes.copy())
# Handle common alternate primary key naming ('id' -> 'recipe_id')
if RECIPE_ID_COL not in rec.columns and "id" in rec.columns:
rec = rec.rename(columns={"id": RECIPE_ID_COL})
                # Left join: keep every interaction row and attach recipe metadata
                # where available (recipes without interactions are dropped)
if RECIPE_ID_COL in rec.columns:
df = inter.merge(rec, on=RECIPE_ID_COL, how="left", suffixes=("", "_r"))
else:
df = inter
else:
df = inter
# Step 2: Derive n_ingredients if ingredients list present and column absent
if "n_ingredients" not in df.columns:
ingredient_col = self._detect_ingredients_column(df.columns)
if ingredient_col:
df["n_ingredients"] = df[ingredient_col].apply(self._safe_count_ingredients)
# Step 3: Apply preprocessing if enabled
if self.preprocessing.enable_preprocessing:
self.logger.info("Starting data preprocessing (outlier removal)")
df, self.preprocessing_stats = self._preprocess_data(df)
outliers_removed = self.preprocessing_stats.get("outliers_removed", 0)
self.logger.info(f"Preprocessing completed: {outliers_removed} outliers removed")
else:
self.preprocessing_stats = {"outliers_removed": 0}
return df
# ------------------ Internal helpers ------------------ #
@staticmethod
def _standardize_cols(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
return df
@staticmethod
def _detect_ingredients_column(cols: Iterable[str]) -> Optional[str]:
for candidate in ["ingredients", "recipe_ingredient_parts", "ingredients_list"]:
if candidate in cols:
return candidate
return None
    @staticmethod
    def _safe_count_ingredients(val) -> Optional[int]:
        # Handle real lists before pd.isna: isna on a list returns an
        # element-wise array whose truth value is ambiguous and would raise.
        if isinstance(val, list):
            return len(val)
        if pd.isna(val):
            return None
        # Expect a stringified list such as "['salt', 'pepper']"
        try:
            text = str(val)
            if text.startswith("[") and text.endswith("]"):
                # crude comma split (avoid ast.literal_eval for speed/safety here)
                inside = text[1:-1].strip()
                if not inside:
                    return 0
                return len(inside.split(","))
            # fallback: count non-empty comma-separated tokens
            return len([p for p in text.split(",") if p.strip()])
        except Exception:
            return None
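    # The counting heuristic illustrated (inputs are synthetic examples):
    #   _safe_count_ingredients("['flour', 'sugar', 'eggs']") -> 3   (bracketed list string)
    #   _safe_count_ingredients("[]")                          -> 0
    #   _safe_count_ingredients("salt, pepper")                -> 2   (comma fallback)
    #   _safe_count_ingredients(float("nan"))                  -> None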
# ------------------ Preprocessing methods ------------------ #
def _preprocess_data(self, df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
"""Apply preprocessing: outlier removal.
Returns:
Tuple of (processed_dataframe, statistics_dict)
"""
df_processed = df.copy()
stats = {
"original_rows": len(df),
"outliers_removed": 0,
"features_processed": [],
}
# Get numerical features that exist in the dataframe
available_features = [f for f in ["minutes", "n_steps", "n_ingredients", "rating"] if f in df_processed.columns]
stats["features_processed"] = available_features
if not available_features:
self.logger.warning("No numerical features found for preprocessing")
return df_processed, stats
# Outlier detection and removal
if self.preprocessing.outlier_method != "none":
df_processed, outliers_count = self._remove_outliers(df_processed, available_features)
stats["outliers_removed"] = outliers_count
if outliers_count > 0:
self.logger.info(f"Removed {outliers_count} outliers using {self.preprocessing.outlier_method} method")
stats["final_rows"] = len(df_processed)
return df_processed, stats
    def _remove_outliers(self, df: pd.DataFrame, features: list[str]) -> tuple[pd.DataFrame, int]:
        """Remove outliers using the IQR or Z-score method."""
        outlier_mask = pd.Series(False, index=df.index)
for feature in features:
if feature not in df.columns:
continue
values = df[feature].dropna()
if len(values) == 0:
continue
if self.preprocessing.outlier_method == "iqr":
Q1 = values.quantile(0.25)
Q3 = values.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - self.preprocessing.outlier_threshold * IQR
upper_bound = Q3 + self.preprocessing.outlier_threshold * IQR
feature_outliers = (df[feature] < lower_bound) | (df[feature] > upper_bound)
elif self.preprocessing.outlier_method == "zscore":
z_scores = np.abs((df[feature] - values.mean()) / values.std())
feature_outliers = z_scores > self.preprocessing.outlier_threshold
else:
continue
outlier_mask |= feature_outliers.fillna(False)
        # Remove flagged rows; cast the numpy bool sum to a plain int
        df_clean = df[~outlier_mask].copy()
        outliers_removed = int(outlier_mask.sum())
        return df_clean, outliers_removed
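    # Worked IQR example (numbers are illustrative): for `minutes` with Q1 = 20
    # and Q3 = 60, IQR = 40; with threshold 5.0 the keep-range is
    # [20 - 5 * 40, 60 + 5 * 40] = [-180, 260]. The deliberately wide multiplier
    # is what preserves ~95% of the rows.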
def get_preprocessing_stats(self) -> Optional[dict]:
"""Get preprocessing statistics if available."""
return getattr(self, "preprocessing_stats", None)
# ------------------ Aggregations ------------------ #
def aggregate(self) -> pd.DataFrame:
"""Return core aggregate per recipe.
Metrics:
- interaction_count
- avg_rating (if ratings provided)
- minutes / n_steps / n_ingredients (if present)
"""
# Use memory cache if available
if self._aggregated_cache is not None:
self.logger.info("Using memory cache for aggregate data")
return self._aggregated_cache
# Only use CSV cache if no explicit DataFrames were provided
use_csv_cache = (self.interactions is None and self.recipes is None and self.merged is None)
if use_csv_cache and self.aggregated_csv_path.exists() and self._is_csv_cache_valid():
try:
self.logger.info("Loading aggregated data from optimized CSV cache")
self._aggregated_cache = pd.read_csv(self.aggregated_csv_path)
return self._aggregated_cache
except Exception as e:
self.logger.warning(f"Error loading aggregated CSV cache: {e}")
# Compute and cache
self.logger.info("Computing optimized aggregated data")
result = self._compute_aggregate()
# Save to CSV cache only if using CSV cache system
if use_csv_cache:
self._save_aggregated_csv_cache(result)
# Save to memory cache
self._aggregated_cache = result
return result
def _compute_aggregate(self) -> pd.DataFrame:
"""Compute the aggregation (called when not in cache)."""
if RECIPE_ID_COL not in self._df.columns:
raise KeyError(f"'{RECIPE_ID_COL}' column required for aggregation")
grp = self._df.groupby(RECIPE_ID_COL)
base = grp.size().rename("interaction_count")
frames = [base]
if RATING_COL in self._df.columns:
frames.append(grp[RATING_COL].mean().rename("avg_rating"))
for feature in ["minutes", "n_steps", "n_ingredients"]:
if feature in self._df.columns:
frames.append(grp[feature].mean().rename(feature))
agg = pd.concat(frames, axis=1).reset_index()
return agg.sort_values("interaction_count", ascending=False)
# ------------------ Relationship helpers ------------------ #
def _filter_min(self, df: pd.DataFrame, min_interactions: int) -> pd.DataFrame:
if min_interactions <= 1:
return df
return df[df["interaction_count"] >= min_interactions]
def popularity_vs_rating(self, min_interactions: int = 1) -> pd.DataFrame:
agg = self.aggregate()
if "avg_rating" not in agg.columns:
raise ValueError("Ratings not available; cannot compute popularity_vs_rating")
return self._filter_min(agg[[RECIPE_ID_COL, "interaction_count", "avg_rating"]], min_interactions)
def rating_vs_feature(self, feature: str, min_interactions: int = 1) -> pd.DataFrame:
if feature not in {"minutes", "n_steps", "n_ingredients"}:
raise ValueError("Unsupported feature; choose among 'minutes', 'n_steps', 'n_ingredients'")
agg = self.aggregate()
needed = {feature, "avg_rating"}
if not needed.issubset(agg.columns):
missing = needed - set(agg.columns)
raise ValueError(f"Missing columns for analysis: {missing}")
subset = agg[[RECIPE_ID_COL, "interaction_count", feature, "avg_rating"]]
subset = subset.dropna(subset=[feature, "avg_rating"])
return self._filter_min(subset, min_interactions)
def popularity_vs_feature(self, feature: str, min_interactions: int = 1) -> pd.DataFrame:
if feature not in {"minutes", "n_steps", "n_ingredients"}:
raise ValueError("Unsupported feature; choose among 'minutes', 'n_steps', 'n_ingredients'")
agg = self.aggregate()
needed = {feature}
if not needed.issubset(agg.columns):
raise ValueError(f"Missing feature column: {feature}")
subset = agg[[RECIPE_ID_COL, "interaction_count", feature]]
subset = subset.dropna(subset=[feature])
return self._filter_min(subset, min_interactions)
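    # The three relationship helpers side by side (a sketch; `analyzer` is any
    # constructed InteractionsAnalyzer):
    #   analyzer.popularity_vs_rating(min_interactions=5)
    #       -> recipe_id, interaction_count, avg_rating
    #   analyzer.rating_vs_feature("minutes", min_interactions=5)
    #       -> recipe_id, interaction_count, minutes, avg_rating
    #   analyzer.popularity_vs_feature("n_steps")
    #       -> recipe_id, interaction_count, n_steps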
# ------------------ Feature Engineering Methods ------------------ #
    def create_popularity_segments(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
"""Create popularity segments based on interaction_count."""
if df is None:
df = self.aggregate()
df = df.copy()
        # Percentiles used as data-driven segment thresholds
        percentiles = df["interaction_count"].quantile([0.25, 0.75, 0.95])
# Define segments with data-driven thresholds
def assign_popularity_segment(interaction_count):
if interaction_count <= percentiles[0.25]:
return "Low"
elif interaction_count <= percentiles[0.75]:
return "Medium"
elif interaction_count <= percentiles[0.95]:
return "High"
else:
return "Viral"
df["popularity_segment"] = df["interaction_count"].apply(assign_popularity_segment)
        # Add segment statistics (rating stats only when ratings are available)
        agg_spec: dict = {"interaction_count": ["count", "mean", "min", "max"]}
        if "avg_rating" in df.columns:
            agg_spec["avg_rating"] = ["mean", "std"]
        segment_stats = df.groupby("popularity_segment").agg(agg_spec).round(2)
# Store segment info for reporting
self._popularity_segments_info = {
"thresholds": {
"low_max": percentiles[0.25],
"medium_max": percentiles[0.75],
"high_max": percentiles[0.95],
},
"stats": segment_stats,
}
return df
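    # Segment boundaries are percentiles of interaction_count, so the labels
    # adapt to the data. With p25 = 2, p75 = 10, p95 = 60 (illustrative values),
    # a recipe with 75 interactions lands in "Viral" and one with 8 in "Medium".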
    def create_recipe_categories(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
"""Create sophisticated categorization based on recipe characteristics."""
if df is None:
df = self.aggregate()
df = df.copy()
# 1. Complexity categorization (based on steps + ingredients)
if "n_steps" in df.columns and "n_ingredients" in df.columns:
df["complexity_score"] = df["n_steps"] + df["n_ingredients"] * 0.5
complexity_percentiles = df["complexity_score"].quantile([0.33, 0.67])
def assign_complexity(score):
if pd.isna(score):
return "Unknown"
elif score <= complexity_percentiles[0.33]:
return "Simple"
elif score <= complexity_percentiles[0.67]:
return "Moderate"
else:
return "Complex"
df["complexity_category"] = df["complexity_score"].apply(assign_complexity)
# 2. Duration categorization
if "minutes" in df.columns:
def assign_duration(minutes):
if pd.isna(minutes):
return "Unknown"
elif minutes <= 15:
return "Express"
elif minutes <= 45:
return "Normal"
elif minutes <= 120:
return "Long"
else:
return "Marathon"
df["duration_category"] = df["minutes"].apply(assign_duration)
        # 3. Efficiency score (rating per minute)
        if "avg_rating" in df.columns and "minutes" in df.columns:
            # Clip minutes to >= 1 to avoid division by zero while keeping the
            # ranking logic intact
            df["efficiency_score"] = df["avg_rating"] / df["minutes"].clip(lower=1)
efficiency_percentiles = df["efficiency_score"].quantile([0.25, 0.75])
def assign_efficiency(score):
if pd.isna(score):
return "Unknown"
elif score <= efficiency_percentiles[0.25]:
return "Low Efficiency"
elif score <= efficiency_percentiles[0.75]:
return "Medium Efficiency"
else:
return "High Efficiency"
df["efficiency_category"] = df["efficiency_score"].apply(assign_efficiency)
# 4. Recipe size categorization (based on ingredients)
if "n_ingredients" in df.columns:
def assign_recipe_size(n_ingredients):
if pd.isna(n_ingredients):
return "Unknown"
elif n_ingredients <= 5:
return "Minimal"
elif n_ingredients <= 10:
return "Standard"
elif n_ingredients <= 15:
return "Rich"
else:
return "Elaborate"
df["recipe_size_category"] = df["n_ingredients"].apply(assign_recipe_size)
return df
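    # Worked complexity example (illustrative): n_steps = 10 and
    # n_ingredients = 8 give complexity_score = 10 + 8 * 0.5 = 14; the label
    # then depends on where 14 falls against the column's 33rd/67th percentiles.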
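    # Illustrative insights payload (keys depend on the available columns):
    #   {"popularity_segments": {"distribution": {"Medium": 5000, ...},
    #                            "avg_rating_by_segment": {"Viral": 4.4, ...},
    #                            "thresholds": {"low_max": 2.0, ...}},
    #    "duration_category": {"distribution": {...}, "avg_rating_by_category": {...}}}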
    def get_category_insights(self, df: Optional[pd.DataFrame] = None) -> dict:
        """Get insights and statistics about the created categories."""
        if df is None:
            df = self.aggregate()
            df = self.create_popularity_segments(df)
            df = self.create_recipe_categories(df)
insights = {}
        # Popularity segments analysis (rating breakdown only when ratings exist)
        if "popularity_segment" in df.columns:
            insights["popularity_segments"] = {
                "distribution": df["popularity_segment"].value_counts().to_dict(),
                "thresholds": getattr(self, "_popularity_segments_info", {}).get("thresholds", {}),
            }
            if "avg_rating" in df.columns:
                insights["popularity_segments"]["avg_rating_by_segment"] = (
                    df.groupby("popularity_segment")["avg_rating"].mean().round(2).to_dict()
                )
        # Distribution (and rating profile, when available) per derived category
        categorical_cols = [col for col in df.columns if "category" in col or "segment" in col]
        for cat_col in categorical_cols:
            insights[cat_col] = {"distribution": df[cat_col].value_counts().to_dict()}
            if "avg_rating" in df.columns:
                insights[cat_col]["avg_rating_by_category"] = (
                    df.groupby(cat_col)["avg_rating"].mean().round(2).to_dict()
                )
        return insights
# Backward compatibility alias (older code may import InteractionsExplorer)
InteractionsExplorer = InteractionsAnalyzer
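if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch with synthetic data; not part of the public
    # API, and the tiny frames below are made-up examples.
    demo_interactions = pd.DataFrame(
        {"recipe_id": [1, 1, 2, 2, 2, 3], "rating": [5, 4, 3, 4, 5, 2]}
    )
    demo_recipes = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "minutes": [15, 60, 30],
            "n_steps": [4, 12, 7],
            "n_ingredients": [5, 11, 8],
        }
    )
    analyzer = InteractionsAnalyzer(
        interactions=demo_interactions,
        recipes=demo_recipes,
        preprocessing=PreprocessingConfig(outlier_method="none"),
    )
    # Expect recipes 1 and 2 (recipe 3 has a single interaction and is filtered)
    print(analyzer.popularity_vs_rating(min_interactions=2))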