Skip to content

Commit

Permalink
Better logging within batches during rebuilds
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Apr 11, 2022
1 parent 03000c9 commit af3a0b9
Showing 1 changed file with 57 additions and 38 deletions.
95 changes: 57 additions & 38 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,39 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (

// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
func (i *ContactIndexer) indexModified(db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, error) {
batch := &bytes.Buffer{}
createdCount, deletedCount, processedCount := 0, 0, 0
totalFetched, totalCreated, totalDeleted := 0, 0, 0

var modifiedOn time.Time
var contactJSON string
var id, orgID int64
var isActive bool

subBatch := &bytes.Buffer{}
start := time.Now()

for {
batchStart := time.Now() // start time for this batch
batchFetched := 0 // contacts fetched in this batch
batchCreated := 0 // contacts created in ES
batchDeleted := 0 // contacts deleted in ES
batchESTime := time.Duration(0) // time spent indexing for this batch

indexSubBatch := func(b *bytes.Buffer) error {
t := time.Now()
created, deleted, err := i.indexBatch(index, b.Bytes())
if err != nil {
return err
}

batchESTime += time.Since(t)
batchCreated += created
batchDeleted += deleted
b.Reset()
return nil
}

rows, err := db.Query(sqlSelectModifiedContacts, lastModified)

queryCreated := 0
queryCount := 0
queryModified := lastModified

// no more rows? return
Expand All @@ -186,69 +204,70 @@ func (i *ContactIndexer) indexModified(db *sql.DB, index string, lastModified ti
return 0, 0, err
}

queryCount++
processedCount++
batchFetched++
lastModified = modifiedOn

if isActive {
logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Debug("modified contact")

batch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID))
batch.WriteString("\n")
batch.WriteString(contactJSON)
batch.WriteString("\n")
subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
subBatch.WriteString(contactJSON)
subBatch.WriteString("\n")
} else {
logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).Debug("deleted contact")

batch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID))
batch.WriteString("\n")
subBatch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
}

// write to elastic search in batches
if queryCount%i.batchSize == 0 {
created, deleted, err := i.indexBatch(index, batch.Bytes())
if err != nil {
if batchFetched%i.batchSize == 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, err
}
batch.Reset()

queryCreated += created
createdCount += created
deletedCount += deleted
}
}

if batch.Len() > 0 {
created, deleted, err := i.indexBatch(index, batch.Bytes())
if err != nil {
if subBatch.Len() > 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, err
}

queryCreated += created
createdCount += created
deletedCount += deleted
batch.Reset()
}

// last modified stayed the same and we didn't add anything, seen it all, break out
if lastModified.Equal(queryModified) && queryCreated == 0 {
break
}

rows.Close()

elapsed := time.Since(start)
rate := float32(processedCount) / (float32(elapsed) / float32(time.Second))

log := i.log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed})
totalFetched += batchFetched
totalCreated += batchCreated
totalDeleted += batchDeleted

totalTime := time.Since(start)
batchTime := time.Since(batchStart)
batchRate := int(float32(batchFetched) / (float32(batchTime) / float32(time.Second)))

log := i.log().WithField("index", index).WithFields(logrus.Fields{
"rate": batchRate,
"batch_fetched": batchFetched,
"batch_created": batchCreated,
"batch_elapsed": batchTime,
"batch_elapsed_es": batchESTime,
"total_fetched": totalFetched,
"total_created": totalCreated,
"total_elapsed": totalTime,
})

// if we're rebuilding, always log batch progress
if rebuild {
log.Info("indexed contact batch")
} else {
log.Debug("indexed contact batch")
}

// last modified stayed the same and we didn't add anything, seen it all, break out
if lastModified.Equal(queryModified) && batchCreated == 0 {
break
}
}

return createdCount, deletedCount, nil
return totalCreated, totalDeleted, nil
}

0 comments on commit af3a0b9

Please sign in to comment.