consecutive_positive_lengths(column)

Calculate the lengths of consecutive positive values in the input column.

:param column: A pandas Series or NumPy array representing a single column. :return: A NumPy array containing the lengths of consecutive positive values.

Source code in wt_ml/dataset/data_utils.py
178
179
180
181
182
183
184
185
186
187
188
189
190
def consecutive_positive_lengths(column: pd.Series | np.ndarray) -> np.ndarray:
    """
    Calculate the lengths of consecutive positive values in the input column.

    :param column: A pandas Series or NumPy array representing a single column.
    :return: A NumPy array containing the lengths of consecutive positive values.
    """
    column = np.asarray(column)
    padded_column = np.concatenate(([0], column, [0]))
    zeros_indices = np.nonzero(padded_column == 0)[0]
    starts = zeros_indices[:-1]
    ends = zeros_indices[1:]
    return ends - starts

get_expected_yearly_aos_vehicle_totals()

Read the yearly_aos_vehicle_totals data from the cached folder and load it into a DataFrame.

Source code in wt_ml/dataset/data_utils.py
207
208
209
210
211
212
213
214
215
216
217
218
def get_expected_yearly_aos_vehicle_totals() -> pd.DataFrame:
    """Load the cached expected vehicle spends and label them with mapped vehicle names.

    Two CSV files are read from ``BASE_DIR_PATH / "data" / "cached"``:
    ``expected_vehicle_spends.csv`` and ``expected_vehicle_mapping.csv``. Each
    ``Row Labels`` value of the spends table is translated through the mapping
    table (``Business Mapping`` -> ``Vehicle to map``) into a new ``veh_names``
    column, after which ``Row Labels`` is removed.

    Returns:
        pd.DataFrame: the spends table with a ``veh_names`` column and without
        ``Row Labels``. Rows containing any missing value are dropped, which
        includes rows whose label has no entry in the mapping.
    """
    cached_dir = BASE_DIR_PATH / "data" / "cached"
    spends = pd.read_csv(cached_dir / "expected_vehicle_spends.csv")
    mapping = pd.read_csv(cached_dir / "expected_vehicle_mapping.csv")

    # Lookup series: business label -> vehicle name.
    label_to_vehicle = mapping.set_index("Business Mapping")["Vehicle to map"]
    spends["veh_names"] = spends["Row Labels"].map(label_to_vehicle)

    return spends.drop(columns="Row Labels").dropna()

mean_positive_sequence_length(column)

Calculate the mean length of consecutive positive values in the input column.

:param column: A pandas Series or NumPy array representing a single column. :return: The mean length of consecutive positive values as a float.

Source code in wt_ml/dataset/data_utils.py
193
194
195
196
197
198
199
200
201
202
203
204
def mean_positive_sequence_length(column: pd.Series | np.ndarray) -> float:
    """
    Calculate the mean length of consecutive positive values in the input column.

    The lengths come from ``consecutive_positive_lengths``; only entries greater
    than zero take part in the average.

    :param column: A pandas Series or NumPy array representing a single column.
    :return: The mean of the strictly positive lengths, or 0 when there are none.
    """
    run_lengths = consecutive_positive_lengths(column)
    positive_runs = run_lengths[run_lengths > 0]
    # Guard the empty case explicitly: the mean of an empty array is NaN.
    return np.mean(positive_runs) if len(positive_runs) else 0

revenue_spread_national_media_across_geos(lnm_df, all_revenue, all_wholesaler_brand_df, geom_mean=True, rev_freq='Y')

Take national media investments at week x brand x vehicle level, and spread them geographically according to yearly/monthly/weekly, brand x wholesaler revenue. Finally, ensure that the total national investments at week x brand x vehicle level is not changed

Parameters:

Name Type Description Default
lnm_df DataFrame

dataframe consisting of media broken down as local, national or other media

required
all_revenue DataFrame

revenue for all the wholesaler, brands combined in a single dataframe

required
all_wholesaler_brand_df DataFrame

investments for all the wholesaler, brands combined in a single dataframe

required
geom_mean bool

Whether to take geometric mean of revenue based spreading and the as is population based spreading. Defaults to True.

True
rev_freq RevAggUnit

Temporal aggregation of revenue for spreading. Can be yearly (Y), monthly (M) or weekly (W). Defaults to "Y".

'Y'

Returns:

Type Description
DataFrame

pd.DataFrame: dataframe with national media investments spread geographically

