Send metrics to Datadog. If the input is a DataFrame, it is converted to a list of dictionaries.
Each dictionary should contain the key "value"; all other keys are treated as tags.
Ex: [{"value": 10, "wt_source": "wfo", "tag2": "val2"}, {"value": 20, "tag1": "val1", "tag3": "val2"}]
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `metrics` | `DataFrame \| list[dict]` | Metrics to send to Datadog. | required |
| `dd_metric_name` | `str` | The Datadog metric name to push the data to. | required |
| `default_tags` | `dict \| None` | Default tags to apply to all metrics. Defaults to None. | `None` |
Source code in wt_ml/utils/datadog_utils.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def send_metrics(
    metrics: pd.DataFrame | list[dict],
    dd_metric_name: str,
    default_tags: dict | None = None,
) -> None:
    """Send metrics to Datadog in batches.

    If ``metrics`` is a DataFrame, it is converted to a list of dictionaries
    (one per row). Each dictionary should contain the key "value"; all other
    keys are treated as tags.

    Ex: [{"value": 10, "wt_source": "wfo", "tag2": "val2"}, {"value": 20, "tag1": "val1", "tag3": "val2"}]

    Sending is best-effort: an exception raised while building or sending a
    batch is logged (with traceback) and not re-raised, so batches sent before
    the failure are not rolled back and later batches are skipped. An empty
    input only logs a warning.

    Args:
        metrics (pd.DataFrame | list[dict]): Metrics to send to Datadog.
        dd_metric_name (str): The Datadog metric name to push the data to.
        default_tags (dict | None, optional): Default tags to apply to all metrics. Defaults to None.
    """
    # Monotonic clock: the elapsed time cannot go negative or jump if the
    # system wall clock is adjusted while sending.
    start_time = time.perf_counter()

    # len() rather than truthiness: bool(DataFrame) raises ValueError.
    if len(metrics) == 0:
        logger.warning("No items to send.")
        return

    if isinstance(metrics, pd.DataFrame):
        metrics = metrics.to_dict(orient="records")

    # With dict union the right-most operand wins on key collisions, so the
    # caller-supplied tags override the module/supporting defaults.
    caller_tags = {} if default_tags is None else default_tags
    merged_tags = DATADOG_TAGS | get_supporting_tags() | caller_tags

    build_series = partial(_create_metric_series, metric_name=dd_metric_name, default_tags=merged_tags)
    try:
        for batch in batched(metrics, BATCH_SIZE):
            payload = MetricPayload(series=[build_series(item) for item in batch])
            _send_metrics(payload)
    except Exception as e:
        # Deliberately not re-raised (metric delivery must not break the
        # caller); logger.exception keeps the traceback for diagnosis.
        logger.exception(f"Error sending metrics: {e}")
    else:
        elapsed = time.perf_counter() - start_time
        logger.info(f"Sent {len(metrics)} Metrics to {dd_metric_name} in {elapsed:.1f}s.")
|