Skip to content

Commit

Permalink
Merge pull request #688 from nyaruka/fetch_attachment_with_msg_id
Browse files Browse the repository at this point in the history
Send msg id to courier fetch-attachments endpoint
  • Loading branch information
rowanseymour authored Nov 14, 2022
2 parents fd9b784 + 3a02cfe commit 10aad1d
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 14 deletions.
2 changes: 1 addition & 1 deletion core/handlers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func RunTestCases(t *testing.T, ctx context.Context, rt *runtime.Runtime, tcs []
for _, s := range session {
msg := msgsByContactID[s.ContactID()]
if msg != nil {
s.SetIncomingMsg(msg.ID(), "")
s.SetIncomingMsg(models.MsgID(msg.ID()), "")
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func ResumeIVRFlow(
case InputResume:
resume, svcErr, err = buildMsgResume(ctx, rt, svc, channel, contact, urn, call, oa, r, res)
if resume != nil {
session.SetIncomingMsg(resume.(*resumes.MsgResume).Msg().ID(), null.NullString)
session.SetIncomingMsg(models.MsgID(resume.(*resumes.MsgResume).Msg().ID()), null.NullString)
}

case DialResume:
Expand Down
2 changes: 1 addition & 1 deletion core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ RETURNING
`

// UpdateMessage updates a message after handling
func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, attachments []utils.Attachment, logUUIDs []ChannelLogUUID) error {
func UpdateMessage(ctx context.Context, tx Queryer, msgID MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, attachments []utils.Attachment, logUUIDs []ChannelLogUUID) error {
_, err := tx.ExecContext(ctx,
`UPDATE
msgs_msg
Expand Down
4 changes: 2 additions & 2 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestNewOutgoingFlowMsg(t *testing.T) {

session := insertTestSession(t, ctx, rt, testdata.Org1, tc.Contact, testdata.Favorites)
if tc.ResponseTo != models.NilMsgID {
session.SetIncomingMsg(flows.MsgID(tc.ResponseTo), null.NullString)
session.SetIncomingMsg(tc.ResponseTo, null.NullString)
}

flowMsg := flows.NewMsgOut(tc.URN, assets.NewChannelReference(tc.ChannelUUID, "Test Channel"), tc.Text, tc.Attachments, tc.QuickReplies, nil, tc.Topic, tc.Unsendable)
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestMarshalMsg(t *testing.T) {
flows.NilUnsendableReason,
)
in1 := testdata.InsertIncomingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "test", models.MsgStatusHandled)
session.SetIncomingMsg(flows.MsgID(in1.ID()), null.String("EX123"))
session.SetIncomingMsg(models.MsgID(in1.ID()), null.String("EX123"))
msg2, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg2, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC))
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions core/models/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (s *Session) OutputMD5() string {
}

// SetIncomingMsg set the incoming message that this session should be associated with in this sprint
func (s *Session) SetIncomingMsg(id flows.MsgID, externalID null.String) {
s.incomingMsgID = MsgID(id)
func (s *Session) SetIncomingMsg(id MsgID, externalID null.String) {
s.incomingMsgID = id
s.incomingExternalID = externalID
}

Expand Down
4 changes: 3 additions & 1 deletion core/msgio/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type fetchAttachmentRequest struct {
ChannelType models.ChannelType `json:"channel_type"`
ChannelUUID assets.ChannelUUID `json:"channel_uuid"`
URL string `json:"url"`
MsgID models.MsgID `json:"msg_id"`
}

type fetchAttachmentResponse struct {
Expand All @@ -168,11 +169,12 @@ type fetchAttachmentResponse struct {
}

// FetchAttachment calls courier to fetch the given attachment
func FetchAttachment(ctx context.Context, rt *runtime.Runtime, ch *models.Channel, attURL string) (utils.Attachment, models.ChannelLogUUID, error) {
func FetchAttachment(ctx context.Context, rt *runtime.Runtime, ch *models.Channel, attURL string, msgID models.MsgID) (utils.Attachment, models.ChannelLogUUID, error) {
payload := jsonx.MustMarshal(&fetchAttachmentRequest{
ChannelType: ch.Type(),
ChannelUUID: ch.UUID(),
URL: attURL,
MsgID: msgID,
})
req, _ := http.NewRequest("POST", fmt.Sprintf("https://%s/c/_fetch-attachment", rt.Config.Domain), bytes.NewReader(payload))
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", rt.Config.CourierAuthToken))
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestMsgEvents(t *testing.T) {
ContactID: contact.ID,
OrgID: org.ID,
ChannelID: channel.ID,
MsgID: dbMsg.ID(),
MsgID: models.MsgID(dbMsg.ID()),
MsgUUID: dbMsg.UUID(),
URN: contact.URN,
URNID: contact.URNID,
Expand Down Expand Up @@ -647,7 +647,7 @@ func TestTimedEvents(t *testing.T) {
ContactID: tc.Contact.ID,
OrgID: tc.Org.ID,
ChannelID: tc.Channel.ID,
MsgID: flows.MsgID(1),
MsgID: models.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Expand Down
8 changes: 4 additions & 4 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
if utils.Attachment(attURL).ContentType() != "" {
attachments = append(attachments, utils.Attachment(attURL))
} else {
attachment, logUUID, err := msgio.FetchAttachment(ctx, rt, channel, attURL)
attachment, logUUID, err := msgio.FetchAttachment(ctx, rt, channel, attURL, event.MsgID)
if err != nil {
return errors.Wrapf(err, "error fetching attachment '%s'", attURL)
}
Expand Down Expand Up @@ -598,7 +598,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e

msgIn := flows.NewMsgIn(event.MsgUUID, event.URN, channel.ChannelReference(), event.Text, availableAttachments)
msgIn.SetExternalID(string(event.MsgExternalID))
msgIn.SetID(event.MsgID)
msgIn.SetID(flows.MsgID(event.MsgID))

// build our hook to mark a flow message as handled
flowMsgHook := func(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, sessions []*models.Session) error {
Expand Down Expand Up @@ -783,7 +783,7 @@ func markMsgHandled(ctx context.Context, db models.Queryer, contact *flows.Conta
flowID = flow.ID()
}

err := models.UpdateMessage(ctx, db, msg.ID(), models.MsgStatusHandled, models.VisibilityVisible, msgType, flowID, attachments, logUUIDs)
err := models.UpdateMessage(ctx, db, models.MsgID(msg.ID()), models.MsgStatusHandled, models.VisibilityVisible, msgType, flowID, attachments, logUUIDs)
if err != nil {
return errors.Wrapf(err, "error marking message as handled")
}
Expand Down Expand Up @@ -813,7 +813,7 @@ type MsgEvent struct {
ContactID models.ContactID `json:"contact_id"`
OrgID models.OrgID `json:"org_id"`
ChannelID models.ChannelID `json:"channel_id"`
MsgID flows.MsgID `json:"msg_id"`
MsgID models.MsgID `json:"msg_id"`
MsgUUID flows.MsgUUID `json:"msg_uuid"`
MsgExternalID null.String `json:"msg_external_id"`
URN urns.URN `json:"urn"`
Expand Down

0 comments on commit 10aad1d

Please sign in to comment.