Skip to content

Commit

Permalink
Merge pull request #70 from nyaruka/better_stats
Browse files Browse the repository at this point in the history
Better stats
  • Loading branch information
rowanseymour authored Jun 28, 2022
2 parents 6e1a376 + 8425a18 commit 8e5263d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 121 deletions.
80 changes: 39 additions & 41 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,44 +688,37 @@ func DeleteArchiveFile(archive *Archive) error {
}

// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting current archive count")
return nil, nil, nil, nil, errors.Wrapf(err, "error getting current archive count")
}

archives := make([]*Archive, 0)
var dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed []*Archive

// no existing archives means this might be a backfill, figure out if there are full months we can build first
if archiveCount == 0 {
archives, err = GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting missing monthly archives")
return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing monthly archives")
}

// we first create monthly archives
err = createArchives(ctx, db, config, s3Client, org, archives)
if err != nil {
return nil, errors.Wrapf(err, "error creating new monthly archives")
}
monthliesCreated, monthliesFailed = createArchives(ctx, db, config, s3Client, org, archives)
}

// then add in daily archives taking into account the monthly that have been built
daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting missing daily archives")
return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing daily archives")
}

// we then create missing daily archives
err = createArchives(ctx, db, config, s3Client, org, daily)
if err != nil {
return nil, errors.Wrapf(err, "error creating new daily archives")
}
dailiesCreated, dailiesFailed = createArchives(ctx, db, config, s3Client, org, daily)

// append daily archives to any monthly archives
archives = append(archives, daily...)
defer ctx.Done()

return archives, nil
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil
}

func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, archive *Archive) error {
Expand Down Expand Up @@ -758,23 +751,27 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3
return nil
}

func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) error {
func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) {
log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name})

created := make([]*Archive, 0, len(archives))
failed := make([]*Archive, 0, 5)

for _, archive := range archives {
log.WithFields(logrus.Fields{"start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType}).Info("starting archive")
log.WithFields(logrus.Fields{"start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType}).Debug("starting archive")
start := time.Now()

err := createArchive(ctx, db, config, s3Client, archive)
if err != nil {
log.WithError(err).Error("error creating archive")
continue
failed = append(failed, archive)
} else {
log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": time.Since(start)}).Debug("archive complete")
created = append(created, archive)
}

log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": time.Since(start)}).Info("archive complete")
}

return nil
return created, failed
}

// RollupOrgArchives rolls up monthly archives from our daily archives
Expand Down Expand Up @@ -895,47 +892,45 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

deleted = append(deleted, a)
log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("deleted archive records")
log.WithFields(logrus.Fields{"elapsed": time.Since(start)}).Info("deleted archive records")
}

return deleted, nil
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name})
start := time.Now()

created, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating archives")
return nil, nil, nil, nil, nil, errors.Wrapf(err, "error creating archives")
}

if len(created) > 0 {
if len(dailiesCreated) > 0 {
elapsed := time.Since(start)
rate := float32(countRecords(created)) / (float32(elapsed) / float32(time.Second))
rate := float32(countRecords(dailiesCreated)) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
}

monthlies, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
monthliesRolledUp, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, errors.Wrapf(err, "error rolling up archives")
return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives")
}

created = append(created, monthlies...)
monthliesCreated = append(monthliesCreated, monthliesRolledUp...)

// finally delete any archives not yet actually archived
deleted := make([]*Archive, 0, 1)
var deleted []*Archive
if cfg.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return created, deleted, errors.Wrapf(err, "error deleting archived records")
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, errors.Wrapf(err, "error deleting archived records")
}
}

return created, deleted, nil
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, nil
}

// ArchiveActiveOrgs fetches active orgs and archives messages and runs
Expand All @@ -952,6 +947,7 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
}

totalRunsArchived, totalMsgsArchived := 0, 0
totalRunsFailedArchives, totalMsgsFailedArchives := 0, 0

// for each org, do our export
for _, org := range orgs {
Expand All @@ -960,18 +956,18 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
log := logrus.WithField("org_id", org.ID).WithField("org_name", org.Name)

if cfg.ArchiveMessages {
created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType)
dailiesCreated, _, _, _, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages")
}
totalMsgsArchived += countRecords(created)
totalMsgsArchived += countRecords(dailiesCreated)
}
if cfg.ArchiveRuns {
created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType)
dailiesCreated, _, _, _, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType)
if err != nil {
log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs")
}
totalRunsArchived += countRecords(created)
totalRunsArchived += countRecords(dailiesCreated)
}

cancel()
Expand All @@ -983,7 +979,9 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.orgs_archived", float64(len(orgs)))
analytics.Gauge("archiver.msgs_archived", float64(totalMsgsArchived))
analytics.Gauge("archiver.msgs_failed_archives", float64(totalMsgsFailedArchives))
analytics.Gauge("archiver.runs_archived", float64(totalRunsArchived))
analytics.Gauge("archiver.runs_failed_archives", float64(totalRunsFailedArchives))

return nil
}
Expand Down
121 changes: 41 additions & 80 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,53 +322,23 @@ func TestArchiveOrgMessages(t *testing.T) {

assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)

created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
assert.NoError(t, err)

