calculate_metrics(output, val_dates_idx, encodings, mask=None, weights=None, level=('brand', 'wholesaler'), calculate_custom_metrics=False)

Calculate metrics for given output. If output is a dict, we recursively calculate metrics for each key in the dict.

Parameters:

Name Type Description Default
output ModelOutputType

Outputs of the model.

required
val_dates_idx NDArray[int64]

The indices into dates_index that fall within the validation period.

required
encodings dict[str, dict[str, int]]

Encodings dict which will be used to decode the index values.

required
mask NDArray[bool_] | None

Mask tensor of the same shape as y_true and y_pred indicating which elements to mask out. Default is None.

None
weights NDArray[float_] | None

Weights used for taking weighted mean on the metrics. Defaults to None.

None
level tuple[str, ...] | str | None

The level at which we want to aggregate it or the metrics correspond to. 'country' will aggregate it to all. None will use ('brand', 'wholesaler'). Defaults to ('brand', 'wholesaler').

('brand', 'wholesaler')
calculate_custom_metrics bool

Calculate custom metrics as well. Defaults to False.

False

Returns:

Name Type Description
GroupedMetrics GroupedMetrics | dict[str, GroupedMetrics]

Metrics grouped as full, train and test.

Source code in wt_ml/tuning/train_test_model.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def calculate_metrics(
    output: ModelOutputType,
    val_dates_idx: NDArray[np.int64],
    encodings: Encodings,
    mask: NDArray[np.bool_] | None = None,
    weights: NDArray[np.float_] | None = None,
    level: tuple[str, ...] | str | None = ("brand", "wholesaler"),
    calculate_custom_metrics: bool = False,
) -> GroupedMetrics | dict[str, GroupedMetrics]:
    """Compute train and test metrics for a model output.

    A dict `output` is handled recursively: the result is a dict with the same keys, each value being
    the metrics computed for the corresponding entry with the same remaining arguments.

    Args:
        output (ModelOutputType): Outputs of the model.
        val_dates_idx (NDArray[np.int64]): Indices along the second axis of the mask that belong to the
            validation period. Every other index on that axis is treated as training.
        encodings (dict[str, dict[str, int]]): Encodings dict used to decode the index values. Its optional
            "normalization_factor" entry is forwarded to the metric computation (1.0 when absent).
        mask (NDArray[np.bool_] | None, optional): Mask tensor of the same shape as y_true and y_pred
            indicating which elements to mask out. `None` falls back to `output.mask`. Defaults to None.
        weights (NDArray[np.float_] | None, optional): Weights used for taking weighted mean on the metrics.
            When given, the resulting index is the single label "aggregated". Defaults to None.
        level (tuple[str, ...] | str | None): The level at which we want to aggregate it or the metrics
            correspond to. 'country' will aggregate it to all. `None` will use ('brand', 'wholesaler').
            Defaults to ('brand', 'wholesaler').
        calculate_custom_metrics (bool, optional): Calculate custom metrics as well. Defaults to False.

    Returns:
        GroupedMetrics | dict[str, GroupedMetrics]: A `GroupedMetrics` built from the train metrics, the test
        metrics and the aggregation index, or a dict of those when `output` is a dict.
    """
    if isinstance(output, dict):
        # Every entry is evaluated with exactly the same settings as the parent call.
        recurse = partial(
            calculate_metrics,
            val_dates_idx=val_dates_idx,
            encodings=encodings,
            mask=mask,
            weights=weights,
            level=level,
            calculate_custom_metrics=calculate_custom_metrics,
        )
        return {key: recurse(sub_output) for key, sub_output in output.items()}

    if TYPE_CHECKING:
        assert isinstance(output, EconomicIntermediaries)
        assert output.inputs is not None

    if mask is None:
        mask = np.array(output.mask)

    # batch, time
    # Work on copies so the caller's mask is left untouched.
    train_mask = np.array(mask)
    test_mask = np.array(mask)
    all_dates_idx = np.arange(mask.shape[1])
    train_dates_idx = np.setdiff1d(all_dates_idx, val_dates_idx, assume_unique=True)
    # Each split zeroes out the time steps that belong to the other split.
    train_mask[:, val_dates_idx] = 0.0
    test_mask[:, train_dates_idx] = 0.0

    group_index = utils.get_index(level=level, inputs=output.inputs, encodings=encodings)

    if weights is not None:
        # weights will aggregate the metrics to a scalar.
        agg_index = pd.Index(["aggregated"])
    else:
        # all the metrics are aggregated in this index
        # NOTE: currently we do not sort the values based on index!
        # if we are sorting on index then this should be changed.
        level_names = group_index.names
        unique_groups = pd.unique(group_index)
        if len(level_names) == 1:
            agg_index = pd.Index(unique_groups, name=level_names[0])
        else:
            agg_index = pd.MultiIndex.from_tuples(unique_groups, names=level_names)

    shared_kwargs = {
        "norm_factor": encodings.get("normalization_factor", 1.0),
        "weights": weights,
        "calculate_custom_metrics": calculate_custom_metrics,
    }
    return GroupedMetrics(
        train=_get_metrics(group_index, output, train_mask, **shared_kwargs),
        test=_get_metrics(group_index, output, test_mask, **shared_kwargs),
        index=agg_index,
    )

index_flatten(index)

Flattens a given index to avoid pesky inner nested tuples.

Source code in wt_ml/tuning/train_test_model.py
533
534
535
def index_flatten(index: pd.Index | pd.MultiIndex | Iterable[Iterable]) -> list[tuple[...]]:
    """Flattens a given index to avoid pesky inner nested tuples."""
    return [tuple(_index_flatten(d)) for d in index]

load_model_result(test_output, val_dates_idx)

Convert dict output into mocked Model Intermediaries using a Namespace.

Source code in wt_ml/tuning/train_test_model.py
576
577
578
579
580
581
582
583
584
585
def load_model_result(test_output: dict[str, ...], val_dates_idx: NDArray[np.int64]) -> TrainTestOutput:
    """Wrap a saved dict output in `RecursiveNamespace` objects and return it as a `TrainTestOutput`.

    Args:
        test_output (dict[str, ...]): The saved output. If it has a top-level "yhat" key, it is treated as the
            output of a single model; otherwise every value is treated as the output of one named network.
        val_dates_idx (NDArray[np.int64]): Validation date indices, stored unchanged on the result.

    Returns:
        TrainTestOutput: Holds either one namespace or a dict of namespaces keyed like `test_output`.
    """
    if "yhat" in test_output:
        # doesn't have multiple baselines, simple model!
        single_output = RecursiveNamespace(**test_output)
        return TrainTestOutput(output=single_output, val_dates_idx=val_dates_idx)

    per_network = {net_name: RecursiveNamespace(**net_output) for net_name, net_output in test_output.items()}
    return TrainTestOutput(output=per_network, val_dates_idx=val_dates_idx)

mask_smoothing_weeks(batch, num_head_weeks=0, num_tail_weeks=0)

Post-processing step for batch data during iteration to mask out head and/or tail weeks.

Source code in wt_ml/tuning/train_test_model.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def mask_smoothing_weeks(batch: "EconomicModelInput", num_head_weeks=0, num_tail_weeks=0):
    """Post-processing step for batch data during iteration to mask out head and/or tail weeks.

    Args:
        batch (EconomicModelInput): The batch whose `no_prediction_mask` should be adjusted.
        num_head_weeks (int, optional): Number of leading positions along the second axis of the mask to
            set to 0. Defaults to 0.
        num_tail_weeks (int, optional): Number of trailing positions along the second axis of the mask to
            set to 0. Defaults to 0.

    Returns:
        EconomicModelInput: `batch` itself when both counts are 0; otherwise a new instance of the same type
        with every field copied over and `no_prediction_mask` replaced by the adjusted mask.
    """
    nothing_to_mask = num_head_weeks == 0 and num_tail_weeks == 0
    if nothing_to_mask:
        return batch

    # NOTE(review): axis 1 is presumably the time (week) axis of the mask — confirm against the dataset.
    adjusted_mask = batch.no_prediction_mask.numpy()
    if num_head_weeks > 0:
        adjusted_mask[:, :num_head_weeks] = 0.0
    if num_tail_weeks > 0:
        adjusted_mask[:, -num_tail_weeks:] = 0.0

    # Rebuild the batch from its fields with the new mask swapped in.
    fields = tf.experimental.extension_type.as_dict(batch)
    fields["no_prediction_mask"] = adjusted_mask
    return type(batch)(**fields)

metrics_to_df(metrics_data, index_names=['dataset', 'metric'])

Convert given GroupedMetrics to a dataframe.

Parameters:

Name Type Description Default
metrics_data GroupedMetrics | dict[str, GroupedMetrics]

Calculated GroupedMetrics.

required
index_names list

The index names for the dataframe. Defaults to ["dataset", "metric"]. The GroupedMetrics are flattened, so the index is usually (*any_parent_levels, 'dataset', 'metric').

['dataset', 'metric']

Returns:

Type Description
DataFrame

pd.DataFrame: Metrics pandas dataframe.

Source code in wt_ml/tuning/train_test_model.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
def metrics_to_df(
    metrics_data: GroupedMetrics | dict[str, GroupedMetrics],
    index_names=["dataset", "metric"],
) -> pd.DataFrame:
    """Build a dataframe out of calculated GroupedMetrics.

    Args:
        metrics_data (GroupedMetrics | dict[str, GroupedMetrics]): Calculated GroupedMetrics, either a single
            object or a (possibly nested) dict of them.
        index_names (list, optional): Names for the trailing levels of the dataframe index.
            Defaults to ["dataset", "metric"]. The GroupedMetrics are flattened, so the index is usually
            (*any_parent_levels, 'dataset', 'metric'); extra leading levels are left unnamed.

    Returns:
        pd.DataFrame: Metrics pandas dataframe with one row per flattened metric key and one column per
        entry of the GroupedMetrics index.
    """
    if not isinstance(metrics_data, dict):
        columns_index = metrics_data.index
        flat_metrics = flatten(_process_grouped_metrics(metrics_data))
        df = pd.DataFrame(flat_metrics, index=columns_index).T
    else:
        # One frame per GroupedMetrics; the flattened parent key is kept as the leading part of each row key.
        frames = []
        for key, met in flatten(metrics_data).items():
            flat_metrics = flatten({key: _process_grouped_metrics(met)})
            frames.append(pd.DataFrame(flat_metrics, index=met.index).T)
        df = pd.concat(frames, axis=0)

    # initial levels are flattened to a tuple. we need unflatten it again so it doesn't become like ((...), ...).
    # we don't use unflatten from flatten_dict as metrics_data consumes a lot of memory.
    df.index = pd.MultiIndex.from_tuples(index_flatten(df.index))

    # Pad with unnamed levels on the left when the index has more levels than names were given.
    unnamed_levels = len(df.index.names) - len(index_names)
    if unnamed_levels > 0:
        index_names = [None] * unnamed_levels + index_names

    df.index.set_names(index_names, inplace=True)
    return df

train_test_model(val_period, full_dataset, model, epochs, start_date=None, verbosity=1, calculate_trackers=False, calculate_time=False, checkpoint_freq=None, callbacks_builder=None, save_dir=None, smoothing_window=False, delete_existing_checkpoints=False, **kwargs)

Runs model on train Time period and then validates on train and test Time period.

Parameters:

Name Type Description Default
val_period tuple[str, str, ...] | NDArray[datetime64]

The inclusive validation period.

required
full_dataset EconomicDataset

The dataset object. We will subset train & test periods from this.

required
model TrainableModule

The TrainableModule model.

required
epochs int

Number of epochs to train the model. If None, training is skipped and only inference is run.

required
start_date str | datetime64 | None

The start date after which we start training. Defaults to None which is the first date.

None
verbosity int

Verbosity level. Defaults to 1.

1
calculate_trackers bool

Calculate and include trackers in output. Defaults to False.

False
calculate_time bool

Calculate model training time. Defaults to False.

False
checkpoint_freq int | Sequence[int] | None

Freq at which checkpoints are created.

None
callbacks_builder Callable[[], CallbacksList] | None

Function that returns CallbacksList which will be appended with LearningCurveCallback

None
save_dir Path | None

Directory to save the learning curve. None will save in temp dir.

None
smoothing_window bool

Whether to smooth the validation period. Defaults to False.

False
delete_existing_checkpoints bool

Delete previous checkpoints. Defaults to False.

False
**kwargs

Keyword arguments passed into TrainableModule.train function.

{}

Returns:

Name Type Description
TrainTestOutput TrainTestOutput | Tuple[TrainTestOutput, float]

Returns the full model intermediaries and validation date idx.

float optional

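Returns the elapsed time in seconds, measured from just before training until inference has finished (only when calculate_time is True).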

Source code in wt_ml/tuning/train_test_model.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def train_test_model(
    val_period: tuple[str | np.datetime64, ...] | NDArray[np.datetime64],
    full_dataset: EconomicDataset,
    model: TrainableModule,
    epochs: int | None,
    start_date: str | np.datetime64 | None = None,
    verbosity: int = 1,
    calculate_trackers: bool = False,
    calculate_time: bool = False,
    checkpoint_freq: int | Sequence[int] | None = None,
    callbacks_builder: Callable[[], CallbacksList] | None = None,
    save_dir: Path | None = None,
    smoothing_window: bool = False,
    delete_existing_checkpoints: bool = False,
    **kwargs,
) -> TrainTestOutput | Tuple[TrainTestOutput, float]:
    """Train the model on the train split, then run inference on the test dataset.

    Args:
        val_period (tuple[str | np.datetime64, ...] | NDArray[np.datetime64]): The inclusive validation period.
        full_dataset (EconomicDataset): The dataset object. We will subset train & test periods from this.
        model (TrainableModule): The TrainableModule model.
        epochs (int | None): Number of epochs to train the model. `None` skips training and only runs inference.
        start_date (str | np.datetime64 | None, optional):  The start date after which we start training.
            Defaults to None which is the first date.
        verbosity (int, optional): Verbosity level. Defaults to 1.
        calculate_trackers (bool, optional): Calculate and include trackers in output. Defaults to False.
        calculate_time (bool, optional): Also return the elapsed time. Defaults to False.
        checkpoint_freq (int | Sequence[int] | None, optional): Freq at which checkpoints are created.
            A checkpoint callback is only added when `save_dir` is given as well.
        callbacks_builder (Callable[[], CallbacksList] | None, optional): Function that returns the CallbacksList
            to train with. `None` starts from an empty CallbacksList.
        save_dir (Path | None, optional): Root directory handed to the checkpoint callback.
        smoothing_window (bool, optional): Whether to smooth the validation period. Defaults to False.
        delete_existing_checkpoints (bool, optional): Delete previous checkpoints. Defaults to False.
        **kwargs: Keyword arguments passed into `TrainableModule.train` function.

    Returns:
        TrainTestOutput: Returns the full model intermediaries and validation date idx.
        float (optional): Seconds elapsed from just before training until inference has finished. Only
            returned (as the second element of a tuple) when `calculate_time` is True.
    """
    train_dataset, test_dataset, val_dates_idx = split_dataset(
        val_period, model, full_dataset, start_date, smoothing_window
    )

    callbacks: CallbacksList = CallbacksList() if callbacks_builder is None else callbacks_builder()

    # Checkpointing needs both a frequency and a place to write to.
    if checkpoint_freq is not None and save_dir is not None:
        callbacks.append(
            CheckpointCallback(
                frequency=checkpoint_freq,
                root_dir=save_dir,
                delete_existing_checkpoints=delete_existing_checkpoints,
            )
        )

    start = time() if calculate_time else None

    if epochs is not None:
        model.train(
            dataset_factory=train_dataset,
            num_steps=len(train_dataset),
            epochs=epochs,
            verbosity=verbosity,
            callbacks=callbacks,
            **kwargs,
        )

    logger.info("Running inference.")

    inference_result = _inference(model, test_dataset, calculate_trackers=calculate_trackers)
    output = TrainTestOutput(inference_result, val_dates_idx)
    if not calculate_time:
        return output

    assert start is not None
    # The timer is stopped here, so the reported duration covers training and inference.
    return output, time() - start