check_media_mapping(dataloader)

Validates that media vehicle-to-parent mappings are consistent across parquet, JSON and HDF5.

Source code in wt_ml/dataset/data_validator/checks/check_media_mapping.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def check_media_mapping(dataloader: DataLoader) -> DataStatus:
    """Validates media vehicles to their parent mappings across parquet, json and hdf5.

    Two independent checks are run and their results are combined:

    1. The vehicle mapping frame read from parquet must equal the one read from json.
    2. Every key of ``dataloader.encodings["vehicle"]`` that starts with ``vehicle_`` or ``team_``
       (reported as the "hdf5" side in the messages) must also appear in the json ``vehicle`` column.

    Args:
        dataloader: Supplies ``parquet_dir``, ``json_dir`` and ``encodings``.

    Returns:
        A ``DataStatus`` whose status is ``PASS`` only if both checks pass, otherwise ``FAIL``.
        ``message`` holds one line per failed check and is empty when everything passes.

    Raises:
        IndexError: If no ``*.parquet`` file exists under ``parquet_dir / "vehicle_mapping"``.
    """
    statuses: list[StatusType] = []
    messages: list[str] = []

    # Only the first file matched by the glob is read; any further parquet files are ignored.
    vehicle_parquet = pd.read_parquet(list((dataloader.parquet_dir / "vehicle_mapping").glob("*.parquet"))[0])
    # The json index is turned into a "vehicle" column and the columns are put in a fixed order.
    vehicle_json = pd.read_json(
        dataloader.json_dir / "vehicle_mapping" / "vehicle_mapping.json", orient="split"
    ).reset_index(names="vehicle")[["parent_vehicle", "vehicle", "sub_type"]]
    media_vehicles_from_json = set(vehicle_json.vehicle)
    # Keep only the media-related encoding keys; a tuple lets startswith test both prefixes at once.
    media_vehicles_from_hdf5 = {
        k for k in dataloader.encodings["vehicle"].keys() if k.startswith(("vehicle_", "team_"))
    }

    # Check 1: parquet and json must describe the same mapping table.
    # NOTE(review): DataFrame.equals is strict (same shape, same values at the same positions, same
    # column dtypes), so this can fail even when the row-level symmetric difference below is empty.
    # Confirm that the parquet column order and dtypes are expected to match the json frame.
    if vehicle_parquet.equals(vehicle_json):
        statuses.append(StatusType.PASS)
    else:
        # Rows (as tuples) that are present in exactly one of the two frames.
        mismatch_data = (
            pd.MultiIndex.from_frame(vehicle_json)
            .symmetric_difference(pd.MultiIndex.from_frame(vehicle_parquet))
            .values
        )
        statuses.append(StatusType.FAIL)
        messages.append(f"Following data mismatch between parquet and json in vehicle map : {mismatch_data}")

    # Check 2: one-directional on purpose - json may list vehicles that have no encoding key.
    if media_vehicles_from_hdf5.issubset(media_vehicles_from_json):
        statuses.append(StatusType.PASS)
    else:
        mismatch = list(media_vehicles_from_hdf5.difference(media_vehicles_from_json))
        statuses.append(StatusType.FAIL)
        messages.append(f"Following extra vehicles are in hdf5 and not in json for media map: {mismatch}")

    return DataStatus(
        status=StatusType.PASS if all(status == StatusType.PASS for status in statuses) else StatusType.FAIL,
        message="\n".join(messages),
    )