Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private void onProviderEvent(
case PROVIDER_ERROR:
if (providerEventDetails != null
&& providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) {
onFatal();
onFatal(providerEventDetails);
break;
}

Expand Down Expand Up @@ -283,15 +283,12 @@ private void onError() {
}
}

private void onFatal() {
private void onFatal(ProviderEventDetails providerEventDetails) {
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
}
this.syncResources.setFatal(true);

this.emitProviderError(ProviderEventDetails.builder()
.errorCode(ErrorCode.PROVIDER_FATAL)
.build());
this.syncResources.fatalError(providerEventDetails);
this.emitProviderError(providerEventDetails);

shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.exceptions.GeneralError;
import lombok.Getter;
Expand All @@ -15,10 +16,10 @@
@Getter
class FlagdProviderSyncResources {
@Setter
private volatile ProviderEvent previousEvent = null;
private volatile ProviderEvent previousEvent;

@Setter
private volatile boolean isFatal;
private volatile ProviderEventDetails fatalProviderEventDetails;

private volatile EvaluationContext enrichedContext = new ImmutableContext();
private volatile boolean isInitialized;
Expand All @@ -39,7 +40,6 @@ public synchronized boolean initialize() {
return false;
}
this.isInitialized = true;
this.isFatal = false;
this.notifyAll();
return true;
}
Expand Down Expand Up @@ -68,7 +68,7 @@ public synchronized boolean initialize() {
public void waitForInitialization(long deadline) {
long start = System.currentTimeMillis();
long end = start + deadline;
while (!isInitialized && !isShutDown) {
while (!isInitialized && !isShutDown && !isFatal) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
Expand All @@ -77,7 +77,7 @@ public void waitForInitialization(long deadline) {
}
long remaining = end - now;
synchronized (this) {
if (isShutDown) {
if (isShutDown || isFatal) {
break;
}
if (isInitialized) { // might have changed in the meantime
Expand All @@ -91,12 +91,16 @@ public void waitForInitialization(long deadline) {
}
}
}
if (isShutDown) {
String msg = "Already shut down due to previous error.";
if (isFatal) {
throw new FatalError(msg);
if (isFatal) {
var fatalEvent = fatalProviderEventDetails;
if (fatalEvent != null) {
throw new FatalError("Initialization failed due to a fatal error: " + fatalEvent.getMessage());
} else {
throw new FatalError("Initialization failed due to a fatal error.");
}
throw new GeneralError(msg);
}
if (isShutDown) {
throw new GeneralError("Already shut down due to previous error.");
}
}

Expand All @@ -107,4 +111,10 @@ public synchronized void shutdown() {
isShutDown = true;
this.notifyAll();
}

public synchronized void fatalError(ProviderEventDetails providerEventDetails) {
isFatal = true;
fatalProviderEventDetails = providerEventDetails;
this.notifyAll();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.exceptions.GeneralError;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -115,7 +118,6 @@ void callingInitialize_wakesUpWaitingThread() throws InterruptedException {
void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralException() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final AtomicBoolean successfulTest = new AtomicBoolean();
flagdProviderSyncResources.setFatal(false);

Thread waitingThread = new Thread(() -> {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -147,7 +149,7 @@ void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralExcepti
void callingShutdownWithPreviousFatal_wakesUpWaitingThread_WithFatalException() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final AtomicBoolean successfulTest = new AtomicBoolean();
flagdProviderSyncResources.setFatal(true);
flagdProviderSyncResources.fatalError(null);

Thread waitingThread = new Thread(() -> {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -184,4 +186,63 @@ void waitForInitializationAfterCallingInitialize_returnsInstantly() {
// do not use MAX_TIME_TOLERANCE here, this should happen faster than that
Assertions.assertTrue(start + 1 >= end);
}

@Timeout(2)
@Test
void fatalHasPrecedenceOverInitAndShutdown() {
flagdProviderSyncResources.fatalError(null);
flagdProviderSyncResources.initialize();
flagdProviderSyncResources.shutdown();

Assertions.assertThrows(FatalError.class, () -> flagdProviderSyncResources.waitForInitialization(10000));
}

@Timeout(2)
@Test
void fatalAbortsInit() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final AtomicLong waitTime = new AtomicLong(Long.MAX_VALUE);
final AtomicReference<Exception> fatalException = new AtomicReference<>();

Thread waitingThread = new Thread(() -> {
long start = System.currentTimeMillis();
isWaiting.set(true);
try {
flagdProviderSyncResources.waitForInitialization(10000);
} catch (Exception e) {
fatalException.set(e);
}
long end = System.currentTimeMillis();
long duration = end - start;
waitTime.set(duration);
});
waitingThread.start();

while (!isWaiting.get()) {
Thread.yield();
}

Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime

var fatalEvent = ProviderEventDetails.builder()
.errorCode(ErrorCode.PROVIDER_FATAL)
.message("Some message")
.build();
flagdProviderSyncResources.fatalError(fatalEvent);

waitingThread.join();

var wait = MAX_TIME_TOLERANCE * 3;

Assertions.assertTrue(
waitTime.get() < wait,
() -> "Wakeup should be almost instant, but took " + waitTime.get()
+ " ms, which is more than the max of"
+ wait + " ms");
Assertions.assertNotNull(fatalException.get());
Assertions.assertInstanceOf(FatalError.class, fatalException.get());
Assertions.assertEquals(
"Initialization failed due to a fatal error: " + fatalEvent.getMessage(),
fatalException.get().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveStringResponse;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub;
import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.FlagEvaluationDetails;
import dev.openfeature.sdk.FlagValueType;
Expand All @@ -48,6 +49,7 @@
import dev.openfeature.sdk.Reason;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.internal.TriConsumer;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand All @@ -60,8 +62,10 @@
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
Expand Down Expand Up @@ -676,6 +680,38 @@ void updatesSyncMetadataWithCallback() throws Exception {
}
}

@Test
void initAfterFatalPropagatesErrorEvent() {
// given
final var ctx = new ImmutableContext();
final var metadata = new MutableStructure();

// mock a resolver
final var onEvent = new AtomicReference<TriConsumer<ProviderEvent, ProviderEventDetails, Structure>>();
try (var mockResolver = mockConstruction(
InProcessResolver.class,
(mock, context) -> onEvent.set((TriConsumer<ProviderEvent, ProviderEventDetails, Structure>)
context.arguments().get(1)))) {

FlagdProvider provider = new FlagdProvider(FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
.build());

onEvent.get()
.accept(
ProviderEvent.PROVIDER_ERROR,
ProviderEventDetails.builder()
.message("msg")
.errorCode(ErrorCode.PROVIDER_FATAL)
.build(),
metadata);

var error = Assertions.assertThrows(FatalError.class, () -> provider.initialize(ctx));
Assertions.assertEquals("Initialization failed due to a fatal error: msg", error.getMessage());
Assertions.assertEquals(ErrorCode.PROVIDER_FATAL, error.getErrorCode());
}
}

// test helper
// create provider with given grpc provider and state supplier
private FlagdProvider createProvider(ChannelConnector connector, ServiceBlockingStub mockBlockingStub) {
Expand Down
Loading