send_metrics(metrics, dd_metric_name, default_tags=None)

Send metrics to Datadog. If the input is a DataFrame, it is converted to a list of dictionaries. Each dictionary should have the string "value" as a key; all other keys are treated as tags. Ex: [{"value": 10, "wt_source": "wfo", "tag2": "val2"}, {"value": 20, "tag1": "val1", "tag3": "val2"}]

Parameters:

Name Type Description Default
metrics DataFrame | list[dict]

Metrics to send to Datadog.

required
dd_metric_name str

The Datadog metric name to push the data to.

required
default_tags dict | None

Default tags to apply to all metrics. Defaults to None.

None
Source code in wt_ml/utils/datadog_utils.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def send_metrics(metrics: pd.DataFrame | list[dict], dd_metric_name: str, default_tags: dict | None = None) -> None:
    """Send metrics to Datadog in batches.

    If the input is a DataFrame, it is converted to a list of dictionaries (one per row).
    Each dictionary should have the string "value" as a key; all other keys are treated as tags.
    Ex: [{"value": 10, "wt_source": "wfo", "tag2": "val2"}, {"value": 20, "tag1": "val1", "tag3": "val2"}]

    Sending is best-effort: an exception raised while building or sending a batch is logged
    (with traceback) and swallowed, so the remaining batches are not sent and nothing is raised
    to the caller.

    Args:
        metrics (pd.DataFrame | list[dict]): Metrics to send to Datadog.
        dd_metric_name (str): The Datadog metric name to push the data to.
        default_tags (dict | None, optional): Default tags to apply to all metrics. Defaults to None.
    """
    # Monotonic clock: the elapsed time reported below must not be affected by system clock changes.
    start_time = time.perf_counter()
    # `len()` rather than truthiness: the truth value of a DataFrame is ambiguous and raises.
    if len(metrics) == 0:
        logger.warning("No items to send.")
        return
    if isinstance(metrics, pd.DataFrame):
        metrics = metrics.to_dict(orient="records")
    if default_tags is None:
        default_tags = {}
    # With `|`, the right-most operand wins on duplicate keys, so caller-supplied tags take precedence.
    default_tags = DATADOG_TAGS | get_supporting_tags() | default_tags
    create_series_partial = partial(_create_metric_series, metric_name=dd_metric_name, default_tags=default_tags)
    try:
        for batch in batched(metrics, BATCH_SIZE):
            payload = MetricPayload(series=list(map(create_series_partial, batch)))
            _send_metrics(payload)
    except Exception as e:
        # Deliberately swallowed (best-effort reporting); `exception` keeps the traceback in the log.
        logger.exception(f"Error sending metrics: {e}")
    else:
        logger.info(f"Sent {len(metrics)} Metrics to {dd_metric_name} in {(time.perf_counter()-start_time):.1f}s.")