class BotoClient(object):
    """Thin convenience wrapper around a boto3 S3 client.

    Talks plain HTTP to ``host`` with path-style addressing, which is what
    the s2 storage endpoint used by the backup scripts expects.
    """

    MB = 1024 ** 2  # bytes per megabyte
    GB = 1024 ** 3  # bytes per gigabyte

    def __init__(self, host, access_key, secret_key, port=80):
        """Create an S3 client bound to ``http://host:port``.

        ``port`` generalizes the previously hard-coded 80 and defaults
        to it, so existing callers are unaffected.
        """

        session = boto3.session.Session()
        self.cli = session.client(
            's3',
            use_ssl=False,
            endpoint_url="http://%s:%s" % (host, port),
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            # path-style addressing: http://host/bucket/key
            config=Config(s3={'addressing_style': 'path'}),
        )

    def boto_put(self, fpath, bucket_name, key_name, extra_args):
        """Upload local file ``fpath`` to ``bucket_name``/``key_name``.

        Objects over 1GB go up as a multipart upload with 512MB parts.
        ``extra_args`` is passed through to ``upload_file`` (e.g.
        ``{'Metadata': {...}}``).  boto adds Content-MD5 automatically.
        """

        config = TransferConfig(multipart_threshold=1 * self.GB,
                                multipart_chunksize=512 * self.MB)

        self.cli.upload_file(
            Filename=fpath,
            Bucket=bucket_name,
            Key=key_name,
            Config=config,
            ExtraArgs=extra_args,
        )

    def boto_head(self, bucket_name, key_name):
        """Return the ``head_object`` response dict for bucket/key."""

        return self.cli.head_object(
            Bucket=bucket_name,
            Key=key_name,
        )

    def boto_copy(self, bucket_name, source_key, key_name):
        """Server-side copy ``source_key`` to ``key_name`` in the same bucket."""

        return self.cli.copy_object(
            Bucket=bucket_name,
            CopySource={'Bucket': bucket_name,
                        'Key': source_key},
            Key=key_name,
        )

    def boto_get(self, fpath, bucket_name, key_name):
        """Download bucket/key into local file ``fpath`` in 8MB chunks.

        Returns the ``get_object`` response dict (callers read its
        'Metadata' entry, e.g. md5/sha1/s2-size); the streaming 'Body'
        is fully consumed here.
        """

        obj = self.cli.get_object(Bucket=bucket_name, Key=key_name)

        with open(fpath, 'wb') as f:
            while True:
                buf = obj['Body'].read(8 * self.MB)
                # Fix: the old test `buf == ''` never matches the b''
                # a botocore StreamingBody returns at EOF on python3,
                # which made this loop spin forever; `not buf` covers
                # both str and bytes.
                if not buf:
                    break

                f.write(buf)

        return obj

    def boto_list(self, bucket_name):
        """List the objects of ``bucket_name``; [] when it is empty."""

        resp = self.cli.list_objects(Bucket=bucket_name)

        return resp.get('Contents', [])
