bdl_mult_diff(y_true, batch, bdl_mult, bdl_mf_mask, encodings)

A business metric that measures how closely our learned Bud Light multiplier describes the total drop in sales this year, after the Bud Light event, relative to the same period last year.

Parameters:

    y_true (Tensor): The actual sales data. Shape is (batch, time). Required.
    batch (EconomicModelInput): The batch input. Required.
    bdl_mult (Tensor): The result of Impacts.bud_light_event.impact. Required.
    bdl_mf_mask (Tensor): The result of Impacts.bud_light_event.mf_mask. Required.
    encodings (Encodings): Encoding lookup tables; encodings["date"] maps date strings to date indices. Required.

Returns:

    tuple[Tensor, Tensor]: The difference between the observed drop multiplier and the learned multiplier, along
    with a validity mask. Both have shape (batch, num_weeks).

Source code in wt_ml/layers/layer_utils.py, lines 492-540
def bdl_mult_diff(
    y_true: tf.Tensor, batch: EconomicModelInput, bdl_mult: tf.Tensor, bdl_mf_mask: tf.Tensor, encodings: Encodings
) -> tuple[tf.Tensor, tf.Tensor]:
    """
    A business metric that calculates how close our learned bud light multiplier is to describing the total drop
    of this year post bud light event with respect to same period last year

    Args:
        y_true (tf.Tensor): The actual sales data. Shape is (batch, time).
        batch (EconomicModelInput): The batch input.
        bdl_mult (tf.Tensor): The result of Impacts.bud_light_event.impact
        bdl_mf_mask (tf.Tensor): The result of Impacts.bud_light_event.mf_mask.

    Returns:
        tf.Tensor: Absolute difference of multipliers measured. Shape is (batch, num_weeks)
    """
    indices = [v for k, v in encodings["date"].items() if k >= "2023-04-01"]
    bdl_mask = tf.cast(batch.date_index >= min(indices), dtype=tf.float32)[None, :]
    # scalar such that -num_weeks is the index when the bud light event starts
    num_weeks = tf.cast(tf.reduce_max(tf.reduce_sum(bdl_mask, axis=1)), dtype=tf.int32)

    def compute_rest():
        # create a mask that is equivalent to shifting the bdl_mask down 52 weeks
        year_b4_bdl_mask = tf.pad(bdl_mask[:, 52:], [[0, 0], [0, 52]])
        # apply the different masks to y_true
        y_true_bdl = y_true * bdl_mask
        y_true_ly = y_true * year_b4_bdl_mask
        y_true_p1 = y_true_ly[:, -52 - num_weeks : -52]
        y_true_p2 = y_true_bdl[:, -num_weeks:]
        result_mask = bdl_mask[:, -num_weeks:] * tf.cast((y_true_p1 > 0.0) & (y_true_p2 > 0.0), dtype=tf.float32)
        true_drop_mult = y_true_p2 / (y_true_p1 + EPSILON)
        result = true_drop_mult - bdl_mult[:, -num_weeks:]
        return result, result_mask

    false_value = tf.zeros((tf.shape(y_true)[0], 1), dtype=tf.float32)
    return tf.cond(
        (tf.shape(y_true)[1] >= 52 + num_weeks) & tf.reduce_any(bdl_mf_mask != 0.0),
        compute_rest,
        lambda: (false_value, false_value),
    )
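
The core comparison the metric makes can be sketched with toy tensors. The shapes and numbers below are purely
illustrative, and EPSILON stands in for the module's own constant:

import tensorflow as tf

EPSILON = 1e-6  # stand-in for the module's EPSILON constant

# Suppose the post-event window is 4 weeks long.
sales_last_year = tf.constant([[100.0, 110.0, 105.0, 120.0]])  # the same 4 weeks one year earlier
sales_this_year = tf.constant([[70.0, 77.0, 84.0, 84.0]])      # the 4 weeks after the event
learned_mult = tf.constant([[0.70, 0.70, 0.75, 0.72]])         # slice of Impacts.bud_light_event.impact

# Observed year-over-year drop multiplier, as computed inside compute_rest().
observed_drop_mult = sales_this_year / (sales_last_year + EPSILON)  # ~[0.70, 0.70, 0.80, 0.70]
difference = observed_drop_mult - learned_mult                      # what bdl_mult_diff returns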

create_mask(date_index, start_idx=None, end_idx=None, n_repeat=None, as_float=True)

Generates a mask for the given date indices based on the start and end indices.

Parameters:

    date_index (Tensor): Tensor containing date indices, used to build a mask between start and end. Required.
    start_idx (optional int): Start date index. If not specified, don't mask from the beginning. Defaults to None.
    end_idx (optional int): End date index. If not specified, don't mask from the end. Defaults to None.
    n_repeat (optional int): If specified, add a leading batch axis and repeat the mask n_repeat times along it. Defaults to None.
    as_float (bool): If True, cast the mask to float32; otherwise return a boolean mask. Defaults to True.

Returns:

    tf.Tensor: Mask based on the filter indices provided.

Source code in wt_ml/layers/layer_utils.py, lines 565-596
def create_mask(
    date_index: tf.Tensor,
    start_idx: int | None = None,
    end_idx: int | None = None,
    n_repeat: int | None = None,
    as_float: bool = True,
):
    """Generates a mask for the given based on the start and end indices.

    Args:
        date_index (tf.Tensor): Tensor containing date indices such that we can create a mask between start and end
        start_idx (optional int): Start date index. If not specified, dont mask from beginning
        end_idx (optional int): End date index. If not specified, dont mask from end
        n_repeat (optional int): If specified, repeat the mask along second axis n_repeat times

    Returns:
        tf.Tensor: Mask based on the filter indices provided.
    """
    if end_idx is not None and start_idx is not None:
        mask = (date_index >= start_idx) & (date_index <= end_idx)
    elif start_idx is not None:
        mask = date_index >= start_idx
    elif end_idx is not None:
        mask = date_index <= end_idx
    else:
        mask = tf.ones_like(date_index, dtype=tf.bool)
    if n_repeat is not None:
        mask = tf.repeat(mask[None, :], n_repeat, axis=0)
    if as_float:
        return tf.cast(mask, dtype=tf.float32)
    else:
        return mask
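
A minimal usage sketch, assuming a weekly date_index tensor and toy indices:

import tensorflow as tf

date_index = tf.range(52)  # weekly date indices for one year
mask = create_mask(date_index, start_idx=10, end_idx=19, n_repeat=3)
# mask has shape (3, 52) and is 1.0 where 10 <= date_index <= 19, 0.0 elsewhere
bool_mask = create_mask(date_index, start_idx=40, as_float=False)
# bool_mask has shape (52,) and is True from index 40 onwards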

exp_moving_avg(impact, beta, gamma, name=None, decay_length=32)

Calculate gamma * prev + beta * cur shifted along the second axis in an efficient manner.

Parameters:

    impact (TensorLike): The batched time series to apply the average over. Required.
    beta (TensorLike): The factor to scale down in the first time step. Required.
    gamma (TensorLike): The factor to scale down in each subsequent time step. Required.
    threshold (float): We guarantee to decay far enough that the smallest decay factor is smaller than threshold,
        or until the total length of the time series is reached. Defaults to EMA_THRESH.
    name (str): The name to give to this operation. Defaults to None.
    decay_length (int): How many steps impacts are allowed to count for. In practice 32 is always enough; we can
        lower this for training efficiency, since profiling suggests this is the most expensive op in our training
        pipeline. If we want an exact calculation and don't care about speed, we can calculate the right number of
        steps via
            tf.math.reduce_max(tf.math.log(threshold * (1 - gamma) / tf.math.abs(beta)) / tf.math.log(gamma)) + 2
        Defaults to 32.
Source code in wt_ml/layers/layer_utils.py, lines 285-323
def exp_moving_avg(
    impact: TensorLike,
    beta: TensorLike,
    gamma: TensorLike,
    name: str | None = None,
    decay_length: int = 32,
):
    """Calculate gamma * prev + beta * cur shifted along the second axis in an efficient manner.

    Args:
        impact (TensorLike): The batched time series to apply the average over.
        beta (TensorLike): The factor to scale down in the first time step.
        gamma (TensorLike): The factor to scale down in each subsequent time step.
        threshold (float, optional): We guarantee to decay far enough that the smallest decay factor is smaller
                                     than threshold, or until the total length of the time series is reached.
                                     Defaults to EMA_THRESH.
        name (str, optional): The name to give to this operation.
        decay_length (int, optional): How many steps impacts are allowed to count for. In practice 32 is always
                                      enough; we can lower this for training efficiency, since profiling suggests
                                      this is the most expensive op in our training pipeline. If we want an exact
                                      calculation and don't care about speed, we can calculate the right number of
                                      steps via
                                      >>> tf.math.reduce_max(
                                              tf.math.log(threshold * (1 - gamma) / tf.math.abs(beta))
                                              / tf.math.log(gamma)
                                          ) + 2
    """
    with tf.name_scope(name or "ExponentialMovingAverage") as scope:
        num_time = tf.shape(impact)[1]

        decay_range = tf.range(decay_length)
        indices = tf.range(num_time)[:, None] - decay_range
        # num_time x num_decay
        indices = tf.where(indices > 0, indices - 1, num_time)
        # beta and gamma expanded to batch x ... x 1; decay_factor has shape batch x ... x num_decay
        decay_factor = tf.expand_dims(beta, -1) * tf.math.pow(
            tf.expand_dims(gamma, -1), tf.cast(decay_range, dtype=tf.float32)
        )
        gathered = tf.gather(tf.concat([impact, tf.zeros_like(impact[:, :1])], axis=1), indices, axis=1)
        return tf.einsum("btd...,b...d->bt...", gathered, decay_factor, name=scope)
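
For intuition, the vectorized implementation is equivalent (up to the decay_length truncation) to the naive
recurrence below. This is only a reference sketch: it assumes impact has shape (batch, time) and beta/gamma have
shape (batch,), whereas the real op supports extra feature dimensions via the einsum:

import tensorflow as tf

def exp_moving_avg_reference(impact, beta, gamma):
    """Naive loop: ema[t] = gamma * ema[t - 1] + beta * impact[t - 1], with ema[0] = 0."""
    batch, num_time = impact.shape
    prev = tf.zeros((batch,), dtype=impact.dtype)
    outputs = []
    for t in range(num_time):
        outputs.append(prev)  # the value at time t only aggregates impacts up to t - 1
        prev = gamma * prev + beta * impact[:, t]
    return tf.stack(outputs, axis=1)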

groupby(tensor, groups, agg)

Performs a groupby operation on tensor using groups index. NOTE: this does not return the values sorted on groups as we expect with usual pandas groupby function!

Parameters:

    tensor (Tensor): The input that we need to aggregate. Shape is (batch, samples, features?). If the 'features' axis isn't available, we expand_dim to include one. Required.
    groups (Tensor): The clusters that each input belongs to. Shape is (batch, samples). Required.
    agg (Literal['mean', 'sum']): Perform a mean or sum aggregation. Required.

Returns:

    tf.Tensor: A group aggregated tensor with shape (batch, depth, features?).

Source code in wt_ml/layers/layer_utils.py, lines 456-486
def groupby(tensor: tf.Tensor, groups: tf.Tensor, agg: Literal["mean", "sum"]) -> tf.Tensor:
    """
    Performs a groupby operation on `tensor` using `groups` index.
    NOTE: this does not return the values sorted on `groups` as we expect with usual pandas `groupby` function!

    Args:
        tensor (tf.Tensor): The input that we need to aggregate. Shape is (batch, samples, features?).
            If 'features' axis isn't available, we expand_dim to include one.
        groups (tf.Tensor): The clusters that each input belongs to. Shape is (batch, samples).
        agg (Literal['mean', 'sum']): Perform a mean or sum aggregation.

    Returns:
        tf.Tensor: A group aggregated tensor with shape (batch, depth, features?)
    """
    same_rank = tf.rank(tensor) == tf.rank(groups)
    # batch, samples, features
    tensor = tf.cond(same_rank, lambda: tf.expand_dims(tensor, -1), lambda: tensor)
    # make groups start with 0
    groups = groups - tf.reduce_min(groups)
    # tf.one_hot requires groups to be int32
    groups = tf.cast(groups, tf.int32)
    # we take the max as we can have non-contiguous group indexes
    # batch, depth, samples
    weights = tf.one_hot(groups, depth=tf.reduce_max(groups) + 1, dtype=tensor.dtype, name="weights", axis=1)
    if agg == "mean":
        weights = tf.math.divide_no_nan(weights, tf.reduce_sum(weights, axis=2, keepdims=True))

    # batch, depth, features
    group_agg = tf.matmul(weights, tensor, name="group_agg")
    group_agg = tf.cond(same_rank, lambda: tf.squeeze(group_agg, -1), lambda: group_agg)
    return group_agg
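
A toy illustration of the aggregation with two groups over four samples:

import tensorflow as tf

tensor = tf.constant([[1.0, 2.0, 3.0, 4.0]])  # (batch=1, samples=4)
groups = tf.constant([[0, 1, 0, 1]])          # (batch=1, samples=4)
groupby(tensor, groups, agg="sum")   # [[4.0, 6.0]]: group 0 gets 1 + 3, group 1 gets 2 + 4
groupby(tensor, groups, agg="mean")  # [[2.0, 3.0]]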

identity_with_mult_grad(x1, forward_scale, reverse_scale)

Multiplies x1 by forward_scale, while the gradient flowing back to x1 is scaled by forward_scale * reverse_scale (no gradient flows to the scale arguments).

Source code in wt_ml/layers/layer_utils.py, lines 72-82
@tf.custom_gradient
def identity_with_mult_grad(x1, forward_scale, reverse_scale):
    """
    function for scaling multiplying tensors where the gradient is scaled by a constant
    """

    def grad(dy):
        grad_x1 = forward_scale * dy * reverse_scale
        return grad_x1, None, None

    return x1 * forward_scale, grad
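
A quick sketch of the behaviour: the forward value is simply x1 * forward_scale, while the gradient reaching x1 is
additionally scaled by reverse_scale:

import tensorflow as tf

x = tf.constant(3.0)
with tf.GradientTape() as tape:
    tape.watch(x)
    y = identity_with_mult_grad(x, tf.constant(2.0), tf.constant(0.5))
# y == 6.0                     (forward: x * forward_scale)
# tape.gradient(y, x) == 1.0   (backward: forward_scale * 1.0 * reverse_scale)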

isin(element, test_elements, invert=False)

Calculates element in test_elements, broadcasting over element only. Returns a boolean array of the same shape as element that is True where an element of element is in test_elements and False otherwise.

Parameters:

    element (Tensor): Input array. Required.
    test_elements (Tensor): The values against which to test each value of element. This argument is flattened if it is an array or array_like. Required.
    invert (bool): If True, the values in the returned array are inverted, as if calculating element not in test_elements. Defaults to False.

Returns:

    tf.Tensor: Has the same shape as element. The values element[isin] are in test_elements.

Source code in wt_ml/layers/layer_utils.py, lines 435-453
def isin(element: tf.Tensor, test_elements: tf.Tensor, invert: bool = False) -> tf.Tensor:
    """
    Calculates element in test_elements, broadcasting over element only.
    Returns a boolean array of the same shape as element that is True where an element of element is in test_elements
    and False otherwise.

    Args:
        element (tf.Tensor): Input array.
        test_elements (tf.Tensor): The values against which to test each value of element.
            This argument is flattened if it is an array or array_like.
        invert (bool, optional): If True, the values in the returned array are inverted,
            as if calculating element not in test_elements. Default is False.

    Returns:
        tf.Tensor: Has the same shape as element. The values element[isin] are in test_elements.
    """
    test_elements, _ = tf.unique(tf.reshape(test_elements, (-1,)))
    equal_mask = tf.reduce_any(tf.equal(tf.expand_dims(element, -1), test_elements), axis=-1)
    return tf.math.logical_not(equal_mask) if invert else equal_mask
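
A small example mirroring np.isin semantics:

import tensorflow as tf

element = tf.constant([[1, 2, 3], [4, 5, 6]])
test_elements = tf.constant([2, 4, 4, 6])
isin(element, test_elements)
# [[False,  True, False],
#  [ True, False,  True]]
isin(element, test_elements, invert=True)
# [[ True, False,  True],
#  [False,  True, False]]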

transform_softbounded(x, add_loss, name, max_val=AUX_MAX, min_val=AUX_MIN, fcn=tf.nn.sigmoid, mult=None, scale=AUX_SCALE, enabled=True)

Applies a softbound transform on the x input so that it is within the range (min_val, max_val). It then returns the output of the activation function fcn applied on x.

We calculate the "offset" by (max_val + min_val) / 2.0 and the "threshold" to be offset - min_val. We calculate the "loss" = abs(x - offset) - threshold A vectorized_softplus function is then applied (using scale) and this operation give a loss of 0 when x is between (min_val, max_val). We sqaure it so that we have the L2 of the loss. This ensures that your input x is within the range (min_val, max_val).

Parameters:

    x (GraphTensor): The input tensor we want to apply the softbound transform on. Required.
    add_loss (Callable[[str, Tensor, str, float | None], None]): The function used to add the aux loss created by the softbound. Required.
    name (str): The scope name for the transform softbound operation. Required.
    max_val (TensorLike): The max value x can be (exclusive). Defaults to AUX_MAX.
    min_val (TensorLike): The min value x can be (exclusive). Defaults to AUX_MIN.
    fcn (Callable[[GraphTensor], Tensor]): The activation function to be applied on x. Defaults to tf.nn.sigmoid.
    mult (TensorLike): The lambda hyperparameter for the total loss. Defaults to AUX_MULT.
    scale (TensorLike): The scale needed for vectorized softplus. The smaller the scale, the closer it asymptotes to 0. Defaults to AUX_SCALE.
    enabled (bool): Enable the transform_softbounded aux loss, else simply return the activation. Defaults to True.

Returns:

    tf.Tensor: The activation function fcn applied on input x.

Source code in wt_ml/layers/layer_utils.py, lines 125-177
def transform_softbounded(
    x: GraphTensor,
    add_loss: Callable[[str, tf.Tensor, str, float | None], None],
    name: str,
    max_val: TensorLike | float = AUX_MAX,
    min_val: TensorLike | float = AUX_MIN,
    fcn: Callable[[GraphTensor], tf.Tensor] = tf.nn.sigmoid,
    mult: float | None = None,
    scale: ScalarLike | float = AUX_SCALE,
    enabled=True,
):
    """
    Applies a softbound transform on the `x` input so that it is within the range (`min_val`, `max_val`).
    It then returns the output of the activation function `fcn` applied on `x`.

    We calculate the "offset" by (`max_val` + `min_val`) / 2.0 and the "threshold" to be offset - `min_val`.
    We calculate the "loss" = abs(x - offset) - threshold
    A vectorized_softplus function is then applied (using `scale`) and this operation give a loss of 0 when x is between
    (`min_val`, `max_val`). We sqaure it so that we have the L2 of the loss.
    This ensures that your input `x` is within the range (`min_val`, `max_val`).

    Args:
        x (GraphTensor): The Input Tensor we want to apply the softbound transform on.
        add_loss (Callable[[str, tf.Tensor, str, float | None], None]): The function used to add the aux loss
            created by the softbound.
        name (str): The scope name for the transform softbound operation.
        max_val (TensorLike, optional): The max value `x` can be (exclusive). Defaults to AUX_MAX.
        min_val (TensorLike, optional): The min value `x` can be (exclusive). Defaults to AUX_MIN.
        fcn (Callable[[GraphTensor], tf.Tensor], optional): The activation function to be applied on x.
                                                            Defaults to tf.nn.sigmoid.
        mult (TensorLike, optional): The lambda hyperparameter for the total loss. Defaults to AUX_MULT.
        scale (TensorLike, optional): The scale needed for vectorized softplus.
                                    The smaller the scale, the closer it asymptotes to 0. Defaults to AUX_SCALE.
        enabled (bool, optional): Enable the transform_softbound aux loss, else simply return the activation.
                                  Defaults to True.

    Returns:
        tf.Tensor: The activation function `fcn` applied on input `x`.
    """
    with tf.name_scope(name):
        if enabled:
            max_val = tf.convert_to_tensor(max_val, dtype_hint=x.dtype, name="max_val")
            min_val = tf.convert_to_tensor(min_val, dtype_hint=x.dtype, name="min_val")
            offset = tf.identity((max_val + min_val) / 2.0, name="offset")  # the midpoint of the range
            thresh = tf.identity(offset - min_val, name="thresh")  # the range where softplus will be 0.
            x_off = x - offset
            margin_loss = tf.abs(x_off) - thresh
            softplus_output = vectorized_softplus(margin_loss, scale, name="positive_margin_loss")
            aux_loss = tf.reduce_sum(
                tf.square(softplus_output),
                name="softbound",
            )
            add_loss(f"{name}/softbound", aux_loss, "aux", mult)
        return fcn(x)
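
A hypothetical end-to-end usage sketch; the dict-collecting add_loss hook and the concrete bound values below are
assumptions for illustration, not the project's actual wiring:

import tensorflow as tf

losses = {}

def add_loss(name, value, kind, mult):
    # Collect aux losses by name; the real model supplies its own add_loss hook.
    losses[name] = value

raw = tf.constant([-3.0, 0.0, 5.0])
activated = transform_softbounded(raw, add_loss, name="my_param", max_val=4.0, min_val=-4.0, scale=1.0)
# activated == tf.nn.sigmoid(raw); losses["my_param/softbound"] penalizes values outside the (-4, 4) band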