EconomicModelInput

Bases: AnnotatedExtensionTypeWithShape, ExtensionType

The input class used to prepare batches of data.

Attributes:

    no_prediction_mask (Tensor): Mask for places we don't want to predict or train on.

    no_train_mask (Tensor): Mask for places we don't want to train on. We also won't train anywhere we don't make predictions.

    feature_masks (Tensor): Stacked masks for places where we don't want to train but believe, due to unforeseen externalities, we should predict perfectly. Each mask attributes the prediction error to a different driver.

Source code in wt_ml/dataset/data_pipeline.py
class EconomicModelInput(AnnotatedExtensionTypeWithShape, tf.experimental.ExtensionType):
    """
    The input class used to prepare batches of data.

    Attributes:
        no_prediction_mask (tf.Tensor): Mask for places we don't want to predict or train on.
        no_train_mask (tf.Tensor): Mask for places we don't want to train on. We also won't train
                anywhere we don't make predictions.
        feature_masks (tf.Tensor): Stacked masks for places where we don't want to train but
                believe, due to unforeseen externalities, we should predict perfectly. Each mask
                attributes the prediction error to a different driver.
    """

    # Required (without massive refactor) axis indices
    date_index: Annotated[tf.Tensor, TensorMetadata((Time,), np.int32)]
    # Required (without massive refactor) categorical hierarchy params
    state_index: Annotated[tf.Tensor, TensorMetadata((Batch,), np.int32)]
    wholesaler_index: Annotated[tf.Tensor, TensorMetadata((Batch,), np.int32)]
    brand_index: Annotated[tf.Tensor, TensorMetadata((Batch,), np.int32)]
    granularity_index: Annotated[tf.Tensor, TensorMetadata((Batch,), np.int32)]
    # continuous hierarchy params
    continuous_hier_params: Mapping[str, Annotated[tf.Tensor, TensorMetadata((Batch,), np.float32)]] = {}
    # target
    true_sales: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.float32)] = None
    # TODO(@ruler501): This could be replaced with a property that evaluates to true_sales / (price + EPSILON) right?
    true_volume: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.float32)] = None
    # axis indices
    vehicle_index: Annotated[Optional[tf.Tensor], TensorMetadata((Vehicle,), np.int32)] = None
    global_index: Annotated[Optional[tf.Tensor], TensorMetadata((GlobalSignal,), np.int32)] = None
    weather_index: Annotated[Optional[tf.Tensor], TensorMetadata((WeatherSignal,), np.int32)] = None
    temperature_index: Annotated[Optional[tf.Tensor], TensorMetadata((TemperatureSignal,), np.int32)] = None
    holiday_index: Annotated[Optional[tf.Tensor], TensorMetadata((HolidaySignal,), np.int32)] = None
    price_dev_index: Annotated[Optional[tf.Tensor], TensorMetadata((PriceDev,), np.int32)] = None
    price_ratio_index: Annotated[Optional[tf.Tensor], TensorMetadata((PriceRatio,), np.int32)] = None
    distribution_index: Annotated[Optional[tf.Tensor], TensorMetadata((Distribution,), np.int32)] = None
    national_trend_index: Annotated[Optional[tf.Tensor], TensorMetadata((NationalTrend,), np.int32)] = None
    regional_trend_index: Annotated[Optional[tf.Tensor], TensorMetadata((RegionalTrend,), np.int32)] = None
    # categorical hierarchy indices
    product_index: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.int32)] = None
    region_index: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.int32)] = None
    full_vehicle_index: Annotated[Optional[tf.Tensor], TensorMetadata((Vehicle,), np.int32)] = None
    parent_vehicle_index: Annotated[Optional[tf.Tensor], TensorMetadata((Vehicle,), np.int32)] = None
    global_parent_index: Annotated[Optional[tf.Tensor], TensorMetadata((GlobalSignal,), np.int32)] = None
    weather_parent_index: Annotated[Optional[tf.Tensor], TensorMetadata((WeatherSignal,), np.int32)] = None
    temperature_parent_index: Annotated[Optional[tf.Tensor], TensorMetadata((TemperatureSignal,), np.int32)] = None
    feature_mask_index: Annotated[Optional[tf.Tensor], TensorMetadata((FeatureMask,), np.int32)] = None
    # masks and weights
    no_prediction_mask: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.bool_)] = None
    no_train_mask: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.bool_)] = None
    feature_masks: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, FeatureMask), np.bool_)] = None
    before_2021_mask: Annotated[Optional[tf.Tensor], TensorMetadata((Time,), np.bool_)] = None
    instability_loss_mult: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.float32)] = None
    # normalization_factors
    price_normalization: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.float32)] = None
    distribution_means: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Distribution), np.float32)] = None
    # features
    distributions: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, Distribution), np.float32)] = None
    price: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.float32)] = None
    imputed_price: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.float32)] = None
    price_devs: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, PriceDev), np.float32)] = None
    vehicle_spends: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, Vehicle), np.float32)] = None
    global_signals: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, GlobalSignal), np.float32)] = None
    weather_signals: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, WeatherSignal), np.float32)] = None
    temperature_signals: Annotated[
        Optional[tf.Tensor], TensorMetadata((Batch, Time, TemperatureSignal), np.float32)
    ] = None
    holiday_signals: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, HolidaySignal), np.float32)] = None
    national_trend: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, NationalTrend), np.float32)] = None
    regional_trend: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, RegionalTrend), np.float32)] = None
    yearly_week_number: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.int32)] = None
    price_ratios: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time, PriceRatio), np.float32)] = None
    # Non-hierarchical metadata
    brand_size: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.float32)] = None
    investment_axis_scale: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Vehicle), np.float32)] = None
    num_restarts: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.int32)] = None
    weeks_since_restart: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.float32)] = None
    maco_cost: Annotated[Optional[tf.Tensor], TensorMetadata((Batch, Time), np.float32)] = None
    preinvestment_slope: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.float32)] = None
    preinvestment_intercept: Annotated[Optional[tf.Tensor], TensorMetadata((Batch,), np.float32)] = None

dataset_to_dataframe(data, attr_name, encodings, axis_types=None, index_types=('state', 'wholesaler', 'brand', 'product'), do_sort=True)

Create a dataframe for the specific attribute of the batched data.

Parameters:

    data (EconomicModelInput): The batched data. Required.

    attr_name (str): The attribute to make into a DataFrame. Required.

    encodings (dict[str, dict[str | int, int]]): The cached data and encodings about the full dataset. Required.

    axis_types (Sequence[Axis] | None): The axis type labels for each axis of the index of the transposed DataFrame. Defaults to None, in which case the attribute's own non-batch axes are used.

    index_types (Sequence[str]): The encoding names used to build the row MultiIndex. Defaults to ("state", "wholesaler", "brand", "product").

    do_sort (bool): Whether to sort the result along its batch-tuple axis before returning. Defaults to True.

Returns:

    pd.DataFrame | pd.Series: A transformed and transposed DataFrame from the data with the columns equal to the batch dimension, or a Series when the attribute has no non-batch axes.

Source code in wt_ml/dataset/data_pipeline.py
def dataset_to_dataframe(
    data: AnnotatedExtensionTypeWithShape,
    attr_name: str,
    encodings: dict[str, dict[str | int, int]],
    axis_types: Sequence[Axis] | None = None,
    index_types: Sequence[str] = ("state", "wholesaler", "brand", "product"),
    do_sort: bool = True,
) -> pd.DataFrame | pd.Series:
    """Create a dataframe for the specific attribute of the batched data.

    Args:
        data (EconomicModelInput): The batched data.
        attr_name (str): The attribute to make into a DataFrame.
        encodings (dict[str, dict[str | int, int]]): The cached data and encodings about the full
                                                     dataset.
        axis_types (Sequence[Axis] | None): The axis type labels for each axis of the index of the
                                            transposed DataFrame. Defaults to the attribute's own
                                            non-batch axes.
        index_types (Sequence[str]): The encoding names used to build the row MultiIndex.
        do_sort (bool): Whether to sort the result along its batch-tuple axis before returning.

    Returns:
        A transformed and transposed DataFrame from the data with the columns equal to the batch
        dimension, or a Series when the attribute has no non-batch axes.
    """
    if axis_types is None:
        axis_types = data.datasets[attr_name].axes[1:]
    lookups_idx = [get_lookups(encodings, k) for k in index_types]
    index_tuples = get_tuples(data, index_types, lookups_idx, encodings)
    index = pd.MultiIndex.from_tuples(
        index_tuples,
        names=index_types,
    )
    if len(axis_types) > 0:
        lookups_col = [get_lookups(encodings, k) for k in axis_types]
        column_arrs = [
            [x[0] for x in get_tuples(data, (axis_type,), (lookups,), encodings)]
            for axis_type, lookups in zip(axis_types, lookups_col)
        ]
        columns = pd.MultiIndex.from_product(
            column_arrs,
            names=(
                ("time", "signal")
                if len(axis_types) == 2 and axis_types[0] == Axis.Time
                else (i.value for i in axis_types)
            ),
        )
        result = pd.DataFrame(
            get_data_attr(data, attr_name, encodings).reshape(len(index), -1), index=index, columns=columns
        ).T
        sort_axis = 1
    else:
        result = pd.Series(get_data_attr(data, attr_name, encodings).reshape(-1), index=index, name=attr_name)
        sort_axis = 0
    if do_sort:
        return result.sort_index(axis=sort_axis)
    else:
        return result
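
A hedged usage sketch, assuming `batch` is a populated EconomicModelInput and `encodings` is the encodings dict produced by the data pipeline:

    # `true_sales` is (Batch, Time), so the transposed result is a DataFrame
    # whose rows are time steps and whose columns are the
    # (state, wholesaler, brand, product) batch tuples.
    sales_df = dataset_to_dataframe(batch, "true_sales", encodings)

    # An attribute with no non-batch axes (e.g. `brand_size`, shaped (Batch,))
    # comes back as a Series indexed by the same batch tuples.
    brand_size = dataset_to_dataframe(batch, "brand_size", encodings)
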

denormalize_z_score(batch_df, signal, encodings, normalized_on=None, mean_rescaled=False)

Denormalize z-score transformed batch_df.

Parameters:

    batch_df (DataFrame): The dataframe that needs to be denormalized. Required.

    signal (str): The signal being transformed, or the name of the df. Required.

    encodings (dict[str, Any]): Encodings providing the stds and means dicts for the reverse transformation. Required.

    normalized_on (str | None): The column level the df was normalized on. The given level will be mapped via f"{signal}_means_row_lookup". Defaults to None.

    mean_rescaled (bool): Whether the data was rescaled by the mean (transformed mean = 1) rather than centered (transformed mean = 0). Defaults to False.

Returns:

    pd.DataFrame: DataFrame after applying the reverse z-score transformation.

Source code in wt_ml/dataset/data_pipeline.py
def denormalize_z_score(
    batch_df: pd.DataFrame,
    signal: str,
    encodings: dict[str, Any],
    normalized_on: str | None = None,
    mean_rescaled: bool = False,
) -> pd.DataFrame:
    """Denormalize z-score transformed batch_df.

    Args:
        batch_df (pd.DataFrame): The dataframe that needs denomralized.
        signal (str): Signal that is being transformed or name of df.
        encodings (dict[str, Any]): Encodings to get std and means dict for reverse transformation.
        normalized_on (str | None, optional): On what level, the df is normalized on.
            The given level wil be mapped by f"{signal}_means_row_lookup". Defaults to None.
        mean_rescaled (bool, optional): Specifies if the data is rescaled by the mean (transformed_mean=1)
            or centered (transformed_mean=0). Defaults to False.

    Returns:
        pd.DataFrame: DataFrame after applying reverse z score transformation.
    """
    if f"{signal}_stds_dict" in encodings:
        # applied on signal level
        stds = encodings[f"{signal}_stds_dict"]
        batch_df = batch_df * batch_df.columns.get_level_values("signal").map(defaultdict(lambda: 1, stds))

    if f"{signal}_means_row_lookup" in encodings:
        # it's mean normalized on a specific granularity
        if normalized_on not in batch_df.columns.names or normalized_on == "signal":
            raise ValueError(f"`normalized_on` must be one of these {set(batch_df.columns.names)-{'signal',}}")

        # mean applied on each col and then row level
        means_row_lookup = get_lookups(encodings[f"{signal}_means_row_lookup"])
        means_col_lookup = get_lookups(encodings[f"{signal}_means_col_lookup"])
        normalized_on_means_dict = {
            k: {name: vi for name, vi in zip(means_col_lookup, v)}
            for k, v in zip(means_row_lookup, encodings[f"{signal}_means"])
        }
        signal_level = batch_df.columns.get_level_values("signal")
        normalized_on_level = batch_df.columns.get_level_values(normalized_on)
        means_dict_index = normalized_on_level.map(normalized_on_means_dict)
        for i, (signal, means_dict) in enumerate(zip(signal_level, means_dict_index)):
            if signal in means_dict:
                if mean_rescaled:
                    batch_df.iloc[:, i] *= means_dict[signal] + EPSILON
                else:
                    batch_df.iloc[:, i] += means_dict[signal]
        batch_df = batch_df.round(3)
    elif signal == "regional_trend":
        brand_lookup = encodings["brand_lookup"]
        region_lookup = encodings["region_lookup"]
        brand_region_index = [(r, b, "regional_trend") for r in region_lookup for b in brand_lookup]
        means_series = pd.DataFrame(
            encodings["regional_trend_means_dict"]["regional_trend"],
            index=pd.MultiIndex.from_tuples(brand_region_index, names=["region", "brand", "signal"]),
        )
        wholesaler_region = pd.DataFrame.from_dict(encodings["wholesaler_region_lookup"], orient="index").reset_index(
            names=["wholesaler"]
        )
        wholesaler_region = wholesaler_region.rename(columns={0: "region"})
        original_order = list(batch_df.columns.names)
        means_series = (
            means_series.reset_index()
            .merge(wholesaler_region, on="region")
            .drop("region", axis=1)
            .set_index(["wholesaler", "brand", "signal"])
        )
        batch_df, means_series = batch_df.align(means_series.T, join="left", axis=1)
        if mean_rescaled:
            batch_df = batch_df * means_series.values
        else:
            batch_df = batch_df + means_series.values
        batch_df = batch_df.reorder_levels(original_order, axis=1)
    else:
        # it's simple mean normalized df
        means_series = pd.DataFrame(encodings[f"{signal}_means_dict"]).stack()
        means_series.index.names = ["brand", "signal"]
        original_order = list(batch_df.columns.names)
        batch_df, means_series = batch_df.align(means_series, join="left", axis=1)
        if mean_rescaled:
            batch_df = batch_df * means_series.values[None, :]
        else:
            batch_df = batch_df + means_series.values[None, :]
        batch_df = batch_df.reorder_levels(original_order, axis=1)
    return batch_df
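
The inverse transform mirrors how the data was normalized: multiply by the stored std, then either add back the mean (centered, transformed mean = 0) or multiply by it (mean-rescaled, transformed mean = 1). A toy round trip illustrating the two conventions; EPSILON here is a stand-in for the pipeline's small constant, and its value is an assumption:

    import numpy as np

    EPSILON = 1e-6  # assumed; stand-in for the pipeline's constant
    x = np.array([10.0, 12.0, 8.0, 14.0])
    mu, sigma = x.mean(), x.std()

    # Centered convention: z = (x - mu) / sigma, so denormalize with z * sigma + mu.
    z_centered = (x - mu) / sigma
    assert np.allclose(z_centered * sigma + mu, x)

    # Mean-rescaled convention: z = x / (mu + EPSILON), so denormalize by
    # multiplying the mean back in instead of adding it.
    z_rescaled = x / (mu + EPSILON)
    assert np.allclose(z_rescaled * (mu + EPSILON), x)
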

get_data_attr(data, attr_name, encodings)

If data has the attribute attr_name, return it as a numpy (often temporal) object. If attr_name names an index rather than a signal (like wholesaler, product, or state), return data.{attr_name}_index; for instance, wholesaler returns data.wholesaler_index.

Source code in wt_ml/dataset/data_pipeline.py
def get_data_attr(data: EconomicModelInput, attr_name: str | Axis | list[str], encodings: dict[str, Any]):
    """
    if data has attribute attr_name, return the numpy (often temporal) object.
    If attr_name is the name of an index, not of a signal (like wholesaler, product, state, etc),
    then return data.{attr_name}_index. For instance, for wholesaler, return data.wholesaler_index.
    """
    if attr_name == "time" or attr_name == "time_index":
        date_lookups = get_lookups(encodings["date"])
        return pd.DatetimeIndex([date_lookups[i] for i in to_numpy(data.date_index)], name="date")
    elif isinstance(attr_name, str):
        attr_name = str(attr_name)
        if hasattr(data, attr_name):
            return to_numpy(getattr(data, attr_name))
        elif hasattr(data, f"{attr_name}_index"):
            result = to_numpy(getattr(data, f"{attr_name}_index"))
            # Collapse a batched index tensor to a single row.
            if len(result.shape) > 1:
                result = result[0]
            return result
        # An unknown string attribute falls through and implicitly returns None.
    else:
        # attr_name is already a concrete list of values; pass it through unchanged.
        return attr_name
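
A hedged usage sketch, again assuming `batch` and `encodings` come from the data pipeline:

    # A signal attribute comes back as its numpy array, shaped (batch, time).
    sales = get_data_attr(batch, "true_sales", encodings)

    # An index name falls through to the matching `{attr_name}_index` attribute.
    wholesalers = get_data_attr(batch, "wholesaler", encodings)

    # "time" is special-cased into a pandas DatetimeIndex via the date encoding.
    dates = get_data_attr(batch, "time", encodings)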