@@ def backup(self): self.backup_binlog() self.calc_checksum() self.upload_backup() - self.info_r('backup to s3://{s3_bucket}/{s3_key} OK') + self.info_r('backup to s3://{s3_bucket}/{s3_key_by_date} OK') finally: self.remove_backup( 'remove backup {backup_tgz_des3} {backup_data_dir} {backup_binlog_dir}') @@ -568,32 +570,26 @@ def upload_backup(self): self.info('no s3 bucket specified, ignore backup to s3') return - self.info_r('backup to s3://{s3_bucket}/{s3_key} ...') - - bc = boto_client(self.bkp_conf['s3_host'], - self.bkp_conf['s3_access_key'], - self.bkp_conf['s3_secret_key']) + self.info_r('backup to s3://{s3_bucket}/{s3_key_by_date} ...') # boto adds Content-MD5 automatically extra_args = {'Metadata': self.bkp_conf['backup_tgz_des3_meta']} try: - boto_put(bc, - self.render('{backup_tgz_des3}'), - self.render('{s3_bucket}'), - self.render('{s3_key}'), - extra_args - ) + self.bc.boto_put(self.render('{backup_tgz_des3}'), + self.render('{s3_bucket}'), + self.render('{s3_key_by_date}'), + extra_args + ) except S3UploadFailedError as e: self.info(repr(e) + 'while upload {backup_tgz_des3} to s2 cloud') try: - resp = boto_head(bc, - self.render('{s3_bucket}'), - self.render('{s3_key}') - ) + resp = self.bc.boto_head(self.render('{s3_bucket}'), + self.render('{s3_key_by_date}') + ) except ClientError as ee: self.error( repr(ee) + 'backup file: {backup_tgz_des3} not found in s2 cloud') @@ -607,6 +603,15 @@ def upload_backup(self): raise S3UploadFailedError( repr(e) + 'while upload backup file failed') + try: + self.bc.boto_copy(self.render('{s3_bucket}'), + self.render('{s3_key_by_date}'), + self.render('{se_key_by_port}') + ) + except ClientError as e: + self.info( + repr(e) + 'while copy {s3_key_by_date} to {se_key_by_port} failed') + def remove_backup(self, cmd_info): self.shell_run(cmd_info, @@ -729,7 +734,8 @@ def restore_data(self): def download_backup(self): - self.info_r('download backup from s3://{s3_bucket}/{s3_key} ...') + self.info_r( + 'download backup 
from s3://{s3_bucket}/{s3_key_by_date} ...') try: os.makedirs(self.render('{backup_base}'), mode=0755) @@ -739,14 +745,11 @@ def download_backup(self): else: raise - bc = boto_client(self.bkp_conf['s3_host'], - self.bkp_conf['s3_access_key'], - self.bkp_conf['s3_secret_key']) - resp = boto_get(bc, - self.render('{backup_tgz_des3}'), - self.render('{s3_bucket}'), - self.render('{s3_key}') - ) + resp = self.bc.boto_get( + self.render('{backup_tgz_des3}'), + self.render('{s3_bucket}'), + self.render('{s3_key_by_date}') + ) self.info_r('downloaded backup to {backup_tgz_des3}') @@ -755,7 +758,8 @@ def download_backup(self): self.bkp_conf['backup_tgz_des3_meta'] = meta - self.info_r('download backup from s3://{s3_bucket}/{s3_key} OK') + self.info_r( + 'download backup from s3://{s3_bucket}/{s3_key_by_date} OK') def apply_local_binlog(self): @@ -1035,8 +1039,10 @@ def extend_backup_conf(self, base_backup_conf): ('backup_binlog_tail', "mysql-{port}-binlog"), ('backup_binlog_dir', "{backup_base}/mysql-{port}-binlog"), - ('s3_key', + ('s3_key_by_date', "{date_str}/{port}/{backup_tgz_des3_tail}"), + ('s3_key_by_port', + "{port}/{date_str}/{backup_tgz_des3_tail}"), ('mes', "{host}:{port} [{instance_id}] {mysql_data_dir}"), ] diff --git a/mysqlops.py b/mysqlops.py index 06c4535..f9a61bb 100755 --- a/mysqlops.py +++ b/mysqlops.py @@ -10,8 +10,10 @@ from pykit import humannum from pykit import jobq from pykit import logutil +from pykit import timeutil from pykit import utfjson +import botoclient import mysqlbackup logger = logging.getLogger(__name__) @@ -47,6 +49,7 @@ 'setup_replication', 'table_size', 'user', + 'check_backup_file', ], help='command to run') parser.add_argument('--ports', type=int, required=False, nargs='*', help='ports to run "cmd" on') @@ -109,6 +112,76 @@ def setdef(dic, key, v): if v is not None: dic[key] = v + def load_port_conf(port): + + conf_path = '{conf_base}/{port}/{conf_fn}.yaml'.format( + conf_base=args.conf_base, + conf_fn=args.conf_fn, + port=port) 
+ + conf = mysqlbackup.load_conf_from_file(conf_path) + + setdef(conf, 'clean_after_restore', args.clean_after_restore) + + conf.setdefault('date_str', date_str) + setdef(conf, 'date_str', args.date_str) + + return conf + + def get_latest_backup_date(port): + + conf = load_port_conf(port) + + bc = botoclient.BotoClient( + conf['s3_host'], conf['s3_access_key'], conf['s3_secret_key']) + + backup_files = bc.boto_list(conf['s3_bucket']) + + date_ts = [] + + for bf in backup_files: + # the backup file has two formats key in s2, + # {date_str}/{port}/{backup_tgz_des3_tail} and {port}/{date_str}/{backup_tgz_des3_tail} + # We use the first format to get date_str + _date_str, _port, _backup_tgz = bf['Key'].split('/') + + if _port == str(port): + + date_ts.append(timeutil.parse_to_ts(_date_str, "%Y_%m_%d")) + + if len(date_ts) != 0: + latest_date_str = timeutil.format_ts(max(date_ts), "%Y_%m_%d") + else: + latest_date_str = None + + return latest_date_str + + def check_backup_file(ports): + + for port in ports: + + conf = load_port_conf(port) + + latest_date_str = get_latest_backup_date(port) + + if latest_date_str is not None: + + specified_date_ts = timeutil.parse_to_ts( + conf['date_str'], "%Y_%m_%d") + latest_date_ts = timeutil.parse_to_ts( + latest_date_str, "%Y_%m_%d") + + if specified_date_ts > latest_date_ts: + raise ValueError('port: {p} backup file:{d} is not found in s2'.format( + p=repr(port), d=repr(conf['date_str']))) + else: + setdef(conf, 'date_str', latest_date_str) + logger.info('port: {p} backup file:{d} is ok'.format( + p=repr(port), d=repr(conf['date_str']))) + else: + raise ValueError( + 'port: {p} is found any backup file is s2'.format(p=repr(port))) + def worker(port): try: @@ -121,17 +194,7 @@ def worker(port): def _worker(port): - conf_path = '{conf_base}/{port}/{conf_fn}.yaml'.format( - conf_base=args.conf_base, - conf_fn=args.conf_fn, - port=port) - - conf = mysqlbackup.load_conf_from_file(conf_path) - - setdef(conf, 'date_str', args.date_str) - 
setdef(conf, 'clean_after_restore', args.clean_after_restore) - - conf.setdefault('date_str', date_str) + conf = load_port_conf(port) mb = mysqlbackup.MysqlBackup(conf) @@ -234,7 +297,6 @@ def _out(): host=args.host, privileges=[args.privilege.split(':', 1)], binlog=(args.binlog == 1)) - else: raise ValueError('unsupported command: ' + repr(cmd)) @@ -246,15 +308,21 @@ def output(rst): else: print json.dumps(rst, indent=2) - jm = jobq.JobManager([(worker, args.jobs), - (output, 1)]) + need_check_backup_cmds = ['check_backup_file', + 'restore_from_backup', 'restore'] + + if cmd in need_check_backup_cmds: + check_backup_file(ports) + else: + jm = jobq.JobManager([(worker, args.jobs), + (output, 1)]) - for port in ports: - jm.put(port) + for port in ports: + jm.put(port) - jm.join() + jm.join() - if len(rsts) == len(ports): - sys.exit(0) - else: - sys.exit(1) + if len(rsts) == len(ports): + sys.exit(0) + else: + sys.exit(1)