diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 5d245a764..41c6ecfcc 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -4,6 +4,7 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; +import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -19,23 +20,26 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; /** - * Implements the {@link QueueSource} contract and emit flags obtained from - * flagd sync gRPC contract. + * Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract. */ @Slf4j @SuppressFBWarnings( value = {"EI_EXPOSE_REP"}, - justification = "We need to expose the BlockingQueue to allow consumers to read from it") + justification = "Random is used to generate a variation & flag configurations require exposing") public class SyncStreamQueueSource implements QueueSource { private static final int QUEUE_SIZE = 5; + private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); private final int streamDeadline; @@ -44,44 +48,28 @@ public class SyncStreamQueueSource implements QueueSource { private final String selector; private final String providerId; private final boolean syncMetadataDisabled; - private final boolean reinitializeOnError; - private final FlagdOptions options; + private final ChannelConnector channelConnector; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final List fatalStatusCodes; - private volatile GrpcComponents grpcComponents; - - /** - * Container for gRPC components to ensure atomicity during reinitialization. - * All three components are updated together to prevent consumers from seeing - * an inconsistent state where components are from different channel instances. - */ - private static class GrpcComponents { - final ChannelConnector channelConnector; - final FlagSyncServiceStub flagSyncStub; - final FlagSyncServiceBlockingStub metadataStub; - - GrpcComponents(ChannelConnector connector, FlagSyncServiceStub stub, FlagSyncServiceBlockingStub blockingStub) { - this.channelConnector = connector; - this.flagSyncStub = stub; - this.metadataStub = blockingStub; - } - } + private final FlagSyncServiceStub flagSyncStub; + private final FlagSyncServiceBlockingStub metadataStub; + private final ScheduledExecutorService scheduler; /** - * Creates a new SyncStreamQueueSource responsible for observing the event - * stream. + * Creates a new SyncStreamQueueSource responsible for observing the event stream. */ - public SyncStreamQueueSource(final FlagdOptions options) { + public SyncStreamQueueSource(final FlagdOptions options, Consumer onConnectionEvent) { streamDeadline = options.getStreamDeadlineMs(); deadline = options.getDeadline(); selector = options.getSelector(); providerId = options.getProviderId(); maxBackoffMs = options.getRetryBackoffMaxMs(); syncMetadataDisabled = options.isSyncMetadataDisabled(); - fatalStatusCodes = options.getFatalStatusCodes(); - reinitializeOnError = options.isReinitializeOnError(); - this.options = options; - initializeChannelComponents(); + channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options)); + flagSyncStub = + FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady(); + metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()) + .withWaitForReady(); + scheduler = createScheduler(); } // internal use only @@ -94,55 +82,25 @@ protected SyncStreamQueueSource( deadline = options.getDeadline(); selector = options.getSelector(); providerId = options.getProviderId(); + channelConnector = connectorMock; maxBackoffMs = options.getRetryBackoffMaxMs(); + flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); - fatalStatusCodes = options.getFatalStatusCodes(); - reinitializeOnError = options.isReinitializeOnError(); - this.options = options; - this.grpcComponents = new GrpcComponents(connectorMock, stubMock, blockingStubMock); + metadataStub = blockingStubMock; + scheduler = createScheduler(); } - /** Initialize channel connector and stubs. */ - private synchronized void initializeChannelComponents() { - ChannelConnector newConnector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); - FlagSyncServiceStub newFlagSyncStub = - FlagSyncServiceGrpc.newStub(newConnector.getChannel()).withWaitForReady(); - FlagSyncServiceBlockingStub newMetadataStub = - FlagSyncServiceGrpc.newBlockingStub(newConnector.getChannel()).withWaitForReady(); - - // atomic assignment of all components as a single unit - grpcComponents = new GrpcComponents(newConnector, newFlagSyncStub, newMetadataStub); - } - - /** Reinitialize channel connector and stubs on error. */ - public synchronized void reinitializeChannelComponents() { - if (!reinitializeOnError || shutdown.get()) { - return; - } - - log.info("Reinitializing channel gRPC components in attempt to restore stream."); - GrpcComponents oldComponents = grpcComponents; - - try { - // create new channel components first - initializeChannelComponents(); - } catch (Exception e) { - log.error("Failed to reinitialize channel components", e); - return; - } - - // shutdown old connector after successful reinitialization - if (oldComponents != null && oldComponents.channelConnector != null) { - try { - oldComponents.channelConnector.shutdown(); - } catch (Exception e) { - log.debug("Error shutting down old channel connector during reinitialization", e); - } - } + private static ScheduledExecutorService createScheduler() { + return Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "flagd-sync-retry-scheduler"); + t.setDaemon(true); + return t; + }); } /** Initialize sync stream connector. */ public void init() throws Exception { + channelConnector.initialize(); Thread listener = new Thread(this::observeSyncStream); listener.setDaemon(true); listener.start(); @@ -159,15 +117,20 @@ public BlockingQueue getStreamQueue() { * @throws InterruptedException if stream can't be closed within deadline. */ public void shutdown() throws InterruptedException { - // Do not enqueue errors from here, as this method can be called externally, causing multiple shutdown signals // Use atomic compareAndSet to ensure shutdown is only executed once // This prevents race conditions when shutdown is called from multiple threads if (!shutdown.compareAndSet(false, true)) { log.debug("Shutdown already in progress or completed"); return; } - - grpcComponents.channelConnector.shutdown(); + this.scheduler.shutdownNow(); + try { + this.scheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.debug("Scheduler termination was interrupted", e); + Thread.currentThread().interrupt(); + } + this.channelConnector.shutdown(); } /** Contains blocking calls, to be used concurrently. */ @@ -178,71 +141,61 @@ private void observeSyncStream() { // "waitForReady" on the channel, plus our retry policy slow this loop down in // error conditions while (!shutdown.get()) { + if (shouldThrottle.getAndSet(false)) { + log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); + scheduleRetry(); + return; + } + + log.debug("Initializing sync stream request"); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); + try { + observer.metadata = getMetadata(); + } catch (Exception metaEx) { + // retry if getMetadata fails + String message = metaEx.getMessage(); + log.debug("Metadata request error: {}, will restart", message, metaEx); + enqueueError(String.format("Error in getMetadata request: %s", message)); + shouldThrottle.set(true); + continue; + } + try { - if (shouldThrottle.getAndSet(false)) { - log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); - Thread.sleep(this.maxBackoffMs); - - // Check shutdown again after sleep to avoid unnecessary work - if (shutdown.get()) { - break; - } - } - - log.debug("Initializing sync stream request"); - SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue); - try { - observer.metadata = getMetadata(); - } catch (StatusRuntimeException metaEx) { - if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) { - log.info( - "Fatal status code for metadata request: {}, not retrying", - metaEx.getStatus().getCode()); - shutdown(); - enqueue(QueuePayload.SHUTDOWN); - } else { - // retry for other status codes - String message = metaEx.getMessage(); - log.debug("Metadata request error: {}, will restart", message, metaEx); - enqueue(QueuePayload.ERROR); - } - shouldThrottle.set(true); - continue; - } - - try { - syncFlags(observer); - handleObserverError(observer); - } catch (StatusRuntimeException ex) { - if (fatalStatusCodes.contains(ex.getStatus().getCode().name())) { - log.info( - "Fatal status code during sync stream: {}, not retrying", - ex.getStatus().getCode()); - shutdown(); - enqueue(QueuePayload.SHUTDOWN); - } else { - // retry for other status codes - log.error("Unexpected sync stream exception, will restart.", ex); - enqueue(QueuePayload.ERROR); - } - shouldThrottle.set(true); - } - } catch (InterruptedException ie) { - log.debug("Stream loop interrupted, most likely shutdown was invoked", ie); + syncFlags(observer); + } catch (Exception ex) { + log.error("Unexpected sync stream exception, will restart.", ex); + enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + shouldThrottle.set(true); } } log.info("Shutdown invoked, exiting event stream listener"); } - // TODO: remove the metadata call entirely after - // https://github.com/open-feature/flagd/issues/1584 + /** + * Schedules a retry of the sync stream after the backoff period. + * Uses a non-blocking approach instead of Thread.sleep(). + */ + private void scheduleRetry() { + if (shutdown.get()) { + log.debug("Shutdown in progress, not scheduling retry."); + return; + } + try { + scheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // Scheduler was shut down after the shutdown check, which is fine + log.debug("Retry scheduling rejected, scheduler is shut down", e); + } + } + + // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584 private Struct getMetadata() { if (syncMetadataDisabled) { return null; } - FlagSyncServiceBlockingStub localStub = grpcComponents.metadataStub; + FlagSyncServiceBlockingStub localStub = metadataStub; if (deadline > 0) { localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); @@ -257,8 +210,7 @@ private Struct getMetadata() { return null; } catch (StatusRuntimeException e) { - // In newer versions of flagd, metadata is part of the sync stream. If the - // method is unimplemented, we + // In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we // can ignore the error if (e.getStatus() != null && Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) { @@ -270,14 +222,12 @@ private Struct getMetadata() { } private void syncFlags(SyncStreamObserver streamObserver) { - FlagSyncServiceStub localStub = grpcComponents.flagSyncStub; // don't mutate the stub + FlagSyncServiceStub localStub = flagSyncStub; // don't mutate the stub if (streamDeadline > 0) { localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS); } final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder(); - // Selector is now passed via header using ClientInterceptor (see constructor) - // Keeping this for backward compatibility with older flagd versions if (this.selector != null) { syncRequest.setSelector(this.selector); } @@ -291,40 +241,26 @@ private void syncFlags(SyncStreamObserver streamObserver) { streamObserver.done.await(); } - private void handleObserverError(SyncStreamObserver observer) throws InterruptedException { - if (observer.throwable == null) { - return; - } - - Throwable throwable = observer.throwable; - Status status = Status.fromThrowable(throwable); - String message = throwable.getMessage(); - if (fatalStatusCodes.contains(status.getCode().name())) { - shutdown(); - } else { - log.debug("Stream error: {}, will restart", message, throwable); - enqueue(QueuePayload.ERROR); - } - - // Set throttling flag to ensure backoff before retry - this.shouldThrottle.set(true); + private void enqueueError(String message) { + enqueueError(outgoingQueue, message); } - private void enqueue(QueuePayload queuePayload) { - if (!outgoingQueue.offer(queuePayload)) { - log.error("Failed to convey {} status, queue is full", queuePayload.getType()); + private static void enqueueError(BlockingQueue queue, String message) { + if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { + log.error("Failed to convey ERROR status, queue is full"); } } private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; + private final AtomicBoolean shouldThrottle; private final Awaitable done = new Awaitable(); private Struct metadata; - private Throwable throwable; - public SyncStreamObserver(BlockingQueue outgoingQueue) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle) { this.outgoingQueue = outgoingQueue; + this.shouldThrottle = shouldThrottle; } @Override @@ -341,9 +277,16 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { @Override public void onError(Throwable throwable) { - log.debug("Sync stream error received", throwable); - this.throwable = throwable; - done.wakeup(); + try { + String message = throwable != null ? throwable.getMessage() : "unknown"; + log.debug("Stream error: {}, will restart", message, throwable); + enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + + // Set throttling flag to ensure backoff before retry + this.shouldThrottle.set(true); + } finally { + done.wakeup(); + } } @Override @@ -352,4 +295,4 @@ public void onCompleted() { done.wakeup(); } } -} +} \ No newline at end of file