Skip to content
Merged
82 changes: 81 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ Under construction
| name (x) is unsupported | type | required? | default | description |
|:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------|
| mode (replace, append is supported) | string | optional | "append" | See [Mode](#mode) |
| auth_method (service_account is supported) | string | optional | "application\_default" | See [Authentication](#authentication) |
| auth_method (service_account, workload_identity_federation are supported)| string | optional | "application\_default" | See [Authentication](#authentication) |
| json_keyfile | string | optional | | keyfile path or `content` |
| workload_identity_federation | hash | optional | | Workload Identity Federation config. See below |
| project (x) | string | required unless service\_account's `json_keyfile` is given. | | project\_id |
| dataset | string | required | | dataset |
| location | string | optional | nil | geographic location of dataset. See [Location](#location) |
Expand Down Expand Up @@ -112,6 +113,85 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
| schema_update_options (x) | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |


### Workload Identity Federation

Workload Identity Federation allows authentication using AWS credentials to access Google Cloud resources.

| name | type | required? | default | description |
|:-------------------------------------|:------------|:-----------|:------------------|:-----------------------|
| workload_identity_federation.config | string | required | | Path to the Workload Identity Federation JSON config file |
| workload_identity_federation.aws_access_key_id | string | required | | AWS Access Key ID |
| workload_identity_federation.aws_secret_access_key | string | required | | AWS Secret Access Key |
| workload_identity_federation.aws_session_token | string | optional | | AWS Session Token (for temporary credentials) |
| workload_identity_federation.aws_region | string | optional | "ap-northeast-1" | AWS Region |

Example)

```yaml
out:
type: bigquery_java
auth_method: workload_identity_federation
workload_identity_federation:
config: /path/to/workload-identity-federation-config.json
aws_access_key_id: AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# aws_session_token: xxxxxxxx # optional, for temporary credentials
aws_region: ap-northeast-1
project: my-project
dataset: my_dataset
table: my_table
source_format: NEWLINE_DELIMITED_JSON
```

The `config` should contain the Workload Identity Federation configuration from Google Cloud.

#### Service Account Impersonation (recommended)

Use a service account to access BigQuery. The federated identity impersonates a service account that has the necessary permissions.

```json
{
"universe_domain": "googleapis.com",
"type": "external_account",
"audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID",
"subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT@PROJECT.iam.gserviceaccount.com:generateAccessToken",
"token_url": "https://sts.googleapis.com/v1/token",
"credential_source": {
"environment_id": "aws1",
"region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
"regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
}
}
```

#### Direct Resource Access

Access BigQuery directly without service account impersonation. Omit `service_account_impersonation_url` from the config file. You must grant IAM permissions directly to the federated principal.

```json
{
"universe_domain": "googleapis.com",
"type": "external_account",
"audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID",
"subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
"token_url": "https://sts.googleapis.com/v1/token",
"credential_source": {
"environment_id": "aws1",
"region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
"regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
}
}
```

To use direct resource access, grant BigQuery permissions to the federated principal:

```
principal://iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/subject/SUBJECT
```

### Column Options (NOT fully supported)

Column options are used to aid guessing BigQuery schema, or to define conversion of values:
Expand Down
18 changes: 18 additions & 0 deletions shadow-google-cloud-bigquery-helper/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ description = "A helper library for Bigquery output for embulk"
sourceCompatibility = 1.8
targetCompatibility = 1.8

// https://github.com/google/guava/issues/6612#issuecomment-1614992368
sourceSets.all {
configurations.getByName(runtimeClasspathConfigurationName) {
attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm")
}
configurations.getByName(compileClasspathConfigurationName) {
attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm")
}
}

configurations {
runtimeClasspath {
resolutionStrategy.activateDependencyLocking()
Expand All @@ -38,11 +48,19 @@ dependencies {
compile("org.slf4j:jcl-over-slf4j:1.7.12") {
exclude group: "org.slf4j", module: "slf4j-api"
}

// Newer google-auth-library for AwsCredentials, ImpersonatedCredentials support
compile "com.google.auth:google-auth-library-oauth2-http:1.41.0"
compile "com.google.http-client:google-http-client-gson:1.45.3"
}

// Relocate Guava and Jackson packages since they are incompatible from Embulk's.
shadowJar {
mergeServiceFiles()
relocate "com.google.common", "embulk.bigquery.com.google.common"
relocate "com.google.thirdparty.publicsuffix", "embulk.bigquery.com.google.thirdparty.publicsuffix"
relocate "com.fasterxml.jackson", "embulk.bigquery.com.fasterxml.jackson"
// OpenCensus and gRPC - internal dependencies
relocate "io.opencensus", "embulk.bigquery.io.opencensus"
relocate "io.grpc", "embulk.bigquery.io.grpc"
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,39 @@ com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:0.138.2
com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:0.138.2
com.google.api.grpc:proto-google-common-protos:2.9.0
com.google.api.grpc:proto-google-iam-v1:1.4.1
com.google.api:api-common:2.2.1
com.google.api:api-common:2.53.0
com.google.api:gax-grpc:2.18.2
com.google.api:gax-httpjson:0.103.2
com.google.api:gax:2.18.2
com.google.apis:google-api-services-bigquery:v2-rev20220611-1.32.1
com.google.auth:google-auth-library-credentials:1.7.0
com.google.auth:google-auth-library-oauth2-http:1.7.0
com.google.auth:google-auth-library-credentials:1.41.0
com.google.auth:google-auth-library-oauth2-http:1.41.0
com.google.auto.value:auto-value-annotations:1.11.0
com.google.cloud:google-cloud-bigquery:2.14.0
com.google.cloud:google-cloud-bigquerystorage:2.14.2
com.google.cloud:google-cloud-core-http:2.8.0
com.google.cloud:google-cloud-core:2.8.0
com.google.code.findbugs:jsr305:3.0.2
com.google.code.gson:gson:2.9.0
com.google.errorprone:error_prone_annotations:2.14.0
com.google.code.gson:gson:2.12.1
com.google.errorprone:error_prone_annotations:2.41.0
com.google.flatbuffers:flatbuffers-java:1.12.0
com.google.guava:failureaccess:1.0.1
com.google.guava:guava:31.1-jre
com.google.guava:failureaccess:1.0.3
com.google.guava:guava:33.5.0-android
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
com.google.http-client:google-http-client-apache-v2:1.42.0
com.google.http-client:google-http-client-appengine:1.42.0
com.google.http-client:google-http-client-gson:1.42.0
com.google.http-client:google-http-client-gson:2.0.2
com.google.http-client:google-http-client-jackson2:1.42.0
com.google.http-client:google-http-client:1.42.0
com.google.j2objc:j2objc-annotations:1.3
com.google.http-client:google-http-client:2.0.2
com.google.j2objc:j2objc-annotations:3.1
com.google.oauth-client:google-oauth-client:1.34.1
com.google.protobuf:protobuf-java-util:3.21.1
com.google.protobuf:protobuf-java:3.21.1
commons-codec:commons-codec:1.15
io.grpc:grpc-alts:1.47.0
io.grpc:grpc-api:1.47.0
io.grpc:grpc-api:1.70.0
io.grpc:grpc-auth:1.47.0
io.grpc:grpc-context:1.47.0
io.grpc:grpc-context:1.70.0
io.grpc:grpc-core:1.47.0
io.grpc:grpc-googleapis:1.47.0
io.grpc:grpc-grpclb:1.47.0
Expand All @@ -65,13 +66,14 @@ org.apache.arrow:arrow-format:8.0.0
org.apache.arrow:arrow-memory-core:8.0.0
org.apache.arrow:arrow-memory-netty:8.0.0
org.apache.arrow:arrow-vector:8.0.0
org.apache.httpcomponents:httpclient:4.5.13
org.apache.httpcomponents:httpcore:4.4.15
org.apache.httpcomponents:httpclient:4.5.14
org.apache.httpcomponents:httpcore:4.4.16
org.checkerframework:checker-compat-qual:2.5.5
org.checkerframework:checker-qual:3.22.2
org.codehaus.mojo:animal-sniffer-annotations:1.21
org.conscrypt:conscrypt-openjdk-uber:2.5.1
org.json:json:20200518
org.jspecify:jspecify:1.0.0
org.slf4j:jcl-over-slf4j:1.7.12
org.slf4j:slf4j-api:1.7.25
org.threeten:threeten-extra:1.7.0
Expand Down
29 changes: 26 additions & 3 deletions src/main/java/org/embulk/output/bigquery_java/Auth.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,30 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.embulk.config.ConfigException;
import org.embulk.output.bigquery_java.config.PluginTask;
import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig;
import org.embulk.util.config.units.LocalFile;

public class Auth {
private final String authMethod;
private final LocalFile jsonKeyFile;
private final WorkloadIdentityFederationConfig workloadIdentityFederationConfig;

public Auth(PluginTask task) {
authMethod = task.getAuthMethod();
jsonKeyFile = task.getJsonKeyfile();
jsonKeyFile = task.getJsonKeyfile().orElse(null);
workloadIdentityFederationConfig = task.getWorkloadIdentityFederation().orElse(null);
}

public Credentials getCredentials(String... scopes) throws IOException {
return getGoogleCredentials().createScoped(scopes);
return getGoogleCredentials(scopes).createScoped(scopes);
}

private GoogleCredentials getGoogleCredentials() throws IOException {
private GoogleCredentials getGoogleCredentials(String... scopes) throws IOException {
if ("authorized_user".equalsIgnoreCase(authMethod)) {
return UserCredentials.fromStream(getCredentialsStream());
} else if ("service_account".equalsIgnoreCase(authMethod)) {
Expand All @@ -34,12 +40,29 @@ private GoogleCredentials getGoogleCredentials() throws IOException {
return ComputeEngineCredentials.create();
} else if ("application_default".equalsIgnoreCase(authMethod)) {
return GoogleCredentials.getApplicationDefault();
} else if ("workload_identity_federation".equalsIgnoreCase(authMethod)) {
return getWorkloadIdentityFederationCredentials(scopes);
} else {
throw new ConfigException("Unknown auth method: " + authMethod);
}
}

private InputStream getCredentialsStream() {
if (jsonKeyFile == null) {
throw new ConfigException(
"json_keyfile is required when auth_method is '" + authMethod + "'");
}
return new ByteArrayInputStream(jsonKeyFile.getContent());
}

private WorkloadIdentityFederationCredentials getWorkloadIdentityFederationCredentials(
String... scopes) throws IOException {
if (workloadIdentityFederationConfig == null) {
throw new ConfigException(
"workload_identity_federation config is required when auth_method is 'workload_identity_federation'");
}
Set<String> scopeSet = new HashSet<>(Arrays.asList(scopes));
return WorkloadIdentityFederationCredentials.getOrCreateByFetchingToken(
workloadIdentityFederationConfig, scopeSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,12 @@ public FieldList storeCachedSrcFieldsIfNeed() {
}

private String getProjectIdFromJsonKeyfile() {
if (!task.getJsonKeyfile().isPresent()) {
throw new ConfigException("Either project or project_id in json_keyfile is required");
}
String projectId =
new JSONObject(
new JSONTokener(new ByteArrayInputStream(task.getJsonKeyfile().getContent())))
new JSONTokener(new ByteArrayInputStream(task.getJsonKeyfile().get().getContent())))
.getString("project_id");
if (projectId == null) {
throw new ConfigException("Either project or project_id in json_keyfile is required");
Expand Down
Loading