Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill gaps command #24

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions commands/es-stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"log"
"os"
"strconv"

Expand All @@ -17,10 +18,15 @@ type EsStatsCommand struct {

// Execute collects ledger staticstics for the current ES cluster
func (cmd *EsStatsCommand) Execute() {
min, max, empty := cmd.ES.MinMaxSeq()

if empty {
log.Println("ES is empty")
return
}

table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"From", "To", "Doc_count"})

min, max := cmd.ES.MinMaxSeq()
buckets := cmd.esRanges(min, max)

for i := 0; i < len(buckets); i++ {
Expand Down
9 changes: 7 additions & 2 deletions commands/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"bytes"
"io/ioutil"
"log"
"math/rand"
"time"
Expand Down Expand Up @@ -79,14 +80,18 @@ func (cmd *ExportCommand) exportBlock(i int) {
}

if !cmd.Config.DryRun {
ioutil.WriteFile("./bulk.json", b.Bytes(), 0644)
cmd.ES.IndexWithRetries(&b, cmd.Config.RetryCount)
}
}

func (cmd *ExportCommand) index(b *bytes.Buffer, retry int) {
indexed := cmd.ES.BulkInsert(b)
err := cmd.ES.BulkInsert(b)

if err != nil {
log.Println(err)
log.Println("Failed, retrying")

if !indexed {
if retry > cmd.Config.RetryCount {
log.Fatal("Retries for bulk failed, aborting")
}
Expand Down
186 changes: 186 additions & 0 deletions commands/fill_gaps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package commands

import (
"bufio"
"bytes"
"github.com/astroband/astrologer/db"
"github.com/astroband/astrologer/es"
"github.com/astroband/astrologer/support"
"log"
"strings"
)

const batchSize = 1000
const INDEX_RETRIES_COUNT = 25

type FillGapsCommandConfig struct {
DryRun bool
Start *int
Count *int
BatchSize *int
}

type FillGapsCommand struct {
ES es.Adapter
DB db.Adapter
Config *FillGapsCommandConfig

minSeq int
maxSeq int
count int
}

func (cmd *FillGapsCommand) Execute() {
if cmd.Config.Start != nil && cmd.Config.Count != nil {
cmd.minSeq = *cmd.Config.Start
cmd.count = *cmd.Config.Count
cmd.maxSeq = cmd.minSeq + cmd.count
} else {
var empty bool
cmd.minSeq, cmd.maxSeq, empty = cmd.ES.MinMaxSeq()

if empty {
log.Println("ES is empty")
return
}

if cmd.Config.Start != nil {
cmd.minSeq = *cmd.Config.Start
cmd.count = cmd.maxSeq - cmd.minSeq + 1
}
}

log.Printf("Min seq is %d, max seq is %d\n", cmd.minSeq, cmd.maxSeq)

var missing []int

for i := cmd.minSeq; i < cmd.maxSeq; i += batchSize {
var to int

if i+batchSize > cmd.maxSeq {
to = cmd.maxSeq
} else {
to = i + batchSize - 1
}

seqs := cmd.ES.GetLedgerSeqsInRange(i, to)

if len(seqs) > 0 {
missing = append(missing, cmd.findMissing(seqs)...)

if seqs[len(seqs)-1] != to {
missing = append(missing, support.MakeRangeGtLte(seqs[len(seqs)-1], to)...)
}
} else {
missing = append(missing, support.MakeRangeGteLte(i, to)...)
}
}

cmd.exportSeqs(missing)
}

func (cmd *FillGapsCommand) findMissing(sortedArr []int) (missing []int) {
for i := 1; i < len(sortedArr); i += 1 {
diff := sortedArr[i] - sortedArr[i-1]
if diff > 1 {
missing = append(missing, support.MakeRangeGtLt(sortedArr[i-1], sortedArr[i])...)
}
}

// log.Println("Missing:", missing)
return
}

func (cmd *FillGapsCommand) exportSeqs(seqs []int) {
log.Printf("Exporting %d ledgers\n", len(seqs))

var dbSeqs []int
batchSize := *cmd.Config.BatchSize
batchesCounter := 0

totalTxs := 0
totalOps := 0

for i := 0; i < len(seqs); i += batchSize {

to := i + batchSize
if to > len(seqs) {
to = len(seqs) - 1
}

var seqsBlock []int

if len(seqs) == 1 {
seqsBlock = seqs
} else {
seqsBlock = seqs[i:to]
}

pool.Submit(func() {
batchesCounter += 1
var b bytes.Buffer
rows := cmd.DB.LedgerHeaderRowFetchBySeqs(seqsBlock)

for n := 0; n < len(rows); n++ {
// log.Printf("Ingesting %d ledger\n", rows[n].LedgerSeq)
dbSeqs = append(dbSeqs, rows[n].LedgerSeq)

txs := cmd.DB.TxHistoryRowForSeq(rows[n].LedgerSeq)
totalTxs += len(txs)

for _, tx := range txs {
totalOps += len(tx.Envelope.Tx.Operations)
}

fees := cmd.DB.TxFeeHistoryRowsForRows(txs)

es.SerializeLedger(rows[n], txs, fees, &b)
}

log.Printf(
"Batch %d: Bulk inserting %d docs, total size is %s\n",
batchesCounter,
countLines(b)/2,
support.ByteCountBinary(b.Len()),
)

if !cmd.Config.DryRun {
err := cmd.ES.BulkInsert(&b)

if err != nil {
log.Printf("Batch %d failed with error: %s\n", err)
} else {
log.Printf("Batch %d is inserted\n", batchesCounter)
}
}
})
}

pool.StopWait()
diff := support.Difference(seqs, dbSeqs)

if len(diff) > 0 {
log.Printf("DB misses next ledgers: %v", diff)
}

log.Printf("Total txs count: %d, total ops count: %d", totalTxs, totalOps)
}

func countLines(buf bytes.Buffer) int {
scanner := bufio.NewScanner(strings.NewReader(buf.String()))

// Set the split function for the scanning operation.
scanner.Split(bufio.ScanLines)

// Count the lines.
count := 0
for scanner.Scan() {
count++
}

if err := scanner.Err(); err != nil {
log.Fatal("reading input:", err)
}

return count
}
18 changes: 18 additions & 0 deletions commands/fill_gaps_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package commands

import (
"github.com/astroband/astrologer/es/mocks"
"github.com/stretchr/testify/assert"
"testing"
)

func TestFillGaps(t *testing.T) {
t.Error("test error")

esClient := new(mocks.EsAdapter)
esClient.On("MinMaxSeq").Return(386, 411)

missingSeqs := FillGaps(esClient)

assert.NotEmpty(t, missingSeqs)
}
13 changes: 11 additions & 2 deletions config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
ingestCommand = kingpin.Command("ingest", "Start real time ingestion")
_ = kingpin.Command("stats", "Print database ledger statistics")
_ = kingpin.Command("es-stats", "Print ES ranges stats")
fillGapsCommand = kingpin.Command("fill-gaps", "Fill gaps")

// DatabaseURL Stellar Core database URL
DatabaseURL = kingpin.
Expand Down Expand Up @@ -72,11 +73,19 @@ var (
// Verbose print data
Verbose = exportCommand.Flag("verbose", "Print indexed data").Bool()

// ExportDryRun do not index data
ExportDryRun = exportCommand.Flag("dry-run", "Do not send actual data to Elastic").Bool()
// DryRun do not index data
DryRun = kingpin.Flag("dry-run", "Do not send actual data to Elastic").Bool()

// ForceRecreateIndexes Allows indexes to be deleted before creation
ForceRecreateIndexes = createIndexCommand.Flag("force", "Delete indexes before creation").Bool()

FillGapsFrom = fillGapsCommand.Arg("start", "Ledger to start from").Int()
FillGapsCount = fillGapsCommand.Arg("count", "How many ledgers to check").Int()
FillGapsBatchSize = fillGapsCommand.
Flag("batch", "Ledger batch size").
Short('b').
Default("50").
Int()
)

func parseNumberWithSign(value string) (r NumberWithSign, err error) {
Expand Down
23 changes: 21 additions & 2 deletions db/ledger_header_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package db

import (
"database/sql"
"log"

"github.com/jmoiron/sqlx"
"github.com/stellar/go/xdr"
"log"
)

// LedgerHeaderRow is struct representing ledger in database
Expand Down Expand Up @@ -107,6 +107,25 @@ func (db *Client) LedgerHeaderNext(seq int) *LedgerHeaderRow {
return &h
}

func (db *Client) LedgerHeaderRowFetchBySeqs(seqs []int) []LedgerHeaderRow {
ledgers := []LedgerHeaderRow{}

query, args, err := sqlx.In("SELECT * FROM ledgerheaders WHERE ledgerseq IN (?) ORDER BY ledgerseq ASC;", seqs)

if err != nil {
log.Fatal(err)
}

query = db.rawClient.Rebind(query)
err = db.rawClient.Select(&ledgers, query, args...)

if err != nil {
log.Fatal(err)
}

return ledgers
}

// LedgerHeaderGaps returns gap positions in ledgerheaders
func (db *Client) LedgerHeaderGaps() (r []Gap) {
err := db.rawClient.Select(&r, `
Expand Down
1 change: 1 addition & 0 deletions db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func utf8Scrub(in string) string {
type Adapter interface {
LedgerHeaderRowCount(first int, last int) int
LedgerHeaderRowFetchBatch(n int, start int, batchSize int) []LedgerHeaderRow
LedgerHeaderRowFetchBySeqs(seqs []int) []LedgerHeaderRow
LedgerHeaderLastRow() *LedgerHeaderRow
LedgerHeaderFirstRow() *LedgerHeaderRow
LedgerHeaderNext(seq int) *LedgerHeaderRow
Expand Down
Loading