From 32041f91ad9fc673bfa8ac9aef997cf22bace186 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 13 Dec 2025 09:01:16 +0900 Subject: [PATCH 1/8] Add access_token authentication method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add access_token as a new authentication method for BigQuery. This allows direct authentication using OAuth access tokens without requiring a service account JSON keyfile. - PluginTask: Add access_token option, make json_keyfile Optional - Auth: Add access_token authentication handling - BigqueryClient: Update to handle Optional json_keyfile - README: Add documentation for access_token option 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 3 ++- .../org/embulk/output/bigquery_java/Auth.java | 18 +++++++++++++++++- .../output/bigquery_java/BigqueryClient.java | 6 +++++- .../bigquery_java/config/PluginTask.java | 7 ++++++- 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8486393..0ecd596 100644 --- a/README.md +++ b/README.md @@ -36,8 +36,9 @@ Under construction | name (x) is unsupported | type | required? | default | description | |:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------| | mode (replace, append is supported) | string | optional | "append" | See [Mode](#mode) | -| auth_method (service_account is supported) | string | optional | "application\_default" | See [Authentication](#authentication) | +| auth_method (service_account and access_token are supported)| string | optional | "application\_default" | See [Authentication](#authentication) | | json_keyfile | string | optional | | keyfile path or `content` | +| access_token | string | optional | | access token for authentication | | project (x) | string | required unless service\_account's `json_keyfile` is given. | | project\_id | | dataset | string | required | | dataset | | location | string | optional | nil | geographic location of dataset. See [Location](#location) | diff --git a/src/main/java/org/embulk/output/bigquery_java/Auth.java b/src/main/java/org/embulk/output/bigquery_java/Auth.java index ae0495e..f7b45e2 100644 --- a/src/main/java/org/embulk/output/bigquery_java/Auth.java +++ b/src/main/java/org/embulk/output/bigquery_java/Auth.java @@ -1,6 +1,7 @@ package org.embulk.output.bigquery_java; import com.google.auth.Credentials; +import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; @@ -15,10 +16,12 @@ public class Auth { private final String authMethod; private final LocalFile jsonKeyFile; + private final String accessToken; public Auth(PluginTask task) { authMethod = task.getAuthMethod(); - jsonKeyFile = task.getJsonKeyfile(); + jsonKeyFile = task.getJsonKeyfile().orElse(null); + accessToken = task.getAccessToken().orElse(null); } public Credentials getCredentials(String... scopes) throws IOException { @@ -34,12 +37,25 @@ private GoogleCredentials getGoogleCredentials() throws IOException { return ComputeEngineCredentials.create(); } else if ("application_default".equalsIgnoreCase(authMethod)) { return GoogleCredentials.getApplicationDefault(); + } else if ("access_token".equalsIgnoreCase(authMethod)) { + return GoogleCredentials.create(new AccessToken(getAccessToken(), null)); } else { throw new ConfigException("Unknown auth method: " + authMethod); } } private InputStream getCredentialsStream() { + if (jsonKeyFile == null) { + throw new ConfigException( + "json_keyfile is required when auth_method is '" + authMethod + "'"); + } return new ByteArrayInputStream(jsonKeyFile.getContent()); } + + private String getAccessToken() { + if (accessToken == null) { + throw new ConfigException("access_token is required"); + } + return accessToken; + } } diff --git a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java index bac7e3f..f679af2 100644 --- a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java +++ b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java @@ -121,9 +121,13 @@ private FieldList getSrcFields() { } private String getProjectIdFromJsonKeyfile() { + if (!task.getJsonKeyfile().isPresent()) { + throw new ConfigException("Either project or project_id in json_keyfile is required"); + } String projectId = new JSONObject( - new JSONTokener(new ByteArrayInputStream(task.getJsonKeyfile().getContent()))) + new JSONTokener( + new ByteArrayInputStream(task.getJsonKeyfile().get().getContent()))) .getString("project_id"); if (projectId == null) { throw new ConfigException("Either project or project_id in json_keyfile is required"); diff --git a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java index a7e3a95..df1e052 100644 --- a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java +++ b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java @@ -21,7 +21,12 @@ public interface PluginTask extends Task { String getAuthMethod(); @Config("json_keyfile") - LocalFile getJsonKeyfile(); + @ConfigDefault("null") + Optional getJsonKeyfile(); + + @Config("access_token") + @ConfigDefault("null") + Optional getAccessToken(); @Config("project") @ConfigDefault("null") From a8b1b2e58ff88d9dfa0975036e6231c414a7d4be Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 13 Dec 2025 16:49:49 +0900 Subject: [PATCH 2/8] Add Workload Identity Federation authentication method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for authenticating to BigQuery using AWS credentials via Workload Identity Federation. This allows users to authenticate without storing Google Cloud service account keys by leveraging AWS IAM credentials. - Add WorkloadIdentityFederationConfig for nested configuration - Add WorkloadIdentityFederationAuth for AWS signed request and token exchange - Add WorkloadIdentityFederationCredentials with token refresh support - Update Auth to support workload_identity_federation auth method - Update PluginTask with workload_identity_federation config option - Update README with documentation and examples 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 50 ++- .../org/embulk/output/bigquery_java/Auth.java | 16 + .../WorkloadIdentityFederationAuth.java | 300 ++++++++++++++++++ ...WorkloadIdentityFederationCredentials.java | 20 ++ .../bigquery_java/config/PluginTask.java | 4 + .../WorkloadIdentityFederationConfig.java | 21 ++ 6 files changed, 410 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java create mode 100644 src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java create mode 100644 src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java diff --git a/README.md b/README.md index 0ecd596..e1186a4 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,10 @@ Under construction | name (x) is unsupported | type | required? | default | description | |:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------| | mode (replace, append is supported) | string | optional | "append" | See [Mode](#mode) | -| auth_method (service_account and access_token are supported)| string | optional | "application\_default" | See [Authentication](#authentication) | +| auth_method (service_account, access_token, workload_identity_federation are supported)| string | optional | "application\_default" | See [Authentication](#authentication) | | json_keyfile | string | optional | | keyfile path or `content` | | access_token | string | optional | | access token for authentication | +| workload_identity_federation | hash | optional | | Workload Identity Federation config. See below | | project (x) | string | required unless service\_account's `json_keyfile` is given. | | project\_id | | dataset | string | required | | dataset | | location | string | optional | nil | geographic location of dataset. See [Location](#location) | @@ -113,6 +114,53 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b | schema_update_options (x) | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. | +### Workload Identity Federation + +Workload Identity Federation allows authentication using AWS credentials to access Google Cloud resources. + +| name | type | required? | default | description | +|:-------------------------------------|:------------|:-----------|:------------------|:-----------------------| +| workload_identity_federation.json_keyfile | string | required | | Path to the Workload Identity Federation JSON config file | +| workload_identity_federation.aws_access_key_id | string | required | | AWS Access Key ID | +| workload_identity_federation.aws_secret_access_key | string | required | | AWS Secret Access Key | +| workload_identity_federation.aws_region | string | optional | "ap-northeast-1" | AWS Region | + +Example) + +```yaml +out: + type: bigquery_java + auth_method: workload_identity_federation + workload_identity_federation: + json_keyfile: /path/to/workload-identity-federation-config.json + aws_access_key_id: AKIAXXXXXXXXXXXXXXXX + aws_secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + aws_region: ap-northeast-1 + project: my-project + dataset: my_dataset + table: my_table + source_format: NEWLINE_DELIMITED_JSON +``` + +The `json_keyfile` should contain the Workload Identity Federation configuration from Google Cloud: + +```json +{ + "universe_domain": "googleapis.com", + "type": "external_account", + "audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request", + "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT@PROJECT.iam.gserviceaccount.com:generateAccessToken", + "token_url": "https://sts.googleapis.com/v1/token", + "credential_source": { + "environment_id": "aws1", + "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone", + "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials", + "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15" + } +} +``` + ### Column Options (NOT fully supported) Column options are used to aid guessing BigQuery schema, or to define conversion of values: diff --git a/src/main/java/org/embulk/output/bigquery_java/Auth.java b/src/main/java/org/embulk/output/bigquery_java/Auth.java index f7b45e2..dd229cb 100644 --- a/src/main/java/org/embulk/output/bigquery_java/Auth.java +++ b/src/main/java/org/embulk/output/bigquery_java/Auth.java @@ -11,17 +11,20 @@ import java.io.InputStream; import org.embulk.config.ConfigException; import org.embulk.output.bigquery_java.config.PluginTask; +import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; import org.embulk.util.config.units.LocalFile; public class Auth { private final String authMethod; private final LocalFile jsonKeyFile; private final String accessToken; + private final WorkloadIdentityFederationConfig workloadIdentityFederationConfig; public Auth(PluginTask task) { authMethod = task.getAuthMethod(); jsonKeyFile = task.getJsonKeyfile().orElse(null); accessToken = task.getAccessToken().orElse(null); + workloadIdentityFederationConfig = task.getWorkloadIdentityFederation().orElse(null); } public Credentials getCredentials(String... scopes) throws IOException { @@ -39,6 +42,8 @@ private GoogleCredentials getGoogleCredentials() throws IOException { return GoogleCredentials.getApplicationDefault(); } else if ("access_token".equalsIgnoreCase(authMethod)) { return GoogleCredentials.create(new AccessToken(getAccessToken(), null)); + } else if ("workload_identity_federation".equalsIgnoreCase(authMethod)) { + return getWorkloadIdentityFederationCredentials(); } else { throw new ConfigException("Unknown auth method: " + authMethod); } @@ -58,4 +63,15 @@ private String getAccessToken() { } return accessToken; } + + private WorkloadIdentityFederationCredentials getWorkloadIdentityFederationCredentials() + throws IOException { + if (workloadIdentityFederationConfig == null) { + throw new ConfigException( + "workload_identity_federation config is required when auth_method is 'workload_identity_federation'"); + } + WorkloadIdentityFederationAuth wifAuth = + new WorkloadIdentityFederationAuth(workloadIdentityFederationConfig); + return wifAuth.getCredentials(); + } } diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java new file mode 100644 index 0000000..621ff01 --- /dev/null +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java @@ -0,0 +1,300 @@ +package org.embulk.output.bigquery_java; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.stream.Collectors; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.embulk.config.ConfigException; +import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkloadIdentityFederationAuth { + private static final Logger logger = + LoggerFactory.getLogger(WorkloadIdentityFederationAuth.class); + private final String awsAccessKeyId; + private final String awsSecretAccessKey; + private final String awsRegion; + private final String audience; + private final String serviceAccountImpersonationUrl; + private final String tokenUrl; + + public WorkloadIdentityFederationAuth(WorkloadIdentityFederationConfig config) { + this.awsAccessKeyId = config.getAwsAccessKeyId(); + this.awsSecretAccessKey = config.getAwsSecretAccessKey(); + this.awsRegion = config.getAwsRegion(); + + JSONObject jsonConfig = + new JSONObject( + new JSONTokener(new ByteArrayInputStream(config.getJsonKeyfile().getContent()))); + this.audience = jsonConfig.getString("audience"); + this.serviceAccountImpersonationUrl = jsonConfig.getString("service_account_impersonation_url"); + this.tokenUrl = jsonConfig.optString("token_url", "https://sts.googleapis.com/v1/token"); + } + + public WorkloadIdentityFederationCredentials getCredentials() throws IOException { + AccessToken token = getAccessToken(); + return new WorkloadIdentityFederationCredentials(this, token); + } + + public AccessToken getAccessToken() throws IOException { + Map awsRequest = createAwsSignedRequest(); + String federatedToken = exchangeTokenForGoogleAccessToken(awsRequest); + String accessToken = impersonateServiceAccount(federatedToken); + java.util.Date expirationTime = new java.util.Date(System.currentTimeMillis() + 3600 * 1000); + return new AccessToken(accessToken, expirationTime); + } + + private String getServiceAccountEmail() { + // Extract email from URL like: + // https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/xxx@yyy.iam.gserviceaccount.com:generateAccessToken + String[] parts = serviceAccountImpersonationUrl.split("serviceAccounts/"); + if (parts.length < 2) { + throw new ConfigException( + "Invalid service_account_impersonation_url: " + serviceAccountImpersonationUrl); + } + return parts[1].replace(":generateAccessToken", ""); + } + + private Map createAwsSignedRequest() { + String service = "sts"; + String host = "sts." + awsRegion + ".amazonaws.com"; + String method = "POST"; + + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + String amzDate = now.format(DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'")); + String dateStamp = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); + + String queryParams = "Action=GetCallerIdentity&Version=2011-06-15"; + String endpoint = "https://" + host + "/?" + queryParams; + + String payloadHash = sha256Hex(""); + + Map headers = new HashMap<>(); + headers.put("host", host); + headers.put("x-amz-date", amzDate); + headers.put("x-goog-cloud-target-resource", audience); + + List signedHeadersList = new ArrayList<>(headers.keySet()); + Collections.sort(signedHeadersList); + String signedHeaders = String.join(";", signedHeadersList); + + String canonicalHeaders = + signedHeadersList.stream() + .map(k -> k + ":" + headers.get(k) + "\n") + .collect(Collectors.joining()); + + String[] queryParts = queryParams.split("&"); + java.util.Arrays.sort(queryParts); + String canonicalQuerystring = String.join("&", queryParts); + + String canonicalRequest = + String.join( + "\n", method, "/", canonicalQuerystring, canonicalHeaders, signedHeaders, payloadHash); + + String algorithm = "AWS4-HMAC-SHA256"; + String credentialScope = dateStamp + "/" + awsRegion + "/" + service + "/aws4_request"; + + String stringToSign = + String.join("\n", algorithm, amzDate, credentialScope, sha256Hex(canonicalRequest)); + + byte[] signingKey = getSignatureKey(dateStamp, awsRegion, service); + String signature = hmacSha256Hex(signingKey, stringToSign); + + String authorizationHeader = + algorithm + + " Credential=" + + awsAccessKeyId + + "/" + + credentialScope + + ", SignedHeaders=" + + signedHeaders + + ", Signature=" + + signature; + + List> requestHeaders = new ArrayList<>(); + for (String key : signedHeadersList) { + Map header = new HashMap<>(); + header.put("key", key); + header.put("value", headers.get(key)); + requestHeaders.add(header); + } + Map authHeader = new HashMap<>(); + authHeader.put("key", "Authorization"); + authHeader.put("value", authorizationHeader); + requestHeaders.add(authHeader); + + Map result = new HashMap<>(); + result.put("url", endpoint); + result.put("method", method); + result.put("headers", requestHeaders); + return result; + } + + private byte[] getSignatureKey(String dateStamp, String region, String service) { + byte[] kDate = hmacSha256(("AWS4" + awsSecretAccessKey).getBytes(StandardCharsets.UTF_8), dateStamp); + byte[] kRegion = hmacSha256(kDate, region); + byte[] kService = hmacSha256(kRegion, service); + return hmacSha256(kService, "aws4_request"); + } + + private String exchangeTokenForGoogleAccessToken(Map awsRequest) + throws IOException { + JSONObject awsRequestJson = new JSONObject(awsRequest); + String subjectToken = URLEncoder.encode(awsRequestJson.toString(), "UTF-8"); + + String data = + "grant_type=" + + URLEncoder.encode("urn:ietf:params:oauth:grant-type:token-exchange", "UTF-8") + + "&audience=" + + URLEncoder.encode(audience, "UTF-8") + + "&scope=" + + URLEncoder.encode("https://www.googleapis.com/auth/cloud-platform", "UTF-8") + + "&requested_token_type=" + + URLEncoder.encode("urn:ietf:params:oauth:token-type:access_token", "UTF-8") + + "&subject_token_type=" + + URLEncoder.encode("urn:ietf:params:aws:token-type:aws4_request", "UTF-8") + + "&subject_token=" + + subjectToken; + + URL url = new URL(tokenUrl); + logger.debug("Exchanging token at: {}", tokenUrl); + + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + conn.setDoOutput(true); + + try (OutputStream os = conn.getOutputStream()) { + os.write(data.getBytes(StandardCharsets.UTF_8)); + } + + int responseCode = conn.getResponseCode(); + logger.debug("Token exchange response code: {}", responseCode); + + if (responseCode != 200) { + String errorBody; + try (Scanner scanner = new Scanner(conn.getErrorStream(), "UTF-8")) { + errorBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } + logger.info("Token exchange failed: {} - {}", responseCode, errorBody); + throw new IOException( + "Google STS token exchange failed: " + responseCode + " - " + errorBody); + } + + String responseBody; + try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")) { + responseBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } + logger.info("Token exchange succeeded"); + + JSONObject responseJson = new JSONObject(responseBody); + return responseJson.getString("access_token"); + } + + private String impersonateServiceAccount(String federatedToken) throws IOException { + String serviceAccountEmail = getServiceAccountEmail(); + String impersonationUrl = + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/" + + serviceAccountEmail + + ":generateAccessToken"; + logger.debug("Impersonating service account: {}", serviceAccountEmail); + logger.debug("Impersonation URL: {}", impersonationUrl); + + URL url = new URL(impersonationUrl); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Authorization", "Bearer " + federatedToken); + conn.setRequestProperty("Content-Type", "application/json"); + conn.setDoOutput(true); + + JSONObject requestBody = new JSONObject(); + requestBody.put( + "scope", + new String[] { + "https://www.googleapis.com/auth/bigquery", + "https://www.googleapis.com/auth/devstorage.read_write" + }); + requestBody.put("lifetime", "3600s"); + + try (OutputStream os = conn.getOutputStream()) { + os.write(requestBody.toString().getBytes(StandardCharsets.UTF_8)); + } + + int responseCode = conn.getResponseCode(); + logger.debug("Impersonation response code: {}", responseCode); + + if (responseCode != 200) { + String errorBody; + try (Scanner scanner = new Scanner(conn.getErrorStream(), "UTF-8")) { + errorBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } + logger.info("Impersonation failed: {} - {}", responseCode, errorBody); + throw new IOException( + "Service account impersonation failed: " + responseCode + " - " + errorBody); + } + + String responseBody; + try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")) { + responseBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } + logger.info("Service account impersonation succeeded"); + + JSONObject responseJson = new JSONObject(responseBody); + return responseJson.getString("accessToken"); + } + + private String sha256Hex(String data) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(data.getBytes(StandardCharsets.UTF_8)); + return bytesToHex(hash); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 algorithm not found", e); + } + } + + private byte[] hmacSha256(byte[] key, String data) { + try { + Mac mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(key, "HmacSHA256")); + return mac.doFinal(data.getBytes(StandardCharsets.UTF_8)); + } catch (NoSuchAlgorithmException | InvalidKeyException e) { + throw new RuntimeException("HMAC-SHA256 error", e); + } + } + + private String hmacSha256Hex(byte[] key, String data) { + return bytesToHex(hmacSha256(key, data)); + } + + private String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } +} diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java new file mode 100644 index 0000000..9d295a4 --- /dev/null +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java @@ -0,0 +1,20 @@ +package org.embulk.output.bigquery_java; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; + +public class WorkloadIdentityFederationCredentials extends GoogleCredentials { + private final WorkloadIdentityFederationAuth auth; + + public WorkloadIdentityFederationCredentials( + WorkloadIdentityFederationAuth auth, AccessToken accessToken) { + super(accessToken); + this.auth = auth; + } + + @Override + public AccessToken refreshAccessToken() throws IOException { + return auth.getAccessToken(); + } +} diff --git a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java index df1e052..7b353f9 100644 --- a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java +++ b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java @@ -28,6 +28,10 @@ public interface PluginTask extends Task { @ConfigDefault("null") Optional getAccessToken(); + @Config("workload_identity_federation") + @ConfigDefault("null") + Optional getWorkloadIdentityFederation(); + @Config("project") @ConfigDefault("null") Optional getProject(); diff --git a/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java new file mode 100644 index 0000000..ed616d8 --- /dev/null +++ b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java @@ -0,0 +1,21 @@ +package org.embulk.output.bigquery_java.config; + +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.Task; +import org.embulk.util.config.units.LocalFile; + +public interface WorkloadIdentityFederationConfig extends Task { + @Config("json_keyfile") + LocalFile getJsonKeyfile(); + + @Config("aws_access_key_id") + String getAwsAccessKeyId(); + + @Config("aws_secret_access_key") + String getAwsSecretAccessKey(); + + @Config("aws_region") + @ConfigDefault("\"ap-northeast-1\"") + String getAwsRegion(); +} From 654421ced77af3707616845d2f9bac2bda705576 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 13 Dec 2025 16:54:48 +0900 Subject: [PATCH 3/8] Remove deprecated access_token authentication method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the access_token auth method in favor of workload_identity_federation which provides better security by not requiring direct token handling. - Remove access_token field and getter from PluginTask - Remove access_token handling from Auth class - Update README to remove access_token documentation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 3 +-- .../java/org/embulk/output/bigquery_java/Auth.java | 12 ------------ .../output/bigquery_java/config/PluginTask.java | 4 ---- 3 files changed, 1 insertion(+), 18 deletions(-) diff --git a/README.md b/README.md index e1186a4..0abe858 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,8 @@ Under construction | name (x) is unsupported | type | required? | default | description | |:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------| | mode (replace, append is supported) | string | optional | "append" | See [Mode](#mode) | -| auth_method (service_account, access_token, workload_identity_federation are supported)| string | optional | "application\_default" | See [Authentication](#authentication) | +| auth_method (service_account, workload_identity_federation are supported)| string | optional | "application\_default" | See [Authentication](#authentication) | | json_keyfile | string | optional | | keyfile path or `content` | -| access_token | string | optional | | access token for authentication | | workload_identity_federation | hash | optional | | Workload Identity Federation config. See below | | project (x) | string | required unless service\_account's `json_keyfile` is given. | | project\_id | | dataset | string | required | | dataset | diff --git a/src/main/java/org/embulk/output/bigquery_java/Auth.java b/src/main/java/org/embulk/output/bigquery_java/Auth.java index dd229cb..2d770f4 100644 --- a/src/main/java/org/embulk/output/bigquery_java/Auth.java +++ b/src/main/java/org/embulk/output/bigquery_java/Auth.java @@ -1,7 +1,6 @@ package org.embulk.output.bigquery_java; import com.google.auth.Credentials; -import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; @@ -17,13 +16,11 @@ public class Auth { private final String authMethod; private final LocalFile jsonKeyFile; - private final String accessToken; private final WorkloadIdentityFederationConfig workloadIdentityFederationConfig; public Auth(PluginTask task) { authMethod = task.getAuthMethod(); jsonKeyFile = task.getJsonKeyfile().orElse(null); - accessToken = task.getAccessToken().orElse(null); workloadIdentityFederationConfig = task.getWorkloadIdentityFederation().orElse(null); } @@ -40,8 +37,6 @@ private GoogleCredentials getGoogleCredentials() throws IOException { return ComputeEngineCredentials.create(); } else if ("application_default".equalsIgnoreCase(authMethod)) { return GoogleCredentials.getApplicationDefault(); - } else if ("access_token".equalsIgnoreCase(authMethod)) { - return GoogleCredentials.create(new AccessToken(getAccessToken(), null)); } else if ("workload_identity_federation".equalsIgnoreCase(authMethod)) { return getWorkloadIdentityFederationCredentials(); } else { @@ -57,13 +52,6 @@ private InputStream getCredentialsStream() { return new ByteArrayInputStream(jsonKeyFile.getContent()); } - private String getAccessToken() { - if (accessToken == null) { - throw new ConfigException("access_token is required"); - } - return accessToken; - } - private WorkloadIdentityFederationCredentials getWorkloadIdentityFederationCredentials() throws IOException { if (workloadIdentityFederationConfig == null) { diff --git a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java index 7b353f9..38e3ba1 100644 --- a/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java +++ b/src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java @@ -24,10 +24,6 @@ public interface PluginTask extends Task { @ConfigDefault("null") Optional getJsonKeyfile(); - @Config("access_token") - @ConfigDefault("null") - Optional getAccessToken(); - @Config("workload_identity_federation") @ConfigDefault("null") Optional getWorkloadIdentityFederation(); From c3dd74855a15019d6b941055d6b7c0bf0f43e1df Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 13 Dec 2025 18:20:13 +0900 Subject: [PATCH 4/8] Improve WorkloadIdentityFederationAuth implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract TOKEN_LIFETIME_SECONDS constant for token expiration time - Add debug logging for non-sensitive response fields (expires_in, token_type, etc.) - Remove unnecessary devstorage.read_write scope, keep only bigquery scope 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../WorkloadIdentityFederationAuth.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java index 621ff01..60d81cb 100644 --- a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java @@ -34,6 +34,8 @@ public class WorkloadIdentityFederationAuth { private static final Logger logger = LoggerFactory.getLogger(WorkloadIdentityFederationAuth.class); + private static final int TOKEN_LIFETIME_SECONDS = 3600; + private final String awsAccessKeyId; private final String awsSecretAccessKey; private final String awsRegion; @@ -63,7 +65,8 @@ public AccessToken getAccessToken() throws IOException { Map awsRequest = createAwsSignedRequest(); String federatedToken = exchangeTokenForGoogleAccessToken(awsRequest); String accessToken = impersonateServiceAccount(federatedToken); - java.util.Date expirationTime = new java.util.Date(System.currentTimeMillis() + 3600 * 1000); + java.util.Date expirationTime = + new java.util.Date(System.currentTimeMillis() + TOKEN_LIFETIME_SECONDS * 1000); return new AccessToken(accessToken, expirationTime); } @@ -211,6 +214,15 @@ private String exchangeTokenForGoogleAccessToken(Map awsRequest) logger.info("Token exchange succeeded"); JSONObject responseJson = new JSONObject(responseBody); + if (responseJson.has("expires_in")) { + logger.debug("Token expires_in: {}", responseJson.get("expires_in")); + } + if (responseJson.has("token_type")) { + logger.debug("Token type: {}", responseJson.getString("token_type")); + } + if (responseJson.has("issued_token_type")) { + logger.debug("Issued token type: {}", responseJson.getString("issued_token_type")); + } return responseJson.getString("access_token"); } @@ -231,13 +243,8 @@ private String impersonateServiceAccount(String federatedToken) throws IOExcepti conn.setDoOutput(true); JSONObject requestBody = new JSONObject(); - requestBody.put( - "scope", - new String[] { - "https://www.googleapis.com/auth/bigquery", - "https://www.googleapis.com/auth/devstorage.read_write" - }); - requestBody.put("lifetime", "3600s"); + requestBody.put("scope", new String[] {"https://www.googleapis.com/auth/bigquery"}); + requestBody.put("lifetime", TOKEN_LIFETIME_SECONDS + "s"); try (OutputStream os = conn.getOutputStream()) { os.write(requestBody.toString().getBytes(StandardCharsets.UTF_8)); @@ -263,6 +270,9 @@ private String impersonateServiceAccount(String federatedToken) throws IOExcepti logger.info("Service account impersonation succeeded"); JSONObject responseJson = new JSONObject(responseBody); + if (responseJson.has("expireTime")) { + logger.debug("Access token expire time: {}", responseJson.getString("expireTime")); + } return responseJson.getString("accessToken"); } From 258f2ce00397b379be86de8fa8095a2e07fd7c87 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sun, 14 Dec 2025 17:47:54 +0900 Subject: [PATCH 5/8] Rename json_keyfile to config in WorkloadIdentityFederationConfig MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplify the configuration parameter name from json_keyfile to config for better clarity and consistency. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 6 +++--- .../bigquery_java/WorkloadIdentityFederationAuth.java | 2 +- .../config/WorkloadIdentityFederationConfig.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 0abe858..4d46cdd 100644 --- a/README.md +++ b/README.md @@ -119,7 +119,7 @@ Workload Identity Federation allows authentication using AWS credentials to acce | name | type | required? | default | description | |:-------------------------------------|:------------|:-----------|:------------------|:-----------------------| -| workload_identity_federation.json_keyfile | string | required | | Path to the Workload Identity Federation JSON config file | +| workload_identity_federation.config | string | required | | Path to the Workload Identity Federation JSON config file | | workload_identity_federation.aws_access_key_id | string | required | | AWS Access Key ID | | workload_identity_federation.aws_secret_access_key | string | required | | AWS Secret Access Key | | workload_identity_federation.aws_region | string | optional | "ap-northeast-1" | AWS Region | @@ -131,7 +131,7 @@ out: type: bigquery_java auth_method: workload_identity_federation workload_identity_federation: - json_keyfile: /path/to/workload-identity-federation-config.json + config: /path/to/workload-identity-federation-config.json aws_access_key_id: AKIAXXXXXXXXXXXXXXXX aws_secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx aws_region: ap-northeast-1 @@ -141,7 +141,7 @@ out: source_format: NEWLINE_DELIMITED_JSON ``` -The `json_keyfile` should contain the Workload Identity Federation configuration from Google Cloud: +The `config` should contain the Workload Identity Federation configuration from Google Cloud: ```json { diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java index 60d81cb..9ce24e4 100644 --- a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java @@ -50,7 +50,7 @@ public WorkloadIdentityFederationAuth(WorkloadIdentityFederationConfig config) { JSONObject jsonConfig = new JSONObject( - new JSONTokener(new ByteArrayInputStream(config.getJsonKeyfile().getContent()))); + new JSONTokener(new ByteArrayInputStream(config.getConfig().getContent()))); this.audience = jsonConfig.getString("audience"); this.serviceAccountImpersonationUrl = jsonConfig.getString("service_account_impersonation_url"); this.tokenUrl = jsonConfig.optString("token_url", "https://sts.googleapis.com/v1/token"); diff --git a/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java index ed616d8..5d9f934 100644 --- a/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java +++ b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java @@ -6,8 +6,8 @@ import org.embulk.util.config.units.LocalFile; public interface WorkloadIdentityFederationConfig extends Task { - @Config("json_keyfile") - LocalFile getJsonKeyfile(); + @Config("config") + LocalFile getConfig(); @Config("aws_access_key_id") String getAwsAccessKeyId(); From 9541b44b9c6da0e1fd47c6b1a6023b1cdd1d5bb2 Mon Sep 17 00:00:00 2001 From: chikamura Date: Mon, 22 Dec 2025 15:12:40 +0900 Subject: [PATCH 6/8] feat: Update WorkloadIdentityFederation to use Google Auth Library's AwsCredentials MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace manual AWS signature implementation with AwsCredentials and AwsSecurityCredentialsSupplier - Use ImpersonatedCredentials for service account impersonation - Add aws_session_token support for temporary credentials - Add scopes parameter to credential caching - Upgrade google-auth-library-oauth2-http to 1.41.0 - Add google-http-client-gson 1.45.3 for Gson support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 2 + .../build.gradle | 18 + .../runtimeClasspath.lockfile | 30 +- .../org/embulk/output/bigquery_java/Auth.java | 19 +- .../WorkloadIdentityFederationAuth.java | 357 +++++------------- ...WorkloadIdentityFederationCredentials.java | 117 +++++- .../WorkloadIdentityFederationConfig.java | 5 + 7 files changed, 255 insertions(+), 293 deletions(-) diff --git a/README.md b/README.md index 4d46cdd..a141b2e 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,7 @@ Workload Identity Federation allows authentication using AWS credentials to acce | workload_identity_federation.config | string | required | | Path to the Workload Identity Federation JSON config file | | workload_identity_federation.aws_access_key_id | string | required | | AWS Access Key ID | | workload_identity_federation.aws_secret_access_key | string | required | | AWS Secret Access Key | +| workload_identity_federation.aws_session_token | string | optional | | AWS Session Token (for temporary credentials) | | workload_identity_federation.aws_region | string | optional | "ap-northeast-1" | AWS Region | Example) @@ -134,6 +135,7 @@ out: config: /path/to/workload-identity-federation-config.json aws_access_key_id: AKIAXXXXXXXXXXXXXXXX aws_secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + # aws_session_token: xxxxxxxx # optional, for temporary credentials aws_region: ap-northeast-1 project: my-project dataset: my_dataset diff --git a/shadow-google-cloud-bigquery-helper/build.gradle b/shadow-google-cloud-bigquery-helper/build.gradle index 3951630..9b50911 100644 --- a/shadow-google-cloud-bigquery-helper/build.gradle +++ b/shadow-google-cloud-bigquery-helper/build.gradle @@ -19,6 +19,16 @@ description = "A helper library for Bigquery output for embulk" sourceCompatibility = 1.8 targetCompatibility = 1.8 +// https://github.com/google/guava/issues/6612#issuecomment-1614992368 +sourceSets.all { + configurations.getByName(runtimeClasspathConfigurationName) { + attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm") + } + configurations.getByName(compileClasspathConfigurationName) { + attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm") + } +} + configurations { runtimeClasspath { resolutionStrategy.activateDependencyLocking() @@ -38,11 +48,19 @@ dependencies { compile("org.slf4j:jcl-over-slf4j:1.7.12") { exclude group: "org.slf4j", module: "slf4j-api" } + + // Newer google-auth-library for AwsCredentials, ImpersonatedCredentials support + compile "com.google.auth:google-auth-library-oauth2-http:1.41.0" + compile "com.google.http-client:google-http-client-gson:1.45.3" } // Relocate Guava and Jackson packages since they are incompatible from Embulk's. shadowJar { + mergeServiceFiles() relocate "com.google.common", "embulk.bigquery.com.google.common" relocate "com.google.thirdparty.publicsuffix", "embulk.bigquery.com.google.thirdparty.publicsuffix" relocate "com.fasterxml.jackson", "embulk.bigquery.com.fasterxml.jackson" + // OpenCensus and gRPC - internal dependencies + relocate "io.opencensus", "embulk.bigquery.io.opencensus" + relocate "io.grpc", "embulk.bigquery.io.grpc" } diff --git a/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile b/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile index ed0996b..93d94f9 100644 --- a/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile +++ b/shadow-google-cloud-bigquery-helper/gradle/dependency-locks/runtimeClasspath.lockfile @@ -16,38 +16,39 @@ com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:0.138.2 com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:0.138.2 com.google.api.grpc:proto-google-common-protos:2.9.0 com.google.api.grpc:proto-google-iam-v1:1.4.1 -com.google.api:api-common:2.2.1 +com.google.api:api-common:2.53.0 com.google.api:gax-grpc:2.18.2 com.google.api:gax-httpjson:0.103.2 com.google.api:gax:2.18.2 com.google.apis:google-api-services-bigquery:v2-rev20220611-1.32.1 -com.google.auth:google-auth-library-credentials:1.7.0 -com.google.auth:google-auth-library-oauth2-http:1.7.0 +com.google.auth:google-auth-library-credentials:1.41.0 +com.google.auth:google-auth-library-oauth2-http:1.41.0 +com.google.auto.value:auto-value-annotations:1.11.0 com.google.cloud:google-cloud-bigquery:2.14.0 com.google.cloud:google-cloud-bigquerystorage:2.14.2 com.google.cloud:google-cloud-core-http:2.8.0 com.google.cloud:google-cloud-core:2.8.0 com.google.code.findbugs:jsr305:3.0.2 -com.google.code.gson:gson:2.9.0 -com.google.errorprone:error_prone_annotations:2.14.0 +com.google.code.gson:gson:2.12.1 +com.google.errorprone:error_prone_annotations:2.41.0 com.google.flatbuffers:flatbuffers-java:1.12.0 -com.google.guava:failureaccess:1.0.1 -com.google.guava:guava:31.1-jre +com.google.guava:failureaccess:1.0.3 +com.google.guava:guava:33.5.0-android com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava com.google.http-client:google-http-client-apache-v2:1.42.0 com.google.http-client:google-http-client-appengine:1.42.0 -com.google.http-client:google-http-client-gson:1.42.0 +com.google.http-client:google-http-client-gson:2.0.2 com.google.http-client:google-http-client-jackson2:1.42.0 -com.google.http-client:google-http-client:1.42.0 -com.google.j2objc:j2objc-annotations:1.3 +com.google.http-client:google-http-client:2.0.2 +com.google.j2objc:j2objc-annotations:3.1 com.google.oauth-client:google-oauth-client:1.34.1 com.google.protobuf:protobuf-java-util:3.21.1 com.google.protobuf:protobuf-java:3.21.1 commons-codec:commons-codec:1.15 io.grpc:grpc-alts:1.47.0 -io.grpc:grpc-api:1.47.0 +io.grpc:grpc-api:1.70.0 io.grpc:grpc-auth:1.47.0 -io.grpc:grpc-context:1.47.0 +io.grpc:grpc-context:1.70.0 io.grpc:grpc-core:1.47.0 io.grpc:grpc-googleapis:1.47.0 io.grpc:grpc-grpclb:1.47.0 @@ -65,13 +66,14 @@ org.apache.arrow:arrow-format:8.0.0 org.apache.arrow:arrow-memory-core:8.0.0 org.apache.arrow:arrow-memory-netty:8.0.0 org.apache.arrow:arrow-vector:8.0.0 -org.apache.httpcomponents:httpclient:4.5.13 -org.apache.httpcomponents:httpcore:4.4.15 +org.apache.httpcomponents:httpclient:4.5.14 +org.apache.httpcomponents:httpcore:4.4.16 org.checkerframework:checker-compat-qual:2.5.5 org.checkerframework:checker-qual:3.22.2 org.codehaus.mojo:animal-sniffer-annotations:1.21 org.conscrypt:conscrypt-openjdk-uber:2.5.1 org.json:json:20200518 +org.jspecify:jspecify:1.0.0 org.slf4j:jcl-over-slf4j:1.7.12 org.slf4j:slf4j-api:1.7.25 org.threeten:threeten-extra:1.7.0 diff --git a/src/main/java/org/embulk/output/bigquery_java/Auth.java b/src/main/java/org/embulk/output/bigquery_java/Auth.java index 2d770f4..f8dc58c 100644 --- a/src/main/java/org/embulk/output/bigquery_java/Auth.java +++ b/src/main/java/org/embulk/output/bigquery_java/Auth.java @@ -8,6 +8,9 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.embulk.config.ConfigException; import org.embulk.output.bigquery_java.config.PluginTask; import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; @@ -25,10 +28,10 @@ public Auth(PluginTask task) { } public Credentials getCredentials(String... scopes) throws IOException { - return getGoogleCredentials().createScoped(scopes); + return getGoogleCredentials(scopes).createScoped(scopes); } - private GoogleCredentials getGoogleCredentials() throws IOException { + private GoogleCredentials getGoogleCredentials(String... scopes) throws IOException { if ("authorized_user".equalsIgnoreCase(authMethod)) { return UserCredentials.fromStream(getCredentialsStream()); } else if ("service_account".equalsIgnoreCase(authMethod)) { @@ -38,7 +41,7 @@ private GoogleCredentials getGoogleCredentials() throws IOException { } else if ("application_default".equalsIgnoreCase(authMethod)) { return GoogleCredentials.getApplicationDefault(); } else if ("workload_identity_federation".equalsIgnoreCase(authMethod)) { - return getWorkloadIdentityFederationCredentials(); + return getWorkloadIdentityFederationCredentials(scopes); } else { throw new ConfigException("Unknown auth method: " + authMethod); } @@ -52,14 +55,14 @@ private InputStream getCredentialsStream() { return new ByteArrayInputStream(jsonKeyFile.getContent()); } - private WorkloadIdentityFederationCredentials getWorkloadIdentityFederationCredentials() - throws IOException { + private WorkloadIdentityFederationCredentials getWorkloadIdentityFederationCredentials( + String... scopes) throws IOException { if (workloadIdentityFederationConfig == null) { throw new ConfigException( "workload_identity_federation config is required when auth_method is 'workload_identity_federation'"); } - WorkloadIdentityFederationAuth wifAuth = - new WorkloadIdentityFederationAuth(workloadIdentityFederationConfig); - return wifAuth.getCredentials(); + Set scopeSet = new HashSet<>(Arrays.asList(scopes)); + return WorkloadIdentityFederationCredentials.getOrCreateByFetchingToken( + workloadIdentityFederationConfig, scopeSet); } } diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java index 9ce24e4..1f1705c 100644 --- a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java @@ -1,33 +1,16 @@ package org.embulk.output.bigquery_java; import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.AwsCredentials; +import com.google.auth.oauth2.AwsSecurityCredentials; +import com.google.auth.oauth2.AwsSecurityCredentialsSupplier; +import com.google.auth.oauth2.ExternalAccountSupplierContext; import com.google.auth.oauth2.GoogleCredentials; -import java.io.ByteArrayInputStream; +import com.google.auth.oauth2.ImpersonatedCredentials; import java.io.IOException; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; -import java.security.InvalidKeyException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Scanner; -import java.util.stream.Collectors; -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; +import java.util.Set; import org.embulk.config.ConfigException; -import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; -import org.json.JSONObject; -import org.json.JSONTokener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,36 +21,43 @@ public class WorkloadIdentityFederationAuth { private final String awsAccessKeyId; private final String awsSecretAccessKey; + private final String awsSessionToken; private final String awsRegion; private final String audience; private final String serviceAccountImpersonationUrl; private final String tokenUrl; - - public WorkloadIdentityFederationAuth(WorkloadIdentityFederationConfig config) { - this.awsAccessKeyId = config.getAwsAccessKeyId(); - this.awsSecretAccessKey = config.getAwsSecretAccessKey(); - this.awsRegion = config.getAwsRegion(); - - JSONObject jsonConfig = - new JSONObject( - new JSONTokener(new ByteArrayInputStream(config.getConfig().getContent()))); - this.audience = jsonConfig.getString("audience"); - this.serviceAccountImpersonationUrl = jsonConfig.getString("service_account_impersonation_url"); - this.tokenUrl = jsonConfig.optString("token_url", "https://sts.googleapis.com/v1/token"); + private final Set scopes; + + public WorkloadIdentityFederationAuth( + String awsAccessKeyId, + String awsSecretAccessKey, + String awsSessionToken, + String awsRegion, + String audience, + String serviceAccountImpersonationUrl, + String tokenUrl, + Set scopes) { + this.awsAccessKeyId = awsAccessKeyId; + this.awsSecretAccessKey = awsSecretAccessKey; + this.awsSessionToken = awsSessionToken; + this.awsRegion = awsRegion; + this.audience = audience; + this.serviceAccountImpersonationUrl = serviceAccountImpersonationUrl; + this.tokenUrl = tokenUrl != null ? tokenUrl : "https://sts.googleapis.com/v1/token"; + this.scopes = scopes; } - public WorkloadIdentityFederationCredentials getCredentials() throws IOException { - AccessToken token = getAccessToken(); - return new WorkloadIdentityFederationCredentials(this, token); + public AccessToken fetchAccessToken() throws IOException { + AccessToken federatedToken = fetchFederatedToken(); + return impersonateServiceAccount(federatedToken); } - public AccessToken getAccessToken() throws IOException { - Map awsRequest = createAwsSignedRequest(); - String federatedToken = exchangeTokenForGoogleAccessToken(awsRequest); - String accessToken = impersonateServiceAccount(federatedToken); - java.util.Date expirationTime = - new java.util.Date(System.currentTimeMillis() + TOKEN_LIFETIME_SECONDS * 1000); - return new AccessToken(accessToken, expirationTime); + private static String formatAccessToken(AccessToken token) { + String expireTime = + token.getExpirationTime() != null + ? token.getExpirationTime().toInstant().toString() + : "null"; + return "expireTime: " + expireTime; } private String getServiceAccountEmail() { @@ -76,235 +66,66 @@ private String getServiceAccountEmail() { String[] parts = serviceAccountImpersonationUrl.split("serviceAccounts/"); if (parts.length < 2) { throw new ConfigException( - "Invalid service_account_impersonation_url: " + serviceAccountImpersonationUrl); + String.format( + "Invalid service_account_impersonation_url: %s", serviceAccountImpersonationUrl)); } return parts[1].replace(":generateAccessToken", ""); } - private Map createAwsSignedRequest() { - String service = "sts"; - String host = "sts." + awsRegion + ".amazonaws.com"; - String method = "POST"; - - ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String amzDate = now.format(DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'")); - String dateStamp = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); - - String queryParams = "Action=GetCallerIdentity&Version=2011-06-15"; - String endpoint = "https://" + host + "/?" + queryParams; - - String payloadHash = sha256Hex(""); - - Map headers = new HashMap<>(); - headers.put("host", host); - headers.put("x-amz-date", amzDate); - headers.put("x-goog-cloud-target-resource", audience); - - List signedHeadersList = new ArrayList<>(headers.keySet()); - Collections.sort(signedHeadersList); - String signedHeaders = String.join(";", signedHeadersList); - - String canonicalHeaders = - signedHeadersList.stream() - .map(k -> k + ":" + headers.get(k) + "\n") - .collect(Collectors.joining()); - - String[] queryParts = queryParams.split("&"); - java.util.Arrays.sort(queryParts); - String canonicalQuerystring = String.join("&", queryParts); - - String canonicalRequest = - String.join( - "\n", method, "/", canonicalQuerystring, canonicalHeaders, signedHeaders, payloadHash); - - String algorithm = "AWS4-HMAC-SHA256"; - String credentialScope = dateStamp + "/" + awsRegion + "/" + service + "/aws4_request"; - - String stringToSign = - String.join("\n", algorithm, amzDate, credentialScope, sha256Hex(canonicalRequest)); - - byte[] signingKey = getSignatureKey(dateStamp, awsRegion, service); - String signature = hmacSha256Hex(signingKey, stringToSign); - - String authorizationHeader = - algorithm - + " Credential=" - + awsAccessKeyId - + "/" - + credentialScope - + ", SignedHeaders=" - + signedHeaders - + ", Signature=" - + signature; - - List> requestHeaders = new ArrayList<>(); - for (String key : signedHeadersList) { - Map header = new HashMap<>(); - header.put("key", key); - header.put("value", headers.get(key)); - requestHeaders.add(header); - } - Map authHeader = new HashMap<>(); - authHeader.put("key", "Authorization"); - authHeader.put("value", authorizationHeader); - requestHeaders.add(authHeader); - - Map result = new HashMap<>(); - result.put("url", endpoint); - result.put("method", method); - result.put("headers", requestHeaders); - return result; - } - - private byte[] getSignatureKey(String dateStamp, String region, String service) { - byte[] kDate = hmacSha256(("AWS4" + awsSecretAccessKey).getBytes(StandardCharsets.UTF_8), dateStamp); - byte[] kRegion = hmacSha256(kDate, region); - byte[] kService = hmacSha256(kRegion, service); - return hmacSha256(kService, "aws4_request"); + // https://docs.cloud.google.com/iam/docs/reference/sts/rest/v1/TopLevel/token + private AccessToken fetchFederatedToken() throws IOException { + logger.debug("fetching federated token using AwsCredentials"); + + // Create AWS security credentials supplier + AwsSecurityCredentialsSupplier supplier = + new AwsSecurityCredentialsSupplier() { + @Override + public AwsSecurityCredentials getCredentials(ExternalAccountSupplierContext context) + throws IOException { + return new AwsSecurityCredentials(awsAccessKeyId, awsSecretAccessKey, awsSessionToken); + } + + @Override + public String getRegion(ExternalAccountSupplierContext context) throws IOException { + return awsRegion; + } + }; + + // Build AwsCredentials using the supplier + AwsCredentials awsCredentials = + AwsCredentials.newBuilder() + .setAwsSecurityCredentialsSupplier(supplier) + .setAudience(audience) + .setTokenUrl(tokenUrl) + .setSubjectTokenType("urn:ietf:params:aws:token-type:aws4_request") + .build(); + + // Refresh to get the federated token + awsCredentials.refresh(); + AccessToken accessToken = awsCredentials.getAccessToken(); + + logger.debug("federated token obtained, {}", formatAccessToken(accessToken)); + + return accessToken; } - private String exchangeTokenForGoogleAccessToken(Map awsRequest) - throws IOException { - JSONObject awsRequestJson = new JSONObject(awsRequest); - String subjectToken = URLEncoder.encode(awsRequestJson.toString(), "UTF-8"); - - String data = - "grant_type=" - + URLEncoder.encode("urn:ietf:params:oauth:grant-type:token-exchange", "UTF-8") - + "&audience=" - + URLEncoder.encode(audience, "UTF-8") - + "&scope=" - + URLEncoder.encode("https://www.googleapis.com/auth/cloud-platform", "UTF-8") - + "&requested_token_type=" - + URLEncoder.encode("urn:ietf:params:oauth:token-type:access_token", "UTF-8") - + "&subject_token_type=" - + URLEncoder.encode("urn:ietf:params:aws:token-type:aws4_request", "UTF-8") - + "&subject_token=" - + subjectToken; - - URL url = new URL(tokenUrl); - logger.debug("Exchanging token at: {}", tokenUrl); - - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("POST"); - conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - conn.setDoOutput(true); - - try (OutputStream os = conn.getOutputStream()) { - os.write(data.getBytes(StandardCharsets.UTF_8)); - } - - int responseCode = conn.getResponseCode(); - logger.debug("Token exchange response code: {}", responseCode); - - if (responseCode != 200) { - String errorBody; - try (Scanner scanner = new Scanner(conn.getErrorStream(), "UTF-8")) { - errorBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; - } - logger.info("Token exchange failed: {} - {}", responseCode, errorBody); - throw new IOException( - "Google STS token exchange failed: " + responseCode + " - " + errorBody); - } - - String responseBody; - try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")) { - responseBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; - } - logger.info("Token exchange succeeded"); - - JSONObject responseJson = new JSONObject(responseBody); - if (responseJson.has("expires_in")) { - logger.debug("Token expires_in: {}", responseJson.get("expires_in")); - } - if (responseJson.has("token_type")) { - logger.debug("Token type: {}", responseJson.getString("token_type")); - } - if (responseJson.has("issued_token_type")) { - logger.debug("Issued token type: {}", responseJson.getString("issued_token_type")); - } - return responseJson.getString("access_token"); - } - - private String impersonateServiceAccount(String federatedToken) throws IOException { + // https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateAccessToken + private AccessToken impersonateServiceAccount(AccessToken federatedToken) throws IOException { String serviceAccountEmail = getServiceAccountEmail(); - String impersonationUrl = - "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/" - + serviceAccountEmail - + ":generateAccessToken"; - logger.debug("Impersonating service account: {}", serviceAccountEmail); - logger.debug("Impersonation URL: {}", impersonationUrl); - - URL url = new URL(impersonationUrl); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("POST"); - conn.setRequestProperty("Authorization", "Bearer " + federatedToken); - conn.setRequestProperty("Content-Type", "application/json"); - conn.setDoOutput(true); - - JSONObject requestBody = new JSONObject(); - requestBody.put("scope", new String[] {"https://www.googleapis.com/auth/bigquery"}); - requestBody.put("lifetime", TOKEN_LIFETIME_SECONDS + "s"); - - try (OutputStream os = conn.getOutputStream()) { - os.write(requestBody.toString().getBytes(StandardCharsets.UTF_8)); - } - - int responseCode = conn.getResponseCode(); - logger.debug("Impersonation response code: {}", responseCode); - - if (responseCode != 200) { - String errorBody; - try (Scanner scanner = new Scanner(conn.getErrorStream(), "UTF-8")) { - errorBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; - } - logger.info("Impersonation failed: {} - {}", responseCode, errorBody); - throw new IOException( - "Service account impersonation failed: " + responseCode + " - " + errorBody); - } - - String responseBody; - try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")) { - responseBody = scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; - } - logger.info("Service account impersonation succeeded"); - - JSONObject responseJson = new JSONObject(responseBody); - if (responseJson.has("expireTime")) { - logger.debug("Access token expire time: {}", responseJson.getString("expireTime")); - } - return responseJson.getString("accessToken"); - } - - private String sha256Hex(String data) { - try { - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - byte[] hash = digest.digest(data.getBytes(StandardCharsets.UTF_8)); - return bytesToHex(hash); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException("SHA-256 algorithm not found", e); - } - } - - private byte[] hmacSha256(byte[] key, String data) { - try { - Mac mac = Mac.getInstance("HmacSHA256"); - mac.init(new SecretKeySpec(key, "HmacSHA256")); - return mac.doFinal(data.getBytes(StandardCharsets.UTF_8)); - } catch (NoSuchAlgorithmException | InvalidKeyException e) { - throw new RuntimeException("HMAC-SHA256 error", e); - } - } - - private String hmacSha256Hex(byte[] key, String data) { - return bytesToHex(hmacSha256(key, data)); - } - - private String bytesToHex(byte[] bytes) { - StringBuilder sb = new StringBuilder(); - for (byte b : bytes) { - sb.append(String.format("%02x", b)); - } - return sb.toString(); + logger.debug("impersonating service account: {}", serviceAccountEmail); + + GoogleCredentials sourceCredentials = GoogleCredentials.create(federatedToken); + ImpersonatedCredentials impersonatedCredentials = + ImpersonatedCredentials.create( + sourceCredentials, + serviceAccountEmail, + null, + new ArrayList<>(scopes), + TOKEN_LIFETIME_SECONDS); + + impersonatedCredentials.refresh(); + AccessToken accessToken = impersonatedCredentials.getAccessToken(); + logger.debug("service account impersonation succeeded, {}", formatAccessToken(accessToken)); + return accessToken; } } diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java index 9d295a4..33b08c7 100644 --- a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java @@ -2,19 +2,130 @@ import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.GoogleCredentials; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.embulk.output.bigquery_java.config.WorkloadIdentityFederationConfig; +import org.embulk.util.config.units.LocalFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkloadIdentityFederationCredentials extends GoogleCredentials { + private static final Logger logger = + LoggerFactory.getLogger(WorkloadIdentityFederationCredentials.class); + private static final Map cache = + new ConcurrentHashMap<>(); + private final WorkloadIdentityFederationAuth auth; - public WorkloadIdentityFederationCredentials( + private static class CacheKey { + private final String awsAccessKeyId; + private final String awsRegion; + private final String audience; + private final Set scopes; + + CacheKey(String awsAccessKeyId, String awsRegion, String audience, Set scopes) { + this.awsAccessKeyId = awsAccessKeyId; + this.awsRegion = awsRegion; + this.audience = audience; + this.scopes = scopes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(awsAccessKeyId, cacheKey.awsAccessKeyId) + && Objects.equals(awsRegion, cacheKey.awsRegion) + && Objects.equals(audience, cacheKey.audience) + && Objects.equals(scopes, cacheKey.scopes); + } + + @Override + public int hashCode() { + return Objects.hash(awsAccessKeyId, awsRegion, audience, scopes); + } + + @Override + public String toString() { + return String.format( + "%s:%s:%s:%s", awsAccessKeyId, awsRegion, audience, String.join(",", scopes)); + } + } + + public static WorkloadIdentityFederationCredentials getOrCreateByFetchingToken( + WorkloadIdentityFederationConfig wifConfig, Set scopes) throws IOException { + JsonObject jsonConfig = parseConfig(wifConfig); + CacheKey cacheKey = + new CacheKey( + wifConfig.getAwsAccessKeyId(), + wifConfig.getAwsRegion(), + jsonConfig.get("audience").getAsString(), + scopes); + WorkloadIdentityFederationCredentials cached = cache.get(cacheKey); + if (cached != null) { + logger.debug("cache hit for cacheKey: {}", cacheKey); + return cached; + } + logger.debug("cache miss for cacheKey: {}", cacheKey); + WorkloadIdentityFederationCredentials credentials = + createByFetchingToken( + wifConfig.getAwsAccessKeyId(), + wifConfig.getAwsSecretAccessKey(), + wifConfig.getAwsSessionToken().orElse(null), + wifConfig.getAwsRegion(), + jsonConfig, + scopes); + cache.put(cacheKey, credentials); + return credentials; + } + + private static JsonObject parseConfig(WorkloadIdentityFederationConfig wifConfig) { + LocalFile configFile = wifConfig.getConfig(); + String json = new String(configFile.getContent(), StandardCharsets.UTF_8); + return JsonParser.parseString(json).getAsJsonObject(); + } + + private static WorkloadIdentityFederationCredentials createByFetchingToken( + String awsAccessKeyId, + String awsSecretAccessKey, + String awsSessionToken, + String awsRegion, + JsonObject jsonConfig, + Set scopes) + throws IOException { + logger.info("creating credentials by fetching token"); + String tokenUrl = + jsonConfig.has("token_url") ? jsonConfig.get("token_url").getAsString() : null; + WorkloadIdentityFederationAuth auth = + new WorkloadIdentityFederationAuth( + awsAccessKeyId, + awsSecretAccessKey, + awsSessionToken, + awsRegion, + jsonConfig.get("audience").getAsString(), + jsonConfig.get("service_account_impersonation_url").getAsString(), + tokenUrl, + scopes); + AccessToken token = auth.fetchAccessToken(); + return new WorkloadIdentityFederationCredentials(auth, token); + } + + private WorkloadIdentityFederationCredentials( WorkloadIdentityFederationAuth auth, AccessToken accessToken) { - super(accessToken); + super(GoogleCredentials.newBuilder().setAccessToken(accessToken)); this.auth = auth; } @Override public AccessToken refreshAccessToken() throws IOException { - return auth.getAccessToken(); + logger.info("refreshing access token"); + return auth.fetchAccessToken(); } } diff --git a/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java index 5d9f934..2c68521 100644 --- a/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java +++ b/src/main/java/org/embulk/output/bigquery_java/config/WorkloadIdentityFederationConfig.java @@ -1,5 +1,6 @@ package org.embulk.output.bigquery_java.config; +import java.util.Optional; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; import org.embulk.util.config.Task; @@ -15,6 +16,10 @@ public interface WorkloadIdentityFederationConfig extends Task { @Config("aws_secret_access_key") String getAwsSecretAccessKey(); + @Config("aws_session_token") + @ConfigDefault("null") + Optional getAwsSessionToken(); + @Config("aws_region") @ConfigDefault("\"ap-northeast-1\"") String getAwsRegion(); From 481716b243d4fbe6f6695e65ea33f4161f674717 Mon Sep 17 00:00:00 2001 From: chikamura Date: Tue, 23 Dec 2025 10:43:58 +0900 Subject: [PATCH 7/8] style: Format code in BigqueryClient.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../java/org/embulk/output/bigquery_java/BigqueryClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java index d91b88d..3276d78 100644 --- a/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java +++ b/src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java @@ -131,8 +131,7 @@ private String getProjectIdFromJsonKeyfile() { } String projectId = new JSONObject( - new JSONTokener( - new ByteArrayInputStream(task.getJsonKeyfile().get().getContent()))) + new JSONTokener(new ByteArrayInputStream(task.getJsonKeyfile().get().getContent()))) .getString("project_id"); if (projectId == null) { throw new ConfigException("Either project or project_id in json_keyfile is required"); From 170b10a20c9ea8403eb2d1ddbdcb2bed11803374 Mon Sep 17 00:00:00 2001 From: "kosuke.katamoto" Date: Wed, 7 Jan 2026 13:10:18 +0900 Subject: [PATCH 8/8] feat: Add direct resource access support for Workload Identity Federation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allow accessing BigQuery directly without service account impersonation by making service_account_impersonation_url optional. When omitted from the WIF config, the federated token is used directly instead of impersonating a service account. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 32 ++++++++++++++++++- .../WorkloadIdentityFederationAuth.java | 6 ++++ ...WorkloadIdentityFederationCredentials.java | 6 +++- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a141b2e..7e1c157 100644 --- a/README.md +++ b/README.md @@ -143,7 +143,11 @@ out: source_format: NEWLINE_DELIMITED_JSON ``` -The `config` should contain the Workload Identity Federation configuration from Google Cloud: +The `config` should contain the Workload Identity Federation configuration from Google Cloud. + +#### Service Account Impersonation (recommended) + +Use a service account to access BigQuery. The federated identity impersonates a service account that has the necessary permissions. ```json { @@ -162,6 +166,32 @@ The `config` should contain the Workload Identity Federation configuration from } ``` +#### Direct Resource Access + +Access BigQuery directly without service account impersonation. Omit `service_account_impersonation_url` from the config file. You must grant IAM permissions directly to the federated principal. + +```json +{ + "universe_domain": "googleapis.com", + "type": "external_account", + "audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request", + "token_url": "https://sts.googleapis.com/v1/token", + "credential_source": { + "environment_id": "aws1", + "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone", + "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials", + "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15" + } +} +``` + +To use direct resource access, grant BigQuery permissions to the federated principal: + +``` +principal://iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/subject/SUBJECT +``` + ### Column Options (NOT fully supported) Column options are used to aid guessing BigQuery schema, or to define conversion of values: diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java index 1f1705c..6b73590 100644 --- a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationAuth.java @@ -49,6 +49,12 @@ public WorkloadIdentityFederationAuth( public AccessToken fetchAccessToken() throws IOException { AccessToken federatedToken = fetchFederatedToken(); + + // Direct resource access (no service account impersonation) + if (serviceAccountImpersonationUrl == null || serviceAccountImpersonationUrl.isEmpty()) { + return federatedToken; + } + return impersonateServiceAccount(federatedToken); } diff --git a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java index 33b08c7..16a5a64 100644 --- a/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java +++ b/src/main/java/org/embulk/output/bigquery_java/WorkloadIdentityFederationCredentials.java @@ -103,6 +103,10 @@ private static WorkloadIdentityFederationCredentials createByFetchingToken( logger.info("creating credentials by fetching token"); String tokenUrl = jsonConfig.has("token_url") ? jsonConfig.get("token_url").getAsString() : null; + String serviceAccountImpersonationUrl = + jsonConfig.has("service_account_impersonation_url") + ? jsonConfig.get("service_account_impersonation_url").getAsString() + : null; WorkloadIdentityFederationAuth auth = new WorkloadIdentityFederationAuth( awsAccessKeyId, @@ -110,7 +114,7 @@ private static WorkloadIdentityFederationCredentials createByFetchingToken( awsSessionToken, awsRegion, jsonConfig.get("audience").getAsString(), - jsonConfig.get("service_account_impersonation_url").getAsString(), + serviceAccountImpersonationUrl, tokenUrl, scopes); AccessToken token = auth.fetchAccessToken();