diff --git a/README.md b/README.md index 8486393..7e1c157 100644 --- a/README.md +++ b/README.md @@ -36,8 +36,9 @@ Under construction | name (x) is unsupported | type | required? | default | description | |:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------| | mode (replace, append is supported) | string | optional | "append" | See [Mode](#mode) | -| auth_method (service_account is supported) | string | optional | "application\_default" | See [Authentication](#authentication) | +| auth_method (service_account, workload_identity_federation are supported)| string | optional | "application\_default" | See [Authentication](#authentication) | | json_keyfile | string | optional | | keyfile path or `content` | +| workload_identity_federation | hash | optional | | Workload Identity Federation config. See below | | project (x) | string | required unless service\_account's `json_keyfile` is given. | | project\_id | | dataset | string | required | | dataset | | location | string | optional | nil | geographic location of dataset. See [Location](#location) | @@ -112,6 +113,85 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b | schema_update_options (x) | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. | +### Workload Identity Federation + +Workload Identity Federation allows authentication using AWS credentials to access Google Cloud resources. + +| name | type | required? | default | description | +|:-------------------------------------|:------------|:-----------|:------------------|:-----------------------| +| workload_identity_federation.config | string | required | | Path to the Workload Identity Federation JSON config file | +| workload_identity_federation.aws_access_key_id | string | required | | AWS Access Key ID | +| workload_identity_federation.aws_secret_access_key | string | required | | AWS Secret Access Key | +| workload_identity_federation.aws_session_token | string | optional | | AWS Session Token (for temporary credentials) | +| workload_identity_federation.aws_region | string | optional | "ap-northeast-1" | AWS Region | + +Example) + +```yaml +out: + type: bigquery_java + auth_method: workload_identity_federation + workload_identity_federation: + config: /path/to/workload-identity-federation-config.json + aws_access_key_id: AKIAXXXXXXXXXXXXXXXX + aws_secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + # aws_session_token: xxxxxxxx # optional, for temporary credentials + aws_region: ap-northeast-1 + project: my-project + dataset: my_dataset + table: my_table + source_format: NEWLINE_DELIMITED_JSON +``` + +The `config` should contain the Workload Identity Federation configuration from Google Cloud. + +#### Service Account Impersonation (recommended) + +Use a service account to access BigQuery. The federated identity impersonates a service account that has the necessary permissions. + +```json +{ + "universe_domain": "googleapis.com", + "type": "external_account", + "audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request", + "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT@PROJECT.iam.gserviceaccount.com:generateAccessToken", + "token_url": "https://sts.googleapis.com/v1/token", + "credential_source": { + "environment_id": "aws1", + "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone", + "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials", + "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15" + } +} +``` + +#### Direct Resource Access + +Access BigQuery directly without service account impersonation. Omit `service_account_impersonation_url` from the config file. You must grant IAM permissions directly to the federated principal. + +```json +{ + "universe_domain": "googleapis.com", + "type": "external_account", + "audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request", + "token_url": "https://sts.googleapis.com/v1/token", + "credential_source": { + "environment_id": "aws1", + "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone", + "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials", + "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15" + } +} +``` + +To use direct resource access, grant BigQuery permissions to the federated principal: + +``` +principal://iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/subject/SUBJECT +``` + ### Column Options (NOT fully supported) Column options are used to aid guessing BigQuery schema, or to define conversion of values: diff --git a/shadow-google-cloud-bigquery-helper/build.gradle b/shadow-google-cloud-bigquery-helper/build.gradle index 3951630..9b50911 100644 --- a/shadow-google-cloud-bigquery-helper/build.gradle +++ b/shadow-google-cloud-bigquery-helper/build.gradle @@ -19,6 +19,16 @@ description = "A helper library for Bigquery output for embulk" sourceCompatibility = 1.8 targetCompatibility = 1.8 +// https://github.com/google/guava/issues/6612#issuecomment-1614992368 +sourceSets.all { + configurations.getByName(runtimeClasspathConfigurationName) { + attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm") + } + configurations.getByName(compileClasspathConfigurationName) { + attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm") + } +} + configurations { runtimeClasspath { resolutionStrategy.activateDependencyLocking() @@ -38,11 +48,19 @@ dependencies { compile("org.slf4j:jcl-over-slf4j:1.7.12") { exclude group: "org.slf4j", module: "slf4j-api" } + + // Newer google-auth-library for AwsCredentials, ImpersonatedCredentials support + compile "com.google.auth:google-auth-library-oauth2-http:1.41.0" + compile "com.google.http-client:google-http-client-gson:1.45.3" } // Relocate Guava and Jackson packages since they are incompatible from Embulk's. shadowJar { + mergeServiceFiles() relocate "com.google.common", "embulk.bigquery.com.google.common" relocate "com.google.thirdparty.publicsuffix", "embulk.bigquery.com.google.thirdparty.publicsuffix" relocate "com.fasterxml.jackson", "embulk.bigquery.com.fasterxml.jackson" + // OpenCensus and gRPC - internal dependencies + relocate "io.opencensus", "embulk.bigquery.io.opencensus" + relocate "io.grpc", "embulk.bigquery.io.grpc" } diff --git a/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile b/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile index ed0996b..93d94f9 100644 --- a/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile +++ b/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile @@ -16,38 +16,39 @@ com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:0.138.2 com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:0.138.2 com.google.api.grpc:proto-google-common-protos:2.9.0 com.google.api.grpc:proto-google-iam-v1:1.4.1 -com.google.api:api-common:2.2.1 +com.google.api:api-common:2.53.0 com.google.api:gax-grpc:2.18.2 com.google.api:gax-httpjson:0.103.2 com.google.api:gax:2.18.2 com.google.apis:google-api-services-bigquery:v2-rev20220611-1.32.1 -com.google.auth:google-auth-library-credentials:1.7.0 -com.google.auth:google-auth-library-oauth2-http:1.7.0 +com.google.auth:google-auth-library-credentials:1.41.0 +com.google.auth:google-auth-library-oauth2-http:1.41.0 +com.google.auto.value:auto-value-annotations:1.11.0 com.google.cloud:google-cloud-bigquery:2.14.0 com.google.cloud:google-cloud-bigquerystorage:2.14.2 com.google.cloud:google-cloud-core-http:2.8.0 com.google.cloud:google-cloud-core:2.8.0 com.google.code.findbugs:jsr305:3.0.2 -com.google.code.gson:gson:2.9.0 -com.google.errorprone:error_prone_annotations:2.14.0 +com.google.code.gson:gson:2.12.1 +com.google.errorprone:error_prone_annotations:2.41.0 com.google.flatbuffers:flatbuffers-java:1.12.0 -com.google.guava:failureaccess:1.0.1 -com.google.guava:guava:31.1-jre +com.google.guava:failureaccess:1.0.3 +com.google.guava:guava:33.5.0-android com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava com.google.http-client:google-http-client-apache-v2:1.42.0 com.google.http-client:google-http-client-appengine:1.42.0 -com.google.http-client:google-http-client-gson:1.42.0 +com.google.http-client:google-http-client-gson:2.0.2 com.google.http-client:google-http-client-jackson2:1.42.0 -com.google.http-client:google-http-client:1.42.0 -com.google.j2objc:j2objc-annotations:1.3 +com.google.http-client:google-http-client:2.0.2 +com.google.j2objc:j2objc-annotations:3.1 com.google.oauth-client:google-oauth-client:1.34.1 com.google.protobuf:protobuf-java-util:3.21.1 com.google.protobuf:protobuf-java:3.21.1 commons-codec:commons-codec:1.15 io.grpc:grpc-alts:1.47.0 -io.grpc:grpc-api:1.47.0 +io.grpc:grpc-api:1.70.0 io.grpc:grpc-auth:1.47.0 -io.grpc:grpc-context:1.47.0 +io.grpc:grpc-context:1.70.0 io.grpc:grpc-core:1.47.0 io.grpc:grpc-googleapis:1.47.0 io.grpc:grpc-grpclb:1.47.0 @@ -65,13 +66,14 @@ org.apache.arrow:arrow-format:8.0.0 org.apache.arrow:arrow-memory-core:8.0.0 org.apache.arrow:arrow-memory-netty:8.0.0 org.apache.arrow:arrow-vector:8.0.0 -org.apache.httpcomponents:httpclient:4.5.13 -org.apache.httpcomponents:httpcore:4.4.15 +org.apache.httpcomponents:httpclient:4.5.14 +org.apache.httpcomponents:httpcore:4.4.16 org.checkerframework:checker-compat-qual:2.5.5 org.checkerframework:checker-qual:3.22.2 org.codehaus.mojo:animal-sniffer-annotations:1.21 org.conscrypt:conscrypt-openjdk-uber:2.5.1 org.json:json:20200518 +org.jspecify:jspecify:1.0.0 org.slf4j:jcl-over-slf4j:1.7.12 org.slf4j:slf4j-api:1.7.25 org.threeten:threeten-extra:1.7.0 diff --git a/src/main/java/org/embulk/output/bigquery_java/Auth.java b/src/main/java/org/embulk/output/bigquery_java/Auth.java index ae0495e..f8dc58c 100644 --- a/src/main/java/org/embulk/output/bigquery_java/Auth.java +++ b/src/main/java/org/embulk/output/bigquery_java/Auth.java @@ -8,24 +8,30 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.embulk.config.ConfigException; import org.embulk.output.bigquery_java.config.PluginTask; +import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; import org.embulk.util.config.units.LocalFile; public class Auth { private final String authMethod; private final LocalFile jsonKeyFile; + private final WorkloadIdentityFederationConfig workloadIdentityFederationConfig; public Auth(PluginTask task) { authMethod = task.getAuthMethod(); - jsonKeyFile = task.getJsonKeyfile(); + jsonKeyFile = task.getJsonKeyfile().orElse(null); + workloadIdentityFederationConfig = task.getWorkloadIdentityFederation().orElse(null); } public Credentials getCredentials(String... scopes) throws IOException { - return getGoogleCredentials().createScoped(scopes); + return getGoogleCredentials(scopes).createScoped(scopes); } - private GoogleCredentials getGoogleCredentials() throws IOException { + private GoogleCredentials getGoogleCredentials(String... scopes) throws IOException { if ("authorized_user".equalsIgnoreCase(authMethod)) { return UserCredentials.fromStream(getCredentialsStream()); } else if ("service_account".equalsIgnoreCase(authMethod)) { @@ -34,12 +40,29 @@ private GoogleCredentials getGoogleCredentials() throws IOException { return ComputeEngineCredentials.create(); } else if ("application_default".equalsIgnoreCase(authMethod)) { return GoogleCredentials.getApplicationDefault(); + } else if ("workload_identity_federation".equalsIgnoreCase(authMethod)) { + return getWorkloadIdentityFederationCredentials(scopes); } else { throw new ConfigException("Unknown auth method: " + authMethod); } } private InputStream getCredentialsStream() { + if (jsonKeyFile == null) { + throw new ConfigException( + "json_keyfile is required when auth_method is '" + authMethod + "'"); + } return new ByteArrayInputStream(jsonKeyFile.getContent()); } + + private WorkloadIdentityFederationCredentials getWorkloadIdentityFederationCredentials( + String... scopes) throws IOException { + if (workloadIdentityFederationConfig == null) { + throw new ConfigException( + "workload_identity_federation config is required when auth_method is 'workload_identity_federation'"); + } + Set scopeSet = new HashSet<>(Arrays.asList(scopes)); + return WorkloadIdentityFederationCredentials.getOrCreateByFetchingToken( + workloadIdentityFederationConfig, scopeSet); + } } diff --git a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java index bfff06b..3276d78 100644 --- a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java +++ b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java @@ -126,9 +126,12 @@ public FieldList storeCachedSrcFieldsIfNeed() { } private String getProjectIdFromJsonKeyfile() { + if (!task.getJsonKeyfile().isPresent()) { + throw new ConfigException("Either project or project_id in json_keyfile is required"); + } String projectId = new JSONObject( - new JSONTokener(new ByteArrayInputStream(task.getJsonKeyfile().getContent()))) + new JSONTokener(new ByteArrayInputStream(task.getJsonKeyfile().get().getContent()))) .getString("project_id"); if (projectId == null) { throw new ConfigException("Either project or project_id in json_keyfile is required"); diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java new file mode 100644 index 0000000..6b73590 --- /dev/null +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java @@ -0,0 +1,137 @@ +package org.embulk.output.bigquery_java; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.AwsCredentials; +import com.google.auth.oauth2.AwsSecurityCredentials; +import com.google.auth.oauth2.AwsSecurityCredentialsSupplier; +import com.google.auth.oauth2.ExternalAccountSupplierContext; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ImpersonatedCredentials; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Set; +import org.embulk.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkloadIdentityFederationAuth { + private static final Logger logger = + LoggerFactory.getLogger(WorkloadIdentityFederationAuth.class); + private static final int TOKEN_LIFETIME_SECONDS = 3600; + + private final String awsAccessKeyId; + private final String awsSecretAccessKey; + private final String awsSessionToken; + private final String awsRegion; + private final String audience; + private final String serviceAccountImpersonationUrl; + private final String tokenUrl; + private final Set scopes; + + public WorkloadIdentityFederationAuth( + String awsAccessKeyId, + String awsSecretAccessKey, + String awsSessionToken, + String awsRegion, + String audience, + String serviceAccountImpersonationUrl, + String tokenUrl, + Set scopes) { + this.awsAccessKeyId = awsAccessKeyId; + this.awsSecretAccessKey = awsSecretAccessKey; + this.awsSessionToken = awsSessionToken; + this.awsRegion = awsRegion; + this.audience = audience; + this.serviceAccountImpersonationUrl = serviceAccountImpersonationUrl; + this.tokenUrl = tokenUrl != null ? tokenUrl : "https://sts.googleapis.com/v1/token"; + this.scopes = scopes; + } + + public AccessToken fetchAccessToken() throws IOException { + AccessToken federatedToken = fetchFederatedToken(); + + // Direct resource access (no service account impersonation) + if (serviceAccountImpersonationUrl == null || serviceAccountImpersonationUrl.isEmpty()) { + return federatedToken; + } + + return impersonateServiceAccount(federatedToken); + } + + private static String formatAccessToken(AccessToken token) { + String expireTime = + token.getExpirationTime() != null + ? token.getExpirationTime().toInstant().toString() + : "null"; + return "expireTime: " + expireTime; + } + + private String getServiceAccountEmail() { + // Extract email from URL like: + // https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/xxx@yyy.iam.gserviceaccount.com:generateAccessToken + String[] parts = serviceAccountImpersonationUrl.split("serviceAccounts/"); + if (parts.length < 2) { + throw new ConfigException( + String.format( + "Invalid service_account_impersonation_url: %s", serviceAccountImpersonationUrl)); + } + return parts[1].replace(":generateAccessToken", ""); + } + + // https://docs.cloud.google.com/iam/docs/reference/sts/rest/v1/TopLevel/token + private AccessToken fetchFederatedToken() throws IOException { + logger.debug("fetching federated token using AwsCredentials"); + + // Create AWS security credentials supplier + AwsSecurityCredentialsSupplier supplier = + new AwsSecurityCredentialsSupplier() { + @Override + public AwsSecurityCredentials getCredentials(ExternalAccountSupplierContext context) + throws IOException { + return new AwsSecurityCredentials(awsAccessKeyId, awsSecretAccessKey, awsSessionToken); + } + + @Override + public String getRegion(ExternalAccountSupplierContext context) throws IOException { + return awsRegion; + } + }; + + // Build AwsCredentials using the supplier + AwsCredentials awsCredentials = + AwsCredentials.newBuilder() + .setAwsSecurityCredentialsSupplier(supplier) + .setAudience(audience) + .setTokenUrl(tokenUrl) + .setSubjectTokenType("urn:ietf:params:aws:token-type:aws4_request") + .build(); + + // Refresh to get the federated token + awsCredentials.refresh(); + AccessToken accessToken = awsCredentials.getAccessToken(); + + logger.debug("federated token obtained, {}", formatAccessToken(accessToken)); + + return accessToken; + } + + // https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateAccessToken + private AccessToken impersonateServiceAccount(AccessToken federatedToken) throws IOException { + String serviceAccountEmail = getServiceAccountEmail(); + logger.debug("impersonating service account: {}", serviceAccountEmail); + + GoogleCredentials sourceCredentials = GoogleCredentials.create(federatedToken); + ImpersonatedCredentials impersonatedCredentials = + ImpersonatedCredentials.create( + sourceCredentials, + serviceAccountEmail, + null, + new ArrayList<>(scopes), + TOKEN_LIFETIME_SECONDS); + + impersonatedCredentials.refresh(); + AccessToken accessToken = impersonatedCredentials.getAccessToken(); + logger.debug("service account impersonation succeeded, {}", formatAccessToken(accessToken)); + return accessToken; + } +} diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java new file mode 100644 index 0000000..16a5a64 --- /dev/null +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java @@ -0,0 +1,135 @@ +package org.embulk.output.bigquery_java; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; +import org.embulk.util.config.units.LocalFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkloadIdentityFederationCredentials extends GoogleCredentials { + private static final Logger logger = + LoggerFactory.getLogger(WorkloadIdentityFederationCredentials.class); + private static final Map cache = + new ConcurrentHashMap<>(); + + private final WorkloadIdentityFederationAuth auth; + + private static class CacheKey { + private final String awsAccessKeyId; + private final String awsRegion; + private final String audience; + private final Set scopes; + + CacheKey(String awsAccessKeyId, String awsRegion, String audience, Set scopes) { + this.awsAccessKeyId = awsAccessKeyId; + this.awsRegion = awsRegion; + this.audience = audience; + this.scopes = scopes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(awsAccessKeyId, cacheKey.awsAccessKeyId) + && Objects.equals(awsRegion, cacheKey.awsRegion) + && Objects.equals(audience, cacheKey.audience) + && Objects.equals(scopes, cacheKey.scopes); + } + + @Override + public int hashCode() { + return Objects.hash(awsAccessKeyId, awsRegion, audience, scopes); + } + + @Override + public String toString() { + return String.format( + "%s:%s:%s:%s", awsAccessKeyId, awsRegion, audience, String.join(",", scopes)); + } + } + + public static WorkloadIdentityFederationCredentials getOrCreateByFetchingToken( + WorkloadIdentityFederationConfig wifConfig, Set scopes) throws IOException { + JsonObject jsonConfig = parseConfig(wifConfig); + CacheKey cacheKey = + new CacheKey( + wifConfig.getAwsAccessKeyId(), + wifConfig.getAwsRegion(), + jsonConfig.get("audience").getAsString(), + scopes); + WorkloadIdentityFederationCredentials cached = cache.get(cacheKey); + if (cached != null) { + logger.debug("cache hit for cacheKey: {}", cacheKey); + return cached; + } + logger.debug("cache miss for cacheKey: {}", cacheKey); + WorkloadIdentityFederationCredentials credentials = + createByFetchingToken( + wifConfig.getAwsAccessKeyId(), + wifConfig.getAwsSecretAccessKey(), + wifConfig.getAwsSessionToken().orElse(null), + wifConfig.getAwsRegion(), + jsonConfig, + scopes); + cache.put(cacheKey, credentials); + return credentials; + } + + private static JsonObject parseConfig(WorkloadIdentityFederationConfig wifConfig) { + LocalFile configFile = wifConfig.getConfig(); + String json = new String(configFile.getContent(), StandardCharsets.UTF_8); + return JsonParser.parseString(json).getAsJsonObject(); + } + + private static WorkloadIdentityFederationCredentials createByFetchingToken( + String awsAccessKeyId, + String awsSecretAccessKey, + String awsSessionToken, + String awsRegion, + JsonObject jsonConfig, + Set scopes) + throws IOException { + logger.info("creating credentials by fetching token"); + String tokenUrl = + jsonConfig.has("token_url") ? jsonConfig.get("token_url").getAsString() : null; + String serviceAccountImpersonationUrl = + jsonConfig.has("service_account_impersonation_url") + ? jsonConfig.get("service_account_impersonation_url").getAsString() + : null; + WorkloadIdentityFederationAuth auth = + new WorkloadIdentityFederationAuth( + awsAccessKeyId, + awsSecretAccessKey, + awsSessionToken, + awsRegion, + jsonConfig.get("audience").getAsString(), + serviceAccountImpersonationUrl, + tokenUrl, + scopes); + AccessToken token = auth.fetchAccessToken(); + return new WorkloadIdentityFederationCredentials(auth, token); + } + + private WorkloadIdentityFederationCredentials( + WorkloadIdentityFederationAuth auth, AccessToken accessToken) { + super(GoogleCredentials.newBuilder().setAccessToken(accessToken)); + this.auth = auth; + } + + @Override + public AccessToken refreshAccessToken() throws IOException { + logger.info("refreshing access token"); + return auth.fetchAccessToken(); + } +} diff --git a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java index a7e3a95..38e3ba1 100644 --- a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java +++ b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java @@ -21,7 +21,12 @@ public interface PluginTask extends Task { String getAuthMethod(); @Config("json_keyfile") - LocalFile getJsonKeyfile(); + @ConfigDefault("null") + Optional getJsonKeyfile(); + + @Config("workload_identity_federation") + @ConfigDefault("null") + Optional getWorkloadIdentityFederation(); @Config("project") @ConfigDefault("null") diff --git a/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java new file mode 100644 index 0000000..2c68521 --- /dev/null +++ b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java @@ -0,0 +1,26 @@ +package org.embulk.output.bigquery_java.config; + +import java.util.Optional; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.Task; +import org.embulk.util.config.units.LocalFile; + +public interface WorkloadIdentityFederationConfig extends Task { + @Config("config") + LocalFile getConfig(); + + @Config("aws_access_key_id") + String getAwsAccessKeyId(); + + @Config("aws_secret_access_key") + String getAwsSecretAccessKey(); + + @Config("aws_session_token") + @ConfigDefault("null") + Optional getAwsSessionToken(); + + @Config("aws_region") + @ConfigDefault("\"ap-northeast-1\"") + String getAwsRegion(); +}