diff --git a/target_bigquery.py b/target_bigquery.py
index 111d11f..27bde98 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -52,27 +52,28 @@ def clear_dict_hook(items):
     return {k: v if v is not None else '' for k, v in items}
 
 
 def define_schema(field, name):
+    schema_name = name
     schema_type = "STRING"
     schema_mode = "NULLABLE"
     schema_description = None
     schema_fields = ()
-    if 'type' not in field and 'anyOf' in field:
+    if field.get("type") is None and 'anyOf' in field:
         for types in field['anyOf']:
-            if types['type'] == 'null':
+            if types.get('type') == 'null':
                 schema_mode = 'NULLABLE'
             else:
                 field = types
-
-    if isinstance(field['type'], list):
-        if field['type'][0] == "null":
+
+    if isinstance(field.get('type'), list):
+        if field.get('type')[0] == "null":
             schema_mode = 'NULLABLE'
         else:
             schema_mode = 'required'
-        schema_type = field['type'][-1]
+        schema_type = field.get('type')[1]
     else:
-        schema_type = field['type']
+        schema_type = field.get('type')
 
     if schema_type == "object":
         schema_type = "RECORD"
         schema_fields = tuple(build_schema(field))
@@ -86,22 +87,22 @@ def define_schema(field, name):
 
     if schema_type == "string":
         if "format" in field:
-            if field['format'] == "date-time":
+            if field.get('format') == "date-time":
                 schema_type = "timestamp"
 
     if schema_type == 'number':
         schema_type = 'FLOAT'
 
+    if schema_type is None:
+        schema_type = "string"
+
+    print(schema_name, schema_type)
+
     return (schema_name, schema_type, schema_mode, schema_description, schema_fields)
 
 
 def build_schema(schema):
     SCHEMA = []
     for key in schema['properties'].keys():
-
-        if not (bool(schema['properties'][key])):
-            # if we endup with an empty record.
-            continue
-
         schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
         SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))
 
@@ -151,7 +152,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
             state = msg.value
 
         elif isinstance(msg, singer.SchemaMessage):
-            table = msg.stream
+            table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
             #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
@@ -238,7 +239,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             state = msg.value
 
         elif isinstance(msg, singer.SchemaMessage):
-            table = msg.stream
+            table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
             tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
@@ -258,10 +259,9 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
 
     for table in errors.keys():
         if not errors[table]:
-            logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path))
-            emit_state(state)
+            print('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table), tables[table].path)
         else:
-            logging.error('Errors:', errors[table], sep=" ")
+            print('Errors:', errors[table], sep=" ")
 
     return state