diff --git a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java index bac7e3f..bfff06b 100644 --- a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java +++ b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java @@ -45,7 +45,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import javax.validation.constraints.NotNull; import org.embulk.config.ConfigException; import org.embulk.output.bigquery_java.config.BigqueryColumnOption; import org.embulk.output.bigquery_java.config.BigqueryTimePartitioning; @@ -105,18 +104,24 @@ public BigqueryClient(PluginTask task, Schema schema) { } } - @NotNull - private FieldList getSrcFields() { - if (cachedSrcFields == null) { - cachedSrcFields = FieldList.of(); - Table srcTable = getTable(task.getTable()); - if (srcTable != null) { - com.google.cloud.bigquery.Schema srcSchema = srcTable.getDefinition().getSchema(); - if (srcSchema != null) { - cachedSrcFields = srcSchema.getFields(); - } - } + public static boolean isNeedUpdateTable(PluginTask task) { + return task.getMode().equals("replace") + && (task.getRetainColumnDescriptions() || task.getRetainColumnPolicyTags()); + } + + public FieldList storeCachedSrcFieldsIfNeed() { + if (!isNeedUpdateTable(task)) { + return null; } + Table srcTable = getTable(task.getTable()); + if (srcTable == null) { + return null; + } + com.google.cloud.bigquery.Schema srcSchema = srcTable.getDefinition().getSchema(); + if (srcSchema == null) { + return null; + } + cachedSrcFields = srcSchema.getFields(); return cachedSrcFields; } @@ -219,6 +224,65 @@ private void createTableIfNotExist(String table, String dataset, String project) } } + public void updateTableIfNeed() { + Table table = this.getTable(task.getTable()); + com.google.cloud.bigquery.Schema schema = table.getDefinition().getSchema(); + if (schema == null) { + return; + } + com.google.cloud.bigquery.Schema patchSchema = + buildPatchSchema(task, schema.getFields(), cachedSrcFields); + if (patchSchema == null) { + return; + } + try { + bigquery.update( + TableInfo.newBuilder( + table.getTableId(), + StandardTableDefinition.newBuilder().setSchema(patchSchema).build()) + .build()); + } catch (BigQueryException e) { + logger.error( + String.format( + "embulk-output-bigquery: update_table(%s:%s.%s)", + destinationProject, destinationDataset, task.getTable())); + throw new BigqueryException( + String.format( + "failed to update table %s:%s.%s, response: %s", + destinationProject, destinationDataset, task.getTable(), e)); + } + } + + public static com.google.cloud.bigquery.Schema buildPatchSchema( + PluginTask task, FieldList currentFields, FieldList dstFields) { + if (!isNeedUpdateTable(task) || dstFields == null) { + return null; + } + + List updatedFields = new ArrayList<>(); + for (Field field : currentFields) { + Field.Builder fieldBuilder = field.toBuilder(); + dstFields.stream() + .filter(x -> x.getName().equals(field.getName())) + .findFirst() + .ifPresent( + srcField -> { + if (task.getRetainColumnDescriptions()) { + fieldBuilder.setDescription(srcField.getDescription()); + } + if (task.getRetainColumnPolicyTags()) { + fieldBuilder.setPolicyTags(srcField.getPolicyTags()); + } + }); + task.getColumnOptions() + .flatMap(columnOptions -> BigqueryUtil.findColumnOption(field.getName(), columnOptions)) + .flatMap(BigqueryColumnOption::getDescription) + .ifPresent(fieldBuilder::setDescription); + updatedFields.add(fieldBuilder.build()); + } + return com.google.cloud.bigquery.Schema.of(updatedFields); + } + public TimePartitioning buildTimePartitioning(BigqueryTimePartitioning bigqueryTimePartitioning) { TimePartitioning.Builder timePartitioningBuilder; @@ -753,22 +817,6 @@ private com.google.cloud.bigquery.Schema buildSchema( BigqueryUtil.findColumnOption(col.getName(), columnOptions); Field.Builder fieldBuilder = createFieldBuilder(task, col, columnOption); - if ((task.getMode().equals("replace") - && (task.getRetainColumnDescriptions() || task.getRetainColumnPolicyTags()))) { - getSrcFields().stream() - .filter(x -> x.getName().equals(col.getName())) - .findFirst() - .ifPresent( - field -> { - if (task.getRetainColumnDescriptions()) { - fieldBuilder.setDescription(field.getDescription()); - } - if (task.getRetainColumnPolicyTags()) { - fieldBuilder.setPolicyTags(field.getPolicyTags()); - } - }); - } - if (columnOption.isPresent()) { BigqueryColumnOption colOpt = columnOption.get(); if (!colOpt.getMode().isEmpty()) { @@ -776,6 +824,8 @@ private com.google.cloud.bigquery.Schema buildSchema( } fieldBuilder.setMode(fieldMode); + // CAUTION: If isNeedUpdateTable() is true, description may be overwritten by + // updateTableIfNeed(). if (colOpt.getDescription().isPresent()) { fieldBuilder.setDescription(colOpt.getDescription().get()); } diff --git a/src/main/java/org/embulk/output/bigquery_java/BigqueryJavaOutputPlugin.java b/src/main/java/org/embulk/output/bigquery_java/BigqueryJavaOutputPlugin.java index 9e17d44..c7c1510 100644 --- a/src/main/java/org/embulk/output/bigquery_java/BigqueryJavaOutputPlugin.java +++ b/src/main/java/org/embulk/output/bigquery_java/BigqueryJavaOutputPlugin.java @@ -50,6 +50,7 @@ public ConfigDiff transaction( BigqueryTaskBuilder.build(task); BigqueryClient client = new BigqueryClient(task, schema); autoCreate(task, client); + client.storeCachedSrcFieldsIfNeed(); control.run(task.dump()); this.writers.values().forEach(BigqueryFileWriter::close); @@ -145,6 +146,8 @@ public ConfigDiff transaction( client.deleteTable(task.getTempTable().get()); } + client.updateTableIfNeed(); + if (task.getDeleteFromLocalWhenJobEnd()) { paths.forEach(p -> p.toFile().delete()); } else { diff --git a/src/test/java/org/embulk/output/bigquery_java/TestBigqueryClient.java b/src/test/java/org/embulk/output/bigquery_java/TestBigqueryClient.java index 091fb7e..bd89478 100644 --- a/src/test/java/org/embulk/output/bigquery_java/TestBigqueryClient.java +++ b/src/test/java/org/embulk/output/bigquery_java/TestBigqueryClient.java @@ -2,29 +2,31 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.PolicyTags; +import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDefinition; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import org.embulk.config.ConfigSource; import org.embulk.input.file.LocalFileInputPlugin; +import org.embulk.output.bigquery_java.config.BigqueryColumnOption; import org.embulk.output.bigquery_java.config.PluginTask; import org.embulk.parser.csv.CsvParserPlugin; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.OutputPlugin; import org.embulk.spi.ParserPlugin; -import org.embulk.spi.Schema; -import org.embulk.spi.type.Types; import org.embulk.test.TestingEmbulk; import org.embulk.util.config.ConfigMapper; import org.embulk.util.config.ConfigMapperFactory; @@ -56,115 +58,153 @@ private static ConfigSource loadYamlResource(TestingEmbulk embulk, String fileNa @Rule public TemporaryFolder testFolder = new TemporaryFolder(); - private com.google.cloud.bigquery.Schema invokeTakeoverBuildSchema( + private static StandardSQLTypeName toBQType(BigqueryColumnOption column) { + Map bqTypeMap = new HashMap<>(); + bqTypeMap.put("INTEGER", StandardSQLTypeName.INT64); + bqTypeMap.put("STRING", StandardSQLTypeName.STRING); + return bqTypeMap.getOrDefault(column.getType().orElse("STRING"), StandardSQLTypeName.STRING); + } + + @Test + public void TestIsNeedUpdateTable() { + ConfigSource baseConfig = loadYamlResource(embulk, "takeover.yml"); + + // Helper function to create config and test isNeedUpdateTable + Function>> testIsNeedUpdate = + mode -> + retainPolicyTags -> + retainDescriptions -> + BigqueryClient.isNeedUpdateTable( + CONFIG_MAPPER.map( + baseConfig + .set("mode", mode) + .set("retain_column_policy_tags", retainPolicyTags) + .set("retain_column_descriptions", retainDescriptions), + PluginTask.class)); + + // Test cases + assertTrue(testIsNeedUpdate.apply("replace").apply(false).apply(true)); + assertTrue(testIsNeedUpdate.apply("replace").apply(true).apply(false)); + assertFalse(testIsNeedUpdate.apply("replace").apply(false).apply(false)); + assertFalse(testIsNeedUpdate.apply("insert").apply(true).apply(true)); + } + + private Schema invokeTakeoverBuildSchema( Function setupConfig, - Function - setupField0, - Function - setupField1) - throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, - InvocationTargetException { + Function setupField0, + Function setupField1) { ConfigSource config = loadYamlResource(embulk, "takeover.yml"); + PluginTask baseTask = CONFIG_MAPPER.map(config, PluginTask.class); PluginTask task = CONFIG_MAPPER.map(setupConfig.apply(config), PluginTask.class); - Schema schema = Schema.builder().add("c0", Types.LONG).add("c1", Types.STRING).build(); - // Create a partial mock that avoids BigQuery service initialization - BigqueryClient bigqueryClient = Mockito.mock(BigqueryClient.class); + List currentFields = + baseTask.getColumnOptions().orElse(java.util.Collections.emptyList()).stream() + .map(column -> Field.newBuilder(column.getName(), toBQType(column)).build()) + .collect(Collectors.toList()); - // Set required fields for buildSchema method to work - Field taskField = BigqueryClient.class.getDeclaredField("task"); - taskField.setAccessible(true); - taskField.set(bigqueryClient, task); - - Field schemaField = BigqueryClient.class.getDeclaredField("schema"); - schemaField.setAccessible(true); - schemaField.set(bigqueryClient, schema); - - Field columnOptionsField = BigqueryClient.class.getDeclaredField("columnOptions"); - columnOptionsField.setAccessible(true); - columnOptionsField.set( - bigqueryClient, task.getColumnOptions().orElse(java.util.Collections.emptyList())); - Field field = BigqueryClient.class.getDeclaredField("cachedSrcFields"); - field.setAccessible(true); - List fieldList = new ArrayList<>(); - fieldList.add( - setupField0.apply( - com.google.cloud.bigquery.Field.newBuilder("c0", StandardSQLTypeName.INT64))); - fieldList.add( - setupField1.apply( - com.google.cloud.bigquery.Field.newBuilder("c1", StandardSQLTypeName.STRING))); - field.set(bigqueryClient, FieldList.of(fieldList)); - - Method buildSchemaMethod = - BigqueryClient.class.getDeclaredMethod("buildSchema", Schema.class, List.class); - buildSchemaMethod.setAccessible(true); - return (com.google.cloud.bigquery.Schema) - buildSchemaMethod.invoke(bigqueryClient, schema, task.getColumnOptions().orElse(null)); - } - - private com.google.cloud.bigquery.Schema invokeRetainDescriptionBuildSchema( - String mode, Boolean retainColumnDescriptions, String d0, String d1) - throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException, - NoSuchMethodException { + HashMap> setups = new HashMap<>(); + setups.put("c0", setupField0); + setups.put("c1", setupField1); + + List fieldList = + baseTask.getColumnOptions().orElse(java.util.Collections.emptyList()).stream() + .map(c -> setups.get(c.getName()).apply(Field.newBuilder(c.getName(), toBQType(c)))) + .collect(Collectors.toList()); + + return BigqueryClient.buildPatchSchema( + task, FieldList.of(currentFields), FieldList.of(fieldList)); + } + + private Schema invokeRetainDescriptionBuildSchema( + String mode, Boolean retainColumnDescriptions, String d0, String d1) { + return invokeRetainDescriptionBuildSchema( + configSource -> configSource, mode, retainColumnDescriptions, d0, d1); + } + + private Schema invokeRetainDescriptionBuildSchema( + Function setupConfig, + String mode, + Boolean retainColumnDescriptions, + String d0, + String d1) { return invokeTakeoverBuildSchema( configSource -> - configSource - .set("mode", mode) - .set("retain_column_descriptions", retainColumnDescriptions), + setupConfig.apply( + configSource + .set("mode", mode) + .set("retain_column_policy_tags", true) + .set("retain_column_descriptions", retainColumnDescriptions)), builder -> builder.setDescription(d0).build(), builder -> builder.setDescription(d1).build()); } @Test - public void testRetainDescriptionTrue() - throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException, - InvocationTargetException, IOException { - com.google.cloud.bigquery.Schema schema = - invokeRetainDescriptionBuildSchema("replace", true, "prev_c0", "prev_c1"); - assertEquals("c0", schema.getFields().get(0).getDescription()); + public void testRetainDescriptionTrue() { + Schema schema = invokeRetainDescriptionBuildSchema("replace", true, "prev_c0", "prev_c1"); + assertEquals("d0", schema.getFields().get(0).getDescription()); assertEquals("prev_c1", schema.getFields().get(1).getDescription()); } @Test - public void testRetainDescriptionFalse() - throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException, - InvocationTargetException, IOException { - com.google.cloud.bigquery.Schema schema = - invokeRetainDescriptionBuildSchema("replace", false, "prev_c0", "prev_c1"); - assertEquals("c0", schema.getFields().get(0).getDescription()); + public void testRetainDescriptionFalse() { + Schema schema = invokeRetainDescriptionBuildSchema("replace", false, "prev_c0", "prev_c1"); + assertEquals("d0", schema.getFields().get(0).getDescription()); assertNull(schema.getFields().get(1).getDescription()); } @Test - public void testRetainDescriptionTrueButNotModeReplace() - throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException, - InvocationTargetException, IOException { - com.google.cloud.bigquery.Schema schema = - invokeRetainDescriptionBuildSchema("insert", true, "prev_c0", "prev_c1"); - assertEquals("c0", schema.getFields().get(0).getDescription()); + public void testRetainDescriptionTrueWithColumnOptionNull() { + Schema schema = + invokeRetainDescriptionBuildSchema( + c -> c.set("column_options", null), "replace", true, "prev_c0", "prev_c1"); + assertEquals("prev_c0", schema.getFields().get(0).getDescription()); + assertEquals("prev_c1", schema.getFields().get(1).getDescription()); + } + + @Test + public void testRetainDescriptionFalseWithColumnOptionNull() { + Schema schema = + invokeRetainDescriptionBuildSchema( + c -> c.set("column_options", null), "replace", false, "prev_c0", "prev_c1"); + assertNull(schema.getFields().get(0).getDescription()); assertNull(schema.getFields().get(1).getDescription()); } - private com.google.cloud.bigquery.Schema invokeRetainPolicyTagsBuildSchema( - String mode, Boolean retainPolicyTags, String[] tags0, String[] tags1) - throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException, - NoSuchMethodException { + @Test + public void testRetainDescriptionTrueButNotModeReplace() { + Schema schema = invokeRetainDescriptionBuildSchema("insert", true, "prev_c0", "prev_c1"); + assertNull(schema); + } + + private Schema invokeRetainPolicyTagsBuildSchema( + Function setupConfig, + String mode, + Boolean retainPolicyTags, + String[] tags0, + String[] tags1) { List n0 = Arrays.stream(tags0).collect(Collectors.toList()); List n1 = Arrays.stream(tags1).collect(Collectors.toList()); PolicyTags p0 = PolicyTags.newBuilder().setNames(n0).build(); PolicyTags p1 = PolicyTags.newBuilder().setNames(n1).build(); return invokeTakeoverBuildSchema( configSource -> - configSource.set("mode", mode).set("retain_column_policy_tags", retainPolicyTags), + setupConfig.apply( + configSource + .set("mode", mode) + .set("retain_column_policy_tags", retainPolicyTags) + .set("retain_column_descriptions", true)), builder -> builder.setPolicyTags(p0).build(), builder -> builder.setPolicyTags(p1).build()); } + private Schema invokeRetainPolicyTagsBuildSchema( + String mode, Boolean retainPolicyTags, String[] tags0, String[] tags1) { + return invokeRetainPolicyTagsBuildSchema(c -> c, mode, retainPolicyTags, tags0, tags1); + } + @Test - public void testRetainColumnPolicyTagsTrue() - throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException, - InvocationTargetException, IOException { - com.google.cloud.bigquery.Schema schema = + public void testRetainColumnPolicyTagsTrue() { + Schema schema = invokeRetainPolicyTagsBuildSchema( "replace", true, new String[] {"c0"}, new String[] {"c10", "c11"}); assertArrayEquals( @@ -176,10 +216,8 @@ public void testRetainColumnPolicyTagsTrue() } @Test - public void testRetainColumnPolicyTagsFalse() - throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException, - InvocationTargetException, IOException { - com.google.cloud.bigquery.Schema schema = + public void testRetainColumnPolicyTagsFalse() { + Schema schema = invokeRetainPolicyTagsBuildSchema( "replace", false, new String[] {"c0"}, new String[] {"c10", "c11"}); assertNull(schema.getFields().get(0).getPolicyTags()); @@ -187,13 +225,74 @@ public void testRetainColumnPolicyTagsFalse() } @Test - public void testRetainColumnPolicyTagsTrueButNotModeReplace() - throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException, - InvocationTargetException, IOException { - com.google.cloud.bigquery.Schema schema = + public void testRetainColumnPolicyTagsTrueButNotModeReplace() { + Schema schema = invokeRetainPolicyTagsBuildSchema( "insert", true, new String[] {"c0"}, new String[] {"c10", "c11"}); - assertNull(schema.getFields().get(0).getPolicyTags()); - assertNull(schema.getFields().get(1).getPolicyTags()); + assertNull(schema); + } + + @Test + public void testStoreCachedSrcFieldsIfNeed() throws NoSuchFieldException, IllegalAccessException { + ConfigSource config = loadYamlResource(embulk, "takeover.yml"); + + // Test case 1: Mode is "replace" with retainColumnDescriptions = true + PluginTask replaceTask = + CONFIG_MAPPER.map( + config + .set("mode", "replace") + .set("retain_column_descriptions", true) + .set("retain_column_policy_tags", false), + PluginTask.class); + + BigqueryClient client = Mockito.mock(BigqueryClient.class); + Table mockTable = Mockito.mock(Table.class); + TableDefinition mockTableDef = Mockito.mock(TableDefinition.class); + + Schema mockSchema = Schema.of(Field.newBuilder("field1", StandardSQLTypeName.STRING).build()); + + Mockito.when(client.getTable(Mockito.anyString())).thenReturn(mockTable); + Mockito.when(mockTable.getDefinition()).thenReturn(mockTableDef); + Mockito.when(mockTableDef.getSchema()).thenReturn(mockSchema); + Mockito.when(client.storeCachedSrcFieldsIfNeed()).thenCallRealMethod(); + + // Set task field + java.lang.reflect.Field taskField = BigqueryClient.class.getDeclaredField("task"); + taskField.setAccessible(true); + taskField.set(client, replaceTask); + + assertEquals(client.storeCachedSrcFieldsIfNeed(), mockSchema.getFields()); + + // Test case 2: Mode is "insert" - should return null + PluginTask insertTask = + CONFIG_MAPPER.map( + config + .set("mode", "insert") + .set("retain_column_descriptions", true) + .set("retain_column_policy_tags", true), + PluginTask.class); + + taskField.set(client, insertTask); + + assertNull(client.storeCachedSrcFieldsIfNeed()); + + // Test case 3: Table doesn't exist - should return null + Mockito.when(client.getTable(Mockito.anyString())).thenReturn(null); + + taskField.set(client, replaceTask); + + assertNull(client.storeCachedSrcFieldsIfNeed()); + + // Test case 4: Schema is null - should return null + Table nullSchemaTable = Mockito.mock(Table.class); + TableDefinition nullSchemaTableDef = Mockito.mock(TableDefinition.class); + + Mockito.when(client.getTable(Mockito.anyString())).thenReturn(nullSchemaTable); + Mockito.when(nullSchemaTable.getDefinition()).thenReturn(nullSchemaTableDef); + Mockito.when(nullSchemaTableDef.getSchema()).thenReturn(null); + + taskField.set(client, replaceTask); + + assertNull(client.storeCachedSrcFieldsIfNeed()); } } diff --git a/src/test/resources/java/org/embulk/output/bigquery_java/bigquery_client/takeover.yml b/src/test/resources/java/org/embulk/output/bigquery_java/bigquery_client/takeover.yml index f3e1546..a73e35e 100644 --- a/src/test/resources/java/org/embulk/output/bigquery_java/bigquery_client/takeover.yml +++ b/src/test/resources/java/org/embulk/output/bigquery_java/bigquery_client/takeover.yml @@ -14,7 +14,7 @@ column_options: - name: c0 type: INTEGER mode: NULLABLE - description: c0 + description: d0 - name: c1 type: STRING mode: NULLABLE