Source code in wt_ml/dataset/data_utils.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def revenue_spread_national_media_across_geos(
    lnm_df: pd.DataFrame,
    all_revenue: pd.DataFrame,
    all_wholesaler_brand_df: pd.DataFrame,
    geom_mean: bool = True,
    rev_freq: RevAggUnit = "Y",
) -> pd.DataFrame:
    """Take national media investments at week x brand x vehicle level,
    and spread them geographically according to yearly/monthly/weekly, brand x wholesaler revenue.
    Finally, ensure that the total national investments at week x brand x vehicle level is not changed

    Args:
        lnm_df (pd.DataFrame): dataframe consisting of media broken down as local, national or other media.
                               Must have `parent_signal` and `signal` columns; signals whose
                               `parent_signal` equals "national_media" are the ones that get spread.
        all_revenue (pd.DataFrame): revenue for all the wholesaler, brands combined in a single dataframe.
                                    Its columns must carry `brand_code` and `wholesaler` levels and its
                                    index must support `resample`.
        all_wholesaler_brand_df (pd.DataFrame): investments for all the wholesaler, brands combined
                                                in a single dataframe. Its columns must carry the levels
                                                `wholesaler`, `brand_code`, `product_code` and `signals`.
        geom_mean (bool, optional): Whether to take geometric mean of revenue based spreading and
                                    the as is population based spreading. Defaults to True.
        rev_freq (RevAggUnit, optional): Temporal aggregation of revenue for spreading.
                                         Can be yearly (Y), monthly (M) or weekly (W).
                                         Defaults to "Y".

    Returns:
        pd.DataFrame: dataframe with national media investments spread geographically. The columns are
        reordered to (wholesaler, brand_code, product_code, signals) and sorted; only the national media
        columns are overwritten.
    """
    # Put the column levels in a fixed order and sort them so later selections line up.
    vehicle_spends_df = all_wholesaler_brand_df.reorder_levels(
        ["wholesaler", "brand_code", "product_code", "signals"], axis=1
    ).sort_index(axis=1)
    # Names of the signals classified under "national_media", and a column mask selecting them.
    national_media_vehicles = lnm_df[lnm_df.parent_signal == "national_media"]["signal"].tolist()
    national_media_or_not = vehicle_spends_df.columns.get_level_values("signals").isin(national_media_vehicles)
    national_media = vehicle_spends_df.loc[:, national_media_or_not]

    # Total per (brand_code, signals) group, broadcast back onto every column of the group.
    total_country_media = national_media.groupby(level=["brand_code", "signals"], axis=1).transform(sum)
    # Revenue summed over each `rev_freq` period (broadcast back to the original rows),
    # then summed across the columns of each (brand_code, wholesaler) group.
    total_sales_brand_wslr_rev_freq = (
        all_revenue.resample(rev_freq).transform(sum).groupby(level=["brand_code", "wholesaler"], axis=1).transform(sum)
    )
    # Zero the weight wherever the revenue entry itself is not positive.
    total_sales_brand_wslr_rev_freq = total_sales_brand_wslr_rev_freq * (all_revenue > 0)
    # Revenue-weighted allocation of the group totals.
    # NOTE(review): this relies on pandas aligning the two frames on their column labels;
    # confirm the column levels of `all_revenue` are compatible with those of the media frame.
    inv_allocation_raw = total_country_media.mul(total_sales_brand_wslr_rev_freq, axis=1)

    # take geometric mean of the new investment allocation with the existing allocation
    inv_allocation = np.sqrt(inv_allocation_raw * national_media) if geom_mean else inv_allocation_raw

    # Rescale so each (brand_code, signals) group sums back to its original total;
    # the tiny epsilon keeps the division defined when a group's allocation sums to zero.
    modified_total_country_media = inv_allocation.groupby(level=["brand_code", "signals"], axis=1).transform(sum)
    inv_spreaded = (inv_allocation * total_country_media) / (modified_total_country_media + 1e-40)
    # `.values` makes this a positional write: assumes `inv_spreaded` has the same row/column
    # order as the masked selection — TODO confirm.
    vehicle_spends_df.loc[:, national_media_or_not] = inv_spreaded.values
    return vehicle_spends_df

surrounding_rolling_average(series, weeks_surrounding)

Calculate the centered rolling average excluding the current point

Source code in wt_ml/dataset/data_utils.py
221
222
223
224
225
def surrounding_rolling_average(series: pd.Series, weeks_surrounding: int) -> pd.Series:
    """Calculate the centered rolling average excluding the current point.

    A centered window of ``weeks_surrounding + 1`` observations is placed on
    every point; the point's own value is subtracted from the window sum and
    the remainder is divided by the number of other non-missing observations
    in that same window.

    Args:
        series (pd.Series): values to average.
        weeks_surrounding (int): number of neighbouring observations in a full
            window (the window itself is one larger, to hold the current point).

    Returns:
        pd.Series: per-point average of the surrounding values. The result is
        NaN where the current value is missing or where the window contains no
        other observation.
    """
    # Sum and count must come from the same window; counting over a shifted
    # series misaligns them and divides by zero at the end of the series.
    window = series.rolling(weeks_surrounding + 1, min_periods=0, center=True)
    neighbour_sum = window.sum() - series
    # `count` includes the current point, so remove it from the denominator.
    neighbour_count = window.count() - 1
    return neighbour_sum / neighbour_count