diff --git a/.gitignore b/.gitignore
index 52e4e611..76dddd2c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 *.pyc
 *.pyo
+/.project
+/.pydevproject
diff --git a/benchmark/cephtestrados.py b/benchmark/cephtestrados.py
index 88c46496..24a29eab 100644
--- a/benchmark/cephtestrados.py
+++ b/benchmark/cephtestrados.py
@@ -10,7 +10,7 @@
 logger = logging.getLogger('cbt')
 
 from cluster.ceph import Ceph
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 
 class CephTestRados(Benchmark):
@@ -59,7 +59,7 @@ def addweight(self, weight):
 
     def exists(self):
        if os.path.exists(self.out_dir):
-            print 'Skipping existing test in %s.' % self.out_dir
+            print ('Skipping existing test in %s.' % self.out_dir)
            return True
        return False
diff --git a/benchmark/cosbench.py b/benchmark/cosbench.py
index 42b94284..7e8734ca 100644
--- a/benchmark/cosbench.py
+++ b/benchmark/cosbench.py
@@ -11,7 +11,7 @@
 import logging
 
 from cluster.ceph import Ceph
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
diff --git a/benchmark/getput.py b/benchmark/getput.py
index e8a51b86..ec7b08d0 100644
--- a/benchmark/getput.py
+++ b/benchmark/getput.py
@@ -9,7 +9,7 @@
 import re
 
 from cluster.ceph import Ceph
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
diff --git a/benchmark/kvmrbdfio.py b/benchmark/kvmrbdfio.py
index 829f9b2d..c7724216 100644
--- a/benchmark/kvmrbdfio.py
+++ b/benchmark/kvmrbdfio.py
@@ -7,7 +7,7 @@
 import string
 import logging
 
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
diff --git a/benchmark/librbdfio.py b/benchmark/librbdfio.py
index 9b4944f0..e6f6f5e8 100644
--- a/benchmark/librbdfio.py
+++ b/benchmark/librbdfio.py
@@ -7,9 +7,10 @@
 import threading
 import logging
 import json
+import socket
 
 from cluster.ceph import Ceph
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
@@ -49,14 +50,14 @@ def __init__(self, cluster, config):
         self.pool_name = config.get("poolname", "cbt-librbdfio")
         self.rbdname = config.get('rbdname', '')
 
-        self.total_procs = self.procs_per_volume * self.volumes_per_client * len(settings.getnodes('clients').split(','))
+        self.total_procs = self.procs_per_volume * self.volumes_per_client * len(settings.getnodes('clients').split(','))
         self.run_dir = '%s/osd_ra-%08d/op_size-%08d/concurrent_procs-%03d/iodepth-%03d/%s' % (self.run_dir, int(self.osd_ra), int(self.op_size), int(self.total_procs), int(self.iodepth), self.mode)
         self.out_dir = self.archive_dir
         self.norandommap = config.get("norandommap", False)
         # Make the file names string (repeated across volumes)
         self.names = ''
-        for proc_num in xrange(self.procs_per_volume):
+        for proc_num in range(self.procs_per_volume):
             rbd_name = 'cbt-librbdfio-`%s`-file-%d' % (common.get_fqdn_cmd(), proc_num)
             self.names += '--name=%s ' % rbd_name
 
@@ -75,9 +76,10 @@ def initialize(self):
 
         logger.info('Pausing for 60s for idle monitoring.')
         monitoring.start("%s/idle_monitoring" % self.run_dir)
+        monitoring.start_pbench("%s/idle" % self.out_dir)
         time.sleep(60)
         monitoring.stop()
-
+        monitoring.stop_pbench("%s/idle" % self.out_dir)
         common.sync_files('%s/*' % self.run_dir, self.out_dir)
 
         self.mkimages()
@@ -86,7 +88,7 @@ def initialize(self):
         ps = []
         logger.info('Attempting to populating fio files...')
         if (self.use_existing_volumes == False):
-            for volnum in xrange(self.volumes_per_client):
+            for volnum in range(self.volumes_per_client):
                 rbd_name = 'cbt-librbdfio-`%s`-%d' % (common.get_fqdn_cmd(), volnum)
                 pre_cmd = 'sudo %s --ioengine=rbd --clientname=admin --pool=%s --rbdname=%s --invalidate=0 --rw=write --numjobs=%s --bs=4M --size %dM %s --output-format=%s > /dev/null' % (self.cmd_path, self.pool_name, rbd_name, self.numjobs, self.vol_size, self.names, self.fio_out_format)
                 p = common.pdsh(settings.getnodes('clients'), pre_cmd)
@@ -105,6 +107,7 @@ def run(self):
         self.cluster.dump_config(self.run_dir)
 
         monitoring.start(self.run_dir)
+        monitoring.start_pbench(self.out_dir)
 
         time.sleep(5)
 
@@ -115,7 +118,7 @@ def run(self):
         logger.info('Running rbd fio %s test.', self.mode)
         ps = []
-        for i in xrange(self.volumes_per_client):
+        for i in range(self.volumes_per_client):
             fio_cmd = self.mkfiocmd(i)
             p = common.pdsh(settings.getnodes('clients'), fio_cmd)
             ps.append(p)
@@ -125,6 +128,7 @@ def run(self):
         if 'recovery_test' in self.cluster.config:
             self.cluster.wait_recovery_done()
 
+        monitoring.stop_pbench(self.out_dir)
         monitoring.stop(self.run_dir)
 
         # Finally, get the historic ops
@@ -139,7 +143,7 @@ def mkfiocmd(self, volnum):
         rbdname = 'cbt-librbdfio-`%s`-%d' % (common.get_fqdn_cmd(), volnum)
         logger.debug('Using rbdname %s', rbdname)
 
-        out_file = '%s/output.%d' % (self.run_dir, volnum)
+        out_file = '%s/output.%d.`%s`' % (self.run_dir, volnum, common.get_fqdn_cmd())
 
         fio_cmd = 'sudo %s --ioengine=rbd --clientname=admin --pool=%s --rbdname=%s --invalidate=0' % (self.cmd_path_full, self.pool_name, rbdname)
         fio_cmd += ' --rw=%s' % self.mode
@@ -191,7 +195,7 @@ def mkimages(self):
             self.cluster.rmpool(self.data_pool, self.data_pool_profile)
             self.cluster.mkpool(self.data_pool, self.data_pool_profile, 'rbd')
         for node in common.get_fqdn_list('clients'):
-            for volnum in xrange(0, self.volumes_per_client):
+            for volnum in range(0, self.volumes_per_client):
                 node = node.rpartition("@")[2]
                 self.cluster.mkimage('cbt-librbdfio-%s-%d' % (node,volnum), self.vol_size, self.pool_name, self.data_pool, self.vol_object_size)
         monitoring.stop()
@@ -201,7 +205,13 @@ def recovery_callback(self):
 
     def parse(self, out_dir):
         for client in settings.cluster.get('clients'):
-            for i in xrange(self.volumes_per_client):
+            try:
+                socket.inet_aton(client)
+                client = socket.gethostbyaddr(client)[0]
+            except:
+                pass
+
+            for i in range(self.volumes_per_client):
                 found = 0
                 out_file = '%s/output.%d.%s' % (out_dir, i, client)
                 json_out_file = '%s/json_output.%d.%s' % (out_dir, i, client)
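The parse() hunk above resolves client IP addresses back to hostnames so the per-host output files written by mkfiocmd() can be located. As a standalone sketch of that lookup (hostnames and addresses here are placeholders, and socket.gethostbyaddr() returns a (hostname, aliases, addresses) tuple, hence the [0]):

    import socket

    def client_label(client):
        # If 'client' looks like an IPv4 address, try reverse DNS and use the
        # resolved hostname; otherwise keep the original string unchanged.
        try:
            socket.inet_aton(client)
            return socket.gethostbyaddr(client)[0]
        except OSError:
            return client

    # e.g. client_label('192.168.121.64') may return 'client1.example.com' if
    # reverse DNS is configured, otherwise the address itself comes back.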
diff --git a/benchmark/nullbench.py b/benchmark/nullbench.py
index 083f8ee4..b366bbfd 100644
--- a/benchmark/nullbench.py
+++ b/benchmark/nullbench.py
@@ -5,7 +5,7 @@
 import os
 
 from cluster.ceph import Ceph
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 
 class Nullbench(Benchmark):
diff --git a/benchmark/radosbench.py b/benchmark/radosbench.py
index 15128771..9d071462 100644
--- a/benchmark/radosbench.py
+++ b/benchmark/radosbench.py
@@ -10,7 +10,7 @@
 import json
 
 from cluster.ceph import Ceph
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
@@ -121,10 +121,11 @@ def _run(self, mode, run_dir, out_dir):
 
         # Run rados bench
         monitoring.start(run_dir)
+        monitoring.start_pbench("%s/%s" % (self.out_dir, mode))
         logger.info('Running radosbench %s test.' % mode)
         ps = []
         for i in xrange(self.concurrent_procs):
-            out_file = '%s/output.%s' % (run_dir, i)
+            out_file = '%s/output.%s.`%s`' % (run_dir, i, common.get_fqdn_cmd())
             objecter_log = '%s/objecter.%s.log' % (run_dir, i)
             # default behavior is to use a single storage pool
             pool_name = self.pool
@@ -139,6 +140,7 @@ def _run(self, mode, run_dir, out_dir):
             ps.append(p)
         for p in ps:
             p.wait()
+        monitoring.stop_pbench("%s/%s" % (self.out_dir, mode))
         monitoring.stop(run_dir)
 
         # If we were doing recovery, wait until it's done.
diff --git a/benchmark/rawfio.py b/benchmark/rawfio.py
index c4387a2b..e0286b57 100644
--- a/benchmark/rawfio.py
+++ b/benchmark/rawfio.py
@@ -7,7 +7,7 @@
 import string
 import logging
 
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
diff --git a/benchmark/rbdfio.py b/benchmark/rbdfio.py
index 5140820f..0fdb8167 100644
--- a/benchmark/rbdfio.py
+++ b/benchmark/rbdfio.py
@@ -6,7 +6,7 @@
 import time
 import logging
 
-from benchmark import Benchmark
+from benchmark.benchmark import Benchmark
 
 logger = logging.getLogger("cbt")
 
diff --git a/benchmark/smallfile.py b/benchmark/smallfile.py
new file mode 100644
index 00000000..a5222d78
--- /dev/null
+++ b/benchmark/smallfile.py
@@ -0,0 +1,175 @@
+# Benchmark subclass to invoke smallfile
+# this benchmark will iterate over smallfile test parameters
+# something that smallfile cannot do today
+#
+# see examples/smallfile.yaml for how to use it
+#
+# at present, this does not create the filesystem or mount it,
+# all clients and head node must have filesystem mounted
+#
+# it assumes that all hosts are accessed with the same user account
+# so that user@hostname pdsh syntax is not needed
+#
+# it has only been tested with a single Cephfs mountpoint/host
+
+import copy
+import common
+import monitoring
+import os
+import time
+import logging
+import settings
+import yaml
+import json
+import subprocess
+
+from benchmark.benchmark import Benchmark
+
+logger = logging.getLogger("cbt")
+
+# we do this so source of exception is really obvious
+class CbtSmfExc(Exception):
+    pass
+
+class Smallfile(Benchmark):
+
+    def __init__(self, cluster, config):
+        super(Smallfile, self).__init__(cluster, config)
+        self.out_dir = self.archive_dir
+        self.config = config
+        mons = settings.getnodes('mons').split(',')
+        self.any_mon = mons[0]
+        self.clients = settings.getnodes('clients').split(',')
+        self.any_client = self.clients[0]
+        self.head = settings.getnodes('head')
+        self.cephfs_data_pool_name = config.get('data_pool_name', 'cephfs_data')
+        self.cleandir()
+
+
+    # this function uses "ceph df" output to monitor
+    # cephfs_data pool object count, when that stops going down
+    # then the pool is stable and it's ok to start another test
+
+    def get_cephfs_data_objects(self):
+        (cephdf_out, cephdf_err) = common.pdsh(
+            self.any_mon, 'ceph -f json df', continue_if_error=False).communicate()
+        # pdsh prepends JSON output with IP address of host that did the command,
+        # we have to strip the IP address off before JSON parser will accept it
+        start_of_json = cephdf_out.index('{')
+        json_str = cephdf_out[start_of_json:]
+        cephdf = json.loads(json_str)
+        cephfs_data_objs = -1
+        for p in cephdf['pools']:
+            if p['name'] == self.cephfs_data_pool_name:
+                cephfs_data_objs = int(p['stats']['objects'])
+                break
+        if cephfs_data_objs == -1:
+            raise CbtSmfExc('could not find cephfs_data pool in ceph -f json df output')
+        logger.info('cephfs_data pool object count = %d' % cephfs_data_objs)
+        return cephfs_data_objs
+
+    def run(self):
+        super(Smallfile, self).run()
+
+        # someday we might want to allow the option
+        # to NOT drop cache
+        self.dropcaches()
+        # FIXME: if desired, drop cache on OSDs
+        # FIXME: if desired, drop cache on MDSs
+
+        # dump the cluster config
+        self.cluster.dump_config(self.run_dir)
+
+        # input YAML parameters for smallfile are subset
+        # extract parameters that you need
+
+        smfparams = copy.deepcopy(self.config)
+        del smfparams['benchmark']
+        del smfparams['iteration']
+        try:
+            del smfparams['data_pool_name']
+        except KeyError:
+            pass
+        operation = smfparams['operation']
+        topdir = smfparams['top'].split(',')[0]
+        yaml_input_pathname = os.path.join(self.out_dir, 'smfparams.yaml')
+        with open(yaml_input_pathname, 'w') as yamlf:
+            yamlf.write(yaml.dump(smfparams, default_flow_style=False))
+
+        # generate client list
+
+        client_list_path = os.path.join(self.out_dir, 'client.list')
+        with open(client_list_path, 'w') as client_f:
+            for c in self.clients:
+                client_f.write(c + '\n')
+
+        # ensure SMF directory exists
+        # for shared filesystem, we only need 1 client to
+        # initialize it
+
+        logger.info('using client %s to initialize shared filesystem' % self.any_client)
+        common.pdsh(self.any_client, 'mkdir -p -v -m 0777 ' + topdir, continue_if_error=False).communicate()
+
+        # Run the backfill testing thread if requested
+        if 'recovery_test' in self.cluster.config:
+            recovery_callback = self.recovery_callback
+            self.cluster.create_recovery_test(self.run_dir, recovery_callback)
+
+        # Run smallfile
+        monitoring.start(self.run_dir)
+        monitoring.start_pbench(self.out_dir)
+        logger.info('Running smallfile test, see %s for parameters' % yaml_input_pathname)
+        smfcmd = [ 'smallfile_cli.py',
+                   '--host-set', client_list_path,
+                   '--response-times', 'Y',
+                   '--yaml-input-file', yaml_input_pathname,
+                   '--verbose', 'Y',
+                   '--output-json', '%s/smfresult.json' % self.out_dir ]
+        logger.info('smallfile command: %s' % ' '.join(smfcmd))
+        logger.info('YAML inputs: %s' % yaml.dump(smfparams))
+        smf_out_path = os.path.join(self.out_dir, 'smf-out.log')
+        (smf_out_str, smf_err_str) = common.pdsh(self.head, ' '.join(smfcmd), continue_if_error=False).communicate()
+        with open(smf_out_path, 'w') as smf_outf:
+            smf_outf.write(smf_out_str + '\n')
+        logger.info('smallfile result: %s' % smf_out_path)
+        monitoring.stop_pbench(self.out_dir)
+        monitoring.stop(self.run_dir)
+
+
+        # save response times
+        rsptimes_target_dir = os.path.join(self.out_dir, 'rsptimes')
+        common.mkdir_p(rsptimes_target_dir)
+        common.rpdcp(self.head, '',
+                     os.path.join(os.path.join(topdir, 'network_shared'), 'rsptimes*csv'),
+                     rsptimes_target_dir)
+
+        if operation == 'cleanup':
+            common.pdsh(self.any_client, 'rm -rf ' + topdir, continue_if_error=False).communicate()
+            common.pdsh(self.any_client, 'mkdir -v -m 0777 ' + topdir, continue_if_error=False).communicate()
+            # wait until cephfs_data pool stops decreasing
+            logger.info('wait for cephfs_data pool to empty')
+            pool_shrinking = True
+            old_data_objs = self.get_cephfs_data_objects()
+            while pool_shrinking:
+                time.sleep(10)
+                data_objs = self.get_cephfs_data_objects()
+                if old_data_objs == data_objs:
+                    logger.info('pool stopped shrinking')
+                    pool_shrinking = False
+                else:
+                    logger.info('pool shrank by %d objects', old_data_objs - data_objs)
+                    old_data_objs = data_objs
+
+        # If we were doing recovery, wait until it's done.
+        if 'recovery_test' in self.cluster.config:
+            self.cluster.wait_recovery_done()
+
+        # Finally, get the historic ops
+        self.cluster.dump_historic_ops(self.run_dir)
+        common.sync_files(self.run_dir, self.out_dir)
+
+    def recovery_callback(self):
+        pass
+
+    def __str__(self):
+        return "%s\n%s\n%s" % (self.run_dir, self.out_dir, super(Smallfile, self).__str__())
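get_cephfs_data_objects() above has to strip the host prefix that pdsh/ansible prepend to the command output before the JSON becomes parseable. A minimal standalone illustration of that technique (the sample line and numbers below are invented for illustration):

    import json

    # Example of what a prefixed "ceph -f json df" line can look like:
    raw = '192.168.121.64: {"pools": [{"name": "cephfs_data", "stats": {"objects": 1234}}]}'

    # Strip everything before the first '{' so json.loads() will accept it.
    payload = raw[raw.index('{'):]
    df = json.loads(payload)

    objects = next(p['stats']['objects'] for p in df['pools']
                   if p['name'] == 'cephfs_data')
    print(objects)  # -> 1234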
diff --git a/benchmarkfactory.py b/benchmarkfactory.py
index f357427b..f7a35655 100644
--- a/benchmarkfactory.py
+++ b/benchmarkfactory.py
@@ -2,6 +2,7 @@
 import itertools
 
 import settings
+from benchmark.smallfile import Smallfile
 from benchmark.radosbench import Radosbench
 from benchmark.rbdfio import RbdFio
 from benchmark.rawfio import RawFio
@@ -13,7 +14,7 @@
 from benchmark.getput import Getput
 
 
 def get_all(cluster, iteration):
-    for benchmark, config in sorted(settings.benchmarks.iteritems()):
+    for benchmark, config in sorted(settings.benchmarks.items()):
         default = {"benchmark": benchmark, "iteration": iteration}
         for current in all_configs(config):
@@ -31,7 +32,7 @@ def all_configs(config):
     cycle_over_names = []
     default = {}
 
-    for param, value in config.iteritems():
+    for param, value in config.items():
         if isinstance(value, list):
             cycle_over_lists.append(value)
             cycle_over_names.append(param)
@@ -63,3 +64,5 @@ def get_object(cluster, benchmark, bconfig):
         return CephTestRados(cluster, bconfig)
     if benchmark == 'getput':
         return Getput(cluster, bconfig)
+    if benchmark == 'smallfile':
+        return Smallfile(cluster, bconfig)
diff --git a/cluster/ceph.py b/cluster/ceph.py
index 01390667..157d5873 100644
--- a/cluster/ceph.py
+++ b/cluster/ceph.py
@@ -8,7 +8,7 @@
 import threading
 import logging
 
-from cluster import Cluster
+from cluster.cluster import Cluster
 
 logger = logging.getLogger("cbt")
 
@@ -464,6 +464,7 @@ def check_health(self, check_list=None, logfile=None):
             check_list = ["degraded", "peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery", "stale"]
         while True:
             stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s health %s' % (self.ceph_cmd, self.tmp_conf, logline)).communicate()
+            stdout = stdout.decode()
             if check_list and not any(x in stdout for x in check_list):
                 break
             if "HEALTH_OK" in stdout:
@@ -546,8 +547,8 @@ def make_profiles(self):
         for name,profile in erasure_profiles.items():
             k = profile.get('erasure_k', 6)
             m = profile.get('erasure_m', 2)
-            common.pdsh(settings.getnodes('head'), '%s -c %s osd erasure-code-profile set %s crush-failure-domain=osd k=%s m=%s' % (self.ceph_cmd, self.tmp_conf, name, k, m)).communicate()
-            self.set_ruleset(name)
+            common.pdsh(settings.getnodes('head'), '%s -c %s osd erasure-code-profile set %s crush-failure-domain=osd k=%s m=%s' % (self.ceph_cmd, self.tmp_conf, name, k, m)).communicate()
+            self.set_ruleset(name)
 
     def mkpool(self, name, profile_name, application, base_name=None):
         pool_profiles = self.config.get('pool_profiles', {'default': {}})
@@ -570,7 +571,7 @@ def mkpool(self, name, profile_name, application, base_name=None):
         target_max_bytes = profile.get('target_max_bytes', None)
         min_read_recency_for_promote = profile.get('min_read_recency_for_promote', None)
         min_write_recency_for_promote = profile.get('min_write_recency_for_promote', None)
-        # Options for prefilling objects
+        # Options for prefilling objects
         prefill_objects = profile.get('prefill_objects', 0)
         prefill_object_size = profile.get('prefill_object_size', 0)
         prefill_time = profile.get('prefill_time', 0)
diff --git a/common.py b/common.py
index 73f3a92d..9bb752a8 100644
--- a/common.py
+++ b/common.py
@@ -2,6 +2,7 @@
 import logging
 import os
 import subprocess
+import socket
 
 import settings
 
@@ -48,6 +49,16 @@ def wait(self):
 # pdsh() calls that require full parallelism (workload generation)
 # work correctly.
 
+def ansible_hostfile(hosts):
+
+    inventory_file = '/tmp/cbtinventory'
+    hosts = hosts.split(",")
+    with open(inventory_file, 'w') as f:
+        for item in hosts:
+            f.write("%s\n" % item)
+
+    return inventory_file
+
 def expanded_node_list(nodes):
     # nodes is a comma-separated list for pdsh "-w" parameter
     # nodes may have some entries with '^' prefix, pdsh syntax meaning
@@ -64,9 +75,11 @@ def expanded_node_list(nodes):
     return node_list
 
 def pdsh(nodes, command, continue_if_error=True):
-    args = ['pdsh', '-f', str(len(expanded_node_list(nodes))), '-R', 'ssh', '-w', nodes, command]
+    #args = ['pdsh', '-f', str(len(expanded_node_list(nodes))), '-R', 'ssh', '-w', nodes, command]
+    inventory = ansible_hostfile(nodes)
+    args = ['ansible', '-f', str(len(expanded_node_list(nodes))), '-m', 'shell', '-a', "%s" % command, '-i', inventory, 'all']
     # -S means pdsh fails if any host fails
-    if not continue_if_error: args.insert(1, '-S')
+    #if not continue_if_error: args.insert(1, '-S')
     return CheckedPopen(args,continue_if_error=continue_if_error)
 
 
@@ -79,11 +92,16 @@ def pdcp(nodes, flags, localfile, remotefile):
 
 
 def rpdcp(nodes, flags, remotefile, localfile):
-    args = ['rpdcp', '-f', '10', '-R', 'ssh', '-w', nodes]
-    if flags:
-        args += [flags]
-    return CheckedPopen(args + [remotefile, localfile],
-                        continue_if_error=False)
+    #args = ['rpdcp', '-f', '10', '-R', 'ssh', '-w', nodes]
+    #args = ['ansible', '-f', '10', '-m', 'fetch', '-a', "flat==yes src=%s dest=%s" % (remotefile, localfile), '-i', nodes, 'all']
+    lhost = socket.gethostname()
+    inventory = ansible_hostfile(nodes)
+    args = ['ansible', '-f', str(len(expanded_node_list(nodes))), '-m', 'shell', '-a', "scp -r %s %s:%s" % (remotefile, lhost, localfile), '-i', inventory, 'all']
+#    if flags:
+#        args += [flags]
+    #return CheckedPopen(args + [remotefile, localfile],
+    #                    continue_if_error=False)
+    return CheckedPopen(args,continue_if_error=True)
 
 
 def scp(node, localfile, remotefile):
@@ -100,13 +118,20 @@ def get_fqdn_cmd():
 
 def get_fqdn_list(nodes):
     stdout, stderr = pdsh(settings.getnodes(nodes), '%s' % get_fqdn_cmd()).communicate()
-    print stdout
-    ret = [i.split(' ', 1)[1] for i in stdout.splitlines()]
-    print ret
+    stdout = stdout.decode()
+    print (stdout)
+
+    ret = []
+    for line in stdout.splitlines():
+        if "CHANGED" not in line:
+            ret.append(line)
+
+    #ret = [i.split(' ', 1)[1] for i in stdout.splitlines()]
+    print (ret)
     return ret
 
 def clean_remote_dir (remote_dir):
-    print "cleaning remote dir %s" % remote_dir
+    print ("cleaning remote dir %s" % remote_dir)
     if remote_dir == "/" or not os.path.isabs(remote_dir):
         raise SystemExit("Cleaning the remote dir doesn't seem safe, bailing.")
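For orientation, the rewritten pdsh() above now shells out to an ansible ad-hoc command instead of pdsh. A rough sketch of the argument list it ends up building for a two-host run (the hostnames and command are placeholders, and the real code also expands pdsh-style '^hostfile' entries via expanded_node_list()):

    # Hypothetical inputs, for illustration only.
    nodes = 'client1,client2'
    command = 'hostname -f'
    inventory = '/tmp/cbtinventory'   # one host per line, written by ansible_hostfile()

    args = ['ansible', '-f', str(len(nodes.split(','))), '-m', 'shell',
            '-a', command, '-i', inventory, 'all']
    # args is handed to CheckedPopen/subprocess directly (no shell), so the whole
    # command string travels as a single ansible "-a" argument.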
diff --git a/example/smallfile-test-set.yaml b/example/smallfile-test-set.yaml
new file mode 100644
index 00000000..a4abc902
--- /dev/null
+++ b/example/smallfile-test-set.yaml
@@ -0,0 +1,48 @@
+# this example runs the smallfile benchmark
+#
+# we are using a kernel Cephfs shared filesystem
+# which must be mounted on head node as well as clients
+#
+# CBT lets you specify multiple operations and file sizes,
+# something that smallfile_cli.py cannot do on its own
+#
+# the smallfile CBT benchmark does not create the filesystem or mount it,
+# all clients and head node must have filesystem mounted
+#
+# it assumes that all hosts are accessed with the same user account
+# so that user@hostname pdsh syntax is not needed
+#
+# ensure that smallfile_cli.py (or softlink) appears in the PATH
+# on the 'head' node and that smallfile_remote.py (or softlink) appears
+# in PATH on the 'clients' hosts.
+#
+# you should not specify host-set in your smallfile input parameters,
+# the smallfile CBT benchmark module will do that for you
+# using 'clients' host group
+#
+# you should not specify "response-times: Y" in your YAML input,
+# the smallfile CBT benchmark defaults to this behavior
+#
+# it has only been tested with a single Cephfs mountpoint/host
+
+cluster:
+  use_existing: True
+  head: "localhost"
+  clients: [ "192.168.121.64", "192.168.121.112", "192.168.121.158" ]
+  osds: [ "192.168.121.64", "192.168.121.112", "192.168.121.158" ]
+  mons: [ "192.168.121.64" ]
+  rebuild_every_test: False
+  tmp_dir: "/tmp/cbt"
+  osds_per_node: 1
+  pool_profiles:
+    replicated:
+      replication: 3
+      crush_profile: 1
+  iterations: 1
+benchmarks:
+  smallfile:
+    operation: [ cleanup, create, read, delete ]
+    file-size: [ 4, 64 ]
+    top: /mnt/cephfs/smf
+    files: 2000
+    pause: 2000
diff --git a/monitoring.py b/monitoring.py
index f3dd99c4..1efd06eb 100644
--- a/monitoring.py
+++ b/monitoring.py
@@ -1,6 +1,9 @@
 import common
 import settings
+import logging
+import os
 
+logger = logging.getLogger("cbt")
 
 def start(directory):
     nodes = settings.getnodes('clients', 'osds', 'mons', 'rgws')
@@ -24,6 +27,19 @@ def start(directory):
 #                % (blktrace_dir, device, device))
 
 
+def start_pbench(directory):
+    logger.info('Executing Pbench-start-tools')
+    start_cmd = "/opt/pbench-agent/util-scripts/pbench-start-tools -g default -d %s" % directory
+    os.system(start_cmd)
+
+def stop_pbench(directory):
+    logger.info('Executing Pbench-stop-tools')
+    stop_cmd = "/opt/pbench-agent/util-scripts/pbench-stop-tools -g default -d %s" % directory
+    os.system(stop_cmd)
+    logger.info('Executing Pbench-postprocess-tools')
+    postprocess_cmd = "/opt/pbench-agent/util-scripts/pbench-postprocess-tools -g default -d %s" % directory
+    os.system(postprocess_cmd)
+
 def stop(directory=None):
     nodes = settings.getnodes('clients', 'osds', 'mons', 'rgws')
diff --git a/settings.py b/settings.py
index 52ee70e3..07be9dd4 100644
--- a/settings.py
+++ b/settings.py
@@ -3,6 +3,7 @@
 import sys
 import os
 import logging
+import io
 
 logger = logging.getLogger("cbt")
 
@@ -16,9 +17,10 @@ def initialize(ctx):
 
     config = {}
     try:
-        with file(ctx.config_file) as f:
-            map(config.update, yaml.safe_load_all(f))
-    except IOError, e:
+        file = open(ctx.config_file)
+        with open(ctx.config_file, 'r') as f:
+            config.update(yaml.safe_load(f))
+    except IOError as e:
         raise argparse.ArgumentTypeError(str(e))
 
     cluster = config.get('cluster', {})
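One behavioral note on the settings.py hunk: the Python 2 code merged every document in the YAML file (yaml.safe_load_all), while the replacement reads a single document. If multi-document config files need to keep working under Python 3, a minimal sketch of the old behavior (illustrative only, not part of the patch) might look like this:

    import yaml

    def load_config(path):
        # Merge every YAML document in the file, mirroring what
        # map(config.update, yaml.safe_load_all(f)) did under Python 2.
        config = {}
        with open(path) as f:
            for doc in yaml.safe_load_all(f):
                if doc:
                    config.update(doc)
        return config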