diff --git a/.gitignore b/.gitignore
index 71fce44..2457af6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -100,4 +100,6 @@ ENV/
 
 # mypy
 .mypy_cache/
-.vscode/
\ No newline at end of file
+.vscode/
+
+.idea/
\ No newline at end of file
diff --git a/README.md b/README.md
index a98cb0a..bd07928 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,14 @@ It should be possible to use the oAuth flow to authenticate to GCP as well:
 
 The data will be written to the table specified in your `config.json`.
 
+### JSON Schema compatibility
+
+Singer taps use JSON Schema to describe and validate the data they produce.
+However, the [patternProperties](https://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties)
+feature of the standard cannot be mapped to a fixed BigQuery column layout, because the matching property names are not known in advance.
+As a workaround, this target converts the values of fields that use `patternProperties` to strings before loading.
+These columns can then be queried with [BigQuery JSON functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions).
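+
+For example, a stringified value can be read back with the standard SQL JSON functions.
+A minimal sketch using the Python client (`my_dataset.events`, its `metadata` column, and the
+`user_id` key are placeholder names):
+
+```python
+from google.cloud import bigquery
+
+client = bigquery.Client()
+query = """
+    SELECT JSON_EXTRACT_SCALAR(metadata, '$.user_id') AS user_id
+    FROM `my_dataset.events`
+"""
+for row in client.query(query).result():
+    print(row.user_id)
+```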
+
 ---
 
 Copyright © 2018 RealSelf, Inc.
diff --git a/target_bigquery.py b/target_bigquery.py
index 111d11f..abc716f 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -1,28 +1,28 @@
 #!/usr/bin/env python3
 
 import argparse
+import collections
+import decimal
+import http.client
 import io
-import sys
 import json
 import logging
-import collections
+import re
+import sys
 import threading
-import http.client
 import urllib
-import pkg_resources
-
-from jsonschema import validate
-import singer
-
-from oauth2client import tools
 from tempfile import TemporaryFile
 
+import pkg_resources
+import singer
+from google.api_core import exceptions
 from google.cloud import bigquery
-from google.cloud.bigquery.job import SourceFormat
 from google.cloud.bigquery import Dataset, WriteDisposition
-from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery import LoadJobConfig
-from google.api_core import exceptions
+from google.cloud.bigquery import SchemaField
+from google.cloud.bigquery.job import SourceFormat
+from jsonschema import validate
+from oauth2client import tools
 
 try:
     parser = argparse.ArgumentParser(parents=[tools.argparser])
@@ -35,12 +35,13 @@
 logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
 
 logger = singer.get_logger()
-SCOPES = ['https://www.googleapis.com/auth/bigquery','https://www.googleapis.com/auth/bigquery.insertdata']
+SCOPES = ['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/bigquery.insertdata']
 CLIENT_SECRET_FILE = 'client_secret.json'
 APPLICATION_NAME = 'Singer BigQuery Target'
 
 StreamMeta = collections.namedtuple('StreamMeta', ['schema', 'key_properties', 'bookmark_properties'])
 
+
 def emit_state(state):
     if state is not None:
         line = json.dumps(state)
@@ -48,11 +49,22 @@
         sys.stdout.write("{}\n".format(line))
         sys.stdout.flush()
 
+
 def clear_dict_hook(items):
     return {k: v if v is not None else '' for k, v in items}
 
+
+def fix_name(name):
+    name = re.sub('[^a-zA-Z0-9_]', '_', name)
+    if name[0].isdigit():
+        name = "_" + name
+    if len(name) > 128:
+        name = name[:128]
+    return re.sub('[^a-zA-Z0-9_]', '_', name)
+
+
 def define_schema(field, name):
-    schema_name = name
+    schema_name = fix_name(name)
     schema_type = "STRING"
     schema_mode = "NULLABLE"
     schema_description = None
@@ -64,7 +76,7 @@
             schema_mode = 'NULLABLE'
         else:
             field = types
-    
+
     if isinstance(field['type'], list):
         if field['type'][0] == "null":
             schema_mode = 'NULLABLE'
@@ -74,15 +86,18 @@
     else:
         schema_type = field['type']
     if schema_type == "object":
-        schema_type = "RECORD"
-        schema_fields = tuple(build_schema(field))
+        if "patternProperties" in field.keys() or "properties" not in field.keys():
+            schema_type = "string"
+        else:
+            schema_type = "RECORD"
+            schema_fields = tuple(build_schema(field))
     if schema_type == "array":
-        schema_type = field.get('items').get('type')
+        items_type = field.get('items').get('type')
+        schema_type = items_type[-1] if isinstance(items_type, list) else items_type
         schema_mode = "REPEATED"
         if schema_type == "object":
-            schema_type = "RECORD"
-            schema_fields = tuple(build_schema(field.get('items')))
-
+            schema_type = "RECORD"
+            schema_fields = tuple(build_schema(field.get('items')))
 
     if schema_type == "string":
         if "format" in field:
@@ -94,24 +109,62 @@
 
     return (schema_name, schema_type, schema_mode, schema_description, schema_fields)
 
+
 def build_schema(schema):
     SCHEMA = []
+
+    # if "properties" not in schema:
+    #     print(schema)
+
     for key in schema['properties'].keys():
-
         if not (bool(schema['properties'][key])):  # if we endup with an empty record.
             continue
 
-        schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
+        schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(
+            schema['properties'][key], key)
         SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))
 
     return SCHEMA
 
+
+def apply_string_conversions(record, schema):
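+    """Coerce record values to match the generated BigQuery schema.
+
+    Values bound for STRING columns (including objects that were collapsed to
+    strings because of `patternProperties`) are passed through str(); RECORD and
+    STRUCT columns are converted recursively, honoring REPEATED mode. Values of
+    other types are copied unchanged, while fields that are missing, falsy, or
+    not present in the schema are left out of the result.
+    """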
+    tmp_record = {}
+    for schema_field in schema:
+        rec_field = record.get(schema_field.name)
+        if rec_field:
+            if schema_field.field_type.upper() == "STRING":
+                if schema_field.mode == "REPEATED":
+                    tmp_record[schema_field.name] = [str(rec_item) for rec_item in rec_field]
+                else:
+                    tmp_record[schema_field.name] = str(rec_field)
+            elif schema_field.field_type.upper() in ["RECORD", "STRUCT"]:
+                if schema_field.mode == "REPEATED":
+                    tmp_record[schema_field.name] = [apply_string_conversions(rec_item, schema_field.fields)
+                                                     for rec_item in rec_field]
+                else:
+                    tmp_record[schema_field.name] = apply_string_conversions(rec_field, schema_field.fields)
+            else:
+                tmp_record[schema_field.name] = rec_field
+    return tmp_record
+
+
+def apply_decimal_conversions(record):
+    new_record = {}
+    for key, value in record.items():
+        if isinstance(value, decimal.Decimal):
+            value = float(value)
+        new_record[fix_name(key)] = value
+    return new_record
+
+
 def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
     state = None
     schemas = {}
     key_properties = {}
-    tables = {}
+    bq_schemas = {}
     rows = {}
     errors = {}
 
@@ -131,18 +184,21 @@
 
         if isinstance(msg, singer.RecordMessage):
             if msg.stream not in schemas:
-                raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
+                raise Exception(
+                    "A record for stream {} was encountered before a corresponding schema".format(msg.stream))
 
             schema = schemas[msg.stream]
 
             if validate_records:
                 validate(msg.record, schema)
 
+            msg.record = apply_string_conversions(msg.record, bq_schemas[msg.stream])
             # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
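+            # json.dumps cannot serialize the decimal.Decimal values some taps emit,
+            # so cast them to float and sanitize column names before writing the row.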
-            dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')
+            new_record = apply_decimal_conversions(msg.record)
+            dat = bytes(json.dumps(new_record) + '\n', 'UTF-8')
 
             rows[msg.stream].write(dat)
-            #rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))
+            # rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))
 
             state = None
 
@@ -151,10 +207,10 @@
             state = msg.value
 
         elif isinstance(msg, singer.SchemaMessage):
-            table = msg.stream
+            table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
-            #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
+            bq_schemas[table] = build_schema(schemas[table])
             rows[table] = TemporaryFile(mode='w+b')
             errors[table] = None
             # try:
@@ -170,10 +226,9 @@
             raise Exception("Unrecognized message {}".format(msg))
 
     for table in rows.keys():
-        table_ref = bigquery_client.dataset(dataset_id).table(table)
-        SCHEMA = build_schema(schemas[table])
+        table_ref = bigquery_client.dataset(dataset_id).table(fix_name(table))
         load_config = LoadJobConfig()
-        load_config.schema = SCHEMA
+        load_config.schema = bq_schemas[table]
        load_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
 
         if truncate:
@@ -186,7 +241,6 @@
         logger.info("loading job {}".format(load_job.job_id))
         logger.info(load_job.result())
 
-
     # for table in errors.keys():
     #     if not errors[table]:
    #         print('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table), tables[table].path)
@@ -195,6 +249,7 @@
 
     return state
 
+
 def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
     state = None
     schemas = {}
@@ -221,13 +276,16 @@
 
         if isinstance(msg, singer.RecordMessage):
             if msg.stream not in schemas:
-                raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
+                raise Exception(
+                    "A record for stream {} was encountered before a corresponding schema".format(msg.stream))
 
             schema = schemas[msg.stream]
 
             if validate_records:
                 validate(msg.record, schema)
 
+            msg.record = apply_string_conversions(msg.record, tables[msg.stream].schema)
+            msg.record = apply_decimal_conversions(msg.record)
             errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
             rows[msg.stream] += 1
 
@@ -238,7 +296,7 @@
             state = msg.value
 
         elif isinstance(msg, singer.SchemaMessage):
-            table = msg.stream
+            table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
             tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
@@ -261,10 +319,11 @@
             logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table, tables[table].path))
             emit_state(state)
         else:
-            logging.error('Errors:', errors[table], sep=" ")
+            logging.error('Errors: %s', errors[table])
 
     return state
 
+
 def collect():
     try:
         version = pkg_resources.get_distribution('target-bigquery').version
@@ -283,6 +342,7 @@
     except:
         logger.debug('Collection request failed')
 
+
 def main():
     with open(flags.config) as input:
         config = json.load(input)
@@ -303,9 +363,11 @@
     input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
 
     if config.get('stream_data', True):
-        state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
+        state = persist_lines_stream(config['project_id'], config['dataset_id'], input,
+                                     validate_records=validate_records)
     else:
-        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)
+        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate,
+                                  validate_records=validate_records)
 
     emit_state(state)
     logger.debug("Exiting normally")