From 52827f86e1dd38368a63dd608eca9c420a41eb4c Mon Sep 17 00:00:00 2001
From: slawekrewaj
Date: Tue, 17 Sep 2019 16:12:24 +0200
Subject: [PATCH 1/3] Supporting illegal BigQuery characters in table and field names

---
 target_bigquery.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/target_bigquery.py b/target_bigquery.py
index 111d11f..fff724b 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -10,6 +10,7 @@
 import http.client
 import urllib
 import pkg_resources
+import re
 
 from jsonschema import validate
 import singer
@@ -51,8 +52,11 @@ def emit_state(state):
 def clear_dict_hook(items):
     return {k: v if v is not None else '' for k, v in items}
 
+def fix_name(name):
+    return re.sub('[^a-zA-Z0-9_]', '_', name)
+
 def define_schema(field, name):
-    schema_name = name
+    schema_name = fix_name(name)
     schema_type = "STRING"
     schema_mode = "NULLABLE"
     schema_description = None
@@ -139,7 +143,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
                 validate(msg.record, schema)
 
             # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
-            dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')
+            new_record = {}
+            for key, value in msg.record.items():
+                new_record[fix_name(key)] = value
+            dat = bytes(json.dumps(new_record) + '\n', 'UTF-8')
 
             rows[msg.stream].write(dat)
             #rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))
@@ -170,7 +177,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
         raise Exception("Unrecognized message {}".format(msg))
 
     for table in rows.keys():
-        table_ref = bigquery_client.dataset(dataset_id).table(table)
+        table_ref = bigquery_client.dataset(dataset_id).table(fix_name(table))
         SCHEMA = build_schema(schemas[table])
         load_config = LoadJobConfig()
         load_config.schema = SCHEMA

From a041ed6174cbfaa2d04a3ad0af4aa801fdf29c91 Mon Sep 17 00:00:00 2001
From: slawekrewaj
Date: Wed, 2 Oct 2019 14:01:51 +0200
Subject: [PATCH 2/3] Fixing a regression caused by singer.parse_message returning Decimal values instead of float

---
 target_bigquery.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/target_bigquery.py b/target_bigquery.py
index fff724b..c263524 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 
 import argparse
+import decimal
 import io
 import sys
 import json
@@ -145,6 +146,8 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
 
             # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
             new_record = {}
             for key, value in msg.record.items():
+                if isinstance(value, decimal.Decimal):
+                    value = float(value)
                 new_record[fix_name(key)] = value
             dat = bytes(json.dumps(new_record) + '\n', 'UTF-8')

From 13f3a41ceda189d256c46ebfe7ce03fa373cce60 Mon Sep 17 00:00:00 2001
From: meverg
Date: Mon, 27 Jan 2020 17:27:55 +0300
Subject: [PATCH 3/3] Make compatible with JSON Schema pattern properties

---
 .gitignore         |   4 +-
 README.md          |   8 +++
 target_bigquery.py | 134 +++++++++++++++++++++++++++++++--------------
 3 files changed, 104 insertions(+), 42 deletions(-)

diff --git a/.gitignore b/.gitignore
index 71fce44..2457af6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -100,4 +100,6 @@ ENV/
 # mypy
 .mypy_cache/
 
-.vscode/
\ No newline at end of file
+.vscode/
+
+.idea/
\ No newline at end of file

diff --git a/README.md b/README.md
index a98cb0a..bd07928 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,14 @@ It should be possible to use the oAuth flow to authenticate to GCP as well:
 
 The data will be written to the table specified in your `config.json`.
 
+### JSON Schema compatibility
+
+Singer taps use JSON Schema to describe and validate the data they produce.
+However, the [patternProperties](https://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties)
+feature of that standard has no direct equivalent in a BigQuery table schema.
+As a workaround, the values of fields that use `patternProperties` are converted to strings before loading.
+These columns can then be queried with [BigQuery JSON functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions).
+
 ---
 
 Copyright © 2018 RealSelf, Inc.

diff --git a/target_bigquery.py b/target_bigquery.py
index c263524..abc716f 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -1,30 +1,28 @@
 #!/usr/bin/env python3
 
 import argparse
+import collections
 import decimal
+import http.client
 import io
-import sys
 import json
 import logging
-import collections
+import re
+import sys
 import threading
-import http.client
 import urllib
-import pkg_resources
-import re
-
-from jsonschema import validate
-import singer
-
-from oauth2client import tools
 from tempfile import TemporaryFile
 
+import pkg_resources
+import singer
+from google.api_core import exceptions
 from google.cloud import bigquery
-from google.cloud.bigquery.job import SourceFormat
 from google.cloud.bigquery import Dataset, WriteDisposition
-from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery import LoadJobConfig
-from google.api_core import exceptions
+from google.cloud.bigquery import SchemaField
+from google.cloud.bigquery.job import SourceFormat
+from jsonschema import validate
+from oauth2client import tools
 
 try:
     parser = argparse.ArgumentParser(parents=[tools.argparser])
@@ -37,12 +35,13 @@ logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
 
 logger = singer.get_logger()
 
-SCOPES = ['https://www.googleapis.com/auth/bigquery','https://www.googleapis.com/auth/bigquery.insertdata']
+SCOPES = ['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/bigquery.insertdata']
 CLIENT_SECRET_FILE = 'client_secret.json'
 APPLICATION_NAME = 'Singer BigQuery Target'
 
 StreamMeta = collections.namedtuple('StreamMeta', ['schema', 'key_properties', 'bookmark_properties'])
 
+
 def emit_state(state):
     if state is not None:
         line = json.dumps(state)
@@ -50,12 +49,20 @@ def emit_state(state):
         sys.stdout.write("{}\n".format(line))
         sys.stdout.flush()
 
+
 def clear_dict_hook(items):
     return {k: v if v is not None else '' for k, v in items}
 
+
 def fix_name(name):
+    name = re.sub('[^a-zA-Z0-9_]', '_', name)
+    if name[0].isdigit():
+        name = "_" + name
+    if len(name) > 128:
+        name = name[:128]
     return re.sub('[^a-zA-Z0-9_]', '_', name)
 
+
 def define_schema(field, name):
     schema_name = fix_name(name)
     schema_type = "STRING"
     schema_mode = "NULLABLE"
@@ -69,7 +76,7 @@ def define_schema(field, name):
             schema_mode = 'NULLABLE'
         else:
             field = types
-    
+
     if isinstance(field['type'], list):
         if field['type'][0] == "null":
             schema_mode = 'NULLABLE'
@@ -79,15 +86,18 @@ def define_schema(field, name):
         else:
             schema_type = field['type']
     if schema_type == "object":
-        schema_type = "RECORD"
-        schema_fields = tuple(build_schema(field))
+        if "patternProperties" in field.keys() or "properties" not in field.keys():
+            schema_type = "string"
+        else:
+            schema_type = "RECORD"
+            schema_fields = tuple(build_schema(field))
     if schema_type == "array":
-        schema_type = field.get('items').get('type')
+        items_type = field.get('items').get('type')
+        schema_type = items_type[-1] if isinstance(items_type, list) else items_type
         schema_mode = "REPEATED"
         if schema_type == "object":
-            schema_type = "RECORD"
-            schema_fields = tuple(build_schema(field.get('items')))
-
+            schema_type = "RECORD"
+            schema_fields = tuple(build_schema(field.get('items')))
 
     if schema_type == "string":
         if "format" in field:
@@ -99,24 +109,62 @@ def define_schema(field, name):
 
     return (schema_name, schema_type, schema_mode, schema_description, schema_fields)
 
+
 def build_schema(schema):
     SCHEMA = []
+
+    # if "properties" not in schema:
+    #     print(schema)
+
     for key in schema['properties'].keys():
-        
+
         if not (bool(schema['properties'][key])): # if we endup with an empty record.
             continue
 
-        schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
+        schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(
+            schema['properties'][key], key)
         SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))
 
     return SCHEMA
 
+
+def apply_string_conversions(record, schema):
+    tmp_record = {}
+    for schema_field in schema:
+        rec_field = record.get(schema_field.name)
+        if rec_field:
+            if schema_field.field_type.upper() == "STRING":
+                if schema_field.mode == "REPEATED":
+                    tmp_record[schema_field.name] = [str(rec_item) for rec_item in rec_field]
+                else:
+                    tmp_record[schema_field.name] = str(rec_field)
+            elif schema_field.field_type.upper() in ["RECORD", "STRUCT"]:
+                if schema_field.mode == "REPEATED":
+                    tmp_record[schema_field.name] = [apply_string_conversions(rec_item, schema_field.fields) for
+                                                     rec_item in
+                                                     rec_field]
+                else:
+                    tmp_record[schema_field.name] = apply_string_conversions(rec_field, schema_field.fields)
+            else:
+                tmp_record[schema_field.name] = rec_field
+    return tmp_record
+
+
+def apply_decimal_conversions(record):
+    new_record = {}
+    for key, value in record.items():
+        if isinstance(value, decimal.Decimal):
+            value = float(value)
+        new_record[fix_name(key)] = value
+    return new_record
+
+
 def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
     state = None
     schemas = {}
     key_properties = {}
-    tables = {}
+    bq_schemas = {}
     rows = {}
     errors = {}
@@ -136,23 +184,21 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
 
         if isinstance(msg, singer.RecordMessage):
             if msg.stream not in schemas:
-                raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
+                raise Exception(
+                    "A record for stream {} was encountered before a corresponding schema".format(msg.stream))
 
             schema = schemas[msg.stream]
 
             if validate_records:
                 validate(msg.record, schema)
+            msg.record = apply_string_conversions(msg.record, bq_schemas[msg.stream])
 
             # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
-            new_record = {}
-            for key, value in msg.record.items():
-                if isinstance(value, decimal.Decimal):
-                    value = float(value)
-                new_record[fix_name(key)] = value
+            new_record = apply_decimal_conversions(msg.record)
             dat = bytes(json.dumps(new_record) + '\n', 'UTF-8')
 
             rows[msg.stream].write(dat)
-            #rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))
+            # rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8'))
 
             state = None
@@ -161,10 +207,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
             state = msg.value
 
         elif isinstance(msg, singer.SchemaMessage):
-            table = msg.stream 
+            table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
-            #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
+            bq_schemas[table] = build_schema(schemas[table])
             rows[table] = TemporaryFile(mode='w+b')
             errors[table] = None
             # try:
@@ -181,9 +227,8 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
 
     for table in rows.keys():
         table_ref = bigquery_client.dataset(dataset_id).table(fix_name(table))
-        SCHEMA = build_schema(schemas[table])
         load_config = LoadJobConfig()
-        load_config.schema = SCHEMA
+        load_config.schema = bq_schemas[table]
         load_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
 
         if truncate:
@@ -196,7 +241,6 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
         logger.info("loading job {}".format(load_job.job_id))
         logger.info(load_job.result())
 
-
     # for table in errors.keys():
     #     if not errors[table]:
     #         print('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table), tables[table].path)
     #         emit_state(state)
 
     return state
 
+
 def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
     state = None
     schemas = {}
@@ -231,13 +276,16 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
 
         if isinstance(msg, singer.RecordMessage):
             if msg.stream not in schemas:
-                raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
+                raise Exception(
+                    "A record for stream {} was encountered before a corresponding schema".format(msg.stream))
 
             schema = schemas[msg.stream]
 
             if validate_records:
                 validate(msg.record, schema)
 
+            msg.record = apply_string_conversions(msg.record, tables[msg.stream].schema)
+            msg.record = apply_decimal_conversions(msg.record)
             errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
             rows[msg.stream] += 1
 
@@ -248,7 +296,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             state = msg.value
 
         elif isinstance(msg, singer.SchemaMessage):
-            table = msg.stream 
+            table = msg.stream
            schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
             tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
@@ -271,10 +319,11 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
 
     for table in errors.keys():
         if not errors[table]:
             logging.info('Loaded {} row(s) into {}:{}'.format(rows[table], dataset_id, table,
                                                               tables[table].path))
             emit_state(state)
         else:
-            logging.error('Errors:', errors[table], sep=" ")
+            logging.error('Errors: %s', errors[table])
 
     return state
 
+
 def collect():
     try:
         version = pkg_resources.get_distribution('target-bigquery').version
@@ -293,6 +342,7 @@ def collect():
     except:
         logger.debug('Collection request failed')
 
+
 def main():
     with open(flags.config) as input:
         config = json.load(input)
@@ -313,9 +363,11 @@ def main():
         input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
 
     if config.get('stream_data', True):
-        state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
+        state = persist_lines_stream(config['project_id'], config['dataset_id'], input,
+                                     validate_records=validate_records)
     else:
-        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)
+        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate,
+                                  validate_records=validate_records)
 
     emit_state(state)
     logger.debug("Exiting normally")
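
For illustration only (not part of the patches above): a minimal, self-contained sketch of how the name sanitization and Decimal handling introduced by this series behave, using condensed but behaviorally equivalent versions of `fix_name` and `apply_decimal_conversions`.

    import decimal
    import re

    def fix_name(name):
        # Replace characters BigQuery does not allow in column/table names,
        # prefix names that start with a digit, and cap the length at 128 characters.
        name = re.sub('[^a-zA-Z0-9_]', '_', name)
        if name[0].isdigit():
            name = "_" + name
        return name[:128]

    def apply_decimal_conversions(record):
        # json.dumps() cannot serialize decimal.Decimal, so cast such values to float
        # and sanitize each key on the way through.
        return {fix_name(k): float(v) if isinstance(v, decimal.Decimal) else v
                for k, v in record.items()}

    print(fix_name("order-id"))    # order_id
    print(fix_name("2nd field"))   # _2nd_field
    print(apply_decimal_conversions({"price (usd)": decimal.Decimal("9.99")}))
    # {'price__usd_': 9.99}

The batch load path in persist_lines_job applies these conversions before writing NEWLINE_DELIMITED_JSON rows, and the streaming path in persist_lines_stream applies them before calling insert_rows_json.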