diff --git a/conf/ali_sync.yaml b/conf/ali_sync.yaml index 9cd4fda..bf333a5 100644 --- a/conf/ali_sync.yaml +++ b/conf/ali_sync.yaml @@ -28,6 +28,12 @@ CHECK_EXIST: true # 设置当文件已存在时,是否强制覆盖文件,如果否,则当文件大小或文件类型不一致时才会覆盖 FORCE_OVERRIDE: false +# 设置同步的并发线程数 +THREADS_NUM_FOR_SYNC: 3 + +# 限制每个线程同步数据的速率为1Mb/s=1048576B/s,单位:B/s +SYNC_SPEED: 1048576 + # 设置多少个线程同时添加离线任务 THREADS_NUM_FOR_ADD_OFFLINE_TASK: 20 diff --git a/src/ali_sync.py b/src/ali_sync.py index 3c0733a..f4ca4cd 100644 --- a/src/ali_sync.py +++ b/src/ali_sync.py @@ -1,6 +1,7 @@ #!/usr/bin/env python2 # coding:utf-8 +import ssl import base64 import copy import errno @@ -19,6 +20,8 @@ from botocore.client import Config from pykit import jobq +from pykit import http +from pykit import awssign report_state_lock = threading.RLock() @@ -341,6 +344,64 @@ def compare_file(result, th_status): return True +def upload_file(resp_object, result, ali_file_info): + uri = '/%s/%s' % (cnf['BAISHAN_BUCKET_NAME'], result['s3_key']) + verb = 'PUT' + + endpoint = cnf['BAISHAN_ENDPOINT'] + https_ctx = None + port = 80 + if 'http://' in endpoint: + Host = endpoint[len('http://'):] + elif 'https://' in endpoint: + Host = endpoint[len('https://'):] + https_ctx = ssl._create_unverified_context() + port = 443 + else: + Host = endpoint + + headers = { + 'Content-Length': resp_object.content_length, + 'Host': Host, + 'x-amz-acl': cnf['FILE_ACL'], + 'Content-Type': ali_file_info['content_type'], + } + + for k, v in ali_file_info['meta'].items(): + headers['x-amz-meta-' + k] = v + + request = { + 'verb': verb, + 'uri': uri, + 'headers': headers, + } + + sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEY'], cnf['BAISHAN_SECRET_KEY']) + sign.add_auth(request, query_auth=False, expires=120) + + cli = http.Client(Host, port=port, https_context=https_ctx) + cli.send_request(request['uri'], verb, request['headers']) + + send_size = 0 + start_time = time.time() + while True: + buf = resp_object.read(1024 * 1024) + if buf == '': + break + + end_time = time.time() + if cnf['SYNC_SPEED'] is not None: + expect_time = send_size / cnf['SYNC_SPEED'] + act_time = end_time - start_time + time_diff = expect_time - act_time + + if time_diff > 0: + time.sleep(time_diff) + + cli.send_body(buf) + send_size += 1024 * 1024 + + def pipe_file(result, th_status): result['piped'] = True th_status['piped_n'] = th_status.get('piped_n', 0) + 1 @@ -355,19 +416,13 @@ def update_pipe_progress(done_bytes, total_bytes): file_object.key, progress_callback=update_pipe_progress) ali_file_info = validate_and_extract_ali_file_info(resp_object, result) - if ali_file_info == None: + if ali_file_info is None: result['pipe_failed'] = True th_status['pipe_failed_n'] = th_status.get('pipe_failed_n', 0) + 1 return False - extra_args = { - 'ACL': cnf['FILE_ACL'], - 'ContentType': ali_file_info['content_type'], - 'Metadata': ali_file_info['meta'], - } - s3_client.upload_fileobj(resp_object, cnf['BAISHAN_BUCKET_NAME'], - result['s3_key'], ExtraArgs=extra_args) + upload_file(resp_object, result, ali_file_info) result['pipe_succeed'] = True th_status['pipe_succeed_n'] = th_status.get('pipe_succeed_n', 0) + 1 @@ -498,7 +553,7 @@ def update_sync_stat(result): elif 'default_not_override' in result: ali_sync_state['default_not_override'] += 1 - if not 'piped' in result: + if 'piped' not in result: return ali_sync_state['piped'] += 1 @@ -528,7 +583,7 @@ def update_sync_stat(result): ali_sync_state['pipe_succeed'] += 1 ali_sync_state['pipe_succeed_bytes'] += file_object.size - if not 'compared' in result: + if 'compared' not in result: return if 'compare_failed' in result: @@ -697,7 +752,7 @@ def sync(): try: report_sess = {'stop': False} report_th = _thread(report, (report_sess,)) - jobq.run(iter_files(), [(sync_one_file, 3), + jobq.run(iter_files(), [(sync_one_file, cnf['THREADS_NUM_FOR_SYNC']), (update_sync_stat, 1), ])