From e3b2ef6fdd9cc7059afc33546a64b6757e0784e2 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Tue, 3 Mar 2026 16:45:52 -0800 Subject: [PATCH] Cap ServiceBusMessageBatch size at 1 MB to match broker enforcement The Service Bus broker enforces a 1 MB batch size limit regardless of the max-message-size advertised on the AMQP link. Premium partitioned namespaces advertise 100 MB on the link, causing tryAddMessage() to accept batches the broker will reject. Cap batch creation in ServiceBusSenderAsyncClient.createMessageBatch() at 1 MB (MAX_BATCH_SIZE_BYTES). This is the single enforcement point: both sendMessages(iterable) and scheduleMessages(iterable) call createMessageBatch internally. Single-message paths (sendMessage, scheduleMessage) are NOT capped since the 1 MB limit is batch-specific and individual messages on Premium can validly exceed 1 MB up to the per-entity limit. When a user requests a batch size exceeding 1 MB via CreateMessageBatchOptions, throw ServiceBusException. Tracking: azure-service-bus#708 ICM: 51000000793879 --- .../ServiceBusSenderAsyncClient.java | 18 +- .../ServiceBusSenderAsyncClientTest.java | 329 ++++++++++++++++++ 2 files changed, 343 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 58db3b129e65..35abdc92caa5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -222,6 +222,16 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { * The default maximum allowable size, in bytes, for a batch to be sent. */ static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; + // Temporary workaround: Service Bus enforces a maximum batch payload size of 1 MB that is not + // communicated via the AMQP link's max-message-size property. The link reports the per-message + // limit (up to 100 MB for Premium partitioned), but the broker rejects batch sends above 1 MB. + // This cap is applied only in createMessageBatch(), which is the single enforcement point for + // batch size limits. The sendMessages(iterable) and scheduleMessages(iterable) paths use + // createMessageBatch() internally and are therefore also capped. Single-message paths + // (sendMessage, scheduleMessage) are not capped since individual messages on Premium can + // validly exceed 1 MB up to the per-entity limit. + // Tracked by: https://github.com/Azure/azure-service-bus/issues/708 + static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024; private static final String TRANSACTION_LINK_NAME = "coordinator"; private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]); private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions(); @@ -463,15 +473,15 @@ public Mono createMessageBatch(CreateMessageBatchOptions final int maxSize = options.getMaximumSizeInBytes(); return getSendLinkWithRetry("create-batch").flatMap(link -> link.getLinkSize().flatMap(size -> { - final int maximumLinkSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; + final int maximumLinkSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES); if (maxSize > maximumLinkSize) { return monoError(logger, new IllegalArgumentException(String.format(Locale.US, - "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size" - + " (%s bytes).", + "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum" + + " allowed size (%s bytes).", maxSize, maximumLinkSize))); } - final int batchSize = maxSize > 0 ? maxSize : maximumLinkSize; + final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize; return Mono .just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer)); })).onErrorMap(this::mapError); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 20ca897316e9..a170d0e3d3dd 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -75,6 +75,7 @@ import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_BATCH_SIZE_BYTES; import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -302,6 +303,105 @@ void createsMessageBatchWithSize(boolean isV2) { }).expectComplete().verify(DEFAULT_TIMEOUT); } + /** + * Verifies that the batch max size is capped at MAX_BATCH_SIZE_BYTES (1 MB) when the link reports a larger size. + * This simulates a Premium partitioned namespace where the link advertises up to 100 MB per-message. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchCappedAtMaxBatchSizeWhenLinkReportsLargerSize(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int largeLinkSize = 100 * 1024 * 1024; // 100 MB + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + // Act & Assert + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(MAX_BATCH_SIZE_BYTES, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + + /** + * Verifies that the batch max size uses the link size when it is smaller than MAX_BATCH_SIZE_BYTES (1 MB). + * This simulates a Standard namespace where the link advertises 256 KB. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchUsesLinkSizeWhenSmallerThanMaxBatchSize(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int smallLinkSize = 256 * 1024; // 256 KB + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(smallLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + // Act & Assert + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(smallLinkSize, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + + /** + * Verifies that user-specified maxSize exceeding the effective 1 MB cap throws an error. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchWithOptionsExceedingMaxBatchSizeCapThrowsError(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int largeLinkSize = 100 * 1024 * 1024; // 100 MB + int requestedBatchSize = 2 * 1024 * 1024; // 2 MB - exceeds 1 MB cap + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + final CreateMessageBatchOptions options + = new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize); + + // Act & Assert + // The IllegalArgumentException from createMessageBatch is wrapped by mapError into ServiceBusException. + StepVerifier.create(sender.createMessageBatch(options)) + .expectError(ServiceBusException.class) + .verify(DEFAULT_TIMEOUT); + } + + /** + * Verifies that user-specified maxSize smaller than the 1 MB cap is respected. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchWithOptionsSmallerThanMaxBatchSizeCapIsRespected(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int largeLinkSize = 100 * 1024 * 1024; // 100 MB + int requestedBatchSize = 500 * 1024; // 500 KB + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + final CreateMessageBatchOptions options + = new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize); + + // Act & Assert + StepVerifier.create(sender.createMessageBatch(options)).assertNext(batch -> { + Assertions.assertEquals(requestedBatchSize, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + @ParameterizedTest @MethodSource("selectStack") void scheduleMessageSizeTooBig(boolean isV2) { @@ -745,6 +845,31 @@ void sendMessagesList(boolean isV2) { messagesSent.forEach(message -> Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType())); } + /** + * Verifies that sendMessages(Iterable) internally uses createMessageBatch() which caps at + * MAX_BATCH_SIZE_BYTES (1 MB) even when the link reports a much larger size (e.g. 100 MB Premium). + * The sendIterable → sendNextIterableBatch → createMessageBatch() path is covered. + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB - simulates Premium partitioned namespace + final int count = 4; + final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + // Act - sendMessages(Iterable) goes through sendIterable → createMessageBatch() which caps at 1 MB. + // Small messages still fit within the 1 MB cap, so operation completes successfully. + StepVerifier.create(sender.sendMessages(messages)).expectComplete().verify(DEFAULT_TIMEOUT); + } + /** * Verifies that sending multiple message which does not fit in single batch will throw exception. */ @@ -851,6 +976,31 @@ void sendSingleMessage(boolean isV2) { Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType()); } + /** + * Verifies that sendMessage(single) does NOT cap at MAX_BATCH_SIZE_BYTES on a Premium-like link (100 MB). + * The single-message path goes through sendFluxInternal → AmqpMessageCollector which bypasses + * createMessageBatch() and therefore is not subject to the 1 MB batch cap. + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendSingleMessageNotCappedWithLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final ServiceBusMessage testData = new ServiceBusMessage(TEST_CONTENTS); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.send(any(org.apache.qpid.proton.message.Message.class))).thenReturn(Mono.empty()); + + // Act - sendMessage(single) should succeed; the raw link size (100 MB) is used, not 1 MB cap. + StepVerifier.create(sender.sendMessage(testData)).expectComplete().verify(DEFAULT_TIMEOUT); + + // Assert - message was sent successfully (no size rejection from the 1 MB cap) + verify(sendLink, times(1)).send(any(org.apache.qpid.proton.message.Message.class)); + } + @ParameterizedTest @MethodSource("selectStack") void scheduleMessage(boolean isV2) { @@ -907,6 +1057,185 @@ void scheduleMessageWithTransaction(boolean isV2) { Assertions.assertEquals(message, actualMessages.get(0)); } + /** + * Verifies that scheduleMessages(Iterable) internally uses createMessageBatch() which caps at + * MAX_BATCH_SIZE_BYTES (1 MB) even when the link reports a much larger size. The capped batch max + * size is passed as maxSize to managementNode.schedule(), NOT the raw 100 MB. + */ + @ParameterizedTest + @MethodSource("selectStack") + void scheduleMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final long sequenceNumberReturned = 42L; + final OffsetDateTime instant = mock(OffsetDateTime.class); + final int count = 3; + final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(managementNode.schedule(anyList(), eq(instant), eq(MAX_BATCH_SIZE_BYTES), eq(LINK_NAME), isNull())) + .thenReturn(Flux.fromStream(IntStream.range(0, count).mapToObj(i -> sequenceNumberReturned + i))); + + // Act & Assert - scheduleMessages(Iterable) → createMessageBatch() caps at 1 MB, + // then passes MAX_BATCH_SIZE_BYTES to managementNode.schedule(), NOT the raw 100 MB. + StepVerifier.create(sender.scheduleMessages(messages, instant)) + .expectNextCount(count) + .expectComplete() + .verify(DEFAULT_TIMEOUT); + + verify(managementNode).schedule(anyList(), eq(instant), eq(MAX_BATCH_SIZE_BYTES), eq(LINK_NAME), isNull()); + } + + /** + * Verifies that scheduleMessage(single) passes the raw link size (100 MB) to managementNode.schedule(), + * NOT the 1 MB cap. The single-message schedule path (scheduleMessageInternal) does not go through + * createMessageBatch() and therefore is not capped. + */ + @ParameterizedTest + @MethodSource("selectStack") + void scheduleMessageSingleNotCappedWithLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final long sequenceNumberReturned = 10; + final OffsetDateTime instant = mock(OffsetDateTime.class); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(managementNode.schedule(anyList(), eq(instant), eq(largeLinkSize), eq(LINK_NAME), isNull())) + .thenReturn(Flux.just(sequenceNumberReturned)); + + // Act & Assert + StepVerifier.create(sender.scheduleMessage(message, instant)) + .expectNext(sequenceNumberReturned) + .expectComplete() + .verify(DEFAULT_TIMEOUT); + + // Verify managementNode.schedule() received the raw 100 MB link size, not the 1 MB cap + verify(managementNode).schedule(sbMessagesCaptor.capture(), eq(instant), eq(largeLinkSize), eq(LINK_NAME), + isNull()); + List actualMessages = sbMessagesCaptor.getValue(); + Assertions.assertNotNull(actualMessages); + Assertions.assertEquals(1, actualMessages.size()); + Assertions.assertEquals(message, actualMessages.get(0)); + } + + /** + * Verifies that sendMessage(single) with a message larger than 1 MB succeeds on a large link. + * This proves the single-message path (sendFluxInternal -> AmqpMessageCollector) is NOT capped + * at MAX_BATCH_SIZE_BYTES (1 MB). On Premium namespaces with large per-entity limits, individual + * messages exceeding 1 MB are valid. + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendSingleMessageLargerThan1MBSucceedsWithLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + // Create a message with payload > 1 MB. Serialized size will be ~2 MB + AMQP overhead. + final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.send(any(org.apache.qpid.proton.message.Message.class))).thenReturn(Mono.empty()); + + // Act & Assert - A 2 MB message on a 5 MB link succeeds because sendMessage(single) uses the raw + // link size (5 MB) via sendFluxInternal, NOT the 1 MB batch cap. + StepVerifier.create(sender.sendMessage(largeMessage)).expectComplete().verify(DEFAULT_TIMEOUT); + + verify(sendLink, times(1)).send(any(org.apache.qpid.proton.message.Message.class)); + } + + /** + * Verifies that sendMessages(Iterable) rejects a single message larger than 1 MB even on a large link. + * This proves the asymmetry: the same message that succeeds via sendMessage(single) FAILS via + * sendMessages(Iterable) because the iterable path goes through createMessageBatch() which caps + * the batch at MAX_BATCH_SIZE_BYTES (1 MB). + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + // Create a single message with payload > 1 MB. + final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); + final List messages = new ArrayList<>(); + messages.add(largeMessage); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + // Act & Assert - The iterable path uses createMessageBatch() which caps at 1 MB. + // The 2 MB message cannot fit in the 1 MB-capped batch, so it fails. + StepVerifier.create(sender.sendMessages(messages)) + .expectError(ServiceBusException.class) + .verify(DEFAULT_TIMEOUT); + + verify(sendLink, never()).send(anyList()); + verify(sendLink, never()).send(any(org.apache.qpid.proton.message.Message.class)); + } + + /** + * Verifies that scheduleMessages(Iterable) rejects a single message larger than 1 MB on a large link. + * The schedule iterable path goes through createMessageBatch() -> tryAddMessage for all messages. + * Unlike sendMessages(Iterable), scheduleMessages(Iterable) does NOT auto-split: all messages + * must fit in one batch. + */ + @ParameterizedTest + @MethodSource("selectStack") + void scheduleMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + final OffsetDateTime instant = mock(OffsetDateTime.class); + // Create a single message with payload > 1 MB. + final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); + final List messages = new ArrayList<>(); + messages.add(largeMessage); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + // Act & Assert - The schedule iterable path uses createMessageBatch() which caps at 1 MB. + // The 2 MB message cannot fit, so it fails with ServiceBusException. + StepVerifier.create(sender.scheduleMessages(messages, instant)) + .expectError(ServiceBusException.class) + .verify(DEFAULT_TIMEOUT); + + verify(managementNode, never()).schedule(anyList(), any(), anyInt(), any(), any()); + } + + /** + * Verifies that createMessageBatch() falls back to MAX_MESSAGE_LENGTH_BYTES when the link reports + * size 0. The fallback (256 KB) is smaller than MAX_BATCH_SIZE_BYTES (1 MB), so the batch max + * size equals the fallback value. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchFallbackWhenLinkReportsZeroSize(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(0)); // Link reports zero + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + // Act & Assert - When link size is 0, fallback is MAX_MESSAGE_LENGTH_BYTES (256 KB), then + // Math.min(256KB, 1MB) = 256 KB. + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(MAX_MESSAGE_LENGTH_BYTES, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + @ParameterizedTest @MethodSource("selectStack") void cancelScheduleMessage(boolean isV2) {