assert.Equal(t, 63, len(created))
assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), created[0].StartDate)
assert.Equal(t, DayPeriod, created[0].Period)
assert.Equal(t, 0, created[0].RecordCount)
assert.Equal(t, int64(23), created[0].Size)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[0].Hash)

assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), created[1].StartDate)
assert.Equal(t, DayPeriod, created[1].Period)
assert.Equal(t, 0, created[1].RecordCount)
assert.Equal(t, int64(23), created[1].Size)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[1].Hash)

assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), created[2].StartDate)
assert.Equal(t, DayPeriod, created[2].Period)
assert.Equal(t, 3, created[2].RecordCount)
assert.Equal(t, int64(528), created[2].Size)
assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", created[2].Hash)

assert.Equal(t, time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), created[3].StartDate)
assert.Equal(t, DayPeriod, created[3].Period)
assert.Equal(t, 1, created[3].RecordCount)
assert.Equal(t, int64(312), created[3].Size)
assert.Equal(t, "32e61b1431217b59fca0170f637d78a3", created[3].Hash)

assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[60].StartDate)
assert.Equal(t, DayPeriod, created[60].Period)
assert.Equal(t, 0, created[60].RecordCount)
assert.Equal(t, int64(23), created[60].Size)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[60].Hash)

assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[61].StartDate)
assert.Equal(t, MonthPeriod, created[61].Period)
assert.Equal(t, 4, created[61].RecordCount)
assert.Equal(t, int64(553), created[61].Size)
assert.Equal(t, "156e45e29b6587cb85ccf75e03800b00", created[61].Hash)

assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[62].StartDate)
assert.Equal(t, MonthPeriod, created[62].Period)
assert.Equal(t, 0, created[62].RecordCount)
assert.Equal(t, int64(23), created[62].Size)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[62].Hash)

// no rollup for october since that had one invalid daily archive
assert.Equal(t, 61, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 528, "b3bf00bf1234ea47f14ffd0171a8ead0")
assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 312, "32e61b1431217b59fca0170f637d78a3")
assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 0, len(dailiesFailed))

assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 553, "156e45e29b6587cb85ccf75e03800b00")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 0, len(monthliesFailed))

assert.Equal(t, 63, len(deleted))
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate)
Expand Down Expand Up @@ -440,6 +410,14 @@ func assertCount(t *testing.T, db *sqlx.DB, expected int, query string, args ...
assert.Equal(t, expected, count, "counts mismatch for query %s", query)
}

func assertArchive(t *testing.T, a *Archive, startDate time.Time, period ArchivePeriod, recordCount int, size int64, hash string) {
assert.Equal(t, startDate, a.StartDate)
assert.Equal(t, period, a.Period)
assert.Equal(t, recordCount, a.RecordCount)
assert.Equal(t, size, a.Size)
assert.Equal(t, hash, a.Hash)
}

func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
ctx := context.Background()
Expand All @@ -461,34 +439,16 @@ func TestArchiveOrgRuns(t *testing.T) {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType)
dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType)
assert.NoError(t, err)

assert.Equal(t, 12, len(created))
assert.Equal(t, 10, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1984, "869cc00ad4cca0371d07c88d8cf2bf26")

assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[0].StartDate)
assert.Equal(t, MonthPeriod, created[0].Period)
assert.Equal(t, 1, created[0].RecordCount)
assert.Equal(t, int64(490), created[0].Size)
assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", created[0].Hash)

assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[1].StartDate)
assert.Equal(t, MonthPeriod, created[1].Period)
assert.Equal(t, 0, created[1].RecordCount)
assert.Equal(t, int64(23), created[1].Size)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[1].Hash)

assert.Equal(t, time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), created[2].StartDate)
assert.Equal(t, DayPeriod, created[2].Period)
assert.Equal(t, 0, created[2].RecordCount)
assert.Equal(t, int64(23), created[2].Size)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[2].Hash)

assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate)
assert.Equal(t, DayPeriod, created[11].Period)
assert.Equal(t, 2, created[11].RecordCount)
assert.Equal(t, int64(1984), created[11].Size)
assert.Equal(t, "869cc00ad4cca0371d07c88d8cf2bf26", created[11].Hash)
assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 490, "c2138e3c3009a9c09fc55482903d93e4")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 12, len(deleted))

Expand Down Expand Up @@ -532,18 +492,19 @@ func TestArchiveOrgRuns(t *testing.T) {

// org 2 has a run that can't be archived because it's still active - as it has no existing archives
// this will manifest itself as a monthly which fails to save
created, deleted, err = ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType)
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType)
assert.NoError(t, err)

assert.Equal(t, 34, len(created))
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[0].StartDate)
assert.Equal(t, MonthPeriod, created[0].Period)
assert.Equal(t, "", created[0].ArchiveFile) // failed to create
assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[1].StartDate)
assert.Equal(t, MonthPeriod, created[1].Period)
assert.NotEqual(t, "", created[1].ArchiveFile)
assert.Equal(t, 31, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 1, len(dailiesFailed))
assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "")

assert.Equal(t, 1, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, DayPeriod, created[2].Period)
assert.Equal(t, DayPeriod, created[33].Period)
assert.Equal(t, 1, len(monthliesFailed))
assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "")
}
}

0 comments on commit 8e5263d

Please sign in to comment.