diff --git a/setup.py b/setup.py index 2c687b8..d8b2858 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='target-bigquery', - version='1.4.0', + version='1.4.4', description='Singer.io target for writing data to Google BigQuery', author='RealSelf Business Intelligence', url='https://github.com/RealSelf/target-bigquery', diff --git a/target_bigquery.py b/target_bigquery.py index 111d11f..52417ef 100644 --- a/target_bigquery.py +++ b/target_bigquery.py @@ -66,11 +66,12 @@ def define_schema(field, name): field = types if isinstance(field['type'], list): - if field['type'][0] == "null": - schema_mode = 'NULLABLE' - else: - schema_mode = 'required' + schema_mode = 'NULLABLE' schema_type = field['type'][-1] + if schema_type == 'null': + schema_type = field['type'][0] + if schema_type == 'null': + schema_type = field['type'][1] else: schema_type = field['type'] if schema_type == "object": @@ -79,7 +80,7 @@ def define_schema(field, name): if schema_type == "array": schema_type = field.get('items').get('type') schema_mode = "REPEATED" - if schema_type == "object": + if schema_type == "object" or "object" in schema_type: schema_type = "RECORD" schema_fields = tuple(build_schema(field.get('items'))) @@ -116,7 +117,6 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida errors = {} bigquery_client = bigquery.Client(project=project_id) - # try: # dataset = bigquery_client.create_dataset(Dataset(dataset_ref)) or Dataset(dataset_ref) # except exceptions.Conflict: @@ -228,7 +228,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr if validate_records: validate(msg.record, schema) - errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record]) + errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record], ignore_unknown_values=True) rows[msg.stream] += 1 state = None @@ -261,7 +261,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path)) emit_state(state) else: - logging.error('Errors:', errors[table], sep=" ") + print(errors[table]) return state