diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BarrierRequestTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BarrierRequestTests.java
new file mode 100644
index 000000000000..23e20b2f24e4
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BarrierRequestTests.java
@@ -0,0 +1,419 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos;
+
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
+import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
+import com.azure.cosmos.implementation.directconnectivity.WFConstants;
+import com.azure.cosmos.models.CosmosItemResponse;
+import com.azure.cosmos.models.ModelBridgeInternal;
+import com.azure.cosmos.rx.TestSuiteBase;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ConnectTimeoutException;
+import org.testng.SkipException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * E2E testing to verify the handling of barrier requests.
+ *
+ * <p>Each scenario issues a single document create with STRONG consistency while test-only
+ * interceptors rewrite the LSN / GCLSN headers on store responses and substitute the account
+ * topology returned for account reads; the diagnostics of the create are then logged.
+ */
+public class BarrierRequestTests extends TestSuiteBase {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /** Captures the account name from a {@code http(s)://<account>.documents.azure.com} host. */
+    private static final Pattern ACCOUNT_HOST_PATTERN = Pattern.compile(
+        "^https?://([^.]+)\\.documents\\.azure\\.com(?::\\d+)?/?", Pattern.CASE_INSENSITIVE);
+
+    private String primaryRegion;
+    private String secondaryRegion;
+    private String primaryRegionalEndpointAsStr;
+    private String secondaryRegionalEndpointAsStr;
+    private AccountLevelLocationContext accountLevelLocationReadableLocationContext;
+
+    @Factory(dataProvider = "clientBuildersWithDirectTcpSession")
+    public BarrierRequestTests(CosmosClientBuilder clientBuilder) {
+        super(clientBuilder);
+    }
+
+    /**
+     * Records the account's first two readable regions, in service order, as the primary and
+     * secondary region used by the tests. The account must have exactly two readable regions.
+     */
+    @BeforeClass(groups = {"multi-region"})
+    public void beforeClass() {
+        CosmosAsyncClient cosmosAsyncClient = getClientBuilder().buildAsyncClient();
+
+        try {
+            RxDocumentClientImpl rxDocumentClient =
+                (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(cosmosAsyncClient);
+            GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient);
+            DatabaseAccount databaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount();
+
+            AccountLevelLocationContext readableContext = getAccountLevelLocationContext(databaseAccountSnapshot, false);
+            this.accountLevelLocationReadableLocationContext = readableContext;
+
+            assertThat(readableContext).isNotNull();
+            assertThat(readableContext.serviceOrderedReadableRegions).isNotNull();
+            assertThat(readableContext.serviceOrderedReadableRegions.size()).isEqualTo(2);
+
+            this.primaryRegion = readableContext.serviceOrderedReadableRegions.get(0);
+            this.secondaryRegion = readableContext.serviceOrderedReadableRegions.get(1);
+            this.primaryRegionalEndpointAsStr = readableContext.regionNameToEndpoint.get(this.primaryRegion);
+            this.secondaryRegionalEndpointAsStr = readableContext.regionNameToEndpoint.get(this.secondaryRegion);
+        } finally {
+            cosmosAsyncClient.close();
+        }
+    }
+
+    /**
+     * Creates an item while creates served by either region report a GCLSN behind their LSN;
+     * address refreshes are left untouched.
+     */
+    @Test(groups = {"multi-region"})
+    public void assertHandleBarriersForStrongConsistencyNoCrossRegionRetry() {
+        runCreateWithInterceptedBarriers(false, true);
+    }
+
+    /**
+     * Creates an item while only creates served by the primary region report a GCLSN behind their
+     * LSN; after the first such create, address refreshes routed to the primary region fail with
+     * 503 / GATEWAY_ENDPOINT_UNAVAILABLE.
+     */
+    @Test(groups = {"multi-region"})
+    public void assertHandleBarriersForStrongConsistencyWithCrossRegionRetry() {
+        runCreateWithInterceptedBarriers(true, false);
+    }
+
+    /**
+     * Shared scenario body: builds a STRONG-consistency client with the HTTP-request and
+     * store-response interceptors installed, creates one item and logs its diagnostics.
+     *
+     * @param failPrimaryAddressRefresh whether address refreshes routed to the primary region fail
+     *        once a lagging create has been intercepted
+     * @param lagSecondaryRegionCreates whether creates served by the secondary region also report
+     *        a GCLSN behind their LSN (creates served by the primary region always do)
+     */
+    private void runCreateWithInterceptedBarriers(
+        boolean failPrimaryAddressRefresh,
+        boolean lagSecondaryRegionCreates) {
+
+        AtomicBoolean simulateAddressRefreshFailures = new AtomicBoolean(false);
+        AtomicBoolean failoverTriggered = new AtomicBoolean(false);
+        AtomicReference<GlobalEndpointManager> globalEndpointManager = new AtomicReference<>(null);
+
+        CosmosClientBuilder clientBuilder = getClientBuilder()
+            .consistencyLevel(ConsistencyLevel.STRONG);
+
+        clientBuilder.httpRequestInterceptor(request -> interceptHttpRequest(
+            request, failPrimaryAddressRefresh, simulateAddressRefreshFailures, failoverTriggered));
+
+        clientBuilder.storeResponseInterceptor((request, storeResponse) -> interceptStoreResponse(
+            request,
+            storeResponse,
+            lagSecondaryRegionCreates,
+            simulateAddressRefreshFailures,
+            failoverTriggered,
+            globalEndpointManager));
+
+        CosmosAsyncClient client = clientBuilder.buildAsyncClient();
+
+        // The gateway-mode check sits inside the try so the client is closed on the skip path too.
+        try {
+            if (BridgeInternal
+                .getContextClient(client)
+                .getConnectionPolicy()
+                .getConnectionMode() == ConnectionMode.GATEWAY) {
+                throw new SkipException("Barrier requests cannot be intercepted in Gateway Mode");
+            }
+
+            CosmosAsyncContainer container = getSharedSinglePartitionCosmosContainer(client);
+
+            globalEndpointManager.set(BridgeInternal.getContextClient(client).getGlobalEndpointManager());
+
+            createItemAndLogDiagnostics(container);
+        } finally {
+            client.close();
+        }
+    }
+
+    /** Issues a single create and logs the diagnostics of either outcome. */
+    private void createItemAndLogDiagnostics(CosmosAsyncContainer container) {
+        try {
+            CosmosItemResponse<?> response =
+                container.createItem(CosmosDiagnosticsTest.TestItem.createNewItem()).block();
+            logger.info("Item created");
+            validateDiagnosticsIsPresent(response);
+
+            CosmosDiagnosticsContext diagnosticsContext = response.getDiagnostics().getDiagnosticsContext();
+            logger.info("Diagnostics on successful Create : {}", diagnosticsContext);
+        } catch (CosmosException ex) {
+            // NOTE(review): a failed create is only logged, so the scenario still passes -
+            // confirm whether it should fail the test instead.
+            CosmosDiagnostics diagnostics = ex.getDiagnostics();
+            CosmosDiagnosticsContext diagnosticsContext =
+                diagnostics != null ? diagnostics.getDiagnosticsContext() : null;
+            logger.error(
+                "Diagnostics on unsuccessful Create : {}",
+                diagnosticsContext != null ? diagnosticsContext.toJson() : null);
+        }
+    }
+
+    /**
+     * HTTP-request interceptor. Optionally fails address refreshes routed to the primary region
+     * and, once a failover has been flagged, answers account reads with the failed-over topology.
+     *
+     * @return the substituted response, or {@code null} to let the request proceed normally
+     */
+    private RxDocumentServiceResponse interceptHttpRequest(
+        RxDocumentServiceRequest request,
+        boolean failPrimaryAddressRefresh,
+        AtomicBoolean simulateAddressRefreshFailures,
+        AtomicBoolean failoverTriggered) {
+
+        logger.info(
+            "inside httpRequestInterceptor, simulateAddressRefreshFailures: {}, operationType: {}, resourceType: {}",
+            simulateAddressRefreshFailures.get(), request.getOperationType(), request.getResourceType());
+
+        // After the initial write, simulate a network failure on address resolution.
+        // This will trigger the SDK's failover logic.
+        if (failPrimaryAddressRefresh
+            && simulateAddressRefreshFailures.get()
+            && request.isAddressRefresh()
+            && request.requestContext.regionalRoutingContextToRoute.getRegion().equalsIgnoreCase(this.primaryRegion)) {
+
+            logger.info("request operationType: {}", request.getOperationType());
+            logger.info("request resourceType: {}", request.getResourceType());
+            logger.info("Simulating network failure for address resolution for region {}", this.primaryRegion);
+            logger.info("failoverTriggered: {}", failoverTriggered.get());
+            failoverTriggered.compareAndSet(false, true);
+            logger.info("failoverTriggered: {}", failoverTriggered.get());
+
+            Map<String, String> headers = new HashMap<>();
+            headers.put(HttpConstants.HttpHeaders.SUB_STATUS, Integer.toString(GATEWAY_ENDPOINT_UNAVAILABLE));
+            throw new CosmosException(
+                HttpConstants.StatusCodes.SERVICE_UNAVAILABLE,
+                "Simulating network failure for address resolution for region",
+                headers,
+                new ConnectTimeoutException());
+        }
+
+        // Once the failover is flagged, answer account reads with the modified account
+        // properties, making the SDK believe a failover has occurred.
+        logger.info("Checking failoverTriggered to intercept metadata refresh call: {}", failoverTriggered.get());
+        if (failoverTriggered.get()
+            && request.getResourceType() == ResourceType.DatabaseAccount
+            && request.getOperationType() == OperationType.Read) {
+
+            logger.info(
+                "Intercepting metadata call and returning modified account properties to simulate failover. New write region: {}",
+                this.secondaryRegion);
+
+            ByteBuf byteBuf = Utils.getUTF8BytesOrNull(getDatabaseAccountJsonAfterFailover());
+            StoreResponse storeResponse = new StoreResponse(
+                TestConfigurations.HOST,
+                200,
+                request.getHeaders(),
+                new ByteBufInputStream(byteBuf),
+                byteBuf.readableBytes());
+
+            return new RxDocumentServiceResponse(null, storeResponse);
+        }
+
+        return null; // let other requests proceed normally
+    }
+
+    /**
+     * Store-response interceptor. Makes document creates report a GCLSN behind their LSN and
+     * rewrites the GCLSN of barrier responses (Head on a collection) so that the barrier
+     * condition is only met in the secondary region.
+     *
+     * @return the same {@code storeResponse} instance, with its headers possibly rewritten
+     */
+    private StoreResponse interceptStoreResponse(
+        RxDocumentServiceRequest request,
+        StoreResponse storeResponse,
+        boolean lagSecondaryRegionCreates,
+        AtomicBoolean simulateAddressRefreshFailures,
+        AtomicBoolean failoverTriggered,
+        AtomicReference<GlobalEndpointManager> globalEndpointManager) {
+
+        String region = request.requestContext.regionalRoutingContextToRoute.getRegion();
+        logger.info("inside storeResponseInterceptor, operationType: {}, resourceType: {}, region: {}",
+            request.getOperationType(), request.getResourceType(), region);
+
+        boolean isDocumentCreate = request.getOperationType() == OperationType.Create
+            && request.getResourceType() == ResourceType.Document;
+
+        if (isDocumentCreate
+            && (region.equalsIgnoreCase(this.primaryRegion)
+                || (lagSecondaryRegionCreates && region.equalsIgnoreCase(this.secondaryRegion)))) {
+
+            String lsn = storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN);
+
+            // Decrement so that GCLSN < LSN to simulate the replication lag
+            String manipulatedGclsn = String.valueOf(Long.parseLong(lsn) - 2L);
+
+            storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, manipulatedGclsn);
+
+            // Enable address refresh failures for subsequent barrier requests in the primary region.
+            simulateAddressRefreshFailures.compareAndSet(false, true);
+            logger.info("inside storeResponseInterceptor, set simulateAddressRefreshFailures to {}", simulateAddressRefreshFailures.get());
+        }
+
+        // Track barrier requests (Head operations on a collection)
+        if (request.getOperationType() == OperationType.Head
+            && request.getResourceType() == ResourceType.DocumentCollection) {
+
+            logger.info("Barrier request intercepted in storeResponseInterceptor for region: {}", region);
+            logger.info("Setting failoverTriggered to true");
+            failoverTriggered.compareAndSet(false, true);
+
+            // The AtomicReference itself is never null; what may be missing is the manager it holds.
+            GlobalEndpointManager endpointManager = globalEndpointManager.get();
+            if (endpointManager != null) {
+                logger.info("Trigerring metadata refresh");
+                endpointManager.refreshLocationAsync(null, true).block();
+            } else {
+                logger.info("globalEndpointManager is null, cannot trigger metadata refresh");
+            }
+
+            logger.info("Barrier request detected for region: {}", region);
+            if (region.equalsIgnoreCase(this.secondaryRegion)) {
+                // Satisfy the barrier condition by setting GCLSN >= LSN
+                storeResponse.setHeaderValue(
+                    WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(storeResponse.getLSN()));
+            } else {
+                // For any other region (initially the primary), keep the barrier condition unmet.
+                long lsn = storeResponse.getLSN() - 2;
+                storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(lsn));
+            }
+        }
+
+        return storeResponse;
+    }
+
+    /** Asserts that the create returned 201 and carries diagnostics. */
+    private void validateDiagnosticsIsPresent(CosmosItemResponse<?> response) {
+        assertThat(response).isNotNull();
+        assertThat(response.getStatusCode()).isNotNull();
+        assertThat(response.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED);
+        assertThat(response.getDiagnostics()).isNotNull();
+    }
+
+    /**
+     * Builds the account-properties JSON that is served instead of a real account read.
+     *
+     * <p>To simulate a failover from the backend the topology is modified as follows:
+     * {@code writableLocations} holds only the secondary region, and {@code readableLocations}
+     * lists the secondary region ahead of the primary region.
+     *
+     * @throws IllegalArgumentException if {@code TestConfigurations.HOST} is not a
+     *         {@code documents.azure.com} account endpoint
+     */
+    private String getDatabaseAccountJsonAfterFailover() {
+        Matcher matcher = ACCOUNT_HOST_PATTERN.matcher(TestConfigurations.HOST);
+        if (!matcher.find() || matcher.groupCount() != 1) {
+            throw new IllegalArgumentException("Invalid host: " + TestConfigurations.HOST);
+        }
+
+        String globalDatabaseAccountName = matcher.group(1);
+        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
+        String secondaryRegionKey = this.secondaryRegion.toLowerCase(Locale.ROOT).replaceAll("\\s", "");
+
+        return "{\"_self\":\"\",\"id\":\"" + globalDatabaseAccountName + "\",\"_rid\":\"" + globalDatabaseAccountName + ".documents.azure.com\","
+            + "\"media\":\"//media/\",\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"" + secondaryRegionKey + "\",\""
+            + "databaseAccountEndpoint\":\"https://" + globalDatabaseAccountName + "-" + secondaryRegionKey
+            + ".documents.azure.com:443/\"}],\"readableLocations\":[{\"name\""
+            + ":\"" + this.secondaryRegion + "\",\"databaseAccountEndpoint\":\"" + this.secondaryRegionalEndpointAsStr + "\"},{\"name\""
+            + ":\"" + this.primaryRegion + "\",\"databaseAccountEndpoint\":\"" + this.primaryRegionalEndpointAsStr + "\"}],"
+            + "\"enableMultipleWriteLocations\":false,\"continuousBackupEnabled\":false,\"enableNRegionSynchronousCommit\":false,"
+            + "\"enablePerPartitionFailoverBehavior\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,"
+            + "\"maxReplicasetSize\":4},\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Strong\"},\"systemReplicationPolicy\":"
+            + "{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4},\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},"
+            + "\"queryEngineConfiguration\":\"{\\\"allowNewKeywords\\\":true,\\\"maxJoinsPerSqlQuery\\\":10,\\\"maxQueryRequestTimeoutFraction\\\""
+            + ":0.9,\\\"maxSqlQueryInputLength\\\":524288,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"queryMaxInMemorySortDocumentCount\\\":-1000,\\\""
+            + "spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlDisableOptimizationFlags\\\":0,\\\""
+            + "sqlQueryILDisableOptimizationFlags\\\":0,\\\"clientDisableOptimisticDirectExecution\\\":false,\\\"queryEnableFullText\\\":true,\\\""
+            + "queryEnableFullTextPreviewFeatures\\\":false,\\\"queryMaxFullTextScoreSearchTerms\\\":5,\\\"queryMaxRrfArgumentCount\\\":100,\\\""
+            + "enableSpatialIndexing\\\":true,\\\"maxInExpressionItemsCount\\\":2147483647,\\\"maxLogicalAndPerSqlQuery\\\":2147483647,\\\""
+            + "maxLogicalOrPerSqlQuery\\\":2147483647,\\\"maxSpatialQueryCells\\\":2147483647,\\\"sqlAllowAggregateFunctions\\\":true,\\\""
+            + "sqlAllowGroupByClause\\\":true,\\\"sqlAllowLike\\\":true,\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\""
+            + "sqlAllowTop\\\":true}\"}";
+    }
+
+    /**
+     * Collects region names, in service order, and their endpoints from an account snapshot.
+     *
+     * @param databaseAccount the account snapshot to read locations from
+     * @param writeOnly {@code true} to read the writable locations, {@code false} for the readable ones
+     */
+    private AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) {
+        Iterator<DatabaseAccountLocation> locationIterator =
+            writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator();
+
+        List<String> serviceOrderedReadableRegions = new ArrayList<>();
+        List<String> serviceOrderedWriteableRegions = new ArrayList<>();
+        Map<String, String> regionMap = new ConcurrentHashMap<>();
+
+        while (locationIterator.hasNext()) {
+            DatabaseAccountLocation accountLocation = locationIterator.next();
+            regionMap.put(accountLocation.getName(), accountLocation.getEndpoint());
+
+            if (writeOnly) {
+                serviceOrderedWriteableRegions.add(accountLocation.getName());
+            } else {
+                serviceOrderedReadableRegions.add(accountLocation.getName());
+            }
+        }
+
+        return new AccountLevelLocationContext(
+            serviceOrderedReadableRegions,
+            serviceOrderedWriteableRegions,
+            regionMap);
+    }
+
+    /** Region names, in service order, plus the endpoint registered for each region name. */
+    private static class AccountLevelLocationContext {
+        private final List<String> serviceOrderedReadableRegions;
+        @SuppressWarnings("unused")
+        private final List<String> serviceOrderedWriteableRegions;
+        private final Map<String, String> regionNameToEndpoint;
+
+        public AccountLevelLocationContext(
+            List<String> serviceOrderedReadableRegions,
+            List<String> serviceOrderedWriteableRegions,
+            Map<String, String> regionNameToEndpoint) {
+
+            this.serviceOrderedReadableRegions = serviceOrderedReadableRegions;
+            this.serviceOrderedWriteableRegions = serviceOrderedWriteableRegions;
+            this.regionNameToEndpoint = regionNameToEndpoint;
+        }
+    }
+}
diff --git
a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientUnderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientUnderTest.java index a9f5cb35549c..18a29267e19b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientUnderTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientUnderTest.java @@ -19,6 +19,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; import static org.mockito.Mockito.doAnswer; @@ -64,7 +66,7 @@ public RxDocumentClientUnderTest(URI serviceEndpoint, null, false ); - init(null, null); + init(null, null, null, null); } RxGatewayStoreModel createRxGatewayProxy( @@ -93,6 +95,7 @@ RxGatewayStoreModel createRxGatewayProxy( userAgentContainer, globalEndpointManager, spyHttpClient, - apiType); + apiType, + null); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java index 5a54c807b6eb..e018532b3b45 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java @@ -99,7 +99,8 @@ public void readTimeout() throws Exception { userAgentContainer, globalEndpointManager, httpClient, - null); + null, + null); storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader); RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(clientContext, @@ -143,6 +144,7 @@ public void serviceUnavailable() throws Exception { userAgentContainer, globalEndpointManager, httpClient, + null, null); 
storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader); @@ -202,7 +204,8 @@ public void applySessionToken( new UserAgentContainer(), globalEndpointManager, httpClient, - apiType); + apiType, + null); storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader); RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName( @@ -274,7 +277,8 @@ public void validateApiType() throws Exception { new UserAgentContainer(), globalEndpointManager, httpClient, - apiType); + apiType, + null); RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName( clientContext, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java index b06d6f89b8e9..52b969c6f066 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SpyClientUnderTestFactory.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import static org.mockito.Mockito.doAnswer; @@ -110,7 +112,7 @@ public static class ClientWithGatewaySpy extends SpyBaseClass httpRequestInterceptor) { this.origRxGatewayStoreModel = super.createRxGatewayProxy( sessionContainer, consistencyLevel, @@ -134,7 +137,8 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer, userAgentContainer, globalEndpointManager, rxClient, - apiType); + apiType, + null); this.requests = Collections.synchronizedList(new ArrayList<>()); this.spyRxGatewayStoreModel = Mockito.spy(this.origRxGatewayStoreModel); this.initRequestCapture(); @@ -201,7 +205,7 @@ public static class 
ClientUnderTest extends SpyBaseClass { credential, contentResponseOnWriteEnabled, clientTelemetryConfig); - init(null, this::initHttpRequestCapture); + init(null, this::initHttpRequestCapture, null, null); } private Mono captureHttpRequest(InvocationOnMock invocationOnMock) { @@ -287,7 +291,7 @@ public static class DirectHttpsClientUnderTest extends SpyBaseClass contentResponseOnWriteEnabled, clientTelemetryConfig); assert connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT; - init(null, null); + init(null, null, null, null); this.origHttpClient = ReflectionUtils.getDirectHttpsHttpClient(this); this.spyHttpClient = spy(this.origHttpClient); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java index 331be53cc7af..5879e7d3e61c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java @@ -110,7 +110,7 @@ public void resolveAsync() throws Exception { GlobalAddressResolver globalAddressResolver = new GlobalAddressResolver(mockDiagnosticsClientContext(), httpClient, endpointManager, Protocol.HTTPS, authorizationTokenProvider, collectionCache, routingMapProvider, userAgentContainer, - serviceConfigReader, connectionPolicy, null); + serviceConfigReader, connectionPolicy, null, null); RxDocumentServiceRequest request; request = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(), OperationType.Read, @@ -145,6 +145,7 @@ public void submitOpenConnectionTasksAndInitCaches() { userAgentContainer, serviceConfigReader, connectionPolicy, + null, null); GlobalAddressResolver.EndpointCache endpointCache = new 
GlobalAddressResolver.EndpointCache(); GatewayAddressCache gatewayAddressCache = Mockito.mock(GatewayAddressCache.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java index 680b98a6a915..6453cfab123f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java @@ -19,6 +19,8 @@ import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.RequestOptions; import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.RxDocumentServiceResponse; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.WriteRetryPolicy; import com.azure.cosmos.implementation.clienttelemetry.ClientMetricsDiagnosticsHandler; @@ -27,6 +29,7 @@ import com.azure.cosmos.implementation.clienttelemetry.CosmosMeterOptions; import com.azure.cosmos.implementation.clienttelemetry.MetricCategory; import com.azure.cosmos.implementation.clienttelemetry.TagName; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; import com.azure.cosmos.implementation.throughputControl.sdk.config.SDKThroughputControlGroupInternal; @@ -60,6 +63,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -115,6 +119,8 @@ public final class CosmosAsyncClient implements Closeable { private final List requestPolicies; private final CosmosItemSerializer defaultCustomSerializer; private final java.util.function.Function 
containerFactory; + private final BiFunction storeResponseInterceptor; + private Function httpRequestInterceptor; CosmosAsyncClient(CosmosClientBuilder builder) { // Async Cosmos client wrapper @@ -134,6 +140,9 @@ public final class CosmosAsyncClient implements Closeable { this.nonIdempotentWriteRetryPolicy = builder.getNonIdempotentWriteRetryPolicy(); this.requestPolicies = builder.getOperationPolicies(); this.defaultCustomSerializer = builder.getCustomItemSerializer(); + this.storeResponseInterceptor = builder.getStoreResponseInterceptor(); + this.httpRequestInterceptor = builder.getHttpRequestInterceptor(); + if (builder.containerCreationInterceptor() != null) { this.containerFactory = builder.containerCreationInterceptor(); } else { @@ -185,6 +194,8 @@ public final class CosmosAsyncClient implements Closeable { .withDefaultSerializer(this.defaultCustomSerializer) .withRegionScopedSessionCapturingEnabled(builder.isRegionScopedSessionCapturingEnabled()) .withPerPartitionAutomaticFailoverEnabled(builder.isPerPartitionAutomaticFailoverEnabled()) + .withStoreResponseInterceptor(this.storeResponseInterceptor) + .withHttpRequestInterceptor(this.httpRequestInterceptor) .build(); this.accountConsistencyLevel = this.asyncDocumentClient.getDefaultConsistencyLevelOfAccount(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java index 12d022e69ee7..012f3f6bb6e3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java @@ -13,13 +13,16 @@ import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; import com.azure.cosmos.implementation.DiagnosticsProvider; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import 
com.azure.cosmos.implementation.RxDocumentServiceResponse; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.WriteRetryPolicy; import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.apachecommons.lang.time.StopWatch; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.guava25.base.Preconditions; -import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PartitionLevelCircuitBreakerConfig; +import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.routing.LocationHelper; import com.azure.cosmos.models.CosmosAuthorizationTokenResolver; import com.azure.cosmos.models.CosmosClientTelemetryConfig; @@ -39,6 +42,7 @@ import java.util.Locale; import java.util.Objects; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -155,6 +159,8 @@ public class CosmosClientBuilder implements private boolean serverCertValidationDisabled = false; private Function containerFactory = null; + private BiFunction storeResponseInterceptor = null; + private Function httpRequestInterceptor = null; /** * Instantiates a new Cosmos client builder. 
@@ -170,6 +176,25 @@ public CosmosClientBuilder() { this.requestPolicies = new LinkedList<>(); } + CosmosClientBuilder httpRequestInterceptor(Function httpRequestInterceptor) { + this.httpRequestInterceptor = httpRequestInterceptor; + return this; + } + + Function getHttpRequestInterceptor() { + return this.httpRequestInterceptor; + } + + CosmosClientBuilder storeResponseInterceptor( + BiFunction storeResponseInterceptor) { + this.storeResponseInterceptor = storeResponseInterceptor; + return this; + } + + BiFunction getStoreResponseInterceptor() { + return this.storeResponseInterceptor; + } + CosmosClientBuilder metadataCaches(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot) { this.state = metadataCachesSnapshot; return this; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java index 5d4731cef122..68016c0132af 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java @@ -12,6 +12,7 @@ import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.batch.BatchExecUtils; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics; @@ -92,6 +93,8 @@ public class CosmosException extends AzureException { */ private RntbdChannelStatistics rntbdChannelStatistics; + private StoreResponse interceptedStoreResponse; + /** * LSN */ @@ -615,6 +618,14 @@ Map> getReplicaStatusList() { return this.replicaStatusList; } + public void setInterceptedStoreResponse(StoreResponse storeResponse) { + 
this.interceptedStoreResponse = storeResponse; + } + + public StoreResponse getInterceptedStoreResponse() { + return this.interceptedStoreResponse; + } + /////////////////////////////////////////////////////////////////////////////////////////// // the following helper/accessor only helps to access this class outside of this package.// /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java index f19ccb503027..d852e6324bbf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java @@ -15,6 +15,8 @@ import com.azure.cosmos.implementation.batch.ServerBatchRequest; import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; @@ -42,6 +44,8 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; /** * Provides a client-side logical representation of the Azure Cosmos DB @@ -115,6 +119,8 @@ class Builder { private boolean isRegionScopedSessionCapturingEnabled; private boolean isPerPartitionAutomaticFailoverEnabled; private List operationPolicies; + 
private BiFunction storeResponseInterceptor; + private Function httpRequestInterceptor; public Builder withServiceEndpoint(String serviceEndpoint) { try { @@ -287,6 +293,16 @@ public Builder withPerPartitionAutomaticFailoverEnabled(boolean isPerPartitionAu return this; } + public Builder withStoreResponseInterceptor(BiFunction storeResponseInterceptor) { + this.storeResponseInterceptor = storeResponseInterceptor; + return this; + } + + public Builder withHttpRequestInterceptor(Function httpRequestInterceptor) { + this.httpRequestInterceptor = httpRequestInterceptor; + return this; + } + private void ifThrowIllegalArgException(boolean value, String error) { if (value) { throw new IllegalArgumentException(error); @@ -329,7 +345,7 @@ public AsyncDocumentClient build() { operationPolicies, isPerPartitionAutomaticFailoverEnabled); - client.init(state, null); + client.init(state, null, httpRequestInterceptor, storeResponseInterceptor); return client; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java index 4a9c5b9772d1..2a2c845994cc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.implementation.directconnectivity.AddressSelector; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -16,7 +17,6 @@ * This is meant to be internally used only by our sdk. */ public class BackoffRetryUtility { - // transforms a retryFunc to a function which can be used by Observable.retryWhen(.) // also it invokes preRetryCallback prior to doing retry. 
public static final Quadruple InitialArgumentValuePolicyArg = Quadruple.with(false, false, @@ -30,7 +30,6 @@ public class BackoffRetryUtility { // a helper method for invoking callback method given the retry policy static public Mono executeRetry(Callable> callbackMethod, IRetryPolicy retryPolicy) { - return Mono.defer(() -> { try { return callbackMethod.call(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index ca9731f86e50..94c387d8c037 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -33,7 +33,7 @@ public class ClientRetryPolicy extends DocumentClientRetryPolicy { private final static Logger logger = LoggerFactory.getLogger(ClientRetryPolicy.class); final static int RetryIntervalInMS = 1000; //Once we detect failover wait for 1 second before retrying request. - final static int MaxRetryCount = 120; + final static int MaxRetryCount = 121; // TODO: Remember to set this back after done testing private final static int MaxServiceUnavailableRetryCount = 1; private final DocumentClientRetryPolicy throttlingRetry; @@ -275,8 +275,10 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ } private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest, boolean forceRefresh, boolean usePreferredLocations) { + logger.info("in shouldRetryOnEndpointFailureAsync() Retry count = {}", this.failoverRetryCount); + if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { - logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); + logger.info("ShouldRetryOnEndpointFailureAsync() Not retrying. 
Retry count = {}", this.failoverRetryCount); return Mono.just(ShouldRetryResult.noRetry()); } @@ -321,7 +323,7 @@ private Mono shouldRetryOnGatewayTimeout(CosmosException clie //if operation is data plane read, metadata read, or query plan it can be retried on a different endpoint. if (canPerformCrossRegionRetryOnGatewayReadTimeout) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { - logger.warn("shouldRetryOnHttpTimeout() Not retrying. Retry count = {}", this.failoverRetryCount); + logger.info("shouldRetryOnHttpTimeout() Not retrying. Retry count = {}", this.failoverRetryCount); return Mono.just(ShouldRetryResult.noRetry()); } @@ -341,7 +343,7 @@ private Mono shouldRetryOnGatewayTimeout(CosmosException clie private Mono shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, boolean usePreferredLocations) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { - logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); + logger.info("ShouldRetryOnEndpointFailureAsync() Not retrying. 
Retry count = {}", this.failoverRetryCount); return Mono.just(ShouldRetryResult.noRetry()); } Mono refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh, usePreferredLocations); @@ -362,14 +364,15 @@ private Mono refreshLocation(boolean isReadRequest, boolean forceRefresh, URI gatewayRegionalEndpoint = this.regionalRoutingContext.getGatewayRegionalEndpoint(); if (isReadRequest) { - logger.warn("marking the endpoint {} as unavailable for read", gatewayRegionalEndpoint); + logger.info("marking the endpoint {} as unavailable for read", gatewayRegionalEndpoint); this.globalEndpointManager.markEndpointUnavailableForRead(gatewayRegionalEndpoint); } else { - logger.warn("marking the endpoint {} as unavailable for write", gatewayRegionalEndpoint); + logger.info("marking the endpoint {} as unavailable for write", gatewayRegionalEndpoint); this.globalEndpointManager.markEndpointUnavailableForWrite(gatewayRegionalEndpoint); } this.retryContext = new RetryContext(this.failoverRetryCount, usePreferredLocations); + logger.info("entering refreshLocationAsync"); return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); } @@ -414,7 +417,7 @@ private Mono shouldRetryOnBackendServiceUnavailableAsync( nonIdempotentWriteRetriesEnabled, isWebExceptionRetriable, cosmosException)) { - logger.warn( + logger.info( "shouldRetryOnBackendServiceUnavailableAsync() Not retrying" + " on write with non retriable exception and non server returned service unavailable. Retry count = {}", this.serviceUnavailableRetryCount); @@ -422,7 +425,7 @@ private Mono shouldRetryOnBackendServiceUnavailableAsync( } if (this.serviceUnavailableRetryCount++ > MaxServiceUnavailableRetryCount) { - logger.warn("shouldRetryOnBackendServiceUnavailableAsync() Not retrying. Retry count = {}", this.serviceUnavailableRetryCount); + logger.info("shouldRetryOnBackendServiceUnavailableAsync() Not retrying. 
Retry count = {}", this.serviceUnavailableRetryCount); return Mono.just(ShouldRetryResult.noRetry()); } @@ -436,7 +439,7 @@ private Mono shouldRetryOnBackendServiceUnavailableAsync( int availablePreferredLocations = this.globalEndpointManager.getPreferredLocationCount(); if (availablePreferredLocations <= 1) { - logger.warn("shouldRetryOnServiceUnavailable() Not retrying. No other regions available for the request. AvailablePreferredLocations = {}", availablePreferredLocations); + logger.info("shouldRetryOnServiceUnavailable() Not retrying. No other regions available for the request. AvailablePreferredLocations = {}", availablePreferredLocations); return Mono.just(ShouldRetryResult.noRetry()); } @@ -514,9 +517,11 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { // Important: this is to make the fault injection context will not be lost between each retries this.request.faultInjectionRequestContext = this.faultInjectionRequestContext; + logger.info("inside onBeforeSendRequest"); // Resolve the endpoint for the request and pin the resolution to the resolved endpoint // This enables marking the endpoint unavailability on endpoint failover/unreachability this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request); + logger.info("regional routing context resolved to {} with region {}", this.regionalRoutingContext.getGatewayRegionalEndpoint(), this.regionalRoutingContext.getRegion()); if (request.requestContext != null) { request.requestContext.routeToLocation(this.regionalRoutingContext); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 18da5250c458..f04e00669063 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -38,6 +38,7 @@ public class DocumentServiceRequestContext implements Cloneable { public volatile ISessionToken sessionToken; public volatile long quorumSelectedLSN; public volatile long globalCommittedSelectedLSN; + public volatile String globalStrongWriteRegion; public volatile StoreResponse globalStrongWriteResponse; public volatile ConsistencyLevel originalRequestConsistencyLevel; public volatile ReadConsistencyStrategy readConsistencyStrategy; @@ -148,6 +149,7 @@ public DocumentServiceRequestContext clone() { context.sessionToken = this.sessionToken; context.quorumSelectedLSN = this.quorumSelectedLSN; context.globalCommittedSelectedLSN = this.globalCommittedSelectedLSN; + context.globalStrongWriteRegion = this.globalStrongWriteRegion; context.globalStrongWriteResponse = this.globalStrongWriteResponse; context.originalRequestConsistencyLevel = this.originalRequestConsistencyLevel; context.readConsistencyStrategy = this.readConsistencyStrategy; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index ec0ceb536615..18571f719b59 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -135,6 +135,8 @@ public List getAvailableWriteRoutingContexts() { public static Mono getDatabaseAccountFromAnyLocationsAsync( URI defaultEndpoint, List locations, Function> getDatabaseAccountFn) { + logger.info("entered getDatabaseAccountFromAnyLocationsAsync with defaultEndpoint: {} and locations: {}", + defaultEndpoint, String.join(",", locations)); return getDatabaseAccountFn.apply(defaultEndpoint).onErrorResume( e -> { @@ -172,12 +174,12 @@ public URI 
getDefaultEndpoint() { } public void markEndpointUnavailableForRead(URI endpoint) { - logger.debug("Marking endpoint {} unavailable for read",endpoint); + logger.info("Marking endpoint {} unavailable for read",endpoint); this.locationCache.markEndpointUnavailableForRead(endpoint);; } public void markEndpointUnavailableForWrite(URI endpoint) { - logger.debug("Marking endpoint {} unavailable for Write",endpoint); + logger.info("Marking endpoint {} unavailable for Write",endpoint); this.locationCache.markEndpointUnavailableForWrite(endpoint); } @@ -193,12 +195,13 @@ public void close() { this.isClosed = true; this.perPartitionAutomaticFailoverConfigModifier = null; this.scheduler.dispose(); - logger.debug("GlobalEndpointManager closed."); + logger.info("GlobalEndpointManager closed."); } public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean forceRefresh) { + logger.info("refreshLocationAsync invoked. forceRefresh: {}", forceRefresh); return Mono.defer(() -> { - logger.debug("refreshLocationAsync() invoked"); + logger.info("refreshLocationAsync() invoked"); if (forceRefresh) { Mono databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync( @@ -222,11 +225,11 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean } if (!isRefreshing.compareAndSet(false, true)) { - logger.debug("in the middle of another refresh. Not invoking a new refresh."); + logger.info("in the middle of another refresh. 
Not invoking a new refresh."); return Mono.empty(); } - logger.debug("will refresh"); + logger.info("will refresh"); return this.refreshLocationPrivateAsync(databaseAccount).doOnError(e -> this.isRefreshing.set(false)); }); } @@ -248,8 +251,9 @@ public int getPreferredLocationCount() { } private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) { + logger.info("inside refreshLocationPrivateAsync"); return Mono.defer(() -> { - logger.debug("refreshLocationPrivateAsync() refreshing locations"); + logger.info("refreshLocationPrivateAsync() refreshing locations"); if (databaseAccount != null) { this.databaseAccountWriteLock.lock(); @@ -263,10 +267,10 @@ private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) Utils.ValueHolder canRefreshInBackground = new Utils.ValueHolder<>(); if (this.locationCache.shouldRefreshEndpoints(canRefreshInBackground)) { - logger.debug("shouldRefreshEndpoints: true"); + logger.info("shouldRefreshEndpoints: true"); if (databaseAccount == null && !canRefreshInBackground.v) { - logger.debug("shouldRefreshEndpoints: can't be done in background"); + logger.info("shouldRefreshEndpoints: can't be done in background"); Mono databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync( this.defaultEndpoint, @@ -301,7 +305,7 @@ private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) this.isRefreshing.set(false); return Mono.empty(); } else { - logger.debug("shouldRefreshEndpoints: false, nothing to do."); + logger.info("shouldRefreshEndpoints: false, nothing to do."); this.isRefreshing.set(false); return Mono.empty(); } @@ -315,15 +319,20 @@ private void startRefreshLocationTimerAsync() { private Mono startRefreshLocationTimerAsync(boolean initialization) { if (this.isClosed) { - logger.debug("startRefreshLocationTimerAsync: nothing to do, it is closed"); + logger.info("startRefreshLocationTimerAsync: nothing to do, it is closed"); // if client is already closed, nothing to be done, just return. 
return Mono.empty(); } - logger.debug("registering a refresh in [{}] ms", this.backgroundRefreshLocationTimeIntervalInMS); + // TODO: revert after testing done + logger.info("registering a refresh in [{}] ms", this.backgroundRefreshLocationTimeIntervalInMS); + //int testRefreshInterval = 3000; + //logger.info("registering a refresh in [{}] ms", testRefreshInterval); LocalDateTime now = LocalDateTime.now(); + // TODO: revert after testing done int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS; + //int delayInMillis = initialization ? 0: testRefreshInterval; this.refreshInBackground.set(true); @@ -336,7 +345,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { return Mono.empty(); } - logger.debug("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", now); + logger.info("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", now); Mono databaseAccountObs = GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.getEffectivePreferredRegions()), this::getDatabaseAccountAsync); @@ -359,6 +368,7 @@ public boolean hasThinClientReadLocations() { } private Mono getDatabaseAccountAsync(URI serviceEndpoint) { + logger.info("entered getDatabaseAccountAsync in GlobalEndpointManager"); return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint) .doOnNext(databaseAccount -> { if(databaseAccount != null) { @@ -393,7 +403,7 @@ private Mono getDatabaseAccountAsync(URI serviceEndpoint) { } } - logger.debug("account retrieved: {}", databaseAccount); + logger.info("account retrieved: {}", databaseAccount); }).single(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 63c238f0e6ab..4685b4ac874b 100644 --- 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -449,6 +449,7 @@ public static class SubStatusCodes { public static final int CLIENT_OPERATION_TIMEOUT = 20008; // Sub-status code paired with 408 status code public static final int TRANSIT_TIMEOUT = 20911; + public static final int WRITE_REGION_BARRIER_CHANGED_MID_OPERATION = 20912; // IMPORTANT - below sub status codes have no corresponding .Net // version, because they are only applicable in Java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index e909406cd659..9fec8c3b74f2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -42,6 +42,7 @@ import com.azure.cosmos.implementation.directconnectivity.ServerStoreModel; import com.azure.cosmos.implementation.directconnectivity.StoreClient; import com.azure.cosmos.implementation.directconnectivity.StoreClientFactory; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; import com.azure.cosmos.implementation.http.HttpClient; @@ -260,6 +261,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private final RetryPolicy retryPolicy; private HttpClient reactorHttpClient; private Function httpClientInterceptor; + private Function httpRequestInterceptor; private volatile boolean useMultipleWriteLocations; // creator of TransportClient is responsible for disposing it. 
@@ -772,7 +774,10 @@ private void updateThinProxy() { (this.thinProxy).setSessionContainer(this.sessionContainer); } - public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Function httpClientInterceptor) { + public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, + Function httpClientInterceptor, + Function httpRequestInterceptor, + BiFunction storeResponseInterceptor) { try { this.httpClientInterceptor = httpClientInterceptor; @@ -786,7 +791,8 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func this.userAgentContainer, this.globalEndpointManager, this.reactorHttpClient, - this.apiType); + this.apiType, + httpRequestInterceptor); this.thinProxy = createThinProxy(this.sessionContainer, this.consistencyLevel, @@ -828,6 +834,8 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func this.partitionKeyRangeCache = new RxPartitionKeyRangeCache(RxDocumentClientImpl.this, collectionCache); + this.httpRequestInterceptor = httpRequestInterceptor; + updateGatewayProxy(); updateThinProxy(); clientTelemetry = new ClientTelemetry( @@ -856,7 +864,7 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func if (this.connectionPolicy.getConnectionMode() == ConnectionMode.GATEWAY) { this.storeModel = this.gatewayProxy; } else { - this.initializeDirectConnectivity(); + this.initializeDirectConnectivity(storeResponseInterceptor); } this.retryPolicy.setRxCollectionCache(this.collectionCache); ConsistencyLevel effectiveConsistencyLevel = consistencyLevel != null @@ -879,7 +887,7 @@ public void serialize(CosmosClientMetadataCachesSnapshot state) { RxCollectionCache.serialize(state, this.collectionCache); } - private void initializeDirectConnectivity() { + private void initializeDirectConnectivity(BiFunction rntbdTransportClientStoreResponseInterceptor) { this.addressResolver = new GlobalAddressResolver(this, this.reactorHttpClient, this.globalEndpointManager, @@ 
-892,7 +900,8 @@ private void initializeDirectConnectivity() { // this.gatewayConfigurationReader, null, this.connectionPolicy, - this.apiType); + this.apiType, + this.httpRequestInterceptor); this.storeClientFactory = new StoreClientFactory( this.addressResolver, @@ -905,6 +914,7 @@ private void initializeDirectConnectivity() { this.clientTelemetry, this.globalEndpointManager); + this.storeClientFactory.setStoreResponseInterceptorIfRntbdTransportClient(rntbdTransportClientStoreResponseInterceptor); this.globalPartitionEndpointManagerForPerPartitionCircuitBreaker.setGlobalAddressResolver(this.addressResolver); this.createStoreModel(true); } @@ -936,7 +946,8 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer, UserAgentContainer userAgentContainer, GlobalEndpointManager globalEndpointManager, HttpClient httpClient, - ApiType apiType) { + ApiType apiType, + Function httpRequestInterceptor) { return new RxGatewayStoreModel( this, sessionContainer, @@ -945,7 +956,8 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer, userAgentContainer, globalEndpointManager, httpClient, - apiType); + apiType, + httpRequestInterceptor); } ThinClientStoreModel createThinProxy(ISessionContainer sessionContainer, @@ -6319,6 +6331,7 @@ public AddressSelector getAddressSelector() { } public Flux getDatabaseAccountFromEndpoint(URI endpoint) { + logger.info("entered getDatabaseAccountFromEndpoint line 6334 RxDocumentClientImpl"); return Flux.defer(() -> { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this, OperationType.Read, ResourceType.DatabaseAccount, "", null, (Object) null); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index ea81761b0411..c0b4858cd621 100644 --- 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -53,6 +53,8 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.Callable; +import java.util.function.BiFunction; +import java.util.function.Function; import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -82,6 +84,7 @@ public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerialize private GatewayServiceConfigurationReader gatewayServiceConfigurationReader; private RxClientCollectionCache collectionCache; private GatewayServerErrorInjector gatewayServerErrorInjector; + private Function httpRequestInterceptor; public RxGatewayStoreModel( DiagnosticsClientContext clientContext, @@ -91,7 +94,8 @@ public RxGatewayStoreModel( UserAgentContainer userAgentContainer, GlobalEndpointManager globalEndpointManager, HttpClient httpClient, - ApiType apiType) { + ApiType apiType, + Function httpRequestInterceptor) { this.clientContext = clientContext; @@ -107,6 +111,8 @@ public RxGatewayStoreModel( this.httpClient = httpClient; this.sessionContainer = sessionContainer; + + this.httpRequestInterceptor = httpRequestInterceptor; } public RxGatewayStoreModel(RxGatewayStoreModel inner) { @@ -303,6 +309,13 @@ public Mono performRequestInternal(RxDocumentServiceR private Mono performRequestInternalCore(RxDocumentServiceRequest request, URI requestUri) { try { + if (this.httpRequestInterceptor != null) { + RxDocumentServiceResponse result = this.httpRequestInterceptor.apply(request); + if (result != null) { + return Mono.just(result); + } + } + HttpRequest httpRequest = request .getEffectiveHttpTransportSerializer(this) .wrapInHttpRequest(request, requestUri); @@ -686,6 +699,8 @@ private Mono 
invokeAsync(RxDocumentServiceRequest req @Override public Mono processMessage(RxDocumentServiceRequest request) { + if (this.httpRequestInterceptor != null) {} + Mono responseObs = this.addIntendedCollectionRidAndSessionToken(request).then(invokeAsync(request)); return responseObs.onErrorResume( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ServiceUnavailableException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ServiceUnavailableException.java index a321d59dc238..3c5fa878f176 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ServiceUnavailableException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ServiceUnavailableException.java @@ -38,7 +38,7 @@ public ServiceUnavailableException(CosmosError cosmosError, setSubStatus(subStatusCode); } - ServiceUnavailableException(String message, int subStatusCode) { + public ServiceUnavailableException(String message, int subStatusCode) { this(message, null, (String) null, subStatusCode); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 3922ca187db7..9ba1d801b328 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -25,6 +25,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; /** * While this class is public, but it is not part of our published public APIs. @@ -54,7 +56,8 @@ public ThinClientStoreModel( userAgentContainer, globalEndpointManager, httpClient, - ApiType.SQL); + ApiType.SQL, + null); String userAgent = userAgentContainer != null ? 
userAgentContainer.getUserAgent() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 92a781cf071c..87584b7524ee 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -27,7 +27,6 @@ import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.collections.ComparatorUtils; -import com.azure.cosmos.implementation.directconnectivity.rntbd.ClosedClientTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; @@ -44,6 +43,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -113,6 +113,10 @@ public Mono writeAsync( TimeoutHelper timeout, boolean forceRefresh) { + logger.info("entered writeAsync with region {} and endpoint {}", + entity.requestContext.regionalRoutingContextToRoute.getRegion(), + entity.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint()); + if (timeout.isElapsed() && // skip throwing RequestTimeout on first retry because the first retry with // force address refresh header can be critical to recover for example from @@ -149,6 +153,10 @@ Mono writePrivateAsync( TimeoutHelper timeout, boolean forceRefresh) { + logger.info("entered writePrivate with region {} and endpoint {}", + request.requestContext.regionalRoutingContextToRoute.getRegion(), + request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint()); + if (timeout.isElapsed() && 
// skip throwing RequestTimeout on first retry because the first retry with // force address refresh header can be critical to recover for example from @@ -325,6 +333,10 @@ boolean isGlobalStrongRequest(RxDocumentServiceRequest request, StoreResponse re } Mono barrierForGlobalStrong(RxDocumentServiceRequest request, StoreResponse response) { + logger.info("inside barrierForGlobalStrong with region {} and endpoint {}", + request.requestContext.regionalRoutingContextToRoute.getRegion(), + request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint()); + try { if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-1L); @@ -338,13 +350,14 @@ Mono barrierForGlobalStrong(RxDocumentServiceRequest request, Sto throw new GoneException(RMResources.Gone, HttpConstants.SubStatusCodes.SERVER_GENERATED_410); } + request.requestContext.globalStrongWriteRegion = request.requestContext.regionalRoutingContextToRoute.getRegion(); request.requestContext.globalStrongWriteResponse = response; request.requestContext.globalCommittedSelectedLSN = lsn.v; //if necessary we would have already refreshed cache by now. request.requestContext.forceRefreshAddressCache = false; - logger.debug("ConsistencyWriter: globalCommittedLsn {}, lsn {}", globalCommittedLsn, lsn); + logger.info("ConsistencyWriter: globalCommittedLsn {}, lsn {}", globalCommittedLsn, lsn); //barrier only if necessary, i.e. when write region completes write, but read regions have not. if (globalCommittedLsn.v < lsn.v) { @@ -384,10 +397,34 @@ Mono barrierForGlobalStrong(RxDocumentServiceRequest request, Sto } } + private void validateGlobalStrongWriteRegion(RxDocumentServiceRequest barrierRequest) + { + // validate that a regional failover has not occurred since the initial write. 
+ String currentRegion = barrierRequest.requestContext.regionalRoutingContextToRoute.getRegion(); + logger.info("Entered validateGlobalStrongWriteRegion. CurrentRegion: {}, OriginalWriteRegion: {}", + currentRegion, barrierRequest.requestContext.globalStrongWriteRegion); + if (barrierRequest.requestContext.globalStrongWriteRegion != null && + !Objects.equals(barrierRequest.requestContext.globalStrongWriteRegion, currentRegion)) + { + logger.info( + "ConsistencyWriter: Failover detected during strong consistency write. Original write was to region " + + barrierRequest.requestContext.globalStrongWriteRegion + " but retry is targeting currentRegion " + + currentRegion + ". Failing request."); + + throw new RequestTimeoutException( + "The write operation was initiated in region " + barrierRequest.requestContext.globalStrongWriteRegion + + " but a regional failover occurred. The current attempt is to endpoint " + currentRegion + + ". The state of the write is ambiguous.", + null, + HttpConstants.SubStatusCodes.WRITE_REGION_BARRIER_CHANGED_MID_OPERATION); + } + } + private Mono waitForWriteBarrierAsync(RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) { AtomicInteger writeBarrierRetryCount = new AtomicInteger(ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES); AtomicLong maxGlobalCommittedLsnReceived = new AtomicLong(0); return Flux.defer(() -> { + this.validateGlobalStrongWriteRegion(barrierRequest); if (barrierRequest.requestContext.timeoutHelper.isElapsed()) { return Flux.error(new RequestTimeoutException()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index e62d7b8c6ca4..04d1236346b1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -71,6 +71,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; @@ -110,6 +112,7 @@ public class GatewayAddressCache implements IAddressCache { private final boolean replicaAddressValidationEnabled; private final Set replicaValidationScopes; private GatewayServerErrorInjector gatewayServerErrorInjector; + public Function httpRequestInterceptor; public GatewayAddressCache( DiagnosticsClientContext clientContext, @@ -357,6 +360,17 @@ private Mono> getServerAddressesViaGatewayInternalAsync(RxDocument JavaStreamUtils.toString(partitionKeyRangeIds, ",")); } + logger.info("inside getServerAddressesViaGatewayInternalAsync"); + logger.info("httpRequestInterceptor is " + (this.httpRequestInterceptor != null ? 
"not null" : "null")); + if (this.httpRequestInterceptor != null) { + logger.info("getServerAddressesViaGatewayInternalAsync intercepted"); + logger.info("request operationType: " + request.getOperationType() + ", resourceType: " + request.getResourceType()); + RxDocumentServiceResponse result = this.httpRequestInterceptor.apply(request); + if (result != null) { + return Mono.just(result.getQueryResponse(null, Address.class)); + } + } + // track address refresh has happened, this is only meant to be used for fault injection validation request.faultInjectionRequestContext.recordAddressForceRefreshed(forceRefresh); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index 00905682b4d1..a8b80258bd7b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -4,18 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.CosmosContainerProactiveInitConfig; -import com.azure.cosmos.implementation.ApiType; -import com.azure.cosmos.implementation.Configs; -import com.azure.cosmos.implementation.ConnectionPolicy; -import com.azure.cosmos.implementation.CosmosSchedulers; -import com.azure.cosmos.implementation.DiagnosticsClientContext; -import com.azure.cosmos.implementation.DocumentCollection; -import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.IAuthorizationTokenProvider; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; -import com.azure.cosmos.implementation.OpenConnectionResponse; -import com.azure.cosmos.implementation.RxDocumentServiceRequest; -import 
com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.*; import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; @@ -40,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; public class GlobalAddressResolver implements IAddressResolver { @@ -62,6 +53,7 @@ public class GlobalAddressResolver implements IAddressResolver { private ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor; private ConnectionPolicy connectionPolicy; private GatewayServerErrorInjector gatewayServerErrorInjector; + private Function httpRequestInterceptor; public GlobalAddressResolver( DiagnosticsClientContext diagnosticsClientContext, @@ -74,7 +66,8 @@ public GlobalAddressResolver( UserAgentContainer userAgentContainer, GatewayServiceConfigurationReader serviceConfigReader, ConnectionPolicy connectionPolicy, - ApiType apiType) { + ApiType apiType, + Function httpRequestInterceptor) { this.diagnosticsClientContext = diagnosticsClientContext; this.httpClient = httpClient; this.endpointManager = endpointManager; @@ -86,6 +79,7 @@ public GlobalAddressResolver( this.serviceConfigReader = serviceConfigReader; this.tcpConnectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled(); this.connectionPolicy = connectionPolicy; + this.httpRequestInterceptor = httpRequestInterceptor; int maxBackupReadEndpoints = (connectionPolicy.isReadRequestsFallbackEnabled()) ? 
GlobalAddressResolver.MaxBackupReadRegions : 0; this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write getEndpoint (during failover) @@ -291,6 +285,7 @@ private EndpointCache getOrAddEndpoint(URI endpoint) { this.connectionPolicy, this.proactiveOpenConnectionsProcessor, this.gatewayServerErrorInjector); + gatewayAddressCache.httpRequestInterceptor = this.httpRequestInterceptor; AddressResolver addressResolver = new AddressResolver(); addressResolver.initializeCaches(this.collectionCache, this.routingMapProvider, gatewayAddressCache); EndpointCache cache = new EndpointCache(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java index bbf642ba667a..cb07cf04fb41 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java @@ -24,7 +24,7 @@ public class JsonNodeStorePayload implements StorePayload { private static final Logger logger = LoggerFactory.getLogger(JsonNodeStorePayload.class); private static final CharsetDecoder fallbackCharsetDecoder = getFallbackCharsetDecoder(); private final int responsePayloadSize; - private final JsonNode jsonValue; + private JsonNode jsonValue; public JsonNodeStorePayload(ByteBufInputStream bufferStream, int readableBytes, Map responseHeaders) { if (readableBytes > 0) { @@ -107,6 +107,10 @@ public JsonNode getPayload() { return jsonValue; } + public void setPayload(JsonNode payload) { + jsonValue = payload; + } + private static CharsetDecoder getFallbackCharsetDecoder() { if (StringUtil.isNullOrEmpty(Configs.getCharsetDecoderErrorActionOnMalformedInput()) && 
StringUtil.isNullOrEmpty(Configs.getCharsetDecoderErrorActionOnUnmappedCharacter())) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index 22c5ed8624b6..bf693a1bd16c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -63,6 +63,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.Consumer; import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.reportIssue; @@ -114,6 +115,7 @@ public class RntbdTransportClient extends TransportClient { private final RntbdServerErrorInjector serverErrorInjector; private final ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor; private final AddressSelector addressSelector; + private BiFunction storeResponseInterceptor; // endregion @@ -319,6 +321,10 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume storeResponse.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline()); } + if (this.storeResponseInterceptor != null) { + return this.storeResponseInterceptor.apply(request, storeResponse); + } + return storeResponse; }).onErrorMap(throwable -> { @@ -492,6 +498,10 @@ private boolean shouldRecordChannelAcquisitionTimeline(RequestTimeline requestTi channelAcquisitionEvent.get().getDuration().toMillis() > this.channelAcquisitionContextLatencyThresholdInMillis; } + public void setStoreResponseInterceptor(BiFunction storeResponseInterceptor) { + this.storeResponseInterceptor = storeResponseInterceptor; + } + // 
endregion // region Types diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java index 16f5124a1895..7e8643065f29 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java @@ -10,9 +10,12 @@ import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.IAuthorizationTokenProvider; import com.azure.cosmos.implementation.ISessionContainer; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.UserAgentContainer; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import java.util.function.BiFunction; + // We suppress the "try" warning here because the close() method's signature // allows it to throw InterruptedException which is strongly advised against // by AutoCloseable (see: http://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html#close()). 
@@ -98,6 +101,17 @@ public StoreClient createStoreClient( sessionRetryOptions); } + public void setStoreResponseInterceptorIfRntbdTransportClient( + BiFunction storeResponseInterceptor) { + this.throwIfClosed(); + + if (this.transportClient instanceof RntbdTransportClient) { + ((RntbdTransportClient) this.transportClient).setStoreResponseInterceptor(storeResponseInterceptor); + } else { + throw new IllegalStateException("StoreResponseInterceptor can only be set for RntbdTransportClient"); + } + } + private void throwIfClosed() { if (isClosed) { throw new IllegalStateException("storeClient already closed!"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 5169dfb121a7..2147ae74e93c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -98,6 +98,8 @@ public Mono> readMultipleReplicaAsync( boolean checkMinLSN, boolean forceReadAll) { + logger.info("inside readMultipleReplicaAsync with region {}", entity.requestContext.regionalRoutingContextToRoute.getRegion()); + if (entity.requestContext.timeoutHelper.isElapsed()) { return Mono.error(new GoneException()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index c8539f09cba5..2156504db3d1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -153,6 +153,10 @@ public JsonNode getResponseBodyAsJson() { 
return this.responsePayload.getPayload(); } + public void setResponseBodyAsJson(JsonNode body) { + this.responsePayload.setPayload(body); + } + public int getResponseBodyLength() { if (this.responsePayload == null) { return 0; @@ -196,6 +200,20 @@ public String getHeaderValue(String attribute) { return null; } + // meant for fault injection only + public void setHeaderValue(String attribute, String value) { + if (this.responseHeaderValues == null || this.responseHeaderNames.length != this.responseHeaderValues.length) { + return; + } + + for (int i = 0; i < responseHeaderNames.length; i++) { + if (responseHeaderNames[i].equalsIgnoreCase(attribute)) { + responseHeaderValues[i] = value; + return; + } + } + } + public double getRequestCharge() { String value = this.getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE); if (StringUtils.isEmpty(value)) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index d7e8ccb56421..ae42b4dd0ba4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -614,6 +614,8 @@ public void flush(final ChannelHandlerContext context) { */ @Override public void read(final ChannelHandlerContext context) { +// logger.info("inside RntbdRequestManager.read"); + this.traceOperation(context, "read"); context.read(); } @@ -630,6 +632,7 @@ public void read(final ChannelHandlerContext context) { */ @Override public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) { +// logger.info("inside RntbdRequestManager.write"); this.traceOperation(context, "write", message); @@ -984,6 +987,7 @@ private void 
messageReceived(final ChannelHandlerContext context, final RntbdRes } final RxDocumentServiceRequest serviceRequest = requestRecord.args().serviceRequest(); + logger.info("inside RntbdRequestManager.messageReceived - serviceRequest region: {}", serviceRequest.requestContext.regionalRoutingContextToRoute.getRegion()); requestRecord.stage(RntbdRequestRecord.Stage.DECODE_STARTED, response.getDecodeStartTime()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java index 6d61cca536d1..f60fcb413ecb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java @@ -145,6 +145,7 @@ public void markEndpointUnavailableForWrite(URI endpoint) { * @param databaseAccount READ DatabaseAccount */ public void onDatabaseAccountRead(DatabaseAccount databaseAccount) { + logger.info("inside onDatabaseAccountRead"); this.updateLocationCache( databaseAccount.getWritableLocations(), databaseAccount.getReadableLocations(), @@ -686,7 +687,7 @@ private void clearStaleEndpointUnavailabilityInfo() { this.unavailableLocationsExpirationTime) && Utils.tryRemove(this.locationUnavailabilityInfoByEndpoint, unavailableEndpoint, removedHolder)) { - logger.debug( + logger.info( "Removed endpoint [{}] unavailable for operations [{}] from unavailableEndpoints", unavailableEndpoint, unavailabilityInfoHolder.v.unavailableOperations); @@ -750,7 +751,7 @@ public LocationUnavailabilityInfo apply(RegionalRoutingContext url, LocationUnav this.updateLocationCache(); - logger.debug( + logger.info( "Endpoint [{}] unavailable for [{}] added/updated to unavailableEndpoints with timestamp [{}]", unavailableEndpoint, unavailableOperationType, @@ -770,7 +771,7 @@ private void updateLocationCache( Boolean 
enableMultipleWriteLocations) { synchronized (this.lockObject) { DatabaseAccountLocationsInfo nextLocationInfo = new DatabaseAccountLocationsInfo(this.locationInfo); - logger.debug("updating location cache ..., current readLocations [{}], current writeLocations [{}]", + logger.info("updating location cache ..., current readLocations [{}], current writeLocations [{}]", nextLocationInfo.readRegionalRoutingContexts, nextLocationInfo.writeRegionalRoutingContexts); if (preferenceList != null) { @@ -822,7 +823,7 @@ private void updateLocationCache( this.lastCacheUpdateTimestamp = Instant.now(); - logger.debug("updating location cache finished, new readLocations [{}], new writeLocations [{}]", + logger.info("updating location cache finished, new readLocations [{}], new writeLocations [{}]", nextLocationInfo.readRegionalRoutingContexts, nextLocationInfo.writeRegionalRoutingContexts); this.locationInfo = nextLocationInfo; } @@ -914,7 +915,7 @@ private void addRoutingContexts( String location = gatewayDbAccountLocation.getName().toLowerCase(Locale.ROOT); URI endpoint = new URI(gatewayDbAccountLocation.getEndpoint().toLowerCase(Locale.ROOT)); - RegionalRoutingContext regionalRoutingContext = new RegionalRoutingContext(endpoint); + RegionalRoutingContext regionalRoutingContext = new RegionalRoutingContext(endpoint, location); if (!endpointsByLocation.containsKey(location)) { endpointsByLocation.put(location, regionalRoutingContext); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java index ec185ed87be7..4a473544b6de 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java @@ -11,18 +11,28 @@ public class RegionalRoutingContext { // 
IMPORTANT: // Please reevaluate overridden equals() implementation // when adding additional properties to this class + private final String region; private final URI gatewayRegionalEndpoint; private final String gatewayRegionalEndpointAsString; private URI thinclientRegionalEndpoint; private String thinclientRegionalEndpointAsString; - public RegionalRoutingContext(URI gatewayRegionalEndpoint) { + public RegionalRoutingContext(URI gatewayRegionalEndpoint, String region) { this.gatewayRegionalEndpoint = gatewayRegionalEndpoint; this.gatewayRegionalEndpointAsString = gatewayRegionalEndpoint.toString(); + this.region = region; this.thinclientRegionalEndpoint = null; this.thinclientRegionalEndpointAsString = null; } + public RegionalRoutingContext(URI gatewayRegionalEndpoint) { + this(gatewayRegionalEndpoint, null); + } + + public String getRegion() { + return this.region; + } + public URI getGatewayRegionalEndpoint() { return this.gatewayRegionalEndpoint; }