Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

to #127 TransactionPayloadEventDataDeserializer use the same CompatibilityMode settings from parent EventDeserializer #138

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,25 @@ private void registerDefaultEventDataDeserializers() {
eventDataDeserializers.put(EventType.MARIADB_GTID_LIST,
new MariadbGtidListEventDataDeserializer());
eventDataDeserializers.put(EventType.TRANSACTION_PAYLOAD,
new TransactionPayloadEventDataDeserializer());
new TransactionPayloadEventDataDeserializer().customizeEventDeserializerSupplier(new TransactionPayloadEventDataDeserializer.Supplier<EventDeserializer>() {
@Override
public EventDeserializer get() {
EventDeserializer eventDeserializer = new EventDeserializer(
eventHeaderDeserializer,
defaultEventDataDeserializer,
eventDataDeserializers,
tableMapEventByTableId
);

if (!compatibilitySet.isEmpty()) {
CompatibilityMode[] compatibilityModeSettings = new CompatibilityMode[compatibilitySet.size()];
compatibilitySet.toArray(compatibilityModeSettings);
eventDeserializer.setCompatibilityMode(compatibilityModeSettings[0], compatibilityModeSettings);
}

return eventDeserializer;
}
}));
}

public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;

private Supplier<EventDeserializer> eventDeserializerSupplier = new Supplier<EventDeserializer>() {
@Override
public EventDeserializer get() {
return new EventDeserializer();
}
};

public TransactionPayloadEventDataDeserializer customizeEventDeserializerSupplier(Supplier<EventDeserializer> supplier) {
this.eventDeserializerSupplier = supplier;
return this;
}

@Override
public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
TransactionPayloadEventData eventData = new TransactionPayloadEventData();
Expand Down Expand Up @@ -86,7 +98,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)

// Read and store events from decompressed byte array into input stream
ArrayList<Event> decompressedEvents = new ArrayList<>();
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
EventDeserializer transactionPayloadEventDeserializer = obtainEventDeserializer();
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst);

Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
Expand All @@ -99,4 +111,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)

return eventData;
}

protected EventDeserializer obtainEventDeserializer() {
return eventDeserializerSupplier.get();
}

public interface Supplier<V> {

V get();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
import com.github.shyiko.mysql.binlog.event.XAPrepareEventData;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.Serializable;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
* @author <a href="mailto:[email protected]">Somesh Malviya</a>
Expand Down Expand Up @@ -82,6 +82,8 @@ public class TransactionPayloadEventDataDeserializerTest {
.append("]}")
.toString();

private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0};

@Test
public void deserialize() throws IOException {
TransactionPayloadEventDataDeserializer deserializer = new TransactionPayloadEventDataDeserializer();
Expand All @@ -97,4 +99,62 @@ public void deserialize() throws IOException {
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
}

@Test
public void deserializeUsingEventDeserializer() throws IOException {

ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA);

// Mock create target TransactionPayloadEventData DATA event header
final EventHeaderV4 eventHeader = new EventHeaderV4();
eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD);
eventHeader.setEventLength(DATA.length + 19L);
eventHeader.setTimestamp(1646406641000L);
eventHeader.setServerId(223344);


EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() {

private long count = 0L;

private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer();

@Override
public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException {
if (count > 0) {
// uncompressed event header deserialize
return defaultEventHeaderDeserializer.deserialize(inputStream);
}
count++;
// we need to return target TransactionPayloadEventData DATA event header we had mocked
return eventHeader;
}
};

EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer());
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY);

Event event = eventDeserializer.nextEvent(dataStream);

assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD);
assertTrue(event.getData() instanceof TransactionPayloadEventData);

TransactionPayloadEventData transactionPayloadEventData = event.getData();
assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType());
assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize());
assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize());
assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size());
assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType());
assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType());
assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType());
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
// assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData);

UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData();
assertEquals(1, updateRowsEventData.getRows().size());
Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey();
assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]);
}

}