Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ public class CapturingEventListener extends CountDownEventListener {
@Override
public void onEvent(Event event) {
synchronized (events) {
events.add(event);
if (event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD) {
for (Event uncompressedEvent : ((TransactionPayloadEventData) event.getData()).getUncompressedEvents()) {
events.add(uncompressedEvent);
}
}
else {
events.add(event);
}
super.onEvent(event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import static org.testng.Assert.*;
Expand Down Expand Up @@ -42,10 +43,20 @@ public void testVeryLargeTransactionNear2GB() throws Exception {
throw new SkipException("Transaction compression requires MySQL 8.0.20+");
}

CapturingEventListener capturingEventListener = new CapturingEventListener();
client.registerEventListener(capturingEventListener);
client.unregisterEventListener(eventListener);
client.registerEventListener(eventListener);
// Custom listener that captures TRANSACTION_PAYLOAD events without unpacking them
final List<TransactionPayloadEventData> payloadEvents = new ArrayList<TransactionPayloadEventData>();
CountDownEventListener payloadListener = new CountDownEventListener() {
@Override
public void onEvent(Event event) {
if (event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD) {
synchronized (payloadEvents) {
payloadEvents.add((TransactionPayloadEventData) event.getData());
}
}
super.onEvent(event);
}
};
client.registerEventListener(payloadListener);

try {
// Create table with large BLOB column to generate big transactions
Expand All @@ -60,8 +71,8 @@ public void execute(Statement statement) throws SQLException {
"data3 LONGTEXT)");
}
});
eventListener.waitForAtLeast(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT);
eventListener.reset();
payloadListener.waitForAtLeast(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT);
payloadListener.reset();

// Generate large repeating data that compresses well
// We want uncompressed to be ~2-3GB but compressed to stay under 2GB
Expand Down Expand Up @@ -114,12 +125,9 @@ public void execute(Statement statement) throws SQLException {

// Wait for transaction payload event (give it more time for large transactions)
long largeTimeout = BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT * 1000; // 30 seconds
eventListener.waitFor(EventType.TRANSACTION_PAYLOAD, 1, largeTimeout);
payloadListener.waitFor(EventType.TRANSACTION_PAYLOAD, 1, largeTimeout);

// Verify the large payload was handled correctly
List<TransactionPayloadEventData> payloadEvents =
capturingEventListener.getEvents(TransactionPayloadEventData.class);

assertTrue(payloadEvents.size() > 0, "Should have captured TRANSACTION_PAYLOAD event");

TransactionPayloadEventData payloadEventData = payloadEvents.get(0);
Expand Down Expand Up @@ -173,7 +181,7 @@ public void execute(Statement statement) throws SQLException {
System.out.println("===========================================\n");

} finally {
client.unregisterEventListener(capturingEventListener);
client.unregisterEventListener(payloadListener);
}
}
}
Loading