Skip to content

Commit

Permalink
Merge pull request #366 from nyaruka/simpler_runs
Browse files Browse the repository at this point in the history
Simplify `FlowRun` struct
  • Loading branch information
rowanseymour authored Nov 20, 2024
2 parents 55f2de9 + 6808270 commit b7246c8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 65 deletions.
83 changes: 32 additions & 51 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package models
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -40,44 +39,26 @@ var runStatusMap = map[flows.RunStatus]RunStatus{

// FlowRun is the mailroom type for a FlowRun
type FlowRun struct {
r struct {
ID FlowRunID `db:"id"`
UUID flows.RunUUID `db:"uuid"`
Status RunStatus `db:"status"`
CreatedOn time.Time `db:"created_on"`
ModifiedOn time.Time `db:"modified_on"`
ExitedOn *time.Time `db:"exited_on"`
Responded bool `db:"responded"`
Results string `db:"results"`
Path string `db:"path"`
CurrentNodeUUID null.String `db:"current_node_uuid"`
ContactID flows.ContactID `db:"contact_id"`
FlowID FlowID `db:"flow_id"`
OrgID OrgID `db:"org_id"`
SessionID SessionID `db:"session_id"`
StartID StartID `db:"start_id"`
}
ID FlowRunID `db:"id"`
UUID flows.RunUUID `db:"uuid"`
Status RunStatus `db:"status"`
CreatedOn time.Time `db:"created_on"`
ModifiedOn time.Time `db:"modified_on"`
ExitedOn *time.Time `db:"exited_on"`
Responded bool `db:"responded"`
Results string `db:"results"`
Path string `db:"path"`
CurrentNodeUUID null.String `db:"current_node_uuid"`
ContactID flows.ContactID `db:"contact_id"`
FlowID FlowID `db:"flow_id"`
OrgID OrgID `db:"org_id"`
SessionID SessionID `db:"session_id"`
StartID StartID `db:"start_id"`

// we keep a reference to the engine's run
run flows.Run
}

func (r *FlowRun) SetSessionID(sessionID SessionID) { r.r.SessionID = sessionID }
func (r *FlowRun) SetStartID(startID StartID) { r.r.StartID = startID }
func (r *FlowRun) StartID() StartID { return r.r.StartID }
func (r *FlowRun) UUID() flows.RunUUID { return r.r.UUID }
func (r *FlowRun) ModifiedOn() time.Time { return r.r.ModifiedOn }

// MarshalJSON is our custom marshaller so that our inner struct get output
func (r *FlowRun) MarshalJSON() ([]byte, error) {
return json.Marshal(r.r)
}

// UnmarshalJSON is our custom marshaller so that our inner struct get output
func (r *FlowRun) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, &r.r)
}

// Step represents a single step in a run, this struct is used for serialization to the steps
type Step struct {
UUID flows.StepUUID `json:"uuid"`
Expand Down Expand Up @@ -112,26 +93,26 @@ func newRun(ctx context.Context, tx *sqlx.Tx, oa *OrgAssets, session *Session, f
return nil, fmt.Errorf("unable to load flow with uuid: %s: %w", fr.FlowReference().UUID, err)
}

// create our run
run := &FlowRun{}
r := &run.r
r.UUID = fr.UUID()
r.Status = runStatusMap[fr.Status()]
r.CreatedOn = fr.CreatedOn()
r.ExitedOn = fr.ExitedOn()
r.ModifiedOn = fr.ModifiedOn()
r.ContactID = fr.Contact().ID()
r.FlowID = flowID
r.SessionID = session.ID()
r.StartID = NilStartID
r.OrgID = oa.OrgID()
r.Path = string(jsonx.MustMarshal(path))
r.Results = string(jsonx.MustMarshal(fr.Results()))
r := &FlowRun{
UUID: fr.UUID(),
Status: runStatusMap[fr.Status()],
CreatedOn: fr.CreatedOn(),
ExitedOn: fr.ExitedOn(),
ModifiedOn: fr.ModifiedOn(),
ContactID: fr.Contact().ID(),
FlowID: flowID,
OrgID: oa.OrgID(),
SessionID: session.ID(),
StartID: NilStartID,
Path: string(jsonx.MustMarshal(path)),
Results: string(jsonx.MustMarshal(fr.Results())),

run: fr,
}

if len(path) > 0 {
r.CurrentNodeUUID = null.String(path[len(path)-1].NodeUUID)
}
run.run = fr

// mark ourselves as responded if we received a message
for _, e := range fr.Events() {
Expand All @@ -141,7 +122,7 @@ func newRun(ctx context.Context, tx *sqlx.Tx, oa *OrgAssets, session *Session, f
}
}

return run, nil
return r, nil
}

// GetContactIDsAtNode returns the ids of contacts currently waiting or active at the given flow node
Expand Down
24 changes: 12 additions & 12 deletions core/models/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,17 +399,18 @@ func (s *Session) Update(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx,
}

// figure out which runs are new and which are updated
updatedRuns := make([]any, 0, 1)
newRuns := make([]any, 0)
updatedRuns := make([]*FlowRun, 0, 1)
newRuns := make([]*FlowRun, 0)

for _, r := range s.Runs() {
modified, found := s.seenRuns[r.UUID()]
modified, found := s.seenRuns[r.UUID]
if !found {
newRuns = append(newRuns, &r.r)
newRuns = append(newRuns, r)
continue
}

if r.ModifiedOn().After(modified) {
updatedRuns = append(updatedRuns, &r.r)
if r.ModifiedOn.After(modified) {
updatedRuns = append(updatedRuns, r)
continue
}
}
Expand Down Expand Up @@ -550,7 +551,7 @@ func NewSession(ctx context.Context, tx *sqlx.Tx, oa *OrgAssets, fs flows.Sessio

// set start id if first run of session
if i == 0 && startID != NilStartID {
run.SetStartID(startID)
run.StartID = startID
}

// save the run to our session
Expand Down Expand Up @@ -677,14 +678,13 @@ func InsertSessions(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *O
return nil, fmt.Errorf("error inserting waiting sessions: %w", err)
}

// for each session associate our run with each
runs := make([]any, 0, len(sessions))
// gather all runs across all sessions
runs := make([]*FlowRun, 0, len(sessions))
for _, s := range sessions {
for _, r := range s.runs {
runs = append(runs, &r.r)
r.SessionID = s.ID() // set our session id now that it is written

// set our session id now that it is written
r.SetSessionID(s.ID())
runs = append(runs, r)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/models/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func TestSessionWithSubflows(t *testing.T) {
assert.Nil(t, session.Timeout())

require.Len(t, session.Runs(), 2)
assert.Equal(t, startID, session.Runs()[0].StartID())
assert.Equal(t, models.NilStartID, session.Runs()[1].StartID())
assert.Equal(t, startID, session.Runs()[0].StartID)
assert.Equal(t, models.NilStartID, session.Runs()[1].StartID)

// check that matches what is in the db
assertdb.Query(t, rt.DB, `SELECT status, session_type, current_flow_id, responded, ended_on, wait_resume_on_expire FROM flows_flowsession`).
Expand Down

0 comments on commit b7246c8

Please sign in to comment.