check_weather(dataloader)

Validates shape, columns and data for weather across parquet, json and hdf5 for all wholesalers

Source code in wt_ml/dataset/data_validator/checks/check_weather.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def check_weather(dataloader: DataLoader) -> DataStatus:
    """Validate weather data consistency across the parquet, json and hdf5 sources.

    Four checks are run and their outcomes aggregated:

    1. parquet and json have the same shape;
    2. every hdf5 column is present in json;
    3. every hdf5 wholesaler is present in json;
    4. the numeric values of hdf5 and json agree within ``ABS_TOLERANCE`` after both
       are restricted to the dates in ``dataloader.date_idx`` (json additionally to
       the hdf5 wholesalers) and sorted by wholesaler and date.

    If hdf5 has columns that json lacks, check 2 fails and check 4 is carried out on
    the columns the two sources share, so a column mismatch is reported instead of
    raising ``KeyError`` during column selection.

    Both frames are expected to carry ``wholesaler`` and ``date`` columns.

    Args:
        dataloader: Loader whose ``weather`` attribute exposes ``parquet``, ``json`` and
            ``hdf5`` frames, and whose ``date_idx`` holds the dates to compare.

    Returns:
        A ``DataStatus`` whose status is ``PASS`` only if every check passed; its
        message contains one line per failed check.
    """
    data = dataloader.weather
    statuses: list[StatusType] = []
    messages: list[str] = []

    hdf5_wholesalers = data.hdf5.wholesaler.unique()
    hdf5_columns = data.hdf5.columns
    # Boolean mask over the hdf5 columns marking those json also has. Selecting json by
    # the full hdf5 column list would raise KeyError on any missing label, so both
    # frames are restricted to the shared columns. When nothing is missing the mask is
    # all True and the selection is the same as using every hdf5 column.
    hdf5_column_in_json = hdf5_columns.isin(data.json.columns)
    shared_columns = hdf5_columns[hdf5_column_in_json]

    filtered_json = data.json.loc[
        data.json["wholesaler"].isin(hdf5_wholesalers) & data.json["date"].isin(dataloader.date_idx),
        shared_columns,
    ]
    # Sort columns and rows identically on both sides so values line up positionally.
    filtered_json = filtered_json.sort_index(axis=1).sort_values(by=["wholesaler", "date"])

    filtered_hdf5 = (
        data.hdf5.loc[data.hdf5["date"].isin(dataloader.date_idx), hdf5_column_in_json]
        .sort_index(axis=1)
        .sort_values(by=["wholesaler", "date"])
    )

    # Check 1: parquet and json shapes.
    if data.parquet.shape == data.json.shape:
        statuses.append(StatusType.PASS)
    else:
        statuses.append(StatusType.FAIL)
        messages.append(
            f"Shapes mismatch between parquet {data.parquet.shape} and json {data.json.shape} in weather data"
        )

    # Check 2: every hdf5 column exists in json.
    if hdf5_column_in_json.all():
        statuses.append(StatusType.PASS)
    else:
        statuses.append(StatusType.FAIL)
        messages.append(
            f"Following columns mismatch between hdf5 and json: {data.hdf5.columns.difference(data.json.columns)}"
        )

    # Check 3: every hdf5 wholesaler exists in json.
    if np.isin(hdf5_wholesalers, data.json.wholesaler.unique()).all():
        statuses.append(StatusType.PASS)
    else:
        statuses.append(StatusType.FAIL)
        messages.append(
            f"Following wholesalers mismatch between hdf5 and "
            f"json: {set(data.hdf5.wholesaler).difference(data.json.wholesaler)}"
        )

    # Check 4: numeric values agree; shapes must match before a positional comparison.
    hdf5_numeric = filtered_hdf5.select_dtypes([int, float])
    json_numeric = filtered_json.select_dtypes([int, float])
    if hdf5_numeric.shape != json_numeric.shape:
        statuses.append(StatusType.FAIL)
        messages.append("Shape mismatch for hdf5 and json in weather data")
    elif np.allclose(hdf5_numeric.values, json_numeric.values, atol=ABS_TOLERANCE):
        statuses.append(StatusType.PASS)
    else:
        statuses.append(StatusType.FAIL)
        messages.append("Data mismatch for hdf5 and json in weather data")

    return DataStatus(
        status=StatusType.PASS if all(status == StatusType.PASS for status in statuses) else StatusType.FAIL,
        message="\n".join(messages),
    )