Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
* The default maximum allowable size, in bytes, for a batch to be sent.
*/
static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024;
// Temporary workaround: Service Bus enforces a maximum batch payload size of 1 MB that is not
// communicated via the AMQP link's max-message-size property. The link reports the per-message
// limit (up to 100 MB for Premium partitioned), but the broker rejects batch sends above 1 MB.
// This cap is applied only in createMessageBatch(), which is the single enforcement point for
// batch size limits. The sendMessages(iterable) and scheduleMessages(iterable) paths use
// createMessageBatch() internally and are therefore also capped. Single-message paths
// (sendMessage, scheduleMessage) are not capped since individual messages on Premium can
// validly exceed 1 MB up to the per-entity limit.
// Tracked by: https://github.com/Azure/azure-service-bus/issues/708
static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024;
private static final String TRANSACTION_LINK_NAME = "coordinator";
private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]);
private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions();
Expand Down Expand Up @@ -463,15 +473,15 @@ public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions
final int maxSize = options.getMaximumSizeInBytes();

return getSendLinkWithRetry("create-batch").flatMap(link -> link.getLinkSize().flatMap(size -> {
final int maximumLinkSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
final int maximumLinkSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES);
if (maxSize > maximumLinkSize) {
return monoError(logger,
new IllegalArgumentException(String.format(Locale.US,
"CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size"
+ " (%s bytes).",
"CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum"
+ " allowed size (%s bytes).",
maxSize, maximumLinkSize)));
}
final int batchSize = maxSize > 0 ? maxSize : maximumLinkSize;
final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize;
return Mono
.just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer));
})).onErrorMap(this::mapError);
Expand Down
Loading
Loading