Skip to content
This repository was archived by the owner on Oct 21, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions argo/workflows/dsl/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,26 @@
from argo.workflows.client.models import V1alpha1WorkflowStatus
from argo.workflows.client.models import V1ObjectMeta
from argo.workflows.client.models import V1alpha1WorkflowCreateRequest
from mergedict import ConfigDict

from ._base import Prop
from ._base import Spec
from . import _utils

__all__ = ["Workflow"]


_LOGGER = logging.getLogger(__name__)


class WorkflowMeta(ABCMeta):

__model__ = V1alpha1Workflow

def __new__(
cls,
name: Union[str, Type["Workflow"]],
bases: Tuple[Type["Workflow"], ...],
props: Dict[str, Any],
**kwargs,
cls,
name: Union[str, Type["Workflow"]],
bases: Tuple[Type["Workflow"], ...],
props: Dict[str, Any],
**kwargs,
):
workflow_name = dasherize(underscore(name))

Expand Down Expand Up @@ -90,12 +89,12 @@ def __new__(

@classmethod
def __compile(
cls,
klass: "Workflow",
name: str,
bases: Tuple[Type["Workflow"], ...],
props: Dict[str, Any],
**kwargs,
cls,
klass: "Workflow",
name: str,
bases: Tuple[Type["Workflow"], ...],
props: Dict[str, Any],
**kwargs,
):
tasks: List[V1alpha1DAGTask] = []
templates: List[V1alpha1Template] = []
Expand Down Expand Up @@ -142,14 +141,20 @@ def __compile(
templates.insert(0, main_template)

spec_dict: dict = klass.spec
spec_dict["entrypoint"] = spec_dict.get("entrypoint", "main")
parent_spec_dict: dict = bases[0].spec if isinstance(bases[0].spec, dict) else bases[
0].spec.to_dict()
config_spec_dict = ConfigDict(parent_spec_dict)

spec_dict["entrypoint"] = spec_dict.get("entrypoint",
parent_spec_dict.get("entrypoint", 'main'))
spec_dict["templates"] = templates
config_spec_dict.merge(spec_dict)

klass.spec: V1alpha1WorkflowSpec = V1alpha1WorkflowSpec(**spec_dict)
klass.spec: V1alpha1WorkflowSpec = V1alpha1WorkflowSpec(**config_spec_dict)

@classmethod
def __compile_closure(
cls, template: V1alpha1Template, scopes: Dict[str, Any] = None
cls, template: V1alpha1Template, scopes: Dict[str, Any] = None
) -> V1alpha1Template:
scopes = scopes or {}

Expand Down Expand Up @@ -386,11 +391,11 @@ def _compile(obj: Any):
return self.model

def submit(
self,
client: ApiClient,
namespace: str,
*,
parameters: Optional[Dict[str, str]] = None,
self,
client: ApiClient,
namespace: str,
*,
parameters: Optional[Dict[str, str]] = None,
) -> V1alpha1Workflow:
"""Submit an Argo Workflow to a given namespace.

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ inflection
pyyaml
requests
python-dateutil
mergedict

# --- development ---
# black
Expand Down