Validates brand_code to product mapping across parquet, json and hdf5
Source code in wt_ml/dataset/data_validator/checks/check_brand_prod_mapping.py
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def check_brand_prod_mapping(dataloader: DataLoader) -> DataStatus:
    """Cross-check the brand/product mapping held in parquet, JSON and the HDF5 encodings.

    Two independent comparisons are performed:

    1. The first ``*.parquet`` file found under ``parquet_dir / "product_brand_mapping"``
       (with ``brand_cluster_code`` renamed to ``brand_code``) is compared against
       ``product_brand_mapping.json`` (read with ``orient="split"``) using
       ``DataFrame.equals``.
    2. The ``brand_code -> brand_cluster_name`` dict derived from the JSON frame is
       compared against ``dataloader.encodings["brand_code_name_lookup"]``.

    Args:
        dataloader: Object exposing ``parquet_dir``, ``json_dir`` and ``encodings``.

    Returns:
        A ``DataStatus`` whose status is ``PASS`` only when both comparisons agree,
        otherwise ``FAIL``; the message holds one line per failed comparison.
    """
    # Load the two tabular representations of the mapping.
    parquet_files = list((dataloader.parquet_dir / "product_brand_mapping").glob("*.parquet"))
    parquet_frame = pd.read_parquet(parquet_files[0]).rename(columns={"brand_cluster_code": "brand_code"})
    json_frame = pd.read_json(
        dataloader.json_dir / "product_brand_mapping" / "product_brand_mapping.json",
        orient="split",
    )

    # Reduce the JSON frame to a plain brand_code -> brand_cluster_name dict.
    brand_name_pairs = json_frame[["brand_code", "brand_cluster_name"]].drop_duplicates()
    json_lookup = brand_name_pairs.set_index("brand_code").to_dict()["brand_cluster_name"]
    hdf5_lookup = dataloader.encodings["brand_code_name_lookup"]

    # One entry per failed comparison; an empty list means everything passed.
    failures: list[str] = []

    frames_identical = parquet_frame.equals(json_frame)
    if not frames_identical:
        # Treat every row as a tuple so rows present on only one side can be listed.
        json_rows = pd.MultiIndex.from_frame(json_frame)
        parquet_rows = pd.MultiIndex.from_frame(parquet_frame)
        mismatch_data = json_rows.symmetric_difference(parquet_rows).values
        failures.append(f"Following data mismatch between parquet and json in brand-product map : {mismatch_data}")

    lookups_agree = json_lookup == hdf5_lookup
    if not lookups_agree:
        mismatch = set(hdf5_lookup.items()) ^ set(json_lookup.items())
        failures.append(f"Following brand to brand cluster mapping mismatch between hdf5 and json: {mismatch}")

    overall = StatusType.FAIL if failures else StatusType.PASS
    return DataStatus(status=overall, message="\n".join(failures))
|