5 changes: 4 additions & 1 deletion extensions/business/deeploy/deeploy_manager_api.py
@@ -123,7 +123,10 @@ def get_apps(
sender, inputs = self.deeploy_verify_and_get_inputs(request)
auth_result = self.deeploy_get_auth_result(inputs)

apps = self._get_online_apps(owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER])
apps = self._get_online_apps(
owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
project_id=inputs.get(DEEPLOY_KEYS.PROJECT_ID, None)
)

result = {
DEEPLOY_KEYS.STATUS : DEEPLOY_STATUS.SUCCESS,
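Note on the change above: project_id is an optional keyword argument, so a request without a project id still returns every app owned by the caller. A minimal sketch of the intent (standalone illustration, not code from this diff; the plain-string key is an assumption, the real constant is DEEPLOY_KEYS.PROJECT_ID):

project_id = inputs.get("project_id", None)   # absent from the request -> None
apps = self._get_online_apps(
  owner=owner,
  project_id=project_id,                      # None -> no extra filtering, previous behavior preserved
)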
10 changes: 9 additions & 1 deletion extensions/business/deeploy/deeploy_mixin.py
@@ -2383,7 +2383,7 @@ def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, allow
#endfor each target node
return discovered_instances

def _get_online_apps(self, owner=None, target_nodes=None, job_id=None):
def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_id=None):
"""
if self.cfg_deeploy_verbose:
full_data = self.netmon.network_known_nodes()
@@ -2425,6 +2425,14 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None):
continue
filtered_result[node][app_name] = app_data
result = filtered_result
if project_id is not None:
filtered_result = self.defaultdict(dict)
for node, apps in result.items():
for app_name, app_data in apps.items():
if app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID, None) != project_id:
continue
filtered_result[node][app_name] = app_data
result = filtered_result
return result

# TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated
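To illustrate the new project filter in _get_online_apps: the intermediate result is a node -> {app_name: app_data} mapping, and the added block keeps only apps whose deeploy specs carry the requested project id. A minimal standalone sketch of the same pattern (plain-string keys assumed; the real code reads NetMonCt.DEEPLOY_SPECS and DEEPLOY_KEYS.PROJECT_ID and uses self.defaultdict):

from collections import defaultdict

def filter_apps_by_project(result, project_id):
  # result: {node_addr: {app_name: app_data}}, where app_data holds a "deeploy_specs" dict
  if project_id is None:
    return result                                   # no filter requested -> unchanged
  filtered = defaultdict(dict)
  for node, apps in result.items():
    for app_name, app_data in apps.items():
      specs = app_data.get("deeploy_specs", {}) or {}
      if specs.get("project_id") != project_id:
        continue                                    # app belongs to another project (or none) -> drop it
      filtered[node][app_name] = app_data
  return filtered

Nodes whose apps are all filtered out simply do not appear in the returned mapping, mirroring how the owner filter above behaves.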
40 changes: 30 additions & 10 deletions extensions/business/deeploy/deeploy_target_nodes_mixin.py
@@ -5,7 +5,6 @@
DEEPLOY_KEYS,
DEEPLOY_RESOURCES,
DEFAULT_CONTAINER_RESOURCES,
CONTAINER_APP_RUNNER_SIGNATURE,
CONTAINERIZED_APPS_SIGNATURES,
JOB_APP_TYPES,
)
@@ -29,6 +28,27 @@ def Pd(self, s, *args, **kwargs):
return


def _has_containerized_plugins(self, plugin_signatures, all_must_match=False):
"""
Check plugin signatures for containerized app types.

Args:
plugin_signatures: Collection of plugin signatures to check
all_must_match: If True, all signatures must be containerized.
If False (default), at least one must be containerized.
Returns:
bool: True if the check passes
"""
if all_must_match:
return all(
str(sig).upper() in CONTAINERIZED_APPS_SIGNATURES
for sig in plugin_signatures
)
return any(
str(sig).upper() in CONTAINERIZED_APPS_SIGNATURES
for sig in plugin_signatures
)

def _parse_memory(self, mem):
"""
Convert memory string to bytes.
@@ -53,7 +73,7 @@ def _parse_memory(self, mem):
else:
return int(float(mem)) # assume bytes

def _get_request_plugin_signatures(self, inputs):
def _get_request_plugin_signatures_from_pipeline(self, inputs):
"""
Extract plugin signatures from normalized request payload.
Returns a set of upper-cased signatures covering both legacy and plugins-array formats.
@@ -194,7 +214,7 @@ def __find_suitable_nodes_for_container_app(self, nodes_with_resources, containe
continue

pipeline_plugins = pipeline_data.get(NetMonCt.PLUGINS, [])
has_different_signatures = not all(sign == CONTAINER_APP_RUNNER_SIGNATURE for sign in pipeline_plugins.keys()) #FIX CAR OR WORKER APP RUNNER
has_different_signatures = not self._has_containerized_plugins(pipeline_plugins.keys(), all_must_match=True)

if has_different_signatures:
self.Pd(f"Node {addr} has pipeline '{pipeline_name}' with Native Apps signature. Plugin signatures: {list(pipeline_plugins.keys())}. Skipping node...")
@@ -224,7 +244,7 @@ def __find_suitable_nodes_for_container_app(self, nodes_with_resources, containe
self.Pd(f" Pipeline '{pipeline_name}' last_config: {last_config} (ts: {ts})")

if skip_node:
self.Pd(f"Node {addr} skipped due to incompatible pipeline signatures")
self.Pd(f"Node {addr} skipped, as it's running a pipeline with a native app.")
continue
self.Pd(f"Node {addr} has {self.json_dumps(used_container_resources)} used container resources.")
# Sum up resources used by node.
@@ -344,8 +364,8 @@ def __check_nodes_capabilities_and_extract_resources(self, nodes: list['str'], i
node_req_memory = node_res_req.get(DEEPLOY_RESOURCES.MEMORY)
node_req_memory_bytes = self._parse_memory(node_req_memory)
job_tags = inputs.get(DEEPLOY_KEYS.JOB_TAGS, [])
plugin_signatures = self._get_request_plugin_signatures(inputs)
requires_container_capabilities = CONTAINER_APP_RUNNER_SIGNATURE in plugin_signatures
plugin_signatures = self._get_request_plugin_signatures_from_pipeline(inputs)
requires_container_capabilities = self._has_containerized_plugins(plugin_signatures)

suitable_nodes_with_resources = {}
for addr in nodes:
@@ -391,8 +411,8 @@ def _find_nodes_for_deeployment(self, inputs):
def _find_nodes_for_deeployment(self, inputs):
# Get required resources from the request
required_resources = self._aggregate_container_resources(inputs) or {}
plugin_signatures = self._get_request_plugin_signatures(inputs)
has_container_plugins = any(signature in plugin_signatures for signature in CONTAINERIZED_APPS_SIGNATURES)
plugin_signatures = self._get_request_plugin_signatures_from_pipeline(inputs)
has_container_plugins = self._has_containerized_plugins(plugin_signatures)
target_nodes_count = inputs.get(DEEPLOY_KEYS.TARGET_NODES_COUNT, None)

if not target_nodes_count:
@@ -562,8 +582,8 @@ def check_node_available_resources(self, addr, inputs):
DEEPLOY_RESOURCES.REQUIRED: {}
}

plugin_signatures = self._get_request_plugin_signatures(inputs)
has_container_plugins = CONTAINER_APP_RUNNER_SIGNATURE in plugin_signatures
plugin_signatures = self._get_request_plugin_signatures_from_pipeline(inputs)
has_container_plugins = self._has_containerized_plugins(plugin_signatures)
if not has_container_plugins:
return result

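The recurring change in this file is the new _has_containerized_plugins helper. With the default all_must_match=False it answers "does the request include at least one containerized plugin?" (used to decide whether container resources and capabilities need to be checked at all), while all_must_match=True answers "is an existing pipeline made up exclusively of containerized plugins?" (used to decide whether a node already running a native app should be skipped). A standalone sketch under an assumed signature set:

# Assumed example values; the real set is CONTAINERIZED_APPS_SIGNATURES from the Deeploy constants.
CONTAINERIZED_APPS_SIGNATURES = {"CONTAINER_APP_RUNNER", "WORKER_APP_RUNNER"}

def has_containerized_plugins(plugin_signatures, all_must_match=False):
  check = all if all_must_match else any
  return check(str(sig).upper() in CONTAINERIZED_APPS_SIGNATURES for sig in plugin_signatures)

# Request-level check: any containerized plugin triggers the container resource checks.
print(has_containerized_plugins(["container_app_runner", "SOME_NATIVE_PLUGIN"]))        # True
# Node-level check: a pipeline that mixes in a native plugin is not purely containerized,
# so the node is skipped when placing a container app.
print(has_containerized_plugins(["CONTAINER_APP_RUNNER", "SOME_NATIVE_PLUGIN"], True))  # False

One edge case carried over from the diff: with all_must_match=True an empty signature collection returns True, since Python's all() over an empty iterable is True.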
4 changes: 2 additions & 2 deletions plugins/data/tutorials/sensibo_simple.py
@@ -21,7 +21,7 @@
"CAP_RESOLUTION" : 0.5,
"TYPE": "SensiboSimple",

"SENSIBO_DEVICE_NAME" : "Alex's device",
"SENSIBO_DEVICE_NAME" : "<device name>",
"SENSIBO_API_KEY" : "0B073b470DeXHoqmXmdeBpVzBbHcLh",

"URL" : ""
@@ -48,7 +48,7 @@
**DataCaptureThread.CONFIG,

"SENSIBO_API_KEY" : "0B073b470DeXHoqmXmdeBpVzBbHcLh",
"SENSIBO_DEVICE_NAME" : "Alex's device",
"SENSIBO_DEVICE_NAME" : "",


'VALIDATION_RULES' : {
@@ -0,0 +1,86 @@
"""

TODO:

1. Review and fix the Sensibo DCT
2. Configure the outlier probability threshold
3. Fit-predict at each step
4. Add a business plugin that alerts on 2-3 successive positives

"""

from naeural_core.serving.base import ModelServingProcess as BaseServingProcess
from naeural_core.utils.basic_anomaly_model import BasicAnomalyModel

__VER__ = '0.1.0.0'

_CONFIG = {
**BaseServingProcess.CONFIG,

"PICKED_INPUT" : "STRUCT_DATA",

"RUNS_ON_EMPTY_INPUT" : False,

'VALIDATION_RULES': {
**BaseServingProcess.CONFIG['VALIDATION_RULES'],

},

}

class SimpleSensorAnomalyDetector(BaseServingProcess):


def on_init(self):
self._counter = 0
# check some params that can be re-configured from business plugins or, with lower
# priority, from the serving env in config_startup.txt

self.model = BasicAnomalyModel()
return


def pre_process(self, inputs):
debug = False
lst_inputs = inputs.get('DATA', [])
serving_params = inputs.get('SERVING_PARAMS', [])
if len(serving_params) > 0:
if isinstance(serving_params[0], dict):
debug = serving_params[0].get('SHOW_EXTRA_DEBUG', False)
if debug:
self.P("Inference step info:\n - Detected 'SERVING_PARAMS': {}\n - Inputs: {}".format(
self.json_dumps(serving_params, indent=4),
self.json_dumps(inputs, indent=4)
))

preprocessed = []
for i, inp in enumerate(lst_inputs):
params = serving_params[i].get('TEST_INFERENCE_PARAM', None) if i < len(serving_params) else None
preprocessed.append([
inp.get('OBS') if isinstance(inp, dict) else 0,
params,
]
)
return preprocessed


def predict(self, inputs):
self._counter += 1
dummy_result = []
for inp in inputs:
# for each stream input
input_data = inp[0]
input_params = inp[1]
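# placeholder predictor: flags readings whose rounded value is even; to be replaced by
# the BasicAnomalyModel fit/predict flow noted in the TODO above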
model = lambda x: int(round(x)) % 2 == 0
dummy_result.append(
[model(input_data), self._counter, input_data, input_params]
)
dummy_result = self.np.array(dummy_result)
return dummy_result


def post_process(self, preds):
result = [{'pred': x[0], 'cnt': x[1], 'inp':x[2], 'cfg':x[3]} for x in preds]
return result


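Taken together, the new serving process is still a scaffold: pre_process pairs each input's OBS value with an optional per-input TEST_INFERENCE_PARAM, predict applies the placeholder model and tags every row with a step counter, and post_process relabels the rows as dicts. A minimal standalone trace of that flow (plain Python outside the serving framework; the numpy conversion in predict is omitted):

inputs = {
  "DATA": [{"OBS": 21.4}, {"OBS": 22.0}],
  "SERVING_PARAMS": [{"TEST_INFERENCE_PARAM": "a"}, {}],
}

# pre_process -> [[21.4, 'a'], [22.0, None]]
pre = [[d.get("OBS", 0), p.get("TEST_INFERENCE_PARAM")]
       for d, p in zip(inputs["DATA"], inputs["SERVING_PARAMS"])]

# predict with the placeholder parity model; the counter is 1 on the first step
counter = 1
preds = [[int(round(obs)) % 2 == 0, counter, obs, prm] for obs, prm in pre]

# post_process -> [{'pred': False, 'cnt': 1, 'inp': 21.4, 'cfg': 'a'}, {'pred': True, 'cnt': 1, 'inp': 22.0, 'cfg': None}]
out = [{"pred": x[0], "cnt": x[1], "inp": x[2], "cfg": x[3]} for x in preds]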
2 changes: 1 addition & 1 deletion ver.py
@@ -1 +1 @@
__VER__ = '2.9.990'
__VER__ = '2.10.0'