From 1c9f3fcc461c0396d54455eb0552cfdb5eec88ab Mon Sep 17 00:00:00 2001 From: Mark Date: Sun, 9 Mar 2025 15:47:14 -0500 Subject: [PATCH] Enhance OTXv2 usability with improved .getall() docstring and API key validation This commit improves the OTXv2.py SDK by: 1. Adding a detailed schema to the .getall() docstring, clarifying the structure of returned pulse and indicator data to reduce transformation errors. 2. Implementing proactive API key validation in __init__ to catch format errors (non-64-char hex strings) before API calls, preventing runtime 403 errors. These changes enhance developer experience by providing better documentation and early error detection, inspired by real-world usage in an ETL pipeline. --- OTXv2.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/OTXv2.py b/OTXv2.py index 9c14f6f..c4ec848 100755 --- a/OTXv2.py +++ b/OTXv2.py @@ -98,10 +98,14 @@ def __init__( user_agent=None, verify=True, cert=None ): self.key = api_key + # Validates API Key length and hexadecimal format + if not isinstance(api_key, str) or len(api_key) != 64 or not all(c in '0123456789abcdefABCDEF' for c in api_key): + raise ValueError(f"Invalid API key: '{api_key}'. Must be a 64-character hexadecimal string.") + self.server = server self.verify = verify self.cert = cert - + self.proxies = {} if proxy: self.proxies['http'] = proxy @@ -386,12 +390,40 @@ def walkapi(self, url, iter=False, max_page=None, max_items=None, method='GET', def getall(self, modified_since=None, author_name=None, limit=50, max_page=None, max_items=None, iter=False): """ - Get all pulses user is subscribed to. + Get all pulses the user is subscribed to. 
:param modified_since: datetime object representing earliest date you want returned in results :param author_name: Name of pulse author to limit results to :param limit: The page size to retrieve in a single request - :return: the consolidated set of pulses for the user + :return: + :list or iterator: If iter=False, a list of pulse dicts; if iter=True, an iterator over pulses. + Each pulse dict contains: + - id (str): Pulse identifier (24-char hex) + - name (str): Pulse name + - description (str, optional): Pulse description + - author_name (str): Author's username + - public (str): Defaults to 'True' + - revision (int): Revision number + - adversary (str, optional): Adversary name + - industries (list): List of industries + - tlp (str): Traffic Light Protocol (lowercase: 'white', 'green', etc.) + - tags (list): List of tags + - created (str): ISO timestamp + - modified (str): ISO timestamp + - references (list): External references + - targeted_countries (list): Country codes/names + - indicators (list): List of indicator dicts with: + - id (str): Numeric string (can exceed 32-bit int) + - indicator (str): IOC value + - type (str): Indicator type (e.g., 'IPv4') + - title (str, optional) + - description (str, optional) + - created (str): ISO timestamp + - is_active (int): 0 or 1 + - content (str, optional) + - expiration (str, optional) + - role (str, optional) """ + args = {'limit': limit} if modified_since is not None: if isinstance(modified_since, (datetime.datetime, datetime.date)):