Skip to content

Commit

Permalink
refector: optimize block sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
cyber3george committed Feb 26, 2024
1 parent 6145e62 commit d50da10
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 78 deletions.
69 changes: 58 additions & 11 deletions src/service/block/block.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { PromisePool } from '@supercharge/promise-pool';
import { TxInEntity } from '../../entities/txIn.entity';
import { TxOutEntity } from '../../entities/txOut.entity';
import { verifyMerkle } from '../../lib/merkle';
import { Interval } from '@nestjs/schedule';

@Injectable()
export class BlockService implements OnApplicationBootstrap {
Expand Down Expand Up @@ -72,7 +73,27 @@ export class BlockService implements OnApplicationBootstrap {
this.logger.debug(`newBlock: ${message.toString('hex')}`);
}

@Interval(10 * 60 * 1000)
async progressSyncInfo() {
const totalBlock = await this.blockEntityRepository.count({
where: {
is_reorg: false,
},
});
const doubleCheckBlock = await this.blockEntityRepository.count({
where: {
processStatus: BlockProcessStatus.doubleCheck,
is_reorg: false,
},
});
const percent = ((doubleCheckBlock / totalBlock) * 100).toFixed(2);
const needSyncBlock = totalBlock - doubleCheckBlock;
this.logger.debug(`block sync percent: ${percent}%`);
this.logger.debug(`need sync block number: ${needSyncBlock}`);
}

async lastNostartRowArray(): Promise<BlockEntity[]> {
// download
const lastNostartHeightRowArray = await this.blockEntityRepository.find({
where: {
processStatus: BlockProcessStatus.downloaded,
Expand All @@ -82,6 +103,10 @@ export class BlockService implements OnApplicationBootstrap {
},
take: 100,
});
if (lastNostartHeightRowArray.length > 0) {
return lastNostartHeightRowArray;
}
// before completed
const maxCompletedRow = await this.blockEntityRepository.findOne({
where: {
processStatus: BlockProcessStatus.completed,
Expand All @@ -105,18 +130,19 @@ export class BlockService implements OnApplicationBootstrap {
if (beforeProcess.length > 0) {
return beforeProcess;
}
const errorCompleted = await this.blockEntityRepository
.createQueryBuilder('block')
.where(
' block.processStatus = :status and block.num_tx != block.process_count',
{ status: BlockProcessStatus.completed },
)
.getMany();
if (errorCompleted.length > 0) {
return errorCompleted;
}
}
return lastNostartHeightRowArray;
// error completed
const errorCompleted = await this.blockEntityRepository
.createQueryBuilder('block')
.where(
' block.processStatus = :status and block.num_tx != block.process_count',
{ status: BlockProcessStatus.completed },
)
.getMany();
if (errorCompleted.length > 0) {
return errorCompleted;
}
return [];
}

async downloadFile(downloadBlockRow: BlockEntity) {
Expand Down Expand Up @@ -157,6 +183,21 @@ export class BlockService implements OnApplicationBootstrap {
processStatus: BlockProcessStatus.downloaded,
},
});
const completedCount = await this.blockEntityRepository.count({
where: {
processStatus: BlockProcessStatus.completed,
},
});
if (completedCount > 200) {
await this.blockEntityRepository.update(
{
processStatus: BlockProcessStatus.completed,
},
{
processStatus: BlockProcessStatus.nostart,
},
);
}
const willCacheNumber = this.blockCacheNumber - downloadedCount;
if (willCacheNumber <= 0) {
return;
Expand Down Expand Up @@ -256,6 +297,12 @@ export class BlockService implements OnApplicationBootstrap {
}

const txIdIndexMap = {};
await this.blockEntityRepository.update(
{ hash: nostartRow.hash },
{
processStatus: BlockProcessStatus.processing,
},
);

const txIdList = block.transactions.map(function (value: any) {
return value.hash;
Expand Down
92 changes: 25 additions & 67 deletions src/service/transaction/transaction.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,16 +240,9 @@ export class TransactionService implements OnApplicationBootstrap {
const { errors } = await PromisePool.withConcurrency(2)
.for(txInEntityListSubSet)
.process(async (txInEntityListSub) => {
await this.txInEntityRepository
.createQueryBuilder()
.insert()
.into(TxInEntity)
.values(txInEntityListSub)
.orIgnore()
.execute();
// await this.txInEntityRepository.upsert(txInEntityListSub, [
// 'outpoint',
// ]);
await this.txInEntityRepository.upsert(txInEntityListSub, [
'outpoint',
]);
});
if (errors.length > 0) {
console.log('mempool txIn save', errors);
Expand All @@ -260,14 +253,7 @@ export class TransactionService implements OnApplicationBootstrap {
await PromisePool.withConcurrency(2)
.for(txOutEntityListSubSet)
.process(async (txOutEntityListSub) => {
await this.txOutEntityRepository
.createQueryBuilder()
.insert()
.into(TxOutEntity)
.values(txOutEntityListSub)
.orIgnore()
.execute();
// await this.txOutEntityRepository.save(txOutEntityListSub);
await this.txOutEntityRepository.save(txOutEntityListSub);
});
})(),
]);
Expand Down Expand Up @@ -610,9 +596,6 @@ export class TransactionService implements OnApplicationBootstrap {
created_at: LessThan(timeout),
},
take: step,
order: {
cursor_id: 'asc',
},
});
if (records.length > 0) {
const timeOutTxidList = [];
Expand Down Expand Up @@ -744,62 +727,34 @@ export class TransactionService implements OnApplicationBootstrap {
const p2 = PromisePool.withConcurrency(concurrency)
.for(arrayToChunks(txInEntityList, bulkNumber))
.process(async (chunk) => {
await this.txInEntityRepository
.createQueryBuilder()
.insert()
.into(TxInEntity)
.values(chunk)
.orIgnore()
.execute();
// await this.txInEntityRepository.upsert(
// sortedObjectArrayByKey(chunk, 'outpoint'),
// ['outpoint'],
// );
await this.txInEntityRepository.upsert(
sortedObjectArrayByKey(chunk, 'outpoint'),
['outpoint'],
);
});
const p3 = PromisePool.withConcurrency(concurrency)
.for(arrayToChunks(txOutEntityList, bulkNumber))
.process(async (chunk) => {
await this.txOutEntityRepository
.createQueryBuilder()
.insert()
.into(TxOutEntity)
.values(chunk)
.orIgnore()
.execute();
// await this.txOutEntityRepository.upsert(
// sortedObjectArrayByKey(chunk, 'outpoint'),
// ['outpoint'],
// );
await this.txOutEntityRepository.upsert(
sortedObjectArrayByKey(chunk, 'outpoint'),
['outpoint'],
);
});
const p4 = PromisePool.withConcurrency(concurrency)
.for(arrayToChunks(txOutNftEntityList, bulkNumber))
.process(async (chunk) => {
await this.txInEntityRepository
.createQueryBuilder()
.insert()
.into(TxInEntity)
.values(chunk)
.orIgnore()
.execute();
// await this.txOutNftEntityRepository.upsert(
// sortedObjectArrayByKey(chunk, 'outpoint'),
// ['outpoint'],
// );
await this.txOutNftEntityRepository.upsert(
sortedObjectArrayByKey(chunk, 'outpoint'),
['outpoint'],
);
});
const p5 = PromisePool.withConcurrency(concurrency)
.for(arrayToChunks(txOutFtEntityList, bulkNumber))
.process(async (chunk) => {
await this.txOutEntityRepository
.createQueryBuilder()
.insert()
.into(TxOutEntity)
.values(chunk)
.orIgnore()
.execute();
// await this.txOutFtEntityRepository.upsert(
// sortedObjectArrayByKey(chunk, 'outpoint'),
// ['outpoint'],
// );
await this.txOutFtEntityRepository.upsert(
sortedObjectArrayByKey(chunk, 'outpoint'),
['outpoint'],
);
});
const pResultList = await Promise.all([p1, p2, p3, p4, p5]);
let errorsLength = 0;
Expand Down Expand Up @@ -1038,13 +993,16 @@ export class TransactionService implements OnApplicationBootstrap {
}, cursorId: ${cursorId}, lastCursorId: ${cursorId}`,
);
}
return cursorId;
return updateResult.changedRows;
}

async useTxoDaemon() {
while (true) {
try {
await this.useTxo();
const changeRow = await this.useTxo();
if (changeRow > 0) {
continue;
}
} catch (e) {
console.log('useTxoDaemon', e);
}
Expand Down

0 comments on commit d50da10

Please sign in to comment.