Skip to content

Commit

Permalink
Copying DONT_LOOBACK if set in SERLIALIZE.down() (https://issues.redh…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 5, 2024
1 parent c0be647 commit 3798812
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions src/org/jgroups/protocols/SERIALIZE.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Message;
import org.jgroups.MessageFactory;
Expand All @@ -25,11 +26,8 @@
*/
@MBean(description="Serializes entire message into the payload of another message")
public class SERIALIZE extends Protocol {

protected static final short GMS_ID=ClassConfigurator.getProtocolId(GMS.class);
//@Property(description="If true, messages with no payload will not be serialized")
//protected boolean exclude_empty_msgs=true;
protected MessageFactory mf;
protected MessageFactory mf;

public void init() throws Exception {
super.init();
Expand All @@ -49,6 +47,8 @@ public Object down(Message msg) {
}
// exclude existing headers, they will be seen again when we unmarshal the message at the receiver
Message tmp=new BytesMessage(msg.dest(), serialized_msg).setFlag(msg.getFlags(), false);
if(msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK))
tmp.setFlag(Message.TransientFlag.DONT_LOOPBACK);
GMS.GmsHeader hdr=msg.getHeader(GMS_ID);
if(hdr != null)
tmp.putHeader(GMS_ID, hdr);
Expand All @@ -57,7 +57,7 @@ public Object down(Message msg) {

public Object up(Message msg) {
try {
Message ret=deserialize(msg);
Message ret=deserialize(msg.src(), msg.getArray(), msg.getOffset(), msg.getLength());
return up_prot.up(ret);
}
catch(Exception e) {
Expand All @@ -70,7 +70,7 @@ public void up(MessageBatch batch) {
while(it.hasNext()) {
Message msg=it.next();
try {
Message deserialized_msg=deserialize(msg);
Message deserialized_msg=deserialize(msg.src(), msg.getArray(), msg.getOffset(), msg.getLength());
it.replace(deserialized_msg);
}
catch(Exception e) {
Expand All @@ -82,18 +82,17 @@ public void up(MessageBatch batch) {
up_prot.up(batch);
}


protected Message deserialize(Message msg) throws Exception {
protected Message deserialize(Address sender, byte[] buf, int offset, int length) throws Exception {
try {
Message ret=Util.messageFromBuffer(msg.getArray(), msg.getOffset(), msg.getLength(), mf);
if(ret.getDest() == null)
ret.setDest(msg.getDest());
if(ret.getSrc() == null)
ret.setSrc(msg.getSrc());
return ret;
Message msg=Util.messageFromBuffer(buf, offset, length, mf);
if(msg.getDest() == null)
msg.setDest(msg.getDest());
if(msg.getSrc() == null)
msg.setSrc(msg.getSrc());
return msg;
}
catch(Exception e) {
throw new Exception(String.format("failed deserialize message from %s", msg.getSrc()), e);
throw new Exception(String.format("failed deserialize message from %s", sender), e);
}
}
}

0 comments on commit 3798812

Please sign in to comment.