diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 4758e37c7..e6c44d423 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -226,7 +226,7 @@ private void onProviderEvent( case PROVIDER_ERROR: if (providerEventDetails != null && providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) { - onFatal(); + onFatal(providerEventDetails); break; } @@ -283,15 +283,12 @@ private void onError() { } } - private void onFatal() { + private void onFatal(ProviderEventDetails providerEventDetails) { if (errorTask != null && !errorTask.isCancelled()) { errorTask.cancel(false); } - this.syncResources.setFatal(true); - - this.emitProviderError(ProviderEventDetails.builder() - .errorCode(ErrorCode.PROVIDER_FATAL) - .build()); + this.syncResources.fatalError(providerEventDetails); + this.emitProviderError(providerEventDetails); shutdown(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java index e173cc5df..fec52f8e6 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java @@ -3,6 +3,7 @@ import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.ImmutableContext; import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; import dev.openfeature.sdk.exceptions.FatalError; import dev.openfeature.sdk.exceptions.GeneralError; import lombok.Getter; @@ -15,10 +16,10 @@ @Getter class FlagdProviderSyncResources { @Setter - private volatile ProviderEvent previousEvent = null; + private volatile ProviderEvent previousEvent; - @Setter private volatile boolean isFatal; + private volatile ProviderEventDetails fatalProviderEventDetails; private volatile EvaluationContext enrichedContext = new ImmutableContext(); private volatile boolean isInitialized; @@ -39,7 +40,6 @@ public synchronized boolean initialize() { return false; } this.isInitialized = true; - this.isFatal = false; this.notifyAll(); return true; } @@ -68,7 +68,7 @@ public synchronized boolean initialize() { public void waitForInitialization(long deadline) { long start = System.currentTimeMillis(); long end = start + deadline; - while (!isInitialized && !isShutDown) { + while (!isInitialized && !isShutDown && !isFatal) { long now = System.currentTimeMillis(); // if wait(0) is called, the thread would wait forever, so we abort when this would happen if (now >= end) { @@ -77,7 +77,7 @@ public void waitForInitialization(long deadline) { } long remaining = end - now; synchronized (this) { - if (isShutDown) { + if (isShutDown || isFatal) { break; } if (isInitialized) { // might have changed in the meantime @@ -91,12 +91,16 @@ public void waitForInitialization(long deadline) { } } } - if (isShutDown) { - String msg = "Already shut down due to previous error."; - if (isFatal) { - throw new FatalError(msg); + if (isFatal) { + var fatalEvent = fatalProviderEventDetails; + if (fatalEvent != null) { + throw new FatalError("Initialization failed due to a fatal error: " + fatalEvent.getMessage()); + } else { + throw new FatalError("Initialization failed due to a fatal error."); } - throw new GeneralError(msg); + } + if (isShutDown) { + throw new GeneralError("Already shut down due to previous error."); } } @@ -107,4 +111,10 @@ public synchronized void shutdown() { isShutDown = true; this.notifyAll(); } + + public synchronized void fatalError(ProviderEventDetails providerEventDetails) { + isFatal = true; + fatalProviderEventDetails = providerEventDetails; + this.notifyAll(); + } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java index 684caf0d9..dd5dbe73a 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java @@ -1,9 +1,12 @@ package dev.openfeature.contrib.providers.flagd; +import dev.openfeature.sdk.ErrorCode; +import dev.openfeature.sdk.ProviderEventDetails; import dev.openfeature.sdk.exceptions.FatalError; import dev.openfeature.sdk.exceptions.GeneralError; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -115,7 +118,6 @@ void callingInitialize_wakesUpWaitingThread() throws InterruptedException { void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralException() throws InterruptedException { final AtomicBoolean isWaiting = new AtomicBoolean(); final AtomicBoolean successfulTest = new AtomicBoolean(); - flagdProviderSyncResources.setFatal(false); Thread waitingThread = new Thread(() -> { long start = System.currentTimeMillis(); @@ -147,7 +149,7 @@ void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralExcepti void callingShutdownWithPreviousFatal_wakesUpWaitingThread_WithFatalException() throws InterruptedException { final AtomicBoolean isWaiting = new AtomicBoolean(); final AtomicBoolean successfulTest = new AtomicBoolean(); - flagdProviderSyncResources.setFatal(true); + flagdProviderSyncResources.fatalError(null); Thread waitingThread = new Thread(() -> { long start = System.currentTimeMillis(); @@ -184,4 +186,63 @@ void waitForInitializationAfterCallingInitialize_returnsInstantly() { // do not use MAX_TIME_TOLERANCE here, this should happen faster than that Assertions.assertTrue(start + 1 >= end); } + + @Timeout(2) + @Test + void fatalHasPrecedenceOverInitAndShutdown() { + flagdProviderSyncResources.fatalError(null); + flagdProviderSyncResources.initialize(); + flagdProviderSyncResources.shutdown(); + + Assertions.assertThrows(FatalError.class, () -> flagdProviderSyncResources.waitForInitialization(10000)); + } + + @Timeout(2) + @Test + void fatalAbortsInit() throws InterruptedException { + final AtomicBoolean isWaiting = new AtomicBoolean(); + final AtomicLong waitTime = new AtomicLong(Long.MAX_VALUE); + final AtomicReference fatalException = new AtomicReference<>(); + + Thread waitingThread = new Thread(() -> { + long start = System.currentTimeMillis(); + isWaiting.set(true); + try { + flagdProviderSyncResources.waitForInitialization(10000); + } catch (Exception e) { + fatalException.set(e); + } + long end = System.currentTimeMillis(); + long duration = end - start; + waitTime.set(duration); + }); + waitingThread.start(); + + while (!isWaiting.get()) { + Thread.yield(); + } + + Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime + + var fatalEvent = ProviderEventDetails.builder() + .errorCode(ErrorCode.PROVIDER_FATAL) + .message("Some message") + .build(); + flagdProviderSyncResources.fatalError(fatalEvent); + + waitingThread.join(); + + var wait = MAX_TIME_TOLERANCE * 3; + + Assertions.assertTrue( + waitTime.get() < wait, + () -> "Wakeup should be almost instant, but took " + waitTime.get() + + " ms, which is more than the max of" + + wait + " ms"); + Assertions.assertNotNull(fatalException.get()); + Assertions.assertInstanceOf(FatalError.class, fatalException.get()); + Assertions.assertEquals( + "Initialization failed due to a fatal error: " + fatalEvent.getMessage(), + fatalException.get().getMessage()); + } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index 2af572fe5..ad38fe15d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -34,6 +34,7 @@ import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveStringResponse; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub; +import dev.openfeature.sdk.ErrorCode; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.FlagEvaluationDetails; import dev.openfeature.sdk.FlagValueType; @@ -48,6 +49,7 @@ import dev.openfeature.sdk.Reason; import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; +import dev.openfeature.sdk.exceptions.FatalError; import dev.openfeature.sdk.internal.TriConsumer; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -60,8 +62,10 @@ import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.MockedConstruction; @@ -676,6 +680,38 @@ void updatesSyncMetadataWithCallback() throws Exception { } } + @Test + void initAfterFatalPropagatesErrorEvent() { + // given + final var ctx = new ImmutableContext(); + final var metadata = new MutableStructure(); + + // mock a resolver + final var onEvent = new AtomicReference>(); + try (var mockResolver = mockConstruction( + InProcessResolver.class, + (mock, context) -> onEvent.set((TriConsumer) + context.arguments().get(1)))) { + + FlagdProvider provider = new FlagdProvider(FlagdOptions.builder() + .resolverType(Config.Resolver.IN_PROCESS) + .build()); + + onEvent.get() + .accept( + ProviderEvent.PROVIDER_ERROR, + ProviderEventDetails.builder() + .message("msg") + .errorCode(ErrorCode.PROVIDER_FATAL) + .build(), + metadata); + + var error = Assertions.assertThrows(FatalError.class, () -> provider.initialize(ctx)); + Assertions.assertEquals("Initialization failed due to a fatal error: msg", error.getMessage()); + Assertions.assertEquals(ErrorCode.PROVIDER_FATAL, error.getErrorCode()); + } + } + // test helper // create provider with given grpc provider and state supplier private FlagdProvider createProvider(ChannelConnector connector, ServiceBlockingStub mockBlockingStub) {