aggregate_impact_df(impacts_df, agg_levels=None, prepare_for_viz=True, **level_subset)

Function for aggregating the impacts dataframe at any hierarchical level

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `impacts_df` | `DataFrame` | Dataframe containing impacts from the model. | required |
| `agg_levels` | `list[str] \| str \| None` | Hierarchical level(s) to aggregate over. If not specified, full country aggregation is computed. | `None` |
| `prepare_for_viz` | `bool` | Flatten columns for visualization. Defaults to True. | `True` |
| `**level_subset` | `str \| list[str]` | All kwargs are used to subset the dataset. For example, `brand=["A", "B"]` selects only those brands before any aggregation. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | Raised when specified subsets do not exist in the data. |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Transformed dataframe after all specified aggregations and subsets. |

Source code in wt_ml/tabulation/tabulation_utils.py
def aggregate_impact_df(
    impacts_df: pd.DataFrame,
    agg_levels: list[str] | str | None = None,
    prepare_for_viz: bool = True,
    **level_subset: str | list[str],
) -> pd.DataFrame:
    """function for aggregating the impact df at any level

    Args:
        impacts_df (pd.DataFrame): Dataframe containing impacts from the model
        agg_levels (list[str] | str | None, optional): Hierarchical level(s) to aggregate over. If not specified, full
                                                        country aggregation is computed.
        prepare_for_viz (bool, optional): Flatten columns for visualization. Defaults to True.
        **level_subset: all kwargs are used to subset the dataset. For example `brand=["A","B"]` will select only those
                        brands before any aggregation

    Raises:
        KeyError: error when specified subsets do not exist in the data

    Returns:
        pd.DataFrame: transformed dataframe after all specified aggregations and subsets
    """
    # if we were given a list of dataframes, recurse on each one before touching `.columns`
    if isinstance(impacts_df, list):
        return [
            aggregate_impact_df(df, agg_levels, prepare_for_viz=prepare_for_viz, **level_subset)
            for df in impacts_df
        ]
    # make sure inputs are valid
    if agg_levels is None:
        agg_levels = []
    if isinstance(agg_levels, str):
        agg_levels = [agg_levels]
    assert all(
        level in impacts_df.columns.names for level in agg_levels
    ), "items in agg_levels must be level names of impacts_df.columns"
    assert "signal" not in agg_levels, "this function does not allow aggregation over the signal axis"
    assert all(
        level in impacts_df.columns.names for level in level_subset
    ), "keys for **level_subset must be level names of impacts_df.columns"
    # attempt to slice into the specified level subsets, and raise keyerror if not possible
    column_slicer = tuple(level_subset.get(level_name, slice(None)) for level_name in impacts_df.columns.names)
    try:
        impacts_df = impacts_df.loc[:, column_slicer]
    except KeyError:
        raise KeyError(
            f"{level_subset} is not a valid subset because data doesn't exist at some intersection of these levels"
        )
    # aggregate to the specified level
    labels = impacts_df.columns.unique("signal").tolist()
    impacts_df = impacts_df.groupby(level=[*agg_levels, "signal"], axis=1).sum()
    if len(impacts_df.columns) > 0:
        impacts_df = impacts_df.reindex(labels=labels, level="signal", axis=1)
    else:
        logger.warning("Filters resulted in an empty dataframe.")
        return impacts_df

    if not prepare_for_viz:
        return impacts_df
    # the way visualization works is it assumes columns as a multiindex with first level="granularity"
    # and second level="signal" so we create reasonable naming for titles and flatten the levels accordingly
    agged_subsets = {k: v for k, v in level_subset.items() if k not in agg_levels}

    if len(agg_levels) == 0 and len(agged_subsets) == 0:
        key = "country=USA"
    else:
        key = " & ".join(f"{k}={v}" for k, v in agged_subsets.items())
    if len(impacts_df.columns.names) > 1:
        impacts_df.columns = impacts_df.columns.map(
            lambda x: tuple(
                f"{impacts_df.columns.names[i]}={v}" if i != impacts_df.columns.names.index("signal") else v
                for i, v in enumerate(x)
            )
        )
    impacts_df = pd.concat({key: impacts_df}, axis=1, names=["subset", *impacts_df.columns.names])
    granularity = impacts_df.columns.droplevel("signal")
    if isinstance(granularity, pd.MultiIndex):
        granularity = granularity.map(" & ".join)
    signals = impacts_df.columns.get_level_values("signal")
    impacts_df.columns = pd.MultiIndex.from_arrays([granularity, signals], names=["granularity", "signal"])

    return impacts_df
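
A minimal usage sketch on a hypothetical toy frame; the real impacts frame carries the full granularity hierarchy plus the mandatory "signal" level in its column MultiIndex:

```python
import numpy as np
import pandas as pd

from wt_ml.tabulation.tabulation_utils import aggregate_impact_df

# Toy frame: a "brand" hierarchy level plus the mandatory "signal" level.
cols = pd.MultiIndex.from_product(
    [["A", "B"], ["price", "coupons"]], names=["brand", "signal"]
)
impacts_df = pd.DataFrame(np.ones((3, 4)), columns=cols)

# Full-country aggregation: sums over every non-"signal" level and, because
# prepare_for_viz defaults to True, flattens to ("granularity", "signal")
# columns such as ("country=USA", "price").
country_df = aggregate_impact_df(impacts_df)

# Subset to brand "A" first, then aggregate over the "brand" level.
brand_a_df = aggregate_impact_df(impacts_df, agg_levels="brand", brand=["A"])
```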

get_all_inc_revenues_df(dataset_factory, encodings, gt_model, denormalize=False, total_impact_from_date=False)

Utility function for getting impacts from investments for the entire dataset

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_factory` | `DataFactoryType` | Generator function for iterating over batches of data. | required |
| `encodings` | `dict` | Mapping from names to indices for wholesalers, brands, products, vehicles and so on. | required |
| `gt_model` | `EconomicNetwork` | Trained ground truth model to be used for inference. | required |
| `denormalize` | `bool` | Denormalize the results or not. Defaults to False. | `False` |
| `total_impact_from_date` | `bool` | Whether to calculate the total impacts of today's spend alone or to take the impacts of all the spends made until today. Must be True when calculating ROIs. Defaults to False. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Impacts from media or promotional investments for the entire dataset. |

Source code in wt_ml/tabulation/tabulation_utils.py
def get_all_inc_revenues_df(
    dataset_factory: "DataFactoryType",
    encodings: dict,
    gt_model: "EconomicNetwork",
    denormalize=False,
    total_impact_from_date=False,
):
    """Utility function for getting impacts from investments for entire data

    Args:
        dataset_factory (DataFactoryType): generator function for iterating over
                                                                      batches of data.
        encodings (dict): mapping from names to indices for wholesalers, brands, products, vehicles and so on.
        gt_model (EconomicNetwork): trained ground truth model to be used for inference.
        denormalize (bool, optional): Denormalize the results or not. Defaults to False.
        total_impact_from_date (bool, optional): Whether to calculate the total impacts of today's spend alone or
                                                 to take the impacts of all the spends made until today.
                                                 Defaults to False. Must be True when calculating ROIs.

    Returns:
        pd.DataFrame: impacts from media or promotional investments for the entire dataset
    """
    all_impacts_df = get_entire_impacts_df(
        dataset_factory, encodings, gt_model, denormalize=denormalize, total_impact_from_date=total_impact_from_date
    )
    all_signal_cols = all_impacts_df.columns.get_level_values("signal").unique().tolist()
    investment_signal_col_subs = ["league_", "team_", "vehicle_", "other_", "coupons"]
    investment_signal_cols = [
        col for col in all_signal_cols if any(col.startswith(sub) for sub in investment_signal_col_subs)
    ]
    all_inc_revs_df = all_impacts_df.loc[
        :, all_impacts_df.columns.get_level_values("signal").isin(investment_signal_cols)
    ]
    return all_inc_revs_df
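
The selection above boils down to a prefix filter on the "signal" column level; a standalone sketch of that logic on a toy frame with hypothetical signal names:

```python
import numpy as np
import pandas as pd

cols = pd.MultiIndex.from_tuples(
    [("A", "vehicle_tv"), ("A", "coupons"), ("A", "base_price")],
    names=["brand", "signal"],
)
df = pd.DataFrame(np.ones((2, 3)), columns=cols)

prefixes = ["league_", "team_", "vehicle_", "other_", "coupons"]
investment_cols = [
    col
    for col in df.columns.get_level_values("signal").unique()
    if any(col.startswith(p) for p in prefixes)
]
# Keeps "vehicle_tv" and "coupons"; drops "base_price".
inc_revs_df = df.loc[:, df.columns.get_level_values("signal").isin(investment_cols)]
```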

get_all_roicurves_df(dataset_factory, encodings, gt_model, levers, denormalize=False)

Utility function for getting the diminishing return curves dataframe for the entire dataset

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_factory` | `DataFactoryType` | Generator function for iterating over batches of data. | required |
| `encodings` | `dict` | Mapping from names to indices for wholesalers, brands, products, vehicles and so on. | required |
| `gt_model` | `EconomicNetwork` | Trained ground truth model to be used for inference. | required |
| `levers` | `list` | List of commercial levers. Can currently be either all media vehicles or all promo vehicles. | required |
| `denormalize` | `bool` | Denormalize the results or not. Defaults to False. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Diminishing return curves dataframe for the entire dataset. |

Source code in wt_ml/tabulation/tabulation_utils.py
def get_all_roicurves_df(
    dataset_factory: "DataFactoryType",
    encodings: dict,
    gt_model: "EconomicNetwork",
    levers: list[str],
    denormalize=False,
):
    """Utility function for getting diminishing return curves dataframe for entire data

    Args:
        dataset_factory (DataFactoryType): generator function for iterating over
                                                                      batches of data.
        encodings (dict): mapping from names to indices for wholesalers, brands, products, vehicles and so on.
        gt_model (EconomicNetwork): trained ground truth model to be used for inference.
        levers (list): list of commercial levers. Can currently be either all media vehicles or all promo vehicles.
        denormalize (bool, optional): Denormalize the results or not. Defaults to False.

    Returns:
        pd.DataFrame: diminishing return curves dataframe for the entire dataset
    """
    normalization_factor = encodings["normalization_factor"] if denormalize else 1
    wholesalers = get_lookups(encodings["wholesaler"])
    brands = get_lookups(encodings["brand"])
    products = get_lookups(encodings["product"])
    wholesaler_state_lookup = encodings["wholesaler_state_lookup"]
    signals = ["inc_revenue", "spend"]
    dataset_gen = dataset_factory()
    all_roicurves = []
    for batch in dataset_gen:
        roicurves_obj = make_roi_tracker(
            net=gt_model, batch=batch, max_val=12, spacing=AxisSpacing.LINEAR, desired_num_points=200
        )()
        granularity_names = [
            [wholesaler_state_lookup[wholesalers[w]], wholesalers[w], brands[b], products[p]]
            for w, b, p in zip(
                roicurves_obj.input.wholesaler_index.numpy().tolist(),
                roicurves_obj.input.brand_index.numpy().tolist(),
                roicurves_obj.input.product_index.numpy().tolist(),
            )
        ]
        roicurves = np.stack([roicurves_obj.impact, roicurves_obj.spends], axis=0)
        col_names = [
            (
                signal,
                lever,
                *granularity,
            )
            for signal in signals
            for lever in levers
            for granularity in granularity_names
        ]
        transposed = np.transpose(roicurves, [2, 0, 3, 1])
        col_levels = ["type", "signal"] + list(GRANULARITY_OPTIONS)
        batch_roicurves = pd.DataFrame(
            transposed.reshape(roicurves.shape[2], -1),
            columns=pd.MultiIndex.from_tuples(col_names, names=col_levels),
        )
        all_roicurves.append(batch_roicurves)

    all_roicurves_df = pd.concat(all_roicurves, axis=1).sort_index(axis=1)
    return all_roicurves_df * normalization_factor
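
The transpose/reshape is the subtle step: it moves the curve-point axis to the rows and flattens the remaining axes into the column order. A toy sketch under the assumed `(batch, points, levers)` shape of the tracker's `impact` and `spends` arrays (the shapes here are an assumption, inferred from the reshape above):

```python
import numpy as np

# Hypothetical sizes; the real arrays come from make_roi_tracker.
batch, points, levers = 2, 5, 3
impact = np.random.rand(batch, points, levers)
spends = np.random.rand(batch, points, levers)

curves = np.stack([impact, spends], axis=0)      # (type, batch, points, levers)
transposed = np.transpose(curves, [2, 0, 3, 1])  # (points, type, levers, batch)
flat = transposed.reshape(curves.shape[2], -1)   # one row per curve point
# The flattened column order is (type, lever, granularity), matching the
# signal-major ordering of `col_names` in the source above.
assert flat.shape == (points, 2 * levers * batch)
```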

get_entire_impacts_df(dataset_factory, encodings, gt_model, separate_decay=False, merge_granularities=False, total_impact_from_date=False, only_impacts=True, denormalize=False, training=False, collapse_lead_lag=None, apply_sales_mask=True)

Utility function for getting model-predicted impacts for the entire dataset

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_factory` | `DataFactoryType` | Generator function for iterating over batches of data. | required |
| `encodings` | `dict` | Mapping from names to indices for wholesalers, brands, products, vehicles and so on. | required |
| `gt_model` | `EconomicNetwork` | Trained ground truth model to be used for inference. | required |
| `separate_decay` | `bool` | Whether to report separate impact columns for betagamma decay or not. Defaults to False. | `False` |
| `merge_granularities` | `bool` | Represent the wholesaler/product-packs as a single level or a multi-index. Defaults to False. | `False` |
| `total_impact_from_date` | `bool` | Whether to use cumulative decay up to each date or today's decay applied to the future as of today. Defaults to False. | `False` |
| `only_impacts` | `bool` | Whether to get only model-predicted impacts or impacts together with epochs and yhat. Defaults to True. | `True` |
| `denormalize` | `bool` | Denormalize the results or not. Defaults to False. | `False` |
| `training` | `bool` | If True, query the model in training mode. Defaults to False. | `False` |
| `collapse_lead_lag` | | Forwarded to OutputImpact. Defaults to None. | `None` |
| `apply_sales_mask` | `bool` | Forwarded to OutputImpact. Defaults to True. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Model-learned impacts dataframe for the entire dataset. |

Source code in wt_ml/tabulation/tabulation_utils.py
def get_entire_impacts_df(
    dataset_factory: "DataFactoryType",
    encodings: dict,
    gt_model: "EconomicNetwork",
    separate_decay=False,
    merge_granularities=False,
    total_impact_from_date=False,
    only_impacts=True,
    denormalize=False,
    training=False,
    collapse_lead_lag=None,
    apply_sales_mask=True,
):
    """Utility function for getting model predicted impacts for entire data

    Args:
        dataset_factory (DataFactoryType): generator function for iterating over
                                                                      batches of data.
        encodings (dict): mapping from names to indices for wholesalers, brands, products, vehicles and so on.
        gt_model (EconomicNetwork): trained ground truth model to be used for inference.
        separate_decay (bool, optional): Whether to report separate impact columns for betagamma decay or not.
                                         Defaults to False.
        merge_granularities (bool, optional): Represent the wholesaler,product-packs as a single level or a multi-index.
                                              Defaults to False.
        total_impact_from_date (bool, optional): Whether to use cumulative decay up to each date or
                                                 today's decay applied to the future as of today.
                                                 Defaults to False.
        only_impacts (bool, optional): Whether to get only model predicted impacts or impacts with epochs and yhat.
                                       Defaults to True.
        denormalize (bool, optional): Denormalize the results or not. Defaults to False.
        training (bool, optional): If True, query the model in training mode. Defaults to False.
        collapse_lead_lag (optional): Forwarded to OutputImpact. Defaults to None.
        apply_sales_mask (bool, optional): Forwarded to OutputImpact. Defaults to True.

    Returns:
        pd.DataFrame: model-learned impacts dataframe for the entire dataset
    """
    # the impacts produced below are always in denormalized units, so when denormalize=True
    # nothing extra is needed; to stay in the normalized space we divide by the normalization factor
    normalization_factor = encodings["normalization_factor"] if not denormalize else 1
    all_batch_intermediaries = []
    dataset_gen = dataset_factory()
    for batch in dataset_gen:
        batch_intermediaries = gt_model(batch, training=training)[gt_model.NetTypes[0].__name__.lower()]
        all_batch_intermediaries.append(batch_intermediaries)

    all_impacts_df, all_epochs_df, all_ys_df = OutputImpact(
        encodings=encodings,
        intermediaries=all_batch_intermediaries,
        separate_decay=separate_decay,
        separate_dfs=True,
        total_impact_from_date=total_impact_from_date,
        collapse_lead_lag=collapse_lead_lag,
        apply_sales_mask=apply_sales_mask,
        combine_granularities_flag=merge_granularities,
    ).df

    all_impacts_df = all_impacts_df.sort_index(axis=1) / normalization_factor
    if only_impacts:
        return all_impacts_df
    else:
        all_epochs_df = all_epochs_df.sort_index(axis=1)
        all_ys_df = all_ys_df.sort_index(axis=1) / normalization_factor
        all_df = pd.concat([all_impacts_df, all_epochs_df, all_ys_df], axis=1).sort_index(axis=1)
        return all_df
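
A hedged usage sketch; `dataset_factory`, `encodings`, and `gt_model` are placeholders for a real data factory, encodings dict, and trained EconomicNetwork:

```python
# Placeholders: assume dataset_factory, encodings, and gt_model already exist.
impacts_df = get_entire_impacts_df(
    dataset_factory,
    encodings,
    gt_model,
    denormalize=True,  # report impacts in original (denormalized) units
)

# only_impacts=False additionally concatenates the epochs and yhat columns.
full_df = get_entire_impacts_df(dataset_factory, encodings, gt_model, only_impacts=False)
```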

stack_df_and_rename_col(df, col_name)

Utility function for getting an unpivoted (long-format) view of the dataframe

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | Dataframe to unpivot. | required |
| `col_name` | `str` | Name of the column that will hold the values of df. | required |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Unpivoted (long-format) view of the dataframe. |

Source code in wt_ml/tabulation/tabulation_utils.py
def stack_df_and_rename_col(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    """Utility function for getting an unpivoted (long-format) view of the dataframe

    Args:
        df (pd.DataFrame): dataframe to unpivot
        col_name (str): name of the column containing values of df

    Returns:
        pd.DataFrame: unpivoted view of the dataframe
    """
    # `unstack` moves all column levels into the index, yielding a Series that `reset_index`
    # expands into long format, with the values landing in a column named 0.
    stacked_df = df.unstack().reset_index().rename(columns={0: col_name, "time": "date"})
    stacked_df = stacked_df.reindex(columns=["date", *GRANULARITY_OPTIONS, "signal", col_name])
    return stacked_df
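
The core reshaping, sketched on a toy frame (the real function also reorders columns using the module's GRANULARITY_OPTIONS):

```python
import pandas as pd

idx = pd.Index(["2021-01-04", "2021-01-11"], name="time")
cols = pd.MultiIndex.from_tuples([("A", "price")], names=["brand", "signal"])
df = pd.DataFrame([[1.0], [2.0]], index=idx, columns=cols)

long_df = df.unstack().reset_index().rename(columns={0: "impact", "time": "date"})
# long_df columns: brand, signal, date, impact -- one row per (brand, signal, date).
```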