Skip to content

Commit

Permalink
Merge branch 'main' into real_elastic_tests_3
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 23, 2023
2 parents dc7984d + 062ec7f commit 6e5b2a5
Show file tree
Hide file tree
Showing 18 changed files with 386 additions and 216 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
v8.1.50 (2023-05-23)
-------------------------
* Remove support for passing URNs to flow/preview_start as that's not a thing we do
* Make the name of the ES index for contacts configurable

v8.1.49 (2023-05-18)
-------------------------
* Remove support for ticket assignment with a note
Expand Down
69 changes: 58 additions & 11 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ func LoadContact(ctx context.Context, db Queryer, oa *OrgAssets, id ContactID) (
// LoadContacts loads a set of contacts for the passed in ids. Note that the order of the returned contacts
// won't necessarily match the order of the ids.
func LoadContacts(ctx context.Context, db Queryer, oa *OrgAssets, ids []ContactID) ([]*Contact, error) {
if len(ids) == 0 {
return nil, nil
}

start := time.Now()

rows, err := db.QueryxContext(ctx, sqlSelectContact, pq.Array(ids), oa.OrgID())
Expand Down Expand Up @@ -386,6 +390,10 @@ func GetContactIDsFromReferences(ctx context.Context, db Queryer, orgID OrgID, r

// gets the contact IDs for the passed in org and set of UUIDs
func getContactIDsFromUUIDs(ctx context.Context, db Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) {
if len(uuids) == 0 {
return nil, nil
}

ids, err := queryContactIDs(ctx, db, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids))
if err != nil {
return nil, errors.Wrapf(err, "error selecting contact ids by UUID")
Expand Down Expand Up @@ -647,34 +655,40 @@ func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, ur
return contact, flowContact, created, nil
}

// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as
// the passed in URNs with the ids of the contacts.
func GetOrCreateContactIDsFromURNs(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]ContactID, error) {
// GetOrCreateContactsFromURNs will fetch or create the contacts for the passed in URNs, returning a map of the fetched
// contacts and another map of the created contacts.
func GetOrCreateContactsFromURNs(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]*Contact, map[urns.URN]*Contact, error) {
// ensure all URNs are normalized
for i, urn := range urnz {
urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry()))
}

// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz)
owners, err := contactsFromURNs(ctx, db, oa, urnz)
if err != nil {
return nil, errors.Wrapf(err, "error looking up contacts for URNs")
return nil, nil, errors.Wrap(err, "error looking up contacts for URNs")
}

fetched := make(map[urns.URN]*Contact, len(urnz))
created := make(map[urns.URN]*Contact, len(urnz))

// create any contacts that are missing
for urn, contactID := range owners {
if contactID == NilContactID {
for urn, contact := range owners {
if contact == nil {
contact, _, _, err := GetOrCreateContact(ctx, db, oa, []urns.URN{urn}, NilChannelID)
if err != nil {
return nil, errors.Wrapf(err, "error creating contact")
return nil, nil, errors.Wrapf(err, "error creating contact")
}
owners[urn] = contact.ID()
created[urn] = contact
} else {
fetched[urn] = contact
}
}
return owners, nil

return fetched, created, nil
}

// looks up the contacts who own the given urns (which should be normalized by the caller) and returns that information as a map
// looks up the contact IDs who own the given urns (which should be normalized by the caller) and returns that information as a map
func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) {
identityToOriginal := make(map[urns.URN]urns.URN, len(urnz))
identities := make([]urns.URN, len(urnz))
Expand Down Expand Up @@ -705,6 +719,39 @@ func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urn
return owners, nil
}

// like contactIDsFromURNs but fetches the contacts
func contactsFromURNs(ctx context.Context, db Queryer, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]*Contact, error) {
ids, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz)
if err != nil {
return nil, err
}

// get the ids of the contacts that exist
existingIDs := make([]ContactID, 0, len(ids))
for _, id := range ids {
if id != NilContactID {
existingIDs = append(existingIDs, id)
}
}

