From ee8744ff37b6b6b0f97f30b4b32eb4b3615a8b64 Mon Sep 17 00:00:00 2001 From: "kaili.xu" Date: Thu, 3 Sep 2020 11:21:00 +0800 Subject: [PATCH 1/5] add functions to ali_sync.py: configurable number of synchronization threads and limit the rate of synchronization data, and modify the corresponding configuration file --- conf/ali_sync.yaml | 6 ++++++ src/ali_sync.py | 48 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/conf/ali_sync.yaml b/conf/ali_sync.yaml index 9cd4fda..bf333a5 100644 --- a/conf/ali_sync.yaml +++ b/conf/ali_sync.yaml @@ -28,6 +28,12 @@ CHECK_EXIST: true # 设置当文件已存在时,是否强制覆盖文件,如果否,则当文件大小或文件类型不一致时才会覆盖 FORCE_OVERRIDE: false +# 设置同步的并发线程数 +THREADS_NUM_FOR_SYNC: 3 + +# 限制每个线程同步数据的速率为1Mb/s=1048576B/s,单位:B/s +SYNC_SPEED: 1048576 + # 设置多少个线程同时添加离线任务 THREADS_NUM_FOR_ADD_OFFLINE_TASK: 20 diff --git a/src/ali_sync.py b/src/ali_sync.py index 3c0733a..c7d1c50 100644 --- a/src/ali_sync.py +++ b/src/ali_sync.py @@ -19,6 +19,8 @@ from botocore.client import Config from pykit import jobq +from pykit import http +from pykit import awssign report_state_lock = threading.RLock() @@ -341,6 +343,46 @@ def compare_file(result, th_status): return True +def upload_file(resp_object, result): + uri = '/%s/%s' % (cnf['BAISHAN_BUCKET_NAME'], result['s3_key']) + verb = 'PUT' + headers = { + 'Content-Length': resp_object.content_length, + 'Host': cnf['BAISHAN_ENDPOINT'][7:] + } + + request = { + 'verb': verb, + 'uri': uri, + 'headers': headers, + } + sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEEY'], cnf['BAISHAN_SECRET_KEY']) + sign.add_auth(request, query_auth=False, expires=120) + + cli = http.Client(cnf['BAISHAN_ENDPOINT'][7:], port=80) + cli.send_request(request['uri'], verb, request['headers']) + + send_size = 0 + start_time = time.time() + while True: + buf = resp_object.read(1024 * 1024) + if buf == '': + break + + cli.send_body(buf) + send_size += 1024 * 1024 + end_time = time.time() + + expect_time = send_size / cnf['SYNC_SPEED'] + act_time = end_time - start_time + time_diff = expect_time - act_time + if time_diff > 0: + time.sleep(time_diff) + + cli.read_response() + cli.status == 200 + + def pipe_file(result, th_status): result['piped'] = True th_status['piped_n'] = th_status.get('piped_n', 0) + 1 @@ -355,7 +397,7 @@ def update_pipe_progress(done_bytes, total_bytes): file_object.key, progress_callback=update_pipe_progress) ali_file_info = validate_and_extract_ali_file_info(resp_object, result) - if ali_file_info == None: + if ali_file_info is None: result['pipe_failed'] = True th_status['pipe_failed_n'] = th_status.get('pipe_failed_n', 0) + 1 @@ -498,7 +540,7 @@ def update_sync_stat(result): elif 'default_not_override' in result: ali_sync_state['default_not_override'] += 1 - if not 'piped' in result: + if 'piped' not in result: return ali_sync_state['piped'] += 1 @@ -528,7 +570,7 @@ def update_sync_stat(result): ali_sync_state['pipe_succeed'] += 1 ali_sync_state['pipe_succeed_bytes'] += file_object.size - if not 'compared' in result: + if 'compared' not in result: return if 'compare_failed' in result: From 9d032c179604c00bc6f73d82e5df20d2b76cbb1d Mon Sep 17 00:00:00 2001 From: "kaili.xu" Date: Fri, 4 Sep 2020 12:55:43 +0800 Subject: [PATCH 2/5] modification of the function of limiting synchronization rate --- src/ali_sync.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/ali_sync.py b/src/ali_sync.py index c7d1c50..95eead4 100644 --- a/src/ali_sync.py +++ b/src/ali_sync.py @@ -346,9 +346,16 @@ def compare_file(result, th_status): def upload_file(resp_object, result): uri = '/%s/%s' % (cnf['BAISHAN_BUCKET_NAME'], result['s3_key']) verb = 'PUT' + + endpoint = cnf['BAISHAN_ENDPOINT'] + if 'http://' in endpoint: + Host = endpoint[len('http://'):] + else: + Host = endpoint[len('https://'):] + headers = { 'Content-Length': resp_object.content_length, - 'Host': cnf['BAISHAN_ENDPOINT'][7:] + 'Host': Host, } request = { @@ -356,31 +363,30 @@ def upload_file(resp_object, result): 'uri': uri, 'headers': headers, } + sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEEY'], cnf['BAISHAN_SECRET_KEY']) sign.add_auth(request, query_auth=False, expires=120) - cli = http.Client(cnf['BAISHAN_ENDPOINT'][7:], port=80) + cli = http.Client(Host, port=80) cli.send_request(request['uri'], verb, request['headers']) send_size = 0 - start_time = time.time() while True: + start_time = time.time() buf = resp_object.read(1024 * 1024) - if buf == '': - break + end_time = time.time() + expect_time = send_size / cnf['SYNC_SPEED'] + act_time = end_time - start_time + time_diff = expect_time - act_time + + if time_diff > 0: + time.sleep(time_diff) cli.send_body(buf) send_size += 1024 * 1024 - end_time = time.time() - expect_time = send_size / cnf['SYNC_SPEED'] - act_time = end_time - start_time - time_diff = expect_time - act_time - if time_diff > 0: - time.sleep(time_diff) - - cli.read_response() - cli.status == 200 + if buf == '': + break def pipe_file(result, th_status): @@ -391,6 +397,7 @@ def update_pipe_progress(done_bytes, total_bytes): th_status['pipe_progress'] = (done_bytes, total_bytes) file_object = result['file_object'] + upload_file(file_object, result) try: resp_object = oss2_bucket.get_object( @@ -739,7 +746,7 @@ def sync(): try: report_sess = {'stop': False} report_th = _thread(report, (report_sess,)) - jobq.run(iter_files(), [(sync_one_file, 3), + jobq.run(iter_files(), [(sync_one_file, cnf['THREADS_NUM_FOR_SYNC']), (update_sync_stat, 1), ]) From 1a84c1bac81e148be91a3c754f9cd7a1e34fe35b Mon Sep 17 00:00:00 2001 From: "kaili.xu" Date: Tue, 8 Sep 2020 13:36:12 +0800 Subject: [PATCH 3/5] modify upload function --- src/ali_sync.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/src/ali_sync.py b/src/ali_sync.py index 95eead4..be78745 100644 --- a/src/ali_sync.py +++ b/src/ali_sync.py @@ -371,23 +371,24 @@ def upload_file(resp_object, result): cli.send_request(request['uri'], verb, request['headers']) send_size = 0 + start_time = time.time() while True: - start_time = time.time() buf = resp_object.read(1024 * 1024) + if buf == '': + break + end_time = time.time() - expect_time = send_size / cnf['SYNC_SPEED'] - act_time = end_time - start_time - time_diff = expect_time - act_time + if cnf['SYNC_SPEED'] is not None: + expect_time = send_size / cnf['SYNC_SPEED'] + act_time = end_time - start_time + time_diff = expect_time - act_time - if time_diff > 0: - time.sleep(time_diff) + if time_diff > 0: + time.sleep(time_diff) cli.send_body(buf) send_size += 1024 * 1024 - if buf == '': - break - def pipe_file(result, th_status): result['piped'] = True @@ -397,7 +398,6 @@ def update_pipe_progress(done_bytes, total_bytes): th_status['pipe_progress'] = (done_bytes, total_bytes) file_object = result['file_object'] - upload_file(file_object, result) try: resp_object = oss2_bucket.get_object( @@ -410,14 +410,7 @@ def update_pipe_progress(done_bytes, total_bytes): th_status['pipe_failed_n'] = th_status.get('pipe_failed_n', 0) + 1 return False - extra_args = { - 'ACL': cnf['FILE_ACL'], - 'ContentType': ali_file_info['content_type'], - 'Metadata': ali_file_info['meta'], - } - s3_client.upload_fileobj(resp_object, cnf['BAISHAN_BUCKET_NAME'], - result['s3_key'], ExtraArgs=extra_args) - + upload_file(resp_object, result) result['pipe_succeed'] = True th_status['pipe_succeed_n'] = th_status.get('pipe_succeed_n', 0) + 1 From ba88f5bb728f0f2f3b2fc5ec7d6dde4455816f27 Mon Sep 17 00:00:00 2001 From: "kaili.xu" Date: Wed, 9 Sep 2020 11:21:25 +0800 Subject: [PATCH 4/5] add extra args and modify --- src/ali_sync.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/ali_sync.py b/src/ali_sync.py index be78745..0b51f8d 100644 --- a/src/ali_sync.py +++ b/src/ali_sync.py @@ -1,6 +1,7 @@ #!/usr/bin/env python2 # coding:utf-8 +import ssl import base64 import copy import errno @@ -343,19 +344,37 @@ def compare_file(result, th_status): return True -def upload_file(resp_object, result): +def upload_file(resp_object, result, ali_file_info): uri = '/%s/%s' % (cnf['BAISHAN_BUCKET_NAME'], result['s3_key']) verb = 'PUT' endpoint = cnf['BAISHAN_ENDPOINT'] + https_ctx = None + port = 80 if 'http://' in endpoint: Host = endpoint[len('http://'):] - else: + elif 'https://' in endpoint: Host = endpoint[len('https://'):] + https_ctx = ssl._create_unverified_context() + port = 443 + else: + Host = endpoint + + extra_args = { + 'ACL': cnf['FILE_ACL'], + 'ContentType': ali_file_info['content_type'], + 'Metadata': ali_file_info['meta'], + } + + for k, v in extra_args['Metadata'].items(): + extra_args['Metadata'] = v headers = { 'Content-Length': resp_object.content_length, 'Host': Host, + 'x-amz-acl': extra_args['ACL'], + 'Content-Type': extra_args['ContentType'], + 'x-amz-meta-k': extra_args['Metadata'], } request = { @@ -364,10 +383,10 @@ def upload_file(resp_object, result): 'headers': headers, } - sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEEY'], cnf['BAISHAN_SECRET_KEY']) + sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEY'], cnf['BAISHAN_SECRET_KEY']) sign.add_auth(request, query_auth=False, expires=120) - cli = http.Client(Host, port=80) + cli = http.Client(Host, port=port, https_context=https_ctx) cli.send_request(request['uri'], verb, request['headers']) send_size = 0 @@ -410,7 +429,8 @@ def update_pipe_progress(done_bytes, total_bytes): th_status['pipe_failed_n'] = th_status.get('pipe_failed_n', 0) + 1 return False - upload_file(resp_object, result) + upload_file(resp_object, result, ali_file_info) + result['pipe_succeed'] = True th_status['pipe_succeed_n'] = th_status.get('pipe_succeed_n', 0) + 1 From 1fcd1ce41356406372ab3cf66797e1db1c3879e2 Mon Sep 17 00:00:00 2001 From: "kaili.xu" Date: Wed, 9 Sep 2020 14:05:42 +0800 Subject: [PATCH 5/5] modify headers information --- src/ali_sync.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/ali_sync.py b/src/ali_sync.py index 0b51f8d..f4ca4cd 100644 --- a/src/ali_sync.py +++ b/src/ali_sync.py @@ -360,23 +360,16 @@ def upload_file(resp_object, result, ali_file_info): else: Host = endpoint - extra_args = { - 'ACL': cnf['FILE_ACL'], - 'ContentType': ali_file_info['content_type'], - 'Metadata': ali_file_info['meta'], - } - - for k, v in extra_args['Metadata'].items(): - extra_args['Metadata'] = v - headers = { 'Content-Length': resp_object.content_length, 'Host': Host, - 'x-amz-acl': extra_args['ACL'], - 'Content-Type': extra_args['ContentType'], - 'x-amz-meta-k': extra_args['Metadata'], + 'x-amz-acl': cnf['FILE_ACL'], + 'Content-Type': ali_file_info['content_type'], } + for k, v in ali_file_info['meta'].items(): + headers['x-amz-meta-' + k] = v + request = { 'verb': verb, 'uri': uri,