Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 168 additions & 13 deletions src/keepa/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from collections.abc import Sequence
from enum import Enum
from pathlib import Path
from typing import Any, Literal

import aiohttp
Expand Down Expand Up @@ -507,11 +508,105 @@ def wait_for_tokens(self) -> None:
time.sleep(tdelay)
self.update_status()

def download_graph_image(
self,
asin: str,
filename: str | Path,
domain: str | Domain = "US",
wait: bool = True,
**graph_kwargs: dict[str, Any],
) -> None:
"""
Download the graph image of an ASIN from keepa.

See `Graph Image API
<https://keepa.com/#!discuss/t/graph-image-api/7928>`_ for more
details.

Parameters
----------
asin : str
The ASIN of the product.
filename : str | pathlib.Path
Path to save the png to.
domain : str | keepa.Domain, default: 'US'
A valid Amazon domain. See :class:`keepa.Domain`.
wait : bool, default: True
Wait for available tokens before querying the keepa backend.
**graph_kwargs : dict[str, Any], optional
Optional graph keyword arguments. See `Graph Image API
<https://keepa.com/#!discuss/t/graph-image-api/7928>`_ for more
details.

Notes
-----
Graph images are cached for 90 minutes on a per-user basis. The cache
invalidates if any parameter changes. Submitting the exact same request
within this time frame will not consume any tokens.

Examples
--------
Download a keepa graph image showing the current Amazon price, new
price, and the sales rank of a product with ASIN ``"B09YNQCQKR"``.

>>> from keepa import Keepa
>>> api = Keepa("<YOUR_API_KEY>")
>>> api.download_graph_image(
... asin="B09YNQCQKR",
... filename="product_graph.png",
... amazon=1,
... new=1,
... salesrank=1,
... )

Show Amazon price, new and used graphs, buy box and FBA, for last 365
days, with custom width/height and custom colors. See
<https://keepa.com/#!discuss/t/graph-image-api/7928>`_ for more
details.

api.download_graph_image(
asin="B09YNQCQKR",
filename="product_graph_365.png",
domain="US",
amazon=1,
new=1,
used=1,
bb=1,
fba=1,
range=365,
width=800,
height=400,
cBackground="ffffff",
cAmazon="FFA500",
cNew="8888dd",
cUsed="444444",
cBB="ff00b4",
cFBA="ff5722"
)

"""
payload = {"asin": asin, "key": self.accesskey, "domain": _domain_to_dcode(domain)}
payload.update(graph_kwargs)

resp = self._request("graphimage", payload, wait=wait, is_json=False)

first_chunk = True
filename = Path(filename)
with open(filename, "wb") as f:
for chunk in resp.iter_content(8192):
if first_chunk:
if not chunk.startswith(b"\x89PNG\r\n\x1a\n"):
raise ValueError(
"Response from api.keepa.com/graphimage is not a valid PNG image"
)
first_chunk = False
f.write(chunk)

