4 changes: 3 additions & 1 deletion .gitignore
@@ -100,4 +100,6 @@ ENV/
# mypy
.mypy_cache/

.vscode/
.vscode/

.idea/
8 changes: 8 additions & 0 deletions README.md
@@ -55,6 +55,14 @@ It should be possible to use the oAuth flow to authenticate to GCP as well:

The data will be written to the table specified in your `config.json`.
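
For illustration, a minimal `config.json` consistent with the options read in `main()` might look like the sketch below; the project and dataset values are placeholders, and `stream_data` (default `true`) selects streaming inserts over batch load jobs:

```json
{
  "project_id": "my-gcp-project",
  "dataset_id": "my_dataset",
  "stream_data": true
}
```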

### JSON Schema compatibility

Singer taps use JSON Schema to describe and validate the data they produce.
However, the [patternProperties](https://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties)
feature of that standard has no direct equivalent in BigQuery's schema model.
As a workaround, the values of fields that use `patternProperties` are converted to strings before loading.
The resulting columns can then be queried with [BigQuery JSON functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions).
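
For example, a field declared with `patternProperties` is mapped to a single STRING column rather than a RECORD. A rough sketch of the effect (the field contents and key names are hypothetical):

```python
# Hypothetical stream field with free-form keys, described via patternProperties.
field_schema = {
    "type": "object",
    "patternProperties": {"^.*$": {"type": "string"}},
}

record_value = {"env": "prod", "team": "data"}

# The target declares such a field as a STRING column and stringifies its value
# before loading, roughly equivalent to:
loaded_value = str(record_value)  # "{'env': 'prod', 'team': 'data'}"
```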

---

Copyright © 2018 RealSelf, Inc.
138 changes: 100 additions & 38 deletions target_bigquery.py
@@ -1,28 +1,28 @@
#!/usr/bin/env python3

import argparse
import collections
import decimal
import http.client
import io
import sys
import json
import logging
import collections
import re
import sys
import threading
import http.client
import urllib
import pkg_resources

from jsonschema import validate
import singer

from oauth2client import tools
from tempfile import TemporaryFile

import pkg_resources
import singer
from google.api_core import exceptions
from google.cloud import bigquery
from google.cloud.bigquery.job import SourceFormat
from google.cloud.bigquery import Dataset, WriteDisposition
from google.cloud.bigquery import SchemaField
from google.cloud.bigquery import LoadJobConfig
from google.api_core import exceptions
from google.cloud.bigquery import SchemaField
from google.cloud.bigquery.job import SourceFormat
from jsonschema import validate
from oauth2client import tools

try:
parser = argparse.ArgumentParser(parents=[tools.argparser])
@@ -35,24 +35,36 @@
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
logger = singer.get_logger()

SCOPES = ['https://www.googleapis.com/auth/bigquery','https://www.googleapis.com/auth/bigquery.insertdata']
SCOPES = ['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/bigquery.insertdata']
CLIENT_SECRET_FILE = 'client_secret.json'
APPLICATION_NAME = 'Singer BigQuery Target'

StreamMeta = collections.namedtuple('StreamMeta', ['schema', 'key_properties', 'bookmark_properties'])


def emit_state(state):
if state is not None:
line = json.dumps(state)
logger.debug('Emitting state {}'.format(line))
sys.stdout.write("{}\n".format(line))
sys.stdout.flush()


def clear_dict_hook(items):
return {k: v if v is not None else '' for k, v in items}


def fix_name(name):
name = re.sub('[^a-zA-Z0-9_]', '_', name)
if name[0].isdigit():
name = "_" + name
if len(name) > 128:
name = name[:128]
return re.sub('[^a-zA-Z0-9_]', '_', name)


def define_schema(field, name):
schema_name = name
schema_name = fix_name(name)
schema_type = "STRING"
schema_mode = "NULLABLE"
schema_description = None
@@ -64,7 +76,7 @@ def define_schema(field, name):
schema_mode = 'NULLABLE'
else:
field = types

if isinstance(field['type'], list):
if field['type'][0] == "null":
schema_mode = 'NULLABLE'
@@ -74,15 +86,18 @@ def define_schema(field, name):
else:
schema_type = field['type']
if schema_type == "object":
schema_type = "RECORD"
schema_fields = tuple(build_schema(field))
if "patternProperties" in field.keys() or "properties" not in field.keys():
schema_type = "string"
else:
schema_type = "RECORD"
schema_fields = tuple(build_schema(field))
if schema_type == "array":
schema_type = field.get('items').get('type')
items_type = field.get('items').get('type')
schema_type = items_type[-1] if isinstance(items_type, list) else items_type
schema_mode = "REPEATED"
if schema_type == "object":
schema_type = "RECORD"
schema_fields = tuple(build_schema(field.get('items')))

schema_type = "RECORD"
schema_fields = tuple(build_schema(field.get('items')))

if schema_type == "string":
if "format" in field:
@@ -94,24 +109,62 @@

return (schema_name, schema_type, schema_mode, schema_description, schema_fields)


def build_schema(schema):
SCHEMA = []

# if "properties" not in schema:
# print(schema)

for key in schema['properties'].keys():

if not (bool(schema['properties'][key])):
# skip if we end up with an empty record.
continue

schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(
schema['properties'][key], key)
SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))

return SCHEMA


def apply_string_conversions(record, schema):
tmp_record = {}
for schema_field in schema:
rec_field = record.get(schema_field.name)
if rec_field:
if schema_field.field_type.upper() == "STRING":
if schema_field.mode == "REPEATED":
tmp_record[schema_field.name] = [str(rec_item) for rec_item in rec_field]
else:
tmp_record[schema_field.name] = str(rec_field)
elif schema_field.field_type.upper() in ["RECORD", "STRUCT"]:
if schema_field.mode == "REPEATED":
tmp_record[schema_field.name] = [apply_string_conversions(rec_item, schema_field.fields) for
rec_item in
rec_field]
else:
tmp_record[schema_field.name] = apply_string_conversions(rec_field, schema_field.fields)
else:
tmp_record[schema_field.name] = rec_field
return tmp_record


def apply_decimal_conversions(record):
new_record = {}
for key, value in record.items():
if isinstance(value, decimal.Decimal):
value = float(value)
new_record[fix_name(key)] = value
return new_record


def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
state = None
schemas = {}
key_properties = {}
tables = {}
bq_schemas = {}
rows = {}
errors = {}

@@ -131,18 +184,21 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida

if isinstance(msg, singer.RecordMessage):
if msg.stream not in schemas:
raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
raise Exception(
"A record for stream {} was encountered before a corresponding schema".format(msg.stream))

schema = schemas[msg.stream]

if validate_records:
validate(msg.record, schema)

msg.record = apply_string_conversions(msg.record, bq_schemas[msg.stream])
# NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')
new_record = apply_decimal_conversions(msg.record)
dat = bytes(json.dumps(new_record) + '\n', 'UTF-8')

rows[msg.stream].write(dat)
#rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))
# rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))

state = None

@@ -151,10 +207,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
state = msg.value

elif isinstance(msg, singer.SchemaMessage):
table = msg.stream
table = msg.stream
schemas[table] = msg.schema
key_properties[table] = msg.key_properties
#tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
bq_schemas[table] = build_schema(schemas[table])
rows[table] = TemporaryFile(mode='w+b')
errors[table] = None
# try:
@@ -170,10 +226,9 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
raise Exception("Unrecognized message {}".format(msg))

for table in rows.keys():
table_ref = bigquery_client.dataset(dataset_id).table(table)
SCHEMA = build_schema(schemas[table])
table_ref = bigquery_client.dataset(dataset_id).table(fix_name(table))
load_config = LoadJobConfig()
load_config.schema = SCHEMA
load_config.schema = bq_schemas[table]
load_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON

if truncate:
@@ -186,7 +241,6 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
logger.info("loading job {}".format(load_job.job_id))
logger.info(load_job.result())


# for table in errors.keys():
# if not errors[table]:
# print('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table), tables[table].path)
@@ -195,6 +249,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida

return state


def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
state = None
schemas = {}
@@ -221,13 +276,16 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr

if isinstance(msg, singer.RecordMessage):
if msg.stream not in schemas:
raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
raise Exception(
"A record for stream {} was encountered before a corresponding schema".format(msg.stream))

schema = schemas[msg.stream]

if validate_records:
validate(msg.record, schema)

msg.record = apply_string_conversions(msg.record, tables[msg.stream].schema)
msg.record = apply_decimal_conversions(msg.record)
errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
rows[msg.stream] += 1

@@ -238,7 +296,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
state = msg.value

elif isinstance(msg, singer.SchemaMessage):
table = msg.stream
table = msg.stream
schemas[table] = msg.schema
key_properties[table] = msg.key_properties
tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
@@ -261,10 +319,11 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path))
emit_state(state)
else:
logging.error('Errors:', errors[table], sep=" ")
logging.error('Errors:', str(errors[table]))

return state


def collect():
try:
version = pkg_resources.get_distribution('target-bigquery').version
@@ -283,6 +342,7 @@ def collect():
except:
logger.debug('Collection request failed')


def main():
with open(flags.config) as input:
config = json.load(input)
@@ -303,9 +363,11 @@ def main():
input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')

if config.get('stream_data', True):
state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
state = persist_lines_stream(config['project_id'], config['dataset_id'], input,
validate_records=validate_records)
else:
state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)
state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate,
validate_records=validate_records)

emit_state(state)
logger.debug("Exiting normally")