Skip to content

Commit

Permalink
Merge pull request #368 from minekube/fix/chat-out-of-order
Browse files Browse the repository at this point in the history
fix: out-of-order disconnects by using chat queue
  • Loading branch information
robinbraemer authored Aug 6, 2024
2 parents 468513d + 996f54b commit 231cd43
Show file tree
Hide file tree
Showing 13 changed files with 448 additions and 191 deletions.
14 changes: 9 additions & 5 deletions pkg/edition/java/proto/packet/chat/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Builder struct {
Sender uuid.UUID
// Timestamp is the time the message was sent.
Timestamp time.Time
// LastSeenMessages is the last seen messages state of the player.
LastSeenMessages LastSeenMessages
}

// ToClient creates a packet which can be sent to the client;
Expand Down Expand Up @@ -95,14 +97,16 @@ func (b *Builder) ToServer() proto.Packet {
}
}
return &SessionPlayerCommand{
Command: strings.TrimPrefix(b.Message, "/"),
Timestamp: b.Timestamp,
Command: strings.TrimPrefix(b.Message, "/"),
Timestamp: b.Timestamp,
LastSeenMessages: b.LastSeenMessages,
}
}
return &SessionPlayerChat{
Message: b.Message,
Timestamp: b.Timestamp,
Signature: []byte{0},
Message: b.Message,
Timestamp: b.Timestamp,
Signature: []byte{0},
LastSeenMessages: b.LastSeenMessages,
}
} else if b.Protocol.GreaterEqual(version.Minecraft_1_19) { // Keyed chat
if strings.HasPrefix(b.Message, "/") {
Expand Down
5 changes: 1 addition & 4 deletions pkg/edition/java/proto/packet/chat/session_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ var _ proto.Packet = (*ArgumentSignatures)(nil)
var _ proto.Packet = (*ArgumentSignature)(nil)

func (s *SessionPlayerCommand) Signed() bool {
if s.Salt == 0 {
return false
}
return !s.LastSeenMessages.Empty() || len(s.ArgumentSignatures.Entries) != 0
return len(s.ArgumentSignatures.Entries) != 0
}

func (s *SessionPlayerCommand) Encode(c *proto.PacketContext, wr io.Writer) error {
Expand Down
17 changes: 17 additions & 0 deletions pkg/edition/java/proxy/bungeecord/bungee_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
Username() string
RemoteAddr() net.Addr
Disconnect(reason component.Component)
Protocol() proto.Protocol
}
Server interface {
Name() string
Expand Down Expand Up @@ -152,6 +153,8 @@ func (r *bungeeCordMessageResponder) Process(message *plugin.Message) bool {
r.processServerIP(in)
case "KickPlayer":
r.processKick(in)
case "KickPlayerRaw":
r.processKickRaw(in)
default:
// Unknown sub-channel, do nothing
}
Expand Down Expand Up @@ -411,6 +414,20 @@ func (r *bungeeCordMessageResponder) processKick(in io.Reader) {
})
}

func (r *bungeeCordMessageResponder) processKickRaw(in io.Reader) {
r.readPlayer(in, func(player Player) {
msg, err := util.ReadUTF(in)
if err != nil {
return
}
kickReason, err := util.JsonCodec(r.player.Protocol()).Unmarshal([]byte(msg))
if err != nil {
kickReason = &component.Text{} // fallback to blank reason
}
player.Disconnect(kickReason)
})
}

//
//
//
Expand Down
159 changes: 159 additions & 0 deletions pkg/edition/java/proxy/chat_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package proxy

import (
"go.minekube.com/gate/pkg/edition/java/netmc"
"go.minekube.com/gate/pkg/edition/java/proto/packet/chat"
"go.minekube.com/gate/pkg/gate/proto"
"go.minekube.com/gate/pkg/internal/future"
"go.minekube.com/gate/pkg/internal/mathutil"
"sync"
"sync/atomic"
"time"
)

// chatQueue is a precisely ordered queue which allows for outside entries into the ordered queue through piggybacking timestamps.
type chatQueue struct {
internalLock sync.Mutex
player *connectedPlayer
chatState *ChatState
head *future.Future[any]
}

// NewChatQueue instantiates a chatQueue for a specific player.
func newChatQueue(player *connectedPlayer) *chatQueue {
return &chatQueue{
player: player,
chatState: &ChatState{},
head: future.New[any]().Complete(nil),
}
}

func (cq *chatQueue) queueTask(task func(*ChatState, netmc.MinecraftConn) *future.Future[any]) {
cq.internalLock.Lock()
defer cq.internalLock.Unlock()

smc, ok := cq.player.ensureBackendConnection()
if !ok {
return
}
cq.head = future.ThenCompose(cq.head, func(a any) *future.Future[any] {
return task(cq.chatState, smc)
})
}

// QueuePacket queues a packet sent from the player - all packets must wait until this processes to send their packets.
// This maintains order on the server-level for the client insertions of commands and messages. All entries are locked through an internal lock.
//
// - nextPacket: a function mapping LastSeenMessages state to a CompletableFuture that will provide the next-processed packet. This should include the fixed LastSeenMessages.
// - timestamp: the new Instant timestamp of this packet to update the internal chat state.
// - lastSeenMessages: the new LastSeenMessages last seen messages to update the internal chat state.
func (cq *chatQueue) QueuePacket(nextPacket func(*chat.LastSeenMessages) *future.Future[proto.Packet], timestamp time.Time, lastSeenMessages *chat.LastSeenMessages) {
cq.queueTask(func(chatState *ChatState, smc netmc.MinecraftConn) *future.Future[any] {
newLastSeenMessages := chatState.UpdateFromMessage(&timestamp, lastSeenMessages)
return future.ThenCompose(nextPacket(newLastSeenMessages), func(p proto.Packet) *future.Future[any] {
return writePacket(p, smc)
})
})
}

// QueuePacketWithFunction hijacks the latest sent packet's chat state to provide an in-order packet without polling the physical, or prior packets sent through the stream.
func (cq *chatQueue) QueuePacketWithFunction(packetFunction func(*ChatState) proto.Packet) {
cq.queueTask(func(chatState *ChatState, smc netmc.MinecraftConn) *future.Future[any] {
packet := packetFunction(chatState)
return writePacket(packet, smc)
})
}

// HandleAcknowledgement handles the acknowledgement of packets.
func (cq *chatQueue) HandleAcknowledgement(offset int) {
cq.queueTask(func(chatState *ChatState, smc netmc.MinecraftConn) *future.Future[any] {
ackCountToForward := chatState.AccumulateAckCount(offset)
if ackCountToForward > 0 {
return writePacket(&chat.ChatAcknowledgement{Offset: ackCountToForward}, smc)
}
return future.New[any]().Complete(nil)
})
}

func writePacket(packet proto.Packet, smc proto.PacketWriter) *future.Future[any] {
f := future.New[any]()
if packet == nil {
f.Complete(nil)
return f
}
go func() {
_ = smc.WritePacket(packet)
f.Complete(nil)
}()
return f
}

// ChatState tracks the last Secure Chat state that we received from the client. This is important to always have a valid 'last seen' state that is consistent with future and past updates from the client (which may be signed). This state is used to construct 'spoofed' command packets from the proxy to the server.
// - If we last forwarded a chat or command packet from the client, we have a known 'last seen' that we can reuse.
// - If we last forwarded a ChatAcknowledgementPacket, the previous 'last seen' cannot be reused. We cannot predict an up-to-date 'last seen', as we do not know which messages the client actually saw.
// - Therefore, we need to hold back any acknowledgement packets so that we can continue to reuse the last valid 'last seen' state.
// - However, there is a limit to the number of messages that can remain unacknowledged on the server.
// - To address this, we know that if the client has moved its 'last seen' window far enough, we can fill in the gap with dummy 'last seen', and it will never be checked.
//
// Note that this is effectively unused for 1.20.5+ clients, as commands without any signature do not send 'last seen' updates.
type ChatState struct {
lastTimestamp atomic.Pointer[time.Time] // time.Time
lastSeenMessages atomic.Pointer[mathutil.BitSet] // BitSet
delayedAckCount atomic.Int32
}

func (cs *ChatState) LastTimestamp() time.Time {
t := cs.lastTimestamp.Load()
if t == nil {
return time.Time{}
}
return *t
}

const (
lastSeenMessagesWindowSize = 20
minimumDelayedAckCount = lastSeenMessagesWindowSize
)

var (
dummyLastSeenMessages = mathutil.BitSet{}
)

func (cs *ChatState) UpdateFromMessage(timestamp *time.Time, lastSeenMessages *chat.LastSeenMessages) *chat.LastSeenMessages {
if timestamp != nil {
cs.lastTimestamp.Store(timestamp)
}
if lastSeenMessages != nil {
// We held back some acknowledged messages, so flush that out now that we have a known 'last seen' state again
delayedAckCount := cs.delayedAckCount.Swap(0)
cs.lastSeenMessages.Store(&lastSeenMessages.Acknowledged)
return &chat.LastSeenMessages{
Offset: lastSeenMessages.Offset + int(delayedAckCount),
Acknowledged: lastSeenMessages.Acknowledged,
}
}
return nil
}

func (cs *ChatState) AccumulateAckCount(ackCount int) int {
delayedAckCount := cs.delayedAckCount.Add(int32(ackCount))
ackCountToForward := delayedAckCount - minimumDelayedAckCount
if ackCountToForward >= lastSeenMessagesWindowSize {
// Because we only forward acknowledgements above the window size, we don't have to shift the previous 'last seen' state
cs.lastSeenMessages.Store(&dummyLastSeenMessages)
cs.delayedAckCount.Store(minimumDelayedAckCount)
return int(ackCountToForward)
}
return 0
}

func (cs *ChatState) CreateLastSeen() chat.LastSeenMessages {
var lastSeenAck mathutil.BitSet
if ack := cs.lastSeenMessages.Load(); ack != nil {
lastSeenAck = *ack
}
return chat.LastSeenMessages{
Offset: 0,
Acknowledged: lastSeenAck,
}
}
95 changes: 59 additions & 36 deletions pkg/edition/java/proxy/handle_chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.minekube.com/gate/pkg/edition/java/proto/version"
"go.minekube.com/gate/pkg/edition/java/proxy/crypto/keyrevision"
"go.minekube.com/gate/pkg/gate/proto"
"go.minekube.com/gate/pkg/internal/future"
)

