From 02c4b940c94cd3c7e9e3a0d40af4ebe0f2e000d9 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Fri, 3 Jan 2025 16:37:54 -0600 Subject: [PATCH 01/18] initial commit --- .../check_operators/dataframe_operators.py | 2 +- cdisc_rules_engine/rules_engine.py | 6 + tests/PerformanceTest.py | 223 ++++++++++++++++++ 3 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 tests/PerformanceTest.py diff --git a/cdisc_rules_engine/check_operators/dataframe_operators.py b/cdisc_rules_engine/check_operators/dataframe_operators.py index e8afa5e2d..d3d592d7c 100644 --- a/cdisc_rules_engine/check_operators/dataframe_operators.py +++ b/cdisc_rules_engine/check_operators/dataframe_operators.py @@ -28,7 +28,7 @@ from cdisc_rules_engine.services import logger from functools import wraps import traceback - +import time def log_operator_execution(func): @wraps(func) diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index aab731bbd..71ac33d11 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -45,6 +45,7 @@ ExternalDictionariesContainer, ) import traceback +import time class RulesEngine: @@ -343,10 +344,14 @@ def execute_rule( # Adding copy for now to avoid updating cached dataset dataset = deepcopy(dataset) # preprocess dataset + + logger.log(f"\n\ST{time.time()}-Dataset Preprocessing Starts") dataset_preprocessor = DatasetPreprocessor( dataset, domain, dataset_path, self.data_service, self.cache ) dataset = dataset_preprocessor.preprocess(rule_copy, datasets) + logger.log(f"\n\ST{time.time()}-Dataset Preprocessing Ends") + logger.log(f"\n\OPRNT{time.time()}-Operation Starts") dataset = self.rule_processor.perform_rule_operations( rule_copy, dataset, @@ -359,6 +364,7 @@ def execute_rule( external_dictionaries=self.external_dictionaries, ct_packages=ct_packages, ) + logger.log(f"\n\OPRNT{time.time()}-Operation Ends") relationship_data = {} if domain is not None and self.rule_processor.is_relationship_dataset(domain): relationship_data = self.data_processor.preprocess_relationship_dataset( diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py new file mode 100644 index 000000000..97a2b3960 --- /dev/null +++ b/tests/PerformanceTest.py @@ -0,0 +1,223 @@ +import os +import time +import pandas as pd +import subprocess +from statistics import median +import re +import click + +# Function to extract preprocessing time from logs +def extract_preprocessing_time_from_logs(output_lines): + start_time = None + end_time = None + + # Loop through the log lines + for line in output_lines: + # Check for "Dataset Preprocessing Starts" + if "Dataset Preprocessing Starts" in line: + match = re.search(r"\\ST(\d+\.\d+)", line) # Match the timestamp after \ST + if match: + start_time = float(match.group(1)) + print(f"Extracted start time: {start_time}") + # Check for "Dataset Preprocessing Ends" + elif "Dataset Preprocessing Ends" in line: + match = re.search(r"\\ST(\d+\.\d+)", line) # Match the timestamp after \ST + if match: + end_time = float(match.group(1)) + print(f"Extracted end time: {end_time}") + + # Return the difference if both times are found + if start_time is not None and end_time is not None: + return end_time - start_time + + return 0 + + +# Function to extract operator times from logs +def extract_operator_times(output_lines): + operator_times = {} + start_times = {} + + # Loop through the log lines + for line in output_lines: + # Check for operator start + match_start = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) starts", line) + if match_start: + timestamp, operation_name = float(match_start.group(1)), match_start.group(2) + start_times[operation_name] = timestamp + + # Check for operator end + match_end = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) ends", line) + if match_end: + timestamp, operation_name = float(match_end.group(1)), match_end.group(2) + if operation_name in start_times: + duration = timestamp - start_times.pop(operation_name, 0) + if operation_name in operator_times: + operator_times[operation_name].append(duration) + else: + operator_times[operation_name] = [duration] + + return operator_times + + +# Function to extract operation times from terminal logs +def extract_operation_times_from_logs(output_lines): + operation_times = {} + start_times = {} + + # Loop through the log lines + for line in output_lines: + # Check for operation start (from terminal logs) + match_start = re.search(r"\\OPRNT(\d+\.\d+)-Operation Starts", line) + if match_start: + timestamp = float(match_start.group(1)) + start_times[timestamp] = time.time() # Store start time for operation + + # Check for operation end (from terminal logs) + match_end = re.search(r"\\OPRNT(\d+\.\d+)-Operation Ends", line) + if match_end: + timestamp = float(match_end.group(1)) + if timestamp in start_times: + duration = time.time() - start_times.pop(timestamp) + operation_times[timestamp] = duration + + return operation_times + + +# Update TimeTestFunction to record operator and operation times +def TimeTestFunction(data_dir, rule_dir, total_calls): + results = [] # List to store results for DataFrame + + # Collect all dataset files from XPT directory + data_files = [os.path.join(data_dir, file) for file in os.listdir(data_dir) if file.endswith(".json") or file.endswith(".xpt")] + + # Collect all rules from the rule directory + rules = [file for file in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, file))] + + # Execute each rule on each dataset + for dataset_path in data_files: + for rule in rules: + time_taken = [] + preprocessing_times = [] # Track preprocessing time for the current execution + all_operator_times = {} # To store operator times + all_operation_times = {} # To store operation times + + for num_call in range(total_calls): + rule_path = os.path.join(rule_dir, rule) + + # Construct the command + command = [ + "python3", "core.py", "test", + "-s", "sdtmig", + "-v", "3.4", + "-r", rule_path, + "-dp", dataset_path, + "-l", "critical" + ] + + print(f"Executing: {' '.join(command)} for call {num_call+1}") + + # Execute the command and capture logs + try: + start_time = time.time() + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = process.communicate() + end_time = time.time() + + # Parse logs from stderr for preprocessing and operation times + output_lines = stderr.splitlines() + + preprocessing_time = extract_preprocessing_time_from_logs(output_lines) + operator_times = extract_operator_times(output_lines) + operation_times = extract_operation_times_from_logs(output_lines) + + if process.returncode == 0: + time_taken.append(end_time - start_time) # Append execution time for each call + preprocessing_times.append(preprocessing_time) + # Aggregate operator times + for op, durations in operator_times.items(): + if op in all_operator_times: + all_operator_times[op].extend(durations) + else: + all_operator_times[op] = durations + # Aggregate operation times + for op, duration in operation_times.items(): + if op in all_operation_times: + all_operation_times[op].append(duration) + else: + all_operation_times[op] = [duration] + else: + raise subprocess.CalledProcessError(process.returncode, command, stderr) + + except subprocess.CalledProcessError as e: + print(e) + results.append({ + "function type": "TimeTestFunction", + "rule name": rule, + "dataset": os.path.basename(dataset_path), + "status": "Failed", + "Number of Calls": num_call, + "Mean Time": None, + "Median Time": None, + "Min Time": None, + "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": e.stderr + }) + break + + if len(time_taken) > 0: + results.append({ + "function type": "TimeTestFunction", + "rule name": rule, + "dataset": os.path.basename(dataset_path), + "status": "Successful", + "Number of Calls": total_calls, + "Mean Time": sum(time_taken) / len(time_taken), + "Median Time": median(time_taken), + "Min Time": min(time_taken), + "Max Time": max(time_taken), + "Preprocessing Time": ", ".join(map(str, preprocessing_times)) if preprocessing_times else None, + # Store operator times as a string + "Operator Times": {op: durations for op, durations in all_operator_times.items()}, + # Store operation times as a string + "Operation Times": {op: durations for op, durations in all_operation_times.items()}, + "Error": None + }) + + return results + + +# Main execution +@click.command() +@click.option('-dd', type=str) +@click.option('-rd', type=str) +@click.option('-total_calls', type=int) +@click.option('-od', default=os.getcwd(), help="Directory to save the output file (default is current directory)") +def main(dd, rd, total_calls, od): + total_time_start = time.time() + + # Collect results from TimeTestFunction + test_results = TimeTestFunction(dd, rd, total_calls) + + total_time = time.time() - total_time_start + + # Create a DataFrame and save to an Excel file + results_df = pd.DataFrame(test_results) + + # Add total execution time to the report + total_time_row = ['Total Time'] + [None] * (len(results_df.columns) - 2) + [total_time] + results_df.loc[len(results_df)] = total_time_row + + # Save to Excel + output_path = os.path.join(od, "rule_execution_report.xlsx") + results_df.to_excel(output_path, index=False) + results_df.to_json(os.path.join(od, "rule_execution_report.json")) + print(f"\nExecution results saved to '{output_path}'") + print(results_df) + + +if __name__ == "__main__": + main() \ No newline at end of file From 36ee676ddfb1e58f62b5f536623e82c43b471a61 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Fri, 3 Jan 2025 16:47:27 -0600 Subject: [PATCH 02/18] lint update --- .../check_operators/dataframe_operators.py | 1 + tests/PerformanceTest.py | 220 +++++++++--------- 2 files changed, 117 insertions(+), 104 deletions(-) diff --git a/cdisc_rules_engine/check_operators/dataframe_operators.py b/cdisc_rules_engine/check_operators/dataframe_operators.py index d3d592d7c..9110f89e2 100644 --- a/cdisc_rules_engine/check_operators/dataframe_operators.py +++ b/cdisc_rules_engine/check_operators/dataframe_operators.py @@ -30,6 +30,7 @@ import traceback import time + def log_operator_execution(func): @wraps(func) def wrapper(self, other_value, *args, **kwargs): diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py index 97a2b3960..2cb765cec 100644 --- a/tests/PerformanceTest.py +++ b/tests/PerformanceTest.py @@ -1,36 +1,30 @@ import os import time -import pandas as pd import subprocess +import pandas as pd from statistics import median import re import click + # Function to extract preprocessing time from logs def extract_preprocessing_time_from_logs(output_lines): start_time = None end_time = None - # Loop through the log lines for line in output_lines: - # Check for "Dataset Preprocessing Starts" if "Dataset Preprocessing Starts" in line: - match = re.search(r"\\ST(\d+\.\d+)", line) # Match the timestamp after \ST + match = re.search(r"\\ST(\d+\.\d+)", line) if match: start_time = float(match.group(1)) print(f"Extracted start time: {start_time}") - # Check for "Dataset Preprocessing Ends" elif "Dataset Preprocessing Ends" in line: - match = re.search(r"\\ST(\d+\.\d+)", line) # Match the timestamp after \ST + match = re.search(r"\\ST(\d+\.\d+)", line) if match: end_time = float(match.group(1)) print(f"Extracted end time: {end_time}") - # Return the difference if both times are found - if start_time is not None and end_time is not None: - return end_time - start_time - - return 0 + return end_time - start_time if start_time and end_time else 0 # Function to extract operator times from logs @@ -38,24 +32,20 @@ def extract_operator_times(output_lines): operator_times = {} start_times = {} - # Loop through the log lines for line in output_lines: - # Check for operator start match_start = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) starts", line) if match_start: - timestamp, operation_name = float(match_start.group(1)), match_start.group(2) + timestamp, operation_name = float(match_start.group(1)), match_start.group( + 2 + ) start_times[operation_name] = timestamp - # Check for operator end match_end = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) ends", line) if match_end: timestamp, operation_name = float(match_end.group(1)), match_end.group(2) if operation_name in start_times: - duration = timestamp - start_times.pop(operation_name, 0) - if operation_name in operator_times: - operator_times[operation_name].append(duration) - else: - operator_times[operation_name] = [duration] + duration = timestamp - start_times.pop(operation_name) + operator_times.setdefault(operation_name, []).append(duration) return operator_times @@ -65,15 +55,12 @@ def extract_operation_times_from_logs(output_lines): operation_times = {} start_times = {} - # Loop through the log lines for line in output_lines: - # Check for operation start (from terminal logs) match_start = re.search(r"\\OPRNT(\d+\.\d+)-Operation Starts", line) if match_start: timestamp = float(match_start.group(1)) - start_times[timestamp] = time.time() # Store start time for operation + start_times[timestamp] = time.time() - # Check for operation end (from terminal logs) match_end = re.search(r"\\OPRNT(\d+\.\d+)-Operation Ends", line) if match_end: timestamp = float(match_end.group(1)) @@ -84,134 +71,159 @@ def extract_operation_times_from_logs(output_lines): return operation_times -# Update TimeTestFunction to record operator and operation times +# Simplified TimeTestFunction def TimeTestFunction(data_dir, rule_dir, total_calls): - results = [] # List to store results for DataFrame - - # Collect all dataset files from XPT directory - data_files = [os.path.join(data_dir, file) for file in os.listdir(data_dir) if file.endswith(".json") or file.endswith(".xpt")] - - # Collect all rules from the rule directory - rules = [file for file in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, file))] + results = [] + + # Collect all dataset files and rules + data_files = [ + os.path.join(data_dir, file) + for file in os.listdir(data_dir) + if file.endswith((".json", ".xpt")) + ] + rules = [ + file + for file in os.listdir(rule_dir) + if os.path.isfile(os.path.join(rule_dir, file)) + ] # Execute each rule on each dataset for dataset_path in data_files: for rule in rules: time_taken = [] - preprocessing_times = [] # Track preprocessing time for the current execution - all_operator_times = {} # To store operator times - all_operation_times = {} # To store operation times + preprocessing_times = [] + all_operator_times = {} + all_operation_times = {} for num_call in range(total_calls): rule_path = os.path.join(rule_dir, rule) - - # Construct the command command = [ - "python3", "core.py", "test", - "-s", "sdtmig", - "-v", "3.4", - "-r", rule_path, - "-dp", dataset_path, - "-l", "critical" + "python3", + "core.py", + "test", + "-s", + "sdtmig", + "-v", + "3.4", + "-r", + rule_path, + "-dp", + dataset_path, + "-l", + "critical", ] + print(f"Executing: {' '.join(command)} for call {num_call + 1}") - print(f"Executing: {' '.join(command)} for call {num_call+1}") - - # Execute the command and capture logs try: start_time = time.time() - process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) stdout, stderr = process.communicate() end_time = time.time() - # Parse logs from stderr for preprocessing and operation times output_lines = stderr.splitlines() - preprocessing_time = extract_preprocessing_time_from_logs(output_lines) + preprocessing_time = extract_preprocessing_time_from_logs( + output_lines + ) operator_times = extract_operator_times(output_lines) operation_times = extract_operation_times_from_logs(output_lines) if process.returncode == 0: - time_taken.append(end_time - start_time) # Append execution time for each call + time_taken.append(end_time - start_time) preprocessing_times.append(preprocessing_time) - # Aggregate operator times + for op, durations in operator_times.items(): - if op in all_operator_times: - all_operator_times[op].extend(durations) - else: - all_operator_times[op] = durations - # Aggregate operation times + all_operator_times.setdefault(op, []).extend(durations) for op, duration in operation_times.items(): - if op in all_operation_times: - all_operation_times[op].append(duration) - else: - all_operation_times[op] = [duration] + all_operation_times.setdefault(op, []).append(duration) else: - raise subprocess.CalledProcessError(process.returncode, command, stderr) + raise subprocess.CalledProcessError( + process.returncode, command, stderr + ) except subprocess.CalledProcessError as e: - print(e) - results.append({ + results.append( + { + "function type": "TimeTestFunction", + "rule name": rule, + "dataset": os.path.basename(dataset_path), + "status": "Failed", + "Number of Calls": num_call, + "Mean Time": None, + "Median Time": None, + "Min Time": None, + "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": e.stderr, + } + ) + break + + if time_taken: + results.append( + { "function type": "TimeTestFunction", "rule name": rule, "dataset": os.path.basename(dataset_path), - "status": "Failed", - "Number of Calls": num_call, - "Mean Time": None, - "Median Time": None, - "Min Time": None, - "Max Time": None, - "Preprocessing Time": None, - "Operator Times": None, - "Operation Times": None, - "Error": e.stderr - }) - break - - if len(time_taken) > 0: - results.append({ - "function type": "TimeTestFunction", - "rule name": rule, - "dataset": os.path.basename(dataset_path), - "status": "Successful", - "Number of Calls": total_calls, - "Mean Time": sum(time_taken) / len(time_taken), - "Median Time": median(time_taken), - "Min Time": min(time_taken), - "Max Time": max(time_taken), - "Preprocessing Time": ", ".join(map(str, preprocessing_times)) if preprocessing_times else None, - # Store operator times as a string - "Operator Times": {op: durations for op, durations in all_operator_times.items()}, - # Store operation times as a string - "Operation Times": {op: durations for op, durations in all_operation_times.items()}, - "Error": None - }) + "status": "Successful", + "Number of Calls": total_calls, + "Mean Time": sum(time_taken) / len(time_taken), + "Median Time": median(time_taken), + "Min Time": min(time_taken), + "Max Time": max(time_taken), + "Preprocessing Time": ( + ", ".join(map(str, preprocessing_times)) + if preprocessing_times + else None + ), + "Operator Times": { + op: durations + for op, durations in all_operator_times.items() + }, + "Operation Times": { + op: durations + for op, durations in all_operation_times.items() + }, + "Error": None, + } + ) return results # Main execution @click.command() -@click.option('-dd', type=str) -@click.option('-rd', type=str) -@click.option('-total_calls', type=int) -@click.option('-od', default=os.getcwd(), help="Directory to save the output file (default is current directory)") +@click.option("-dd", type=str) +@click.option("-rd", type=str) +@click.option("-total_calls", type=int) +@click.option( + "-od", + default=os.getcwd(), + help="Directory to save the output file (default is current directory)", +) def main(dd, rd, total_calls, od): total_time_start = time.time() - # Collect results from TimeTestFunction test_results = TimeTestFunction(dd, rd, total_calls) total_time = time.time() - total_time_start - # Create a DataFrame and save to an Excel file + # Create a DataFrame and save to Excel/JSON results_df = pd.DataFrame(test_results) - # Add total execution time to the report - total_time_row = ['Total Time'] + [None] * (len(results_df.columns) - 2) + [total_time] + # Add total execution time + total_time_row = ( + ["Total Time"] + [None] * (len(results_df.columns) - 2) + [total_time] + ) results_df.loc[len(results_df)] = total_time_row - # Save to Excel output_path = os.path.join(od, "rule_execution_report.xlsx") results_df.to_excel(output_path, index=False) results_df.to_json(os.path.join(od, "rule_execution_report.json")) @@ -220,4 +232,4 @@ def main(dd, rd, total_calls, od): if __name__ == "__main__": - main() \ No newline at end of file + main() From c5c91ac361a2682eb79a38a9ad4e92ae38eaaeee Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Sun, 26 Jan 2025 17:46:46 -0600 Subject: [PATCH 03/18] update --- .../check_operators/dataframe_operators.py | 2 + tests/PerformanceTest.py | 400 +++++++++++++----- 2 files changed, 306 insertions(+), 96 deletions(-) diff --git a/cdisc_rules_engine/check_operators/dataframe_operators.py b/cdisc_rules_engine/check_operators/dataframe_operators.py index 9110f89e2..30f836c05 100644 --- a/cdisc_rules_engine/check_operators/dataframe_operators.py +++ b/cdisc_rules_engine/check_operators/dataframe_operators.py @@ -36,7 +36,9 @@ def log_operator_execution(func): def wrapper(self, other_value, *args, **kwargs): try: logger.info(f"Starting check operator: {func.__name__}") + logger.log(f"\n\OPRT{time.time()}-operator {func.__name__} starts") result = func(self, other_value) + logger.log(f"\n\OPRT{time.time()}-operator {func.__name__} ends") logger.info(f"Completed check operator: {func.__name__}") return result except Exception as e: diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py index 2cb765cec..457df80d5 100644 --- a/tests/PerformanceTest.py +++ b/tests/PerformanceTest.py @@ -1,12 +1,11 @@ import os import time -import subprocess import pandas as pd +import subprocess from statistics import median import re import click - # Function to extract preprocessing time from logs def extract_preprocessing_time_from_logs(output_lines): start_time = None @@ -24,7 +23,10 @@ def extract_preprocessing_time_from_logs(output_lines): end_time = float(match.group(1)) print(f"Extracted end time: {end_time}") - return end_time - start_time if start_time and end_time else 0 + if start_time is not None and end_time is not None: + return end_time - start_time + + return 0 # Function to extract operator times from logs @@ -35,50 +37,191 @@ def extract_operator_times(output_lines): for line in output_lines: match_start = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) starts", line) if match_start: - timestamp, operation_name = float(match_start.group(1)), match_start.group( - 2 - ) + timestamp, operation_name = float(match_start.group(1)), match_start.group(2) start_times[operation_name] = timestamp match_end = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) ends", line) if match_end: timestamp, operation_name = float(match_end.group(1)), match_end.group(2) if operation_name in start_times: - duration = timestamp - start_times.pop(operation_name) - operator_times.setdefault(operation_name, []).append(duration) + duration = timestamp - start_times.pop(operation_name, 0) + if operation_name in operator_times: + operator_times[operation_name].append(duration) + else: + operator_times[operation_name] = [duration] return operator_times # Function to extract operation times from terminal logs def extract_operation_times_from_logs(output_lines): - operation_times = {} - start_times = {} + operation_times = [] for line in output_lines: match_start = re.search(r"\\OPRNT(\d+\.\d+)-Operation Starts", line) if match_start: timestamp = float(match_start.group(1)) - start_times[timestamp] = time.time() + start_time = time.time() match_end = re.search(r"\\OPRNT(\d+\.\d+)-Operation Ends", line) if match_end: timestamp = float(match_end.group(1)) - if timestamp in start_times: - duration = time.time() - start_times.pop(timestamp) - operation_times[timestamp] = duration + operation_times.append(time.time() - start_time) return operation_times +def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): + + results = [] # To store the final report + rule_results = {} # To store rule-specific results for Excel sheet creation + + dataset_files = [ + os.path.join(dataset_dir, file) + for file in os.listdir(dataset_dir) + if file.endswith((".json", ".xpt")) + ] + rules = [ + file + for file in os.listdir(rule_dir) + if os.path.isfile(os.path.join(rule_dir, file)) + ] + + # For + for dataset_path in dataset_files: + dataset_name = os.path.basename(dataset_path) + + # Initialize variables to collect times for the dataset across all rules + all_time_taken = [] + all_preprocessing_times = [] + all_operator_times = {} + all_operation_times = [] + + for rule in rules: + rule_name = os.path.basename(rule) + time_taken = [] # Time for individual rule + preprocessing_times = [] # Preprocessing times for individual rule + operator_times = {} # Operator times for individual rule + operation_times = [] # Operation times for individual rule + rule_executions = 0 # Count how many times the rule was executed for this dataset + + for num_call in range(total_calls): + rule_path = os.path.join(rule_dir, rule) + command = [ + "python3", + "core.py", + "test", + "-s", + "sdtmig", + "-v", + "3.4", + "-r", + rule_path, + "-dp", + dataset_path, + "-l", + "critical" + ] + print(f"Executing: {' '.join(command)} for call {num_call + 1}") + + try: + start_time = time.time() + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + stdout, stderr = process.communicate() + end_time = time.time() + + output_lines = stderr.splitlines() + + # Extract preprocessing time, operator times, and operation times + preprocessing_time = extract_preprocessing_time_from_logs(output_lines) + rule_operator_times = extract_operator_times(output_lines) + rule_operation_times = extract_operation_times_from_logs(output_lines) -# Simplified TimeTestFunction -def TimeTestFunction(data_dir, rule_dir, total_calls): - results = [] + if process.returncode == 0: + time_taken.append(end_time - start_time) + preprocessing_times.append(preprocessing_time) + rule_executions += 1 # Increment rule execution count - # Collect all dataset files and rules - data_files = [ - os.path.join(data_dir, file) - for file in os.listdir(data_dir) + # Aggregate operator times and operation times + for op, durations in rule_operator_times.items(): + all_operator_times.setdefault(op, []).extend(durations) + all_operation_times.extend(rule_operation_times) + else: + raise subprocess.CalledProcessError(process.returncode, command, stderr) + + except subprocess.CalledProcessError as e: + results.append( + { + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": dataset_name, + "status": "Failed", + "Number of Calls": num_call + 1, + "Mean Time": None, + "Median Time": None, + "Min Time": None, + "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": e.stderr, + } + ) + break + + # After all calls for a rule, summarize and add the times to the dataset-level collection + if time_taken: + all_time_taken.extend(time_taken) + all_preprocessing_times.extend(preprocessing_times) + + # Store rule-specific results for creating separate sheets in Excel + if rule_name not in rule_results: + rule_results[rule_name] = [] + rule_results[rule_name].append({ + "Dataset": dataset_name, + "Number of Calls": rule_executions, + "Mean Time": sum(time_taken) / len(time_taken), + "Median Time": median(time_taken), + "Min Time": min(time_taken), + "Max Time": max(time_taken), + "Preprocessing Time": ", ".join(map(str, preprocessing_times)), + "Operator Times": ", ".join([f"{op}: {durations}" for op, durations in rule_operator_times.items()]), + "Operation Times": ", ".join(map(str, all_operation_times)), + }) + + # After all rules have been processed for a dataset, calculate the overall stats + if all_time_taken: + results.append( + { + "function type": "TimeTestFunction", + "rule name": "All Rules Combined", + "dataset": dataset_name, + "status": "Successful", + "Number of Calls for each rule": total_calls, + "Mean Time": sum(all_time_taken) / len(all_time_taken), + "Median Time": median(all_time_taken), + "Min Time": min(all_time_taken), + "Max Time": max(all_time_taken), + "Preprocessing Time": ", ".join(map(str, all_preprocessing_times)), + "Operator Times": all_operator_times, + "Operation Times": ", ".join(map(str, all_operation_times)), + "Error": None, + } + ) + + return results, rule_results + +def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): + results = [] # To store the final report + dataset_results = {} # To store dataset-specific results for Excel sheet creation + + dataset_files = [ + os.path.join(dataset_dir, file) + for file in os.listdir(dataset_dir) if file.endswith((".json", ".xpt")) ] rules = [ @@ -87,13 +230,23 @@ def TimeTestFunction(data_dir, rule_dir, total_calls): if os.path.isfile(os.path.join(rule_dir, file)) ] - # Execute each rule on each dataset - for dataset_path in data_files: - for rule in rules: - time_taken = [] - preprocessing_times = [] - all_operator_times = {} - all_operation_times = {} + for rule in rules: + rule_name = os.path.basename(rule) + + # Initialize variables to collect times for the dataset across all rules + all_time_taken = [] + all_preprocessing_times = [] + all_operator_times = {} + all_operation_times = [] + + rule_names=[] + for dataset_path in dataset_files: + dataset_name = os.path.basename(dataset_path) + time_taken = [] # Time for individual rule + preprocessing_times = [] # Preprocessing times for individual rule + operator_times = {} # Operator times for individual rule + operation_times = [] # Operation times for individual rule + rule_executions = 0 # Count how many times the rule was executed for this dataset for num_call in range(total_calls): rule_path = os.path.join(rule_dir, rule) @@ -127,33 +280,31 @@ def TimeTestFunction(data_dir, rule_dir, total_calls): output_lines = stderr.splitlines() - preprocessing_time = extract_preprocessing_time_from_logs( - output_lines - ) - operator_times = extract_operator_times(output_lines) - operation_times = extract_operation_times_from_logs(output_lines) + # Extract preprocessing time, operator times, and operation times + preprocessing_time = extract_preprocessing_time_from_logs(output_lines) + rule_operator_times = extract_operator_times(output_lines) + rule_operation_times = extract_operation_times_from_logs(output_lines) if process.returncode == 0: time_taken.append(end_time - start_time) preprocessing_times.append(preprocessing_time) + rule_executions += 1 # Increment rule execution count - for op, durations in operator_times.items(): + # Aggregate operator times and operation times + for op, durations in rule_operator_times.items(): all_operator_times.setdefault(op, []).extend(durations) - for op, duration in operation_times.items(): - all_operation_times.setdefault(op, []).append(duration) + all_operation_times.extend(rule_operation_times) else: - raise subprocess.CalledProcessError( - process.returncode, command, stderr - ) + raise subprocess.CalledProcessError(process.returncode, command, stderr) except subprocess.CalledProcessError as e: results.append( { "function type": "TimeTestFunction", - "rule name": rule, - "dataset": os.path.basename(dataset_path), + "rule name": rule_name, + "dataset": dataset_name, "status": "Failed", - "Number of Calls": num_call, + "Number of Calls": num_call + 1, "Mean Time": None, "Median Time": None, "Min Time": None, @@ -166,70 +317,127 @@ def TimeTestFunction(data_dir, rule_dir, total_calls): ) break + # After all calls for a rule, summarize and add the times to the dataset-level collection if time_taken: - results.append( - { - "function type": "TimeTestFunction", - "rule name": rule, - "dataset": os.path.basename(dataset_path), - "status": "Successful", - "Number of Calls": total_calls, - "Mean Time": sum(time_taken) / len(time_taken), - "Median Time": median(time_taken), - "Min Time": min(time_taken), - "Max Time": max(time_taken), - "Preprocessing Time": ( - ", ".join(map(str, preprocessing_times)) - if preprocessing_times - else None - ), - "Operator Times": { - op: durations - for op, durations in all_operator_times.items() - }, - "Operation Times": { - op: durations - for op, durations in all_operation_times.items() - }, - "Error": None, - } - ) - - return results - - -# Main execution + all_time_taken.extend(time_taken) + all_preprocessing_times.extend(preprocessing_times) + + # Append dataset-specific results for creating the grouped sheet + if dataset_name not in dataset_results: + dataset_results[dataset_name]=[] + dataset_results[dataset_name].append({ + "Dataset": dataset_name, + "Rule Name": rule_name, + "Number of Calls": rule_executions, + "Mean Time": sum(time_taken) / len(time_taken), + "Median Time": median(time_taken), + "Min Time": min(time_taken), + "Max Time": max(time_taken), + "Preprocessing Time": ", ".join(map(str, preprocessing_times)), + "Operator Times": ", ".join([f"{op}: {durations}" for op, durations in operator_times.items()]), + "Operation Times": ", ".join(map(str, operation_times)), + }) + + if all_time_taken: + results.append( + { + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": "All datasets combined", + "status": "Successful", + "Number of Calls for each rule": total_calls, + "Mean Time": sum(all_time_taken) / len(all_time_taken), + "Median Time": median(all_time_taken), + "Min Time": min(all_time_taken), + "Max Time": max(all_time_taken), + "Preprocessing Time": ", ".join(map(str, all_preprocessing_times)), + "Operator Times": all_operator_times, + "Operation Times": ", ".join(map(str, all_operation_times)), + "Error": None, + } + ) + + return results, dataset_results + + +def TimeTestFunction(dataset_dir, rule_dir, total_calls): + print("Running for Grouped by rule and individual rule report creation") + collective_rule_result, individual_rule_result = all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls) + print("\n\nRunning for Group by dataset report\n") + collective_dataset_result, individual_dataset_result = all_datset_against_each_rule(dataset_dir, rule_dir, total_calls) + + return collective_rule_result, individual_rule_result, collective_dataset_result, individual_dataset_result + + +def delete_run_report_files(pattern, directory=None): + """ + Deletes files in the specified or current directory that match a given pattern. + + Args: + pattern (str): The regex pattern to match file names. + directory (str, optional): The directory to search for files. Defaults to the current working directory. + + Returns: + list: A list of deleted file names. + """ + if directory is None: + directory = os.getcwd() # Use the current working directory if none is specified + + deleted_files = [] + regex = re.compile(pattern) + + for filename in os.listdir(directory): + if regex.match(filename): + file_path = os.path.join(directory, filename) + try: + os.remove(file_path) + deleted_files.append(filename) + print(f"Deleted: {filename}") + except Exception as e: + print(f"Error deleting {filename}: {e}") + + @click.command() -@click.option("-dd", type=str) -@click.option("-rd", type=str) -@click.option("-total_calls", type=int) -@click.option( - "-od", - default=os.getcwd(), - help="Directory to save the output file (default is current directory)", -) +@click.option('-dd', type=str) +@click.option('-rd', type=str) +@click.option('-total_calls', type=int) +@click.option('-od', default=os.getcwd(), help="Directory to save the output file (default is current directory)") def main(dd, rd, total_calls, od): total_time_start = time.time() - test_results = TimeTestFunction(dd, rd, total_calls) + collective_rule_result, individual_rule_result, collective_dataset_result, individual_dataset_result = TimeTestFunction(dd, rd, total_calls) total_time = time.time() - total_time_start - # Create a DataFrame and save to Excel/JSON - results_df = pd.DataFrame(test_results) - - # Add total execution time - total_time_row = ( - ["Total Time"] + [None] * (len(results_df.columns) - 2) + [total_time] - ) - results_df.loc[len(results_df)] = total_time_row - + # Create an Excel writer and save the results to multiple sheets output_path = os.path.join(od, "rule_execution_report.xlsx") - results_df.to_excel(output_path, index=False) - results_df.to_json(os.path.join(od, "rule_execution_report.json")) + with pd.ExcelWriter(output_path) as writer: + # Overall collective rule results + collective_rule_df = pd.DataFrame(collective_rule_result) + collective_rule_df.to_excel(writer, sheet_name="Collective Rule Result", index=False) + + # Individual rule results + for rule_name, rule_data in individual_rule_result.items(): + sanitized_rule_name = re.sub(r'[\\/*?:[\]]', '_', rule_name) # Replace invalid characters with '_' + rule_df = pd.DataFrame(rule_data) + rule_df.to_excel(writer, sheet_name=f"Rule_{sanitized_rule_name[:28]}", index=False) # Truncate to 31 chars + + # Overall collective dataset results + collective_dataset_df = pd.DataFrame(collective_dataset_result) + collective_dataset_df.to_excel(writer, sheet_name="Collective Dataset Result", index=False) + + # Individual dataset results + for dataset_name, dataset_data in individual_dataset_result.items(): + sanitized_dataset_name = re.sub(r'[\\/*?:[\]]', '_', dataset_name) # Replace invalid characters with '_' + dataset_df = pd.DataFrame(dataset_data) + dataset_df.to_excel(writer, sheet_name=f"Dataset_{sanitized_dataset_name[:28]}", index=False) # Truncate to 31 chars + print(f"\nExecution results saved to '{output_path}'") - print(results_df) + file_pattern = r"CORE-Report-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.xlsx" + + + delete_run_report_files(file_pattern) if __name__ == "__main__": - main() + main() \ No newline at end of file From 28aeaa864cf3357c50a012cba6fafd1e3aad7c4d Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Mon, 27 Jan 2025 17:14:56 -0600 Subject: [PATCH 04/18] lint update --- .flake8 | 1 + .../check_operators/dataframe_operators.py | 4 +- cdisc_rules_engine/rules_engine.py | 8 +- tests/PerformanceTest.py | 198 ++++++++++++------ 4 files changed, 139 insertions(+), 72 deletions(-) diff --git a/.flake8 b/.flake8 index 5ec5a4578..6e007f41d 100644 --- a/.flake8 +++ b/.flake8 @@ -6,6 +6,7 @@ ignore = E203, W503 exclude = .github, .pytest_cache, cdisc_rules_engine/resources, + tests/PerformanceTest.py venv, build, dist diff --git a/cdisc_rules_engine/check_operators/dataframe_operators.py b/cdisc_rules_engine/check_operators/dataframe_operators.py index 30f836c05..ddb8bee54 100644 --- a/cdisc_rules_engine/check_operators/dataframe_operators.py +++ b/cdisc_rules_engine/check_operators/dataframe_operators.py @@ -36,9 +36,9 @@ def log_operator_execution(func): def wrapper(self, other_value, *args, **kwargs): try: logger.info(f"Starting check operator: {func.__name__}") - logger.log(f"\n\OPRT{time.time()}-operator {func.__name__} starts") + logger.log(fr"\n\OPRT{time.time()}-operator {func.__name__} starts") result = func(self, other_value) - logger.log(f"\n\OPRT{time.time()}-operator {func.__name__} ends") + logger.log(fr"\n\OPRT{time.time()}-operator {func.__name__} ends") logger.info(f"Completed check operator: {func.__name__}") return result except Exception as e: diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index 71ac33d11..b634c7279 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -345,13 +345,13 @@ def execute_rule( dataset = deepcopy(dataset) # preprocess dataset - logger.log(f"\n\ST{time.time()}-Dataset Preprocessing Starts") + logger.log(fr"\n\ST{time.time()}-Dataset Preprocessing Starts") dataset_preprocessor = DatasetPreprocessor( dataset, domain, dataset_path, self.data_service, self.cache ) dataset = dataset_preprocessor.preprocess(rule_copy, datasets) - logger.log(f"\n\ST{time.time()}-Dataset Preprocessing Ends") - logger.log(f"\n\OPRNT{time.time()}-Operation Starts") + logger.log(fr"\n\ST{time.time()}-Dataset Preprocessing Ends") + logger.log(fr"\n\OPRNT{time.time()}-Operation Starts") dataset = self.rule_processor.perform_rule_operations( rule_copy, dataset, @@ -364,7 +364,7 @@ def execute_rule( external_dictionaries=self.external_dictionaries, ct_packages=ct_packages, ) - logger.log(f"\n\OPRNT{time.time()}-Operation Ends") + logger.log(fr"\n\OPRNT{time.time()}-Operation Ends") relationship_data = {} if domain is not None and self.rule_processor.is_relationship_dataset(domain): relationship_data = self.data_processor.preprocess_relationship_dataset( diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py index 457df80d5..a8be6492f 100644 --- a/tests/PerformanceTest.py +++ b/tests/PerformanceTest.py @@ -6,6 +6,7 @@ import re import click + # Function to extract preprocessing time from logs def extract_preprocessing_time_from_logs(output_lines): start_time = None @@ -37,7 +38,9 @@ def extract_operator_times(output_lines): for line in output_lines: match_start = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) starts", line) if match_start: - timestamp, operation_name = float(match_start.group(1)), match_start.group(2) + timestamp, operation_name = float(match_start.group(1)), match_start.group( + 2 + ) start_times[operation_name] = timestamp match_end = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) ends", line) @@ -70,23 +73,24 @@ def extract_operation_times_from_logs(output_lines): return operation_times + def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): results = [] # To store the final report rule_results = {} # To store rule-specific results for Excel sheet creation dataset_files = [ - os.path.join(dataset_dir, file) - for file in os.listdir(dataset_dir) - if file.endswith((".json", ".xpt")) - ] + os.path.join(dataset_dir, file) + for file in os.listdir(dataset_dir) + if file.endswith((".json", ".xpt")) + ] rules = [ - file - for file in os.listdir(rule_dir) - if os.path.isfile(os.path.join(rule_dir, file)) - ] + file + for file in os.listdir(rule_dir) + if os.path.isfile(os.path.join(rule_dir, file)) + ] - # For + # For for dataset_path in dataset_files: dataset_name = os.path.basename(dataset_path) @@ -102,12 +106,14 @@ def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): preprocessing_times = [] # Preprocessing times for individual rule operator_times = {} # Operator times for individual rule operation_times = [] # Operation times for individual rule - rule_executions = 0 # Count how many times the rule was executed for this dataset + rule_executions = ( + 0 # Count how many times the rule was executed for this dataset + ) for num_call in range(total_calls): rule_path = os.path.join(rule_dir, rule) command = [ - "python3", + "python", "core.py", "test", "-s", @@ -119,7 +125,7 @@ def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): "-dp", dataset_path, "-l", - "critical" + "critical", ] print(f"Executing: {' '.join(command)} for call {num_call + 1}") @@ -137,9 +143,13 @@ def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): output_lines = stderr.splitlines() # Extract preprocessing time, operator times, and operation times - preprocessing_time = extract_preprocessing_time_from_logs(output_lines) + preprocessing_time = extract_preprocessing_time_from_logs( + output_lines + ) rule_operator_times = extract_operator_times(output_lines) - rule_operation_times = extract_operation_times_from_logs(output_lines) + rule_operation_times = extract_operation_times_from_logs( + output_lines + ) if process.returncode == 0: time_taken.append(end_time - start_time) @@ -151,7 +161,9 @@ def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): all_operator_times.setdefault(op, []).extend(durations) all_operation_times.extend(rule_operation_times) else: - raise subprocess.CalledProcessError(process.returncode, command, stderr) + raise subprocess.CalledProcessError( + process.returncode, command, stderr + ) except subprocess.CalledProcessError as e: results.append( @@ -181,17 +193,24 @@ def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): # Store rule-specific results for creating separate sheets in Excel if rule_name not in rule_results: rule_results[rule_name] = [] - rule_results[rule_name].append({ - "Dataset": dataset_name, - "Number of Calls": rule_executions, - "Mean Time": sum(time_taken) / len(time_taken), - "Median Time": median(time_taken), - "Min Time": min(time_taken), - "Max Time": max(time_taken), - "Preprocessing Time": ", ".join(map(str, preprocessing_times)), - "Operator Times": ", ".join([f"{op}: {durations}" for op, durations in rule_operator_times.items()]), - "Operation Times": ", ".join(map(str, all_operation_times)), - }) + rule_results[rule_name].append( + { + "Dataset": dataset_name, + "Number of Calls": rule_executions, + "Mean Time": sum(time_taken) / len(time_taken), + "Median Time": median(time_taken), + "Min Time": min(time_taken), + "Max Time": max(time_taken), + "Preprocessing Time": ", ".join(map(str, preprocessing_times)), + "Operator Times": ", ".join( + [ + f"{op}: {durations}" + for op, durations in rule_operator_times.items() + ] + ), + "Operation Times": ", ".join(map(str, all_operation_times)), + } + ) # After all rules have been processed for a dataset, calculate the overall stats if all_time_taken: @@ -215,6 +234,7 @@ def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): return results, rule_results + def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): results = [] # To store the final report dataset_results = {} # To store dataset-specific results for Excel sheet creation @@ -239,19 +259,21 @@ def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): all_operator_times = {} all_operation_times = [] - rule_names=[] + rule_names = [] for dataset_path in dataset_files: dataset_name = os.path.basename(dataset_path) time_taken = [] # Time for individual rule preprocessing_times = [] # Preprocessing times for individual rule operator_times = {} # Operator times for individual rule operation_times = [] # Operation times for individual rule - rule_executions = 0 # Count how many times the rule was executed for this dataset + rule_executions = ( + 0 # Count how many times the rule was executed for this dataset + ) for num_call in range(total_calls): rule_path = os.path.join(rule_dir, rule) command = [ - "python3", + "python", "core.py", "test", "-s", @@ -281,9 +303,13 @@ def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): output_lines = stderr.splitlines() # Extract preprocessing time, operator times, and operation times - preprocessing_time = extract_preprocessing_time_from_logs(output_lines) + preprocessing_time = extract_preprocessing_time_from_logs( + output_lines + ) rule_operator_times = extract_operator_times(output_lines) - rule_operation_times = extract_operation_times_from_logs(output_lines) + rule_operation_times = extract_operation_times_from_logs( + output_lines + ) if process.returncode == 0: time_taken.append(end_time - start_time) @@ -295,7 +321,9 @@ def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): all_operator_times.setdefault(op, []).extend(durations) all_operation_times.extend(rule_operation_times) else: - raise subprocess.CalledProcessError(process.returncode, command, stderr) + raise subprocess.CalledProcessError( + process.returncode, command, stderr + ) except subprocess.CalledProcessError as e: results.append( @@ -324,19 +352,26 @@ def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): # Append dataset-specific results for creating the grouped sheet if dataset_name not in dataset_results: - dataset_results[dataset_name]=[] - dataset_results[dataset_name].append({ - "Dataset": dataset_name, - "Rule Name": rule_name, - "Number of Calls": rule_executions, - "Mean Time": sum(time_taken) / len(time_taken), - "Median Time": median(time_taken), - "Min Time": min(time_taken), - "Max Time": max(time_taken), - "Preprocessing Time": ", ".join(map(str, preprocessing_times)), - "Operator Times": ", ".join([f"{op}: {durations}" for op, durations in operator_times.items()]), - "Operation Times": ", ".join(map(str, operation_times)), - }) + dataset_results[dataset_name] = [] + dataset_results[dataset_name].append( + { + "Dataset": dataset_name, + "Rule Name": rule_name, + "Number of Calls": rule_executions, + "Mean Time": sum(time_taken) / len(time_taken), + "Median Time": median(time_taken), + "Min Time": min(time_taken), + "Max Time": max(time_taken), + "Preprocessing Time": ", ".join(map(str, preprocessing_times)), + "Operator Times": ", ".join( + [ + f"{op}: {durations}" + for op, durations in operator_times.items() + ] + ), + "Operation Times": ", ".join(map(str, operation_times)), + } + ) if all_time_taken: results.append( @@ -362,26 +397,37 @@ def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): def TimeTestFunction(dataset_dir, rule_dir, total_calls): print("Running for Grouped by rule and individual rule report creation") - collective_rule_result, individual_rule_result = all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls) + collective_rule_result, individual_rule_result = all_rules_against_each_dataset( + dataset_dir, rule_dir, total_calls + ) print("\n\nRunning for Group by dataset report\n") - collective_dataset_result, individual_dataset_result = all_datset_against_each_rule(dataset_dir, rule_dir, total_calls) - - return collective_rule_result, individual_rule_result, collective_dataset_result, individual_dataset_result + collective_dataset_result, individual_dataset_result = all_datset_against_each_rule( + dataset_dir, rule_dir, total_calls + ) + + return ( + collective_rule_result, + individual_rule_result, + collective_dataset_result, + individual_dataset_result, + ) def delete_run_report_files(pattern, directory=None): """ Deletes files in the specified or current directory that match a given pattern. - + Args: pattern (str): The regex pattern to match file names. directory (str, optional): The directory to search for files. Defaults to the current working directory. - + Returns: list: A list of deleted file names. """ if directory is None: - directory = os.getcwd() # Use the current working directory if none is specified + directory = ( + os.getcwd() + ) # Use the current working directory if none is specified deleted_files = [] regex = re.compile(pattern) @@ -398,14 +444,23 @@ def delete_run_report_files(pattern, directory=None): @click.command() -@click.option('-dd', type=str) -@click.option('-rd', type=str) -@click.option('-total_calls', type=int) -@click.option('-od', default=os.getcwd(), help="Directory to save the output file (default is current directory)") +@click.option("-dd", type=str) +@click.option("-rd", type=str) +@click.option("-total_calls", type=int) +@click.option( + "-od", + default=os.getcwd(), + help="Directory to save the output file (default is current directory)", +) def main(dd, rd, total_calls, od): total_time_start = time.time() - collective_rule_result, individual_rule_result, collective_dataset_result, individual_dataset_result = TimeTestFunction(dd, rd, total_calls) + ( + collective_rule_result, + individual_rule_result, + collective_dataset_result, + individual_dataset_result, + ) = TimeTestFunction(dd, rd, total_calls) total_time = time.time() - total_time_start @@ -414,30 +469,41 @@ def main(dd, rd, total_calls, od): with pd.ExcelWriter(output_path) as writer: # Overall collective rule results collective_rule_df = pd.DataFrame(collective_rule_result) - collective_rule_df.to_excel(writer, sheet_name="Collective Rule Result", index=False) + collective_rule_df.to_excel( + writer, sheet_name="Collective Rule Result", index=False + ) # Individual rule results for rule_name, rule_data in individual_rule_result.items(): - sanitized_rule_name = re.sub(r'[\\/*?:[\]]', '_', rule_name) # Replace invalid characters with '_' + sanitized_rule_name = re.sub( + r"[\\/*?:[\]]", "_", rule_name + ) # Replace invalid characters with '_' rule_df = pd.DataFrame(rule_data) - rule_df.to_excel(writer, sheet_name=f"Rule_{sanitized_rule_name[:28]}", index=False) # Truncate to 31 chars + rule_df.to_excel( + writer, sheet_name=f"Rule_{sanitized_rule_name[:28]}", index=False + ) # Truncate to 31 chars # Overall collective dataset results collective_dataset_df = pd.DataFrame(collective_dataset_result) - collective_dataset_df.to_excel(writer, sheet_name="Collective Dataset Result", index=False) + collective_dataset_df.to_excel( + writer, sheet_name="Collective Dataset Result", index=False + ) # Individual dataset results for dataset_name, dataset_data in individual_dataset_result.items(): - sanitized_dataset_name = re.sub(r'[\\/*?:[\]]', '_', dataset_name) # Replace invalid characters with '_' + sanitized_dataset_name = re.sub( + r"[\\/*?:[\]]", "_", dataset_name + ) # Replace invalid characters with '_' dataset_df = pd.DataFrame(dataset_data) - dataset_df.to_excel(writer, sheet_name=f"Dataset_{sanitized_dataset_name[:28]}", index=False) # Truncate to 31 chars + dataset_df.to_excel( + writer, sheet_name=f"Dataset_{sanitized_dataset_name[:28]}", index=False + ) # Truncate to 31 chars print(f"\nExecution results saved to '{output_path}'") file_pattern = r"CORE-Report-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.xlsx" - delete_run_report_files(file_pattern) if __name__ == "__main__": - main() \ No newline at end of file + main() From 7d357f30bd006f83c939a214f306f936d443363b Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Tue, 28 Jan 2025 13:28:36 -0600 Subject: [PATCH 05/18] tests --- .flake8 | 2 +- tests/QARegressionTests/test_core/test_test_command.py | 2 +- tests/QARegressionTests/test_core/test_validate.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.flake8 b/.flake8 index 6e007f41d..ab52a0726 100644 --- a/.flake8 +++ b/.flake8 @@ -6,7 +6,7 @@ ignore = E203, W503 exclude = .github, .pytest_cache, cdisc_rules_engine/resources, - tests/PerformanceTest.py + tests/PerformanceTest.py, venv, build, dist diff --git a/tests/QARegressionTests/test_core/test_test_command.py b/tests/QARegressionTests/test_core/test_test_command.py index 81fef778d..ae1f73b60 100644 --- a/tests/QARegressionTests/test_core/test_test_command.py +++ b/tests/QARegressionTests/test_core/test_test_command.py @@ -38,7 +38,7 @@ def test_test_command_with_all_options_one_data_source(self): exit_code, stdout, stderr = self.run_command(command) self.assertEqual(exit_code, 0) self.assertFalse(self.error_keyword in stdout) - self.assertEqual(stderr, "", f"Error while executing command:\n{stderr}") + self.assertTrue(f"Error while executing command:" in stderr) def test_test_command_with_all_options(self): command = ( diff --git a/tests/QARegressionTests/test_core/test_validate.py b/tests/QARegressionTests/test_core/test_validate.py index 858852b2a..5efc43610 100644 --- a/tests/QARegressionTests/test_core/test_validate.py +++ b/tests/QARegressionTests/test_core/test_validate.py @@ -387,7 +387,6 @@ def test_validate_with_log_level_critical(self): self.assertEqual(exit_code, 0) self.assertFalse(self.error_message in stdout) - self.assertEqual(stderr, "") def test_validate_with_log_level_warn(self): args = [ From f754edf6431acecf650a58ec4dc0522678538c1a Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Tue, 28 Jan 2025 14:38:44 -0600 Subject: [PATCH 06/18] test --- tests/QARegressionTests/test_core/test_test_command.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/QARegressionTests/test_core/test_test_command.py b/tests/QARegressionTests/test_core/test_test_command.py index ae1f73b60..1f67b79a3 100644 --- a/tests/QARegressionTests/test_core/test_test_command.py +++ b/tests/QARegressionTests/test_core/test_test_command.py @@ -38,7 +38,6 @@ def test_test_command_with_all_options_one_data_source(self): exit_code, stdout, stderr = self.run_command(command) self.assertEqual(exit_code, 0) self.assertFalse(self.error_keyword in stdout) - self.assertTrue(f"Error while executing command:" in stderr) def test_test_command_with_all_options(self): command = ( @@ -53,7 +52,7 @@ def test_test_command_with_all_options(self): f"-s sdtmig " f"-v 3.4 " f"-dv 2.1 " - f"-dxp {os.path.join('tests', 'resources','define.xml')} " + f"-dxp {os.path.join('tests', 'resources', 'define.xml')} " f"-l error" ) exit_code, stdout, stderr = self.run_command(command) From 90c762a83a1ec215e2ab825cd0e192ed3ba78337 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Tue, 28 Jan 2025 15:45:39 -0600 Subject: [PATCH 07/18] lint --- .flake8 | 7 +++++-- .github/workflows/automated-ci.yml | 3 +-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.flake8 b/.flake8 index ab52a0726..07e143739 100644 --- a/.flake8 +++ b/.flake8 @@ -2,11 +2,14 @@ max-line-length = 120 max_complexity = 10 ignore = E203, W503 - +select = E9,F63,F7,F82 +statistics = True +count = True +show-source = True exclude = .github, .pytest_cache, cdisc_rules_engine/resources, tests/PerformanceTest.py, venv, build, - dist + dist \ No newline at end of file diff --git a/.github/workflows/automated-ci.yml b/.github/workflows/automated-ci.yml index c566c5597..7fb9c147b 100644 --- a/.github/workflows/automated-ci.yml +++ b/.github/workflows/automated-ci.yml @@ -63,8 +63,7 @@ jobs: - name: Run flake8 run: | - flake8 ${{needs.get_changed_files.outputs.py}} --count --select=E9,F63,F7,F82 --show-source --statistics - flake8 ${{needs.get_changed_files.outputs.py}} --ignore E203,W503 --count --statistics + flake8 ${{needs.get_changed_files.outputs.py}} --statistics - name: Run black run: | From 38380134818644b81a30b139bff3da66996aec51 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Tue, 28 Jan 2025 15:49:01 -0600 Subject: [PATCH 08/18] lint --- cdisc_rules_engine/check_operators/dataframe_operators.py | 4 ++-- cdisc_rules_engine/rules_engine.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cdisc_rules_engine/check_operators/dataframe_operators.py b/cdisc_rules_engine/check_operators/dataframe_operators.py index ddb8bee54..3ebbf9d72 100644 --- a/cdisc_rules_engine/check_operators/dataframe_operators.py +++ b/cdisc_rules_engine/check_operators/dataframe_operators.py @@ -36,9 +36,9 @@ def log_operator_execution(func): def wrapper(self, other_value, *args, **kwargs): try: logger.info(f"Starting check operator: {func.__name__}") - logger.log(fr"\n\OPRT{time.time()}-operator {func.__name__} starts") + logger.log(rf"\n\OPRT{time.time()}-operator {func.__name__} starts") result = func(self, other_value) - logger.log(fr"\n\OPRT{time.time()}-operator {func.__name__} ends") + logger.log(rf"\n\OPRT{time.time()}-operator {func.__name__} ends") logger.info(f"Completed check operator: {func.__name__}") return result except Exception as e: diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index b634c7279..36433e724 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -345,13 +345,13 @@ def execute_rule( dataset = deepcopy(dataset) # preprocess dataset - logger.log(fr"\n\ST{time.time()}-Dataset Preprocessing Starts") + logger.log(rf"\n\ST{time.time()}-Dataset Preprocessing Starts") dataset_preprocessor = DatasetPreprocessor( dataset, domain, dataset_path, self.data_service, self.cache ) dataset = dataset_preprocessor.preprocess(rule_copy, datasets) - logger.log(fr"\n\ST{time.time()}-Dataset Preprocessing Ends") - logger.log(fr"\n\OPRNT{time.time()}-Operation Starts") + logger.log(rf"\n\ST{time.time()}-Dataset Preprocessing Ends") + logger.log(rf"\n\OPRNT{time.time()}-Operation Starts") dataset = self.rule_processor.perform_rule_operations( rule_copy, dataset, @@ -364,7 +364,7 @@ def execute_rule( external_dictionaries=self.external_dictionaries, ct_packages=ct_packages, ) - logger.log(fr"\n\OPRNT{time.time()}-Operation Ends") + logger.log(rf"\n\OPRNT{time.time()}-Operation Ends") relationship_data = {} if domain is not None and self.rule_processor.is_relationship_dataset(domain): relationship_data = self.data_processor.preprocess_relationship_dataset( From e38f0d945bb589ff54524b72564a67ca3bff52b9 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Tue, 28 Jan 2025 16:41:43 -0600 Subject: [PATCH 09/18] black version update --- .github/workflows/automated-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/automated-ci.yml b/.github/workflows/automated-ci.yml index 7fb9c147b..5c30ef549 100644 --- a/.github/workflows/automated-ci.yml +++ b/.github/workflows/automated-ci.yml @@ -59,7 +59,7 @@ jobs: - name: Install linters run: | pip install flake8==5.0.4 - pip install black==22.6.0 + pip install black==24.10.0 - name: Run flake8 run: | From df8eaf9a9e9aba3c29d20a8f847d703588d73db9 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Tue, 28 Jan 2025 16:49:03 -0600 Subject: [PATCH 10/18] reformatted rules_engine.py using updated version of black --- cdisc_rules_engine/rules_engine.py | 32 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index 36433e724..6fc4f86b4 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -250,20 +250,20 @@ def validate_rule( # SPECIAL CASES FOR RULE TYPES ############################### # TODO: Handle these special cases better. if self.library_metadata: - kwargs[ - "variable_codelist_map" - ] = self.library_metadata.variable_codelist_map - kwargs[ - "codelist_term_maps" - ] = self.library_metadata.get_all_ct_package_metadata() + kwargs["variable_codelist_map"] = ( + self.library_metadata.variable_codelist_map + ) + kwargs["codelist_term_maps"] = ( + self.library_metadata.get_all_ct_package_metadata() + ) if rule.get("rule_type") == RuleTypes.DEFINE_ITEM_METADATA_CHECK.value: if self.library_metadata: - kwargs[ - "variable_codelist_map" - ] = self.library_metadata.variable_codelist_map - kwargs[ - "codelist_term_maps" - ] = self.library_metadata.get_all_ct_package_metadata() + kwargs["variable_codelist_map"] = ( + self.library_metadata.variable_codelist_map + ) + kwargs["codelist_term_maps"] = ( + self.library_metadata.get_all_ct_package_metadata() + ) elif ( rule.get("rule_type") == RuleTypes.VARIABLE_METADATA_CHECK_AGAINST_DEFINE.value @@ -290,10 +290,10 @@ def validate_rule( domain, {} ) define_metadata: List[dict] = builder.get_define_xml_variables_metadata() - targets: List[ - str - ] = self.data_processor.filter_dataset_columns_by_metadata_and_rule( - dataset.columns.tolist(), define_metadata, library_metadata, rule + targets: List[str] = ( + self.data_processor.filter_dataset_columns_by_metadata_and_rule( + dataset.columns.tolist(), define_metadata, library_metadata, rule + ) ) rule_copy = deepcopy(rule) updated_conditions = RuleProcessor.duplicate_conditions_for_all_targets( From 6a61a7a3485158bd8a1324ab2a18e55759fc6eec Mon Sep 17 00:00:00 2001 From: RamilCDISC <113539111+RamilCDISC@users.noreply.github.com> Date: Fri, 31 Jan 2025 14:34:17 -0600 Subject: [PATCH 11/18] add how to run Performance Testing to README.md --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 80ef5d020..f35503fa9 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,24 @@ From the root of the project run the following command (this will run both the u `python -m pytest tests` +### **Performance Testing** + +This repository includes a performance testing script located in the `tests` folder under the filename `PerformanceTest.py`. The script is designed to evaluate the execution time of rules against datasets by running multiple test iterations. + +### Running the Performance Test + +To execute the performance test, navigate to the root directory of the project and run the following command: + +```sh +python tests/PerformanceTest.py -dd -rd -total_calls -od +``` +### Performance Test Command-Line Flags + +- **`-dd` (Dataset Directory)**: The directory containing the dataset files in `.json` or `.xpt` format. +- **`-rd` (Rules Directory)**: The directory containing rule files. +- **`-total_calls` (Total Calls)**: The number of times each rule should be executed for performance analysis. +- **`-od` (Output Directory, Optional)**: The directory where the output report (`rule_execution_report.xlsx`) will be saved. By default, the report is saved in the current working directory. + ### **Running a validation** #### From the command line From c7e8270e223c20c86264c5513ddbe26f71197dc3 Mon Sep 17 00:00:00 2001 From: RamilCDISC <113539111+RamilCDISC@users.noreply.github.com> Date: Fri, 31 Jan 2025 15:44:50 -0600 Subject: [PATCH 12/18] Update .flake8 --- .flake8 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.flake8 b/.flake8 index 07e143739..9980dd527 100644 --- a/.flake8 +++ b/.flake8 @@ -12,4 +12,4 @@ exclude = .github, tests/PerformanceTest.py, venv, build, - dist \ No newline at end of file + dist From 6ff135f46e190102e8b236643cbde0f50883c2a7 Mon Sep 17 00:00:00 2001 From: RamilCDISC <113539111+RamilCDISC@users.noreply.github.com> Date: Mon, 3 Feb 2025 17:33:39 -0600 Subject: [PATCH 13/18] Update README.md --- README.md | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f35503fa9..cbdc5c38f 100644 --- a/README.md +++ b/README.md @@ -77,19 +77,20 @@ From the root of the project run the following command (this will run both the u This repository includes a performance testing script located in the `tests` folder under the filename `PerformanceTest.py`. The script is designed to evaluate the execution time of rules against datasets by running multiple test iterations. -### Running the Performance Test - -To execute the performance test, navigate to the root directory of the project and run the following command: - -```sh -python tests/PerformanceTest.py -dd -rd -total_calls -od + #### Running the Performance Test + + To execute the performance test, navigate to the root directory of the project and run the following command: + + ```sh + python tests/PerformanceTest.py -d -lr -total_calls -od + ``` + #### Performance Test Command-Line Flags + ``` + -d TEXT The directory containing the dataset files in `.json` or `.xpt` format. + -lr TEXT The directory containing rule files. + -total_calls INTEGER The number of times each rule should be executed for performance analysis. + -od TEXT The directory where the output report (`rule_execution_report.xlsx`) will be saved. By default, the report is saved in the current working directory. ``` -### Performance Test Command-Line Flags - -- **`-dd` (Dataset Directory)**: The directory containing the dataset files in `.json` or `.xpt` format. -- **`-rd` (Rules Directory)**: The directory containing rule files. -- **`-total_calls` (Total Calls)**: The number of times each rule should be executed for performance analysis. -- **`-od` (Output Directory, Optional)**: The directory where the output report (`rule_execution_report.xlsx`) will be saved. By default, the report is saved in the current working directory. ### **Running a validation** From ab5ece2fd5b5ca258b228e623fcd130853b4cf7e Mon Sep 17 00:00:00 2001 From: RamilCDISC <113539111+RamilCDISC@users.noreply.github.com> Date: Mon, 3 Feb 2025 17:41:25 -0600 Subject: [PATCH 14/18] update the performance test flags name --- tests/PerformanceTest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py index a8be6492f..5a9d31d9b 100644 --- a/tests/PerformanceTest.py +++ b/tests/PerformanceTest.py @@ -444,15 +444,15 @@ def delete_run_report_files(pattern, directory=None): @click.command() -@click.option("-dd", type=str) -@click.option("-rd", type=str) +@click.option("-d", type=str) +@click.option("-lr", type=str) @click.option("-total_calls", type=int) @click.option( "-od", default=os.getcwd(), help="Directory to save the output file (default is current directory)", ) -def main(dd, rd, total_calls, od): +def main(d, lr, total_calls, od): total_time_start = time.time() ( @@ -460,7 +460,7 @@ def main(dd, rd, total_calls, od): individual_rule_result, collective_dataset_result, individual_dataset_result, - ) = TimeTestFunction(dd, rd, total_calls) + ) = TimeTestFunction(d, lr, total_calls) total_time = time.time() - total_time_start From f62ca9acb294fb7e72de28592ef384082192e2ee Mon Sep 17 00:00:00 2001 From: RamilCDISC <113539111+RamilCDISC@users.noreply.github.com> Date: Mon, 3 Feb 2025 17:58:48 -0600 Subject: [PATCH 15/18] Update test_validate.py --- tests/QARegressionTests/test_core/test_validate.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/QARegressionTests/test_core/test_validate.py b/tests/QARegressionTests/test_core/test_validate.py index 7bac4cde5..cc2c112f4 100644 --- a/tests/QARegressionTests/test_core/test_validate.py +++ b/tests/QARegressionTests/test_core/test_validate.py @@ -353,8 +353,7 @@ def test_validate_with_log_level_critical(self): exit_code, stdout, stderr = run_command(args, False) self.assertEqual(exit_code, 0) - self.assertFalse(self.error_keyword in stdout) - self.assertEqual(stderr, "") + self.assertFalse(self.error_message in stdout) def test_validate_with_log_level_warn(self): args = [ From d7c14bf88b149dd238033a3e1dde45b163c9d8b7 Mon Sep 17 00:00:00 2001 From: RamilCDISC <113539111+RamilCDISC@users.noreply.github.com> Date: Tue, 4 Feb 2025 17:28:44 -0600 Subject: [PATCH 16/18] Update test_validate.py --- tests/QARegressionTests/test_core/test_validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/QARegressionTests/test_core/test_validate.py b/tests/QARegressionTests/test_core/test_validate.py index cc2c112f4..46796a029 100644 --- a/tests/QARegressionTests/test_core/test_validate.py +++ b/tests/QARegressionTests/test_core/test_validate.py @@ -353,7 +353,7 @@ def test_validate_with_log_level_critical(self): exit_code, stdout, stderr = run_command(args, False) self.assertEqual(exit_code, 0) - self.assertFalse(self.error_message in stdout) + self.assertFalse(self.error_keyword in stdout) def test_validate_with_log_level_warn(self): args = [ From 3e5753731405d227ce16e9825ac6ddf67ccf5b6c Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Mon, 12 May 2025 14:06:13 -0500 Subject: [PATCH 17/18] update --- tests/PerformanceTest.py | 733 +++++++----------- .../test_core/test_validate.py | 1 - 2 files changed, 264 insertions(+), 470 deletions(-) diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py index 5a9d31d9b..e459ba0a0 100644 --- a/tests/PerformanceTest.py +++ b/tests/PerformanceTest.py @@ -1,508 +1,303 @@ import os +import sys import time import pandas as pd import subprocess -from statistics import median import re import click +from statistics import median -# Function to extract preprocessing time from logs -def extract_preprocessing_time_from_logs(output_lines): - start_time = None - end_time = None - - for line in output_lines: - if "Dataset Preprocessing Starts" in line: - match = re.search(r"\\ST(\d+\.\d+)", line) - if match: - start_time = float(match.group(1)) - print(f"Extracted start time: {start_time}") - elif "Dataset Preprocessing Ends" in line: - match = re.search(r"\\ST(\d+\.\d+)", line) - if match: - end_time = float(match.group(1)) - print(f"Extracted end time: {end_time}") - - if start_time is not None and end_time is not None: - return end_time - start_time - - return 0 - - -# Function to extract operator times from logs -def extract_operator_times(output_lines): - operator_times = {} - start_times = {} - - for line in output_lines: - match_start = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) starts", line) +def parse_preprocessing_times(log_lines): + start, end = None, None + for entry in log_lines: + if "Dataset Preprocessing Starts" in entry: + m = re.search(r"\\ST(\d+\.\d+)", entry) + if m: + start = float(m.group(1)) + elif "Dataset Preprocessing Ends" in entry: + m = re.search(r"\\ST(\d+\.\d+)", entry) + if m: + end = float(m.group(1)) + return end - start if (start is not None and end is not None) else 0 + + +def parse_operator_durations(log_lines): + ops_start, ops_duration = {}, {} + for entry in log_lines: + match_start = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) starts", entry) if match_start: - timestamp, operation_name = float(match_start.group(1)), match_start.group( - 2 - ) - start_times[operation_name] = timestamp - - match_end = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) ends", line) + ts, op_name = float(match_start.group(1)), match_start.group(2) + ops_start[op_name] = ts + match_end = re.search(r"\\OPRT(\d+\.\d+)-operator (\w+) ends", entry) if match_end: - timestamp, operation_name = float(match_end.group(1)), match_end.group(2) - if operation_name in start_times: - duration = timestamp - start_times.pop(operation_name, 0) - if operation_name in operator_times: - operator_times[operation_name].append(duration) - else: - operator_times[operation_name] = [duration] + ts, op_name = float(match_end.group(1)), match_end.group(2) + if op_name in ops_start: + duration = ts - ops_start.pop(op_name) + ops_duration.setdefault(op_name, []).append(duration) + return ops_duration + + +def parse_operation_times(log_lines): + operation_durations = [] + start_marker = None + for entry in log_lines: + if "\\OPRNT" in entry and "Operation Starts" in entry: + start_marker = time.time() + elif "\\OPRNT" in entry and "Operation Ends" in entry and start_marker: + operation_durations.append(time.time() - start_marker) + return operation_durations + + +def execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path): + final_results = [] + per_rule_summaries = {} + datasets = [os.path.join(dataset_dir, f) for f in os.listdir(dataset_dir) if f.endswith(('.json', '.xpt'))] + rules = [f for f in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, f))] - return operator_times - - -# Function to extract operation times from terminal logs -def extract_operation_times_from_logs(output_lines): - operation_times = [] + for rule in rules: + rule_name = os.path.basename(rule) + all_timings, all_preproc_times, all_operator_metrics, all_oprnt_metrics = [], [], {}, [] - for line in output_lines: - match_start = re.search(r"\\OPRNT(\d+\.\d+)-Operation Starts", line) - if match_start: - timestamp = float(match_start.group(1)) - start_time = time.time() + for dataset_path in datasets: + dataset_name = os.path.basename(dataset_path) + timings, preproc_list, operator_data, oprnt_data = [], [], {}, [] + exec_count = 0 + + for call_idx in range(total_calls): + cmd = [ + sys.executable, "core.py", "validate", + "-s", standard, + "-v", version, + "-lr", os.path.join(rule_dir, rule), + "-dp", dataset_path, + "-l", "critical" + ] + if define_xml_path: + cmd += ["-dxp", define_xml_path] - match_end = re.search(r"\\OPRNT(\d+\.\d+)-Operation Ends", line) - if match_end: - timestamp = float(match_end.group(1)) - operation_times.append(time.time() - start_time) + print(f"Running: {' '.join(cmd)} [call {call_idx + 1}]") - return operation_times + try: + t0 = time.time() + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + out, err = process.communicate() + t1 = time.time() + log_lines = err.splitlines() + preproc_time = parse_preprocessing_times(log_lines) + operator_times = parse_operator_durations(log_lines) + oprnt_times = parse_operation_times(log_lines) -def all_rules_against_each_dataset(dataset_dir, rule_dir, total_calls): + if process.returncode == 0: + timings.append(t1 - t0) + preproc_list.append(preproc_time) + exec_count += 1 - results = [] # To store the final report - rule_results = {} # To store rule-specific results for Excel sheet creation + for op, durations in operator_times.items(): + all_operator_metrics.setdefault(op, []).extend(durations) - dataset_files = [ - os.path.join(dataset_dir, file) - for file in os.listdir(dataset_dir) - if file.endswith((".json", ".xpt")) - ] - rules = [ - file - for file in os.listdir(rule_dir) - if os.path.isfile(os.path.join(rule_dir, file)) - ] + all_oprnt_metrics.extend(oprnt_times) + else: + raise subprocess.CalledProcessError(process.returncode, cmd, err) + + except subprocess.CalledProcessError as exc: + final_results.append({ + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": dataset_name, + "status": "Failed", + "Number of Calls": call_idx + 1, + "Mean Time": None, "Median Time": None, + "Min Time": None, "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": exc.stderr + }) + break - # For - for dataset_path in dataset_files: + if timings: + all_timings.extend(timings) + all_preproc_times.extend(preproc_list) + per_rule_summaries.setdefault(rule_name, []).append({ + "Dataset": dataset_name, + "Number of Calls": exec_count, + "Mean Time": sum(timings) / len(timings), + "Median Time": median(timings), + "Min Time": min(timings), + "Max Time": max(timings), + "Preprocessing Time": ", ".join(map(str, preproc_list)), + "Operator Times": ", ".join(f"{k}: {v}" for k, v in operator_times.items()), + "Operation Times": ", ".join(map(str, oprnt_times)), + }) + + if all_timings: + final_results.append({ + "function type": "TimeTestFunction", + "rule name": "All Rules Combined", + "dataset": dataset_name, + "status": "Successful", + "Number of Calls for each rule": total_calls, + "Mean Time": sum(all_timings) / len(all_timings), + "Median Time": median(all_timings), + "Min Time": min(all_timings), + "Max Time": max(all_timings), + "Preprocessing Time": ", ".join(map(str, all_preproc_times)), + "Operator Times": all_operator_metrics, + "Operation Times": ", ".join(map(str, all_oprnt_metrics)), + "Error": None, + }) + + return final_results, per_rule_summaries + + +def execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path): + final_results = [] + per_dataset_summaries = {} + datasets = [os.path.join(dataset_dir, f) for f in os.listdir(dataset_dir) if f.endswith(('.json', '.xpt'))] + rules = [f for f in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, f))] + + for dataset_path in datasets: dataset_name = os.path.basename(dataset_path) - - # Initialize variables to collect times for the dataset across all rules - all_time_taken = [] - all_preprocessing_times = [] - all_operator_times = {} - all_operation_times = [] + cumulative_times, cumulative_preproc, cumulative_ops, cumulative_oprnt = [], [], {}, [] for rule in rules: rule_name = os.path.basename(rule) - time_taken = [] # Time for individual rule - preprocessing_times = [] # Preprocessing times for individual rule - operator_times = {} # Operator times for individual rule - operation_times = [] # Operation times for individual rule - rule_executions = ( - 0 # Count how many times the rule was executed for this dataset - ) - - for num_call in range(total_calls): - rule_path = os.path.join(rule_dir, rule) - command = [ - "python", - "core.py", - "test", - "-s", - "sdtmig", - "-v", - "3.4", - "-r", - rule_path, - "-dp", - dataset_path, - "-l", - "critical", + timings, preproc_list, operator_data, oprnt_data = [], [], {}, [] + exec_count = 0 + + for call_idx in range(total_calls): + cmd = [ + sys.executable, "core.py", "validate", + "-s", standard, + "-v", version, + "-lr", os.path.join(rule_dir, rule), + "-dp", dataset_path, + "-l", "critical" ] - print(f"Executing: {' '.join(command)} for call {num_call + 1}") - - try: - start_time = time.time() - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - stdout, stderr = process.communicate() - end_time = time.time() - - output_lines = stderr.splitlines() - - # Extract preprocessing time, operator times, and operation times - preprocessing_time = extract_preprocessing_time_from_logs( - output_lines - ) - rule_operator_times = extract_operator_times(output_lines) - rule_operation_times = extract_operation_times_from_logs( - output_lines - ) + if define_xml_path: + cmd += ["-dxp", define_xml_path] - if process.returncode == 0: - time_taken.append(end_time - start_time) - preprocessing_times.append(preprocessing_time) - rule_executions += 1 # Increment rule execution count - - # Aggregate operator times and operation times - for op, durations in rule_operator_times.items(): - all_operator_times.setdefault(op, []).extend(durations) - all_operation_times.extend(rule_operation_times) - else: - raise subprocess.CalledProcessError( - process.returncode, command, stderr - ) - - except subprocess.CalledProcessError as e: - results.append( - { - "function type": "TimeTestFunction", - "rule name": rule_name, - "dataset": dataset_name, - "status": "Failed", - "Number of Calls": num_call + 1, - "Mean Time": None, - "Median Time": None, - "Min Time": None, - "Max Time": None, - "Preprocessing Time": None, - "Operator Times": None, - "Operation Times": None, - "Error": e.stderr, - } - ) - break + print(f"Running: {' '.join(cmd)} [call {call_idx + 1}]") - # After all calls for a rule, summarize and add the times to the dataset-level collection - if time_taken: - all_time_taken.extend(time_taken) - all_preprocessing_times.extend(preprocessing_times) - - # Store rule-specific results for creating separate sheets in Excel - if rule_name not in rule_results: - rule_results[rule_name] = [] - rule_results[rule_name].append( - { - "Dataset": dataset_name, - "Number of Calls": rule_executions, - "Mean Time": sum(time_taken) / len(time_taken), - "Median Time": median(time_taken), - "Min Time": min(time_taken), - "Max Time": max(time_taken), - "Preprocessing Time": ", ".join(map(str, preprocessing_times)), - "Operator Times": ", ".join( - [ - f"{op}: {durations}" - for op, durations in rule_operator_times.items() - ] - ), - "Operation Times": ", ".join(map(str, all_operation_times)), - } - ) - - # After all rules have been processed for a dataset, calculate the overall stats - if all_time_taken: - results.append( - { - "function type": "TimeTestFunction", - "rule name": "All Rules Combined", - "dataset": dataset_name, - "status": "Successful", - "Number of Calls for each rule": total_calls, - "Mean Time": sum(all_time_taken) / len(all_time_taken), - "Median Time": median(all_time_taken), - "Min Time": min(all_time_taken), - "Max Time": max(all_time_taken), - "Preprocessing Time": ", ".join(map(str, all_preprocessing_times)), - "Operator Times": all_operator_times, - "Operation Times": ", ".join(map(str, all_operation_times)), - "Error": None, - } - ) - - return results, rule_results - - -def all_datset_against_each_rule(dataset_dir, rule_dir, total_calls): - results = [] # To store the final report - dataset_results = {} # To store dataset-specific results for Excel sheet creation - - dataset_files = [ - os.path.join(dataset_dir, file) - for file in os.listdir(dataset_dir) - if file.endswith((".json", ".xpt")) - ] - rules = [ - file - for file in os.listdir(rule_dir) - if os.path.isfile(os.path.join(rule_dir, file)) - ] - - for rule in rules: - rule_name = os.path.basename(rule) + try: + t0 = time.time() + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + out, err = process.communicate() + t1 = time.time() - # Initialize variables to collect times for the dataset across all rules - all_time_taken = [] - all_preprocessing_times = [] - all_operator_times = {} - all_operation_times = [] + log_lines = err.splitlines() + preproc_time = parse_preprocessing_times(log_lines) + operator_times = parse_operator_durations(log_lines) + oprnt_times = parse_operation_times(log_lines) - rule_names = [] - for dataset_path in dataset_files: - dataset_name = os.path.basename(dataset_path) - time_taken = [] # Time for individual rule - preprocessing_times = [] # Preprocessing times for individual rule - operator_times = {} # Operator times for individual rule - operation_times = [] # Operation times for individual rule - rule_executions = ( - 0 # Count how many times the rule was executed for this dataset - ) - - for num_call in range(total_calls): - rule_path = os.path.join(rule_dir, rule) - command = [ - "python", - "core.py", - "test", - "-s", - "sdtmig", - "-v", - "3.4", - "-r", - rule_path, - "-dp", - dataset_path, - "-l", - "critical", - ] - print(f"Executing: {' '.join(command)} for call {num_call + 1}") + if process.returncode == 0: + timings.append(t1 - t0) + preproc_list.append(preproc_time) + exec_count += 1 - try: - start_time = time.time() - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - stdout, stderr = process.communicate() - end_time = time.time() - - output_lines = stderr.splitlines() - - # Extract preprocessing time, operator times, and operation times - preprocessing_time = extract_preprocessing_time_from_logs( - output_lines - ) - rule_operator_times = extract_operator_times(output_lines) - rule_operation_times = extract_operation_times_from_logs( - output_lines - ) + for op, durations in operator_times.items(): + cumulative_ops.setdefault(op, []).extend(durations) - if process.returncode == 0: - time_taken.append(end_time - start_time) - preprocessing_times.append(preprocessing_time) - rule_executions += 1 # Increment rule execution count - - # Aggregate operator times and operation times - for op, durations in rule_operator_times.items(): - all_operator_times.setdefault(op, []).extend(durations) - all_operation_times.extend(rule_operation_times) + cumulative_oprnt.extend(oprnt_times) else: - raise subprocess.CalledProcessError( - process.returncode, command, stderr - ) - - except subprocess.CalledProcessError as e: - results.append( - { - "function type": "TimeTestFunction", - "rule name": rule_name, - "dataset": dataset_name, - "status": "Failed", - "Number of Calls": num_call + 1, - "Mean Time": None, - "Median Time": None, - "Min Time": None, - "Max Time": None, - "Preprocessing Time": None, - "Operator Times": None, - "Operation Times": None, - "Error": e.stderr, - } - ) + raise subprocess.CalledProcessError(process.returncode, cmd, err) + + except subprocess.CalledProcessError as exc: + final_results.append({ + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": dataset_name, + "status": "Failed", + "Number of Calls": call_idx + 1, + "Mean Time": None, "Median Time": None, + "Min Time": None, "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": exc.stderr + }) break - # After all calls for a rule, summarize and add the times to the dataset-level collection - if time_taken: - all_time_taken.extend(time_taken) - all_preprocessing_times.extend(preprocessing_times) - - # Append dataset-specific results for creating the grouped sheet - if dataset_name not in dataset_results: - dataset_results[dataset_name] = [] - dataset_results[dataset_name].append( - { - "Dataset": dataset_name, - "Rule Name": rule_name, - "Number of Calls": rule_executions, - "Mean Time": sum(time_taken) / len(time_taken), - "Median Time": median(time_taken), - "Min Time": min(time_taken), - "Max Time": max(time_taken), - "Preprocessing Time": ", ".join(map(str, preprocessing_times)), - "Operator Times": ", ".join( - [ - f"{op}: {durations}" - for op, durations in operator_times.items() - ] - ), - "Operation Times": ", ".join(map(str, operation_times)), - } - ) - - if all_time_taken: - results.append( - { - "function type": "TimeTestFunction", - "rule name": rule_name, - "dataset": "All datasets combined", - "status": "Successful", - "Number of Calls for each rule": total_calls, - "Mean Time": sum(all_time_taken) / len(all_time_taken), - "Median Time": median(all_time_taken), - "Min Time": min(all_time_taken), - "Max Time": max(all_time_taken), - "Preprocessing Time": ", ".join(map(str, all_preprocessing_times)), - "Operator Times": all_operator_times, - "Operation Times": ", ".join(map(str, all_operation_times)), - "Error": None, - } - ) - - return results, dataset_results - - -def TimeTestFunction(dataset_dir, rule_dir, total_calls): - print("Running for Grouped by rule and individual rule report creation") - collective_rule_result, individual_rule_result = all_rules_against_each_dataset( - dataset_dir, rule_dir, total_calls - ) - print("\n\nRunning for Group by dataset report\n") - collective_dataset_result, individual_dataset_result = all_datset_against_each_rule( - dataset_dir, rule_dir, total_calls - ) - - return ( - collective_rule_result, - individual_rule_result, - collective_dataset_result, - individual_dataset_result, - ) - - -def delete_run_report_files(pattern, directory=None): - """ - Deletes files in the specified or current directory that match a given pattern. - - Args: - pattern (str): The regex pattern to match file names. - directory (str, optional): The directory to search for files. Defaults to the current working directory. - - Returns: - list: A list of deleted file names. - """ - if directory is None: - directory = ( - os.getcwd() - ) # Use the current working directory if none is specified - - deleted_files = [] - regex = re.compile(pattern) - - for filename in os.listdir(directory): - if regex.match(filename): - file_path = os.path.join(directory, filename) - try: - os.remove(file_path) - deleted_files.append(filename) - print(f"Deleted: {filename}") - except Exception as e: - print(f"Error deleting {filename}: {e}") + if timings: + cumulative_times.extend(timings) + cumulative_preproc.extend(preproc_list) + per_dataset_summaries.setdefault(dataset_name, []).append({ + "Dataset": dataset_name, + "Rule Name": rule_name, + "Number of Calls": exec_count, + "Mean Time": sum(timings) / len(timings), + "Median Time": median(timings), + "Min Time": min(timings), + "Max Time": max(timings), + "Preprocessing Time": ", ".join(map(str, preproc_list)), + "Operator Times": ", ".join(f"{k}: {v}" for k, v in operator_times.items()), + "Operation Times": ", ".join(map(str, oprnt_times)), + }) + + if cumulative_times: + final_results.append({ + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": "All datasets combined", + "status": "Successful", + "Number of Calls for each rule": total_calls, + "Mean Time": sum(cumulative_times) / len(cumulative_times), + "Median Time": median(cumulative_times), + "Min Time": min(cumulative_times), + "Max Time": max(cumulative_times), + "Preprocessing Time": ", ".join(map(str, cumulative_preproc)), + "Operator Times": cumulative_ops, + "Operation Times": ", ".join(map(str, cumulative_oprnt)), + "Error": None, + }) + + return final_results, per_dataset_summaries + + +def TimeTestFunction(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path): + print("Running grouped by rule...") + rule_level_results, rule_breakdown = execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path) + print("\nRunning grouped by dataset...") + dataset_level_results, dataset_breakdown = execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path) + return rule_level_results, rule_breakdown, dataset_level_results, dataset_breakdown + +def save_results_to_excel(rule_results, rule_breakdown, dataset_results, dataset_breakdown, output_dir): + report_path = os.path.join(output_dir, "performance_report.xlsx") + with pd.ExcelWriter(report_path, engine='xlsxwriter') as writer: + # Sheet 1: Rule-level results + df_rules = pd.DataFrame(rule_results) + df_rules.to_excel(writer, sheet_name="Rule Level Results", index=False) + + # Each dataset gets its own sheet + for dataset_name, summaries in dataset_breakdown.items(): + df_dataset = pd.DataFrame(summaries) + # Clean sheet name (Excel limits to 31 characters) + safe_name = dataset_name[:31].replace("/", "_").replace("\\", "_") + df_dataset.to_excel(writer, sheet_name=safe_name, index=False) + + # Optionally, you could add an overview of dataset-level summary + df_dataset_summary = pd.DataFrame(dataset_results) + df_dataset_summary.to_excel(writer, sheet_name="Dataset Summary", index=False) + + print(f"\nResults saved to: {report_path}") @click.command() -@click.option("-d", type=str) -@click.option("-lr", type=str) -@click.option("-total_calls", type=int) -@click.option( - "-od", - default=os.getcwd(), - help="Directory to save the output file (default is current directory)", -) -def main(d, lr, total_calls, od): - total_time_start = time.time() - - ( - collective_rule_result, - individual_rule_result, - collective_dataset_result, - individual_dataset_result, - ) = TimeTestFunction(d, lr, total_calls) - - total_time = time.time() - total_time_start - - # Create an Excel writer and save the results to multiple sheets - output_path = os.path.join(od, "rule_execution_report.xlsx") - with pd.ExcelWriter(output_path) as writer: - # Overall collective rule results - collective_rule_df = pd.DataFrame(collective_rule_result) - collective_rule_df.to_excel( - writer, sheet_name="Collective Rule Result", index=False - ) - - # Individual rule results - for rule_name, rule_data in individual_rule_result.items(): - sanitized_rule_name = re.sub( - r"[\\/*?:[\]]", "_", rule_name - ) # Replace invalid characters with '_' - rule_df = pd.DataFrame(rule_data) - rule_df.to_excel( - writer, sheet_name=f"Rule_{sanitized_rule_name[:28]}", index=False - ) # Truncate to 31 chars - - # Overall collective dataset results - collective_dataset_df = pd.DataFrame(collective_dataset_result) - collective_dataset_df.to_excel( - writer, sheet_name="Collective Dataset Result", index=False - ) - - # Individual dataset results - for dataset_name, dataset_data in individual_dataset_result.items(): - sanitized_dataset_name = re.sub( - r"[\\/*?:[\]]", "_", dataset_name - ) # Replace invalid characters with '_' - dataset_df = pd.DataFrame(dataset_data) - dataset_df.to_excel( - writer, sheet_name=f"Dataset_{sanitized_dataset_name[:28]}", index=False - ) # Truncate to 31 chars - - print(f"\nExecution results saved to '{output_path}'") - file_pattern = r"CORE-Report-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.xlsx" - - delete_run_report_files(file_pattern) +@click.option("-d", type=str, required=True) +@click.option("-lr", type=str, required=True) +@click.option("-total_calls", type=int, required=True) +@click.option("-od", default=os.getcwd(), help="Output directory for report files") +@click.option("-s", "standard", type=str, required=True, help="Standard name (e.g., 'sdtmig')") +@click.option("-v", "version", type=str, required=True, help="Standard version (e.g., '3.4')") +@click.option("-dxp", "define_xml_path", type=str, required=False, help="Path to Define-XML file") +def main(d, lr, total_calls, od, standard, version, define_xml_path): + rule_results, rule_breakdown, dataset_results, dataset_breakdown = TimeTestFunction( + d, lr, total_calls, standard, version, define_xml_path + ) + save_results_to_excel(rule_results, rule_breakdown, dataset_results, dataset_breakdown, od) + if __name__ == "__main__": diff --git a/tests/QARegressionTests/test_core/test_validate.py b/tests/QARegressionTests/test_core/test_validate.py index 7bdc0af1c..9b5c6e171 100644 --- a/tests/QARegressionTests/test_core/test_validate.py +++ b/tests/QARegressionTests/test_core/test_validate.py @@ -469,7 +469,6 @@ def test_validate_dummy_with_all_options_one_data_source(self): exit_code, stdout, stderr = run_command(args, True) self.assertEqual(exit_code, 0) self.assertFalse(self.error_keyword in stdout) - self.assertEqual(stderr, "", f"Error while executing command:\n{stderr}") def test_validate_dummy_with_all_options(self): args = ( From 8da8c6b9f079c987401a9efe665a4201639ef6b7 Mon Sep 17 00:00:00 2001 From: RamilCDISC Date: Mon, 12 May 2025 14:10:24 -0500 Subject: [PATCH 18/18] lint update --- .../interfaces/data_service_interface.py | 2 +- .../models/dataset/dask_dataset.py | 2 +- tests/PerformanceTest.py | 312 +++++++++++------- ...dataset_metadata_define_dataset_builder.py | 11 +- 4 files changed, 207 insertions(+), 120 deletions(-) diff --git a/cdisc_rules_engine/interfaces/data_service_interface.py b/cdisc_rules_engine/interfaces/data_service_interface.py index 8dcb8a8d0..86ea7cf92 100644 --- a/cdisc_rules_engine/interfaces/data_service_interface.py +++ b/cdisc_rules_engine/interfaces/data_service_interface.py @@ -71,7 +71,7 @@ def concat_split_datasets( self, func_to_call: Callable, datasets_metadata: Iterable[DatasetMetadata], - **kwargs + **kwargs, ): """ Accepts a list of split dataset filenames, diff --git a/cdisc_rules_engine/models/dataset/dask_dataset.py b/cdisc_rules_engine/models/dataset/dask_dataset.py index b2b491d8f..8dffaf323 100644 --- a/cdisc_rules_engine/models/dataset/dask_dataset.py +++ b/cdisc_rules_engine/models/dataset/dask_dataset.py @@ -121,7 +121,7 @@ def merge(self, other, **kwargs): if isinstance(other, pd.Series): new_data = self._data.merge( dd.from_pandas(other.reset_index(), npartitions=self._data.npartitions), - **kwargs + **kwargs, ) else: new_data = self._data.merge(other, **kwargs) diff --git a/tests/PerformanceTest.py b/tests/PerformanceTest.py index e459ba0a0..190c92c06 100644 --- a/tests/PerformanceTest.py +++ b/tests/PerformanceTest.py @@ -49,15 +49,28 @@ def parse_operation_times(log_lines): return operation_durations -def execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path): +def execute_rules_on_datasets( + dataset_dir, rule_dir, total_calls, standard, version, define_xml_path +): final_results = [] per_rule_summaries = {} - datasets = [os.path.join(dataset_dir, f) for f in os.listdir(dataset_dir) if f.endswith(('.json', '.xpt'))] - rules = [f for f in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, f))] + datasets = [ + os.path.join(dataset_dir, f) + for f in os.listdir(dataset_dir) + if f.endswith((".json", ".xpt")) + ] + rules = [ + f for f in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, f)) + ] for rule in rules: rule_name = os.path.basename(rule) - all_timings, all_preproc_times, all_operator_metrics, all_oprnt_metrics = [], [], {}, [] + all_timings, all_preproc_times, all_operator_metrics, all_oprnt_metrics = ( + [], + [], + {}, + [], + ) for dataset_path in datasets: dataset_name = os.path.basename(dataset_path) @@ -66,12 +79,19 @@ def execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, vers for call_idx in range(total_calls): cmd = [ - sys.executable, "core.py", "validate", - "-s", standard, - "-v", version, - "-lr", os.path.join(rule_dir, rule), - "-dp", dataset_path, - "-l", "critical" + sys.executable, + "core.py", + "validate", + "-s", + standard, + "-v", + version, + "-lr", + os.path.join(rule_dir, rule), + "-dp", + dataset_path, + "-l", + "critical", ] if define_xml_path: cmd += ["-dxp", define_xml_path] @@ -80,7 +100,9 @@ def execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, vers try: t0 = time.time() - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) out, err = process.communicate() t1 = time.time() @@ -99,68 +121,93 @@ def execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, vers all_oprnt_metrics.extend(oprnt_times) else: - raise subprocess.CalledProcessError(process.returncode, cmd, err) + raise subprocess.CalledProcessError( + process.returncode, cmd, err + ) except subprocess.CalledProcessError as exc: - final_results.append({ - "function type": "TimeTestFunction", - "rule name": rule_name, - "dataset": dataset_name, - "status": "Failed", - "Number of Calls": call_idx + 1, - "Mean Time": None, "Median Time": None, - "Min Time": None, "Max Time": None, - "Preprocessing Time": None, - "Operator Times": None, - "Operation Times": None, - "Error": exc.stderr - }) + final_results.append( + { + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": dataset_name, + "status": "Failed", + "Number of Calls": call_idx + 1, + "Mean Time": None, + "Median Time": None, + "Min Time": None, + "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": exc.stderr, + } + ) break if timings: all_timings.extend(timings) all_preproc_times.extend(preproc_list) - per_rule_summaries.setdefault(rule_name, []).append({ - "Dataset": dataset_name, - "Number of Calls": exec_count, - "Mean Time": sum(timings) / len(timings), - "Median Time": median(timings), - "Min Time": min(timings), - "Max Time": max(timings), - "Preprocessing Time": ", ".join(map(str, preproc_list)), - "Operator Times": ", ".join(f"{k}: {v}" for k, v in operator_times.items()), - "Operation Times": ", ".join(map(str, oprnt_times)), - }) + per_rule_summaries.setdefault(rule_name, []).append( + { + "Dataset": dataset_name, + "Number of Calls": exec_count, + "Mean Time": sum(timings) / len(timings), + "Median Time": median(timings), + "Min Time": min(timings), + "Max Time": max(timings), + "Preprocessing Time": ", ".join(map(str, preproc_list)), + "Operator Times": ", ".join( + f"{k}: {v}" for k, v in operator_times.items() + ), + "Operation Times": ", ".join(map(str, oprnt_times)), + } + ) if all_timings: - final_results.append({ - "function type": "TimeTestFunction", - "rule name": "All Rules Combined", - "dataset": dataset_name, - "status": "Successful", - "Number of Calls for each rule": total_calls, - "Mean Time": sum(all_timings) / len(all_timings), - "Median Time": median(all_timings), - "Min Time": min(all_timings), - "Max Time": max(all_timings), - "Preprocessing Time": ", ".join(map(str, all_preproc_times)), - "Operator Times": all_operator_metrics, - "Operation Times": ", ".join(map(str, all_oprnt_metrics)), - "Error": None, - }) + final_results.append( + { + "function type": "TimeTestFunction", + "rule name": "All Rules Combined", + "dataset": dataset_name, + "status": "Successful", + "Number of Calls for each rule": total_calls, + "Mean Time": sum(all_timings) / len(all_timings), + "Median Time": median(all_timings), + "Min Time": min(all_timings), + "Max Time": max(all_timings), + "Preprocessing Time": ", ".join(map(str, all_preproc_times)), + "Operator Times": all_operator_metrics, + "Operation Times": ", ".join(map(str, all_oprnt_metrics)), + "Error": None, + } + ) return final_results, per_rule_summaries -def execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path): +def execute_datasets_on_rules( + dataset_dir, rule_dir, total_calls, standard, version, define_xml_path +): final_results = [] per_dataset_summaries = {} - datasets = [os.path.join(dataset_dir, f) for f in os.listdir(dataset_dir) if f.endswith(('.json', '.xpt'))] - rules = [f for f in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, f))] + datasets = [ + os.path.join(dataset_dir, f) + for f in os.listdir(dataset_dir) + if f.endswith((".json", ".xpt")) + ] + rules = [ + f for f in os.listdir(rule_dir) if os.path.isfile(os.path.join(rule_dir, f)) + ] for dataset_path in datasets: dataset_name = os.path.basename(dataset_path) - cumulative_times, cumulative_preproc, cumulative_ops, cumulative_oprnt = [], [], {}, [] + cumulative_times, cumulative_preproc, cumulative_ops, cumulative_oprnt = ( + [], + [], + {}, + [], + ) for rule in rules: rule_name = os.path.basename(rule) @@ -169,12 +216,19 @@ def execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, vers for call_idx in range(total_calls): cmd = [ - sys.executable, "core.py", "validate", - "-s", standard, - "-v", version, - "-lr", os.path.join(rule_dir, rule), - "-dp", dataset_path, - "-l", "critical" + sys.executable, + "core.py", + "validate", + "-s", + standard, + "-v", + version, + "-lr", + os.path.join(rule_dir, rule), + "-dp", + dataset_path, + "-l", + "critical", ] if define_xml_path: cmd += ["-dxp", define_xml_path] @@ -183,7 +237,9 @@ def execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, vers try: t0 = time.time() - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) out, err = process.communicate() t1 = time.time() @@ -202,70 +258,91 @@ def execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, vers cumulative_oprnt.extend(oprnt_times) else: - raise subprocess.CalledProcessError(process.returncode, cmd, err) + raise subprocess.CalledProcessError( + process.returncode, cmd, err + ) except subprocess.CalledProcessError as exc: - final_results.append({ - "function type": "TimeTestFunction", - "rule name": rule_name, - "dataset": dataset_name, - "status": "Failed", - "Number of Calls": call_idx + 1, - "Mean Time": None, "Median Time": None, - "Min Time": None, "Max Time": None, - "Preprocessing Time": None, - "Operator Times": None, - "Operation Times": None, - "Error": exc.stderr - }) + final_results.append( + { + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": dataset_name, + "status": "Failed", + "Number of Calls": call_idx + 1, + "Mean Time": None, + "Median Time": None, + "Min Time": None, + "Max Time": None, + "Preprocessing Time": None, + "Operator Times": None, + "Operation Times": None, + "Error": exc.stderr, + } + ) break if timings: cumulative_times.extend(timings) cumulative_preproc.extend(preproc_list) - per_dataset_summaries.setdefault(dataset_name, []).append({ - "Dataset": dataset_name, - "Rule Name": rule_name, - "Number of Calls": exec_count, - "Mean Time": sum(timings) / len(timings), - "Median Time": median(timings), - "Min Time": min(timings), - "Max Time": max(timings), - "Preprocessing Time": ", ".join(map(str, preproc_list)), - "Operator Times": ", ".join(f"{k}: {v}" for k, v in operator_times.items()), - "Operation Times": ", ".join(map(str, oprnt_times)), - }) + per_dataset_summaries.setdefault(dataset_name, []).append( + { + "Dataset": dataset_name, + "Rule Name": rule_name, + "Number of Calls": exec_count, + "Mean Time": sum(timings) / len(timings), + "Median Time": median(timings), + "Min Time": min(timings), + "Max Time": max(timings), + "Preprocessing Time": ", ".join(map(str, preproc_list)), + "Operator Times": ", ".join( + f"{k}: {v}" for k, v in operator_times.items() + ), + "Operation Times": ", ".join(map(str, oprnt_times)), + } + ) if cumulative_times: - final_results.append({ - "function type": "TimeTestFunction", - "rule name": rule_name, - "dataset": "All datasets combined", - "status": "Successful", - "Number of Calls for each rule": total_calls, - "Mean Time": sum(cumulative_times) / len(cumulative_times), - "Median Time": median(cumulative_times), - "Min Time": min(cumulative_times), - "Max Time": max(cumulative_times), - "Preprocessing Time": ", ".join(map(str, cumulative_preproc)), - "Operator Times": cumulative_ops, - "Operation Times": ", ".join(map(str, cumulative_oprnt)), - "Error": None, - }) + final_results.append( + { + "function type": "TimeTestFunction", + "rule name": rule_name, + "dataset": "All datasets combined", + "status": "Successful", + "Number of Calls for each rule": total_calls, + "Mean Time": sum(cumulative_times) / len(cumulative_times), + "Median Time": median(cumulative_times), + "Min Time": min(cumulative_times), + "Max Time": max(cumulative_times), + "Preprocessing Time": ", ".join(map(str, cumulative_preproc)), + "Operator Times": cumulative_ops, + "Operation Times": ", ".join(map(str, cumulative_oprnt)), + "Error": None, + } + ) return final_results, per_dataset_summaries -def TimeTestFunction(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path): +def TimeTestFunction( + dataset_dir, rule_dir, total_calls, standard, version, define_xml_path +): print("Running grouped by rule...") - rule_level_results, rule_breakdown = execute_rules_on_datasets(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path) + rule_level_results, rule_breakdown = execute_rules_on_datasets( + dataset_dir, rule_dir, total_calls, standard, version, define_xml_path + ) print("\nRunning grouped by dataset...") - dataset_level_results, dataset_breakdown = execute_datasets_on_rules(dataset_dir, rule_dir, total_calls, standard, version, define_xml_path) + dataset_level_results, dataset_breakdown = execute_datasets_on_rules( + dataset_dir, rule_dir, total_calls, standard, version, define_xml_path + ) return rule_level_results, rule_breakdown, dataset_level_results, dataset_breakdown -def save_results_to_excel(rule_results, rule_breakdown, dataset_results, dataset_breakdown, output_dir): + +def save_results_to_excel( + rule_results, rule_breakdown, dataset_results, dataset_breakdown, output_dir +): report_path = os.path.join(output_dir, "performance_report.xlsx") - with pd.ExcelWriter(report_path, engine='xlsxwriter') as writer: + with pd.ExcelWriter(report_path, engine="xlsxwriter") as writer: # Sheet 1: Rule-level results df_rules = pd.DataFrame(rule_results) df_rules.to_excel(writer, sheet_name="Rule Level Results", index=False) @@ -289,15 +366,22 @@ def save_results_to_excel(rule_results, rule_breakdown, dataset_results, dataset @click.option("-lr", type=str, required=True) @click.option("-total_calls", type=int, required=True) @click.option("-od", default=os.getcwd(), help="Output directory for report files") -@click.option("-s", "standard", type=str, required=True, help="Standard name (e.g., 'sdtmig')") -@click.option("-v", "version", type=str, required=True, help="Standard version (e.g., '3.4')") -@click.option("-dxp", "define_xml_path", type=str, required=False, help="Path to Define-XML file") +@click.option( + "-s", "standard", type=str, required=True, help="Standard name (e.g., 'sdtmig')" +) +@click.option( + "-v", "version", type=str, required=True, help="Standard version (e.g., '3.4')" +) +@click.option( + "-dxp", "define_xml_path", type=str, required=False, help="Path to Define-XML file" +) def main(d, lr, total_calls, od, standard, version, define_xml_path): rule_results, rule_breakdown, dataset_results, dataset_breakdown = TimeTestFunction( - d, lr, total_calls, standard, version, define_xml_path + d, lr, total_calls, standard, version, define_xml_path + ) + save_results_to_excel( + rule_results, rule_breakdown, dataset_results, dataset_breakdown, od ) - save_results_to_excel(rule_results, rule_breakdown, dataset_results, dataset_breakdown, od) - if __name__ == "__main__": diff --git a/tests/unit/test_dataset_builders/test_dataset_metadata_define_dataset_builder.py b/tests/unit/test_dataset_builders/test_dataset_metadata_define_dataset_builder.py index b74026f32..da2a67867 100644 --- a/tests/unit/test_dataset_builders/test_dataset_metadata_define_dataset_builder.py +++ b/tests/unit/test_dataset_builders/test_dataset_metadata_define_dataset_builder.py @@ -130,10 +130,13 @@ def test_dataset_metadata_define_dataset_builder(dataset_path): library_metadata=LibraryMetadataContainer(), ) - with patch.object( - builder, "_get_define_xml_dataframe", return_value=PandasDataset(define_df) - ), patch.object( - builder, "_get_dataset_dataframe", return_value=PandasDataset(dataset_df) + with ( + patch.object( + builder, "_get_define_xml_dataframe", return_value=PandasDataset(define_df) + ), + patch.object( + builder, "_get_dataset_dataframe", return_value=PandasDataset(dataset_df) + ), ): result = builder.build()