From dc0376848795c96b1d15f45f11359d0c8dbb3d73 Mon Sep 17 00:00:00 2001
From: nhanover
Date: Thu, 27 Aug 2020 16:48:52 -0400
Subject: [PATCH 1/3] fixed nullable array issues

---
 setup.py           |  2 +-
 target_bigquery.py | 11 ++++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/setup.py b/setup.py
index 2c687b8..ff24d36 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup
 
 setup(name='target-bigquery',
-      version='1.4.0',
+      version='1.4.3',
       description='Singer.io target for writing data to Google BigQuery',
       author='RealSelf Business Intelligence',
       url='https://github.com/RealSelf/target-bigquery',
diff --git a/target_bigquery.py b/target_bigquery.py
index 111d11f..91a5278 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -66,11 +66,12 @@ def define_schema(field, name):
                 field = types
 
     if isinstance(field['type'], list):
-        if field['type'][0] == "null":
-            schema_mode = 'NULLABLE'
-        else:
-            schema_mode = 'required'
+        schema_mode = 'NULLABLE'
         schema_type = field['type'][-1]
+        if schema_type == 'null':
+            schema_type = field['type'][0]
+        if schema_type == 'null':
+            schema_type = field['type'][1]
     else:
         schema_type = field['type']
     if schema_type == "object":
@@ -79,7 +80,7 @@
     if schema_type == "array":
         schema_type = field.get('items').get('type')
         schema_mode = "REPEATED"
-        if schema_type == "object":
+        if schema_type == "object" or "object" in schema_type:
             schema_type = "RECORD"
             schema_fields = tuple(build_schema(field.get('items')))
 

From 5474c80520bc45d243b09a59db529c8ea7635977 Mon Sep 17 00:00:00 2001
From: nhanover
Date: Thu, 27 Aug 2020 17:17:57 -0400
Subject: [PATCH 2/3] changed error handling

---
 setup.py           | 2 +-
 target_bigquery.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index ff24d36..d8b2858 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup
 
 setup(name='target-bigquery',
-      version='1.4.3',
+      version='1.4.4',
       description='Singer.io target for writing data to Google BigQuery',
       author='RealSelf Business Intelligence',
       url='https://github.com/RealSelf/target-bigquery',
diff --git a/target_bigquery.py b/target_bigquery.py
index 91a5278..3db30a8 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -262,7 +262,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path))
             emit_state(state)
         else:
-            logging.error('Errors:', errors[table], sep=" ")
+            console.error('Errors:' + errors[table])
 
     return state
 
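Note on [PATCH 1/3]: with this change a JSON Schema "type" list is always mapped to a NULLABLE column, and the concrete BigQuery type is taken from whichever entry of the list is not "null"; the added '"object" in schema_type' check covers array items whose own "type" is itself such a list. The standalone sketch below only mirrors that list handling for illustration; resolve_list_type() is a hypothetical helper, not a function in target_bigquery.py.

    def resolve_list_type(json_type):
        # Hypothetical helper mirroring the new hunk in define_schema():
        # a "type" list is always NULLABLE, and the concrete type is the
        # last entry unless that entry is "null".
        schema_mode = 'NULLABLE'
        schema_type = json_type[-1]
        if schema_type == 'null':
            schema_type = json_type[0]
        if schema_type == 'null':
            schema_type = json_type[1]
        return schema_mode, schema_type

    print(resolve_list_type(['null', 'string']))  # ('NULLABLE', 'string')
    print(resolve_list_type(['array', 'null']))   # ('NULLABLE', 'array')
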
From 76999f26c8c31ff370e92b46201cca5ff8b60a39 Mon Sep 17 00:00:00 2001
From: nhanover
Date: Thu, 27 Aug 2020 17:27:42 -0400
Subject: [PATCH 3/3] finished ignore unknown values

---
 target_bigquery.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/target_bigquery.py b/target_bigquery.py
index 3db30a8..52417ef 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -117,7 +117,6 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
     errors = {}
 
     bigquery_client = bigquery.Client(project=project_id)
-
     # try:
     #     dataset = bigquery_client.create_dataset(Dataset(dataset_ref)) or Dataset(dataset_ref)
     # except exceptions.Conflict:
@@ -229,7 +228,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             if validate_records:
                 validate(msg.record, schema)
 
-            errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
+            errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record], ignore_unknown_values=True)
             rows[msg.stream] += 1
 
             state = None
@@ -262,7 +261,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path))
             emit_state(state)
         else:
-            console.error('Errors:' + errors[table])
+            print(errors[table])
 
     return state
 
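Note on [PATCH 3/3]: passing ignore_unknown_values=True asks the BigQuery streaming API to drop record fields that are not present in the table schema instead of rejecting the row, and insert_rows_json() returns a list of per-row error dicts that is empty when every row was accepted. A minimal standalone sketch of that call, assuming google-cloud-bigquery is installed; the project, dataset, and table names below are placeholders, not values from the patch.

    import logging

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')               # placeholder project id
    table = client.get_table('my-project.my_dataset.my_table')   # placeholder table

    # 'extra_field' is not in the table schema; with ignore_unknown_values=True
    # it is dropped rather than causing the row to be rejected.
    record = {'id': 1, 'name': 'example', 'extra_field': 'dropped'}
    errors = client.insert_rows_json(table, [record], ignore_unknown_values=True)

    if errors:
        logging.error('Errors: %s', errors)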