get_curve_deviations(gt_model, dataset, spend_points=None, dynamic_range=False, **kwargs)

Calculates the ROI Curve deviations for select spend points at different aggregation levels

Source code in wt_ml/tuning/deviation_metrics.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def get_curve_deviations(
    gt_model: EconomicNetwork,
    dataset: EconomicDataset,
    spend_points: list[float] | None = None,
    dynamic_range: bool | float = False,
    **kwargs,
):
    """Calculates the ROI Curve deviations for select spend points at different aggregation levels"""
    grouping_options = ["parent_signal", "signal", "brand", "wholesaler", "state"]
    parent_child_mapping = {"parent_signal": "signal", "state": "wholesaler"}
    select_curves = []
    for batch in dataset:
        intermediaries = make_curve_tracker(
            gt_model, gt_model.impacts_layer.roicurve.curve_combiner, batch, 0, **kwargs
        )()

        curve_df = OutputCurve(curve_values=intermediaries, encodings=dataset.encodings, dynamic_range=dynamic_range).df
        curve_df = curve_df[0] if dynamic_range else curve_df
        curve_df = filter_spend_points(curve_df, "sales", spend_points)
        parent_values = curve_df.columns.get_level_values("signal").map(dataset.encodings["parent_vehicle_lookup"])
        add_col_level(curve_df, parent_values, "parent_signal", axis=1)
        select_curves.append(curve_df)
    select_curves = pd.concat(select_curves, axis=1)
    return pd.DataFrame(
        {level: level_deviations(select_curves, level, parent_child_mapping) for level in grouping_options}
    )

get_me_deviations(gt_model, dataset, layer_type, spend_points=None, **kwargs)

Calculates the Mixed Effect deviations for select spend points at different aggregation levels

Source code in wt_ml/tuning/deviation_metrics.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def get_me_deviations(
    gt_model: EconomicNetwork,
    dataset: EconomicDataset,
    layer_type: str,
    spend_points: list[float] | None = None,
    **kwargs,
) -> pd.DataFrame:
    """Calculates the Mixed Effect deviations for select spend points at different aggregation levels"""
    grouping_options = ["signal", "brand", "wholesaler", "state"]
    parent_child_lookup = {"state": "wholesaler"}
    parent_lookup = None
    # TODO (@rohith-abi): Try to simplify this and the add_curve_trackers together to avoid duplication.
    match layer_type:
        case "global_me":
            layer = gt_model.impacts_layer.global_me
            signals_attr = "global_signals"
            parent_lookup = dataset.encodings["global_parent_lookup"]
            grouping_options.append("parent_signal")
            parent_child_lookup |= {"parent_signal": "signal"}
        case "weather_me":
            layer = gt_model.impacts_layer.weather_me
            signals_attr = "weather_signals"
        case "distribution":
            layer = gt_model.impacts_layer.distribution_layer
            signals_attr = "distributions"
            kwargs = {"radius": 0.99, "center": 1.0} | kwargs
        case "holiday_me":
            layer = gt_model.impacts_layer.holiday_me
            signals_attr = "holiday_signals"
            kwargs = {"radius": 0.5, "center": 0.5} | kwargs
        case "price_me":
            layer = gt_model.impacts_layer.pricing_lead_lag_me
            signals_attr = "price_devs"
            kwargs = {"radius": 0.25} | kwargs
        case "price_ratio":
            layer = gt_model.impacts_layer.price_ratio
            signals_attr = "price_ratios"
            kwargs = {"radius": 0.25} | kwargs
        case _:
            raise KeyError(f"{layer_type} is not a valid layer type.")

    batch_curves = []
    for batch in dataset:
        intermediaries = make_me_tracker(gt_model, getattr(batch, signals_attr), batch, layer, **kwargs)()
        me_df = OutputMixedEffect(curve_values=intermediaries, encodings=dataset.encodings).df
        me_df = filter_spend_points(me_df, "impact", spend_points)
        if parent_lookup:
            parent_values = me_df.columns.get_level_values("signal").map(parent_lookup)
            add_col_level(me_df, parent_values, "parent_signal", axis=1)
        batch_curves.append(me_df)
    me_df = pd.concat(batch_curves, axis=1)
    return pd.DataFrame({level: level_deviations(me_df, level, parent_child_lookup) for level in grouping_options})

get_price_elasticity_deviations(gt_model, dataset, spend_points=None, **kwargs)

Calculates the Price Elasticity deviations for select spend points at different aggregation levels

Source code in wt_ml/tuning/deviation_metrics.py
121
122
123
124
125
126
127
128
129
130
131
132
def get_price_elasticity_deviations(gt_model, dataset, spend_points=None, **kwargs):
    """Calculates the Price Elasticity deviations for select spend points at different aggregation levels"""
    grouping_options = ["signal", "brand", "wholesaler", "state"]
    parent_child_lookup = {"state": "wholesaler"}
    batch_curves = []
    for batch in dataset:
        intermediaries = make_price_tracker(gt_model, batch, **kwargs)()
        price_df = OutputPriceElasticity(price_values=intermediaries, encodings=dataset.encodings).df
        price_df = filter_spend_points(price_df, "impact", spend_points)
        batch_curves.append(price_df)
    price_df = pd.concat(batch_curves, axis=1)
    return pd.DataFrame({level: level_deviations(price_df, level, parent_child_lookup) for level in grouping_options})

level_deviations(curve_df, level, parent_child_mapping={})

Calculate the average standard deviation of the curve points at a given level

Source code in wt_ml/tuning/deviation_metrics.py
26
27
28
29
30
31
32
33
34
35
36
def level_deviations(curve_df: pd.DataFrame, level: str, parent_child_mapping: dict[str, str] = {}) -> pd.Series:
    """Calculate the average standard deviation of the curve points at a given level"""
    groupby_levels = [lev for lev in curve_df.columns.names if lev != level]
    if level in parent_child_mapping.keys():
        # Remove child in parent aggregation
        groupby_levels.remove(parent_child_mapping[level])
        curve_df = curve_df.groupby(groupby_levels + [level], axis=1).mean()
    elif level in parent_child_mapping.values():
        # Standardize child values within parent level
        curve_df = curve_df.groupby(groupby_levels, axis=1).transform(lambda x: x - x.mean())
    return curve_df.groupby(groupby_levels, axis=1).std().mean(axis=1)