Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 234 additions & 8 deletions nipyapi/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
"schedule_port",
"get_funnel",
"update_processor",
"prepare_processor_config",
"prepare_controller_config",
"purge_connection",
"list_flowfiles",
"get_flowfile_details",
Expand Down Expand Up @@ -1035,6 +1037,190 @@ def update_process_group(pg, update, refresh=True, greedy=True, identifier_type=
)


def _prepare_component_config(descriptors, component_type, dto_class, properties, allow_dynamic):
"""
Internal helper for prepare_processor_config and prepare_controller_config.

Consolidates the common validation and DTO preparation logic.

Args:
descriptors (dict): Dict of property descriptors from the component
component_type (str): String type name for error messages
dto_class (type): The DTO class to instantiate (ProcessorConfigDTO or
ControllerServiceDTO)
properties (dict or None): Dict of property key -> value, or None for
discovery mode
allow_dynamic (bool): Whether to skip validation

Returns:
list[str]: If properties is None, returns list of valid static property keys.
dto_class: If properties provided, returns instance with properties set.
"""
valid_keys = [key for key, desc in descriptors.items() if not desc.dynamic]

if properties is None:
return valid_keys

if not allow_dynamic:
invalid_keys = set(properties.keys()) - set(valid_keys)
if invalid_keys:
raise ValueError(
f"Property keys not in static descriptors: {sorted(invalid_keys)}\n"
f"Valid static keys for {component_type}:\n"
f" {sorted(valid_keys)}\n"
f"Use allow_dynamic=True to create dynamic properties intentionally."
)

return dto_class(properties=properties)


def prepare_processor_config(processor, properties=None, allow_dynamic=False):
"""
Discover valid property keys or prepare a validated ProcessorConfigDTO.

Two modes based on whether properties is provided:
- Discovery: Call with just processor to get list of valid property keys
- Prepare: Call with properties dict to get validated ProcessorConfigDTO

This helps prevent silent creation of dynamic properties from typos or
guessed property names. NiFi accepts any property name and silently creates
dynamic properties for unknown keys, which can cause hard-to-debug issues.

Property Value Semantics:
- ``value`` (string): Set property to that value
- ``''`` (empty string): Set property to empty string
- ``None``: For static properties, clears/unsets the value.
For dynamic properties, **deletes the property entirely**.

Args:
processor (ProcessorEntity or str): The processor to inspect, as a
ProcessorEntity object, processor ID, or processor name.
properties (dict, optional): Property key -> value mapping. If None,
returns list of valid property keys (discovery mode).
allow_dynamic (bool): If True, skip validation. Use for processors
like UpdateAttribute that expect dynamic properties. Default False.

Returns:
list[str]: If properties is None, returns list of valid property keys.
ProcessorConfigDTO: If properties provided, returns validated DTO
ready to pass to update_processor().

Raises:
ValueError: If any property key not found in processor's descriptors
(unless allow_dynamic=True). Error includes list of valid keys.

Example - Discovery::

>>> keys = nipyapi.canvas.prepare_processor_config(proc)
['SQL Query', 'Database Connection Pooling Service', 'Max Wait Time', ...]

Example - Prepare::

>>> config = nipyapi.canvas.prepare_processor_config(proc, {
... 'SQL Query': 'SELECT * FROM users',
... 'Database Connection Pooling Service': controller.id
... })
>>> nipyapi.canvas.update_processor(proc, update=config, auto_stop=True)

Example - Dynamic properties (UpdateAttribute)::

>>> config = nipyapi.canvas.prepare_processor_config(proc, {
... 'my.custom.attr': '${filename}'
... }, allow_dynamic=True)

Example - Clear static property::

>>> config = nipyapi.canvas.prepare_processor_config(proc, {
... 'Custom Text': None # Clears to unset state
... })

Example - Delete dynamic property::

>>> config = nipyapi.canvas.prepare_processor_config(proc, {
... 'my.custom.attr': None # Removes the dynamic property entirely
... }, allow_dynamic=True)
"""
processor = nipyapi.utils.resolve_entity(
processor,
get_processor,
nipyapi.nifi.ProcessorEntity,
strict=True,
)
return _prepare_component_config(
processor.component.config.descriptors,
processor.component.type,
nipyapi.nifi.ProcessorConfigDTO,
properties,
allow_dynamic,
)


