From 861be8f18504911aae36699c80c85826e79d0af5 Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Thu, 30 Nov 2023 03:10:55 +0300 Subject: [PATCH] Fixed broadcasting MicroBlockInv --- .../com/wavesplatform/mining/Miner.scala | 6 +-- .../microblocks/MicroBlockMinerImpl.scala | 39 +++++++++++++------ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index 239fdb8d06..6f683deb9f 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -309,12 +309,8 @@ class MinerImpl( }.uncancelable for { - elapsed <- waitBlockAppendedTask.timed.map(_._1) - newOffset = (offset - elapsed).max(Duration.Zero) - - _ <- Task(microBlockAttempt := SerialCancelable()).delayExecution(newOffset) + _ <- waitBlockAppendedTask result <- Task(forgeBlock(account)).executeOn(minerScheduler) - _ <- result match { case Right((block, totalConstraint)) => appendTask(block, totalConstraint) diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index de83f87765..2933e5d573 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -113,18 +113,15 @@ class MicroBlockMinerImpl( for { _ <- Task.now(if (delay > Duration.Zero) log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock")) _ <- Task.sleep(delay) - _ = log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") - blocks <- forgeBlocks(account, accumulatedBlock, unconfirmed, stateHash) - .leftWiden[Throwable] - .liftTo[Task] - (signedBlock, microBlock) = blocks - blockId <- appendMicroBlock(microBlock) - _ = BlockStats.mined(microBlock, blockId) - _ <- broadcastMicroBlock(account, microBlock, blockId) - } yield { - if (updatedTotalConstraint.isFull) Stop - else Success(signedBlock, updatedTotalConstraint) - } + r <- + if (blockchainUpdater.lastBlockId.forall(_ == accumulatedBlock.id())) { + log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") + appendAndBroadcastMicroBlock(account, accumulatedBlock, unconfirmed, updatedTotalConstraint, stateHash) + } else { + log.trace(s"Stopping generating microBlock for ${account.toAddress}, new key block was appended") + Task(Stop) + } + } yield r case (_, updatedTotalConstraint, _) => if (updatedTotalConstraint.isFull) { @@ -142,6 +139,24 @@ class MicroBlockMinerImpl( } } + private def appendAndBroadcastMicroBlock( + account: KeyPair, + block: Block, + transactions: Seq[Transaction], + updatedTotalConstraint: MiningConstraint, + stateHash: Option[BlockId] + ): Task[MicroBlockMiningResult] = + for { + (signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash) + .leftWiden[Throwable] + .liftTo[Task] + blockId <- appendMicroBlock(microBlock) + _ = BlockStats.mined(microBlock, blockId) + _ <- broadcastMicroBlock(account, microBlock, blockId) + } yield + if (updatedTotalConstraint.isFull) Stop + else Success(signedBlock, updatedTotalConstraint) + private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] = Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference)))