From 1acd0ee909c1ba387acd9d5376b66273b2cb64ce Mon Sep 17 00:00:00 2001
From: josmanperez
Date: Wed, 30 Oct 2024 13:51:00 +0000
Subject: [PATCH] * Added script to download and concatenate logs for an App
 Services application, e.g., Triggers. More information can be found in the
 README file

---
 atlas-app-logs-aggregator/.gitignore       |  25 +++
 atlas-app-logs-aggregator/LICENSE          |  21 +++
 atlas-app-logs-aggregator/README.md        | 135 +++++++++++++++++
 atlas-app-logs-aggregator/auth.py          |  55 +++++++
 atlas-app-logs-aggregator/config.py        |   7 +
 atlas-app-logs-aggregator/log_pager.py     | 168 +++++++++++++++++++++
 atlas-app-logs-aggregator/logger.py        | 111 ++++++++++++++
 atlas-app-logs-aggregator/main.py          | 129 ++++++++++++++++
 atlas-app-logs-aggregator/requirements.txt |   5 +
 atlas-app-logs-aggregator/utils.py         | 121 +++++++++++++++
 10 files changed, 777 insertions(+)
 create mode 100644 atlas-app-logs-aggregator/.gitignore
 create mode 100644 atlas-app-logs-aggregator/LICENSE
 create mode 100644 atlas-app-logs-aggregator/README.md
 create mode 100644 atlas-app-logs-aggregator/auth.py
 create mode 100644 atlas-app-logs-aggregator/config.py
 create mode 100644 atlas-app-logs-aggregator/log_pager.py
 create mode 100644 atlas-app-logs-aggregator/logger.py
 create mode 100644 atlas-app-logs-aggregator/main.py
 create mode 100644 atlas-app-logs-aggregator/requirements.txt
 create mode 100644 atlas-app-logs-aggregator/utils.py

diff --git a/atlas-app-logs-aggregator/.gitignore b/atlas-app-logs-aggregator/.gitignore
new file mode 100644
index 00000000..b21326f8
--- /dev/null
+++ b/atlas-app-logs-aggregator/.gitignore
@@ -0,0 +1,25 @@
+# Ignore virtual environment folder
+venv/
+
+# Ignore Python bytecode files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Ignore log files
+*.log
+logs.json
+
+# Ignore system files
+.DS_Store
+
+# Ignore IDE/editor specific files
+.vscode/
+.idea/
+
+# Ignore environment variable files
+.env
+
+# Application logs
+app_*.log
+logs/*
\ No newline at end of file
diff --git a/atlas-app-logs-aggregator/LICENSE b/atlas-app-logs-aggregator/LICENSE
new file mode 100644
index 00000000..ec8ca247
--- /dev/null
+++ b/atlas-app-logs-aggregator/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2024 Josman
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/atlas-app-logs-aggregator/README.md b/atlas-app-logs-aggregator/README.md
new file mode 100644
index 00000000..06746515
--- /dev/null
+++ b/atlas-app-logs-aggregator/README.md
@@ -0,0 +1,135 @@
+# MongoDB Atlas Logs Fetcher
+
+This tool is a Python script designed to fetch logs from a MongoDB Atlas App Services application using pagination. It supports optional date range filtering and provides a way to authenticate using MongoDB Atlas API keys.
+
+## Features
+
+- Fetch logs from a MongoDB Atlas App Services application.
+- Supports pagination to handle large sets of logs.
+- Optional date range filtering using `start_date` and `end_date` parameters.
+- Validates date inputs to ensure they follow the ISO 8601 format.
+- Authenticates using MongoDB Atlas public and private API keys.
+- Optional `user_id` to filter logs by user ID.
+- Optional `co_id` to filter logs by request correlation ID.
+- Fetch only error logs using the `--errors_only` option.
+- Filter logs by key-value pairs using the `--filter` option.
+
+## Requirements
+
+- Python 3.6 or higher.
+- The library dependencies listed in `requirements.txt`.
+
+## Installation
+
+### Create a virtual environment
+
+```bash
+python3 -m venv venv
+source venv/bin/activate  # On Windows use `venv\Scripts\activate`
+```
+
+### Install dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+## Usage
+
+### Command-Line Arguments
+
+- `project_id` (**required**): The Atlas Project ID (hexadecimal string).
+- `app_id` (**required**): The App ID (hexadecimal string).
+- `public_api_key` (**required**): The Atlas Public API Key (string).
+- `private_api_key` (**required**): The Atlas Private API Key (hexadecimal string with hyphens).
+- `--start_date` (optional): Start Date in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.MMMZ).
+- `--end_date` (optional): End Date in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.MMMZ).
+- `--type` (optional): Comma-separated list of supported log types. Currently, the available types are: `TRIGGER_FAILURE, TRIGGER_ERROR_HANDLER, DB_TRIGGER, AUTH_TRIGGER, SCHEDULED_TRIGGER, FUNCTION, SERVICE_FUNCTION, STREAM_FUNCTION, SERVICE_STREAM_FUNCTION, AUTH, WEBHOOK, ENDPOINT, PUSH, API, API_KEY, GRAPHQL, SYNC_CONNECTION_START, SYNC_CONNECTION_END, SYNC_SESSION_START, SYNC_SESSION_END, SYNC_CLIENT_WRITE, SYNC_ERROR, SYNC_OTHER, SCHEMA_ADDITIVE_CHANGE, SCHEMA_GENERATION, SCHEMA_VALIDATION, LOG_FORWARDER`
+- `--user_id` (optional): Return only log messages associated with the given user_id.
+- `--co_id` (optional): Return only log messages associated with the given request Correlation ID.
+- `--filter` (optional): Filter logs by space-separated key-value pairs (e.g., `--filter event_subscription_name=<value> function_name=<value>`).
+- `--errors_only` (optional): Return only error log messages.
+- `--verbose` (optional): Enable verbose logging information.
+
+### Example
+
+```bash
+python main.py <project_id> <app_id> <public_api_key> <private_api_key> --start_date 2024-10-05T14:30:00.000Z --end_date 2024-10-06T14:30:00.000Z --type TRIGGER_FAILURE,SCHEMA_GENERATION
+```
+
+With optional parameters:
+
+```bash
+python main.py <project_id> <app_id> <public_api_key> <private_api_key> --start_date 2024-10-05T14:30:00.000Z --type TRIGGER_FAILURE,SCHEMA_GENERATION --user_id 671d2e2010733ecbaa2bab8f --filter event_subscription_name=getUnpausedClustersMetrics
+```
+
+If `start_date` and `end_date` are not provided, the script defaults `start_date` to 24 hours before the current time.
+
+## Filtering Logs
+
+The `--filter` option allows you to filter logs by key-value pairs. It accepts multiple key-value pairs separated by spaces, each in the format `key=value`.
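+
+Internally, `filter_logs` in `log_pager.py` keeps a log entry only when every supplied pair matches the corresponding field of that entry exactly. A minimal sketch of that check (simplified from the script; the sample entry below is abbreviated for illustration):
+
+```python
+# Simplified sketch of the matching rule applied by LogPager.filter_logs
+def log_matches_filter(log: dict, filtering: dict) -> bool:
+    # Every filter key must exist in the log entry and its value must match exactly.
+    return all(log.get(key) == value for key, value in filtering.items())
+
+entry = {"type": "SCHEDULED_TRIGGER", "event_subscription_name": "getUnpausedClustersMetrics"}
+print(log_matches_filter(entry, {"event_subscription_name": "getUnpausedClustersMetrics"}))  # True
+print(log_matches_filter(entry, {"function_name": "getOrgClustersProjects"}))  # False: key missing
+```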
+
+The keys and values must correspond to fields returned by the endpoint; the script uses them to keep only the log entries that match. For example, for a `"type": "SCHEDULED_TRIGGER"` entry, the response key-values will be similar to:
+
+```json
+{
+    "_id": "671d2e2010733ecbaa2bab8f",
+    "co_id": "671d2e2010733ecbaa2bab8d",
+    "type": "SCHEDULED_TRIGGER",
+    "domain_id": "65b0fc719629ac8e4d8e8774",
+    "app_id": "65b0fc719629ac8e4d8e8773",
+    "group_id": "658d46ca7605526eb45222a4",
+    "request_url": "",
+    "request_method": "",
+    "started": "2024-10-26T18:00:00.041Z",
+    "completed": "2024-10-26T18:00:04.124Z",
+    "function_id": "65f31f9f3bfc77348cb1e2e7",
+    "function_name": "getOrgClustersProjects",
+    "error": "FunctionError: Cannot access member 'db' of undefined",
+    "event_subscription_id": "65f335c53d26a2b1ba5d7ba2",
+    "event_subscription_name": "getUnpausedClustersMetrics",
+    "messages": [
+        "reading projects for page: 1",
+        "there are more pages",
+        "reading projects for page: 2",
+        "done"
+    ],
+    "mem_time_usage": 4081000000
+}
+```
+
+We can use any of these fields in the `--filter` option (e.g., `--filter event_subscription_name=getUnpausedClustersMetrics`).
+
+## Logging
+
+The script supports logging to both the console and a log file. By default, log files are stored in the `logs` folder. The log file name includes a timestamp to ensure uniqueness for each run.
+
+`--verbose`: When this flag is used, the log level is set to `DEBUG`, providing detailed logging information. Without this flag, the log level is set to `INFO`.
+
+### Log File Location
+
+Log files are stored in the `logs` folder. Each log file is named with a timestamp so that logs from different runs do not overwrite each other.
+
+### Example Log File Name
+
+```bash
+logs/app_20241005_143000.log
+```
+
+## Benefits
+
+- **Automated Log Retrieval**: Easily fetch logs from MongoDB Atlas App Services without manual intervention.
+- **Date Range Filtering**: Filter logs by date range to focus on specific periods.
+- **Pagination Support**: Handle large sets of logs efficiently using pagination.
+- **Validation**: Ensure date inputs are in the correct format to avoid errors.
+
+## DISCLAIMER
+
+Please note: This repo is released for use "AS IS" without any warranties of any kind, including, but not limited to, its installation, use, or performance. We disclaim any and all warranties, either express or implied, including but not limited to any warranty of noninfringement, merchantability, and/or fitness for a particular purpose. We do not warrant that the technology will meet your requirements, that the operation thereof will be uninterrupted or error-free, or that any errors will be corrected.
+
+Any use of these scripts and tools is at your own risk. There is no guarantee that they have been through thorough testing in a comparable environment, and we are not responsible for any damage or data loss incurred with their use.
+
+You are responsible for reviewing and thoroughly testing any scripts you run before use in any non-testing environment.
+
+Thanks,
+The MongoDB Support Team
diff --git a/atlas-app-logs-aggregator/auth.py b/atlas-app-logs-aggregator/auth.py
new file mode 100644
index 00000000..58eb9bcc
--- /dev/null
+++ b/atlas-app-logs-aggregator/auth.py
@@ -0,0 +1,55 @@
+import requests
+from config import ADMIN_API_BASE_URL
+from logger import Logger
+
+"""
+auth.py
+
+This module contains the function for authenticating with MongoDB Atlas using
+public and private API keys.
+
+Constants:
+    ADMIN_API_BASE_URL (str): The base URL for the MongoDB Atlas Admin API.
+
+Functions:
+    authenticate(public_api_key, private_api_key): Authenticate with MongoDB Atlas and obtain an access token.
+"""
+
+
+def authenticate(public_api_key, private_api_key, logger=None):
+    """
+    Authenticate with MongoDB Atlas using the provided public and private API keys.
+
+    This function sends a POST request to the MongoDB Atlas authentication endpoint
+    with the provided public and private API keys. If the authentication is successful,
+    it returns the access token.
+
+    Args:
+        public_api_key (str): The public API key for MongoDB Atlas.
+        private_api_key (str): The private API key for MongoDB Atlas.
+        logger (Logger, optional): The logger instance to use. Defaults to a new Logger.
+
+    Returns:
+        str: The access token obtained from MongoDB Atlas.
+
+    Raises:
+        requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
+        requests.exceptions.RequestException: For other types of request-related errors.
+    """
+    if logger is None:
+        logger = Logger()
+
+    url = f"{ADMIN_API_BASE_URL}/auth/providers/mongodb-cloud/login"
+    headers = {"Content-Type": "application/json", "Accept": "application/json"}
+    payload = {"username": public_api_key, "apiKey": private_api_key}
+
+    logger.info("Sending authentication request to MongoDB Atlas.")
+    logger.debug(f"Authenticating to {url} with public key: {public_api_key}")
+    response = requests.post(url, headers=headers, json=payload)
+    try:
+        response.raise_for_status()
+        logger.info("Authentication successful.")
+        # Parse the response once and reuse the token.
+        access_token = response.json()["access_token"]
+        logger.debug(f"Received access token: {access_token}")
+        return access_token
+    except requests.exceptions.HTTPError as err:
+        logger.error(f"Authentication failed: {err}")
+        raise
diff --git a/atlas-app-logs-aggregator/config.py b/atlas-app-logs-aggregator/config.py
new file mode 100644
index 00000000..8d442e13
--- /dev/null
+++ b/atlas-app-logs-aggregator/config.py
@@ -0,0 +1,7 @@
+"""
+config.py
+
+This module contains configuration constants for the application.
+"""
+
+ADMIN_API_BASE_URL = "https://services.cloud.mongodb.com/api/admin/v3.0"
\ No newline at end of file
diff --git a/atlas-app-logs-aggregator/log_pager.py b/atlas-app-logs-aggregator/log_pager.py
new file mode 100644
index 00000000..59d20ba7
--- /dev/null
+++ b/atlas-app-logs-aggregator/log_pager.py
@@ -0,0 +1,168 @@
+import requests
+from logger import Logger
+from config import ADMIN_API_BASE_URL
+
+"""
+log_pager.py
+
+This module contains the LogPager class, which is responsible for fetching logs
+from a MongoDB Atlas App Services application using pagination.
+
+Classes:
+    LogPager: A class to handle pagination and fetching logs from MongoDB Atlas.
+"""
+
+
+class LogPager:
+    """
+    A class to handle pagination and fetching logs from MongoDB Atlas.
+
+    Attributes:
+        logs_endpoint (str): The endpoint URL for fetching logs.
+        query_params (dict): The query parameters for the log request.
+        auth_headers (dict): The authorization headers for the log request.
+        logger (Logger): The logger instance for logging operations.
+    """
+
+    def __init__(
+        self,
+        project_id,
+        app_id,
+        access_token,
+        query_params=None,
+        filtering=None,
+        logger=None,
+    ):
+        """
+        Initialize the LogPager with project ID, app ID, access token, query parameters, and logger.
+
+        Args:
+            project_id (str): The Atlas Project ID.
+            app_id (str): The App ID.
+            access_token (str): The access token obtained from MongoDB Atlas.
+            query_params (dict, optional): The query parameters for the log request. Defaults to None, treated as an empty dict.
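+            filtering (dict, optional): Key-value pairs used by filter_logs to keep only matching log entries. Defaults to None, treated as an empty dict.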
+            logger (Logger, optional): The logger instance for logging operations. Defaults to None.
+        """
+        self.logs_endpoint = (
+            f"{ADMIN_API_BASE_URL}/groups/{project_id}/apps/{app_id}/logs"
+        )
+        self.query_params = query_params or {}
+        self.filtering = filtering or {}
+        self.auth_headers = {"Authorization": f"Bearer {access_token}"}
+        self.logger = logger or Logger()
+
+    def get_next_page(self, prev_page=None):
+        """
+        Fetch the next page of logs.
+
+        Args:
+            prev_page (dict, optional): The previous page of logs. Defaults to None.
+
+        Returns:
+            dict: The next page of logs.
+
+        Raises:
+            Exception: If there are no more pages to fetch.
+            requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
+        """
+        next_end_date = prev_page.get("nextEndDate") if prev_page else None
+        self.logger.debug(f"Fetching logs with end date: {next_end_date}")
+        next_skip = prev_page.get("nextSkip") if prev_page else None
+        self.logger.debug(f"Fetching logs with skip: {next_skip}")
+        if prev_page and not next_end_date:
+            self.logger.error("Paginated API does not have any more pages.")
+            raise Exception("Paginated API does not have any more pages.")
+
+        params = {**self.query_params, "end_date": next_end_date, "skip": next_skip}
+        self.logger.debug(f"Fetching logs with params: {params}")
+        try:
+            response = requests.get(
+                self.logs_endpoint, headers=self.auth_headers, params=params
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.HTTPError as e:
+            self.logger.error(f"HTTP error occurred: {e}")
+            try:
+                error_message = response.json().get(
+                    "error", "No error message provided"
+                )
+                self.logger.error(f"Error message from response: {error_message}")
+            except ValueError:
+                # The response body was not JSON. Do not rebind `e` here so the
+                # original HTTPError can be re-raised below.
+                self.logger.error("Failed to parse error message from response")
+            raise e
+
+    def filter_logs(self, logs):
+        """
+        Filter logs based on the filtering dictionary.
+
+        Args:
+            logs (list): List of log entries.
+
+        Returns:
+            list: Filtered list of log entries.
+        """
+        if not self.filtering:
+            return logs
+
+        def log_matches_filter(log):
+            """
+            Check if a log entry matches the filtering criteria and log the process.
+
+            Args:
+                log (dict): A log entry.
+
+            Returns:
+                bool: True if the log entry matches the filtering criteria, False otherwise.
+            """
+            # Iterate over the key-value pairs in the filtering dictionary.
+            for key, value in self.filtering.items():
+                log_value = log.get(key)
+                if log_value is None:
+                    self.logger.debug(f"Key '{key}' not found in log entry: {log}")
+                    return False
+                if log_value != value:
+                    self.logger.debug(
+                        f"Value mismatch for key '{key}': expected '{value}', found '{log_value}'"
+                    )
+                    return False
+            return True
+
+        # Keep only the log entries that match every filtering criterion.
+        filtered_logs = [log for log in logs if log_matches_filter(log)]
+        return filtered_logs
+
+    def get_all_logs(self):
+        """
+        Fetch all logs using pagination.
+
+        Returns:
+            list: A list of all logs.
+
+        Raises:
+            requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
+        """
+        logs = []
+        has_next = True
+        prev_page = None
+        page_number = 1  # Initialize page counter
+        while has_next:
+            self.logger.info(
+                f"Fetching page {page_number}..."
+ ) # Log current page number + page = self.get_next_page(prev_page) + filtered_logs = self.filter_logs(page["logs"]) + logs.extend(filtered_logs) + has_next = "nextEndDate" in page + prev_page = page + page_number += 1 # Increment page counter + return logs diff --git a/atlas-app-logs-aggregator/logger.py b/atlas-app-logs-aggregator/logger.py new file mode 100644 index 00000000..51965d74 --- /dev/null +++ b/atlas-app-logs-aggregator/logger.py @@ -0,0 +1,111 @@ +import logging +import os +from datetime import datetime + + +class Logger: + """ + A singleton Logger class to handle logging operations. + + This class ensures that only one instance of the logger is created and used + throughout the application. It supports logging to both a file and the console, + with different log levels (INFO and DEBUG). + + Attributes: + _instance (Logger): The singleton instance of the Logger class. + _initialized (bool): A flag to indicate if the logger has been initialized. + logger (logging.Logger): The underlying logger instance from the logging module. + """ + + _instance = None + + def __new__(cls, *args, **kwargs): + """ + Ensure that only one instance of the Logger class is created. + + Args: + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + Logger: The singleton instance of the Logger class. + """ + if cls._instance is None: + cls._instance = super(Logger, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self, log_file=None, verbose=False): + """ + Initialize the Logger instance. + + This method sets up the logger with a file handler and a console handler, + and configures the log level based on the verbose flag. + + Args: + log_file (str, optional): The name of the log file. If not provided, a timestamped log file name is used. + verbose (bool): A flag to set the log level to DEBUG if True, or INFO if False. Defaults to False. + """ + + if self._initialized: + return + self._initialized = True + + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + if log_file is None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = os.path.join(log_dir, f"app_{timestamp}.log") + else: + log_file = os.path.join(log_dir, log_file) + + self.logger = logging.getLogger("AppLogger") + self.logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + # Check if handlers already exist + if not self.logger.handlers: + # Create file handler + fh = logging.FileHandler(log_file) + fh.setLevel(logging.DEBUG if verbose else logging.INFO) + + # Create console handler + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG if verbose else logging.INFO) + + # Create formatter and add it to the handlers + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + # Add the handlers to the logger + self.logger.addHandler(fh) + self.logger.addHandler(ch) + + def info(self, message): + """ + Log an info-level message. + + Args: + message (str): The message to log. + """ + self.logger.info(message) + + def debug(self, message): + """ + Log a debug-level message. + + Args: + message (str): The message to log. + """ + self.logger.debug(message) + + def error(self, message): + """ + Log an error-level message. + + Args: + message (str): The message to log. 
+ """ + self.logger.error(message) diff --git a/atlas-app-logs-aggregator/main.py b/atlas-app-logs-aggregator/main.py new file mode 100644 index 00000000..fc26fac2 --- /dev/null +++ b/atlas-app-logs-aggregator/main.py @@ -0,0 +1,129 @@ +import argparse +import json +from auth import authenticate +from log_pager import LogPager +from logger import Logger +from utils import ( + validate_hex, + validate_string, + validate_private_key, + validate_date, + validate_types, +) + + +def parse_filtering_args(filtering_args): + """ + Parse filtering arguments into a dictionary. + + Args: + filtering_args (list): List of key-value pairs. + + Returns: + dict: Dictionary of parsed key-value pairs. + """ + filter_dic = {} + for arg in filtering_args: + key, value = arg.split("=") + filter_dic[key] = value + return filter_dic + + +def main(): + parser = argparse.ArgumentParser( + description="Fetch logs from App Services Application using pagination." + ) + parser.add_argument( + "project_id", type=validate_hex, help="Atlas Project ID (hexadecimal string)" + ) + parser.add_argument("app_id", type=validate_hex, help="App ID (string)") + parser.add_argument( + "public_api_key", type=validate_string, help="Atlas Public API Key (string)" + ) + parser.add_argument( + "private_api_key", + type=validate_private_key, + help="Atlas Private API Key (hexadecimal string)", + ) + parser.add_argument( + "--start_date", + type=validate_date, + default=None, + help="Start Date in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.MMMZ)", + ) + parser.add_argument( + "--end_date", + type=validate_date, + default=None, + help="End Date in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.MMMZ)", + ) + parser.add_argument( + "--user_id", + type=validate_hex, + default=None, + help="Return only log messages associated with the given user_id.", + ) + parser.add_argument( + "--co_id", + type=validate_hex, + default=None, + help="Return only log messages associated with the given request Correlation ID.", + ) + parser.add_argument( + "--type", + type=validate_types, + default=None, + help="Comma-separated list of log types to fetch", + ) + parser.add_argument( + "--errors_only", action="store_true", help="Return only error log messages" + ) + parser.add_argument( + "--filter", + nargs="+", + help="Filter logs by key-value pairs (e.g., --filter key1=value1 key2=value2)", + ) + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + filter_dic = parse_filtering_args(args.filter) if args.filter else {} + + logger = Logger(verbose=args.verbose) + logger.info("Starting log fetching process...") + + try: + access_token = authenticate(args.public_api_key, args.private_api_key) + query_params = { + "start_date": args.start_date, + "end_date": args.end_date, + "type": args.type, + "user_id": args.user_id, + "co_id": args.co_id, + } + if args.errors_only: + query_params["errors_only"] = ( + "true" # Add the only_error parameter if the flag is specified + ) + + pager = LogPager( + args.project_id, + args.app_id, + access_token, + query_params, + filtering=filter_dic, # Pass the filtering dictionary + logger=logger, + ) + + all_logs = pager.get_all_logs() + with open("logs.json", "w") as file: + json.dump(all_logs, file, indent=4) + + logger.info("Log fetching process completed successfully.") + + except Exception as e: + logger.error(f"An error occurred: {str(e)}") + + +if __name__ == "__main__": + main() diff --git a/atlas-app-logs-aggregator/requirements.txt 
new file mode 100644
index 00000000..9ee2289f
--- /dev/null
+++ b/atlas-app-logs-aggregator/requirements.txt
@@ -0,0 +1,5 @@
+certifi==2024.8.30
+charset-normalizer==3.4.0
+idna==3.10
+requests==2.32.3
+urllib3==2.2.3
diff --git a/atlas-app-logs-aggregator/utils.py b/atlas-app-logs-aggregator/utils.py
new file mode 100644
index 00000000..c16795a8
--- /dev/null
+++ b/atlas-app-logs-aggregator/utils.py
@@ -0,0 +1,121 @@
+import re
+import argparse
+
+"""
+utils.py
+
+This module contains utility functions for validating various types of input values.
+These functions are used to ensure that input values meet specific criteria before
+they are processed by other parts of the application.
+
+Functions:
+    validate_hex(value): Validate that the given string is a valid hexadecimal string.
+    validate_string(value): Validate that the given string is a non-empty string.
+    validate_private_key(value): Validate that the given string is a valid private key format.
+    validate_date(value): Validate that the given date string follows the ISO 8601 format.
+    validate_types(value): Validate that the given string is a comma-separated list of valid log types.
+"""
+
+
+def validate_hex(value):
+    """
+    validate_hex checks that the input is a valid ObjectID hexadecimal string.
+    """
+    if not re.fullmatch(r"^[0-9a-fA-F]{24}$", value):
+        raise argparse.ArgumentTypeError(
+            f"{value} is not a valid ObjectID hexadecimal string"
+        )
+
+    return value
+
+
+def validate_string(value):
+    """
+    validate_string checks that the input is a non-empty string.
+    """
+    if not isinstance(value, str) or not value.strip():
+        raise argparse.ArgumentTypeError(f"{value} is not a valid string")
+
+    return value
+
+
+def validate_private_key(value):
+    """
+    validate_private_key checks that the input is a valid private key format.
+    """
+    if not re.fullmatch(r"[0-9a-fA-F-]+", value):
+        raise argparse.ArgumentTypeError(f"{value} is not a valid private key format")
+
+    return value
+
+
+def validate_date(value):
+    """
+    Validate that the given date string follows the ISO 8601 format: YYYY-MM-DDTHH:MM:SS.MMMZ.
+
+    Args:
+        value (str): The date string to validate.
+
+    Returns:
+        str: The validated date string.
+
+    Raises:
+        argparse.ArgumentTypeError: If the date string does not follow the ISO 8601 format.
+    """
+    iso_8601_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z"
+    if not re.fullmatch(iso_8601_pattern, value):
+        raise argparse.ArgumentTypeError(
+            f"{value} is not a valid date format. Use 'YYYY-MM-DDTHH:MM:SS.MMMZ'"
+        )
+    return value
+
+
+def validate_types(value):
+    """
+    Validate that the given string is a comma-separated list of valid log types.
+
+    Args:
+        value (str): The comma-separated list of log types to validate.
+
+    Returns:
+        str: The validated comma-separated string of log types.
+
+    Raises:
+        argparse.ArgumentTypeError: If any of the log types are not valid.
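+
+    Example:
+        >>> validate_types("FUNCTION,AUTH")
+        'FUNCTION,AUTH'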
+ """ + valid_types = [ + "TRIGGER_FAILURE", + "TRIGGER_ERROR_HANDLER", + "DB_TRIGGER", + "AUTH_TRIGGER", + "SCHEDULED_TRIGGER", + "FUNCTION", + "SERVICE_FUNCTION", + "STREAM_FUNCTION", + "SERVICE_STREAM_FUNCTION", + "AUTH", + "WEBHOOK", + "ENDPOINT", + "PUSH", + "API", + "API_KEY", + "GRAPHQL", + "SYNC_CONNECTION_START", + "SYNC_CONNECTION_END", + "SYNC_SESSION_START", + "SYNC_SESSION_END", + "SYNC_CLIENT_WRITE", + "SYNC_ERROR", + "SYNC_OTHER", + "SCHEMA_ADDITIVE_CHANGE", + "SCHEMA_GENERATION", + "SCHEMA_VALIDATION", + "LOG_FORWARDER", + ] + types = value.split(",") + for t in types: + if t not in valid_types: + raise argparse.ArgumentTypeError( + f"{t} is not a valid type. Valid types are: {', '.join(valid_types)}" + ) + + return value