Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.mockito.Mockito.doAnswer;

Expand Down Expand Up @@ -64,7 +66,7 @@ public RxDocumentClientUnderTest(URI serviceEndpoint,
null,
false
);
init(null, null);
init(null, null, null, null);
}

RxGatewayStoreModel createRxGatewayProxy(
Expand Down Expand Up @@ -93,6 +95,7 @@ RxGatewayStoreModel createRxGatewayProxy(
userAgentContainer,
globalEndpointManager,
spyHttpClient,
apiType);
apiType,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public void readTimeout() throws Exception {
userAgentContainer,
globalEndpointManager,
httpClient,
null);
null,
null);
storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader);

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(clientContext,
Expand Down Expand Up @@ -143,6 +144,7 @@ public void serviceUnavailable() throws Exception {
userAgentContainer,
globalEndpointManager,
httpClient,
null,
null);
storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader);

Expand Down Expand Up @@ -202,7 +204,8 @@ public void applySessionToken(
new UserAgentContainer(),
globalEndpointManager,
httpClient,
apiType);
apiType,
null);
storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader);

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
Expand Down Expand Up @@ -274,7 +277,8 @@ public void validateApiType() throws Exception {
new UserAgentContainer(),
globalEndpointManager,
httpClient,
apiType);
apiType,
null);

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
clientContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -110,7 +112,7 @@ public static class ClientWithGatewaySpy extends SpyBaseClass<RxDocumentServiceR
credential,
contentResponseOnWriteEnabled,
clientTelemetryConfig);
init(null, null);
init(null, null, null, null);
updateOrigRxGatewayStoreModel();
}

Expand All @@ -126,15 +128,17 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer,
UserAgentContainer userAgentContainer,
GlobalEndpointManager globalEndpointManager,
HttpClient rxClient,
ApiType apiType) {
ApiType apiType,
Function<RxDocumentServiceRequest, RxDocumentServiceResponse> httpRequestInterceptor) {
this.origRxGatewayStoreModel = super.createRxGatewayProxy(
sessionContainer,
consistencyLevel,
queryCompatibilityMode,
userAgentContainer,
globalEndpointManager,
rxClient,
apiType);
apiType,
null);
this.requests = Collections.synchronizedList(new ArrayList<>());
this.spyRxGatewayStoreModel = Mockito.spy(this.origRxGatewayStoreModel);
this.initRequestCapture();
Expand Down Expand Up @@ -201,7 +205,7 @@ public static class ClientUnderTest extends SpyBaseClass<HttpRequest> {
credential,
contentResponseOnWriteEnabled,
clientTelemetryConfig);
init(null, this::initHttpRequestCapture);
init(null, this::initHttpRequestCapture, null, null);
}

private Mono<HttpResponse> captureHttpRequest(InvocationOnMock invocationOnMock) {
Expand Down Expand Up @@ -287,7 +291,7 @@ public static class DirectHttpsClientUnderTest extends SpyBaseClass<HttpRequest>
contentResponseOnWriteEnabled,
clientTelemetryConfig);
assert connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT;
init(null, null);
init(null, null, null, null);