def prepare_controller_config(controller, properties=None, allow_dynamic=False):
"""
Discover valid property keys or prepare a validated ControllerServiceDTO.

Two modes based on whether properties is provided:
- Discovery: Call with just controller to get list of valid property keys
- Prepare: Call with properties dict to get validated ControllerServiceDTO

This helps prevent silent creation of dynamic properties from typos or
guessed property names.

Property Value Semantics:
- ``value`` (string): Set property to that value
- ``''`` (empty string): Set property to empty string
- ``None``: For static properties, clears/unsets the value.
For dynamic properties, **deletes the property entirely**.

Args:
controller (ControllerServiceEntity or str): The controller to inspect,
as a ControllerServiceEntity object, controller ID, or controller name.
properties (dict, optional): Property key -> value mapping. If None,
returns list of valid property keys (discovery mode).
allow_dynamic (bool): If True, skip validation. Default False.

Returns:
list[str]: If properties is None, returns list of valid property keys.
ControllerServiceDTO: If properties provided, returns validated DTO
ready to pass to update_controller().

Raises:
ValueError: If any property key not found in controller's descriptors
(unless allow_dynamic=True). Error includes list of valid keys.

Example - Discovery::

>>> keys = nipyapi.canvas.prepare_controller_config(cs)
['Schema Access Strategy', 'Schema Registry', 'Schema Name', ...]

Example - Prepare::

>>> config = nipyapi.canvas.prepare_controller_config(cs, {
... 'Schema Access Strategy': 'schema-name'
... })
>>> nipyapi.canvas.update_controller(cs, update=config)

Example - Clear property::

>>> config = nipyapi.canvas.prepare_controller_config(cs, {
... 'Schema Name': None # Clears to unset state
... })
"""
controller = nipyapi.utils.resolve_entity(
controller,
get_controller,
nipyapi.nifi.ControllerServiceEntity,
strict=True,
)
return _prepare_component_config(
controller.component.descriptors,
controller.component.type,
nipyapi.nifi.ControllerServiceDTO,
properties,
allow_dynamic,
)


def update_processor(processor, update=None, name=None, refresh=True, auto_stop=False):
"""
Updates a Processor's configuration and/or name.
Expand Down Expand Up @@ -2052,10 +2238,17 @@ def _del_cont(cont_id):
return result


def update_controller(controller, update, refresh=True, greedy=True, identifier_type="auto"):
def update_controller( # pylint: disable=too-many-arguments,too-many-positional-arguments
controller, update, refresh=True, greedy=True, identifier_type="auto", auto_disable=False
):
"""
Updates the Configuration of a Controller Service

Note:
This function does not handle referencing components. If the controller
has processors or other controllers that reference it, you must stop/disable
those separately before updating.

Args:
controller (ControllerServiceEntity or str): Target Controller to update,
as a ControllerServiceEntity object, controller ID, or controller name.
Expand All @@ -2066,13 +2259,16 @@ def update_controller(controller, update, refresh=True, greedy=True, identifier_
greedy (bool): For name lookup, True for partial match, False for exact.
identifier_type (str): How to interpret string identifier:
"auto" (default) detects UUID vs name, "id" or "name" to force.
auto_disable (bool): If True, automatically disable the controller before
updating and re-enable afterward if it was enabled. Default False.

Returns:
(ControllerServiceEntity)

Raises:
TypeError: If controller is not a string or ControllerServiceEntity.
ValueError: If controller not found or multiple matches found.
ValueError: If controller not found, multiple matches found, or
controller is enabled and auto_disable=False.

"""
controller = nipyapi.utils.resolve_entity(
Expand All @@ -2088,14 +2284,44 @@ def update_controller(controller, update, refresh=True, greedy=True, identifier_
if refresh:
controller = get_controller(controller.id, "id")

was_enabled = controller.component.state == "ENABLED"

if was_enabled and not auto_disable:
raise ValueError(
f"Controller '{controller.component.name}' is enabled. "
"Disable it first or set auto_disable=True. "
"Note: auto_disable does not handle referencing components."
)

if was_enabled:
schedule_controller(controller, scheduled=False, refresh=True)
controller = get_controller(controller.id, "id")

# Insert the ID into the update
update.id = controller.id
return nipyapi.nifi.ControllerServicesApi().update_controller_service(
id=controller.id,
body=nipyapi.nifi.ControllerServiceEntity(
component=update, revision=controller.revision, id=controller.id
),
)
try:
result = nipyapi.nifi.ControllerServicesApi().update_controller_service(
id=controller.id,
body=nipyapi.nifi.ControllerServiceEntity(
component=update, revision=controller.revision, id=controller.id
),
)
except Exception:
# Attempt to restore enabled state if update fails
if was_enabled:
try:
schedule_controller(
get_controller(controller.id, "id"), scheduled=True, refresh=True
)
except Exception: # pylint: disable=broad-except
pass # Best effort - original exception takes priority
raise

if was_enabled:
schedule_controller(result, scheduled=True, refresh=True)
result = get_controller(result.id, "id")

return result


def schedule_controller(controller, scheduled, refresh=False, greedy=True, identifier_type="auto"):
Expand Down
9 changes: 7 additions & 2 deletions nipyapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import os
import sys

import urllib3


def _detect_output_format():
"""
Expand Down Expand Up @@ -425,10 +427,13 @@ def _apply_verbosity(verbosity):

def main():
"""CLI entry point."""
# Disable pager for help output so agents don't hang waiting for input
# Only set when help is requested or in non-interactive/CI environments
if "--help" in sys.argv or "-h" in sys.argv or not sys.stdout.isatty() or os.environ.get("CI"):
os.environ.setdefault("PAGER", "cat")

# Suppress SSL warnings early to prevent them polluting stdout in CI
# This is safe as the warnings are informational and CLI users expect clean output
import urllib3

if os.environ.get("NIFI_VERIFY_SSL", "true").lower() in ("false", "0", "no"):
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Expand Down
Loading