type chatHandler struct {
Expand All @@ -24,7 +25,7 @@ type chatHandler struct {
func (c *chatHandler) handleChat(packet proto.Packet) error {
if c.player.Protocol().GreaterEqual(version.Minecraft_1_19_3) {
if p, ok := packet.(*chat.SessionPlayerChat); ok {
return c.handleSessionChat(p)
return c.handleSessionChat(p, false)
}
} else if c.player.Protocol().GreaterEqual(version.Minecraft_1_19) {
if p, ok := packet.(*chat.KeyedPlayerChat); ok {
Expand Down Expand Up @@ -52,19 +53,19 @@ func (c *chatHandler) handleLegacyChat(packet *chat.LegacyChat) error {
return nil
}
return server.WritePacket((&chat.Builder{
Protocol: server.Protocol(),
Protocol: c.player.Protocol(),
Message: evt.Message(),
Sender: evt.player.ID(),
}).ToServer())
}

//type ChatQueue interface {
//type chatQueue interface {
// // Enqueue enqueues a chat message to be sent to the server.
// // The message is sent to the server when the player is connected to the server.
// Enqueue(message string)
//}

func (c *chatHandler) handleSessionChat(packet *chat.SessionPlayerChat) error {
func (c *chatHandler) handleSessionChat(packet *chat.SessionPlayerChat, unsigned bool) error {
server, ok := c.player.ensureBackendConnection()
if !ok {
return nil
Expand All @@ -74,24 +75,35 @@ func (c *chatHandler) handleSessionChat(packet *chat.SessionPlayerChat) error {
original: packet.Message,
}
c.eventMgr.Fire(evt)
if !evt.Allowed() {
if packet.Signed {
c.invalidCancel(c.log, c.player)
}
return nil

asFuture := func(p proto.Packet) *future.Future[proto.Packet] {
return future.New[proto.Packet]().Complete(p)
}
if evt.Message() != packet.Message {
if packet.Signed && c.invalidChange(c.log, c.player) {
return nil

c.player.chatQueue.QueuePacket(func(newLastSeenMessages *chat.LastSeenMessages) *future.Future[proto.Packet] {
if !evt.Allowed() {
if packet.Signed {
c.invalidCancel(c.log, c.player)
}
return asFuture(nil)
}
return server.WritePacket((&chat.Builder{
Protocol: server.Protocol(),
Message: packet.Message,
Sender: c.player.ID(),
Timestamp: packet.Timestamp,
}).ToServer())
}
return server.WritePacket(packet)
if evt.Message() != packet.Message {
if packet.Signed && c.invalidChange(c.log, c.player) {
return nil
}
return asFuture((&chat.Builder{
Protocol: server.Protocol(),
Message: packet.Message,
Sender: c.player.ID(),
Timestamp: packet.Timestamp,
}).ToServer())
}
if newLastSeenMessages == nil && !unsigned {
packet.LastSeenMessages = *newLastSeenMessages
}
return asFuture(packet)
}, packet.Timestamp, &packet.LastSeenMessages)
return nil
}

func (c *chatHandler) handleKeyedChat(packet *chat.KeyedPlayerChat) error {
Expand All @@ -105,23 +117,34 @@ func (c *chatHandler) handleKeyedChat(packet *chat.KeyedPlayerChat) error {
}
c.eventMgr.Fire(evt)

var msg proto.Packet
if c.player.IdentifiedKey() != nil && !packet.Unsigned {
// 1.19->1.19.2 signed version
return c.handleOldSignedChat(server, packet, evt)
}
// 1.19->1.19.2 unsigned version
if !evt.Allowed() {
return nil
}
return server.WritePacket((&chat.Builder{
Protocol: server.Protocol(),
Message: evt.Message(),
Sender: c.player.ID(),
Timestamp: packet.Expiry,
}).ToServer())
msg = c.handleOldSignedChat(server, packet, evt)
} else {
// 1.19->1.19.2 unsigned version
if !evt.Allowed() {
return nil
}
msg = (&chat.Builder{
Protocol: server.Protocol(),
Message: evt.Message(),
Sender: c.player.ID(),
Timestamp: packet.Expiry,
}).ToServer()
}
c.player.chatQueue.QueuePacket(
func(*chat.LastSeenMessages) *future.Future[proto.Packet] {
return future.New[proto.Packet]().Complete(msg)
},
packet.Expiry,
nil,
)

return nil
}

func (c *chatHandler) handleOldSignedChat(server netmc.MinecraftConn, packet *chat.KeyedPlayerChat, event *PlayerChatEvent) error {
func (c *chatHandler) handleOldSignedChat(server netmc.MinecraftConn, packet *chat.KeyedPlayerChat, event *PlayerChatEvent) proto.Packet {
playerKey := c.player.IdentifiedKey()
denyRevision := keyrevision.RevisionIndex(playerKey.KeyRevision()) >= keyrevision.RevisionIndex(keyrevision.LinkedV2)
if !event.Allowed() {
Expand All @@ -138,14 +161,14 @@ func (c *chatHandler) handleOldSignedChat(server netmc.MinecraftConn, packet *ch
return nil
}
c.log.Info("a plugin changed a signed chat message. The server may not accept it")
return server.WritePacket((&chat.Builder{
return (&chat.Builder{
Protocol: server.Protocol(),
Message: event.Message(),
Sender: c.player.ID(),
Timestamp: packet.Expiry,
}).ToServer())
}).ToServer()
}
return server.WritePacket(packet)
return packet
}

func (c *chatHandler) invalidCancel(log logr.Logger, player *connectedPlayer) {
Expand Down
Loading

0 comments on commit 231cd43

Please sign in to comment.