diff --git a/nipyapi/canvas.py b/nipyapi/canvas.py index 691f8237..f34fa925 100644 --- a/nipyapi/canvas.py +++ b/nipyapi/canvas.py @@ -35,6 +35,8 @@ "schedule_port", "get_funnel", "update_processor", + "prepare_processor_config", + "prepare_controller_config", "purge_connection", "list_flowfiles", "get_flowfile_details", @@ -1035,6 +1037,190 @@ def update_process_group(pg, update, refresh=True, greedy=True, identifier_type= ) +def _prepare_component_config(descriptors, component_type, dto_class, properties, allow_dynamic): + """ + Internal helper for prepare_processor_config and prepare_controller_config. + + Consolidates the common validation and DTO preparation logic. + + Args: + descriptors (dict): Dict of property descriptors from the component + component_type (str): String type name for error messages + dto_class (type): The DTO class to instantiate (ProcessorConfigDTO or + ControllerServiceDTO) + properties (dict or None): Dict of property key -> value, or None for + discovery mode + allow_dynamic (bool): Whether to skip validation + + Returns: + list[str]: If properties is None, returns list of valid static property keys. + dto_class: If properties provided, returns instance with properties set. + """ + valid_keys = [key for key, desc in descriptors.items() if not desc.dynamic] + + if properties is None: + return valid_keys + + if not allow_dynamic: + invalid_keys = set(properties.keys()) - set(valid_keys) + if invalid_keys: + raise ValueError( + f"Property keys not in static descriptors: {sorted(invalid_keys)}\n" + f"Valid static keys for {component_type}:\n" + f" {sorted(valid_keys)}\n" + f"Use allow_dynamic=True to create dynamic properties intentionally." + ) + + return dto_class(properties=properties) + + +def prepare_processor_config(processor, properties=None, allow_dynamic=False): + """ + Discover valid property keys or prepare a validated ProcessorConfigDTO. + + Two modes based on whether properties is provided: + - Discovery: Call with just processor to get list of valid property keys + - Prepare: Call with properties dict to get validated ProcessorConfigDTO + + This helps prevent silent creation of dynamic properties from typos or + guessed property names. NiFi accepts any property name and silently creates + dynamic properties for unknown keys, which can cause hard-to-debug issues. + + Property Value Semantics: + - ``value`` (string): Set property to that value + - ``''`` (empty string): Set property to empty string + - ``None``: For static properties, clears/unsets the value. + For dynamic properties, **deletes the property entirely**. + + Args: + processor (ProcessorEntity or str): The processor to inspect, as a + ProcessorEntity object, processor ID, or processor name. + properties (dict, optional): Property key -> value mapping. If None, + returns list of valid property keys (discovery mode). + allow_dynamic (bool): If True, skip validation. Use for processors + like UpdateAttribute that expect dynamic properties. Default False. + + Returns: + list[str]: If properties is None, returns list of valid property keys. + ProcessorConfigDTO: If properties provided, returns validated DTO + ready to pass to update_processor(). + + Raises: + ValueError: If any property key not found in processor's descriptors + (unless allow_dynamic=True). Error includes list of valid keys. + + Example - Discovery:: + + >>> keys = nipyapi.canvas.prepare_processor_config(proc) + ['SQL Query', 'Database Connection Pooling Service', 'Max Wait Time', ...] + + Example - Prepare:: + + >>> config = nipyapi.canvas.prepare_processor_config(proc, { + ... 'SQL Query': 'SELECT * FROM users', + ... 'Database Connection Pooling Service': controller.id + ... }) + >>> nipyapi.canvas.update_processor(proc, update=config, auto_stop=True) + + Example - Dynamic properties (UpdateAttribute):: + + >>> config = nipyapi.canvas.prepare_processor_config(proc, { + ... 'my.custom.attr': '${filename}' + ... }, allow_dynamic=True) + + Example - Clear static property:: + + >>> config = nipyapi.canvas.prepare_processor_config(proc, { + ... 'Custom Text': None # Clears to unset state + ... }) + + Example - Delete dynamic property:: + + >>> config = nipyapi.canvas.prepare_processor_config(proc, { + ... 'my.custom.attr': None # Removes the dynamic property entirely + ... }, allow_dynamic=True) + """ + processor = nipyapi.utils.resolve_entity( + processor, + get_processor, + nipyapi.nifi.ProcessorEntity, + strict=True, + ) + return _prepare_component_config( + processor.component.config.descriptors, + processor.component.type, + nipyapi.nifi.ProcessorConfigDTO, + properties, + allow_dynamic, + ) + + +def prepare_controller_config(controller, properties=None, allow_dynamic=False): + """ + Discover valid property keys or prepare a validated ControllerServiceDTO. + + Two modes based on whether properties is provided: + - Discovery: Call with just controller to get list of valid property keys + - Prepare: Call with properties dict to get validated ControllerServiceDTO + + This helps prevent silent creation of dynamic properties from typos or + guessed property names. + + Property Value Semantics: + - ``value`` (string): Set property to that value + - ``''`` (empty string): Set property to empty string + - ``None``: For static properties, clears/unsets the value. + For dynamic properties, **deletes the property entirely**. + + Args: + controller (ControllerServiceEntity or str): The controller to inspect, + as a ControllerServiceEntity object, controller ID, or controller name. + properties (dict, optional): Property key -> value mapping. If None, + returns list of valid property keys (discovery mode). + allow_dynamic (bool): If True, skip validation. Default False. + + Returns: + list[str]: If properties is None, returns list of valid property keys. + ControllerServiceDTO: If properties provided, returns validated DTO + ready to pass to update_controller(). + + Raises: + ValueError: If any property key not found in controller's descriptors + (unless allow_dynamic=True). Error includes list of valid keys. + + Example - Discovery:: + + >>> keys = nipyapi.canvas.prepare_controller_config(cs) + ['Schema Access Strategy', 'Schema Registry', 'Schema Name', ...] + + Example - Prepare:: + + >>> config = nipyapi.canvas.prepare_controller_config(cs, { + ... 'Schema Access Strategy': 'schema-name' + ... }) + >>> nipyapi.canvas.update_controller(cs, update=config) + + Example - Clear property:: + + >>> config = nipyapi.canvas.prepare_controller_config(cs, { + ... 'Schema Name': None # Clears to unset state + ... }) + """ + controller = nipyapi.utils.resolve_entity( + controller, + get_controller, + nipyapi.nifi.ControllerServiceEntity, + strict=True, + ) + return _prepare_component_config( + controller.component.descriptors, + controller.component.type, + nipyapi.nifi.ControllerServiceDTO, + properties, + allow_dynamic, + ) + + def update_processor(processor, update=None, name=None, refresh=True, auto_stop=False): """ Updates a Processor's configuration and/or name. @@ -2052,10 +2238,17 @@ def _del_cont(cont_id): return result -def update_controller(controller, update, refresh=True, greedy=True, identifier_type="auto"): +def update_controller( # pylint: disable=too-many-arguments,too-many-positional-arguments + controller, update, refresh=True, greedy=True, identifier_type="auto", auto_disable=False +): """ Updates the Configuration of a Controller Service + Note: + This function does not handle referencing components. If the controller + has processors or other controllers that reference it, you must stop/disable + those separately before updating. + Args: controller (ControllerServiceEntity or str): Target Controller to update, as a ControllerServiceEntity object, controller ID, or controller name. @@ -2066,13 +2259,16 @@ def update_controller(controller, update, refresh=True, greedy=True, identifier_ greedy (bool): For name lookup, True for partial match, False for exact. identifier_type (str): How to interpret string identifier: "auto" (default) detects UUID vs name, "id" or "name" to force. + auto_disable (bool): If True, automatically disable the controller before + updating and re-enable afterward if it was enabled. Default False. Returns: (ControllerServiceEntity) Raises: TypeError: If controller is not a string or ControllerServiceEntity. - ValueError: If controller not found or multiple matches found. + ValueError: If controller not found, multiple matches found, or + controller is enabled and auto_disable=False. """ controller = nipyapi.utils.resolve_entity( @@ -2088,14 +2284,44 @@ def update_controller(controller, update, refresh=True, greedy=True, identifier_ if refresh: controller = get_controller(controller.id, "id") + was_enabled = controller.component.state == "ENABLED" + + if was_enabled and not auto_disable: + raise ValueError( + f"Controller '{controller.component.name}' is enabled. " + "Disable it first or set auto_disable=True. " + "Note: auto_disable does not handle referencing components." + ) + + if was_enabled: + schedule_controller(controller, scheduled=False, refresh=True) + controller = get_controller(controller.id, "id") + # Insert the ID into the update update.id = controller.id - return nipyapi.nifi.ControllerServicesApi().update_controller_service( - id=controller.id, - body=nipyapi.nifi.ControllerServiceEntity( - component=update, revision=controller.revision, id=controller.id - ), - ) + try: + result = nipyapi.nifi.ControllerServicesApi().update_controller_service( + id=controller.id, + body=nipyapi.nifi.ControllerServiceEntity( + component=update, revision=controller.revision, id=controller.id + ), + ) + except Exception: + # Attempt to restore enabled state if update fails + if was_enabled: + try: + schedule_controller( + get_controller(controller.id, "id"), scheduled=True, refresh=True + ) + except Exception: # pylint: disable=broad-except + pass # Best effort - original exception takes priority + raise + + if was_enabled: + schedule_controller(result, scheduled=True, refresh=True) + result = get_controller(result.id, "id") + + return result def schedule_controller(controller, scheduled, refresh=False, greedy=True, identifier_type="auto"): diff --git a/nipyapi/cli.py b/nipyapi/cli.py index 93acbf9f..2f45bffa 100644 --- a/nipyapi/cli.py +++ b/nipyapi/cli.py @@ -64,6 +64,8 @@ import os import sys +import urllib3 + def _detect_output_format(): """ @@ -425,10 +427,13 @@ def _apply_verbosity(verbosity): def main(): """CLI entry point.""" + # Disable pager for help output so agents don't hang waiting for input + # Only set when help is requested or in non-interactive/CI environments + if "--help" in sys.argv or "-h" in sys.argv or not sys.stdout.isatty() or os.environ.get("CI"): + os.environ.setdefault("PAGER", "cat") + # Suppress SSL warnings early to prevent them polluting stdout in CI # This is safe as the warnings are informational and CLI users expect clean output - import urllib3 - if os.environ.get("NIFI_VERIFY_SSL", "true").lower() in ("false", "0", "no"): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) diff --git a/nipyapi/parameters.py b/nipyapi/parameters.py index bd33c044..225412b4 100644 --- a/nipyapi/parameters.py +++ b/nipyapi/parameters.py @@ -781,23 +781,31 @@ def delete_asset(context_id, asset_id): } -def prepare_parameter_with_asset(name, asset_id, asset_name, description=None): +def prepare_parameter_with_asset( + name, asset_id=None, asset_name=None, description=None, assets=None +): """ - Prepare a parameter that references an asset. + Prepare a parameter that references one or more assets. - Use this to update a parameter to point to an uploaded asset. + Use this to update a parameter to point to uploaded asset(s). + Provide either (asset_id + asset_name) for a single asset, or assets + for multiple assets. These are mutually exclusive. Args: - name (str): Parameter name - asset_id (str): ID of the asset to reference - asset_name (str): Name of the asset - description (str): Optional parameter description + name (str): Parameter name (required) + asset_id (str, optional): ID of a single asset. Must be provided + together with asset_name. Cannot be used with assets. + asset_name (str, optional): Name of a single asset. Must be provided + together with asset_id. Cannot be used with assets. + description (str, optional): Parameter description + assets (list, optional): List of asset dicts, each with 'id' and + 'name' keys. Cannot be used with asset_id/asset_name. Returns: :class:`~nipyapi.nifi.models.ParameterEntity`: ParameterEntity ready for use with update_parameter_context or upsert_parameter_to_context - Example:: + Example (single asset):: >>> # Upload asset first >>> asset = upload_asset(context_id, file_path="/path/to/driver.jar") @@ -807,13 +815,51 @@ def prepare_parameter_with_asset(name, asset_id, asset_name, description=None): ... asset_id=asset['id'], ... asset_name=asset['name'] ... ) - >>> # Update the parameter context + >>> upsert_parameter_to_context(context, param) + + Example (multiple assets):: + + >>> # Upload multiple JARs + >>> jar1 = upload_asset(context_id, file_path="/path/to/activemq-client.jar") + >>> jar2 = upload_asset(context_id, file_path="/path/to/activemq-broker.jar") + >>> # Reference all in one parameter + >>> param = prepare_parameter_with_asset( + ... name="JMS Client Libraries", + ... assets=[jar1, jar2] # Each dict has 'id' and 'name' + ... ) >>> upsert_parameter_to_context(context, param) """ enforce_min_ver("1.10.0") - asset_ref = AssetReferenceDTO(id=asset_id, name=asset_name) + # Validate argument combinations + has_single = asset_id is not None or asset_name is not None + has_multiple = assets is not None + + if has_single and has_multiple: + raise ValueError( + "Cannot specify both asset_id/asset_name and assets. " + "Use asset_id/asset_name for single asset, or assets for multiple." + ) + + if not has_single and not has_multiple: + raise ValueError("Must specify either asset_id and asset_name, or assets list.") + + if has_single: + if asset_id is None or asset_name is None: + raise ValueError("Both asset_id and asset_name are required for single asset.") + asset_refs = [AssetReferenceDTO(id=asset_id, name=asset_name)] + else: + # Validate assets list + if not isinstance(assets, list) or len(assets) == 0: + raise ValueError("assets must be a non-empty list of dicts with 'id' and 'name' keys.") + asset_refs = [] + for i, asset in enumerate(assets): + if not isinstance(asset, dict) or "id" not in asset or "name" not in asset: + raise ValueError( + f"Asset at index {i} must be a dict with 'id' and 'name' keys. " f"Got: {asset}" + ) + asset_refs.append(AssetReferenceDTO(id=asset["id"], name=asset["name"])) return ParameterEntity( - parameter=ParameterDTO(name=name, description=description, referenced_assets=[asset_ref]) + parameter=ParameterDTO(name=name, description=description, referenced_assets=asset_refs) ) diff --git a/tests/test_canvas.py b/tests/test_canvas.py index 523b688d..33e8df12 100644 --- a/tests/test_canvas.py +++ b/tests/test_canvas.py @@ -2058,6 +2058,267 @@ def test_list_flowfiles_with_data(fix_proc): canvas.delete_connection(conn) +def test_prepare_processor_config_discovery(fix_proc): + """Test prepare_processor_config discovery mode returns list of valid keys.""" + proc = fix_proc.generate() + keys = canvas.prepare_processor_config(proc) + assert isinstance(keys, list) + assert len(keys) > 0 + assert 'File Size' in keys + assert 'Batch Size' in keys + + +def test_prepare_processor_config_valid_properties(fix_proc): + """Test prepare_processor_config with valid properties returns DTO.""" + proc = fix_proc.generate() + config = canvas.prepare_processor_config(proc, {'File Size': '10 B'}) + assert isinstance(config, nifi.ProcessorConfigDTO) + assert config.properties == {'File Size': '10 B'} + + +def test_prepare_processor_config_invalid_properties(fix_proc): + """Test prepare_processor_config raises on invalid property keys.""" + proc = fix_proc.generate() + with pytest.raises(ValueError) as exc_info: + canvas.prepare_processor_config(proc, {'Invalid Key': 'value'}) + assert 'Property keys not in static descriptors' in str(exc_info.value) + assert 'Invalid Key' in str(exc_info.value) + assert 'Valid static keys for' in str(exc_info.value) + + +def test_prepare_processor_config_allow_dynamic(fix_proc): + """Test prepare_processor_config allow_dynamic bypasses validation.""" + proc = fix_proc.generate() + config = canvas.prepare_processor_config( + proc, {'my.dynamic.prop': 'value'}, allow_dynamic=True + ) + assert isinstance(config, nifi.ProcessorConfigDTO) + assert config.properties == {'my.dynamic.prop': 'value'} + + +def test_prepare_controller_config_discovery(fix_cont): + """Test prepare_controller_config discovery mode returns list of valid keys.""" + controller = fix_cont() + keys = canvas.prepare_controller_config(controller) + assert isinstance(keys, list) + assert len(keys) > 0 + + +def test_prepare_controller_config_valid_properties(fix_cont): + """Test prepare_controller_config with valid properties returns DTO.""" + controller = fix_cont() + keys = canvas.prepare_controller_config(controller) + first_key = keys[0] + config = canvas.prepare_controller_config(controller, {first_key: 'test'}) + assert isinstance(config, nifi.ControllerServiceDTO) + assert config.properties == {first_key: 'test'} + + +def test_prepare_controller_config_invalid_properties(fix_cont): + """Test prepare_controller_config raises on invalid property keys.""" + controller = fix_cont() + with pytest.raises(ValueError) as exc_info: + canvas.prepare_controller_config(controller, {'Invalid Key': 'value'}) + assert 'Property keys not in static descriptors' in str(exc_info.value) + assert 'Invalid Key' in str(exc_info.value) + + +def test_prepare_controller_config_allow_dynamic(fix_cont): + """Test prepare_controller_config allow_dynamic bypasses validation.""" + controller = fix_cont() + config = canvas.prepare_controller_config( + controller, {'my.dynamic.prop': 'value'}, allow_dynamic=True + ) + assert isinstance(config, nifi.ControllerServiceDTO) + assert config.properties == {'my.dynamic.prop': 'value'} + + +def test_update_controller_auto_disable(fix_cont): + """Test update_controller auto_disable stops and restarts enabled controller.""" + controller = fix_cont() + # Enable the controller first + canvas.schedule_controller(controller, True) + controller = canvas.get_controller(controller.id, 'id') + assert controller.component.state == 'ENABLED' + + # Update with auto_disable should work + keys = canvas.prepare_controller_config(controller) + assert keys, "Expected controller to have configurable properties" + config = canvas.prepare_controller_config(controller, {keys[0]: 'test'}) + result = canvas.update_controller(controller, update=config, auto_disable=True) + assert result.component.state == 'ENABLED' + + +def test_update_controller_raises_when_enabled(fix_cont): + """Test update_controller raises ValueError when enabled and auto_disable=False.""" + controller = fix_cont() + canvas.schedule_controller(controller, True) + controller = canvas.get_controller(controller.id, 'id') + + with pytest.raises(ValueError) as exc_info: + canvas.update_controller( + controller, + update=nifi.ControllerServiceDTO(comments='test') + ) + assert 'is enabled' in str(exc_info.value) + assert 'auto_disable' in str(exc_info.value) + + +def test_update_controller_auto_disable_restores_on_error(fix_cont): + """Test auto_disable re-enables controller if update fails.""" + from unittest.mock import patch + + controller = fix_cont() + canvas.schedule_controller(controller, True) + controller = canvas.get_controller(controller.id, 'id') + assert controller.component.state == 'ENABLED' + + # Mock the API call to raise an exception after the controller is disabled + with patch.object( + nifi.ControllerServicesApi, + 'update_controller_service', + side_effect=RuntimeError('Simulated API failure') + ): + with pytest.raises(RuntimeError, match='Simulated API failure'): + canvas.update_controller( + controller, + update=nifi.ControllerServiceDTO(comments='should fail'), + auto_disable=True + ) + + # Controller should be re-enabled after error recovery + controller = canvas.get_controller(controller.id, 'id') + assert controller.component.state == 'ENABLED' + + +def test_update_controller_auto_disable_recovery_also_fails(fix_cont): + """Test that original error is raised even if recovery fails.""" + from unittest.mock import patch + + controller = fix_cont() + canvas.schedule_controller(controller, True) + controller = canvas.get_controller(controller.id, 'id') + assert controller.component.state == 'ENABLED' + + original_schedule = canvas.schedule_controller + call_count = [0] # Use list to allow mutation in closure + + def schedule_side_effect(*args, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + # First call (disable) - use real implementation + return original_schedule(*args, **kwargs) + else: + # Second call (recovery re-enable) - fail + raise RuntimeError('Simulated recovery failure') + + # Mock update to fail, and schedule_controller to fail only on recovery + with patch.object( + nifi.ControllerServicesApi, + 'update_controller_service', + side_effect=RuntimeError('Simulated update failure') + ): + with patch.object( + canvas, + 'schedule_controller', + side_effect=schedule_side_effect + ): + # The original exception should be raised, not the recovery exception + with pytest.raises(RuntimeError, match='Simulated update failure'): + canvas.update_controller( + controller, + update=nifi.ControllerServiceDTO(comments='should fail'), + auto_disable=True + ) + + # Verify the recovery was attempted (2 calls to schedule_controller) + assert call_count[0] == 2 + + +def test_property_none_clears_static_property(fix_proc): + """Test that setting a static property to None clears/unsets it.""" + proc = fix_proc.generate() + # Set Custom Text to a value + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'Custom Text': 'hello'}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert proc.component.config.properties['Custom Text'] == 'hello' + + # Clear it with None + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'Custom Text': None}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert proc.component.config.properties['Custom Text'] is None + + +def test_property_empty_string_sets_empty(fix_proc): + """Test that setting a property to empty string keeps it as empty string.""" + proc = fix_proc.generate() + # Set Custom Text to empty string + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'Custom Text': ''}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert proc.component.config.properties['Custom Text'] == '' + + +def test_dynamic_property_none_deletes(fix_proc): + """Test that setting a dynamic property to None deletes it entirely.""" + # Create UpdateAttribute which supports dynamic properties + root_id = canvas.get_root_pg_id() + proc_type = canvas.get_processor_type('UpdateAttribute') + proc = canvas.create_processor(root_id, proc_type, (300, 300), conftest.test_basename + '_dyn') + + try: + # Add a dynamic property + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'my.test.attr': 'test_value'}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert 'my.test.attr' in proc.component.config.properties + assert proc.component.config.properties['my.test.attr'] == 'test_value' + assert 'my.test.attr' in proc.component.config.descriptors + assert proc.component.config.descriptors['my.test.attr'].dynamic is True + + # Delete it with None + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'my.test.attr': None}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert 'my.test.attr' not in proc.component.config.properties + assert 'my.test.attr' not in proc.component.config.descriptors + finally: + canvas.delete_processor(proc, force=True) + + +def test_dynamic_property_empty_string_keeps(fix_proc): + """Test that setting a dynamic property to empty string keeps it.""" + root_id = canvas.get_root_pg_id() + proc_type = canvas.get_processor_type('UpdateAttribute') + proc = canvas.create_processor(root_id, proc_type, (350, 350), conftest.test_basename + '_dyn2') + + try: + # Add a dynamic property + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'my.test.attr': 'test_value'}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert 'my.test.attr' in proc.component.config.properties + + # Set to empty string - should keep property + canvas.update_processor( + proc, update=nifi.ProcessorConfigDTO(properties={'my.test.attr': ''}) + ) + proc = canvas.get_processor(proc.id, 'id') + assert 'my.test.attr' in proc.component.config.properties + assert proc.component.config.properties['my.test.attr'] == '' + assert 'my.test.attr' in proc.component.config.descriptors + finally: + canvas.delete_processor(proc, force=True) + + def test_get_flowfile_details(fix_proc): """Test getting full FlowFile details including attributes.""" f_p1 = fix_proc.generate() diff --git a/tests/test_parameters.py b/tests/test_parameters.py index 4f9995df..3b7b8c4d 100644 --- a/tests/test_parameters.py +++ b/tests/test_parameters.py @@ -348,6 +348,84 @@ def test_prepare_parameter_with_asset(fix_context): parameters.delete_asset(context_id=c1.id, asset_id=asset["id"]) +def test_prepare_parameter_with_multiple_assets(fix_context): + """Test preparing a parameter that references multiple assets.""" + if check_version('1.10.0') > 0: + pytest.skip("NiFi not 1.10+") + + c1 = fix_context.generate() + + # Upload multiple assets + asset1 = parameters.upload_asset( + context_id=c1.id, + file_bytes=b"ActiveMQ client JAR", + filename="activemq-client-5.18.0.jar" + ) + asset2 = parameters.upload_asset( + context_id=c1.id, + file_bytes=b"ActiveMQ broker JAR", + filename="activemq-broker-5.18.0.jar" + ) + + # Prepare a parameter referencing multiple assets + param = parameters.prepare_parameter_with_asset( + name="JMS Client Libraries", + assets=[asset1, asset2], + description="ActiveMQ client JARs" + ) + + # Verify the parameter structure + assert param.parameter.name == "JMS Client Libraries" + assert param.parameter.description == "ActiveMQ client JARs" + assert len(param.parameter.referenced_assets) == 2 + asset_ids = {a.id for a in param.parameter.referenced_assets} + assert asset1["id"] in asset_ids + assert asset2["id"] in asset_ids + + # Clean up + parameters.delete_asset(context_id=c1.id, asset_id=asset1["id"]) + parameters.delete_asset(context_id=c1.id, asset_id=asset2["id"]) + + +def test_prepare_parameter_with_asset_validation(): + """Test validation errors for prepare_parameter_with_asset.""" + if check_version('1.10.0') > 0: + pytest.skip("NiFi not 1.10+") + + # Cannot specify both single and multiple assets + with pytest.raises(ValueError) as exc_info: + parameters.prepare_parameter_with_asset( + name="Test", + asset_id="123", + asset_name="test.jar", + assets=[{"id": "456", "name": "other.jar"}] + ) + assert "Cannot specify both" in str(exc_info.value) + + # Must specify at least one + with pytest.raises(ValueError) as exc_info: + parameters.prepare_parameter_with_asset(name="Test") + assert "Must specify either" in str(exc_info.value) + + # Single asset requires both id and name + with pytest.raises(ValueError) as exc_info: + parameters.prepare_parameter_with_asset(name="Test", asset_id="123") + assert "Both asset_id and asset_name are required" in str(exc_info.value) + + # Assets list must have valid dicts + with pytest.raises(ValueError) as exc_info: + parameters.prepare_parameter_with_asset( + name="Test", + assets=[{"id": "123"}] # Missing 'name' + ) + assert "must be a dict with 'id' and 'name' keys" in str(exc_info.value) + + # Assets cannot be empty list + with pytest.raises(ValueError) as exc_info: + parameters.prepare_parameter_with_asset(name="Test", assets=[]) + assert "non-empty list" in str(exc_info.value) + + def test_upload_asset_and_link_to_parameter(fix_context): """Test full workflow: upload asset and link it to a parameter.""" if check_version('1.10.0') > 0: