From 508ea448e56879c73acda9f396374ddf63539e7a Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 1 Jun 2018 14:25:21 -0700
Subject: [PATCH 01/11] start datadisk implementation

---
 aztk/client.py                                | 27 ++++++++++------
 aztk/core/models/model.py                     |  2 +-
 aztk/models/__init__.py                       |  1 +
 aztk/models/cluster_configuration.py          |  4 ++-
 aztk/models/data_disk.py                      |  4 +++
 aztk/models/vm_image.py                       |  3 +-
 aztk/spark/client.py                          |  9 ++++--
 aztk/spark/models/models.py                   |  4 +++
 .../spark/endpoints/cluster/cluster_create.py | 31 ++++++++++++-------
 9 files changed, 59 insertions(+), 26 deletions(-)
 create mode 100644 aztk/models/data_disk.py

diff --git a/aztk/client.py b/aztk/client.py
index 03b03fa3..c9bfc184 100644
--- a/aztk/client.py
+++ b/aztk/client.py
@@ -4,17 +4,18 @@
 import azure.batch.models as batch_models
 import azure.batch.models.batch_error as batch_error
+from azure.batch.models import batch_error as batch_error
+from Cryptodome.PublicKey import RSA
+
+import aztk.models as models
 import aztk.utils.azure_api as azure_api
 import aztk.utils.constants as constants
 import aztk.utils.get_ssh_key as get_ssh_key
 import aztk.utils.helpers as helpers
 import aztk.utils.ssh as ssh_lib
-import aztk.models as models
-import azure.batch.models as batch_models
-from azure.batch.models import batch_error
-from Cryptodome.PublicKey import RSA
 from aztk.internal import cluster_data
 
+
 class Client:
     def __init__(self, secrets_config: models.SecretsConfiguration):
         self.secrets_config = secrets_config
@@ -65,14 +66,14 @@ def __delete_pool_and_job(self, pool_id: str, keep_logs: bool = False):
 
         return job_exists or pool_exists
 
-    def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
+    def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, vm_image_model):
         """
             Create a pool and job
             :param cluster_conf: the configuration object used to create the cluster
             :type cluster_conf: aztk.models.ClusterConfiguration
             :parm software_metadata_key: the id of the software being used on the cluster
             :param start_task: the start task for the cluster
-            :param VmImageModel: the type of image to provision for the cluster
+            :param vm_image_model: the type of image to provision for the cluster
             :param wait: wait until the cluster is ready
         """
         self._get_cluster_data(cluster_conf.cluster_id).save_cluster_config(cluster_conf)
@@ -83,7 +84,7 @@ def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, softw
         # Get a verified node agent sku
         sku_to_use, image_ref_to_use = \
             helpers.select_latest_verified_vm_image_with_node_agent_sku(
-                VmImageModel.publisher, VmImageModel.offer, VmImageModel.sku, self.batch_client)
+                vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, self.batch_client)
 
         network_conf = None
         if cluster_conf.subnet_id is not None:
@@ -97,7 +98,11 @@ def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, softw
             id=pool_id,
             virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
                 image_reference=image_ref_to_use,
-                node_agent_sku_id=sku_to_use),
+                node_agent_sku_id=sku_to_use,
+                data_disks=[batch_models.DataDisk(
+                    lun=i,
+                    disk_size_gb=data_disk.disk_size_gb
+                ) for i, data_disk in enumerate(vm_image_model.data_disks)]),
             vm_size=cluster_conf.vm_size,
             enable_auto_scale=True,
             auto_scale_formula=auto_scale_formula,
@@ -324,7 +329,11 @@ def __submit_job(self,
             display_name=job_configuration.id,
             virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
                 image_reference=image_ref_to_use,
-                node_agent_sku_id=sku_to_use),
+                node_agent_sku_id=sku_to_use,
+                data_disks=[batch_models.DataDisk(
+                    lun=i,
+                    disk_size_gb=data_disk.disk_size_gb
+                ) for i, data_disk in enumerate(vm_image_model.data_disks)]),
             vm_size=job_configuration.vm_size,
             enable_auto_scale=True,
             auto_scale_formula=autoscale_formula,
diff --git a/aztk/core/models/model.py b/aztk/core/models/model.py
index 6f016f49..8ef194e5 100644
--- a/aztk/core/models/model.py
+++ b/aztk/core/models/model.py
@@ -90,7 +90,7 @@ def validate(self):
     def merge(self, other):
         if not isinstance(other, self.__class__):
             raise AztkError("Cannot merge {0} as is it not an instance of {1}".format(other, self.__class__.__name__))
-        
+
         for field in other._fields.values():
             if field in other._data:
                 field.merge(self, other._data[field])
diff --git a/aztk/models/__init__.py b/aztk/models/__init__.py
index eb9b2de3..2ee236cd 100644
--- a/aztk/models/__init__.py
+++ b/aztk/models/__init__.py
@@ -13,6 +13,7 @@
 from .remote_login import RemoteLogin
 from .ssh_log import SSHLog
 from .vm_image import VmImage
+from .data_disk import DataDisk
 from .software import Software
 from .cluster import Cluster
 from .scheduling_target import SchedulingTarget
diff --git a/aztk/models/cluster_configuration.py b/aztk/models/cluster_configuration.py
index 7e2c4b85..d69ec377 100644
--- a/aztk/models/cluster_configuration.py
+++ b/aztk/models/cluster_configuration.py
@@ -3,6 +3,7 @@
 from aztk.utils import deprecated,deprecate, helpers
 
 from .custom_script import CustomScript
+from .data_disk import DataDisk
 from .file_share import FileShare
 from .plugins import PluginConfiguration
 from .toolkit import Toolkit
@@ -36,6 +37,7 @@ class ClusterConfiguration(Model):
     plugins = fields.List(PluginConfiguration)
     custom_scripts = fields.List(CustomScript)
     file_shares = fields.List(FileShare)
+    data_disks = fields.List(DataDisk)
     user_configuration = fields.Model(UserConfiguration, default=None)
     scheduling_target = fields.Enum(SchedulingTarget, default=None)
 
@@ -103,5 +105,5 @@ def __validate__(self) -> bool:
         if self.custom_scripts:
             deprecate("Custom scripts are DEPRECATED and will be removed in 0.8.0. Use plugins instead See https://aztk.readthedocs.io/en/v0.7.0/15-plugins.html")
-        if self.scheduling_target == SchedulingTarget.Dedicated and self.vm_count == 0:
+        if self.scheduling_target == SchedulingTarget.Dedicated and self.size == 0:
             raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
diff --git a/aztk/models/data_disk.py b/aztk/models/data_disk.py
new file mode 100644
index 00000000..8b6e4887
--- /dev/null
+++ b/aztk/models/data_disk.py
@@ -0,0 +1,4 @@
+from aztk.core.models import Model, fields
+
+class DataDisk(Model):
+    disk_size_gb = fields.Integer()
diff --git a/aztk/models/vm_image.py b/aztk/models/vm_image.py
index baa3483c..f1c76022 100644
--- a/aztk/models/vm_image.py
+++ b/aztk/models/vm_image.py
@@ -1,5 +1,6 @@
 class VmImage:
-    def __init__(self, publisher, offer, sku):
+    def __init__(self, publisher, offer, sku, data_disks):
         self.publisher = publisher
         self.offer = offer
         self.sku = sku
+        self.data_disks = data_disks
diff --git a/aztk/spark/client.py b/aztk/spark/client.py
index c42e809b..c9656add 100644
--- a/aztk/spark/client.py
+++ b/aztk/spark/client.py
@@ -33,7 +33,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
         Returns:
             aztk.spark.models.Cluster
         """
-        cluster_conf = _apply_default_for_cluster_config(cluster_conf)
+        # cluster_conf = _apply_default_for_cluster_config(cluster_conf)
         cluster_conf.validate()
 
         cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
@@ -53,11 +53,11 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
                 cluster_conf.worker_on_master)
 
         software_metadata_key = "spark"
-
         vm_image = models.VmImage(
             publisher='Canonical',
             offer='UbuntuServer',
-            sku='16.04')
+            sku='16.04',
+            data_disks=cluster_conf.data_disks)
 
         cluster = self.__create_pool_and_job(
             cluster_conf, software_metadata_key, start_task, vm_image)
@@ -328,10 +328,13 @@ def _default_scheduling_target(vm_count: int):
     return models.SchedulingTarget.Dedicated
 
 def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
+    print("before", configuration.data_disks)
     cluster_conf = models.ClusterConfiguration()
     cluster_conf.merge(configuration)
     if cluster_conf.scheduling_target is None:
         cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
+    print("after", cluster_conf.data_disks)
+
     return cluster_conf
 
 def _apply_default_for_job_config(job_conf: models.JobConfiguration):
diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py
index f96d00b2..0536c8d7 100644
--- a/aztk/spark/models/models.py
+++ b/aztk/spark/models/models.py
@@ -108,6 +108,10 @@ class SecretsConfiguration(aztk.models.SecretsConfiguration):
     pass
 
 
+class DataDisk(aztk.models.DataDisk):
+    pass
+
+
 class VmImage(aztk.models.VmImage):
     pass
 
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_create.py b/aztk_cli/spark/endpoints/cluster/cluster_create.py
index 1b02f551..9cfc62ca 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_create.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_create.py
@@ -2,7 +2,7 @@
 import typing
 
 import aztk.spark
-from aztk.spark.models import ClusterConfiguration, UserConfiguration
+from aztk.spark.models import ClusterConfiguration, UserConfiguration, DataDisk
 from aztk_cli import config, log, utils
 from aztk_cli.config import load_aztk_spark_config
 
@@ -26,6 +26,8 @@ def setup_parser(parser: argparse.ArgumentParser):
                               (/:)')
     parser.add_argument('--subnet-id',
                         help='The subnet in which to create the cluster.')
+    parser.add_argument('--data-disk-size', type=int,
+                        help="Size in GB of additional local disk storage on each node.")
     parser.add_argument('--no-wait', dest='wait', action='store_false')
     parser.add_argument('--wait', dest='wait', action='store_true')
@@ -40,16 +42,23 @@ def execute(args: typing.NamedTuple):
     # read cluster.yaml configuartion file, overwrite values with args
     file_config, wait = config.read_cluster_config()
     cluster_conf.merge(file_config)
-    cluster_conf.merge(ClusterConfiguration(
-        cluster_id=args.cluster_id,
-        size=args.size,
-        size_low_priority=args.size_low_priority,
-        vm_size=args.vm_size,
-        subnet_id=args.subnet_id,
-        user_configuration=UserConfiguration(
-            username=args.username,
-            password=args.password,
-        )))
+    cluster_conf.merge(
+        ClusterConfiguration(
+            cluster_id=args.cluster_id,
+            size=args.size,
+            size_low_priority=args.size_low_priority,
+            vm_size=args.vm_size,
+            subnet_id=args.subnet_id,
+            user_configuration=UserConfiguration(
+                username=args.username,
+                password=args.password,
+            ),
+        )
+    )
+
+    if args.data_disk_size:
+        cluster_conf.data_disks.append(DataDisk(disk_size_gb=args.data_disk_size))
+
     if args.docker_repo and cluster_conf.toolkit:
         cluster_conf.toolkit.docker_repo = args.docker_repo
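
The core of this first patch is the translation from AZTK's new `DataDisk` model into the Azure Batch `DataDisk` pool parameter, with each disk's LUN taken from its position in the configured list. Condensed into a standalone sketch (the helper name `to_batch_data_disks` is illustrative, not part of the patch):

```python
import azure.batch.models as batch_models

def to_batch_data_disks(data_disks):
    # Each configured disk becomes an Azure Batch DataDisk; the LUN is
    # simply the disk's index in the cluster configuration list.
    return [
        batch_models.DataDisk(lun=i, disk_size_gb=data_disk.disk_size_gb)
        for i, data_disk in enumerate(data_disks)
    ]
```
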
From 0c4c60dc3ae66df8b7c28e873b6d3355546c29e7 Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 1 Jun 2018 17:55:00 -0700
Subject: [PATCH 02/11] fix fields list issue, continue data-disk implementation

---
 aztk/core/models/fields.py                   | 11 ++++++-
 aztk/node_scripts/install/install.py         | 31 ++++++++++++++++--
 aztk/node_scripts/install/mount_data_disk.sh |  8 +++++
 aztk/spark/client.py                         |  5 ++-
 tests/core/test_models.py                    | 33 ++++++++++++++++++++
 5 files changed, 81 insertions(+), 7 deletions(-)
 create mode 100644 aztk/node_scripts/install/mount_data_disk.sh

diff --git a/aztk/core/models/fields.py b/aztk/core/models/fields.py
index 20cad87a..8033a0db 100644
--- a/aztk/core/models/fields.py
+++ b/aztk/core/models/fields.py
@@ -142,6 +142,15 @@ def __set__(self, instance, value):
             value = []
         super().__set__(instance, value)
 
+    def __get__(self, instance, _):
+        if instance is not None:
+            value = instance._data.get(self)
+            if value is None:
+                return instance._data.setdefault(self, self._default(instance))
+            return value
+
+        return self
+
     def _resolve(self, value):
         result = []
         for item in value:
@@ -158,7 +167,7 @@ def merge(self, instance, value):
             value = []
 
         if self.merge_strategy == ListMergeStrategy.Append:
-            current = instance._data.get(self) 
+            current = instance._data.get(self)
             if current is None:
                 current = []
             value = current + value
diff --git a/aztk/node_scripts/install/install.py b/aztk/node_scripts/install/install.py
index 57957797..9321a087 100644
--- a/aztk/node_scripts/install/install.py
+++ b/aztk/node_scripts/install/install.py
@@ -1,9 +1,12 @@
 import os
-from core import config
-from install import pick_master, spark, scripts, create_user, plugins, spark_container
+import subprocess
+
 import wait_until_master_selected
-from aztk.models.plugins import PluginTarget
 from aztk.internal import cluster_data
+from aztk.models.plugins import PluginTarget
+from core import config
+from install import (create_user, pick_master, plugins, scripts, spark, spark_container)
+
 from .node_scheduling import setup_node_scheduling
 
 
@@ -13,10 +16,32 @@ def read_cluster_config():
     print("Got cluster config", cluster_config)
     return cluster_config
 
+
+def mount_data_disk(device_name):
+    p = subprocess.Popen([os.environ["AZTK_WORKING_DIR"] + "/mount_data_disk.sh", device_name], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    output, error = p.communicate()
+    print(output, error)
+    return p.returncode
+
+
+def setup_data_disk(number):
+    # setup datadisks, starting with sdc
+    import string
+    chars = string.ascii_lowercase[2:]
+    for i in range(number):
+        mount_data_disk("/dev/sd" + chars[i])
+
+
 def setup_host(docker_repo: str):
     """
         Code to be run on the node(NOT in a container)
     """
+    p = subprocess.Popen(["lsblk -lnS --sort name | wc -l"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    output, _ = p.communicate()
+    output = int(output) - 3  # by default, there are 3 devices on each host: sda, sdb, sr0
+    if output > 2:
+        setup_data_disk(output)
+
     client = config.batch_client
 
     create_user.create_user(batch_client=client)
diff --git a/aztk/node_scripts/install/mount_data_disk.sh b/aztk/node_scripts/install/mount_data_disk.sh
new file mode 100644
index 00000000..f55ed7ec
--- /dev/null
+++ b/aztk/node_scripts/install/mount_data_disk.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+devicename=$1
+parted --script --align optimal /dev/sdd mklabel gpt
+parted --script --align optimal /dev/sdd mkpart primary ext4 0% 100%
+mkfs.ext4 /dev/sdd1
+mkdir -p /mnt/data-disk1
+echo "/dev/sdd1 /mnt/data-disk1 auto defaults,nofail 0 0" >> /etc/fstab
+mount /dev/sdd1 /mnt/data-disk1
diff --git a/aztk/spark/client.py b/aztk/spark/client.py
index c9656add..b5d4fb62 100644
--- a/aztk/spark/client.py
+++ b/aztk/spark/client.py
@@ -33,7 +33,8 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
         Returns:
             aztk.spark.models.Cluster
         """
-        # cluster_conf = _apply_default_for_cluster_config(cluster_conf)
+        cluster_conf = _apply_default_for_cluster_config(cluster_conf)
+
         cluster_conf.validate()
 
         cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
@@ -328,12 +329,10 @@ def _default_scheduling_target(vm_count: int):
     return models.SchedulingTarget.Dedicated
 
 def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
-    print("before", configuration.data_disks)
     cluster_conf = models.ClusterConfiguration()
     cluster_conf.merge(configuration)
     if cluster_conf.scheduling_target is None:
         cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
-    print("after", cluster_conf.data_disks)
 
     return cluster_conf
 
diff --git a/tests/core/test_models.py b/tests/core/test_models.py
index 3ec9682d..d0fb591a 100644
--- a/tests/core/test_models.py
+++ b/tests/core/test_models.py
@@ -294,6 +294,39 @@ class UserList(Model):
     assert obj1.infos[1].age == 38
 
 
+def test_merge_nested_model_append_strategy_initial_not_set():
+    class UserList(Model):
+        infos = fields.List(UserInfo, merge_strategy=ListMergeStrategy.Append)
+
+    obj1 = UserList()
+    obj1.infos.append(UserInfo(
+        name="John",
+        age=29,
+    ))
+
+    obj2 = UserList(
+        infos=[
+            dict(
+                name="Frank",
+                age=38,
+            ),
+        ],
+    )
+
+    assert len(obj1.infos) == 1
+    assert len(obj2.infos) == 1
+    assert obj1.infos[0].name == "John"
+    assert obj1.infos[0].age == 29
+    assert obj2.infos[0].name == "Frank"
+    assert obj2.infos[0].age == 38
+
+    obj1.merge(obj2)
+    assert len(obj1.infos) == 2
+    assert obj1.infos[0].name == "John"
+    assert obj1.infos[0].age == 29
+    assert obj1.infos[1].name == "Frank"
+    assert obj1.infos[1].age == 38
+
+
 def test_serialize_simple_model_to_yaml():
     info = UserInfo(name="John", age=29)
     output = yaml.dump(info)
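
The `fields.List.__get__` override above is what makes the CLI's `cluster_conf.data_disks.append(...)` pattern work when no list was ever assigned: reading an unset list field now materializes (and stores) the default empty list instead of returning `None`. A minimal sketch of the behavior this guarantees (the test name is illustrative, not part of the series):

```python
from aztk.models import ClusterConfiguration, DataDisk

def test_append_to_unset_list_field():
    conf = ClusterConfiguration()
    # Before the __get__ fix this read returned None and the append failed;
    # now the default [] is created on first access and kept on the instance.
    conf.data_disks.append(DataDisk(disk_size_gb=10))
    assert len(conf.data_disks) == 1
```
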
From 38e082235ff53b85ed711e28aa47786b67c89f7d Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 1 Jun 2018 17:59:42 -0700
Subject: [PATCH 03/11] devicename

---
 aztk/node_scripts/install/mount_data_disk.sh | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/aztk/node_scripts/install/mount_data_disk.sh b/aztk/node_scripts/install/mount_data_disk.sh
index f55ed7ec..cd09cbc0 100644
--- a/aztk/node_scripts/install/mount_data_disk.sh
+++ b/aztk/node_scripts/install/mount_data_disk.sh
@@ -1,8 +1,8 @@
 #!/bin/bash
 devicename=$1
-parted --script --align optimal /dev/sdd mklabel gpt
-parted --script --align optimal /dev/sdd mkpart primary ext4 0% 100%
-mkfs.ext4 /dev/sdd1
+parted --script --align optimal ${devicename} mklabel gpt
+parted --script --align optimal ${devicename} mkpart primary ext4 0% 100%
+mkfs.ext4 ${devicename}1
 mkdir -p /mnt/data-disk1
-echo "/dev/sdd1 /mnt/data-disk1 auto defaults,nofail 0 0" >> /etc/fstab
-mount /dev/sdd1 /mnt/data-disk1
+echo "${devicename}1 /mnt/data-disk1 auto defaults,nofail 0 0" >> /etc/fstab
+mount ${devicename}1 /mnt/data-disk1
From 94f299d9edfb80daccb277043ef98e6f1e96ffe4 Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 1 Jun 2018 18:07:35 -0700
Subject: [PATCH 04/11] debug

---
 aztk/node_scripts/install/install.py         | 10 +++++-----
 aztk/node_scripts/install/mount_data_disk.sh |  1 +
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/aztk/node_scripts/install/install.py b/aztk/node_scripts/install/install.py
index 9321a087..efbe3cc4 100644
--- a/aztk/node_scripts/install/install.py
+++ b/aztk/node_scripts/install/install.py
@@ -17,10 +17,10 @@ def read_cluster_config():
     return cluster_config
 
 
-def mount_data_disk(device_name):
-    p = subprocess.Popen([os.environ["AZTK_WORKING_DIR"] + "/mount_data_disk.sh", device_name], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    output, error = p.communicate()
-    print(output, error)
+def mount_data_disk(device_name, number):
+    p = subprocess.Popen([os.environ["AZTK_WORKING_DIR"] + "/mount_data_disk.sh", device_name, str(number)], shell=True)
+    # output, error = p.communicate()
+    # print(output, error)
     return p.returncode
 
 
@@ -29,7 +29,7 @@ def setup_data_disk(number):
     import string
     chars = string.ascii_lowercase[2:]
     for i in range(number):
-        mount_data_disk("/dev/sd" + chars[i])
+        mount_data_disk("/dev/sd" + chars[i], i)
 
 
 def setup_host(docker_repo: str):
diff --git a/aztk/node_scripts/install/mount_data_disk.sh b/aztk/node_scripts/install/mount_data_disk.sh
index cd09cbc0..bbf4565b 100644
--- a/aztk/node_scripts/install/mount_data_disk.sh
+++ b/aztk/node_scripts/install/mount_data_disk.sh
@@ -1,5 +1,6 @@
 #!/bin/bash
 devicename=$1
+datadisknumber=$2
 parted --script --align optimal ${devicename} mklabel gpt
 parted --script --align optimal ${devicename} mkpart primary ext4 0% 100%
 mkfs.ext4 ${devicename}1
From 65d6d4b679bb37d978d6d319e27be0863b7e84a8 Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 1 Jun 2018 18:15:11 -0700
Subject: [PATCH 05/11] fix path

---
 aztk/node_scripts/install/install.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aztk/node_scripts/install/install.py b/aztk/node_scripts/install/install.py
index efbe3cc4..d946643f 100644
--- a/aztk/node_scripts/install/install.py
+++ b/aztk/node_scripts/install/install.py
@@ -18,7 +18,7 @@ def read_cluster_config():
 
 
 def mount_data_disk(device_name, number):
-    p = subprocess.Popen([os.environ["AZTK_WORKING_DIR"] + "/mount_data_disk.sh", device_name, str(number)], shell=True)
+    p = subprocess.Popen([os.environ["AZTK_WORKING_DIR"] + "aztk/node_scripts/install/mount_data_disk.sh", device_name, str(number)], shell=True)
     # output, error = p.communicate()
     # print(output, error)
     return p.returncode
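
The path fix above still leaves a subtle `subprocess` pitfall that the next patch works around: with `shell=True`, only the first element of an argument list is run by the shell, and the remaining elements are handed to the shell itself (as `$0`, `$1`, ...) rather than appended to the command. The behavior, in isolation:

```python
import subprocess

# With shell=True, the first list element is the command string; the rest
# go to /bin/sh itself, so the script never sees them. Prints "device" only.
subprocess.Popen(["echo device", "/dev/sdc", "0"], shell=True)

# Passing one fully built command string forwards the arguments as intended,
# which is what patch 06 does by concatenating the script path and arguments.
subprocess.Popen(["echo device /dev/sdc 0"], shell=True)  # prints "device /dev/sdc 0"
```
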
From 77ef1bc72db63fbec439c4919617db560082b894 Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Mon, 4 Jun 2018 13:47:17 -0700
Subject: [PATCH 06/11] successfully mount data disk

---
 aztk/node_scripts/install/install.py         |  4 +++-
 aztk/node_scripts/install/mount_data_disk.sh | 21 ++++++++++++++++----
 aztk/node_scripts/setup_host.sh              |  1 +
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/aztk/node_scripts/install/install.py b/aztk/node_scripts/install/install.py
index d946643f..fe4e1450 100644
--- a/aztk/node_scripts/install/install.py
+++ b/aztk/node_scripts/install/install.py
@@ -18,7 +18,9 @@ def read_cluster_config():
 
 
 def mount_data_disk(device_name, number):
-    p = subprocess.Popen([os.environ["AZTK_WORKING_DIR"] + "aztk/node_scripts/install/mount_data_disk.sh", device_name, str(number)], shell=True)
+    cmd = os.environ["AZTK_WORKING_DIR"] + "/aztk/node_scripts/install/mount_data_disk.sh " + device_name + " " + str(number)
+    print("mount disk cmd:", cmd)
+    p = subprocess.Popen([cmd], shell=True)
     # output, error = p.communicate()
     # print(output, error)
     return p.returncode
diff --git a/aztk/node_scripts/install/mount_data_disk.sh b/aztk/node_scripts/install/mount_data_disk.sh
index bbf4565b..d295e619 100644
--- a/aztk/node_scripts/install/mount_data_disk.sh
+++ b/aztk/node_scripts/install/mount_data_disk.sh
@@ -1,9 +1,22 @@
 #!/bin/bash
 devicename=$1
 datadisknumber=$2
+
+device_partition_name="${devicename}1"
+data_disk_mount_point="/data-disk${datadisknumber}"
+
+# make partition
 parted --script --align optimal ${devicename} mklabel gpt
 parted --script --align optimal ${devicename} mkpart primary ext4 0% 100%
-mkfs.ext4 ${devicename}1
-mkdir -p /mnt/data-disk1
-echo "${devicename}1 /mnt/data-disk1 auto defaults,nofail 0 0" >> /etc/fstab
-mount ${devicename}1 /mnt/data-disk1
+
+# format partition
+mkfs.ext4 ${device_partition_name}
+
+# make partition directory
+mkdir -p ${data_disk_mount_point}
+
+# auto mount partition on reboot
+echo "${device_partition_name} ${data_disk_mount_point} auto defaults,nofail 0 0" >> /etc/fstab
+
+# mount partition
+mount ${device_partition_name} ${data_disk_mount_point}
diff --git a/aztk/node_scripts/setup_host.sh b/aztk/node_scripts/setup_host.sh
index 4cfbe282..2cba20ee 100644
--- a/aztk/node_scripts/setup_host.sh
+++ b/aztk/node_scripts/setup_host.sh
@@ -125,6 +125,7 @@ main () {
 
     # Unzip resource files and set permissions
     chmod 777 $AZTK_WORKING_DIR/aztk/node_scripts/docker_main.sh
+    chmod +x $AZTK_WORKING_DIR/aztk/node_scripts/install/mount_data_disk.sh
 
     # Check docker is running
    docker info > /dev/null 2>&1
From 975eba0d8669210c6056411f22fb35c8d4a8d60d Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Mon, 4 Jun 2018 14:01:50 -0700
Subject: [PATCH 07/11] fix job

---
 aztk/spark/client.py        | 3 ++-
 aztk/spark/models/models.py | 5 ++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/aztk/spark/client.py b/aztk/spark/client.py
index b5d4fb62..78d51cd1 100644
--- a/aztk/spark/client.py
+++ b/aztk/spark/client.py
@@ -230,7 +230,8 @@ def submit_job(self, job_configuration: models.JobConfiguration):
             vm_image = models.VmImage(
                 publisher='Canonical',
                 offer='UbuntuServer',
-                sku='16.04')
+                sku='16.04',
+                data_disks=job_configuration.data_disks)
 
             autoscale_formula = "$TargetDedicatedNodes = {0}; " \
                                 "$TargetLowPriorityNodes = {1}".format(
diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py
index 0536c8d7..584928f4 100644
--- a/aztk/spark/models/models.py
+++ b/aztk/spark/models/models.py
@@ -196,7 +196,8 @@ def __init__(
             max_low_pri_nodes=0,
             subnet_id=None,
             scheduling_target: SchedulingTarget = None,
-            worker_on_master=None):
+            worker_on_master: bool = None,
+            data_disks: List[DataDisk] = None):
 
         self.id = id
         self.applications = applications
@@ -212,6 +213,7 @@ def __init__(
         self.subnet_id = subnet_id
         self.worker_on_master = worker_on_master
         self.scheduling_target = scheduling_target
+        self.data_disks = data_disks
 
     def to_cluster_config(self):
         return ClusterConfiguration(
@@ -225,6 +227,7 @@ def to_cluster_config(self):
             worker_on_master=self.worker_on_master,
             spark_configuration=self.spark_configuration,
             scheduling_target=self.scheduling_target,
+            data_disks=self.data_disks,
         )
 
     def mixed_mode(self) -> bool:
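
With patch 07, data disks flow through job submission as well: `JobConfiguration` accepts a `data_disks` list and forwards it via `to_cluster_config`. A hedged usage sketch (identifiers and values are illustrative, and other required job fields are omitted for brevity):

```python
from aztk.spark.models import DataDisk, JobConfiguration

# Illustrative job configuration; applications, spark_configuration, etc.
# would be set in a real submission.
job_conf = JobConfiguration(
    id="example-job",
    vm_size="standard_f2",
    max_dedicated_nodes=2,
    data_disks=[DataDisk(disk_size_gb=50)],
)

# The disks configured on the job carry through to the derived cluster config.
assert job_conf.to_cluster_config().data_disks == job_conf.data_disks
```
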
From c533779b94aab2201bceb0bdcfd6db76855d711c Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 8 Jun 2018 17:17:34 -0700
Subject: [PATCH 08/11] add mount_path, format_type, job support

---
 aztk/internal/docker_cmd.py                   |  2 +-
 aztk/models/cluster_configuration.py          |  1 +
 aztk/models/data_disk.py                      |  5 ++
 aztk/models/data_disk_format_type.py          | 10 ++++
 aztk/node_scripts/install/install.py          | 25 ++--------
 aztk/node_scripts/install/mount_data_disk.sh  | 22 ---------
 aztk/node_scripts/install/setup_data_disk.sh  | 25 ++++++++++
 aztk/node_scripts/install/setup_data_disks.py | 49 +++++++++++++++++++
 aztk/node_scripts/install/spark_container.py  |  6 ++-
 aztk/node_scripts/setup_host.sh               |  2 +-
 aztk_cli/config.py                            | 13 ++++-
 aztk_cli/spark/endpoints/job/submit.py        |  1 +
 12 files changed, 113 insertions(+), 48 deletions(-)
 create mode 100644 aztk/models/data_disk_format_type.py
 delete mode 100644 aztk/node_scripts/install/mount_data_disk.sh
 create mode 100644 aztk/node_scripts/install/setup_data_disk.sh
 create mode 100644 aztk/node_scripts/install/setup_data_disks.py

diff --git a/aztk/internal/docker_cmd.py b/aztk/internal/docker_cmd.py
index 7dc75e1e..73a6e517 100644
--- a/aztk/internal/docker_cmd.py
+++ b/aztk/internal/docker_cmd.py
@@ -28,7 +28,7 @@ def pass_env(self, env: str):
         self.cmd.add_option('-e', '{0}'.format(env))
 
     def share_folder(self, folder: str):
-        self.cmd.add_option('-v', '{0}:{0}'.format(folder))
+        self.cmd.add_option('--mount', 'type=bind,src={0},dst={0}'.format(folder))
 
     def open_port(self, port: int):
         self.cmd.add_option('-p', '{0}:{0}'.format(port))  # Spark Master UI
diff --git a/aztk/models/cluster_configuration.py b/aztk/models/cluster_configuration.py
index d69ec377..6e001101 100644
--- a/aztk/models/cluster_configuration.py
+++ b/aztk/models/cluster_configuration.py
@@ -10,6 +10,7 @@
 from .user_configuration import UserConfiguration
 from .scheduling_target import SchedulingTarget
 
+
 class ClusterConfiguration(Model):
     """
         Cluster configuration model
diff --git a/aztk/models/data_disk.py b/aztk/models/data_disk.py
index 8b6e4887..89e2780c 100644
--- a/aztk/models/data_disk.py
+++ b/aztk/models/data_disk.py
@@ -1,4 +1,9 @@
 from aztk.core.models import Model, fields
 
+from .data_disk_format_type import DataDiskFormatType
+
+
 class DataDisk(Model):
     disk_size_gb = fields.Integer()
+    mount_path = fields.String()
+    format_type = fields.String(default=DataDiskFormatType.ext4)
diff --git a/aztk/models/data_disk_format_type.py b/aztk/models/data_disk_format_type.py
new file mode 100644
index 00000000..d65c7faa
--- /dev/null
+++ b/aztk/models/data_disk_format_type.py
@@ -0,0 +1,10 @@
+class DataDiskFormatType:
+    bfs = "bfs"
+    btrfs = "btrfs"
+    cramfs = "cramfs"
+    ext2 = "ext2"
+    ext3 = "ext3"
+    ext4 = "ext4"
+    fat = "fat"
+    minix = "minix"
+    xfs = "xfs"
diff --git a/aztk/node_scripts/install/install.py b/aztk/node_scripts/install/install.py
index fe4e1450..8a55a4a0 100644
--- a/aztk/node_scripts/install/install.py
+++ b/aztk/node_scripts/install/install.py
@@ -1,5 +1,4 @@
 import os
-import subprocess
 
 import wait_until_master_selected
 from aztk.internal import cluster_data
@@ -8,6 +7,7 @@
 from install import (create_user, pick_master, plugins, scripts, spark, spark_container)
 
 from .node_scheduling import setup_node_scheduling
+from .setup_data_disks import setup_data_disks
 
 
 def read_cluster_config():
@@ -17,32 +17,12 @@ def read_cluster_config():
     return cluster_config
 
 
-def mount_data_disk(device_name, number):
-    cmd = os.environ["AZTK_WORKING_DIR"] + "/aztk/node_scripts/install/mount_data_disk.sh " + device_name + " " + str(number)
-    print("mount disk cmd:", cmd)
-    p = subprocess.Popen([cmd], shell=True)
-    # output, error = p.communicate()
-    # print(output, error)
-    return p.returncode
-
-
-def setup_data_disk(number):
-    # setup datadisks, starting with sdc
-    import string
-    chars = string.ascii_lowercase[2:]
-    for i in range(number):
-        mount_data_disk("/dev/sd" + chars[i], i)
-
-
 def setup_host(docker_repo: str):
     """
         Code to be run on the node(NOT in a container)
     """
-    p = subprocess.Popen(["lsblk -lnS --sort name | wc -l"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    output, _ = p.communicate()
-    output = int(output) - 3  # by default, there are 3 devices on each host: sda, sdb, sr0
-    if output > 2:
-        setup_data_disk(output)
 
     client = config.batch_client
 
@@ -70,6 +50,8 @@ def setup_host(docker_repo: str):
 
     cluster_conf = read_cluster_config()
 
+    setup_data_disks(cluster_conf)
+
     setup_node_scheduling(client, cluster_conf, is_master)
 
     #TODO pass azure file shares
@@ -77,6 +59,7 @@ def setup_host(docker_repo: str):
         docker_repo=docker_repo,
         gpu_enabled=os.environ.get("AZTK_GPU_ENABLED") == "true",
         plugins=cluster_conf.plugins,
+        data_disks=cluster_conf.data_disks,
     )
     plugins.setup_plugins(target=PluginTarget.Host, is_master=is_master, is_worker=is_worker)
diff --git a/aztk/node_scripts/install/mount_data_disk.sh b/aztk/node_scripts/install/mount_data_disk.sh
deleted file mode 100644
index d295e619..00000000
--- a/aztk/node_scripts/install/mount_data_disk.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-devicename=$1
-datadisknumber=$2
-
-device_partition_name="${devicename}1"
-data_disk_mount_point="/data-disk${datadisknumber}"
-
-# make partition
-parted --script --align optimal ${devicename} mklabel gpt
-parted --script --align optimal ${devicename} mkpart primary ext4 0% 100%
-
-# format partition
-mkfs.ext4 ${device_partition_name}
-
-# make partition directory
-mkdir -p ${data_disk_mount_point}
-
-# auto mount partition on reboot
-echo "${device_partition_name} ${data_disk_mount_point} auto defaults,nofail 0 0" >> /etc/fstab
-
-# mount partition
-mount ${device_partition_name} ${data_disk_mount_point}
diff --git a/aztk/node_scripts/install/setup_data_disk.sh b/aztk/node_scripts/install/setup_data_disk.sh
new file mode 100644
index 00000000..2c18ffcb
--- /dev/null
+++ b/aztk/node_scripts/install/setup_data_disk.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+set -e
+
+devicename="/dev/"$1
+format_type=$2
+mount_path=$3
+device_partition_name="${devicename}1"
+
+# make partition
+parted --script --align optimal ${devicename} mklabel gpt
+parted --script --align optimal ${devicename} mkpart primary ext4 0% 100%
+
+# format partition
+sleep 1
+mkfs.${format_type} ${device_partition_name}
+
+# make partition directory
+mkdir -p ${mount_path}
+
+# auto mount partition on reboot
+echo "${device_partition_name} ${mount_path} auto defaults,nofail 0 0" >> /etc/fstab
+
+# mount partition
+mount ${device_partition_name} ${mount_path}
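
setup_data_disk.sh performs four steps per disk: partition (a GPT label plus one primary partition spanning the device), format with the configured file system, register the partition in /etc/fstab so it survives reboots, and mount it. The same sequence sketched in Python for reference — the helper name `format_and_mount` is hypothetical, and the fstab entry is left out here:

```python
import subprocess

def format_and_mount(device: str, format_type: str, mount_path: str):
    # Mirrors setup_data_disk.sh: one primary partition spanning the disk,
    # formatted with the requested file system and mounted at mount_path.
    partition = device + "1"
    for cmd in (
        ["parted", "--script", "--align", "optimal", device, "mklabel", "gpt"],
        ["parted", "--script", "--align", "optimal", device, "mkpart", "primary", "ext4", "0%", "100%"],
        ["mkfs." + format_type, partition],
        ["mkdir", "-p", mount_path],
        ["mount", partition, mount_path],
    ):
        subprocess.run(cmd, check=True)
```
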
diff --git a/aztk/node_scripts/install/setup_data_disks.py b/aztk/node_scripts/install/setup_data_disks.py
new file mode 100644
index 00000000..2b33b902
--- /dev/null
+++ b/aztk/node_scripts/install/setup_data_disks.py
@@ -0,0 +1,49 @@
+import subprocess
+import sys
+import os
+
+
+def mount_data_disk(data_disk, device_name, number):
+    cmd = os.environ["AZTK_WORKING_DIR"] + "/aztk/node_scripts/install/setup_data_disk.sh "
+    data_disk.mount_path = "/data-disk" + str(number) if not data_disk.mount_path else data_disk.mount_path
+    args = device_name + " " + data_disk.format_type + " " + data_disk.mount_path
+    cmd = cmd + args
+    print("mount disk cmd:", cmd)
+    p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate()
+    if p.returncode != 0:
+        print("ERROR: failed to mount data_disk device {}".format(device_name))
+        sys.exit(p.returncode)
+
+    return data_disk
+
+
+def setup_data_disks(cluster_configuration):
+    cmd = 'lsblk -lnS --sort name | wc -l'
+    p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    output, _ = p.communicate()
+    if int(output) <= 3:
+        return
+
+    # by default, there are 3 devices on each host: sda, sdb, sr0
+    cmd = 'lsblk -lnbS --sort=name --output NAME,SIZE | grep -v "sr0\|sd[ab]"'
+    p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate()
+    disks = stdout.decode('UTF-8').split('\n')[:-1]
+
+    disk_size_mapping = {}
+    for disk in disks:
+        assert len(disk.split()) == 2
+        name, size = disk.split()
+        # convert size from bytes to gb
+        size = int(size) / 1024 / 1024 / 1024
+        if not disk_size_mapping.get(size):
+            disk_size_mapping[size] = [name]
+        else:
+            disk_size_mapping[size].append(name)
+
+    for i, defined_data_disk in enumerate(cluster_configuration.data_disks):
+        device_name = disk_size_mapping[defined_data_disk.disk_size_gb].pop()
+        mounted_data_disk = mount_data_disk(data_disk=defined_data_disk, device_name=device_name, number=i)
+        # update cluster_configuration in case mount_path changed
+        cluster_configuration.data_disks[i] = mounted_data_disk
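
Device discovery above matches configured disks to devices by size rather than by name, since Linux device names are not guaranteed to follow attachment order. For two attached 10 GB disks and one 50 GB disk the mapping would look like this (sda, sdb and sr0 are already filtered out); note that a lookup with an `int` key such as `disk_size_gb=10` still hits the `float` key `10.0`, because equal numbers hash equally in Python:

```python
# Illustrative result of the size-to-device mapping for three data disks:
disk_size_mapping = {
    10.0: ["sdc", "sdd"],   # two 10 GB disks
    50.0: ["sde"],          # one 50 GB disk
}

# A configured DataDisk(disk_size_gb=10) pops the last matching device
# ("sdd") and, with no explicit mount_path, is mounted at /data-disk0.
device = disk_size_mapping[10].pop()
assert device == "sdd"
```
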
diff --git a/aztk/node_scripts/install/spark_container.py b/aztk/node_scripts/install/spark_container.py
index 405498ee..b29a3f21 100644
--- a/aztk/node_scripts/install/spark_container.py
+++ b/aztk/node_scripts/install/spark_container.py
@@ -7,7 +7,8 @@ def start_spark_container(
         docker_repo: str=None,
         gpu_enabled: bool=False,
         file_mounts=None,
-        plugins=None):
+        plugins=None,
+        data_disks=None):
 
     cmd = DockerCmd(
         name=constants.DOCKER_SPARK_CONTAINER_NAME,
@@ -18,7 +19,8 @@ def start_spark_container(
     if file_mounts:
         for mount in file_mounts:
             cmd.share_folder(mount.mount_path)
-    cmd.share_folder('/mnt')
+    cmd.share_folder('/mnt/batch')
+    [cmd.share_folder(data_disk.mount_path) for data_disk in data_disks]
 
     cmd.pass_env('AZTK_WORKING_DIR')
     cmd.pass_env('AZ_BATCH_ACCOUNT_NAME')
diff --git a/aztk/node_scripts/setup_host.sh b/aztk/node_scripts/setup_host.sh
index 2cba20ee..ac9d901c 100644
--- a/aztk/node_scripts/setup_host.sh
+++ b/aztk/node_scripts/setup_host.sh
@@ -125,7 +125,7 @@ main () {
 
     # Unzip resource files and set permissions
     chmod 777 $AZTK_WORKING_DIR/aztk/node_scripts/docker_main.sh
-    chmod +x $AZTK_WORKING_DIR/aztk/node_scripts/install/mount_data_disk.sh
+    chmod +x $AZTK_WORKING_DIR/aztk/node_scripts/install/setup_data_disk.sh
 
     # Check docker is running
     docker info > /dev/null 2>&1
diff --git a/aztk_cli/config.py b/aztk_cli/config.py
index 5206b55c..f7b55e51 100644
--- a/aztk_cli/config.py
+++ b/aztk_cli/config.py
@@ -5,6 +5,7 @@
     SecretsConfiguration,
     ClusterConfiguration,
     SchedulingTarget,
+    DataDisk,
 )
 from aztk.utils import deprecate
 from aztk.models import Toolkit
@@ -184,6 +185,7 @@ def __init__(self):
         self.subnet_id = None
         self.worker_on_master = None
         self.scheduling_target = None
+        self.data_disks = []
 
     def _merge_dict(self, config):
         config = config.get('job')
@@ -205,7 +207,16 @@ def _merge_dict(self, config):
         scheduling_target = cluster_configuration.get("scheduling_target")
         if scheduling_target:
             self.scheduling_target = SchedulingTarget(scheduling_target)
-
+        if cluster_configuration.get("data_disks"):
+            for item in cluster_configuration.get("data_disks"):
+                data_disk = DataDisk()
+                if item.get("disk_size_gb"):
+                    data_disk.disk_size_gb = item.get("disk_size_gb")
+                if item.get("mount_path"):
+                    data_disk.mount_path = item.get("mount_path")
+                if item.get("format_type"):
+                    data_disk.format_type = item.get("format_type")
+                self.data_disks.append(data_disk)
 
         applications = config.get('applications')
         if applications:
diff --git a/aztk_cli/spark/endpoints/job/submit.py b/aztk_cli/spark/endpoints/job/submit.py
index 91c5b768..a3347773 100644
--- a/aztk_cli/spark/endpoints/job/submit.py
+++ b/aztk_cli/spark/endpoints/job/submit.py
@@ -45,6 +45,7 @@ def execute(args: typing.NamedTuple):
         subnet_id=job_conf.subnet_id,
         worker_on_master=job_conf.worker_on_master,
         scheduling_target=job_conf.scheduling_target,
+        data_disks=job_conf.data_disks,
     )
 
     #TODO: utils.print_job_conf(job_configuration)
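
With patch 08 the feature is end to end: disks can be declared in cluster.yaml (parsed by aztk_cli/config.py) or built programmatically. A hedged SDK-level sketch of a cluster with two data disks — names, sizes, and the `Toolkit` arguments are illustrative:

```python
from aztk.spark.models import ClusterConfiguration, DataDisk, Toolkit

cluster_conf = ClusterConfiguration(
    cluster_id="example-cluster",
    toolkit=Toolkit(software="spark", version="2.2.0"),
    vm_size="standard_f2",
    size=2,
    data_disks=[
        # Explicitly placed xfs disk, plus one taking the defaults.
        DataDisk(disk_size_gb=50, mount_path="/data-disk-spark", format_type="xfs"),
        DataDisk(disk_size_gb=10),  # ext4, mounted at /data-disk1
    ],
)
cluster_conf.validate()
```
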
From 53ce0cd1ef0f8f9d981faaf411ff8e6a64a3ec25 Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Thu, 14 Jun 2018 13:21:19 -0700
Subject: [PATCH 09/11] add tests

---
 aztk/models/__init__.py        |  1 +
 tests/models/test_data_disk.py | 40 ++++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+)
 create mode 100644 tests/models/test_data_disk.py

diff --git a/aztk/models/__init__.py b/aztk/models/__init__.py
index ebc23c99..66b3cabc 100644
--- a/aztk/models/__init__.py
+++ b/aztk/models/__init__.py
@@ -14,6 +14,7 @@
 from .ssh_log import SSHLog
 from .vm_image import VmImage
 from .data_disk import DataDisk
+from .data_disk import DataDiskFormatType
 from .software import Software
 from .cluster import Cluster
 from .scheduling_target import SchedulingTarget
diff --git a/tests/models/test_data_disk.py b/tests/models/test_data_disk.py
new file mode 100644
index 00000000..e68a66e2
--- /dev/null
+++ b/tests/models/test_data_disk.py
@@ -0,0 +1,40 @@
+import pytest
+
+from aztk.models import DataDisk, DataDiskFormatType
+from aztk.error import InvalidModelError
+
+
+def test_valid_data_disk():
+    data_disk = DataDisk(disk_size_gb=10, mount_path='/test/path', format_type=DataDiskFormatType.ext2)
+    data_disk.validate()
+
+    assert data_disk.disk_size_gb == 10
+    assert data_disk.mount_path == '/test/path'
+    assert data_disk.format_type == DataDiskFormatType.ext2
+
+
+def test_uninitialized_data_disk():
+    data_disk = DataDisk()
+    with pytest.raises(InvalidModelError):
+        data_disk.validate()
+
+    assert data_disk.disk_size_gb is None
+    assert data_disk.mount_path is None
+    assert data_disk.format_type == DataDiskFormatType.ext4
+
+
+def test_data_disk_minimum_required_fields():
+    data_disk = DataDisk(disk_size_gb=1)
+    assert data_disk.disk_size_gb == 1
+    assert data_disk.mount_path == None
+    assert data_disk.format_type == DataDiskFormatType.ext4
+
+
+def test_data_disk_format_type():
+    data_disk = DataDisk(disk_size_gb=1, format_type=DataDiskFormatType.ext2)
+    assert data_disk.format_type == "ext2"
+    assert data_disk.format_type == DataDiskFormatType.ext2
+
+    data_disk = DataDisk(disk_size_gb=1, format_type="ext2")
+    assert data_disk.format_type == "ext2"
+    assert data_disk.format_type == DataDiskFormatType.ext2
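
The tests pin down two properties worth noting: `disk_size_gb` is the only required field (`validate()` raises without it), and the `DataDiskFormatType` members are plain strings, so `format_type` compares equal whether set from the constant or from a raw string such as one parsed out of cluster.yaml. For example:

```python
from aztk.models import DataDisk
from aztk.error import InvalidModelError

disk = DataDisk(mount_path="/data-disk0")  # missing the required disk_size_gb
try:
    disk.validate()
except InvalidModelError as error:
    print("rejected:", error)

disk = DataDisk(disk_size_gb=10, format_type="xfs")
disk.validate()  # plain string format names are accepted
```
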
From 9db709332e00f19cd331c2fdf396e19c6dc6f25b Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 15 Jun 2018 10:43:58 -0700
Subject: [PATCH 10/11] fix import, docstrings, typos

---
 aztk/models/__init__.py              |  2 +-
 aztk/models/data_disk.py             | 10 +++++++++-
 aztk/models/data_disk_format_type.py | 14 ++++++++++++++
 aztk/models/file_share.py            |  9 +++++++++
 docs/dev/writing-models.md           |  2 +-
 5 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/aztk/models/__init__.py b/aztk/models/__init__.py
index 66b3cabc..7e30de6a 100644
--- a/aztk/models/__init__.py
+++ b/aztk/models/__init__.py
@@ -14,7 +14,7 @@
 from .ssh_log import SSHLog
 from .vm_image import VmImage
 from .data_disk import DataDisk
-from .data_disk import DataDiskFormatType
+from .data_disk_format_type import DataDiskFormatType
 from .software import Software
 from .cluster import Cluster
 from .scheduling_target import SchedulingTarget
diff --git a/aztk/models/data_disk.py b/aztk/models/data_disk.py
index 89e2780c..0cabff0b 100644
--- a/aztk/models/data_disk.py
+++ b/aztk/models/data_disk.py
@@ -2,8 +2,16 @@
 
 from .data_disk_format_type import DataDiskFormatType
 
-
 class DataDisk(Model):
+    """
+    Configuration for an additional local storage disk that is attached to the virtual machine,
+    formatted and mounted into the Spark Docker container
+
+    Args:
+        disk_size_gb (int): the size in GB of the data disk to attach to each node
+        mount_path (:obj:`str`, optional): the path where the disk should be mounted
+        format_type (:obj:`aztk.models.DataDiskFormatType`, optional): the type of file system format
+    """
     disk_size_gb = fields.Integer()
     mount_path = fields.String()
     format_type = fields.String(default=DataDiskFormatType.ext4)
diff --git a/aztk/models/data_disk_format_type.py b/aztk/models/data_disk_format_type.py
index d65c7faa..e1455a3e 100644
--- a/aztk/models/data_disk_format_type.py
+++ b/aztk/models/data_disk_format_type.py
@@ -1,4 +1,18 @@
 class DataDiskFormatType:
+    """
+    The valid file system formats for a Data Disk
+
+    Attributes:
+        bfs (:obj:`str`)
+        btrfs (:obj:`str`)
+        cramfs (:obj:`str`)
+        ext2 (:obj:`str`)
+        ext3 (:obj:`str`)
+        ext4 (:obj:`str`)
+        fat (:obj:`str`)
+        minix (:obj:`str`)
+        xfs (:obj:`str`)
+    """
     bfs = "bfs"
     btrfs = "btrfs"
     cramfs = "cramfs"
diff --git a/aztk/models/file_share.py b/aztk/models/file_share.py
index c94bdcbb..6c02f93b 100644
--- a/aztk/models/file_share.py
+++ b/aztk/models/file_share.py
@@ -1,6 +1,15 @@
 from aztk.core.models import Model, fields
 
 class FileShare(Model):
+    """
+    Azure Files file share to mount to each node in the cluster
+
+    Args:
+        storage_account_name (:obj:`str`): the name of the Azure Storage Account
+        storage_account_key (:obj:`str`, optional): the shared key to the Azure Storage Account
+        file_share_path (:obj:`str`, optional): the path of the file share in Azure Files
+        mount_path (:obj:`str`, optional): the path on the node to mount the file share
+    """
     storage_account_name = fields.String()
     storage_account_key = fields.String()
     file_share_path = fields.String()
diff --git a/docs/dev/writing-models.md b/docs/dev/writing-models.md
index e4c0cb45..53266635 100644
--- a/docs/dev/writing-models.md
+++ b/docs/dev/writing-models.md
@@ -6,7 +6,7 @@ In `aztk/models` create a new file with the name of your model `my_model.py`
 
 In `aztk/models/__init__.py` add `from .my_model import MyModel`
 
-Create a new class `MyModel` that inherit `Modle`
+Create a new class `MyModel` that inherits `Model`
 
 ```python
 from aztk.core.models import Model, fields
From eea7b317aa02fa58a8ab2119e4c0e065d0784fb9 Mon Sep 17 00:00:00 2001
From: Jake Freck
Date: Fri, 15 Jun 2018 11:17:43 -0700
Subject: [PATCH 11/11] rename test

---
 tests/models/test_data_disk.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/models/test_data_disk.py b/tests/models/test_data_disk.py
index e68a66e2..112d0d84 100644
--- a/tests/models/test_data_disk.py
+++ b/tests/models/test_data_disk.py
@@ -13,7 +13,7 @@ def test_valid_data_disk():
     assert data_disk.format_type == DataDiskFormatType.ext2
 
 
-def test_uninitialized_data_disk():
+def test_default_data_disk():
     data_disk = DataDisk()
     with pytest.raises(InvalidModelError):
         data_disk.validate()
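
Taken together, the series adds managed data disks at every layer: model, YAML/CLI configuration, Batch pool provisioning, on-node partitioning and mounting, and bind-mounting into the Spark container. A final end-to-end sketch of the SDK flow — secrets construction is elided and would need real values, the toolkit is omitted for brevity, and `wait=True` blocks until the nodes are ready:

```python
from aztk.spark import Client
from aztk.spark.models import ClusterConfiguration, DataDisk, SecretsConfiguration

# Secrets are normally read from .aztk/secrets.yaml; an empty configuration
# is shown only as a placeholder and must be filled in to actually run.
secrets = SecretsConfiguration()

client = Client(secrets)
cluster = client.create_cluster(
    ClusterConfiguration(
        cluster_id="data-disk-cluster",
        vm_size="standard_f2",
        size=2,
        data_disks=[DataDisk(disk_size_gb=50)],  # mounted at /data-disk0 by default
    ),
    wait=True,
)
```
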