diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 226fd871e25..f9041e1285b 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -1,12 +1,5 @@ package com.wavesplatform -import java.io._ -import java.net.{MalformedURLException, URL} - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ -import scala.util.{Failure, Success, Try} - import akka.actor.ActorSystem import com.google.common.io.ByteStreams import com.google.common.primitives.Ints @@ -16,7 +9,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonB import com.wavesplatform.block.{Block, BlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.consensus.PoSSelector -import com.wavesplatform.database.{openDB, DBExt, KeyTags} +import com.wavesplatform.database.{DBExt, KeyTags, openDB} import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent} import com.wavesplatform.extensions.{Context, Extension} import com.wavesplatform.features.BlockchainFeatures @@ -25,12 +18,12 @@ import com.wavesplatform.lang.ValidationError import com.wavesplatform.mining.Miner import com.wavesplatform.protobuf.block.PBBlocks import com.wavesplatform.settings.WavesSettings -import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height} import com.wavesplatform.state.appender.BlockAppender -import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} +import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height} import com.wavesplatform.transaction.TxValidationError.GenericError import com.wavesplatform.transaction.smart.script.trace.TracedResult -import com.wavesplatform.utils._ +import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} +import com.wavesplatform.utils.* import com.wavesplatform.utx.{UtxPool, UtxPoolImpl} import com.wavesplatform.wallet.Wallet import kamon.Kamon @@ -40,6 +33,13 @@ import monix.reactive.{Observable, Observer} import org.iq80.leveldb.DB import scopt.OParser +import java.io.* +import java.net.{MalformedURLException, URL} +import scala.concurrent.duration.* +import scala.concurrent.{Await, Future} +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} + object Importer extends ScorexLogging { import monix.execution.Scheduler.Implicits.global @@ -59,7 +59,7 @@ object Importer extends ScorexLogging { import scopt.OParser val builder = OParser.builder[ImportOptions] - import builder._ + import builder.* OParser.sequence( programName("waves import"), @@ -166,12 +166,13 @@ object Importer extends ScorexLogging { } } - @volatile private var quit = false - private val lock = new Object + @volatile private var quit = false + @volatile private var inputStream: InputStream = null + private val lock = new Object // noinspection UnstableApiUsage def startImport( - inputStream: BufferedInputStream, + getInputStream: () => InputStream, blockchain: Blockchain, appendBlock: AppendBlock, importOptions: ImportOptions, @@ -188,58 +189,68 @@ object Importer extends ScorexLogging { if (blocksToSkip > 0) log.info(s"Skipping $blocksToSkip block(s)") sys.addShutdownHook { - import scala.concurrent.duration._ + import scala.concurrent.duration.* val millis = (System.nanoTime() - start).nanos.toMillis log.info( s"Imported $counter block(s) from $startHeight to ${startHeight + counter} in ${humanReadableDuration(millis)}" ) } + inputStream = getInputStream() + while (!quit && counter < blocksToApply) lock.synchronized { - val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES) - if (s1 == Ints.BYTES) { - val blockSize = Ints.fromByteArray(lenBytes) - - lazy val blockBytes = new Array[Byte](blockSize) - val factReadSize = - if (blocksToSkip > 0) { - // File IO optimization - ByteStreams.skipFully(inputStream, blockSize) - blockSize - } else { - ByteStreams.read(inputStream, blockBytes, 0, blockSize) - } + try { + val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES) + if (s1 == Ints.BYTES) { + val blockSize = Ints.fromByteArray(lenBytes) + + lazy val blockBytes = new Array[Byte](blockSize) + val factReadSize = + if (blocksToSkip > 0) { + // File IO optimization + ByteStreams.skipFully(inputStream, blockSize) + blockSize + } else { + ByteStreams.read(inputStream, blockBytes, 0, blockSize) + } - if (factReadSize == blockSize) { - if (blocksToSkip > 0) { - blocksToSkip -= 1 - } else { - val blockV5 = blockchain.isFeatureActivated( - BlockchainFeatures.BlockV5, - blockchain.height + 1 - ) - val block = - (if (importOptions.format == Formats.Binary && !blockV5) Block.parseBytes(blockBytes) - else PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)).get - if (blockchain.lastBlockId.contains(block.header.reference)) { - Await.result(appendBlock(block).runAsyncLogErr, Duration.Inf) match { - case Left(ve) => - log.error(s"Error appending block: $ve") - quit = true - case _ => - counter = counter + 1 - } + if (factReadSize == blockSize) { + if (blocksToSkip > 0) { + blocksToSkip -= 1 } else { - log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}") + val blockV5 = blockchain.isFeatureActivated( + BlockchainFeatures.BlockV5, + blockchain.height + 1 + ) + val block = + (if (importOptions.format == Formats.Binary && !blockV5) Block.parseBytes(blockBytes) + else PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)).get + if (blockchain.lastBlockId.contains(block.header.reference)) { + Await.result(appendBlock(block).runAsyncLogErr, Duration.Inf) match { + case Left(ve) => + log.error(s"Error appending block: $ve") + quit = true + case _ => + counter = counter + 1 + } + } else { + log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}") + } } + } else { + log.info(s"$factReadSize != expected $blockSize") + log.info(s"reestablishing input stream") + inputStream.close() + inputStream = getInputStream() } } else { - log.info(s"$factReadSize != expected $blockSize") + if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})") quit = true } - } else { - if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})") - quit = true + } catch { + case NonFatal(e) => + log.error(s"Error reading bytes: $e") + quit = true } } } @@ -285,7 +296,7 @@ object Importer extends ScorexLogging { val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, db, actorSystem) checkGenesis(settings, blockchainUpdater, Miner.Disabled) - val importFileOffset = + def importFileOffset() = if (importOptions.dryRun) 0 else importOptions.format match { @@ -303,7 +314,9 @@ object Importer extends ScorexLogging { case _ => 0L } - val inputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset), 2 * 1024 * 1024) + + def establishInputStream() = + new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset()), 2 * 1024 * 1024) if (importOptions.dryRun) { def readNextBlock(): Future[Option[Block]] = Future.successful(None) @@ -352,10 +365,10 @@ object Importer extends ScorexLogging { levelDb.close() db.close() } - inputStream.close() + if (inputStream != null) inputStream.close() } - startImport(inputStream, blockchainUpdater, extAppender, importOptions, importFileOffset == 0) + startImport(() => establishInputStream(), blockchainUpdater, extAppender, importOptions, importFileOffset() == 0) Await.result(Kamon.stopModules(), 10.seconds) } }