Skip to content

Commit

Permalink
fix(GODT-1760): Fix out of order UID panics
Browse files Browse the repository at this point in the history
Extendend snapshot with `appendMessageFromOtherState` to enable out of
order UID insertion. Please check the comments in
`TestAppendCanHandleOutOfOrderUIDUpdates` for why this is necessary.
  • Loading branch information
LBeernaertProton committed Oct 19, 2022
1 parent 5e4c79b commit a7e75bf
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 36 deletions.
82 changes: 46 additions & 36 deletions internal/state/responders.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ func (u *clearRecentFlagRespUpdate) apply(ctx context.Context, tx *ent.Tx) error
// targetedExists needs to be separate so that we update the targetStateID safely when doing concurrent updates
// in different states. This way we also avoid the extra step of copying the `exists` data.
type targetedExists struct {
resp *exists
targetStateID StateID
resp *exists
targetStateID StateID
originStateID StateID
originStateSet bool
}

func (u *targetedExists) handle(_ context.Context, snap *snapshot, stateID StateID) ([]response.Response, responderDBUpdate, error) {
Expand All @@ -103,8 +105,14 @@ func (u *targetedExists) handle(_ context.Context, snap *snapshot, stateID State
flags = u.resp.flags
}

if err := snap.appendMessage(u.resp.messageID, u.resp.messageUID, flags); err != nil {
return nil, nil, err
if u.originStateSet && u.originStateID == stateID {
if err := snap.appendMessage(u.resp.messageID, u.resp.messageUID, flags); err != nil {
return nil, nil, err
}
} else {
if err := snap.appendMessageFromOtherState(u.resp.messageID, u.resp.messageUID, flags); err != nil {
return nil, nil, err
}
}

seq, err := snap.getMessageSeq(u.resp.messageID.InternalID)
Expand Down Expand Up @@ -135,7 +143,14 @@ func (u *targetedExists) getMessageID() imap.InternalMessageID {
}

func (u *targetedExists) String() string {
return fmt.Sprintf("TargetedExists: message = %v remote = %v targetStateID = %v", u.resp.messageID.InternalID.ShortID(), u.resp.messageID.RemoteID, u.targetStateID)
var originState string
if u.originStateSet {
originState = fmt.Sprintf("%v", u.originStateID)
} else {
originState = "None"
}

return fmt.Sprintf("TargetedExists: message = %v remote = %v targetStateID = %v originStateID = %v", u.resp.messageID.InternalID.ShortID(), u.resp.messageID.RemoteID, u.targetStateID, originState)
}

// ExistsStateUpdate needs to be a separate update since it has to deal with a Recent flag propagation. If a session
Expand All @@ -148,47 +163,32 @@ type ExistsStateUpdate struct {
responders []*exists
targetStateID StateID
targetStateSet bool
}

func NewExistsStateUpdate(mailboxID imap.InternalMailboxID, messages []db.CreateAndAddMessagesResult, s *State) Update {
var stateID StateID

var targetStateSet bool

if s != nil {
stateID = s.StateID
targetStateSet = true
}

responders := xslices.Map(messages, func(result db.CreateAndAddMessagesResult) *exists {
exists := newExists(result.MessageID, result.UID, result.Flags)

return exists
})

return &ExistsStateUpdate{
MBoxIDStateFilter: MBoxIDStateFilter{MboxID: mailboxID},
responders: responders,
targetStateID: stateID,
targetStateSet: targetStateSet,
}
originStateID StateID
originStateSet bool
}

func newExistsStateUpdateWithExists(mailboxID imap.InternalMailboxID, responders []*exists, s *State) Update {
var stateID StateID

var targetStateSet bool
var (
stateID StateID
originStateID StateID
targetStateSet bool
originStateSet bool
)

if s != nil {
stateID = s.StateID
targetStateSet = true
originStateID = s.StateID
originStateSet = true
}

return &ExistsStateUpdate{
MBoxIDStateFilter: MBoxIDStateFilter{MboxID: mailboxID},
responders: responders,
targetStateID: stateID,
targetStateSet: targetStateSet,
originStateSet: originStateSet,
originStateID: originStateID,
}
}

Expand All @@ -210,21 +210,31 @@ func (e *ExistsStateUpdate) Apply(ctx context.Context, tx *ent.Tx, s *State) err
return e.targetStateID
}()

return s.PushResponder(ctx, tx, xslices.Map(e.responders, func(e *exists) Responder {
return s.PushResponder(ctx, tx, xslices.Map(e.responders, func(ex *exists) Responder {
return &targetedExists{
resp: e,
targetStateID: targetStateID,
resp: ex,
targetStateID: targetStateID,
originStateID: e.originStateID,
originStateSet: e.originStateSet,
}
})...)
}

func (e *ExistsStateUpdate) String() string {
return fmt.Sprintf("ExistsStateUpdate: %v Responders = %v targetStateId = %v",
var originState string
if e.originStateSet {
originState = fmt.Sprintf("%v", e.originStateID)
} else {
originState = "None"
}

return fmt.Sprintf("ExistsStateUpdate: %v Responders = %v targetStateID = %v originStateID = %v",
e.MBoxIDStateFilter,
xslices.Map(e.responders, func(rsp *exists) string {
return rsp.String()
}),
e.targetStateID,
originState,
)
}

Expand Down
10 changes: 10 additions & 0 deletions internal/state/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,16 @@ func (snap *snapshot) appendMessage(messageID ids.MessageIDPair, uid imap.UID, f
return nil
}

func (snap *snapshot) appendMessageFromOtherState(messageID ids.MessageIDPair, uid imap.UID, flags imap.FlagSet) error {
snap.messages.insertOutOfOrder(
messageID,
uid,
flags,
)

return nil
}

func (snap *snapshot) expungeMessage(messageID imap.InternalMessageID) error {
if ok := snap.messages.remove(messageID); !ok {
return ErrNoSuchMessage
Expand Down
19 changes: 19 additions & 0 deletions internal/state/snapshot_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package state
import (
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/internal/ids"
"github.com/bradenaw/juniper/xslices"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -59,6 +60,24 @@ func (list *snapMsgList) insert(msgID ids.MessageIDPair, msgUID imap.UID, flags
list.idx[msgID.InternalID] = snapMsg
}

func (list *snapMsgList) insertOutOfOrder(msgID ids.MessageIDPair, msgUID imap.UID, flags imap.FlagSet) {
index, ok := list.binarySearchByUID(msgUID)
if ok {
panic("Duplicate UID added")
}

snapMsg := &snapMsg{
ID: msgID,
UID: msgUID,
flags: flags,
toExpunge: flags.ContainsUnchecked(imap.FlagDeletedLowerCase),
}

list.msg = xslices.Insert(list.msg, index, snapMsg)

list.idx[msgID.InternalID] = snapMsg
}

func (list *snapMsgList) remove(msgID imap.InternalMessageID) bool {
snapshotMsg, ok := list.idx[msgID]
if !ok {
Expand Down
51 changes: 51 additions & 0 deletions tests/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tests
import (
"bytes"
"context"
"fmt"
"github.com/ProtonMail/gluon/connector"
"github.com/ProtonMail/gluon/imap"
"os"
Expand Down Expand Up @@ -261,3 +262,53 @@ func TestAppendConnectorReturnsSameRemoteIDDifferentMBox(t *testing.T) {
}
})
}

func TestAppendCanHandleOutOfOrderUIDUpdates(t *testing.T) {
// Make sure we are correctly handling the case where we have to clients doing append at the same time.
// Both clients append a message and get assigned UID according to whichever got there first:
//
// * Client A -> Append -> UID 1
// * Client B -> Append -> UID 2
//
// All Clients apply their changes to their local state immediately and will receive a deferred updates for the
// same mailbox if other clients make updates.
// In the case of client B, it appends UID2 as the first message and then later receives an update from A with
// an UID lower than the last UID which caused unnecessary panics in the past.
//
runManyToOneTestWithAuth(t, defaultServerOptions(t, withDisableParallelism()), []int{1, 2}, func(c map[int]*testConnection, session *testSession) {
const MessageCount = 20

// Select mailbox so that both clients get updates.
c[1].C("A001 SELECT INBOX").OK("A001")
c[2].C("A002 SELECT INBOX").OK("A002")

appendFN := func(clientIndex int) {
for i := 0; i < MessageCount; i++ {
c[clientIndex+1].doAppend("INBOX", "To: [email protected]\r\n", "\\Seen").expect("OK")
}
}

wg := sync.WaitGroup{}
wg.Add(2)

for i := 0; i < 2; i++ {
go func(index int) {
defer wg.Done()
appendFN(index)
}(i)
}

wg.Wait()

validateUIDListFn := func(index int) {
c[index].C("F001 FETCH 1:* (UID)")
for i := 1; i <= MessageCount; i++ {
c[index].S(fmt.Sprintf("* %v FETCH (UID %v)", i, i))
}
c[index].OK("F001")
}

validateUIDListFn(1)
validateUIDListFn(2)
})
}

0 comments on commit a7e75bf

Please sign in to comment.