def get_music_festivals_hack(paths: list[Path]) -> pd.DataFrame | None:
"""Shift music festival spends from problematic wslrs to other wslrs based on their
national TV spend(as a proxy for propulation spread).
Valid wholesalers to receive shifted spends are:
1) Wholesalers with non-zero national TV spends which is used a weight.
2) Wholesalers with non-zero music festival spends.
3) Wholesalers who are not part of problematic wslrs
Args:
paths (list[Path]): Wholesaler data paths
Returns:
pd.DataFrame: Music festival spends to be updated.
"""
    # These wslrs have extreme spikes in music festivals
    problematic_wslrs = [
        "HENSLEY CO PHOENIX (10261)",
        "HENSLEY CO CHANDLER (10275)",
        "HENSLEY CO PRESCOTT (10285)",
    ]
    # Use these brands to get approximate population spread of wholesalers
    top_brands = ["BUD", "MUL", "BDL"]
    # These are the weeks with extreme spikes in music festivals
    # (cast to timestamps so the isin() filter below matches the datetime week_date column)
    weeks_to_filter = pd.to_datetime(
        [
            "2023-01-07",
            "2023-01-14",
            "2023-01-21",
            "2023-01-28",
            "2023-02-04",
            "2023-02-11",
            "2023-02-18",
            "2023-02-25",
        ]
    )
    columns_to_read = ["week_date", "brand_code", "product_code", "league_Music_Festivals"]
    pop_spread = {}
    filtered_rows = []
    for current_path in paths:
        wholesaler = current_path.parent.name.split("=")[-1]
        if wholesaler == "__HIVE_DEFAULT_PARTITION__":
            continue
        df = pd.read_parquet(current_path)
        df["week_date"] = pd.to_datetime(df["week_date"])
        df.rename(columns={"brand_cluster_code": "brand_code"}, inplace=True)
        if "product_code" not in df.columns:
            df["product_code"] = "NA"
        # Copy so the assignments below do not modify a view of df
        rows_to_fix = df[df.week_date.isin(weeks_to_filter)][columns_to_read].copy()
        rows_to_fix["wholesaler"] = wholesaler
        filtered_rows.append(rows_to_fix)
        if wholesaler not in problematic_wslrs:
            # Use data since 2023 to get approximate population spread of wholesalers using national TV spend
            df = df[(df.week_date >= "2023-01-01") & (df.brand_code.isin(top_brands))]
            pop_spread[wholesaler] = df["vehicle_NATIONAL_TV"].sum()
    pop_spread = pd.DataFrame(list(pop_spread.items()), columns=["wholesaler", "weight"])
    pop_spread["weight"] = pop_spread["weight"] / pop_spread["weight"].sum()
    filtered_rows = pd.concat(filtered_rows)
    filtered_rows["issue_wlsr"] = filtered_rows["wholesaler"].isin(problematic_wslrs)
    filtered_rows = filtered_rows.merge(pop_spread, on="wholesaler", how="left")
    # Zero out the weight for rows with no music festival spend, so spend is only
    # shifted to wholesalers that already carry the channel.
    filtered_rows["weight"] = filtered_rows["weight"] * (filtered_rows["league_Music_Festivals"] > 0)
    music_festival_updates = []
    for week in filtered_rows.week_date.unique():
        for brand in filtered_rows.brand_code.unique():
            df = filtered_rows[
                (filtered_rows["week_date"] == week) & (filtered_rows["brand_code"] == brand)
            ].copy()
            # Spend to distribute for the week-brand: 2/3 of the spike's spend
            issue_spend = df[df.issue_wlsr]["league_Music_Festivals"].sum() * (2 / 3)
            # Renormalise the weights within this week-brand slice
            df["weight"] = df["weight"] / df["weight"].sum()
            if df[~df.issue_wlsr]["weight"].isna().all() or issue_spend == 0:
                # Skip if there are no valid wslrs to distribute the spend to
                continue
            spends_to_add = df["weight"] * issue_spend
            spends_to_remove = df["issue_wlsr"] * df["league_Music_Festivals"] * (-2 / 3)
            # fill_value=0 treats the NaN weights of the problematic wslrs as zero additions,
            # so their rows only get the 2/3 reduction
            df["league_Music_Festivals"] = (
                df["league_Music_Festivals"].add(spends_to_add, fill_value=0).add(spends_to_remove, fill_value=0)
            )
            music_festival_updates.append(
                df[["week_date", "brand_code", "product_code", "wholesaler", "league_Music_Festivals"]]
            )
    if len(music_festival_updates) == 0:
        return None
    music_festival_updates = pd.concat(music_festival_updates).set_index(
        ["wholesaler", "product_code", "brand_code", "week_date"]
    )
    return music_festival_updates
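
# A minimal usage sketch, not part of the original module: it shows one way the helper
# above might be driven. The "data/wholesalers" root and the "wholesaler=<NAME>"
# partition layout are illustrative assumptions; the real pipeline may discover the
# parquet paths differently. Module-level imports of Path and pandas are assumed,
# since the function above already relies on them.
if __name__ == "__main__":
    # Hive-style partitioning is assumed: one directory per wholesaler, e.g.
    #   data/wholesalers/wholesaler=HENSLEY CO PHOENIX (10261)/part-0.parquet
    parquet_paths = sorted(Path("data/wholesalers").glob("wholesaler=*/*.parquet"))

    updates = get_music_festivals_hack(parquet_paths)
    if updates is None:
        print("No music festival spends needed updating.")
    else:
        # Result is indexed by (wholesaler, product_code, brand_code, week_date)
        print(updates.head())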