Skip to content

Commit fab4b5e

Browse files
authored
fix: improve interval cache further (#1934)
1 parent 1c9bdee commit fab4b5e

File tree

2 files changed

+74
-65
lines changed

2 files changed

+74
-65
lines changed

sqlmesh/core/snapshot/definition.py

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from collections import defaultdict
66
from datetime import datetime, timedelta
77
from enum import IntEnum
8+
from functools import lru_cache
89

910
from pydantic import Field
1011
from sqlglot import exp
@@ -16,7 +17,6 @@
1617
from sqlmesh.core.model.definition import _Model
1718
from sqlmesh.core.node import IntervalUnit, NodeType
1819
from sqlmesh.utils import sanitize_name
19-
from sqlmesh.utils.cron import get_ts_range
2020
from sqlmesh.utils.dag import DAG
2121
from sqlmesh.utils.date import (
2222
TimeLike,
@@ -795,30 +795,10 @@ def missing_intervals(
795795
end_ts = min(end_ts, to_timestamp(interval_unit.cron_floor(upper_bound_ts)))
796796

797797
lookback = self.model.lookback if self.is_model else 0
798-
dates = get_ts_range(interval_unit.cron_expr, start_ts, end_ts, upper_bound_ts, lookback)
799798

800-
missing = []
801-
for i in range(len(dates)):
802-
if dates[i] >= end_ts:
803-
break
804-
current_ts = dates[i]
805-
next_ts = (
806-
dates[i + 1]
807-
if i + 1 < len(dates)
808-
else min(to_timestamp(interval_unit.cron_next(current_ts)), upper_bound_ts)
809-
)
810-
compare_ts = seq_get(dates, i + lookback) or dates[-1]
811-
812-
for low, high in intervals:
813-
if compare_ts < low:
814-
missing.append((current_ts, next_ts))
815-
break
816-
elif current_ts >= low and compare_ts < high:
817-
break
818-
else:
819-
missing.append((current_ts, next_ts))
820-
821-
return missing
799+
return compute_missing_intervals(
800+
interval_unit, tuple(intervals), start_ts, end_ts, upper_bound_ts, lookback
801+
)
822802

823803
def categorize_as(self, category: SnapshotChangeCategory) -> None:
824804
"""Assigns the given category to this snapshot.
@@ -1484,6 +1464,75 @@ def missing_intervals(
14841464
return missing
14851465

14861466

1467+
@lru_cache(maxsize=None)
1468+
def compute_missing_intervals(
1469+
interval_unit: IntervalUnit,
1470+
intervals: t.Tuple[Interval, ...],
1471+
start_ts: int,
1472+
end_ts: int,
1473+
upper_bound_ts: int,
1474+
lookback: int,
1475+
) -> Intervals:
1476+
"""Computes all missing intervals between start and end given intervals.
1477+
1478+
Args:
1479+
interval_unit: The interval unit.
1480+
intervals: The intervals to check what's missing.
1481+
start_ts: Inclusive timestamp start.
1482+
end_ts: Exclusive timestamp end.
1483+
upper_bound_ts: The exclusive upper bound timestamp for lookback.
1484+
lookback: A lookback window.
1485+
1486+
Returns:
1487+
A list of all timestamps in this range.
1488+
"""
1489+
croniter = interval_unit.croniter(start_ts)
1490+
timestamps = [start_ts]
1491+
1492+
# get all individual timestamps with the addition of extra lookback timestamps up to the execution date
1493+
# when a model has lookback, we need to check all the intervals between itself and its lookback exist.
1494+
while True:
1495+
ts = to_timestamp(croniter.get_next(estimate=True))
1496+
1497+
if ts < end_ts:
1498+
timestamps.append(ts)
1499+
else:
1500+
croniter.get_prev(estimate=True)
1501+
break
1502+
1503+
for _ in range(lookback):
1504+
ts = to_timestamp(croniter.get_next(estimate=True))
1505+
if ts < upper_bound_ts:
1506+
timestamps.append(ts)
1507+
else:
1508+
break
1509+
1510+
missing = []
1511+
for i in range(len(timestamps)):
1512+
if timestamps[i] >= end_ts:
1513+
break
1514+
current_ts = timestamps[i]
1515+
next_ts = (
1516+
timestamps[i + 1]
1517+
if i + 1 < len(timestamps)
1518+
else min(
1519+
to_timestamp(interval_unit.cron_next(current_ts, estimate=True)), upper_bound_ts
1520+
)
1521+
)
1522+
compare_ts = seq_get(timestamps, i + lookback) or timestamps[-1]
1523+
1524+
for low, high in intervals:
1525+
if compare_ts < low:
1526+
missing.append((current_ts, next_ts))
1527+
break
1528+
elif current_ts >= low and compare_ts < high:
1529+
break
1530+
else:
1531+
missing.append((current_ts, next_ts))
1532+
1533+
return missing
1534+
1535+
14871536
def earliest_start_date(
14881537
snapshots: t.Collection[Snapshot],
14891538
cache: t.Optional[t.Dict[str, datetime]] = None,

sqlmesh/utils/cron.py

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,47 +7,7 @@
77
from croniter import croniter
88
from sqlglot.helper import first
99

10-
from sqlmesh.utils.date import TimeLike, now, to_datetime, to_timestamp
11-
12-
13-
@lru_cache(maxsize=None)
14-
def get_ts_range(
15-
cron: str, start_ts: int, end_ts: int, upper_bound_ts: int, lookback: int
16-
) -> t.List[int]:
17-
"""Computes all timestamps between start and end given a cron expression and lookback.
18-
19-
Args:
20-
cron: The cron string.
21-
start_ts: Inclusive timestamp start.
22-
end_ts: Exclusive timestamp end.
23-
upper_bound_ts: The exclusive upper bound timestamp for lookback.
24-
lookback: A lookback window.
25-
26-
Returns:
27-
A list of all timestamps in this range.
28-
"""
29-
croniter = CroniterCache(cron, start_ts)
30-
timestamps = [start_ts]
31-
32-
# get all individual timestamps with the addition of extra lookback timestamps up to the execution date
33-
# when a model has lookback, we need to check all the intervals between itself and its lookback exist.
34-
while True:
35-
ts = to_timestamp(croniter.get_next(estimate=True))
36-
37-
if ts < end_ts:
38-
timestamps.append(ts)
39-
else:
40-
croniter.get_prev(estimate=True)
41-
break
42-
43-
for _ in range(lookback):
44-
ts = to_timestamp(croniter.get_next(estimate=True))
45-
if ts < upper_bound_ts:
46-
timestamps.append(ts)
47-
else:
48-
break
49-
50-
return timestamps
10+
from sqlmesh.utils.date import TimeLike, now, to_datetime
5111

5212

5313
@lru_cache(maxsize=None)

0 commit comments

Comments
 (0)