def query(
self,
items: str | Sequence[str],
stats: int | None = None,
domain: str = "US",
domain: str | Domain = "US",
history: bool = True,
offers: int | None = None,
update: int | None = None,
Expand Down Expand Up @@ -1673,8 +1768,16 @@ def deals(

return self._request("deal", payload, wait=wait)["deals"]

def _request(self, request_type, payload, wait: bool = True, raw_response: bool = False):
"""Query keepa api server.
def _request(
self,
request_type: str,
payload: dict[str, Any],
wait: bool = True,
raw_response: bool = False,
is_json: bool = True,
):
"""
Query keepa api server.

Parses raw response from keepa into a json format. Handles errors and
waits for available tokens if allowed.
Expand All @@ -1687,10 +1790,13 @@ def _request(self, request_type, payload, wait: bool = True, raw_response: bool
)
status_code = str(raw.status_code)

try:
response = raw.json()
except Exception:
raise RuntimeError(f"Invalid JSON from Keepa API (status {status_code})")
if is_json:
try:
response = raw.json()
except Exception:
raise RuntimeError(f"Invalid JSON from Keepa API (status {status_code})")
else:
return raw

# user status is always returned
if "tokensLeft" in response:
Expand Down Expand Up @@ -2010,21 +2116,28 @@ async def _product_query(self, items, product_code_is_asin=True, **kwargs):

@is_documented_by(Keepa.best_sellers_query)
async def best_sellers_query(
self, category, rank_avg_range=0, domain: str | Domain = "US", wait: bool = True
self,
category: str,
rank_avg_range: Literal[0, 30, 90, 180] = 0,
variations: bool = False,
sublist: bool = False,
domain: str | Domain = "US",
wait: bool = True,
):
"""Documented by Keepa.best_sellers_query."""
payload = {
"key": self.accesskey,
"domain": _domain_to_dcode(domain),
"variations": int(variations),
"sublist": int(sublist),
"category": category,
"range": rank_avg_range,
}

response = await self._request("bestsellers", payload, wait=wait)
if "bestSellersList" in response:
return response["bestSellersList"]["asinList"]
else: # pragma: no cover
log.info("Best sellers search results not yet available")
if "bestSellersList" not in response:
raise RuntimeError(f"Best sellers search results for {category} not yet available")
return response["bestSellersList"]["asinList"]

@is_documented_by(Keepa.search_for_categories)
async def search_for_categories(
Expand Down Expand Up @@ -2144,7 +2257,46 @@ async def deals(self, deal_parms, domain: str | Domain = "US", wait: bool = True
deals = await self._request("deal", payload, wait=wait)
return deals["deals"]

async def _request(self, request_type, payload, wait: bool = True, raw_response: bool = False):
@is_documented_by(Keepa.download_graph_image)
async def download_graph_image(
self,
asin: str,
filename: str | Path,
domain: str | Domain = "US",
wait: bool = True,
**graph_kwargs: dict[str, Any],
) -> None:
"""Documented in Keepa.download_graph_image."""
payload = {"asin": asin, "key": self.accesskey, "domain": _domain_to_dcode(domain)}
payload.update(graph_kwargs)

async with aiohttp.ClientSession() as session:
async with session.get(
"https://api.keepa.com/graphimage",
params=payload,
timeout=self._timeout,
) as resp:
first_chunk = True
filename = Path(filename)
with open(filename, "wb") as f:
async for chunk in resp.content.iter_chunked(8192):
if first_chunk:
if not chunk.startswith(b"\x89PNG\r\n\x1a\n"):
raise ValueError(
"Response from api.keepa.com/graphimage is not a valid "
"PNG image"
)
first_chunk = False
f.write(chunk)

async def _request(
self,
request_type: str,
payload: dict[str, Any],
wait: bool = True,
raw_response: bool = False,
is_json: bool = True,
):
"""Documented in Keepa._request."""
while True:
async with aiohttp.ClientSession() as session:
Expand All @@ -2155,6 +2307,9 @@ async def _request(self, request_type, payload, wait: bool = True, raw_response:
) as raw:
status_code = str(raw.status)

if not is_json:
return raw

try:
response = await raw.json()
except Exception:
Expand Down
14 changes: 14 additions & 0 deletions tests/test_async_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Test the asynchronous interface to the keepa backend.
"""

from pathlib import Path
import datetime
import os
import warnings
Expand Down Expand Up @@ -339,6 +344,15 @@ async def test_to_datetime_parm(api):
assert times[0].dtype == "<M8[m]"


@pytest.mark.asyncio
async def test_download_graph_image(api, tmp_path: Path) -> None:
filename = tmp_path / "out.png"
await api.download_graph_image(PRODUCT_ASIN, filename)

data = filename.read_bytes()
assert data.startswith(b"\x89PNG\r\n\x1a\n")


@pytest.mark.asyncio
async def test_plotting(api):
request = await api.query(PRODUCT_ASIN, history=True)
Expand Down
35 changes: 33 additions & 2 deletions tests/test_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Test the synchronous interface to the keepa backend.
"""

from pathlib import Path
import datetime
from itertools import chain
import os
Expand Down Expand Up @@ -388,7 +393,7 @@ def test_keepatime(api):
assert keepa.keepa_minutes_to_time(0, to_datetime=False)


def test_to_datetime_parm(api: Keepa):
def test_to_datetime_parm(api: Keepa) -> None:
product = api.query(PRODUCT_ASIN, to_datetime=True)[0]
times = product["data"]["AMAZON_time"]
assert isinstance(times[0], datetime.datetime)
Expand All @@ -398,7 +403,33 @@ def test_to_datetime_parm(api: Keepa):
assert times[0].dtype == "<M8[m]"


def test_plotting(api):
def test_download_graph_image(api: Keepa, tmp_path: Path) -> None:
filename = tmp_path / "out.png"
api.download_graph_image(
asin=PRODUCT_ASIN,
filename=filename,
domain="US",
amazon=1,
new=1,
used=1,
bb=1,
fba=1,
range=365,
width=800,
height=400,
cBackground="ffffff",
cAmazon="FFA500",
cNew="8888dd",
cUsed="444444",
cBB="ff00b4",
cFBA="ff5722",
)

data = filename.read_bytes()
assert data.startswith(b"\x89PNG\r\n\x1a\n")


def test_plotting(api: Keepa) -> None:
request = api.query(PRODUCT_ASIN, history=True)
product = request[0]
keepa.plot_product(product, show=False)
Expand Down