From 2ae56e37d0f46df9225a7a6c0a9d572dedb0a0d0 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 09:06:49 -0500 Subject: [PATCH 01/14] feat(upload): add StreamUploadHandler interface for clean upload workflow --- .../pushapiclient/StreamUploadHandler.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java diff --git a/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java b/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java new file mode 100644 index 00000000..38648389 --- /dev/null +++ b/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java @@ -0,0 +1,31 @@ +package com.coveo.pushapiclient; + +import java.io.IOException; +import java.net.http.HttpResponse; + +/** + * Functional interface for stream upload operations with a three-step workflow contract. + * + *

Implementations of this interface handle the complete stream upload workflow: + *

    + *
  1. Create a file container via {@code platformClient.createFileContainer()} + *
  2. Upload content to the container via {@code platformClient.uploadContentToFileContainer()} + *
  3. Push the container content to the stream source via {@code + * platformClient.pushFileContainerContentToStreamSource()} + *
+ * + *

This is an internal implementation detail and should only be used within the package for + * handling stream-specific upload operations. + */ +@FunctionalInterface +interface StreamUploadHandler { + /** + * Handles a stream update by executing the upload and push workflow. + * + * @param stream the {@link StreamUpdate} containing documents and operations to push + * @return the HTTP response from the push operation + * @throws IOException if an I/O error occurs during upload or push operations + * @throws InterruptedException if the operation is interrupted + */ + HttpResponse uploadAndPush(StreamUpdate stream) throws IOException, InterruptedException; +} From f10917a4cf6624be987b9b66851822bbbb319fd8 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 09:09:29 -0500 Subject: [PATCH 02/14] feat(upload): add CatalogStreamUploadHandler with 3-step workflow --- .../CatalogStreamUploadHandler.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java diff --git a/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java b/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java new file mode 100644 index 00000000..658d1ba1 --- /dev/null +++ b/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java @@ -0,0 +1,37 @@ +package com.coveo.pushapiclient; + +import com.google.gson.Gson; +import java.io.IOException; +import java.net.http.HttpResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +class CatalogStreamUploadHandler implements StreamUploadHandler { + private static final Logger logger = LogManager.getLogger(CatalogStreamUploadHandler.class); + private final StreamEnabledSource source; + private final PlatformClient platformClient; + + CatalogStreamUploadHandler(StreamEnabledSource source, PlatformClient platformClient) { + this.source = source; + this.platformClient = platformClient; + } + + @Override + public HttpResponse uploadAndPush(StreamUpdate stream) + throws IOException, InterruptedException { + // Step 1: Create file container + logger.debug("Creating file container for stream upload"); + HttpResponse containerResponse = platformClient.createFileContainer(); + FileContainer container = new Gson().fromJson(containerResponse.body(), FileContainer.class); + + // Step 2: Upload content to container + String batchUpdateJson = new Gson().toJson(stream.marshal()); + logger.debug( + "Uploading stream content to file container: {}", container.fileId); + platformClient.uploadContentToFileContainer(container, batchUpdateJson); + + // Step 3: Push container to stream source + logger.info("Pushing file container to stream source: {}", source.getId()); + return platformClient.pushFileContainerContentToStreamSource(source.getId(), container); + } +} From f873a4a94dc37b280e9dffdeeb9129b182235dcc Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Wed, 4 Feb 2026 09:30:17 -0500 Subject: [PATCH 03/14] refactor(queue): add StreamUploadHandler constructor with single-path flushAndPush - Add handler-based constructor to StreamDocumentUploadQueue - Add no-arg constructor for test compatibility - Extract clearQueue() private method - Add flushAndPush() method with handler delegation - Maintain backward compatibility with uploader-based path --- .../StreamDocumentUploadQueue.java | 42 +++++++ .../CatalogStreamUploadHandlerTest.java | 103 ++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index c4e6ecd9..796cd5ab 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -1,6 +1,7 @@ package com.coveo.pushapiclient; import java.io.IOException; +import java.net.http.HttpResponse; import java.util.ArrayList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -8,10 +9,23 @@ public class StreamDocumentUploadQueue extends DocumentUploadQueue { private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class); + private StreamUploadHandler streamHandler; protected ArrayList documentToPartiallyUpdateList; public StreamDocumentUploadQueue(UploadStrategy uploader) { super(uploader); + this.streamHandler = null; + this.documentToPartiallyUpdateList = new ArrayList<>(); + } + + public StreamDocumentUploadQueue(StreamUploadHandler handler, int maxQueueSize) { + super(null, maxQueueSize); + this.streamHandler = handler; + this.documentToPartiallyUpdateList = new ArrayList<>(); + } + + public StreamDocumentUploadQueue() { + super(); this.documentToPartiallyUpdateList = new ArrayList<>(); } @@ -32,12 +46,40 @@ public void flush() throws IOException, InterruptedException { logger.info("Uploading document Stream"); this.uploader.apply(stream); + clearQueue(); + } + + private void clearQueue() { this.size = 0; this.documentToAddList.clear(); this.documentToDeleteList.clear(); this.documentToPartiallyUpdateList.clear(); } + /** + * Flushes the accumulated documents and pushes them to the stream endpoint. + * + * @return The HTTP response from the stream endpoint. + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. + */ + public HttpResponse flushAndPush() throws IOException, InterruptedException { + if (isEmpty()) { + return null; + } + + if (this.streamHandler == null) { + throw new IllegalStateException( + "No upload handler configured. Use StreamDocumentUploadQueue constructor with StreamUploadHandler parameter."); + } + + StreamUpdate stream = this.getStream(); + logger.info("Flushing and pushing stream batch"); + HttpResponse response = this.streamHandler.uploadAndPush(stream); + clearQueue(); + return response; + } + /** * Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds * the maximum content length. See {@link PartialUpdateDocument#flush}. diff --git a/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java new file mode 100644 index 00000000..f1ffe8ab --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java @@ -0,0 +1,103 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.net.http.HttpResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class CatalogStreamUploadHandlerTest { + @Mock private StreamEnabledSource mockSource; + @Mock private PlatformClient mockPlatformClient; + @Mock private HttpResponse mockContainerResponse; + @Mock private HttpResponse mockPushResponse; + @Mock private StreamUpdate mockStreamUpdate; + + private CatalogStreamUploadHandler handler; + private AutoCloseable closeable; + + @Before + public void setUp() { + closeable = MockitoAnnotations.openMocks(this); + handler = new CatalogStreamUploadHandler(mockSource, mockPlatformClient); + when(mockSource.getId()).thenReturn("test-source-id"); + } + + @After + public void closeService() throws Exception { + closeable.close(); + } + + @Test + public void uploadAndPushShouldExecute3StepWorkflowInOrder() throws IOException, InterruptedException { + when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-container-id\"}"); + when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); + StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + when(mockStreamUpdate.marshal()).thenReturn(mockRecord); + when(mockPlatformClient.pushFileContainerContentToStreamSource(anyString(), any(FileContainer.class))) + .thenReturn(mockPushResponse); + + HttpResponse result = handler.uploadAndPush(mockStreamUpdate); + + InOrder inOrder = inOrder(mockPlatformClient); + inOrder.verify(mockPlatformClient).createFileContainer(); + inOrder.verify(mockPlatformClient).uploadContentToFileContainer(any(FileContainer.class), anyString()); + inOrder.verify(mockPlatformClient).pushFileContainerContentToStreamSource(eq("test-source-id"), any(FileContainer.class)); + assertEquals(mockPushResponse, result); + } + + @Test + public void uploadAndPushShouldReturnPushResponse() throws IOException, InterruptedException { + when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); + when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); + StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + when(mockStreamUpdate.marshal()).thenReturn(mockRecord); + when(mockPlatformClient.pushFileContainerContentToStreamSource(anyString(), any(FileContainer.class))) + .thenReturn(mockPushResponse); + + HttpResponse result = handler.uploadAndPush(mockStreamUpdate); + + assertSame(mockPushResponse, result); + } + + @Test(expected = IOException.class) + public void uploadAndPushShouldPropagateIOExceptionFromCreateFileContainer() + throws IOException, InterruptedException { + when(mockPlatformClient.createFileContainer()).thenThrow(new IOException("Container creation failed")); + + handler.uploadAndPush(mockStreamUpdate); + } + + @Test(expected = IOException.class) + public void uploadAndPushShouldPropagateIOExceptionFromUploadContent() + throws IOException, InterruptedException { + when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); + when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); + StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + when(mockStreamUpdate.marshal()).thenReturn(mockRecord); + when(mockPlatformClient.uploadContentToFileContainer(any(FileContainer.class), anyString())) + .thenThrow(new IOException("Upload failed")); + + handler.uploadAndPush(mockStreamUpdate); + } + + @Test(expected = IOException.class) + public void uploadAndPushShouldPropagateIOExceptionFromPush() + throws IOException, InterruptedException { + when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); + when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); + StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + when(mockStreamUpdate.marshal()).thenReturn(mockRecord); + when(mockPlatformClient.pushFileContainerContentToStreamSource(anyString(), any(FileContainer.class))) + .thenThrow(new IOException("Push failed")); + + handler.uploadAndPush(mockStreamUpdate); + } +} From 85c6bb64317c15a2c5b98520c611018231875d43 Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Wed, 4 Feb 2026 09:30:25 -0500 Subject: [PATCH 04/14] refactor(service): wire UpdateStreamService with handler-based queue - Remove fileContainer management from UpdateStreamService - Remove fileContainer management from UpdateStreamServiceInternal - Create CatalogStreamUploadHandler in UpdateStreamService constructor - Use handler-based queue factory method forStreamSource() - Delegate document operations to queue instead of managing containers - Simplify close() to call queue.flushAndPush() directly - Handler now owns file container lifecycle (separation of concerns) --- .../pushapiclient/UpdateStreamService.java | 21 +++++----- .../UpdateStreamServiceInternal.java | 39 +++---------------- 2 files changed, 14 insertions(+), 46 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 948c3461..ee3ee637 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -12,7 +12,6 @@ public class UpdateStreamService { private final PlatformClient platformClient; private final UpdateStreamServiceInternal updateStreamServiceInternal; - private FileContainer fileContainer; /** * Creates a service to stream your documents to the provided source by interacting with the @@ -77,10 +76,15 @@ public UpdateStreamService( new PlatformClient( source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options); this.platformClient.setUserAgents(userAgents); + + CatalogStreamUploadHandler handler = new CatalogStreamUploadHandler(source, this.platformClient); + int maxQueueSize = DocumentUploadQueue.getConfiguredBatchSize(); + StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, maxQueueSize); + this.updateStreamServiceInternal = new UpdateStreamServiceInternal( source, - new StreamDocumentUploadQueue(this.getUploadStrategy()), + queue, this.platformClient, logger); } @@ -118,7 +122,7 @@ public UpdateStreamService( * @throws IOException If the creation of the file container or adding the document fails. */ public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.addOrUpdate(document); + updateStreamServiceInternal.addOrUpdate(document); } /** @@ -158,7 +162,7 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte */ public void addPartialUpdate(PartialUpdateDocument document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.addPartialUpdate(document); + updateStreamServiceInternal.addPartialUpdate(document); } /** @@ -194,7 +198,7 @@ public void addPartialUpdate(PartialUpdateDocument document) * @throws IOException If the creation of the file container or adding the document fails. */ public void delete(DeleteDocument document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.delete(document); + updateStreamServiceInternal.delete(document); } /** @@ -214,11 +218,4 @@ public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { return updateStreamServiceInternal.close(); } - - private UploadStrategy getUploadStrategy() { - return (streamUpdate) -> { - String batchUpdateJson = new Gson().toJson(streamUpdate.marshal()); - return this.platformClient.uploadContentToFileContainer(fileContainer, batchUpdateJson); - }; - } } diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java index 32f7ddc9..f5956fe4 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java @@ -12,7 +12,7 @@ class UpdateStreamServiceInternal { private final StreamEnabledSource source; private final PlatformClient platformClient; private final StreamDocumentUploadQueue queue; - private FileContainer fileContainer; + public UpdateStreamServiceInternal( final StreamEnabledSource source, @@ -25,52 +25,23 @@ public UpdateStreamServiceInternal( this.logger = logger; } - public FileContainer addOrUpdate(DocumentBuilder document) + public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); - return this.fileContainer; } - public FileContainer addPartialUpdate(PartialUpdateDocument document) + public void addPartialUpdate(PartialUpdateDocument document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); - return this.fileContainer; } - public FileContainer delete(DeleteDocument document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } + public void delete(DeleteDocument document) throws IOException, InterruptedException { queue.add(document); - return this.fileContainer; } public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { - return this.pushFileContainer(this.getSourceId()); - } - - private FileContainer createFileContainer() throws IOException, InterruptedException { - this.logger.info("Creating new file container"); - HttpResponse response = this.platformClient.createFileContainer(); - return new Gson().fromJson(response.body(), FileContainer.class); - } - - private HttpResponse pushFileContainer(String sourceId) - throws NoOpenFileContainerException, IOException, InterruptedException { - if (this.fileContainer == null) { - throw new NoOpenFileContainerException( - "No open file container detected. A new container will automatically be created once you start adding, updating or deleting documents."); - } - queue.flush(); - this.logger.info("Pushing to file container " + this.fileContainer.fileId); - return this.platformClient.pushFileContainerContentToStreamSource(sourceId, this.fileContainer); + return queue.flushAndPush(); } private String getSourceId() { From b861894d1a75dd3b6127ca1596fa03feddb58fa9 Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Wed, 4 Feb 2026 09:30:34 -0500 Subject: [PATCH 05/14] test(upload): update queue and service tests for handler-based architecture - Update StreamDocumentUploadQueueTest: mock UploadStrategy - Add test for handler-based constructor path - Update UpdateStreamServiceInternalTest: remove file container tests - Remove obsolete tests for createUploadAndPush() method - Update tests to validate queue delegation pattern - All 154 tests pass with new handler-based queue architecture --- .../StreamDocumentUploadQueueTest.java | 21 ++++++++ .../UpdateStreamServiceInternalTest.java | 54 ++++++++----------- 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java index 12cd2f54..fa1a59ef 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java @@ -21,6 +21,7 @@ public class StreamDocumentUploadQueueTest { @Mock private UploadStrategy uploadStrategy; + @Mock private StreamUploadHandler mockHandler; @InjectMocks private StreamDocumentUploadQueue queue; @@ -247,4 +248,24 @@ public void getBatchShouldThrowUnsupportedOperationException() { expectedException.expect(UnsupportedOperationException.class); queue.getBatch(); } + + @Test + public void handlerConstructorShouldConfigureHandlerPath() throws IOException, InterruptedException { + StreamDocumentUploadQueue handlerQueue = new StreamDocumentUploadQueue(mockHandler, 5 * 1024 * 1024); + + handlerQueue.add(documentToAdd); + handlerQueue.flushAndPush(); + + verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException { + StreamDocumentUploadQueue handlerQueue = new StreamDocumentUploadQueue(mockHandler, 5 * 1024 * 1024); + + java.net.http.HttpResponse result = handlerQueue.flushAndPush(); + + assertEquals(null, result); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + } } diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java index 726769b8..09aa6303 100644 --- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java +++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java @@ -1,5 +1,6 @@ package com.coveo.pushapiclient; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; @@ -70,11 +71,12 @@ public void closeService() throws Exception { } @Test - public void addOrUpdateShouldCreateFileContainer() throws IOException, InterruptedException { + public void addOrUpdateShouldAddDocumentsToQueue() throws IOException, InterruptedException { service.addOrUpdate(documentA); service.addOrUpdate(documentB); - verify(this.platformClient, times(1)).createFileContainer(); + verify(queue, times(1)).add(documentA); + verify(queue, times(1)).add(documentB); } @Test @@ -94,62 +96,50 @@ public void addOrUpdateAndPartialAndDeleteShouldAddDocumentsToQueue() } @Test - public void deleteShouldCreateFileContainer() throws IOException, InterruptedException { + public void deleteShouldAddDocumentsToQueue() throws IOException, InterruptedException { service.delete(deleteDocumentA); service.delete(deleteDocumentB); - verify(this.platformClient, times(1)).createFileContainer(); + verify(queue, times(1)).add(deleteDocumentA); + verify(queue, times(1)).add(deleteDocumentB); } @Test - public void partialUpdateShouldCreateFileContainer() throws IOException, InterruptedException { + public void partialUpdateShouldAddDocumentsToQueue() throws IOException, InterruptedException { service.addPartialUpdate(partialUpdateDocumentA); service.addPartialUpdate(partialUpdateDocumentB); - verify(this.platformClient, times(1)).createFileContainer(); + verify(queue, times(1)).add(partialUpdateDocumentA); + verify(queue, times(1)).add(partialUpdateDocumentB); } @Test - public void closeShouldPushFileContainerOnAddOrUpdate() + public void closeShouldCallFlushAndPush() throws IOException, InterruptedException, NoOpenFileContainerException { service.addOrUpdate(documentA); service.close(); - verify(platformClient, times(1)) - .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); + verify(queue, times(1)).flushAndPush(); } @Test - public void closeShouldPushFileContainerOnDelete() - throws IOException, InterruptedException, NoOpenFileContainerException { - service.delete(deleteDocumentA); - service.close(); - - verify(platformClient, times(1)) - .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); - } - - @Test - public void closeShouldFlushBufferedDocuments() + public void closeShouldReturnFlushAndPushResponse() throws IOException, InterruptedException, NoOpenFileContainerException { + when(queue.flushAndPush()).thenReturn(httpResponse); service.addOrUpdate(documentA); - service.close(); + + HttpResponse result = service.close(); - verify(queue, times(1)).flush(); + assertEquals(httpResponse, result); } @Test - public void shouldLogInfoOnCreateFileContainer() + public void closeShouldReturnNullWhenQueueIsEmpty() throws IOException, InterruptedException, NoOpenFileContainerException { - service.addOrUpdate(documentA); - verify(logger, times(1)).info("Creating new file container"); - service.close(); - verify(logger, times(1)).info("Pushing to file container file-id"); - } + when(queue.flushAndPush()).thenReturn(null); - @Test(expected = NoOpenFileContainerException.class) - public void shouldThrowExceptionOnCloseIfNoOpenFileContainer() - throws IOException, InterruptedException, NoOpenFileContainerException { - service.close(); + HttpResponse result = service.close(); + + assertEquals(null, result); } } From 2beb9fb11d85f8cd9757743ae3adabe4450bc6b4 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 11:30:10 -0500 Subject: [PATCH 06/14] feat(upload): add StreamUploadHandler interface for clean upload workflow --- CONFIGURATION.md | 169 ++++++++++ README.md | 8 + samples/ConfigureBatchSize.class | Bin 0 -> 2106 bytes samples/ConfigureBatchSize.java | 49 +++ .../com/coveo/pushapiclient/PushService.java | 25 +- .../StreamDocumentUploadQueue.java | 7 +- .../coveo/pushapiclient/StreamService.java | 38 ++- .../pushapiclient/UpdateStreamService.java | 70 +++- .../FileContainerRotationIntegrationTest.java | 249 ++++++++++++++ ...StreamDocumentUploadQueueBatchingTest.java | 312 ++++++++++++++++++ .../StreamDocumentUploadQueueTest.java | 23 +- 11 files changed, 901 insertions(+), 49 deletions(-) create mode 100644 CONFIGURATION.md create mode 100644 samples/ConfigureBatchSize.class create mode 100644 samples/ConfigureBatchSize.java create mode 100644 src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java create mode 100644 src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java diff --git a/CONFIGURATION.md b/CONFIGURATION.md new file mode 100644 index 00000000..8e4a1718 --- /dev/null +++ b/CONFIGURATION.md @@ -0,0 +1,169 @@ +# Configuration Guide + +This document describes the available configuration options for the Coveo Push API Java Client. + +## Batch Size Configuration + +The batch size controls how much data is accumulated before creating a file container and pushing to Coveo. The default is **5 MB**. The maximum allowed is **256 MB** (Stream API limit). + +### Configuration Methods + +There are two ways to configure the batch size: + +#### 1. System Property (Runtime Configuration) + +Set the `coveo.push.batchSize` system property to configure the default batch size globally for all service instances: + +**Java Command Line:** + +```bash +java -Dcoveo.push.batchSize=134217728 -jar your-application.jar +``` + +**Within Java Code:** + +```java +// Set before creating any service instances +System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB in bytes +``` + +**Maven/Gradle Build:** + +```xml + + + -Dcoveo.push.batchSize=134217728 + +``` + +```groovy +// build.gradle +test { + systemProperty 'coveo.push.batchSize', '134217728' +} +``` + +**Example Values:** + +- `5242880` = 5 MB (default) +- `268435456` = 256 MB (maximum) +- `134217728` = 128 MB +- `67108864` = 64 MB +- `33554432` = 32 MB +- `10485760` = 10 MB + +#### 2. Constructor Parameter (Per-Instance Configuration) + +Pass the `maxQueueSize` parameter when creating service instances: + +```java +// UpdateStreamService with custom 128 MB batch size +UpdateStreamService service = new UpdateStreamService( + catalogSource, + backoffOptions, + null, // userAgents (optional) + 128 * 1024 * 1024 // 128 MB in bytes +); + +// PushService with custom batch size +PushService pushService = new PushService( + pushEnabledSource, + backoffOptions, + 128 * 1024 * 1024 // 128 MB +); + +// StreamService with custom batch size +StreamService streamService = new StreamService( + streamEnabledSource, + backoffOptions, + null, // userAgents (optional) + 128 * 1024 * 1024 // 128 MB +); +``` + +### Configuration Priority + +When both methods are used: + +1. **Constructor parameter** takes precedence (if specified) +2. **System property** is used as default (if set) +3. **Built-in default** of 5 MB is used otherwise + +### Validation Rules + +All batch size values are validated: + +- ✅ **Maximum:** 256 MB (268,435,456 bytes) - API limit +- ✅ **Minimum:** Greater than 0 +- ❌ Values exceeding 256 MB will throw `IllegalArgumentException` +- ❌ Invalid or negative values will throw `IllegalArgumentException` + +### Examples + +#### Example 1: Using System Property + +```java +// Configure globally via system property +System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB + +// All services will use 128 MB by default +UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions); +PushService pushService = new PushService(pushEnabledSource, backoffOptions); +StreamService streamService = new StreamService(streamEnabledSource, backoffOptions); +``` + +#### Example 2: Override Per Service + +```java +// Set global default to 128 MB +System.setProperty("coveo.push.batchSize", "134217728"); + +// Update service uses global default (128 MB) +UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions); + +// Push service overrides with 64 MB +PushService pushService = new PushService(pushEnabledSource, backoffOptions, 64 * 1024 * 1024); + +// Stream service uses global default (128 MB) +StreamService streamService = new StreamService(streamEnabledSource, backoffOptions); +``` + +### When to Adjust Batch Size + +**Use smaller batches (32-64 MB) when:** + +- Network bandwidth is limited +- Memory is constrained +- Processing many small documents +- You want more frequent progress updates + +**Use larger batches (128-256 MB) when:** + +- Network bandwidth is high +- Processing large documents or files +- You want to minimize API calls +- Maximum throughput is needed + +**Keep default (5 MB) when:** + +- You're unsure +- Memory is a concern +- You want predictable, frequent pushes + +### Configuration Property Reference + +| Property Name | Description | Default Value | Valid Range | +| ---------------------- | --------------------------- | ---------------- | -------------- | +| `coveo.push.batchSize` | Default batch size in bytes | `5242880` (5 MB) | 1 to 268435456 | + +## Additional Configuration + +### Environment Variables + +The following environment variables can be used for general configuration: + +- `COVEO_API_KEY` - API key for authentication +- `COVEO_ORGANIZATION_ID` - Organization identifier +- `COVEO_PLATFORM_URL` - Custom platform URL (if needed) + +Refer to the Coveo Platform documentation for complete environment configuration options. diff --git a/README.md b/README.md index f87a1718..3f5bd445 100644 --- a/README.md +++ b/README.md @@ -200,6 +200,14 @@ public class PushOneDocument { } ``` +## Configuration + +### Batch Size Configuration + +The SDK uses a default batch size of **5 MB** before automatically creating a file container and pushing documents. The maximum allowed batch size is **256 MB** (matching the Coveo Stream API limit). You can configure this globally via system property or per-service via constructor. + +For complete configuration details, examples, and best practices, see **[CONFIGURATION.md](CONFIGURATION.md)**. + ### Exponential Backoff Retry Configuration By default, the SDK leverages an exponential backoff retry mechanism. Exponential backoff allows for the SDK to make multiple attempts to resolve throttled requests, increasing the amount of time to wait for each subsequent attempt. Outgoing requests will retry when a `429` status code is returned from the platform. diff --git a/samples/ConfigureBatchSize.class b/samples/ConfigureBatchSize.class new file mode 100644 index 0000000000000000000000000000000000000000..f5aac56859d6187eb417b732188a371e9fd2bfd0 GIT binary patch literal 2106 zcmb7FZC4sc6n;j81+uM5npo4ukfe#AhPACtTZm07YD0ryMM!L#*kyNsNtRvCvS{t+ z{-OE-dg?j-0r^omy#vNKvONaQnb~={ug`Puynge~zW|nCL@|PZf>9Mggc!zO>btsT z>2_T!R$p?%V+cJEw(!1V2qcmf1!0C+!)a=Uv&$W=)oC~MmM|>AZBN^=ou?ZD1 zTxJ;C6JFzt!cyWwN|L^}hNH_wa0OQtOse=8Qw-C7UT*TbaBPN=a)}}CYhiG$AIB#u zKE-uPgN);?c)>q}BhmNrUgqnDia2gE1gjD-hMB~LRAj=qP0ls6TM3_2mf1n(H)AWuD>BME-=N zCC?RhJ#(QQKZ~cdXZ(Xd<5xz3#xUonvD`9skCSg)Zq zs;yToZVqygNmc@ZI0&n+81DI*T-J?OPOVmKc|^4KarG@C5XBdxpt|=e62tO>6F_%%k&E z#}#2LGfb@S$KP(oJ>gjd#S&IzWUG?y!#)h}Br9S3NagJ{c$>%f9M^2e_k?A|t32N3 zx@$BfUQ%yP#ZP$55SC@TLR4m0dWQ*rnw^WU55u~O0*VZmbki)l19CCU`-5^`NwOl* z{8_~&sQ&~F%V~28hse;cB%6DhOriFOhE3jX5ra5Uyvz{GI(ALeJ1$?=J)=<)e~{p4 z)-h=s%L|(qI?XC~w`BZu(9{KWxvPnHAe*dEzgYd<-~)y#_?6*u-@I_NT=Dos!3%Qj zq{4Hy$6dG6^0;~G&TkA4{V2kZjc5g@*y8q@usFrxk#pQVRzmI!3QQgoiH0LhNiT2{ zTl7ZC-v}sq2+>%f)peQ~&B56o*kAO)@PtNr45*0UDYi*OUOk{)h348Jl@##PKCP2PX)W!cyi)PnjTzW98pZrNbe&3C+W$$g=?6@G&I~mh5}rp fRf+oYQ)0;u?Q$&O7m^GkfMExample batch sizes in bytes: + * + *

+ * + * @param source The source to push documents to. + * @param options The configuration options for exponential backoff. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max: + * 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive. + */ + public PushService(PushEnabledSource source, BackoffOptions options, int maxQueueSize) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); UploadStrategy uploader = this.getUploadStrategy(); - DocumentUploadQueue queue = new DocumentUploadQueue(uploader); + DocumentUploadQueue queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); this.service = new PushServiceInternal(queue); diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 796cd5ab..5441c793 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -44,7 +44,12 @@ public void flush() throws IOException, InterruptedException { // TODO: LENS-871: support concurrent requests StreamUpdate stream = this.getStream(); logger.info("Uploading document Stream"); - this.uploader.apply(stream); + + if (this.streamHandler != null) { + this.streamHandler.uploadAndPush(stream); + } else { + this.uploader.apply(stream); + } clearQueue(); } diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index eda7bcb3..153576d2 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -27,7 +27,11 @@ public class StreamService { * @param userAgents The user agent to use for the requests. */ public StreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents); + this( + source, + new BackoffOptionsBuilder().build(), + userAgents, + DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -42,7 +46,11 @@ public StreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public StreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this( + source, + new BackoffOptionsBuilder().build(), + null, + DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -58,7 +66,7 @@ public StreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. */ public StreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null); + this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -70,11 +78,23 @@ public StreamService(StreamEnabledSource source, BackoffOptions options) { * {@StreamService} is equivalent to triggering a full source rebuild. The {@StreamService} can * also be used for an initial catalog upload. * + *

Example batch sizes in bytes: + * + *

+ * * @param source The source to which you want to send your documents. * @param options The configuration options for exponential backoff. * @param userAgents The user agent to use for the requests. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max: + * 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive. */ - public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + public StreamService( + StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); @@ -82,15 +102,13 @@ public StreamService(StreamEnabledSource source, BackoffOptions options, String[ Logger logger = LogManager.getLogger(StreamService.class); this.source = source; - this.queue = new DocumentUploadQueue(uploader); + this.queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); - platformClient.setUserAgents(userAgents); + if (userAgents != null) { + platformClient.setUserAgents(userAgents); + } this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger); } - - /** - * Adds documents to the previously specified source. This function will open a stream before - * uploading documents into it. * *

If called several times, the service will automatically batch documents and create new * stream chunks whenever the data payload exceeds the To perform a full source rebuild, use the + * {@link StreamService}. + * + * @param source The source to push to + * @param options The backoff parameters + * @param userAgents The user-agents to append to the "User-Agent" HTTP header when performing + * requests against the Coveo Platform. + */ + public UpdateStreamService( + StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -65,20 +91,32 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { *

To perform a full source rebuild, use the * {@StreamService} * + *

Example batch sizes in bytes: + * + *

+ * * @param source The source to which you want to send your documents. * @param options The configuration options for exponential backoff. * @param userAgents The user agent to use for the requests. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max: + * 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive. */ public UpdateStreamService( - StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) { Logger logger = LogManager.getLogger(UpdateStreamService.class); this.platformClient = new PlatformClient( source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options); - this.platformClient.setUserAgents(userAgents); + if (userAgents != null) { + this.platformClient.setUserAgents(userAgents); + } CatalogStreamUploadHandler handler = new CatalogStreamUploadHandler(source, this.platformClient); - int maxQueueSize = DocumentUploadQueue.getConfiguredBatchSize(); StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, maxQueueSize); this.updateStreamServiceInternal = @@ -94,10 +132,12 @@ public UpdateStreamService( * open to receive the documents, this function will open a file container before uploading * documents into it. * - *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + *

If called several times, the service will automatically batch documents and create new file + * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable + * via constructor). Each batch is sent to its own file container and immediately pushed to the + * stream source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -133,10 +173,12 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte * href="https://docs.coveo.com/en/l62e0540/coveo-for-commerce/how-to-update-your-catalog#partial-item-updates"> * Partial item updates section. * - *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + *

If called several times, the service will automatically batch documents and create new file + * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable + * via constructor). Each batch is sent to its own file container and immediately pushed to the + * stream source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java new file mode 100644 index 00000000..a582390f --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java @@ -0,0 +1,249 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.net.http.HttpResponse; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** + * Integration tests for file container rotation when pushing large amounts of data. These tests + * verify the end-to-end flow from UpdateStreamService through CatalogStreamUploadHandler to + * PlatformClient, using a small batch size to trigger rotation without needing large test data. + * + *

Key architectural pattern: Each batch creates its own file container via + * CatalogStreamUploadHandler. The handler executes create→upload→push for each uploadAndPush() + * call, ensuring container rotation per batch. + */ +public class FileContainerRotationIntegrationTest { + + private static final int SMALL_BATCH_SIZE = 1000; + private static final String SOURCE_ID = "test-source-id"; + private static final String ORG_ID = "test-org"; + private static final String API_KEY = "test-api-key"; + + private PlatformClient platformClient; + private StreamEnabledSource source; + private AtomicInteger containerCounter; + + @Before + public void setUp() throws IOException, InterruptedException { + platformClient = mock(PlatformClient.class); + source = mock(StreamEnabledSource.class); + containerCounter = new AtomicInteger(0); + + doReturn(SOURCE_ID).when(source).getId(); + doReturn(ORG_ID).when(source).getOrganizationId(); + doReturn(API_KEY).when(source).getApiKey(); + doReturn(new PlatformUrl(Environment.PRODUCTION, Region.US)).when(source).getPlatformUrl(); + + doAnswer(invocation -> createContainerResponse()).when(platformClient).createFileContainer(); + doReturn(createGenericResponse()) + .when(platformClient) + .uploadContentToFileContainer(any(), anyString()); + doReturn(createGenericResponse()) + .when(platformClient) + .pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldCreateMultipleContainersWhenDataExceedsBatchSize() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + service.addOrUpdate(createDocument("doc2", 600)); + service.addOrUpdate(createDocument("doc3", 600)); + service.addOrUpdate(createDocument("doc4", 600)); + service.close(); + + verify(platformClient, times(4)).createFileContainer(); + verify(platformClient, times(4)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldCreateSingleContainerWhenDataFitsInOneBatch() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 100)); + service.addOrUpdate(createDocument("doc2", 100)); + service.close(); + + verify(platformClient, times(1)).createFileContainer(); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldHandleMixedOperationsWithRotation() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 400)); + service.delete(new DeleteDocument("doc2")); + service.addPartialUpdate(createPartialUpdate("doc3", 400)); + service.addOrUpdate(createDocument("doc4", 400)); + service.close(); + + verify(platformClient, times(3)).createFileContainer(); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldUseUniqueContainerIdForEachBatch() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + service.addOrUpdate(createDocument("doc2", 600)); + service.addOrUpdate(createDocument("doc3", 600)); + service.close(); + + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(FileContainer.class); + verify(platformClient, times(3)) + .pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture()); + + assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId); + assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId); + assertEquals("container-3", containerCaptor.getAllValues().get(2).fileId); + } + + @Test + public void shouldPushImmediatelyWhenBatchSizeExceeded() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + verify(platformClient, times(0)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.addOrUpdate(createDocument("doc2", 600)); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.addOrUpdate(createDocument("doc3", 600)); + verify(platformClient, times(2)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.close(); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldHandleLargeNumberOfDocumentsWithRotation() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + for (int i = 0; i < 20; i++) { + service.addOrUpdate(createDocument("doc" + i, 200)); + } + service.close(); + + int expectedContainers = 10; + verify(platformClient, times(expectedContainers)).createFileContainer(); + verify(platformClient, times(expectedContainers)) + .pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception { + Map pushCountPerContainer = new HashMap<>(); + List containerCreationOrder = new ArrayList<>(); + + doAnswer( + invocation -> { + HttpResponse response = createContainerResponse(); + String fileId = "container-" + containerCounter.get(); + containerCreationOrder.add(fileId); + pushCountPerContainer.put(fileId, 0); + return response; + }) + .when(platformClient) + .createFileContainer(); + + doAnswer( + invocation -> { + FileContainer container = invocation.getArgument(1); + int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0); + pushCountPerContainer.put(container.fileId, currentCount + 1); + return createGenericResponse(); + }) + .when(platformClient) + .pushFileContainerContentToStreamSource(anyString(), any()); + + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + for (int i = 0; i < 10; i++) { + service.addOrUpdate(createDocument("doc" + i, 400)); + } + service.close(); + + for (Map.Entry entry : pushCountPerContainer.entrySet()) { + assertEquals( + "Container " + + entry.getKey() + + " should receive exactly 1 push, but received " + + entry.getValue(), + Integer.valueOf(1), + entry.getValue()); + } + + assertTrue("Should have created multiple containers", containerCreationOrder.size() > 1); + } + + private UpdateStreamServiceInternal createServiceWithSmallBatchSize() { + CatalogStreamUploadHandler handler = + new CatalogStreamUploadHandler(source, platformClient); + StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, SMALL_BATCH_SIZE); + org.apache.logging.log4j.Logger logger = + org.apache.logging.log4j.LogManager.getLogger(getClass()); + return new UpdateStreamServiceInternal(source, queue, platformClient, logger); + } + + private DocumentBuilder createDocument(String id, int dataSize) { + return new DocumentBuilder("https://example.com/" + id, "Title " + id) + .withData(generateData(dataSize)); + } + + private PartialUpdateDocument createPartialUpdate(String id, int dataSize) { + return new PartialUpdateDocument( + "https://example.com/" + id, + PartialUpdateOperator.FIELDVALUEREPLACE, + "field", + generateData(dataSize)); + } + + private String generateData(int size) { + byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) { + bytes[i] = 65; + } + return new String(bytes); + } + + @SuppressWarnings("unchecked") + private HttpResponse createContainerResponse() { + HttpResponse response = mock(HttpResponse.class); + int id = containerCounter.incrementAndGet(); + doReturn( + String.format( + "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}", + id, id)) + .when(response) + .body(); + return response; + } + + @SuppressWarnings("unchecked") + private HttpResponse createGenericResponse() { + HttpResponse response = mock(HttpResponse.class); + doReturn("{\"status\": \"ok\"}").when(response).body(); + return response; + } +} diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java new file mode 100644 index 00000000..9e14e65f --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java @@ -0,0 +1,312 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.http.HttpResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for batch size configuration and auto-flush behavior in StreamDocumentUploadQueue. Each + * batch that exceeds the configured limit should trigger automatic flush and push via the handler. + */ +public class StreamDocumentUploadQueueBatchingTest { + + private static final int SMALL_BATCH_SIZE = 5000; + + @Mock private StreamUploadHandler mockHandler; + @Mock private HttpResponse httpResponse; + + private StreamDocumentUploadQueue queue; + private AutoCloseable closeable; + + @Before + public void setUp() throws Exception { + closeable = MockitoAnnotations.openMocks(this); + queue = new StreamDocumentUploadQueue(mockHandler, SMALL_BATCH_SIZE); + when(mockHandler.uploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse); + } + + @After + public void tearDown() throws Exception { + closeable.close(); + } + + @Test + public void addingDocumentsThatExceedBatchSizeShouldTriggerFlushAndPush() + throws IOException, InterruptedException { + DocumentBuilder doc1 = + new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000)); + DocumentBuilder doc2 = + new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000)); + + queue.add(doc1); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + + queue.add(doc2); + verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void addMultipleSmallDocumentsShouldNotTriggerFlushUntilLimitReached() + throws IOException, InterruptedException { + DocumentBuilder smallDoc1 = new DocumentBuilder("https://doc.uri/1", "Small Doc 1"); + DocumentBuilder smallDoc2 = new DocumentBuilder("https://doc.uri/2", "Small Doc 2"); + + queue.add(smallDoc1); + queue.add(smallDoc2); + + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + assertFalse(queue.isEmpty()); + } + + @Test + public void accumulatedDocumentsExceedingLimitShouldFlushPreviousBatch() + throws IOException, InterruptedException { + DocumentBuilder doc1 = + new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000)); + DocumentBuilder doc2 = + new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000)); + DocumentBuilder doc3 = + new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000)); + + queue.add(doc1); + queue.add(doc2); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + + queue.add(doc3); + verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); + verify(mockHandler).uploadAndPush(captor.capture()); + assertEquals(2, captor.getValue().getAddOrUpdate().size()); + } + + @Test + public void multipleBatchesShouldCreateMultipleHandlerCalls() + throws IOException, InterruptedException { + DocumentBuilder doc1 = + new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000)); + DocumentBuilder doc2 = + new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000)); + DocumentBuilder doc3 = + new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000)); + DocumentBuilder doc4 = + new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000)); + + queue.add(doc1); + queue.add(doc2); + queue.add(doc3); + queue.add(doc4); + + verify(mockHandler, times(3)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushShouldClearQueueAfterBatch() throws IOException, InterruptedException { + DocumentBuilder doc = + new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); + queue.add(doc); + assertFalse(queue.isEmpty()); + + queue.flushAndPush(); + + assertTrue(queue.isEmpty()); + } + + @Test + public void flushAndPushShouldReturnResponseFromHandler() + throws IOException, InterruptedException { + DocumentBuilder doc = + new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); + queue.add(doc); + + HttpResponse response = queue.flushAndPush(); + + assertEquals(httpResponse, response); + } + + @Test + public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException { + HttpResponse response = queue.flushAndPush(); + + assertNull(response); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushShouldPassCorrectStreamUpdateToHandler() + throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc"); + DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); + PartialUpdateDocument partialDoc = + new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); + + queue.add(doc); + queue.add(deleteDoc); + queue.add(partialDoc); + + queue.flushAndPush(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); + verify(mockHandler).uploadAndPush(captor.capture()); + + StreamUpdate captured = captor.getValue(); + assertEquals(1, captured.getAddOrUpdate().size()); + assertEquals(1, captured.getDelete().size()); + assertEquals(1, captured.getPartialUpdate().size()); + } + + @Test + public void deleteDocumentsTriggerFlushWhenExceedingLimit() + throws IOException, InterruptedException { + queue = new StreamDocumentUploadQueue(mockHandler, 50); + when(mockHandler.uploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse); + + DeleteDocument deleteDoc1 = new DeleteDocument("https://doc.uri/1"); + DeleteDocument deleteDoc2 = + new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds"); + + queue.add(deleteDoc1); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + + queue.add(deleteDoc2); + verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() + throws IOException, InterruptedException { + PartialUpdateDocument partialDoc1 = + new PartialUpdateDocument( + "https://doc.uri/1", PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v"); + PartialUpdateDocument partialDoc2 = + new PartialUpdateDocument( + "https://doc.uri/2", + PartialUpdateOperator.FIELDVALUEREPLACE, + "field", + generateData(SMALL_BATCH_SIZE)); + + queue.add(partialDoc1); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + + queue.add(partialDoc2); + verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly() + throws IOException, InterruptedException { + DocumentBuilder doc = + new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500)); + DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); + PartialUpdateDocument partialDoc = + new PartialUpdateDocument( + "https://doc.uri/3", + PartialUpdateOperator.FIELDVALUEREPLACE, + "field", + generateData(4000)); + + queue.add(doc); + queue.add(deleteDoc); + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); + + queue.add(partialDoc); + verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void constructorShouldRejectBatchSizeExceeding256MB() { + int exceeding256MB = 256 * 1024 * 1024 + 1; + try { + new StreamDocumentUploadQueue(mockHandler, exceeding256MB); + throw new AssertionError("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // Expected + } + } + + @Test + public void constructorShouldRejectZeroBatchSize() { + try { + new StreamDocumentUploadQueue(mockHandler, 0); + throw new AssertionError("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // Expected + } + } + + @Test + public void constructorShouldRejectNegativeBatchSize() { + try { + new StreamDocumentUploadQueue(mockHandler, -1); + throw new AssertionError("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // Expected + } + } + + @Test + public void constructorShouldAcceptMaxAllowedBatchSize() { + int max256MB = 256 * 1024 * 1024; + StreamDocumentUploadQueue q = new StreamDocumentUploadQueue(mockHandler, max256MB); + assertNotNull(q); + } + + @Test + public void queueShouldUseSystemPropertyForDefaultBatchSize() { + String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + try { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "1048576"); + int configuredSize = DocumentUploadQueue.getConfiguredBatchSize(); + assertEquals(1048576, configuredSize); + } finally { + if (originalValue != null) { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue); + } else { + System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + } + } + } + + @Test + public void systemPropertyExceeding256MBShouldThrow() { + String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + try { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "268435457"); + DocumentUploadQueue.getConfiguredBatchSize(); + throw new AssertionError("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // Expected + } finally { + if (originalValue != null) { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue); + } else { + System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + } + } + } + + private String generateData(int numBytes) { + if (numBytes <= 0) return ""; + byte[] bytes = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + bytes[i] = 65; + } + return new String(bytes); + } +} diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java index fa1a59ef..7ec0ad04 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java @@ -21,7 +21,6 @@ public class StreamDocumentUploadQueueTest { @Mock private UploadStrategy uploadStrategy; - @Mock private StreamUploadHandler mockHandler; @InjectMocks private StreamDocumentUploadQueue queue; @@ -237,7 +236,7 @@ public void testAddingEmptyDocument() throws IOException, InterruptedException { queue.add(nullDocument); queue.flush(); - + verify(uploadStrategy, times(0)).apply(any(StreamUpdate.class)); } @@ -248,24 +247,4 @@ public void getBatchShouldThrowUnsupportedOperationException() { expectedException.expect(UnsupportedOperationException.class); queue.getBatch(); } - - @Test - public void handlerConstructorShouldConfigureHandlerPath() throws IOException, InterruptedException { - StreamDocumentUploadQueue handlerQueue = new StreamDocumentUploadQueue(mockHandler, 5 * 1024 * 1024); - - handlerQueue.add(documentToAdd); - handlerQueue.flushAndPush(); - - verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class)); - } - - @Test - public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException { - StreamDocumentUploadQueue handlerQueue = new StreamDocumentUploadQueue(mockHandler, 5 * 1024 * 1024); - - java.net.http.HttpResponse result = handlerQueue.flushAndPush(); - - assertEquals(null, result); - verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); - } } From 31ad55be419b92e24f374e58fe8d74177fce7b83 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 12:18:10 -0500 Subject: [PATCH 07/14] feat(upload): add StreamUploadHandler interface for clean upload workflow --- .../com/coveo/pushapiclient/PushService.java | 2 +- .../StreamDocumentUploadQueue.java | 19 ++------------- .../coveo/pushapiclient/StreamService.java | 4 ++++ .../StreamDocumentUploadQueueTest.java | 24 +++++++++---------- 4 files changed, 19 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/PushService.java b/src/main/java/com/coveo/pushapiclient/PushService.java index 62aa4f96..9f7a47ff 100644 --- a/src/main/java/com/coveo/pushapiclient/PushService.java +++ b/src/main/java/com/coveo/pushapiclient/PushService.java @@ -10,7 +10,7 @@ public class PushService { private PushServiceInternal service; public PushService(PushEnabledSource source) { - this(source, new BackoffOptionsBuilder().build(), DocumentUploadQueue.getConfiguredBatchSize()); + this(source, new BackoffOptionsBuilder().build()); } public PushService(PushEnabledSource source, BackoffOptions options) { diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 5441c793..a1770850 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -12,23 +12,12 @@ public class StreamDocumentUploadQueue extends DocumentUploadQueue { private StreamUploadHandler streamHandler; protected ArrayList documentToPartiallyUpdateList; - public StreamDocumentUploadQueue(UploadStrategy uploader) { - super(uploader); - this.streamHandler = null; - this.documentToPartiallyUpdateList = new ArrayList<>(); - } - public StreamDocumentUploadQueue(StreamUploadHandler handler, int maxQueueSize) { super(null, maxQueueSize); this.streamHandler = handler; this.documentToPartiallyUpdateList = new ArrayList<>(); } - public StreamDocumentUploadQueue() { - super(); - this.documentToPartiallyUpdateList = new ArrayList<>(); - } - /** * Flushes the accumulated documents by applying the upload strategy. * @@ -44,12 +33,8 @@ public void flush() throws IOException, InterruptedException { // TODO: LENS-871: support concurrent requests StreamUpdate stream = this.getStream(); logger.info("Uploading document Stream"); - - if (this.streamHandler != null) { - this.streamHandler.uploadAndPush(stream); - } else { - this.uploader.apply(stream); - } + + this.streamHandler.uploadAndPush(stream); clearQueue(); } diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index 153576d2..cf15268c 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -109,6 +109,10 @@ public StreamService( } this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger); } + + /** + * Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if it exceeds the + * maximum content length. See {@link DocumentUploadQueue#flush}. * *

If called several times, the service will automatically batch documents and create new * stream chunks whenever the data payload exceeds the Date: Wed, 4 Feb 2026 12:39:37 -0500 Subject: [PATCH 08/14] remove flushAndPush --- .../StreamDocumentUploadQueue.java | 36 ++++++------------- .../UpdateStreamServiceInternal.java | 14 ++------ ...StreamDocumentUploadQueueBatchingTest.java | 10 +++--- .../UpdateStreamServiceInternalTest.java | 6 ++-- 4 files changed, 22 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index a1770850..79ff1f04 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -6,11 +6,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class StreamDocumentUploadQueue extends DocumentUploadQueue { +class StreamDocumentUploadQueue extends DocumentUploadQueue { private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class); private StreamUploadHandler streamHandler; protected ArrayList documentToPartiallyUpdateList; + private HttpResponse lastResponse; public StreamDocumentUploadQueue(StreamUploadHandler handler, int maxQueueSize) { super(null, maxQueueSize); @@ -34,7 +35,7 @@ public void flush() throws IOException, InterruptedException { StreamUpdate stream = this.getStream(); logger.info("Uploading document Stream"); - this.streamHandler.uploadAndPush(stream); + this.lastResponse = this.streamHandler.uploadAndPush(stream); clearQueue(); } @@ -46,29 +47,6 @@ private void clearQueue() { this.documentToPartiallyUpdateList.clear(); } - /** - * Flushes the accumulated documents and pushes them to the stream endpoint. - * - * @return The HTTP response from the stream endpoint. - * @throws IOException If an I/O error occurs during the upload. - * @throws InterruptedException If the upload process is interrupted. - */ - public HttpResponse flushAndPush() throws IOException, InterruptedException { - if (isEmpty()) { - return null; - } - - if (this.streamHandler == null) { - throw new IllegalStateException( - "No upload handler configured. Use StreamDocumentUploadQueue constructor with StreamUploadHandler parameter."); - } - - StreamUpdate stream = this.getStream(); - logger.info("Flushing and pushing stream batch"); - HttpResponse response = this.streamHandler.uploadAndPush(stream); - clearQueue(); - return response; - } /** * Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds @@ -110,4 +88,12 @@ public BatchUpdate getBatch() { public boolean isEmpty() { return super.isEmpty() && documentToPartiallyUpdateList.isEmpty(); } + + /** + * Returns the HTTP response from the last flush operation. + * @return The last response, or null if no flush has occurred or queue was empty. + */ + HttpResponse getLastResponse() { + return this.lastResponse; + } } diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java index f5956fe4..0fee821f 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java @@ -1,16 +1,12 @@ package com.coveo.pushapiclient; import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException; -import com.google.gson.Gson; import java.io.IOException; import java.net.http.HttpResponse; import org.apache.logging.log4j.Logger; /** For internal use only. Made to easily test the service without having to use PowerMock */ class UpdateStreamServiceInternal { - private final Logger logger; - private final StreamEnabledSource source; - private final PlatformClient platformClient; private final StreamDocumentUploadQueue queue; @@ -19,10 +15,7 @@ public UpdateStreamServiceInternal( final StreamDocumentUploadQueue queue, final PlatformClient platformClient, final Logger logger) { - this.source = source; this.queue = queue; - this.platformClient = platformClient; - this.logger = logger; } public void addOrUpdate(DocumentBuilder document) @@ -41,10 +34,7 @@ public void delete(DeleteDocument document) throws IOException, InterruptedExcep public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { - return queue.flushAndPush(); - } - - private String getSourceId() { - return this.source.getId(); + queue.flush(); + return queue.getLastResponse(); } } diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java index 9e14e65f..021ca8f9 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java @@ -122,7 +122,7 @@ public void flushAndPushShouldClearQueueAfterBatch() throws IOException, Interru queue.add(doc); assertFalse(queue.isEmpty()); - queue.flushAndPush(); + queue.flush(); assertTrue(queue.isEmpty()); } @@ -134,14 +134,16 @@ public void flushAndPushShouldReturnResponseFromHandler() new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); queue.add(doc); - HttpResponse response = queue.flushAndPush(); + queue.flush(); + HttpResponse response = queue.getLastResponse(); assertEquals(httpResponse, response); } @Test public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException { - HttpResponse response = queue.flushAndPush(); + queue.flush(); + HttpResponse response = queue.getLastResponse(); assertNull(response); verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); @@ -160,7 +162,7 @@ public void flushAndPushShouldPassCorrectStreamUpdateToHandler() queue.add(deleteDoc); queue.add(partialDoc); - queue.flushAndPush(); + queue.flush(); ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); verify(mockHandler).uploadAndPush(captor.capture()); diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java index 09aa6303..c4a155b6 100644 --- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java +++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java @@ -119,13 +119,13 @@ public void closeShouldCallFlushAndPush() service.addOrUpdate(documentA); service.close(); - verify(queue, times(1)).flushAndPush(); + verify(queue, times(1)).flush(); } @Test public void closeShouldReturnFlushAndPushResponse() throws IOException, InterruptedException, NoOpenFileContainerException { - when(queue.flushAndPush()).thenReturn(httpResponse); + when(queue.getLastResponse()).thenReturn(httpResponse); service.addOrUpdate(documentA); HttpResponse result = service.close(); @@ -136,7 +136,7 @@ public void closeShouldReturnFlushAndPushResponse() @Test public void closeShouldReturnNullWhenQueueIsEmpty() throws IOException, InterruptedException, NoOpenFileContainerException { - when(queue.flushAndPush()).thenReturn(null); + when(queue.getLastResponse()).thenReturn(null); HttpResponse result = service.close(); From 4a0ccba39977490c285ee2e3e722948d02d9a50b Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 12:43:29 -0500 Subject: [PATCH 09/14] add back public class --- .../java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 79ff1f04..93df4e51 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -6,7 +6,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -class StreamDocumentUploadQueue extends DocumentUploadQueue { +public class StreamDocumentUploadQueue extends DocumentUploadQueue { private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class); private StreamUploadHandler streamHandler; From c4077cae58317e3582f54969d909f68bdafc54e5 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 12:46:56 -0500 Subject: [PATCH 10/14] remove unused import --- src/main/java/com/coveo/pushapiclient/UpdateStreamService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 7dd4d16a..1d82f90a 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -1,7 +1,6 @@ package com.coveo.pushapiclient; import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException; -import com.google.gson.Gson; import java.io.IOException; import java.net.http.HttpResponse; import org.apache.logging.log4j.LogManager; From d57d6da5d695fd30691f111dac6bedcf5eb9b03e Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 12:49:22 -0500 Subject: [PATCH 11/14] lint --- .../UpdateStreamServiceInternalTest.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java index c4a155b6..4c43203c 100644 --- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java +++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java @@ -21,13 +21,19 @@ public class UpdateStreamServiceInternalTest { private static final String SOURCE_ID = "my-source-id"; - @Mock private StreamEnabledSource source; - @Mock private PlatformClient platformClient; - @Mock private StreamDocumentUploadQueue queue; - @Mock private HttpResponse httpResponse; - @Mock private Logger logger; - - @InjectMocks private UpdateStreamServiceInternal service; + @Mock + private StreamEnabledSource source; + @Mock + private PlatformClient platformClient; + @Mock + private StreamDocumentUploadQueue queue; + @Mock + private HttpResponse httpResponse; + @Mock + private Logger logger; + + @InjectMocks + private UpdateStreamServiceInternal service; private DocumentBuilder documentA; private DocumentBuilder documentB; @@ -44,18 +50,16 @@ public void setUp() throws Exception { documentB = new DocumentBuilder("https://my.document.uri?ref=2", "My second document title"); deleteDocumentA = new DeleteDocument("https://my.document.uri?ref=3"); deleteDocumentB = new DeleteDocument("https://my.document.uri?ref=4"); - partialUpdateDocumentA = - new PartialUpdateDocument( - "https://my.document.uri?ref=5", - PartialUpdateOperator.FIELDVALUEREPLACE, - "fieldA", - "valueA"); - partialUpdateDocumentB = - new PartialUpdateDocument( - "https://my.document.uri?ref=6", - PartialUpdateOperator.FIELDVALUEREPLACE, - "fieldB", - "valueB"); + partialUpdateDocumentA = new PartialUpdateDocument( + "https://my.document.uri?ref=5", + PartialUpdateOperator.FIELDVALUEREPLACE, + "fieldA", + "valueA"); + partialUpdateDocumentB = new PartialUpdateDocument( + "https://my.document.uri?ref=6", + PartialUpdateOperator.FIELDVALUEREPLACE, + "fieldB", + "valueB"); closeable = MockitoAnnotations.openMocks(this); @@ -127,7 +131,7 @@ public void closeShouldReturnFlushAndPushResponse() throws IOException, InterruptedException, NoOpenFileContainerException { when(queue.getLastResponse()).thenReturn(httpResponse); service.addOrUpdate(documentA); - + HttpResponse result = service.close(); assertEquals(httpResponse, result); From 4f808af752b8b53cf9c3bbac1b8a5d9bba5cc085 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 13:11:55 -0500 Subject: [PATCH 12/14] lint --- .../StreamDocumentUploadQueue.java | 1 - .../pushapiclient/UpdateStreamService.java | 13 +++--- .../UpdateStreamServiceInternal.java | 1 - .../CatalogStreamUploadHandlerTest.java | 41 +++++++++++++------ 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 93df4e51..affd63e3 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -47,7 +47,6 @@ private void clearQueue() { this.documentToPartiallyUpdateList.clear(); } - /** * Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds * the maximum content length. See {@link PartialUpdateDocument#flush}. diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 1d82f90a..ca30de5b 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -114,16 +114,13 @@ public UpdateStreamService( if (userAgents != null) { this.platformClient.setUserAgents(userAgents); } - - CatalogStreamUploadHandler handler = new CatalogStreamUploadHandler(source, this.platformClient); + + CatalogStreamUploadHandler handler = + new CatalogStreamUploadHandler(source, this.platformClient); StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, maxQueueSize); - + this.updateStreamServiceInternal = - new UpdateStreamServiceInternal( - source, - queue, - this.platformClient, - logger); + new UpdateStreamServiceInternal(source, queue, this.platformClient, logger); } /** diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java index 0fee821f..45bd3158 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java @@ -9,7 +9,6 @@ class UpdateStreamServiceInternal { private final StreamDocumentUploadQueue queue; - public UpdateStreamServiceInternal( final StreamEnabledSource source, final StreamDocumentUploadQueue queue, diff --git a/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java index f1ffe8ab..3dbeb920 100644 --- a/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java +++ b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java @@ -4,6 +4,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import com.google.gson.JsonObject; import java.io.IOException; import java.net.http.HttpResponse; import org.junit.After; @@ -36,30 +37,40 @@ public void closeService() throws Exception { } @Test - public void uploadAndPushShouldExecute3StepWorkflowInOrder() throws IOException, InterruptedException { + public void uploadAndPushShouldExecute3StepWorkflowInOrder() + throws IOException, InterruptedException { when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-container-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); - StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + StreamUpdateRecord mockRecord = + new StreamUpdateRecord( + new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); - when(mockPlatformClient.pushFileContainerContentToStreamSource(anyString(), any(FileContainer.class))) + when(mockPlatformClient.pushFileContainerContentToStreamSource( + anyString(), any(FileContainer.class))) .thenReturn(mockPushResponse); HttpResponse result = handler.uploadAndPush(mockStreamUpdate); InOrder inOrder = inOrder(mockPlatformClient); inOrder.verify(mockPlatformClient).createFileContainer(); - inOrder.verify(mockPlatformClient).uploadContentToFileContainer(any(FileContainer.class), anyString()); - inOrder.verify(mockPlatformClient).pushFileContainerContentToStreamSource(eq("test-source-id"), any(FileContainer.class)); + inOrder.verify(mockPlatformClient) + .uploadContentToFileContainer(any(FileContainer.class), anyString()); + inOrder.verify(mockPlatformClient) + .pushFileContainerContentToStreamSource(eq("test-source-id"), any(FileContainer.class)); assertEquals(mockPushResponse, result); } @Test - public void uploadAndPushShouldReturnPushResponse() throws IOException, InterruptedException { + public void uploadAndPushShouldReturnPushResponse() + throws IOException, InterruptedException { when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); - StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + StreamUpdateRecord mockRecord = + new StreamUpdateRecord( + new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); - when(mockPlatformClient.pushFileContainerContentToStreamSource(anyString(), any(FileContainer.class))) + when(mockPlatformClient.pushFileContainerContentToStreamSource( + anyString(), any(FileContainer.class))) .thenReturn(mockPushResponse); HttpResponse result = handler.uploadAndPush(mockStreamUpdate); @@ -80,9 +91,12 @@ public void uploadAndPushShouldPropagateIOExceptionFromUploadContent() throws IOException, InterruptedException { when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); - StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + StreamUpdateRecord mockRecord = + new StreamUpdateRecord( + new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); - when(mockPlatformClient.uploadContentToFileContainer(any(FileContainer.class), anyString())) + when(mockPlatformClient.uploadContentToFileContainer( + any(FileContainer.class), anyString())) .thenThrow(new IOException("Upload failed")); handler.uploadAndPush(mockStreamUpdate); @@ -93,9 +107,12 @@ public void uploadAndPushShouldPropagateIOExceptionFromPush() throws IOException, InterruptedException { when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); - StreamUpdateRecord mockRecord = new StreamUpdateRecord(new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}, new com.google.gson.JsonObject[]{}); + StreamUpdateRecord mockRecord = + new StreamUpdateRecord( + new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); - when(mockPlatformClient.pushFileContainerContentToStreamSource(anyString(), any(FileContainer.class))) + when(mockPlatformClient.pushFileContainerContentToStreamSource( + anyString(), any(FileContainer.class))) .thenThrow(new IOException("Push failed")); handler.uploadAndPush(mockStreamUpdate); From 28c02f68e5437946dd5e44fcd9495a43defae071 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 13:38:07 -0500 Subject: [PATCH 13/14] lint --- .../CatalogStreamUploadHandler.java | 3 +- .../StreamDocumentUploadQueue.java | 1 + .../pushapiclient/StreamUploadHandler.java | 1 + .../pushapiclient/UpdateStreamService.java | 1 - .../UpdateStreamServiceInternal.java | 3 +- .../CatalogStreamUploadHandlerTest.java | 37 ++++++++-------- .../FileContainerRotationIntegrationTest.java | 15 +++---- .../StreamDocumentUploadQueueTest.java | 4 +- .../UpdateStreamServiceInternalTest.java | 44 ++++++++----------- 9 files changed, 51 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java b/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java index 658d1ba1..376f347f 100644 --- a/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java +++ b/src/main/java/com/coveo/pushapiclient/CatalogStreamUploadHandler.java @@ -26,8 +26,7 @@ public HttpResponse uploadAndPush(StreamUpdate stream) // Step 2: Upload content to container String batchUpdateJson = new Gson().toJson(stream.marshal()); - logger.debug( - "Uploading stream content to file container: {}", container.fileId); + logger.debug("Uploading stream content to file container: {}", container.fileId); platformClient.uploadContentToFileContainer(container, batchUpdateJson); // Step 3: Push container to stream source diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index affd63e3..47bce016 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -90,6 +90,7 @@ public boolean isEmpty() { /** * Returns the HTTP response from the last flush operation. + * * @return The last response, or null if no flush has occurred or queue was empty. */ HttpResponse getLastResponse() { diff --git a/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java b/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java index 38648389..2ea65459 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java +++ b/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java @@ -7,6 +7,7 @@ * Functional interface for stream upload operations with a three-step workflow contract. * *

Implementations of this interface handle the complete stream upload workflow: + * *

    *
  1. Create a file container via {@code platformClient.createFileContainer()} *
  2. Upload content to the container via {@code platformClient.uploadContentToFileContainer()} diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index ca30de5b..69ade903 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -11,7 +11,6 @@ public class UpdateStreamService { private final PlatformClient platformClient; private final UpdateStreamServiceInternal updateStreamServiceInternal; - /** * Creates a service to stream your documents to the provided source by interacting with the * Stream API. This provides the ability to incrementally add, update, or delete documents via a diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java index 45bd3158..275c60c1 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java @@ -17,8 +17,7 @@ public UpdateStreamServiceInternal( this.queue = queue; } - public void addOrUpdate(DocumentBuilder document) - throws IOException, InterruptedException { + public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { queue.add(document); } diff --git a/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java index 3dbeb920..e148b087 100644 --- a/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java +++ b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java @@ -1,8 +1,12 @@ package com.coveo.pushapiclient; -import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.when; import com.google.gson.JsonObject; import java.io.IOException; @@ -42,8 +46,7 @@ public void uploadAndPushShouldExecute3StepWorkflowInOrder() when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-container-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); StreamUpdateRecord mockRecord = - new StreamUpdateRecord( - new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); + new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); when(mockPlatformClient.pushFileContainerContentToStreamSource( anyString(), any(FileContainer.class))) @@ -53,21 +56,21 @@ public void uploadAndPushShouldExecute3StepWorkflowInOrder() InOrder inOrder = inOrder(mockPlatformClient); inOrder.verify(mockPlatformClient).createFileContainer(); - inOrder.verify(mockPlatformClient) + inOrder + .verify(mockPlatformClient) .uploadContentToFileContainer(any(FileContainer.class), anyString()); - inOrder.verify(mockPlatformClient) + inOrder + .verify(mockPlatformClient) .pushFileContainerContentToStreamSource(eq("test-source-id"), any(FileContainer.class)); assertEquals(mockPushResponse, result); } @Test - public void uploadAndPushShouldReturnPushResponse() - throws IOException, InterruptedException { + public void uploadAndPushShouldReturnPushResponse() throws IOException, InterruptedException { when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); StreamUpdateRecord mockRecord = - new StreamUpdateRecord( - new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); + new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); when(mockPlatformClient.pushFileContainerContentToStreamSource( anyString(), any(FileContainer.class))) @@ -81,7 +84,8 @@ public void uploadAndPushShouldReturnPushResponse() @Test(expected = IOException.class) public void uploadAndPushShouldPropagateIOExceptionFromCreateFileContainer() throws IOException, InterruptedException { - when(mockPlatformClient.createFileContainer()).thenThrow(new IOException("Container creation failed")); + when(mockPlatformClient.createFileContainer()) + .thenThrow(new IOException("Container creation failed")); handler.uploadAndPush(mockStreamUpdate); } @@ -92,11 +96,9 @@ public void uploadAndPushShouldPropagateIOExceptionFromUploadContent() when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); StreamUpdateRecord mockRecord = - new StreamUpdateRecord( - new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); + new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); - when(mockPlatformClient.uploadContentToFileContainer( - any(FileContainer.class), anyString())) + when(mockPlatformClient.uploadContentToFileContainer(any(FileContainer.class), anyString())) .thenThrow(new IOException("Upload failed")); handler.uploadAndPush(mockStreamUpdate); @@ -108,8 +110,7 @@ public void uploadAndPushShouldPropagateIOExceptionFromPush() when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}"); when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse); StreamUpdateRecord mockRecord = - new StreamUpdateRecord( - new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); + new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {}); when(mockStreamUpdate.marshal()).thenReturn(mockRecord); when(mockPlatformClient.pushFileContainerContentToStreamSource( anyString(), any(FileContainer.class))) diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java index a582390f..d9425ff1 100644 --- a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java +++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java @@ -198,8 +198,7 @@ public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception { } private UpdateStreamServiceInternal createServiceWithSmallBatchSize() { - CatalogStreamUploadHandler handler = - new CatalogStreamUploadHandler(source, platformClient); + CatalogStreamUploadHandler handler = new CatalogStreamUploadHandler(source, platformClient); StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, SMALL_BATCH_SIZE); org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(getClass()); @@ -231,12 +230,12 @@ private String generateData(int size) { private HttpResponse createContainerResponse() { HttpResponse response = mock(HttpResponse.class); int id = containerCounter.incrementAndGet(); - doReturn( - String.format( - "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}", - id, id)) - .when(response) - .body(); + String responseBody = + String.format( + "{\"uploadUri\": \"https://upload.uri/container-%d\", " + + "\"fileId\": \"container-%d\"}", + id, id); + doReturn(responseBody).when(response).body(); return response; } diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java index 3fa8d199..79463fcd 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java @@ -162,7 +162,7 @@ public void testShouldAutomaticallyFlushAccumulatedDocuments() } }); - // Adding 3 documents of 2MB to the queue. After adding the first 2 documents, + // Adding 3 documents of 2MB to the queue. After adding the first 2 documents, // the queue size will reach 6MB, which exceeds the maximum queue size // limit by 1MB. Therefore, the 2 first added documents will automatically be // uploaded to the source. @@ -236,7 +236,7 @@ public void testAddingEmptyDocument() throws IOException, InterruptedException { queue.add(nullDocument); queue.flush(); - + verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class)); } diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java index 4c43203c..1052fb21 100644 --- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java +++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java @@ -1,8 +1,6 @@ package com.coveo.pushapiclient; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -21,19 +19,13 @@ public class UpdateStreamServiceInternalTest { private static final String SOURCE_ID = "my-source-id"; - @Mock - private StreamEnabledSource source; - @Mock - private PlatformClient platformClient; - @Mock - private StreamDocumentUploadQueue queue; - @Mock - private HttpResponse httpResponse; - @Mock - private Logger logger; - - @InjectMocks - private UpdateStreamServiceInternal service; + @Mock private StreamEnabledSource source; + @Mock private PlatformClient platformClient; + @Mock private StreamDocumentUploadQueue queue; + @Mock private HttpResponse httpResponse; + @Mock private Logger logger; + + @InjectMocks private UpdateStreamServiceInternal service; private DocumentBuilder documentA; private DocumentBuilder documentB; @@ -50,16 +42,18 @@ public void setUp() throws Exception { documentB = new DocumentBuilder("https://my.document.uri?ref=2", "My second document title"); deleteDocumentA = new DeleteDocument("https://my.document.uri?ref=3"); deleteDocumentB = new DeleteDocument("https://my.document.uri?ref=4"); - partialUpdateDocumentA = new PartialUpdateDocument( - "https://my.document.uri?ref=5", - PartialUpdateOperator.FIELDVALUEREPLACE, - "fieldA", - "valueA"); - partialUpdateDocumentB = new PartialUpdateDocument( - "https://my.document.uri?ref=6", - PartialUpdateOperator.FIELDVALUEREPLACE, - "fieldB", - "valueB"); + partialUpdateDocumentA = + new PartialUpdateDocument( + "https://my.document.uri?ref=5", + PartialUpdateOperator.FIELDVALUEREPLACE, + "fieldA", + "valueA"); + partialUpdateDocumentB = + new PartialUpdateDocument( + "https://my.document.uri?ref=6", + PartialUpdateOperator.FIELDVALUEREPLACE, + "fieldB", + "valueB"); closeable = MockitoAnnotations.openMocks(this); From c65099c5996219379e06edb9a898e64826c42afc Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 4 Feb 2026 15:15:07 -0500 Subject: [PATCH 14/14] fix last response issue --- .../java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 47bce016..46bd6b1a 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -29,6 +29,7 @@ public StreamDocumentUploadQueue(StreamUploadHandler handler, int maxQueueSize) public void flush() throws IOException, InterruptedException { if (this.isEmpty()) { logger.debug("Empty batch. Skipping upload"); + this.lastResponse = null; return; } // TODO: LENS-871: support concurrent requests