Merged
30 commits
60d31ed
Add fixture to test volume analysis
LDiazN Feb 6, 2026
e433f92
Add volume analysis tests
LDiazN Feb 6, 2026
ccf99d4
Fixing testing of volume analysis query
LDiazN Feb 6, 2026
617edd8
Fix broken tests
LDiazN Feb 10, 2026
5a5d590
Add assert to check expected time in event
LDiazN Feb 10, 2026
0fa1509
Add task to analyse measurement volume for measurement anomalies
LDiazN Feb 10, 2026
894e810
simplify tests code
LDiazN Feb 10, 2026
c37eefb
black reformat
LDiazN Feb 10, 2026
cf90993
Add volume analysis files
LDiazN Feb 10, 2026
c1c2665
Remove hourly analysis from dependencies
LDiazN Feb 10, 2026
985fdef
Add software_name key details and probe_id
LDiazN Feb 10, 2026
8ed602b
Add time inconsistencies analysis
LDiazN Feb 11, 2026
711bcdf
Add time inconsistencies fixtures
LDiazN Feb 11, 2026
609e9a2
Add tests for the time inconsistencies analysis
LDiazN Feb 11, 2026
01817ff
Refactor tests
LDiazN Feb 11, 2026
83df810
Create task for time inconsistencies
LDiazN Feb 11, 2026
dfb58cf
Add integration test for time inconsistency analysis
LDiazN Feb 11, 2026
afd0d70
Add time inconsistency analysis dag
LDiazN Feb 11, 2026
d78bba0
Change time definition to be similar to other tables
LDiazN Feb 12, 2026
44bd91b
Change time to ts in tests
LDiazN Feb 12, 2026
334c065
Change time column to ts
LDiazN Feb 12, 2026
b8b2e18
Merge branch 'volume-analysis' into time-inconsistencies
LDiazN Feb 12, 2026
7f7263d
Fix bad assertion message
LDiazN Feb 17, 2026
085e27e
Make different types of anomalies for measurements from the past or t…
LDiazN Feb 17, 2026
9df242c
add future and past treshold in pipeline definition
LDiazN Feb 17, 2026
5878ceb
simplify tests
LDiazN Feb 17, 2026
317cde3
Add uid to faulty measurements table
LDiazN Feb 18, 2026
15e53b2
Add default time stamping to faulty_measurements table
LDiazN Feb 18, 2026
3c4f3cc
Change future and past thresholds
LDiazN Feb 19, 2026
32903ce
Add software_name, software_version, platform in details
LDiazN Feb 19, 2026
69 changes: 69 additions & 0 deletions dags/pipeline.py
@@ -76,6 +76,50 @@ def run_make_event_detector(

    make_detector(params)

def run_make_volume_analysis(
    clickhouse_url: str,
    timestamp: str = "",
    ts: str = "",
    threshold: int = 200,
):
    from oonipipeline.tasks.volume import MakeVolumeParams, make_volume_analysis

    if timestamp == "":
        # Airflow passes the {{ ts }} macro; keep only the "YYYY-MM-DDTHH" prefix.
        timestamp = ts[:13]

    params = MakeVolumeParams(
        clickhouse_url=clickhouse_url,
        timestamp=timestamp,
        threshold=threshold,
    )

    make_volume_analysis(params)


def run_make_time_inconsistencies_analysis(
    clickhouse_url: str,
    timestamp: str = "",
    ts: str = "",
    future_threshold: int = 3600,
    past_threshold: int = 3600,
):
    from oonipipeline.tasks.time_inconsistencies import (
        MakeTimeInconsistenciesParams,
        make_time_inconsistencies_analysis,
    )

    if timestamp == "":
        # Airflow passes the {{ ts }} macro; keep only the "YYYY-MM-DDTHH" prefix.
        timestamp = ts[:13]

    params = MakeTimeInconsistenciesParams(
        clickhouse_url=clickhouse_url,
        timestamp=timestamp,
        future_threshold=future_threshold,
        past_threshold=past_threshold,
    )

    make_time_inconsistencies_analysis(params)


REQUIREMENTS = [str((pathlib.Path(__file__).parent.parent / "oonipipeline").absolute())]

@@ -186,6 +230,31 @@ def run_make_event_detector(
    system_site_packages=False,
)

op_make_volume_analysis_hourly = PythonVirtualenvOperator(
    task_id="make_volume_analysis",
    python_callable=run_make_volume_analysis,
    op_kwargs={
        "clickhouse_url": Variable.get("clickhouse_url", default_var=""),
        "ts": "{{ ts }}",
        "threshold": 200,  # TODO: adjust this parameter dynamically in the future
    },
    requirements=REQUIREMENTS,
    system_site_packages=False,
)

op_make_time_inconsistencies_analysis_hourly = PythonVirtualenvOperator(
    task_id="make_time_inconsistencies_analysis",
    python_callable=run_make_time_inconsistencies_analysis,
    op_kwargs={
        "clickhouse_url": Variable.get("clickhouse_url", default_var=""),
        "ts": "{{ ts }}",
        "future_threshold": 60 * 30,  # 30 minutes
        "past_threshold": 2 * 3600,  # 2 hours
    },
    requirements=REQUIREMENTS,
    system_site_packages=False,
)

(
    op_make_observations_hourly
    >> op_make_analysis_hourly
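Note on the ts[:13] truncation used above: Airflow renders the {{ ts }} macro as a full ISO timestamp, and its first 13 characters are exactly the "YYYY-MM-DDTHH" form the tasks later parse. A minimal sketch (not part of this PR; the value is illustrative):

from datetime import datetime, timezone

# Example value of Airflow's {{ ts }} macro for an hourly run
ts = "2026-02-11T14:00:00+00:00"
timestamp = ts[:13]  # -> "2026-02-11T14"

# Round trip through the parsing done in tasks/time_inconsistencies.py
start_time = datetime.strptime(timestamp, "%Y-%m-%dT%H").replace(tzinfo=timezone.utc)
print(start_time)  # 2026-02-11 14:00:00+00:00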
126 changes: 126 additions & 0 deletions oonipipeline/src/oonipipeline/analysis/time_inconsistencies.py
@@ -0,0 +1,126 @@
"""
This file will implement time inconsistencies analysis for anomaly detection.

We consider that there's a time inconsistency anomaly when the measurement_start_time
differs significantly from the timestamp embedded in the measurement_uid.
"""

from clickhouse_driver import Client as Clickhouse
from datetime import datetime
import logging
import orjson

log = logging.getLogger(__name__)


def run_time_inconsistencies_analysis(
    clickhouse_url: str,
    start_time: datetime,
    end_time: datetime,
    future_threshold: int,
    past_threshold: int,
):
    """
    Measure the drift between the reported measurement_start_time and the
    timestamp embedded in the measurement_uid (the time the measurement
    reached the fastpath).

    future_threshold: drift in seconds that triggers an anomaly for future
        measurements (diff_seconds < 0)
    past_threshold: drift in seconds that triggers an anomaly for past
        measurements (diff_seconds > 0)
    """

    db = Clickhouse.from_url(clickhouse_url)
    query = """
    SELECT
        probe_cc,
        probe_asn,
        measurement_uid,
        measurement_start_time,
        uid_timestamp,
        diff_seconds,
        software_name,
        software_version,
        platform
    FROM (
        SELECT
            probe_cc,
            probe_asn,
            measurement_uid,
            measurement_start_time,
            software_name,
            software_version,
            platform,
            parseDateTimeBestEffort(substring(measurement_uid, 1, 15)) AS uid_timestamp,
            dateDiff('second', parseDateTimeBestEffort(substring(measurement_uid, 1, 15)), measurement_start_time) AS diff_seconds
        FROM fastpath
        WHERE
            measurement_start_time >= %(start_time)s AND
            measurement_start_time < %(end_time)s
    )
    WHERE
        (diff_seconds < 0 AND abs(diff_seconds) >= %(future_threshold)s) OR
        (diff_seconds > 0 AND diff_seconds >= %(past_threshold)s)
    ORDER BY diff_seconds DESC
    """

    rows = (
        db.execute(
            query,
            params={
                "start_time": start_time,
                "end_time": end_time,
                "future_threshold": future_threshold,
                "past_threshold": past_threshold,
            },
        )
        or []
    )

    if len(rows) == 0:
        log.info("No time inconsistency anomalies were found")
        return

    log.info(f"Found {len(rows)} time inconsistencies from {start_time} to {end_time}")

    values = []
    for row in rows:
        (
            probe_cc,
            probe_asn,
            measurement_uid,
            measurement_start_time,
            uid_timestamp,
            diff_seconds,
            software_name,
            software_version,
            platform,
        ) = row

        # Choose the type and threshold based on whether the measurement
        # claims to come from the future or from the past:
        # diff_seconds < 0 => future
        # diff_seconds > 0 => too far in the past
        if diff_seconds < 0:
            inconsistency_type = "time_inconsistency_future"
            threshold = future_threshold
        else:
            inconsistency_type = "time_inconsistency_past"
            threshold = past_threshold

        details = {
            "measurement_uid": measurement_uid,
            "measurement_start_time": measurement_start_time.isoformat(),
            "uid_timestamp": uid_timestamp.isoformat(),
            "diff_seconds": diff_seconds,
            "threshold": threshold,
            "software_name": software_name,
            "software_version": software_version,
            "platform": platform,
        }

        values.append((inconsistency_type, probe_cc, probe_asn, orjson.dumps(details).decode()))

    db.execute(
        "INSERT INTO faulty_measurements (type, probe_cc, probe_asn, details) VALUES",
        values,
        types_check=True,
    )
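For readers, the classification rule above restated as a pure-Python sketch (not part of this PR); diff_seconds mirrors dateDiff('second', uid_timestamp, measurement_start_time):

from datetime import datetime
from typing import Optional

def classify_time_inconsistency(
    uid_timestamp: datetime,
    measurement_start_time: datetime,
    future_threshold: int,
    past_threshold: int,
) -> Optional[str]:
    # Mirrors dateDiff('second', uid_timestamp, measurement_start_time)
    diff_seconds = (measurement_start_time - uid_timestamp).total_seconds()
    if diff_seconds < 0 and abs(diff_seconds) >= future_threshold:
        return "time_inconsistency_future"
    if diff_seconds > 0 and diff_seconds >= past_threshold:
        return "time_inconsistency_past"
    return None  # drift within tolerance; not flagged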
88 changes: 88 additions & 0 deletions oonipipeline/src/oonipipeline/analysis/volume.py
@@ -0,0 +1,88 @@
"""
This file will implement volume analysis for anomaly detection.

We consider that there's a volume anomaly when a single probe seems to be sending too many measurements
in small time windows
"""

from clickhouse_driver import Client as Clickhouse
from datetime import datetime, timedelta
import logging
import orjson

log = logging.getLogger(__name__)


def run_volume_analysis(
    clickhouse_url: str, start_time: datetime, end_time: datetime, threshold: int
):
    """
    Flag (probe, minute) buckets containing at least `threshold` measurements.
    """
    db = Clickhouse.from_url(clickhouse_url)
    query = """
    SELECT
        probe_cc, probe_asn, engine_version,
        software_name, software_version, platform, architecture,
        toStartOfMinute(measurement_start_time) AS minute_start,
        count() AS total
    FROM fastpath
    WHERE
        measurement_start_time >= %(start_time)s AND
        measurement_start_time < %(end_time)s
    GROUP BY probe_cc, probe_asn, engine_version, software_name, software_version, platform, architecture, minute_start
    HAVING total >= %(threshold)s
    """

    rows = (
        db.execute(
            query,
            params={
                "start_time": start_time,
                "end_time": end_time,
                "threshold": threshold,
            },
        )
        or []
    )

    if len(rows) == 0:
        log.info("No volume anomalies were found")
        return

    # Prepare results to insert
    values = []
    for row in rows:
        (
            probe_cc,
            probe_asn,
            engine_version,
            software_name,
            software_version,
            platform,
            architecture,
            minute_start,
            total,
        ) = row

        # Compute the bucket's end_time as the start of the next minute
        minute_end = minute_start + timedelta(minutes=1)

        details = {
            "start_time": minute_start.isoformat(),
            "end_time": minute_end.isoformat(),
            "engine_version": engine_version,
            "software_name": software_name,
            "software_version": software_version,
            "platform": platform,
            "architecture": architecture,
            "total": total,
            "threshold": threshold,
        }

        values.append(("volume", probe_cc, probe_asn, orjson.dumps(details).decode()))

    db.execute(
        "INSERT INTO faulty_measurements (type, probe_cc, probe_asn, details) VALUES",
        values,
        types_check=True,
    )
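A minimal in-memory sketch (not part of this PR) of the same per-minute bucketing the SQL performs with toStartOfMinute() and HAVING total >= threshold:

from collections import Counter
from datetime import datetime
from typing import Iterable, List, Tuple

def find_volume_anomalies(
    rows: Iterable[Tuple[str, int, datetime]], threshold: int
) -> List[Tuple[Tuple[str, int, datetime], int]]:
    # rows: (probe_cc, probe_asn, measurement_start_time) triples
    counts = Counter(
        (cc, asn, ts.replace(second=0, microsecond=0)) for cc, asn, ts in rows
    )
    # Keep only the (probe, minute) buckets at or above the threshold
    return [(key, total) for key, total in counts.items() if total >= threshold]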
2 changes: 1 addition & 1 deletion oonipipeline/src/oonipipeline/cli/commands.py
@@ -67,7 +67,7 @@ def _to_utc(ctx, param, d: datetime) -> datetime:
    type=click.DateTime(),
    callback=_to_utc,
    default=str(datetime.now(timezone.utc).date() + timedelta(days=1)),
    help="""the timestamp of the day for which we should start processing data (inclusive).

    Note: this is the upload date, which doesn't necessarily match the measurement date.
    """,
18 changes: 18 additions & 0 deletions oonipipeline/src/oonipipeline/db/create_tables.py
@@ -286,6 +286,24 @@ def make_create_queries():
""",
"event_detector_cusums",
),
(
"""
CREATE TABLE IF NOT EXISTS faulty_measurements
(
`ts` DateTime64(3, 'UTC') DEFAULT now64(),
`type` String,
`uid` UUID DEFAULT generateUUIDv4(),
-- geoip lookup result for the probe IP
`probe_cc` String,
`probe_asn` UInt32,
-- JSON-encoded details about the anomaly
`details` String
)
ENGINE = ReplacingMergeTree
ORDER BY (ts, type, probe_cc, probe_asn, uid);
""",
"event_detector_cusums",
),
]
for model in table_models:
table_name = model.__table_name__
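A hypothetical spot-check of the new table (not part of this PR; the connection URL is assumed), using the same clickhouse_driver client as the analysis modules:

from clickhouse_driver import Client as Clickhouse

db = Clickhouse.from_url("clickhouse://localhost/")  # assumed local instance
rows = db.execute(
    "SELECT type, count() AS n FROM faulty_measurements GROUP BY type ORDER BY n DESC"
)
print(rows)  # e.g. [('volume', 12), ('time_inconsistency_past', 3)]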
37 changes: 37 additions & 0 deletions oonipipeline/src/oonipipeline/tasks/time_inconsistencies.py
@@ -0,0 +1,37 @@
"""
This file defines the task for the time inconsistencies analysis for faulty
measurements detection
"""

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from ..analysis.time_inconsistencies import run_time_inconsistencies_analysis


@dataclass
class MakeTimeInconsistenciesParams:
    clickhouse_url: str
    timestamp: str
    future_threshold: int
    past_threshold: int


def make_time_inconsistencies_analysis(params: MakeTimeInconsistenciesParams):
    # An hourly timestamp looks like "2026-02-11T14"; a daily one like "2026-02-11".
    if "T" in params.timestamp:
        start_time = datetime.strptime(params.timestamp, "%Y-%m-%dT%H").replace(
            tzinfo=timezone.utc
        )
        end_time = start_time + timedelta(hours=1)
    else:
        start_time = datetime.strptime(params.timestamp, "%Y-%m-%d").replace(
            tzinfo=timezone.utc
        )
        end_time = start_time + timedelta(days=1)

    run_time_inconsistencies_analysis(
        clickhouse_url=params.clickhouse_url,
        start_time=start_time,
        end_time=end_time,
        future_threshold=params.future_threshold,
        past_threshold=params.past_threshold,
    )
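A hypothetical invocation of the task (not part of this PR; the URL is assumed, thresholds copied from the DAG definition), exercising the hourly "%Y-%m-%dT%H" branch above:

from oonipipeline.tasks.time_inconsistencies import (
    MakeTimeInconsistenciesParams,
    make_time_inconsistencies_analysis,
)

params = MakeTimeInconsistenciesParams(
    clickhouse_url="clickhouse://localhost/",  # assumed URL
    timestamp="2026-02-11T14",                 # hourly window: 14:00-15:00 UTC
    future_threshold=60 * 30,                  # 30 minutes, as in the DAG
    past_threshold=2 * 3600,                   # 2 hours, as in the DAG
)
make_time_inconsistencies_analysis(params)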