fetched, err := LoadContacts(ctx, db, oa, existingIDs)
if err != nil {
return nil, errors.Wrap(err, "error loading contacts")
}

// and transform those into a map by URN
fetchedByID := make(map[ContactID]*Contact, len(fetched))
for _, c := range fetched {
fetchedByID[c.ID()] = c
}
byURN := make(map[urns.URN]*Contact, len(ids))
for urn, id := range ids {
byURN[urn] = fetchedByID[id]
}

return byURN, nil
}

func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN, channelID ChannelID) (ContactID, bool, error) {
// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, orgID, urnz)
Expand Down
72 changes: 37 additions & 35 deletions core/models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,59 +358,61 @@ func TestGetOrCreateContactIDsFromURNs(t *testing.T) {

defer testsuite.Reset(testsuite.ResetData)

oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
assert.NoError(t, err)

// add an orphaned URN
testdata.InsertContactURN(rt, testdata.Org1, nil, urns.URN("telegram:200001"), 100)

contactIDSeq := models.ContactID(30000)
newContact := func() models.ContactID { id := contactIDSeq; contactIDSeq++; return id }
prevContact := func() models.ContactID { return contactIDSeq - 1 }

org, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
assert.NoError(t, err)
cathy, _ := testdata.Cathy.Load(rt, oa)

tcs := []struct {
OrgID models.OrgID
URNs []urns.URN
ContactIDs map[urns.URN]models.ContactID
orgID models.OrgID
urns []urns.URN
fetched map[urns.URN]*models.Contact
created []urns.URN
}{
{
testdata.Org1.ID,
[]urns.URN{testdata.Cathy.URN},
map[urns.URN]models.ContactID{testdata.Cathy.URN: testdata.Cathy.ID},
},
{
testdata.Org1.ID,
[]urns.URN{urns.URN(testdata.Cathy.URN.String() + "?foo=bar")},
map[urns.URN]models.ContactID{urns.URN(testdata.Cathy.URN.String() + "?foo=bar"): testdata.Cathy.ID},
orgID: testdata.Org1.ID,
urns: []urns.URN{testdata.Cathy.URN},
fetched: map[urns.URN]*models.Contact{
testdata.Cathy.URN: cathy,
},
created: []urns.URN{},
},
{
testdata.Org1.ID,
[]urns.URN{testdata.Cathy.URN, urns.URN("telegram:100001")},
map[urns.URN]models.ContactID{
testdata.Cathy.URN: testdata.Cathy.ID,
urns.URN("telegram:100001"): newContact(),
orgID: testdata.Org1.ID,
urns: []urns.URN{urns.URN(testdata.Cathy.URN.String() + "?foo=bar")},
fetched: map[urns.URN]*models.Contact{
urns.URN(testdata.Cathy.URN.String() + "?foo=bar"): cathy,
},
created: []urns.URN{},
},
{
testdata.Org1.ID,
[]urns.URN{urns.URN("telegram:100001")},
map[urns.URN]models.ContactID{urns.URN("telegram:100001"): prevContact()},
orgID: testdata.Org1.ID,
urns: []urns.URN{testdata.Cathy.URN, urns.URN("telegram:100001")},
fetched: map[urns.URN]*models.Contact{
testdata.Cathy.URN: cathy,
},
created: []urns.URN{"telegram:100001"},
},
{
testdata.Org1.ID,
[]urns.URN{urns.URN("telegram:200001")},
map[urns.URN]models.ContactID{urns.URN("telegram:200001"): newContact()}, // new contact assigned orphaned URN
orgID: testdata.Org1.ID,
urns: []urns.URN{urns.URN("telegram:200001")},
fetched: map[urns.URN]*models.Contact{},
created: []urns.URN{"telegram:200001"}, // new contact assigned orphaned URN
},
}

for i, tc := range tcs {
ids, err := models.GetOrCreateContactIDsFromURNs(ctx, rt.DB, org, tc.URNs)
fetched, created, err := models.GetOrCreateContactsFromURNs(ctx, rt.DB, oa, tc.urns)
assert.NoError(t, err, "%d: error getting contact ids", i)
assert.Equal(t, tc.ContactIDs, ids, "%d: mismatch in contact ids", i)
assert.Equal(t, tc.fetched, fetched, "%d: fetched contacts mismatch", i)
assert.Equal(t, tc.created, maps.Keys(created), "%d: created contacts mismatch", i)
}
}

func TestGetOrCreateContactIDsFromURNsRace(t *testing.T) {
func TestGetOrCreateContactsFromURNsRace(t *testing.T) {
ctx, rt := testsuite.Runtime()

defer testsuite.Reset(testsuite.ResetData)
Expand All @@ -427,13 +429,13 @@ func TestGetOrCreateContactIDsFromURNsRace(t *testing.T) {
return nil
})

var contacts [2]models.ContactID
var contacts [2]*models.Contact
var errs [2]error

test.RunConcurrently(2, func(i int) {
var cmap map[urns.URN]models.ContactID
cmap, errs[i] = models.GetOrCreateContactIDsFromURNs(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")})
contacts[i] = cmap[urns.URN("telegram:100007")]
var created map[urns.URN]*models.Contact
_, created, errs[i] = models.GetOrCreateContactsFromURNs(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")})
contacts[i] = created[urns.URN("telegram:100007")]
})

require.NoError(t, errs[0])
Expand Down
3 changes: 3 additions & 0 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Exclusions struct {
NotSeenSinceDays int `json:"not_seen_since_days"` // contacts who have not been seen for more than this number of days
}

// NoExclusions is a constant for the empty value
var NoExclusions = Exclusions{}

// Scan supports reading exclusion values from JSON in database
func (e *Exclusions) Scan(value any) error {
if value == nil {
Expand Down
2 changes: 1 addition & 1 deletion core/msgio/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func QueueCourierMessages(rc redis.Conn, oa *models.OrgAssets, contactID models.
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{"msgs": len(batch), "contact_id": contactID, "channel_uuid": channel.UUID(), "elapsed": time.Since(start)}).Info("msgs queued to courier")
logrus.WithFields(logrus.Fields{"msgs": len(batch), "contact_id": contactID, "channel_uuid": channel.UUID(), "elapsed": time.Since(start)}).Debug("msgs queued to courier")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/search/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSmartGroups(t *testing.T) {
testdata.Bob.ID,
)

testsuite.ReindexElastic()
testsuite.ReindexElastic(ctx)

oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshCampaigns|models.RefreshGroups)
assert.NoError(t, err)
Expand Down
14 changes: 6 additions & 8 deletions core/search/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/contactql"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
Expand All @@ -13,7 +12,7 @@ import (
)

// BuildStartQuery builds a start query for the given flow and start options
func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, urnz []urns.URN, userQuery string, excs models.Exclusions) (string, error) {
func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, userQuery string, excs models.Exclusions, excGroups []*models.Group) (string, error) {
var parsedQuery *contactql.ContactQuery
var err error

Expand All @@ -24,10 +23,10 @@ func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, groups []*models.G
}
}

return contactql.Stringify(buildStartQuery(oa.Env(), flow, groups, contactUUIDs, urnz, parsedQuery, excs)), nil
return contactql.Stringify(buildStartQuery(oa.Env(), flow, groups, contactUUIDs, parsedQuery, excs, excGroups)), nil
}

func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, urnz []urns.URN, userQuery *contactql.ContactQuery, excs models.Exclusions) contactql.QueryNode {
func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, userQuery *contactql.ContactQuery, excs models.Exclusions, excGroups []*models.Group) contactql.QueryNode {
inclusions := make([]contactql.QueryNode, 0, 10)

for _, group := range groups {
Expand All @@ -36,10 +35,6 @@ func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.G
for _, contactUUID := range contactUUIDs {
inclusions = append(inclusions, contactql.NewCondition("uuid", contactql.PropertyTypeAttribute, contactql.OpEqual, string(contactUUID)))
}
for _, urn := range urnz {
scheme, path, _, _ := urn.ToParts()
inclusions = append(inclusions, contactql.NewCondition(scheme, contactql.PropertyTypeScheme, contactql.OpEqual, path))
}
if userQuery != nil {
inclusions = append(inclusions, userQuery.Root())
}
Expand All @@ -58,6 +53,9 @@ func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.G
seenSince := dates.Now().Add(-time.Hour * time.Duration(24*excs.NotSeenSinceDays))
exclusions = append(exclusions, contactql.NewCondition("last_seen_on", contactql.PropertyTypeAttribute, contactql.OpGreaterThan, formatQueryDate(env, seenSince)))
}
for _, group := range excGroups {
exclusions = append(exclusions, contactql.NewCondition("group", contactql.PropertyTypeAttribute, contactql.OpNotEqual, group.Name()))
}

return contactql.NewBoolCombination(contactql.BoolOperatorAnd,
contactql.NewBoolCombination(contactql.BoolOperatorOr, inclusions...),
Expand Down
24 changes: 11 additions & 13 deletions core/search/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/search"
Expand All @@ -29,32 +28,31 @@ func TestBuildStartQuery(t *testing.T) {
testers := oa.GroupByID(testdata.TestersGroup.ID)

tcs := []struct {
groups []*models.Group
contactUUIDs []flows.ContactUUID
urns []urns.URN
userQuery string
exclusions models.Exclusions
expected string
err string
groups []*models.Group
contactUUIDs []flows.ContactUUID
userQuery string
exclusions models.Exclusions
excludeGroups []*models.Group
expected string
err string
}{
{
groups: []*models.Group{doctors, testers},
contactUUIDs: []flows.ContactUUID{testdata.Cathy.UUID, testdata.George.UUID},
urns: []urns.URN{"tel:+1234567890", "telegram:9876543210"},
exclusions: models.Exclusions{},
expected: `group = "Doctors" OR group = "Testers" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf" OR uuid = "8d024bcd-f473-4719-a00a-bd0bb1190135" OR tel = "+1234567890" OR telegram = 9876543210`,
expected: `group = "Doctors" OR group = "Testers" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf" OR uuid = "8d024bcd-f473-4719-a00a-bd0bb1190135"`,
},
{
groups: []*models.Group{doctors},
contactUUIDs: []flows.ContactUUID{testdata.Cathy.UUID},
urns: []urns.URN{"tel:+1234567890"},
exclusions: models.Exclusions{
NonActive: true,
InAFlow: true,
StartedPreviously: true,
NotSeenSinceDays: 90,
},
expected: `(group = "Doctors" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf" OR tel = "+1234567890") AND status = "active" AND flow = "" AND history != "Favorites" AND last_seen_on > "20-01-2022"`,
excludeGroups: []*models.Group{testers},
expected: `(group = "Doctors" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf") AND status = "active" AND flow = "" AND history != "Favorites" AND last_seen_on > "20-01-2022" AND group != "Testers"`,
},
{
contactUUIDs: []flows.ContactUUID{testdata.Cathy.UUID},
Expand Down Expand Up @@ -111,7 +109,7 @@ func TestBuildStartQuery(t *testing.T) {
}

for _, tc := range tcs {
actual, err := search.BuildStartQuery(oa, flow, tc.groups, tc.contactUUIDs, tc.urns, tc.userQuery, tc.exclusions)
actual, err := search.BuildStartQuery(oa, flow, tc.groups, tc.contactUUIDs, tc.userQuery, tc.exclusions, tc.excludeGroups)
if tc.err != "" {
assert.Equal(t, "", actual)
assert.EqualError(t, err, tc.err)
Expand Down
Loading

0 comments on commit 6e5b2a5

Please sign in to comment.