this.origHttpClient = ReflectionUtils.getDirectHttpsHttpClient(this);
this.spyHttpClient = spy(this.origHttpClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void resolveAsync() throws Exception {

GlobalAddressResolver globalAddressResolver = new GlobalAddressResolver(mockDiagnosticsClientContext(), httpClient, endpointManager, Protocol.HTTPS, authorizationTokenProvider, collectionCache, routingMapProvider,
userAgentContainer,
serviceConfigReader, connectionPolicy, null);
serviceConfigReader, connectionPolicy, null, null);
RxDocumentServiceRequest request;
request = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.Read,
Expand Down Expand Up @@ -145,6 +145,7 @@ public void submitOpenConnectionTasksAndInitCaches() {
userAgentContainer,
serviceConfigReader,
connectionPolicy,
null,
null);
GlobalAddressResolver.EndpointCache endpointCache = new GlobalAddressResolver.EndpointCache();
GatewayAddressCache gatewayAddressCache = Mockito.mock(GatewayAddressCache.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.cosmos.implementation.QueryFeedOperationState;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.WriteRetryPolicy;
import com.azure.cosmos.implementation.clienttelemetry.ClientMetricsDiagnosticsHandler;
Expand All @@ -27,6 +29,7 @@
import com.azure.cosmos.implementation.clienttelemetry.CosmosMeterOptions;
import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.throughputControl.sdk.config.SDKThroughputControlGroupInternal;
Expand Down Expand Up @@ -60,6 +63,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -115,6 +119,8 @@ public final class CosmosAsyncClient implements Closeable {
private final List<CosmosOperationPolicy> requestPolicies;
private final CosmosItemSerializer defaultCustomSerializer;
private final java.util.function.Function<CosmosAsyncContainer, CosmosAsyncContainer> containerFactory;
private final BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor;
private Function<RxDocumentServiceRequest, RxDocumentServiceResponse> httpRequestInterceptor;

CosmosAsyncClient(CosmosClientBuilder builder) {
// Async Cosmos client wrapper
Expand All @@ -134,6 +140,9 @@ public final class CosmosAsyncClient implements Closeable {
this.nonIdempotentWriteRetryPolicy = builder.getNonIdempotentWriteRetryPolicy();
this.requestPolicies = builder.getOperationPolicies();
this.defaultCustomSerializer = builder.getCustomItemSerializer();
this.storeResponseInterceptor = builder.getStoreResponseInterceptor();
this.httpRequestInterceptor = builder.getHttpRequestInterceptor();

if (builder.containerCreationInterceptor() != null) {
this.containerFactory = builder.containerCreationInterceptor();
} else {
Expand Down Expand Up @@ -185,6 +194,8 @@ public final class CosmosAsyncClient implements Closeable {
.withDefaultSerializer(this.defaultCustomSerializer)
.withRegionScopedSessionCapturingEnabled(builder.isRegionScopedSessionCapturingEnabled())
.withPerPartitionAutomaticFailoverEnabled(builder.isPerPartitionAutomaticFailoverEnabled())
.withStoreResponseInterceptor(this.storeResponseInterceptor)
.withHttpRequestInterceptor(this.httpRequestInterceptor)
.build();

this.accountConsistencyLevel = this.asyncDocumentClient.getDefaultConsistencyLevelOfAccount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot;
import com.azure.cosmos.implementation.DiagnosticsProvider;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.WriteRetryPolicy;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.time.StopWatch;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PartitionLevelCircuitBreakerConfig;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.routing.LocationHelper;
import com.azure.cosmos.models.CosmosAuthorizationTokenResolver;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
Expand All @@ -39,6 +42,7 @@
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -155,6 +159,8 @@ public class CosmosClientBuilder implements
private boolean serverCertValidationDisabled = false;

private Function<CosmosAsyncContainer, CosmosAsyncContainer> containerFactory = null;
private BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor = null;
private Function<RxDocumentServiceRequest, RxDocumentServiceResponse> httpRequestInterceptor = null;

/**
* Instantiates a new Cosmos client builder.
Expand All @@ -170,6 +176,25 @@ public CosmosClientBuilder() {
this.requestPolicies = new LinkedList<>();
}

CosmosClientBuilder httpRequestInterceptor(Function<RxDocumentServiceRequest, RxDocumentServiceResponse> httpRequestInterceptor) {
this.httpRequestInterceptor = httpRequestInterceptor;
return this;
}

Function<RxDocumentServiceRequest, RxDocumentServiceResponse> getHttpRequestInterceptor() {
return this.httpRequestInterceptor;
}

CosmosClientBuilder storeResponseInterceptor(
BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor) {
this.storeResponseInterceptor = storeResponseInterceptor;
return this;
}

BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> getStoreResponseInterceptor() {
return this.storeResponseInterceptor;
}

CosmosClientBuilder metadataCaches(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot) {
this.state = metadataCachesSnapshot;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.batch.BatchExecUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics;
Expand Down Expand Up @@ -92,6 +93,8 @@ public class CosmosException extends AzureException {
*/
private RntbdChannelStatistics rntbdChannelStatistics;

private StoreResponse interceptedStoreResponse;

/**
* LSN
*/
Expand Down Expand Up @@ -615,6 +618,14 @@ Map<String, Set<String>> getReplicaStatusList() {
return this.replicaStatusList;
}

public void setInterceptedStoreResponse(StoreResponse storeResponse) {
this.interceptedStoreResponse = storeResponse;
}

public StoreResponse getInterceptedStoreResponse() {
return this.interceptedStoreResponse;
}

///////////////////////////////////////////////////////////////////////////////////////////
// the following helper/accessor only helps to access this class outside of this package.//
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
Expand Down Expand Up @@ -42,6 +44,8 @@
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Provides a client-side logical representation of the Azure Cosmos DB
Expand Down Expand Up @@ -115,6 +119,8 @@ class Builder {
private boolean isRegionScopedSessionCapturingEnabled;
private boolean isPerPartitionAutomaticFailoverEnabled;
private List<CosmosOperationPolicy> operationPolicies;
private BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor;
private Function<RxDocumentServiceRequest, RxDocumentServiceResponse> httpRequestInterceptor;

public Builder withServiceEndpoint(String serviceEndpoint) {
try {
Expand Down Expand Up @@ -287,6 +293,16 @@ public Builder withPerPartitionAutomaticFailoverEnabled(boolean isPerPartitionAu
return this;
}

public Builder withStoreResponseInterceptor(BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor) {
this.storeResponseInterceptor = storeResponseInterceptor;
return this;
}

public Builder withHttpRequestInterceptor(Function<RxDocumentServiceRequest, RxDocumentServiceResponse> httpRequestInterceptor) {
this.httpRequestInterceptor = httpRequestInterceptor;
return this;
}

private void ifThrowIllegalArgException(boolean value, String error) {
if (value) {
throw new IllegalArgumentException(error);
Expand Down Expand Up @@ -329,7 +345,7 @@ public AsyncDocumentClient build() {
operationPolicies,
isPerPartitionAutomaticFailoverEnabled);

client.init(state, null);
client.init(state, null, httpRequestInterceptor, storeResponseInterceptor);
return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
Expand All @@ -16,7 +17,6 @@
* This is meant to be internally used only by our sdk.
*/
public class BackoffRetryUtility {

// transforms a retryFunc to a function which can be used by Observable.retryWhen(.)
// also it invokes preRetryCallback prior to doing retry.
public static final Quadruple<Boolean, Boolean, Duration, Integer> InitialArgumentValuePolicyArg = Quadruple.with(false, false,
Expand All @@ -30,7 +30,6 @@ public class BackoffRetryUtility {
// a helper method for invoking callback method given the retry policy
static public <T> Mono<T> executeRetry(Callable<Mono<T>> callbackMethod,
IRetryPolicy retryPolicy) {

return Mono.defer(() -> {
try {
return callbackMethod.call();
Expand Down
Loading