Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions botoclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/bin/env python2
# coding: utf-8

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.client import Config


class BotoClient(object):
    """Thin wrapper around a boto3 S3 client for an S3-compatible store.

    Talks plain HTTP with path-style addressing
    (http://host/bucket/key), which many non-AWS S3 implementations
    require instead of virtual-host addressing.
    """

    def __init__(self, host, access_key, secret_key, port=80):
        """Create the underlying boto3 client.

        Args:
            host:       S3 endpoint hostname or IP.
            access_key: S3 access key id.
            secret_key: S3 secret access key.
            port:       endpoint TCP port; defaults to 80 (plain HTTP).
        """
        session = boto3.session.Session()
        self.cli = session.client(
            's3',
            use_ssl=False,
            endpoint_url="http://%s:%s" % (host, port),
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            # Path-style addressing is required by S3-compatible stores
            # that do not resolve bucket-name virtual hosts.
            config=Config(s3={'addressing_style': 'path'}),
        )

    def boto_put(self, fpath, bucket_name, key_name, extra_args):
        """Upload local file `fpath` to s3://bucket_name/key_name.

        `extra_args` is passed through to upload_file (e.g.
        {'Metadata': {...}}); boto adds Content-MD5 automatically.
        Raises boto3.exceptions.S3UploadFailedError on failure.
        """
        MB = 1024 ** 2
        GB = 1024 ** 3

        # Files over 1GB go as multipart uploads.  multipart_chunksize was
        # raised from 256MB to 512MB: very large backup files previously
        # produced more parts than the server-side limit allows, failing
        # the upload.
        config = TransferConfig(multipart_threshold=1 * GB,
                                multipart_chunksize=512 * MB, )

        self.cli.upload_file(
            Filename=fpath,
            Bucket=bucket_name,
            Key=key_name,
            Config=config,
            ExtraArgs=extra_args,
        )

    def boto_head(self, bucket_name, key_name):
        """Return head_object() response for s3://bucket_name/key_name.

        Raises botocore.exceptions.ClientError (404) if the key is absent.
        """
        resp = self.cli.head_object(
            Bucket=bucket_name,
            Key=key_name,
        )

        return resp

    def boto_copy(self, bucket_name, source_key, key_name):
        """Server-side copy of source_key to key_name inside bucket_name.

        Returns the copy_object() response dict.
        """
        resp = self.cli.copy_object(
            Bucket=bucket_name,
            CopySource={
                'Bucket': bucket_name,
                'Key': source_key},
            Key=key_name,
        )

        return resp

    def boto_get(self, fpath, bucket_name, key_name):
        """Download s3://bucket_name/key_name into local file `fpath`.

        Streams the body in 8MB chunks to keep memory bounded.
        Returns the full get_object() response, whose useful fields
        include 'Body' (already consumed), 'ContentLength', 'ETag' and
        'Metadata' (user metadata such as md5/sha1 set at upload time).
        """
        obj = self.cli.get_object(Bucket=bucket_name, Key=key_name)

        with open(fpath, 'wb') as f:
            while True:
                buf = obj['Body'].read(1024 * 1024 * 8)
                # read() yields an empty string/bytes at EOF.  The former
                # test `buf == ''` never matches bytes under Python 3 and
                # loops forever; truthiness works for both str and bytes.
                if not buf:
                    break

                f.write(buf)

        return obj

    def boto_list(self, bucket_name):
        """List objects in bucket_name.

        Returns the 'Contents' list from list_objects(), or [] when the
        bucket is empty (the server omits 'Contents' entirely then).
        """
        resp = self.cli.list_objects(Bucket=bucket_name)

        if 'Contents' not in resp:
            return []
        else:
            return resp['Contents']
66 changes: 36 additions & 30 deletions mysqlbackup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@
from pykit import mysqlutil
from pykit import timeutil

import boto3
import botoclient
import MySQLdb
from boto3.exceptions import S3UploadFailedError
from boto3.s3.transfer import TransferConfig
from botocore.client import Config
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,6 +67,10 @@ def __init__(self, bkp_conf):

self.mysql_conn_pool = mysqlconnpool.make(self.mysql_addr)

self.bc = botoclient.BotoClient(self.bkp_conf['s3_host'],
self.bkp_conf['s3_access_key'],
self.bkp_conf['s3_secret_key'])

