From 99b46bd158cf9fdd85ce9f2d54f2c95e4ef56c0d Mon Sep 17 00:00:00 2001 From: Akash S Date: Wed, 4 Jun 2025 17:34:13 +0530 Subject: [PATCH 01/15] Enhance evaluation result handling: include resource key address --- src/tirith/core/core.py | 19 ++++++++++++++++++- src/tirith/providers/json/handler.py | 7 ++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index f24917f5..f3b83ec2 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -82,7 +82,24 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): continue evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data) - evaluation_result["meta"] = evaluator_input.get("meta") + meta = evaluator_input.get("meta") + + # Include meta if it exists and isn't None + if meta: + evaluation_result["meta"] = meta + + # Handle address - provider may have already set it + address = evaluator_input.get("address") + if address: + # Use the address directly from the provider result and add it to the evaluation result + evaluation_result["address"] = address + evaluation_result["message"] = f"{evaluation_result['message']} - (Address: `{address}`)" + # Check if address is in meta as a fallback for backward compatibility + elif meta and isinstance(meta, dict) and "address" in meta: + address = meta["address"] + evaluation_result["address"] = address + evaluation_result["message"] = f"{evaluation_result['message']} - (Address: `{address}`)" + evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index ce4ced5b..fb244d7f 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -43,7 +43,12 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: ) ] - outputs = [create_result_dict(value=value, meta=None, err=None) for value in values] + # Create result dict with address as a separate property, not in meta + outputs = [] + for value in values: + result = create_result_dict(value=value, meta=None, err=None) + result["address"] = key_path + outputs.append(result) return outputs From afb9e1af52352f4cb9e136a515181992a7062f8a Mon Sep 17 00:00:00 2001 From: Akash S Date: Wed, 4 Jun 2025 17:37:09 +0530 Subject: [PATCH 02/15] lint fix --- src/tirith/core/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index f3b83ec2..c05299f0 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -83,7 +83,7 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data) meta = evaluator_input.get("meta") - + # Include meta if it exists and isn't None if meta: evaluation_result["meta"] = meta From 4806062ced9f5566227c2d76b3034dd8fcbf1477 Mon Sep 17 00:00:00 2001 From: Akash S Date: Tue, 10 Jun 2025 18:10:20 +0530 Subject: [PATCH 03/15] add meta in json provider even if its null --- src/tirith/core/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index c05299f0..70478003 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -85,8 +85,8 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): meta = evaluator_input.get("meta") # Include meta if it exists and isn't None - if meta: - evaluation_result["meta"] = meta + + evaluation_result["meta"] = meta # Handle address - provider may have already set it address = evaluator_input.get("address") From 85dd547e179c3b5466a5b38fa21d03cfee7abb03 Mon Sep 17 00:00:00 2001 From: Akash S Date: Tue, 10 Jun 2025 18:11:55 +0530 Subject: [PATCH 04/15] lint fix --- src/tirith/core/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index 70478003..ef85aacb 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -85,7 +85,7 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): meta = evaluator_input.get("meta") # Include meta if it exists and isn't None - + evaluation_result["meta"] = meta # Handle address - provider may have already set it From e3a1fe7745ef6e51e08c43c0c70a3606568c2e35 Mon Sep 17 00:00:00 2001 From: Akash S Date: Wed, 11 Jun 2025 10:19:27 +0530 Subject: [PATCH 05/15] move resource_key addition logic to respective handlers --- src/tirith/core/core.py | 28 ++--- .../providers/terraform_plan/handler.py | 117 ++++++++++++------ 2 files changed, 88 insertions(+), 57 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index ef85aacb..2a5a4fdd 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -81,24 +81,18 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): has_evaluation_passed = None continue + # Run evaluation on the provider's input value evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data) - meta = evaluator_input.get("meta") - - # Include meta if it exists and isn't None - - evaluation_result["meta"] = meta - - # Handle address - provider may have already set it - address = evaluator_input.get("address") - if address: - # Use the address directly from the provider result and add it to the evaluation result - evaluation_result["address"] = address - evaluation_result["message"] = f"{evaluation_result['message']} - (Address: `{address}`)" - # Check if address is in meta as a fallback for backward compatibility - elif meta and isinstance(meta, dict) and "address" in meta: - address = meta["address"] - evaluation_result["address"] = address - evaluation_result["message"] = f"{evaluation_result['message']} - (Address: `{address}`)" + + # Copy metadata and address if provided by the provider + if "meta" in evaluator_input: + evaluation_result["meta"] = evaluator_input["meta"] + + # Copy address directly if provided + if "address" in evaluator_input: + evaluation_result["address"] = evaluator_input["address"] + # Update the message to include the address + evaluation_result["message"] = f"{evaluation_result['message']} - (Address: `{evaluator_input['address']}`)" evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 04471756..0274ed95 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -97,24 +97,34 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True attribute_value = input_resource_change_attrs[attribute] - outputs.append( - { - "value": attribute_value, - "meta": resource_change, - "err": None, - } - ) + result = { + "value": attribute_value, + "meta": resource_change, + "err": None, + } + # Add address to the output result + if "address" in resource_change: + result["address"] = resource_change["address"] + outputs.append(result) elif "." in attribute or "*" in attribute: evaluated_outputs = _wrapper_get_exp_attribute(attribute, input_resource_change_attrs) if evaluated_outputs: is_attribute_found = True local_is_found_attribute = True for evaluated_output in evaluated_outputs: - outputs.append({"value": evaluated_output, "meta": resource_change, "err": None}) + result = {"value": evaluated_output, "meta": resource_change, "err": None} + # Add address to the output result + if "address" in resource_change: + result["address"] = resource_change["address"] + outputs.append(result) # If we didn't find the attribute in this resource, add a None value so it still gets evaluated if not local_is_found_attribute: - outputs.append({"value": None, "meta": resource_change, "err": None}) + result = {"value": None, "meta": resource_change, "err": None} + # Add address to the output result + if "address" in resource_change: + result["address"] = resource_change["address"] + outputs.append(result) else: outputs.append( { @@ -139,7 +149,7 @@ def provide(provider_inputs, input_data): "err": f"attribute: '{attribute}' is not found", } ) - + return outputs # CASE 2 # - Get actions performed on a resource @@ -154,13 +164,15 @@ def provide(provider_inputs, input_data): if resource_type in (resource_change["type"], "*"): is_resource_type_found = True for action in resource_change["change"]["actions"]: - outputs.append( - { - "value": action, - "meta": resource_change, - "err": None, - } - ) + result = { + "value": action, + "meta": resource_change, + "err": None, + } + # Add address to the output result + if "address" in resource_change: + result["address"] = resource_change["address"] + outputs.append(result) if not is_resource_type_found: outputs.append( { @@ -175,6 +187,7 @@ def provide(provider_inputs, input_data): elif input_type == "count": count = 0 resource_meta = {} + address = None resource_type = provider_inputs["terraform_resource_type"] for resource_change in resource_changes: # Skip if resource type is in exclude_resource_types when using wildcard @@ -184,15 +197,20 @@ def provide(provider_inputs, input_data): # No need to check if the resource is not found # because the count of a resource can be zero resource_meta = resource_change + # Use the last encountered address (this is just for count, which is an aggregate result) + if "address" in resource_change: + address = resource_change["address"] count += 1 - outputs.append( - { - "value": count, - "meta": resource_meta, - "err": None, - } - ) + result = { + "value": count, + "meta": resource_meta, + "err": None, + } + # Add address if available + if address: + result["address"] = address + outputs.append(result) return outputs # CASE 4 elif input_type == "direct_dependencies": @@ -212,7 +230,6 @@ def provide(provider_inputs, input_data): ) return outputs - def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: list): """ Operation type handler to get the provider config from terraform plan @@ -252,7 +269,6 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l continue is_provider_full_name_found = True - attribute_value = None if attribute_to_get == "version_constraint": @@ -271,12 +287,14 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l } ) return - outputs.append( - { - "value": attribute_value, - "meta": provider_config_dict, - } - ) + + result = { + "value": attribute_value, + "meta": provider_config_dict, + } + # Add provider full name as address for provider config + result["address"] = terraform_provider_full_name + outputs.append(result) if not is_provider_full_name_found: outputs.append( @@ -297,6 +315,7 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs: :param provider_inputs: The provider inputs :param outputs: The outputs """ + # For terraform_version, there's no specific address as it applies to the entire plan outputs.append({"value": input_data.get("terraform_version"), "meta": input_data}) @@ -316,12 +335,16 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output is_resource_found = False for resource in config_resources: - if resource.get("type") != resource_type: continue is_resource_found = True deps_resource_type = {resource_id.split(".")[0] for resource_id in resource.get("depends_on", [])} - outputs.append({"value": list(deps_resource_type), "meta": config_resources}) + result = {"value": list(deps_resource_type), "meta": config_resources} + # Add address if available + address = resource.get("address") + if address: + result["address"] = address + outputs.append(result) if not is_resource_found: outputs.append( @@ -386,13 +409,17 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: reference_address = f"{module_path}.{relative_reference_address}" if reference_address in reference_target_addresses: reference_target_addresses.remove(reference_address) - outputs.append( - {"value": True, "meta": {"address": reference_address, "referenced_by": resource_config}} - ) + result = {"value": True, "meta": {"referenced_by": resource_config}} + # Add address to the output + result["address"] = reference_address + outputs.append(result) # For all of the reference_target_addresses that don't have a reference for reference_target_address in reference_target_addresses: - outputs.append({"value": False, "meta": {"address": reference_target_address, "referenced_by": {}}}) + result = {"value": False, "meta": {"referenced_by": {}}} + # Add address to the output + result["address"] = reference_target_address + outputs.append(result) def get_module_resources_by_type_recursive(module: dict, resource_type: str, current_module_path: str = "") -> iter: @@ -478,7 +505,10 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: return is_all_resource_type_references_to = resource_type_count == reference_count - outputs.append({"value": is_all_resource_type_references_to, "meta": config_resources}) + result = {"value": is_all_resource_type_references_to, "meta": config_resources} + # While we don't have a specific address for the entire result, we can include the resource type + result["address"] = resource_type + outputs.append(result) def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: list): @@ -530,7 +560,12 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: # Only get the resource type resource_references.add(reference.split(".")[0]) - outputs.append({"value": list(resource_references), "meta": resource}) + result = {"value": list(resource_references), "meta": resource} + # Add address if available + address = resource.get("address") + if address: + result["address"] = address + outputs.append(result) if not is_resource_found: outputs.append( @@ -540,3 +575,5 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: "meta": config_resources, } ) + + From 625701e3c8d03eae728674064bdf4006a9268f2f Mon Sep 17 00:00:00 2001 From: Akash S Date: Wed, 11 Jun 2025 10:19:38 +0530 Subject: [PATCH 06/15] lint fix --- src/tirith/core/core.py | 8 +++++--- src/tirith/providers/terraform_plan/handler.py | 7 +++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index 2a5a4fdd..e953ea9d 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -83,16 +83,18 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): # Run evaluation on the provider's input value evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data) - + # Copy metadata and address if provided by the provider if "meta" in evaluator_input: evaluation_result["meta"] = evaluator_input["meta"] - + # Copy address directly if provided if "address" in evaluator_input: evaluation_result["address"] = evaluator_input["address"] # Update the message to include the address - evaluation_result["message"] = f"{evaluation_result['message']} - (Address: `{evaluator_input['address']}`)" + evaluation_result["message"] = ( + f"{evaluation_result['message']} - (Address: `{evaluator_input['address']}`)" + ) evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 0274ed95..471133aa 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -149,7 +149,7 @@ def provide(provider_inputs, input_data): "err": f"attribute: '{attribute}' is not found", } ) - + return outputs # CASE 2 # - Get actions performed on a resource @@ -230,6 +230,7 @@ def provide(provider_inputs, input_data): ) return outputs + def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: list): """ Operation type handler to get the provider config from terraform plan @@ -287,7 +288,7 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l } ) return - + result = { "value": attribute_value, "meta": provider_config_dict, @@ -575,5 +576,3 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: "meta": config_resources, } ) - - From 52a187537cd3885b1e149f09176560f334758597 Mon Sep 17 00:00:00 2001 From: Akash S Date: Thu, 12 Jun 2025 15:26:11 +0530 Subject: [PATCH 07/15] Update --- src/tirith/core/core.py | 4 - src/tirith/prettyprinter.py | 5 + .../providers/terraform_plan/handler.py | 97 +++++++++---------- 3 files changed, 52 insertions(+), 54 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index e953ea9d..f7803c42 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -91,10 +91,6 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): # Copy address directly if provided if "address" in evaluator_input: evaluation_result["address"] = evaluator_input["address"] - # Update the message to include the address - evaluation_result["message"] = ( - f"{evaluation_result['message']} - (Address: `{evaluator_input['address']}`)" - ) evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 4134ba74..799ff757 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -99,6 +99,11 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: for result_num, result_dict in enumerate(check_dict["result"]): result_message = result_dict["message"] + + # Include address in the message if it exists in the result_dict + if "address" in result_dict: + result_message = f"{result_message} - (Address: `{result_dict['address']}`)" + if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) elif check_dict["passed"] is None: diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 471133aa..34761a1f 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -101,10 +101,8 @@ def provide(provider_inputs, input_data): "value": attribute_value, "meta": resource_change, "err": None, + "addresses": resource_change.get("addresses") } - # Add address to the output result - if "address" in resource_change: - result["address"] = resource_change["address"] outputs.append(result) elif "." in attribute or "*" in attribute: evaluated_outputs = _wrapper_get_exp_attribute(attribute, input_resource_change_attrs) @@ -112,18 +110,22 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True for evaluated_output in evaluated_outputs: - result = {"value": evaluated_output, "meta": resource_change, "err": None} - # Add address to the output result - if "address" in resource_change: - result["address"] = resource_change["address"] + result = { + "value": evaluated_output, + "meta": resource_change, + "err": None, + "addresses": resource_change.get("addresses") + } outputs.append(result) # If we didn't find the attribute in this resource, add a None value so it still gets evaluated if not local_is_found_attribute: - result = {"value": None, "meta": resource_change, "err": None} - # Add address to the output result - if "address" in resource_change: - result["address"] = resource_change["address"] + result = { + "value": None, + "meta": resource_change, + "err": None, + "addresses": resource_change.get("addresses") + } outputs.append(result) else: outputs.append( @@ -168,10 +170,8 @@ def provide(provider_inputs, input_data): "value": action, "meta": resource_change, "err": None, + "addresses": resource_change.get("addresses") } - # Add address to the output result - if "address" in resource_change: - result["address"] = resource_change["address"] outputs.append(result) if not is_resource_type_found: outputs.append( @@ -187,7 +187,7 @@ def provide(provider_inputs, input_data): elif input_type == "count": count = 0 resource_meta = {} - address = None + addresses = None resource_type = provider_inputs["terraform_resource_type"] for resource_change in resource_changes: # Skip if resource type is in exclude_resource_types when using wildcard @@ -197,19 +197,17 @@ def provide(provider_inputs, input_data): # No need to check if the resource is not found # because the count of a resource can be zero resource_meta = resource_change - # Use the last encountered address (this is just for count, which is an aggregate result) - if "address" in resource_change: - address = resource_change["address"] + # Use the last encountered addresses (this is just for count, which is an aggregate result) + if "addresses" in resource_change: + addresses = resource_change["addresses"] count += 1 result = { "value": count, "meta": resource_meta, "err": None, + "addresses": addresses } - # Add address if available - if address: - result["address"] = address outputs.append(result) return outputs # CASE 4 @@ -292,9 +290,8 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l result = { "value": attribute_value, "meta": provider_config_dict, + "addresses": terraform_provider_full_name } - # Add provider full name as address for provider config - result["address"] = terraform_provider_full_name outputs.append(result) if not is_provider_full_name_found: @@ -316,8 +313,8 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs: :param provider_inputs: The provider inputs :param outputs: The outputs """ - # For terraform_version, there's no specific address as it applies to the entire plan - outputs.append({"value": input_data.get("terraform_version"), "meta": input_data}) + # For terraform_version, there's no specific addresses as it applies to the entire plan + outputs.append({"value": input_data.get("terraform_version"), "meta": input_data, "addresses": "terraform_version"}) def direct_dependencies_operator(input_data: dict, provider_inputs: dict, outputs: list): @@ -341,10 +338,10 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output is_resource_found = True deps_resource_type = {resource_id.split(".")[0] for resource_id in resource.get("depends_on", [])} result = {"value": list(deps_resource_type), "meta": config_resources} - # Add address if available - address = resource.get("address") - if address: - result["address"] = address + # Add addresses if available + addresses = resource.get("addresses") + if addresses: + result["addresses"] = addresses outputs.append(result) if not is_resource_found: @@ -371,7 +368,7 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: resource_changes = input_data.get("resource_changes", []) referenced_by = provider_inputs.get("referenced_by") - reference_target_addresses = set() + reference_target_addresseses = set() is_resource_found = False # Loop for adding reference_target @@ -380,7 +377,7 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: "destroy" ]: continue - reference_target_addresses.add(resource_change.get("address")) + reference_target_addresseses.add(resource_change.get("addresses")) is_resource_found = True if not is_resource_found: @@ -403,23 +400,23 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: for expression_val_dict in resource_config.get("expressions", {}).values(): if not isinstance(expression_val_dict, dict): continue - for relative_reference_address in expression_val_dict.get("references", []): + for relative_reference_addresses in expression_val_dict.get("references", []): if module_path == "": - reference_address = relative_reference_address + reference_addresses = relative_reference_addresses else: - reference_address = f"{module_path}.{relative_reference_address}" - if reference_address in reference_target_addresses: - reference_target_addresses.remove(reference_address) + reference_addresses = f"{module_path}.{relative_reference_addresses}" + if reference_addresses in reference_target_addresseses: + reference_target_addresseses.remove(reference_addresses) result = {"value": True, "meta": {"referenced_by": resource_config}} - # Add address to the output - result["address"] = reference_address + # Add addresses to the output + result["addresses"] = reference_addresses outputs.append(result) - # For all of the reference_target_addresses that don't have a reference - for reference_target_address in reference_target_addresses: + # For all of the reference_target_addresseses that don't have a reference + for reference_target_addresses in reference_target_addresseses: result = {"value": False, "meta": {"referenced_by": {}}} - # Add address to the output - result["address"] = reference_target_address + # Add addresses to the output + result["addresses"] = reference_target_addresses outputs.append(result) @@ -476,12 +473,12 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: continue is_resource_found = True resource_type_count += 1 - resource_change_address = resource_change.get("address") + resource_change_addresses = resource_change.get("addresses") # Look to the resource_config to get the references # TODO: Use the module_path for resource_config, module_path in get_resource_config_by_type(input_data, resource_type): - if resource_config.get("address") != resource_change_address: + if resource_config.get("addresses") != resource_change_addresses: continue for expression_val_dict in resource_config.get("expressions", {}).values(): if not isinstance(expression_val_dict, dict): @@ -507,8 +504,8 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: is_all_resource_type_references_to = resource_type_count == reference_count result = {"value": is_all_resource_type_references_to, "meta": config_resources} - # While we don't have a specific address for the entire result, we can include the resource type - result["address"] = resource_type + # While we don't have a specific addresses for the entire result, we can include the resource type + result["addresses"] = resource_type outputs.append(result) @@ -562,10 +559,10 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: resource_references.add(reference.split(".")[0]) result = {"value": list(resource_references), "meta": resource} - # Add address if available - address = resource.get("address") - if address: - result["address"] = address + # Add addresses if available + addresses = resource.get("addresses") + if addresses: + result["addresses"] = addresses outputs.append(result) if not is_resource_found: From 866f20cc673e4bf0d3ac66c89a38bb1ccefa71d3 Mon Sep 17 00:00:00 2001 From: Akash S Date: Thu, 12 Jun 2025 15:57:49 +0530 Subject: [PATCH 08/15] change address to addresses --- src/tirith/core/core.py | 8 +-- src/tirith/prettyprinter.py | 6 +- src/tirith/providers/json/handler.py | 4 +- .../providers/terraform_plan/handler.py | 56 +++++++++---------- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index f7803c42..4b258b01 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -84,13 +84,13 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): # Run evaluation on the provider's input value evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data) - # Copy metadata and address if provided by the provider + # Copy metadata and addresses if provided by the provider if "meta" in evaluator_input: evaluation_result["meta"] = evaluator_input["meta"] - # Copy address directly if provided - if "address" in evaluator_input: - evaluation_result["address"] = evaluator_input["address"] + # Copy addresses directly if provided + if "addresses" in evaluator_input: + evaluation_result["addresses"] = evaluator_input["addresses"] evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 799ff757..901cb5ec 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -100,9 +100,9 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: for result_num, result_dict in enumerate(check_dict["result"]): result_message = result_dict["message"] - # Include address in the message if it exists in the result_dict - if "address" in result_dict: - result_message = f"{result_message} - (Address: `{result_dict['address']}`)" + # Include addresses in the message if it exists in the result_dict + if "addresses" in result_dict: + result_message = f"{result_message} - (Addresses: `{result_dict['addresses']}`)" if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index fb244d7f..5531f80b 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -43,11 +43,11 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: ) ] - # Create result dict with address as a separate property, not in meta + # Create result dict with addresses as a separate property, not in meta outputs = [] for value in values: result = create_result_dict(value=value, meta=None, err=None) - result["address"] = key_path + result["addresses"] = key_path outputs.append(result) return outputs diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 34761a1f..3ad40c57 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -101,7 +101,7 @@ def provide(provider_inputs, input_data): "value": attribute_value, "meta": resource_change, "err": None, - "addresses": resource_change.get("addresses") + "addresses": resource_change.get("address") } outputs.append(result) elif "." in attribute or "*" in attribute: @@ -114,7 +114,7 @@ def provide(provider_inputs, input_data): "value": evaluated_output, "meta": resource_change, "err": None, - "addresses": resource_change.get("addresses") + "addresses": resource_change.get("address") } outputs.append(result) @@ -124,7 +124,7 @@ def provide(provider_inputs, input_data): "value": None, "meta": resource_change, "err": None, - "addresses": resource_change.get("addresses") + "addresses": resource_change.get("address") } outputs.append(result) else: @@ -170,7 +170,7 @@ def provide(provider_inputs, input_data): "value": action, "meta": resource_change, "err": None, - "addresses": resource_change.get("addresses") + "addresses": resource_change.get("address") } outputs.append(result) if not is_resource_type_found: @@ -197,9 +197,9 @@ def provide(provider_inputs, input_data): # No need to check if the resource is not found # because the count of a resource can be zero resource_meta = resource_change - # Use the last encountered addresses (this is just for count, which is an aggregate result) - if "addresses" in resource_change: - addresses = resource_change["addresses"] + # Use the last encountered address (this is just for count, which is an aggregate result) + if "address" in resource_change: + addresses = resource_change["address"] count += 1 result = { @@ -313,7 +313,7 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs: :param provider_inputs: The provider inputs :param outputs: The outputs """ - # For terraform_version, there's no specific addresses as it applies to the entire plan + # For terraform_version, there's no specific address as it applies to the entire plan outputs.append({"value": input_data.get("terraform_version"), "meta": input_data, "addresses": "terraform_version"}) @@ -339,9 +339,9 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output deps_resource_type = {resource_id.split(".")[0] for resource_id in resource.get("depends_on", [])} result = {"value": list(deps_resource_type), "meta": config_resources} # Add addresses if available - addresses = resource.get("addresses") - if addresses: - result["addresses"] = addresses + address = resource.get("address") + if address: + result["addresses"] = address outputs.append(result) if not is_resource_found: @@ -368,7 +368,7 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: resource_changes = input_data.get("resource_changes", []) referenced_by = provider_inputs.get("referenced_by") - reference_target_addresseses = set() + reference_target_address = set() is_resource_found = False # Loop for adding reference_target @@ -377,7 +377,7 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: "destroy" ]: continue - reference_target_addresseses.add(resource_change.get("addresses")) + reference_target_address.add(resource_change.get("address")) is_resource_found = True if not is_resource_found: @@ -400,23 +400,23 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: for expression_val_dict in resource_config.get("expressions", {}).values(): if not isinstance(expression_val_dict, dict): continue - for relative_reference_addresses in expression_val_dict.get("references", []): + for relative_reference_address in expression_val_dict.get("references", []): if module_path == "": - reference_addresses = relative_reference_addresses + reference_address = relative_reference_address else: - reference_addresses = f"{module_path}.{relative_reference_addresses}" - if reference_addresses in reference_target_addresseses: - reference_target_addresseses.remove(reference_addresses) + reference_address = f"{module_path}.{relative_reference_address}" + if reference_address in reference_target_address: + reference_target_address.remove(reference_address) result = {"value": True, "meta": {"referenced_by": resource_config}} # Add addresses to the output - result["addresses"] = reference_addresses + result["addresses"] = reference_address outputs.append(result) - # For all of the reference_target_addresseses that don't have a reference - for reference_target_addresses in reference_target_addresseses: + # For all of the reference_target_address that don't have a reference + for reference_target_address in reference_target_address: result = {"value": False, "meta": {"referenced_by": {}}} # Add addresses to the output - result["addresses"] = reference_target_addresses + result["addresses"] = reference_target_address outputs.append(result) @@ -473,12 +473,12 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: continue is_resource_found = True resource_type_count += 1 - resource_change_addresses = resource_change.get("addresses") + resource_change_address = resource_change.get("address") # Look to the resource_config to get the references # TODO: Use the module_path for resource_config, module_path in get_resource_config_by_type(input_data, resource_type): - if resource_config.get("addresses") != resource_change_addresses: + if resource_config.get("address") != resource_change_address: continue for expression_val_dict in resource_config.get("expressions", {}).values(): if not isinstance(expression_val_dict, dict): @@ -504,7 +504,7 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: is_all_resource_type_references_to = resource_type_count == reference_count result = {"value": is_all_resource_type_references_to, "meta": config_resources} - # While we don't have a specific addresses for the entire result, we can include the resource type + # While we don't have a specific address for the entire result, we can include the resource type result["addresses"] = resource_type outputs.append(result) @@ -560,9 +560,9 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: result = {"value": list(resource_references), "meta": resource} # Add addresses if available - addresses = resource.get("addresses") - if addresses: - result["addresses"] = addresses + address = resource.get("address") + if address: + result["addresses"] = address outputs.append(result) if not is_resource_found: From ce9960af5fab5a03917b68b7abe236b815cb0da6 Mon Sep 17 00:00:00 2001 From: Akash S Date: Thu, 12 Jun 2025 16:11:17 +0530 Subject: [PATCH 09/15] lint fix --- src/tirith/prettyprinter.py | 4 +-- .../providers/terraform_plan/handler.py | 29 +++++++------------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 901cb5ec..0425783d 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -99,11 +99,11 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: for result_num, result_dict in enumerate(check_dict["result"]): result_message = result_dict["message"] - + # Include addresses in the message if it exists in the result_dict if "addresses" in result_dict: result_message = f"{result_message} - (Addresses: `{result_dict['addresses']}`)" - + if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) elif check_dict["passed"] is None: diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 3ad40c57..4e97ba14 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -101,7 +101,7 @@ def provide(provider_inputs, input_data): "value": attribute_value, "meta": resource_change, "err": None, - "addresses": resource_change.get("address") + "addresses": resource_change.get("address"), } outputs.append(result) elif "." in attribute or "*" in attribute: @@ -111,20 +111,20 @@ def provide(provider_inputs, input_data): local_is_found_attribute = True for evaluated_output in evaluated_outputs: result = { - "value": evaluated_output, - "meta": resource_change, + "value": evaluated_output, + "meta": resource_change, "err": None, - "addresses": resource_change.get("address") + "addresses": resource_change.get("address"), } outputs.append(result) # If we didn't find the attribute in this resource, add a None value so it still gets evaluated if not local_is_found_attribute: result = { - "value": None, - "meta": resource_change, + "value": None, + "meta": resource_change, "err": None, - "addresses": resource_change.get("address") + "addresses": resource_change.get("address"), } outputs.append(result) else: @@ -170,7 +170,7 @@ def provide(provider_inputs, input_data): "value": action, "meta": resource_change, "err": None, - "addresses": resource_change.get("address") + "addresses": resource_change.get("address"), } outputs.append(result) if not is_resource_type_found: @@ -202,12 +202,7 @@ def provide(provider_inputs, input_data): addresses = resource_change["address"] count += 1 - result = { - "value": count, - "meta": resource_meta, - "err": None, - "addresses": addresses - } + result = {"value": count, "meta": resource_meta, "err": None, "addresses": addresses} outputs.append(result) return outputs # CASE 4 @@ -287,11 +282,7 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l ) return - result = { - "value": attribute_value, - "meta": provider_config_dict, - "addresses": terraform_provider_full_name - } + result = {"value": attribute_value, "meta": provider_config_dict, "addresses": terraform_provider_full_name} outputs.append(result) if not is_provider_full_name_found: From fbf9aa1dd38fd082037870bebbdbac043b31d74c Mon Sep 17 00:00:00 2001 From: Akash S Date: Thu, 12 Jun 2025 16:52:24 +0530 Subject: [PATCH 10/15] make addresses list --- src/tirith/core/core.py | 18 ++++++-- src/tirith/prettyprinter.py | 8 +++- src/tirith/providers/json/handler.py | 3 +- .../providers/terraform_plan/handler.py | 44 ++++++++++++------- 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index 4b258b01..c7327057 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -5,7 +5,7 @@ import yaml from types import CodeType -from typing import Any, Dict, List, Tuple, Optional +from typing import Any, Dict, List, Tuple, Optional, Union from tirith.providers.common import ProviderError from ..providers import PROVIDERS_DICT @@ -16,6 +16,18 @@ logger = logging.getLogger(__name__) +def _ensure_list_of_strings(value: Union[str, List[str], None]) -> List[str]: + """Convert a string or None value to a list of strings""" + if value is None: + return [] + elif isinstance(value, str): + return [value] + elif isinstance(value, list): + return [str(item) for item in value] + else: + return [str(value)] + + def get_evaluator_inputs_from_provider_inputs(provider_inputs, provider_module, input_data): # TODO: Get the inputs from given providers provider_func = PROVIDERS_DICT.get(provider_module) @@ -88,9 +100,9 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): if "meta" in evaluator_input: evaluation_result["meta"] = evaluator_input["meta"] - # Copy addresses directly if provided + # Copy addresses directly if provided, ensuring it's a list of strings if "addresses" in evaluator_input: - evaluation_result["addresses"] = evaluator_input["addresses"] + evaluation_result["addresses"] = _ensure_list_of_strings(evaluator_input["addresses"]) evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 0425783d..c1c7a93b 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -102,7 +102,13 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: # Include addresses in the message if it exists in the result_dict if "addresses" in result_dict: - result_message = f"{result_message} - (Addresses: `{result_dict['addresses']}`)" + addresses = result_dict['addresses'] + # Convert to string representation for display + if isinstance(addresses, list): + addresses_str = ", ".join(addresses) + else: + addresses_str = str(addresses) + result_message = f"{result_message} - (Addresses: `{addresses_str}`)" if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index 5531f80b..dc31ad39 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -47,7 +47,8 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: outputs = [] for value in values: result = create_result_dict(value=value, meta=None, err=None) - result["addresses"] = key_path + # Ensure addresses is a list of strings + result["addresses"] = [key_path] outputs.append(result) return outputs diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 4e97ba14..ab99b097 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -8,7 +8,7 @@ # input->(list ["a.b","c", "d"],value of resource) # returns->[any, any, any] -from typing import Iterable, Tuple +from typing import Iterable, Tuple, List, Union, Any import pydash from ..common import ProviderError @@ -56,6 +56,18 @@ def _get_exp_attribute(split_expressions, input_data): return final_data +def _ensure_list_of_strings(value: Union[str, List[str], None]) -> List[str]: + """Convert a string or None value to a list of strings""" + if value is None: + return [] + elif isinstance(value, str): + return [value] + elif isinstance(value, list): + return [str(item) for item in value] + else: + return [str(value)] + + def provide(provider_inputs, input_data): # """Provides the value of the attribute from the input_data""" outputs = [] @@ -101,7 +113,7 @@ def provide(provider_inputs, input_data): "value": attribute_value, "meta": resource_change, "err": None, - "addresses": resource_change.get("address"), + "addresses": _ensure_list_of_strings(resource_change.get("address")), } outputs.append(result) elif "." in attribute or "*" in attribute: @@ -114,7 +126,7 @@ def provide(provider_inputs, input_data): "value": evaluated_output, "meta": resource_change, "err": None, - "addresses": resource_change.get("address"), + "addresses": _ensure_list_of_strings(resource_change.get("address")), } outputs.append(result) @@ -124,7 +136,7 @@ def provide(provider_inputs, input_data): "value": None, "meta": resource_change, "err": None, - "addresses": resource_change.get("address"), + "addresses": _ensure_list_of_strings(resource_change.get("address")), } outputs.append(result) else: @@ -170,7 +182,7 @@ def provide(provider_inputs, input_data): "value": action, "meta": resource_change, "err": None, - "addresses": resource_change.get("address"), + "addresses": _ensure_list_of_strings(resource_change.get("address")), } outputs.append(result) if not is_resource_type_found: @@ -187,7 +199,7 @@ def provide(provider_inputs, input_data): elif input_type == "count": count = 0 resource_meta = {} - addresses = None + addresses = [] resource_type = provider_inputs["terraform_resource_type"] for resource_change in resource_changes: # Skip if resource type is in exclude_resource_types when using wildcard @@ -197,9 +209,9 @@ def provide(provider_inputs, input_data): # No need to check if the resource is not found # because the count of a resource can be zero resource_meta = resource_change - # Use the last encountered address (this is just for count, which is an aggregate result) + # Add the address to our list of addresses if available if "address" in resource_change: - addresses = resource_change["address"] + addresses.append(resource_change["address"]) count += 1 result = {"value": count, "meta": resource_meta, "err": None, "addresses": addresses} @@ -282,7 +294,7 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l ) return - result = {"value": attribute_value, "meta": provider_config_dict, "addresses": terraform_provider_full_name} + result = {"value": attribute_value, "meta": provider_config_dict, "addresses": _ensure_list_of_strings(terraform_provider_full_name)} outputs.append(result) if not is_provider_full_name_found: @@ -305,7 +317,7 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs: :param outputs: The outputs """ # For terraform_version, there's no specific address as it applies to the entire plan - outputs.append({"value": input_data.get("terraform_version"), "meta": input_data, "addresses": "terraform_version"}) + outputs.append({"value": input_data.get("terraform_version"), "meta": input_data, "addresses": ["terraform_version"]}) def direct_dependencies_operator(input_data: dict, provider_inputs: dict, outputs: list): @@ -332,7 +344,7 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output # Add addresses if available address = resource.get("address") if address: - result["addresses"] = address + result["addresses"] = _ensure_list_of_strings(address) outputs.append(result) if not is_resource_found: @@ -400,14 +412,14 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: reference_target_address.remove(reference_address) result = {"value": True, "meta": {"referenced_by": resource_config}} # Add addresses to the output - result["addresses"] = reference_address + result["addresses"] = _ensure_list_of_strings(reference_address) outputs.append(result) # For all of the reference_target_address that don't have a reference - for reference_target_address in reference_target_address: + for reference_target_address_item in reference_target_address: result = {"value": False, "meta": {"referenced_by": {}}} # Add addresses to the output - result["addresses"] = reference_target_address + result["addresses"] = _ensure_list_of_strings(reference_target_address_item) outputs.append(result) @@ -496,7 +508,7 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: is_all_resource_type_references_to = resource_type_count == reference_count result = {"value": is_all_resource_type_references_to, "meta": config_resources} # While we don't have a specific address for the entire result, we can include the resource type - result["addresses"] = resource_type + result["addresses"] = [resource_type] outputs.append(result) @@ -553,7 +565,7 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: # Add addresses if available address = resource.get("address") if address: - result["addresses"] = address + result["addresses"] = _ensure_list_of_strings(address) outputs.append(result) if not is_resource_found: From c40ff7b5ff648abdae0d11ed1c678d0b9d7aa8da Mon Sep 17 00:00:00 2001 From: Akash S Date: Thu, 12 Jun 2025 16:52:46 +0530 Subject: [PATCH 11/15] lint fix --- src/tirith/prettyprinter.py | 2 +- src/tirith/providers/terraform_plan/handler.py | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index c1c7a93b..89870be9 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -102,7 +102,7 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: # Include addresses in the message if it exists in the result_dict if "addresses" in result_dict: - addresses = result_dict['addresses'] + addresses = result_dict["addresses"] # Convert to string representation for display if isinstance(addresses, list): addresses_str = ", ".join(addresses) diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index ab99b097..e2aa6616 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -294,7 +294,11 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l ) return - result = {"value": attribute_value, "meta": provider_config_dict, "addresses": _ensure_list_of_strings(terraform_provider_full_name)} + result = { + "value": attribute_value, + "meta": provider_config_dict, + "addresses": _ensure_list_of_strings(terraform_provider_full_name), + } outputs.append(result) if not is_provider_full_name_found: @@ -317,7 +321,9 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs: :param outputs: The outputs """ # For terraform_version, there's no specific address as it applies to the entire plan - outputs.append({"value": input_data.get("terraform_version"), "meta": input_data, "addresses": ["terraform_version"]}) + outputs.append( + {"value": input_data.get("terraform_version"), "meta": input_data, "addresses": ["terraform_version"]} + ) def direct_dependencies_operator(input_data: dict, provider_inputs: dict, outputs: list): From 8786453361c1eb0fd9ec4c3d9e60f67fa0ed44b0 Mon Sep 17 00:00:00 2001 From: Akash S Date: Thu, 12 Jun 2025 17:02:16 +0530 Subject: [PATCH 12/15] fix --- src/tirith/core/core.py | 25 ++++++-------- src/tirith/prettyprinter.py | 10 +++--- src/tirith/providers/json/handler.py | 2 +- .../providers/terraform_plan/handler.py | 34 +++++++++---------- 4 files changed, 34 insertions(+), 37 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index c7327057..d64f7733 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -16,18 +16,6 @@ logger = logging.getLogger(__name__) -def _ensure_list_of_strings(value: Union[str, List[str], None]) -> List[str]: - """Convert a string or None value to a list of strings""" - if value is None: - return [] - elif isinstance(value, str): - return [value] - elif isinstance(value, list): - return [str(item) for item in value] - else: - return [str(value)] - - def get_evaluator_inputs_from_provider_inputs(provider_inputs, provider_module, input_data): # TODO: Get the inputs from given providers provider_func = PROVIDERS_DICT.get(provider_module) @@ -100,9 +88,18 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): if "meta" in evaluator_input: evaluation_result["meta"] = evaluator_input["meta"] - # Copy addresses directly if provided, ensuring it's a list of strings + # Copy addresses directly if provided if "addresses" in evaluator_input: - evaluation_result["addresses"] = _ensure_list_of_strings(evaluator_input["addresses"]) + # Ensure addresses is a list of strings + addresses = evaluator_input["addresses"] + if addresses is None: + evaluation_result["addresses"] = [] + elif isinstance(addresses, str): + evaluation_result["addresses"] = [addresses] + elif isinstance(addresses, list): + evaluation_result["addresses"] = [str(item) for item in addresses] + else: + evaluation_result["addresses"] = [str(addresses)] evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 89870be9..77b6651b 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -103,12 +103,12 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: # Include addresses in the message if it exists in the result_dict if "addresses" in result_dict: addresses = result_dict["addresses"] - # Convert to string representation for display - if isinstance(addresses, list): + # Format addresses as a comma-separated string + if isinstance(addresses, list) and addresses: addresses_str = ", ".join(addresses) - else: - addresses_str = str(addresses) - result_message = f"{result_message} - (Addresses: `{addresses_str}`)" + result_message = f"{result_message} - (Addresses: `{addresses_str}`)" + elif addresses: # Fallback for any non-empty non-list value + result_message = f"{result_message} - (Addresses: `{addresses}`)" if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index dc31ad39..fa739839 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -47,7 +47,7 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: outputs = [] for value in values: result = create_result_dict(value=value, meta=None, err=None) - # Ensure addresses is a list of strings + # Simply use a list directly result["addresses"] = [key_path] outputs.append(result) diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index e2aa6616..53443e4c 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -109,11 +109,12 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True attribute_value = input_resource_change_attrs[attribute] + address = resource_change.get("address") result = { "value": attribute_value, "meta": resource_change, "err": None, - "addresses": _ensure_list_of_strings(resource_change.get("address")), + "addresses": [address] if address is not None else [], } outputs.append(result) elif "." in attribute or "*" in attribute: @@ -122,21 +123,23 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True for evaluated_output in evaluated_outputs: + address = resource_change.get("address") result = { "value": evaluated_output, "meta": resource_change, "err": None, - "addresses": _ensure_list_of_strings(resource_change.get("address")), + "addresses": [address] if address is not None else [], } outputs.append(result) # If we didn't find the attribute in this resource, add a None value so it still gets evaluated if not local_is_found_attribute: + address = resource_change.get("address") result = { "value": None, "meta": resource_change, "err": None, - "addresses": _ensure_list_of_strings(resource_change.get("address")), + "addresses": [address] if address is not None else [], } outputs.append(result) else: @@ -178,11 +181,12 @@ def provide(provider_inputs, input_data): if resource_type in (resource_change["type"], "*"): is_resource_type_found = True for action in resource_change["change"]["actions"]: + address = resource_change.get("address") result = { "value": action, "meta": resource_change, "err": None, - "addresses": _ensure_list_of_strings(resource_change.get("address")), + "addresses": [address] if address is not None else [], } outputs.append(result) if not is_resource_type_found: @@ -294,11 +298,7 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l ) return - result = { - "value": attribute_value, - "meta": provider_config_dict, - "addresses": _ensure_list_of_strings(terraform_provider_full_name), - } + result = {"value": attribute_value, "meta": provider_config_dict, "addresses": [terraform_provider_full_name]} outputs.append(result) if not is_provider_full_name_found: @@ -350,7 +350,7 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output # Add addresses if available address = resource.get("address") if address: - result["addresses"] = _ensure_list_of_strings(address) + result["addresses"] = [address] outputs.append(result) if not is_resource_found: @@ -417,15 +417,15 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: if reference_address in reference_target_address: reference_target_address.remove(reference_address) result = {"value": True, "meta": {"referenced_by": resource_config}} - # Add addresses to the output - result["addresses"] = _ensure_list_of_strings(reference_address) + # Add addresses to the output as a simple list + result["addresses"] = [reference_address] outputs.append(result) # For all of the reference_target_address that don't have a reference for reference_target_address_item in reference_target_address: result = {"value": False, "meta": {"referenced_by": {}}} - # Add addresses to the output - result["addresses"] = _ensure_list_of_strings(reference_target_address_item) + # Add addresses as a simple list + result["addresses"] = [reference_target_address_item] outputs.append(result) @@ -513,7 +513,7 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: is_all_resource_type_references_to = resource_type_count == reference_count result = {"value": is_all_resource_type_references_to, "meta": config_resources} - # While we don't have a specific address for the entire result, we can include the resource type + # Simple list with resource type result["addresses"] = [resource_type] outputs.append(result) @@ -568,10 +568,10 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: resource_references.add(reference.split(".")[0]) result = {"value": list(resource_references), "meta": resource} - # Add addresses if available + # Add addresses if available as a simple list address = resource.get("address") if address: - result["addresses"] = _ensure_list_of_strings(address) + result["addresses"] = [address] outputs.append(result) if not is_resource_found: From 539bfddc8d143bedaeb05c174796cc40e833a0e7 Mon Sep 17 00:00:00 2001 From: Akash S Date: Fri, 13 Jun 2025 11:05:18 +0530 Subject: [PATCH 13/15] Refactor --- src/tirith/core/core.py | 14 +---- src/tirith/prettyprinter.py | 2 - .../providers/terraform_plan/handler.py | 55 +++++++++---------- 3 files changed, 28 insertions(+), 43 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index d64f7733..547a04c3 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -88,18 +88,10 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): if "meta" in evaluator_input: evaluation_result["meta"] = evaluator_input["meta"] - # Copy addresses directly if provided if "addresses" in evaluator_input: - # Ensure addresses is a list of strings - addresses = evaluator_input["addresses"] - if addresses is None: - evaluation_result["addresses"] = [] - elif isinstance(addresses, str): - evaluation_result["addresses"] = [addresses] - elif isinstance(addresses, list): - evaluation_result["addresses"] = [str(item) for item in addresses] - else: - evaluation_result["addresses"] = [str(addresses)] + # Add addresses directly + # TODO: We need to make a model class for the `evaluator_input` and move this validation there + evaluation_result["addresses"] = evaluator_input["addresses"] evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 77b6651b..c731c0ea 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -107,8 +107,6 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: if isinstance(addresses, list) and addresses: addresses_str = ", ".join(addresses) result_message = f"{result_message} - (Addresses: `{addresses_str}`)" - elif addresses: # Fallback for any non-empty non-list value - result_message = f"{result_message} - (Addresses: `{addresses}`)" if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 53443e4c..00ede954 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -56,18 +56,6 @@ def _get_exp_attribute(split_expressions, input_data): return final_data -def _ensure_list_of_strings(value: Union[str, List[str], None]) -> List[str]: - """Convert a string or None value to a list of strings""" - if value is None: - return [] - elif isinstance(value, str): - return [value] - elif isinstance(value, list): - return [str(item) for item in value] - else: - return [str(value)] - - def provide(provider_inputs, input_data): # """Provides the value of the attribute from the input_data""" outputs = [] @@ -101,6 +89,10 @@ def provide(provider_inputs, input_data): if resource_type in (resource_change["type"], "*"): is_resource_found = True input_resource_change_attrs = resource_change["change"]["after"] + # Extract address once for reuse + address = resource_change.get("address") + addresses = [address] if address is not None else [] + # [local_is_found_attribute] (local scope) # Used to decide whether to append a None value for each specific resource that's missing the attribute if input_resource_change_attrs: @@ -109,12 +101,11 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True attribute_value = input_resource_change_attrs[attribute] - address = resource_change.get("address") result = { "value": attribute_value, "meta": resource_change, "err": None, - "addresses": [address] if address is not None else [], + "addresses": addresses, } outputs.append(result) elif "." in attribute or "*" in attribute: @@ -123,23 +114,21 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True for evaluated_output in evaluated_outputs: - address = resource_change.get("address") result = { "value": evaluated_output, "meta": resource_change, "err": None, - "addresses": [address] if address is not None else [], + "addresses": addresses, } outputs.append(result) # If we didn't find the attribute in this resource, add a None value so it still gets evaluated if not local_is_found_attribute: - address = resource_change.get("address") result = { "value": None, "meta": resource_change, "err": None, - "addresses": [address] if address is not None else [], + "addresses": addresses, } outputs.append(result) else: @@ -180,13 +169,15 @@ def provide(provider_inputs, input_data): continue if resource_type in (resource_change["type"], "*"): is_resource_type_found = True + # Extract address once for reuse + address = resource_change.get("address") + addresses = [address] if address is not None else [] for action in resource_change["change"]["actions"]: - address = resource_change.get("address") result = { "value": action, "meta": resource_change, "err": None, - "addresses": [address] if address is not None else [], + "addresses": addresses, } outputs.append(result) if not is_resource_type_found: @@ -377,7 +368,7 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: resource_changes = input_data.get("resource_changes", []) referenced_by = provider_inputs.get("referenced_by") - reference_target_address = set() + reference_target_addresses = set() is_resource_found = False # Loop for adding reference_target @@ -386,7 +377,7 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: "destroy" ]: continue - reference_target_address.add(resource_change.get("address")) + reference_target_addresses.add(resource_change.get("address")) is_resource_found = True if not is_resource_found: @@ -414,18 +405,21 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: reference_address = relative_reference_address else: reference_address = f"{module_path}.{relative_reference_address}" - if reference_address in reference_target_address: - reference_target_address.remove(reference_address) - result = {"value": True, "meta": {"referenced_by": resource_config}} - # Add addresses to the output as a simple list - result["addresses"] = [reference_address] + if reference_address in reference_target_addresses: + reference_target_addresses.remove(reference_address) + result = { + "value": True, + "meta": {"referenced_by": resource_config}, + "addresses": [reference_address], + } + outputs.append(result) - # For all of the reference_target_address that don't have a reference - for reference_target_address_item in reference_target_address: + # For all of the reference_target_addresses that don't have a reference + for reference_target_addresses_item in reference_target_addresses: result = {"value": False, "meta": {"referenced_by": {}}} # Add addresses as a simple list - result["addresses"] = [reference_target_address_item] + result["addresses"] = [reference_target_addresses_item] outputs.append(result) @@ -514,6 +508,7 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: is_all_resource_type_references_to = resource_type_count == reference_count result = {"value": is_all_resource_type_references_to, "meta": config_resources} # Simple list with resource type + # TODO: Use the real specific addresses result["addresses"] = [resource_type] outputs.append(result) From ac4bd7e548d5d6e71ca7083283e0903019377541 Mon Sep 17 00:00:00 2001 From: Akash S Date: Mon, 16 Jun 2025 12:31:22 +0530 Subject: [PATCH 14/15] Refactor --- src/tirith/core/core.py | 5 ++++- src/tirith/providers/terraform_plan/handler.py | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index 547a04c3..af94e9a4 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -90,8 +90,11 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): if "addresses" in evaluator_input: # Add addresses directly + addresses = evaluator_input["addresses"] + if not isinstance(addresses, list): + raise Exception("`addresses` should be a list") # TODO: We need to make a model class for the `evaluator_input` and move this validation there - evaluation_result["addresses"] = evaluator_input["addresses"] + evaluation_result["addresses"] = addresses evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 00ede954..22bdd68d 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -563,10 +563,10 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: resource_references.add(reference.split(".")[0]) result = {"value": list(resource_references), "meta": resource} - # Add addresses if available as a simple list - address = resource.get("address") - if address: - result["addresses"] = [address] + # Add references as addresses instead of just the resource address + addresses = expressions_val.get("references", []) + if addresses: + result["addresses"] = addresses outputs.append(result) if not is_resource_found: From e63e4624940ca8745848d7087cfbda676a0d2769 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Fri, 26 Sep 2025 18:07:29 +0700 Subject: [PATCH 15/15] Add address extraction helper and refactor address handling in providers --- src/tirith/core/core.py | 2 +- src/tirith/providers/json/handler.py | 2 -- .../providers/terraform_plan/handler.py | 28 +++++++++++++------ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index af94e9a4..6ec2a727 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -91,9 +91,9 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): if "addresses" in evaluator_input: # Add addresses directly addresses = evaluator_input["addresses"] + # TODO: We need to make a model class for the `evaluator_input` and move this validation there if not isinstance(addresses, list): raise Exception("`addresses` should be a list") - # TODO: We need to make a model class for the `evaluator_input` and move this validation there evaluation_result["addresses"] = addresses evaluation_results.append(evaluation_result) diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index fa739839..a2b1b6b8 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -43,11 +43,9 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: ) ] - # Create result dict with addresses as a separate property, not in meta outputs = [] for value in values: result = create_result_dict(value=value, meta=None, err=None) - # Simply use a list directly result["addresses"] = [key_path] outputs.append(result) diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 22bdd68d..9a41132b 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -56,6 +56,20 @@ def _get_exp_attribute(split_expressions, input_data): return final_data +def _get_addresses_from_resource_change(resource_change: dict) -> List[str]: + """ + Helper function to extract addresses from a resource change. + + :param resource_change: A dictionary containing resource change information from Terraform plan + :type resource_change: dict + :return: A list containing the resource address if found, otherwise an empty list + :rtype: List[str] + + """ + address = resource_change.get("address") + return [address] if address is not None else [] + + def provide(provider_inputs, input_data): # """Provides the value of the attribute from the input_data""" outputs = [] @@ -89,9 +103,7 @@ def provide(provider_inputs, input_data): if resource_type in (resource_change["type"], "*"): is_resource_found = True input_resource_change_attrs = resource_change["change"]["after"] - # Extract address once for reuse - address = resource_change.get("address") - addresses = [address] if address is not None else [] + addresses = _get_addresses_from_resource_change(resource_change) # [local_is_found_attribute] (local scope) # Used to decide whether to append a None value for each specific resource that's missing the attribute @@ -169,9 +181,7 @@ def provide(provider_inputs, input_data): continue if resource_type in (resource_change["type"], "*"): is_resource_type_found = True - # Extract address once for reuse - address = resource_change.get("address") - addresses = [address] if address is not None else [] + addresses = _get_addresses_from_resource_change(resource_change) for action in resource_change["change"]["actions"]: result = { "value": action, @@ -545,7 +555,7 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: return is_resource_found = False - + addresses = [] for resource in config_resources: if resource.get("type") != resource_type: @@ -561,10 +571,10 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: for reference in expressions_val.get("references", []): # Only get the resource type resource_references.add(reference.split(".")[0]) + addresses.append(reference) result = {"value": list(resource_references), "meta": resource} - # Add references as addresses instead of just the resource address - addresses = expressions_val.get("references", []) + if addresses: result["addresses"] = addresses outputs.append(result)