From f3950184b8fe398968185fe91780b21fb8706cb5 Mon Sep 17 00:00:00 2001 From: Yusuf Fahry Date: Sun, 27 Sep 2020 23:55:50 +0700 Subject: [PATCH 1/2] Fix schema generation process --- target_bigquery.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/target_bigquery.py b/target_bigquery.py index 111d11f..5c776b6 100644 --- a/target_bigquery.py +++ b/target_bigquery.py @@ -52,27 +52,28 @@ def clear_dict_hook(items): return {k: v if v is not None else '' for k, v in items} def define_schema(field, name): + schema_name = name schema_type = "STRING" schema_mode = "NULLABLE" schema_description = None schema_fields = () - if 'type' not in field and 'anyOf' in field: + if field.get("type") is None and 'anyOf' in field: for types in field['anyOf']: - if types['type'] == 'null': + if types.get('type') == 'null': schema_mode = 'NULLABLE' else: field = types - - if isinstance(field['type'], list): - if field['type'][0] == "null": + + if isinstance(field.get('type'), list): + if field.get('type')[0] == "null": schema_mode = 'NULLABLE' else: schema_mode = 'required' - schema_type = field['type'][-1] + schema_type = field.get('type')[1] else: - schema_type = field['type'] + schema_type = field.get('type') if schema_type == "object": schema_type = "RECORD" schema_fields = tuple(build_schema(field)) @@ -83,25 +84,25 @@ def define_schema(field, name): schema_type = "RECORD" schema_fields = tuple(build_schema(field.get('items'))) + if schema_type == None: + schema_type = "string" + if schema_type == "string": if "format" in field: - if field['format'] == "date-time": + if field.get('format') == "date-time": schema_type = "timestamp" if schema_type == 'number': schema_type = 'FLOAT' + print(schema_name, schema_type) + return (schema_name, schema_type, schema_mode, schema_description, schema_fields) def build_schema(schema): SCHEMA = [] for key in schema['properties'].keys(): - - if not (bool(schema['properties'][key])): - # if we endup with an empty record. - continue - schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key) SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields)) @@ -151,7 +152,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida state = msg.value elif isinstance(msg, singer.SchemaMessage): - table = msg.stream + table = msg.stream schemas[table] = msg.schema key_properties[table] = msg.key_properties #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table])) @@ -238,7 +239,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr state = msg.value elif isinstance(msg, singer.SchemaMessage): - table = msg.stream + table = msg.stream schemas[table] = msg.schema key_properties[table] = msg.key_properties tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table])) @@ -258,10 +259,9 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr for table in errors.keys(): if not errors[table]: - logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path)) - emit_state(state) + print('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table), tables[table].path) else: - logging.error('Errors:', errors[table], sep=" ") + print('Errors:', errors[table], sep=" ") return state From 938847e79d08347f54337b9b35d6f8edeb4fbc0c Mon Sep 17 00:00:00 2001 From: Yusuf Fahry Date: Sun, 27 Sep 2020 23:57:57 +0700 Subject: [PATCH 2/2] Relocating default --- target_bigquery.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/target_bigquery.py b/target_bigquery.py index 5c776b6..27bde98 100644 --- a/target_bigquery.py +++ b/target_bigquery.py @@ -84,9 +84,6 @@ def define_schema(field, name): schema_type = "RECORD" schema_fields = tuple(build_schema(field.get('items'))) - if schema_type == None: - schema_type = "string" - if schema_type == "string": if "format" in field: @@ -96,6 +93,9 @@ def define_schema(field, name): if schema_type == 'number': schema_type = 'FLOAT' + if schema_type is None: + schema_type = "string" + print(schema_name, schema_type) return (schema_name, schema_type, schema_mode, schema_description, schema_fields)