Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 78 additions & 28 deletions src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.validation.constraints.NotNull;
import org.embulk.config.ConfigException;
import org.embulk.output.bigquery_java.config.BigqueryColumnOption;
import org.embulk.output.bigquery_java.config.BigqueryTimePartitioning;
Expand Down Expand Up @@ -105,18 +104,24 @@ public BigqueryClient(PluginTask task, Schema schema) {
}
}

@NotNull
private FieldList getSrcFields() {
if (cachedSrcFields == null) {
cachedSrcFields = FieldList.of();
Table srcTable = getTable(task.getTable());
if (srcTable != null) {
com.google.cloud.bigquery.Schema srcSchema = srcTable.getDefinition().getSchema();
if (srcSchema != null) {
cachedSrcFields = srcSchema.getFields();
}
}
public static boolean isNeedUpdateTable(PluginTask task) {
return task.getMode().equals("replace")
&& (task.getRetainColumnDescriptions() || task.getRetainColumnPolicyTags());
}

public FieldList storeCachedSrcFieldsIfNeed() {
if (!isNeedUpdateTable(task)) {
return null;
}
Table srcTable = getTable(task.getTable());
if (srcTable == null) {
return null;
}
com.google.cloud.bigquery.Schema srcSchema = srcTable.getDefinition().getSchema();
if (srcSchema == null) {
return null;
}
cachedSrcFields = srcSchema.getFields();
return cachedSrcFields;
}

Expand Down Expand Up @@ -219,6 +224,65 @@ private void createTableIfNotExist(String table, String dataset, String project)
}
}

public void updateTableIfNeed() {
Table table = this.getTable(task.getTable());
com.google.cloud.bigquery.Schema schema = table.getDefinition().getSchema();
if (schema == null) {
return;
}
com.google.cloud.bigquery.Schema patchSchema =
buildPatchSchema(task, schema.getFields(), cachedSrcFields);
if (patchSchema == null) {
return;
}
try {
bigquery.update(
TableInfo.newBuilder(
table.getTableId(),
StandardTableDefinition.newBuilder().setSchema(patchSchema).build())
.build());
} catch (BigQueryException e) {
logger.error(
String.format(
"embulk-output-bigquery: update_table(%s:%s.%s)",
destinationProject, destinationDataset, task.getTable()));
throw new BigqueryException(
String.format(
"failed to update table %s:%s.%s, response: %s",
destinationProject, destinationDataset, task.getTable(), e));
}
}

public static com.google.cloud.bigquery.Schema buildPatchSchema(
PluginTask task, FieldList currentFields, FieldList dstFields) {
if (!isNeedUpdateTable(task) || dstFields == null) {
return null;
}

List<Field> updatedFields = new ArrayList<>();
for (Field field : currentFields) {
Field.Builder fieldBuilder = field.toBuilder();
dstFields.stream()
.filter(x -> x.getName().equals(field.getName()))
.findFirst()
.ifPresent(
srcField -> {
if (task.getRetainColumnDescriptions()) {
fieldBuilder.setDescription(srcField.getDescription());
}
if (task.getRetainColumnPolicyTags()) {
fieldBuilder.setPolicyTags(srcField.getPolicyTags());
}
});
task.getColumnOptions()
.flatMap(columnOptions -> BigqueryUtil.findColumnOption(field.getName(), columnOptions))
.flatMap(BigqueryColumnOption::getDescription)
.ifPresent(fieldBuilder::setDescription);
updatedFields.add(fieldBuilder.build());
}
return com.google.cloud.bigquery.Schema.of(updatedFields);
}

public TimePartitioning buildTimePartitioning(BigqueryTimePartitioning bigqueryTimePartitioning) {
TimePartitioning.Builder timePartitioningBuilder;

Expand Down Expand Up @@ -753,29 +817,15 @@ private com.google.cloud.bigquery.Schema buildSchema(
BigqueryUtil.findColumnOption(col.getName(), columnOptions);
Field.Builder fieldBuilder = createFieldBuilder(task, col, columnOption);

if ((task.getMode().equals("replace")
&& (task.getRetainColumnDescriptions() || task.getRetainColumnPolicyTags()))) {
getSrcFields().stream()
.filter(x -> x.getName().equals(col.getName()))
.findFirst()
.ifPresent(
field -> {
if (task.getRetainColumnDescriptions()) {
fieldBuilder.setDescription(field.getDescription());
}
if (task.getRetainColumnPolicyTags()) {
fieldBuilder.setPolicyTags(field.getPolicyTags());
}
});
}

if (columnOption.isPresent()) {
BigqueryColumnOption colOpt = columnOption.get();
if (!colOpt.getMode().isEmpty()) {
fieldMode = Field.Mode.valueOf(colOpt.getMode());
}
fieldBuilder.setMode(fieldMode);

// CAUTION: If isNeedUpdateTable() is true, description may be overwritten by
// updateTableIfNeed().
if (colOpt.getDescription().isPresent()) {
fieldBuilder.setDescription(colOpt.getDescription().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public ConfigDiff transaction(
BigqueryTaskBuilder.build(task);
BigqueryClient client = new BigqueryClient(task, schema);
autoCreate(task, client);
client.storeCachedSrcFieldsIfNeed();

control.run(task.dump());
this.writers.values().forEach(BigqueryFileWriter::close);
Expand Down Expand Up @@ -145,6 +146,8 @@ public ConfigDiff transaction(
client.deleteTable(task.getTempTable().get());
}

client.updateTableIfNeed();

if (task.getDeleteFromLocalWhenJobEnd()) {
paths.forEach(p -> p.toFile().delete());
} else {
Expand Down
Loading