Validates shape, columns, and data for economic effects (consumer price index, unemployment rate) across the parquet, JSON, and HDF5 formats.
Source code in wt_ml/dataset/data_validator/checks/check_econ_effects.py
def check_econ(dataloader: DataLoader) -> DataStatus:
    """Validates shape, columns, and data for economic effects (consumer price index, unemployment rate)
    across the parquet, JSON, and HDF5 formats."""
    data = dataloader.econ_effects
    statuses: list[StatusType] = []
    messages: list[str] = []
    hdf5_columns = data.hdf5.columns
    # Restrict both frames to the loader's date index and a common column order
    # so that the value comparison below is positional.
    filtered_json = (
        data.json.loc[data.json["date"].isin(dataloader.date_idx), hdf5_columns]
        .sort_index(axis=1)
        .sort_values(by=["date"])
    )
    filtered_hdf5 = (
        data.hdf5.loc[data.hdf5["date"].isin(dataloader.date_idx)].sort_index(axis=1).sort_values(by=["date"])
    )
    # Check 1: parquet and json frames must have identical shapes.
    if data.parquet.shape == data.json.shape:
        statuses.append(StatusType.PASS)
    else:
        statuses.append(StatusType.FAIL)
        messages.append(
            f"Shapes mismatch between parquet {data.parquet.shape} and json {data.json.shape} in economic data"
        )
    # Check 2: every hdf5 column must also be present in json.
    if all(data.hdf5.columns.isin(data.json.columns)):
        statuses.append(StatusType.PASS)
    else:
        statuses.append(StatusType.FAIL)
        messages.append(
            f"Following columns mismatch between hdf5 and json: {data.hdf5.columns.difference(data.json.columns)}"
        )
    # Check 3: numeric values in hdf5 and json must agree within ABS_TOLERANCE.
    if filtered_hdf5.select_dtypes([int, float]).shape == filtered_json.select_dtypes([int, float]).shape:
        if np.allclose(
            filtered_hdf5.select_dtypes([int, float]).values,
            filtered_json.select_dtypes([int, float]).values,
            atol=ABS_TOLERANCE,
        ):
            statuses.append(StatusType.PASS)
        else:
            statuses.append(StatusType.FAIL)
            messages.append("Data mismatch hdf5 and json in economic data")
    else:
        statuses.append(StatusType.FAIL)
        messages.append("Shape mismatch hdf5 and json in economic data")
    # Aggregate: PASS only if every individual check passed.
    return DataStatus(
        status=StatusType.PASS if all(status == StatusType.PASS for status in statuses) else StatusType.FAIL,
        message="\n".join(messages),
    )
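
The check returns a single DataStatus aggregating every comparison. Below is a minimal usage sketch, assuming an already-configured DataLoader instance; the variable names are illustrative and not part of the module.

result = check_econ(dataloader)
if result.status is StatusType.FAIL:
    # Each failed comparison contributes one line to the aggregated message.
    print(result.message)

Because statuses and messages are collected per comparison rather than short-circuiting on the first failure, a single run reports every mismatch at once.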