def create_user(self,
username, password,
host='%',
Expand Down Expand Up @@ -463,7 +465,7 @@ def backup(self):
self.backup_binlog()
self.calc_checksum()
self.upload_backup()
self.info_r('backup to s3://{s3_bucket}/{s3_key} OK')
self.info_r('backup to s3://{s3_bucket}/{s3_key_by_date} OK')
finally:
self.remove_backup(
'remove backup {backup_tgz_des3} {backup_data_dir} {backup_binlog_dir}')
Expand Down Expand Up @@ -568,32 +570,26 @@ def upload_backup(self):
self.info('no s3 bucket specified, ignore backup to s3')
return

self.info_r('backup to s3://{s3_bucket}/{s3_key} ...')

bc = boto_client(self.bkp_conf['s3_host'],
self.bkp_conf['s3_access_key'],
self.bkp_conf['s3_secret_key'])
self.info_r('backup to s3://{s3_bucket}/{s3_key_by_date} ...')

# boto adds Content-MD5 automatically
extra_args = {'Metadata': self.bkp_conf['backup_tgz_des3_meta']}

try:
boto_put(bc,
self.render('{backup_tgz_des3}'),
self.render('{s3_bucket}'),
self.render('{s3_key}'),
extra_args
)
self.bc.boto_put(self.render('{backup_tgz_des3}'),
self.render('{s3_bucket}'),
self.render('{s3_key_by_date}'),
extra_args
)

except S3UploadFailedError as e:

self.info(repr(e) + 'while upload {backup_tgz_des3} to s2 cloud')

try:
resp = boto_head(bc,
self.render('{s3_bucket}'),
self.render('{s3_key}')
)
resp = self.bc.boto_head(self.render('{s3_bucket}'),
self.render('{s3_key_by_date}')
)
except ClientError as ee:
self.error(
repr(ee) + 'backup file: {backup_tgz_des3} not found in s2 cloud')
Expand All @@ -607,6 +603,15 @@ def upload_backup(self):
raise S3UploadFailedError(
repr(e) + 'while upload backup file failed')

try:
self.bc.boto_copy(self.render('{s3_bucket}'),
self.render('{s3_key_by_date}'),
self.render('{s3_key_by_port}')
)
except ClientError as e:
self.info(
repr(e) + 'while copy {s3_key_by_date} to {s3_key_by_port} failed')

def remove_backup(self, cmd_info):

self.shell_run(cmd_info,
Expand Down Expand Up @@ -729,7 +734,8 @@ def restore_data(self):

def download_backup(self):

self.info_r('download backup from s3://{s3_bucket}/{s3_key} ...')
self.info_r(
'download backup from s3://{s3_bucket}/{s3_key_by_date} ...')

try:
os.makedirs(self.render('{backup_base}'), mode=0755)
Expand All @@ -739,14 +745,11 @@ def download_backup(self):
else:
raise

bc = boto_client(self.bkp_conf['s3_host'],
self.bkp_conf['s3_access_key'],
self.bkp_conf['s3_secret_key'])
resp = boto_get(bc,
self.render('{backup_tgz_des3}'),
self.render('{s3_bucket}'),
self.render('{s3_key}')
)
resp = self.bc.boto_get(
self.render('{backup_tgz_des3}'),
self.render('{s3_bucket}'),
self.render('{s3_key_by_date}')
)

self.info_r('downloaded backup to {backup_tgz_des3}')

Expand All @@ -755,7 +758,8 @@ def download_backup(self):

self.bkp_conf['backup_tgz_des3_meta'] = meta

self.info_r('download backup from s3://{s3_bucket}/{s3_key} OK')
self.info_r(
'download backup from s3://{s3_bucket}/{s3_key_by_date} OK')

def apply_local_binlog(self):

Expand Down Expand Up @@ -1035,8 +1039,10 @@ def extend_backup_conf(self, base_backup_conf):
('backup_binlog_tail', "mysql-{port}-binlog"),
('backup_binlog_dir', "{backup_base}/mysql-{port}-binlog"),

('s3_key',
('s3_key_by_date',
"{date_str}/{port}/{backup_tgz_des3_tail}"),
('s3_key_by_port',
"{port}/{date_str}/{backup_tgz_des3_tail}"),
('mes',
"{host}:{port} [{instance_id}] {mysql_data_dir}"),
]
Expand Down
Loading