Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions target_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,28 @@ def clear_dict_hook(items):
return {k: v if v is not None else '' for k, v in items}

def define_schema(field, name):

schema_name = name
schema_type = "STRING"
schema_mode = "NULLABLE"
schema_description = None
schema_fields = ()

if 'type' not in field and 'anyOf' in field:
if field.get("type") is None and 'anyOf' in field:
for types in field['anyOf']:
if types['type'] == 'null':
if types.get('type') == 'null':
schema_mode = 'NULLABLE'
else:
field = types
if isinstance(field['type'], list):
if field['type'][0] == "null":

if isinstance(field.get('type'), list):
if field.get('type')[0] == "null":
schema_mode = 'NULLABLE'
else:
schema_mode = 'required'
schema_type = field['type'][-1]
schema_type = field.get('type')[1]
else:
schema_type = field['type']
schema_type = field.get('type')
if schema_type == "object":
schema_type = "RECORD"
schema_fields = tuple(build_schema(field))
Expand All @@ -86,22 +87,22 @@ def define_schema(field, name):

if schema_type == "string":
if "format" in field:
if field['format'] == "date-time":
if field.get('format') == "date-time":
schema_type = "timestamp"

if schema_type == 'number':
schema_type = 'FLOAT'

if schema_type is None:
schema_type = "string"

print(schema_name, schema_type)

return (schema_name, schema_type, schema_mode, schema_description, schema_fields)

def build_schema(schema):
SCHEMA = []
for key in schema['properties'].keys():

if not (bool(schema['properties'][key])):
# if we endup with an empty record.
continue

schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))

Expand Down Expand Up @@ -151,7 +152,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
state = msg.value

elif isinstance(msg, singer.SchemaMessage):
table = msg.stream
table = msg.stream
schemas[table] = msg.schema
key_properties[table] = msg.key_properties
#tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
Expand Down Expand Up @@ -238,7 +239,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
state = msg.value

elif isinstance(msg, singer.SchemaMessage):
table = msg.stream
table = msg.stream
schemas[table] = msg.schema
key_properties[table] = msg.key_properties
tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
Expand All @@ -258,10 +259,9 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr

for table in errors.keys():
if not errors[table]:
logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path))
emit_state(state)
print('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table), tables[table].path)
else:
logging.error('Errors:', errors[table], sep=" ")
print('Errors:', errors[table], sep=" ")

return state

Expand Down