Anomaly Detection Example

This example compares the current value of a metric against historical values stored in a metrics repository and prints any anomalies it detects.

Full Code

import asyncio

from qualink import (
    AnalysisRunner,
    AnalyzerContext,
    AnalyzerMetric,
    AnomalyDetectionRunner,
    InMemoryMetricsRepository,
    RelativeRateOfChangeStrategy,
    ResultKey,
    ZScoreStrategy,
)


async def main() -> None:
    """Seed a metrics history, record a current value, and print detected anomalies.

    Three historical ``size`` metrics (100, 101, 103) are saved to an
    in-memory repository under consecutive ``data_set_date`` keys tagged
    ``dataset=users``. A current ``size`` of 140 is then stored on the context
    returned by ``AnalysisRunner().run`` and checked against that history with
    two strategies registered for the ``size`` metric. Each reported anomaly is
    printed as its metric key followed by its message.
    """
    history_store = InMemoryMetricsRepository()

    # data_set_date -> observed size; dict order fixes the save order.
    past_sizes = {20260312: 100, 20260313: 101, 20260314: 103}
    for day, row_count in past_sizes.items():
        snapshot = AnalyzerContext()
        snapshot.store_metric(AnalyzerMetric("size", "size", row_count))
        snapshot_key = ResultKey(data_set_date=day, tags={"dataset": "users"})
        history_store.save(snapshot_key, snapshot)

    # NOTE(review): run() is called with ctx=None here; its exact behavior in
    # that case is not visible from this example -- confirm against the library.
    current_run = await AnalysisRunner().run(ctx=None, table_name="users")
    current_run.context.store_metric(AnalyzerMetric("size", "size", 140))

    # Both strategies watch the same "size" metric; each add_strategy result is
    # carried forward so the call chain matches a fluent invocation.
    detector = AnomalyDetectionRunner(history_store)
    detector = detector.add_strategy("size", RelativeRateOfChangeStrategy(max_rate_increase=0.1))
    detector = detector.add_strategy("size", ZScoreStrategy(z_threshold=1.0, min_history=2))

    current_key = ResultKey(data_set_date=20260315, tags={"dataset": "users"})
    findings = detector.detect(current_key, current_run.context)

    for finding in findings:
        print(finding.metric_key, finding.message)


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

What It Does