Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 106 additions & 90 deletions src/letta_client/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@
_T = TypeVar("_T")


def _next_cursor_params(*, params: Mapping[str, Any], first_id: str, last_id: str) -> dict[str, str]:
"""Advance cursors in the same traversal direction regardless of sort order.

- asc + forward traversal: use `after=<last>`
- desc + forward traversal: use `before=<first>`
- backward traversal (`before` explicitly set): keep using `before=<first>`
"""
if params.get("before", False):
return {"before": first_id}

if params.get("order") == "desc":
return {"before": first_id}

return {"after": last_id}


@runtime_checkable
class ArrayPageItem(Protocol):
id: Optional[str]
Expand Down Expand Up @@ -50,26 +66,26 @@ def _get_page_items(self) -> List[_T]:

@override
def next_page_info(self) -> Optional[PageInfo]:
is_forwards = not self._options.params.get("before", False)

items = self.items
if not items:
return None

if is_forwards:
item = cast(Any, items[-1])
if not isinstance(item, ArrayPageItem) or item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"after": item.id})
else:
item = cast(Any, self.items[0])
if not isinstance(item, ArrayPageItem) or item.id is None:
# TODO emit warning log
return None
first_item = cast(Any, items[0])
last_item = cast(Any, items[-1])
if not isinstance(first_item, ArrayPageItem) or first_item.id is None:
# TODO emit warning log
return None
if not isinstance(last_item, ArrayPageItem) or last_item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"before": item.id})
return PageInfo(
params=_next_cursor_params(
params=self._options.params,
first_id=first_item.id,
last_id=last_item.id,
)
)

@classmethod
def build(cls: Type[_BaseModelT], *, response: Response, data: object) -> _BaseModelT: # noqa: ARG003
Expand All @@ -93,26 +109,26 @@ def _get_page_items(self) -> List[_T]:

@override
def next_page_info(self) -> Optional[PageInfo]:
is_forwards = not self._options.params.get("before", False)

items = self.items
if not items:
return None

if is_forwards:
item = cast(Any, items[-1])
if not isinstance(item, ArrayPageItem) or item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"after": item.id})
else:
item = cast(Any, self.items[0])
if not isinstance(item, ArrayPageItem) or item.id is None:
# TODO emit warning log
return None
first_item = cast(Any, items[0])
last_item = cast(Any, items[-1])
if not isinstance(first_item, ArrayPageItem) or first_item.id is None:
# TODO emit warning log
return None
if not isinstance(last_item, ArrayPageItem) or last_item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"before": item.id})
return PageInfo(
params=_next_cursor_params(
params=self._options.params,
first_id=first_item.id,
last_id=last_item.id,
)
)

@classmethod
def build(cls: Type[_BaseModelT], *, response: Response, data: object) -> _BaseModelT: # noqa: ARG003
Expand All @@ -136,26 +152,26 @@ def _get_page_items(self) -> List[_T]:

@override
def next_page_info(self) -> Optional[PageInfo]:
is_forwards = not self._options.params.get("before", False)

messages = self.messages
if not messages:
return None

if is_forwards:
item = cast(Any, messages[-1])
if not isinstance(item, ObjectPageItem) or item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"after": item.id})
else:
item = cast(Any, self.messages[0])
if not isinstance(item, ObjectPageItem) or item.id is None:
# TODO emit warning log
return None
first_item = cast(Any, messages[0])
last_item = cast(Any, messages[-1])
if not isinstance(first_item, ObjectPageItem) or first_item.id is None:
# TODO emit warning log
return None
if not isinstance(last_item, ObjectPageItem) or last_item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"before": item.id})
return PageInfo(
params=_next_cursor_params(
params=self._options.params,
first_id=first_item.id,
last_id=last_item.id,
)
)


class AsyncObjectPage(BaseAsyncPage[_T], BasePage[_T], Generic[_T]):
Expand All @@ -170,26 +186,26 @@ def _get_page_items(self) -> List[_T]:

@override
def next_page_info(self) -> Optional[PageInfo]:
is_forwards = not self._options.params.get("before", False)

messages = self.messages
if not messages:
return None

if is_forwards:
item = cast(Any, messages[-1])
if not isinstance(item, ObjectPageItem) or item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"after": item.id})
else:
item = cast(Any, self.messages[0])
if not isinstance(item, ObjectPageItem) or item.id is None:
# TODO emit warning log
return None
first_item = cast(Any, messages[0])
last_item = cast(Any, messages[-1])
if not isinstance(first_item, ObjectPageItem) or first_item.id is None:
# TODO emit warning log
return None
if not isinstance(last_item, ObjectPageItem) or last_item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"before": item.id})
return PageInfo(
params=_next_cursor_params(
params=self._options.params,
first_id=first_item.id,
last_id=last_item.id,
)
)


class SyncNextFilesPage(BaseSyncPage[_T], BasePage[_T], Generic[_T]):
Expand All @@ -214,26 +230,26 @@ def has_next_page(self) -> bool:

@override
def next_page_info(self) -> Optional[PageInfo]:
is_forwards = not self._options.params.get("before", False)

files = self.files
if not files:
return None

if is_forwards:
item = cast(Any, files[-1])
if not isinstance(item, NextFilesPageItem) or item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"after": item.id})
else:
item = cast(Any, self.files[0])
if not isinstance(item, NextFilesPageItem) or item.id is None:
# TODO emit warning log
return None
first_item = cast(Any, files[0])
last_item = cast(Any, files[-1])
if not isinstance(first_item, NextFilesPageItem) or first_item.id is None:
# TODO emit warning log
return None
if not isinstance(last_item, NextFilesPageItem) or last_item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"before": item.id})
return PageInfo(
params=_next_cursor_params(
params=self._options.params,
first_id=first_item.id,
last_id=last_item.id,
)
)


class AsyncNextFilesPage(BaseAsyncPage[_T], BasePage[_T], Generic[_T]):
Expand All @@ -258,23 +274,23 @@ def has_next_page(self) -> bool:

@override
def next_page_info(self) -> Optional[PageInfo]:
is_forwards = not self._options.params.get("before", False)

files = self.files
if not files:
return None

if is_forwards:
item = cast(Any, files[-1])
if not isinstance(item, NextFilesPageItem) or item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"after": item.id})
else:
item = cast(Any, self.files[0])
if not isinstance(item, NextFilesPageItem) or item.id is None:
# TODO emit warning log
return None
first_item = cast(Any, files[0])
last_item = cast(Any, files[-1])
if not isinstance(first_item, NextFilesPageItem) or first_item.id is None:
# TODO emit warning log
return None
if not isinstance(last_item, NextFilesPageItem) or last_item.id is None:
# TODO emit warning log
return None

return PageInfo(params={"before": item.id})
return PageInfo(
params=_next_cursor_params(
params=self._options.params,
first_id=first_item.id,
last_id=last_item.id,
)
)