Skip to content
This repository has been archived by the owner on Aug 23, 2020. It is now read-only.

send heartbeat pulses and request tx using heartbeat #1830

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/main/java/com/iota/iri/Iota.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.iota.iri.conf.IotaConfig;
import com.iota.iri.controllers.TipsViewModel;
import com.iota.iri.controllers.TransactionViewModel;
import com.iota.iri.network.HeartbeatPulse;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.TipsRequester;
import com.iota.iri.network.TransactionRequester;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class Iota {

public LocalSnapshotsPersistenceProvider localSnapshotsDb;
public final CacheManager cacheManager;
public final HeartbeatPulse heartbeatPulse;

/**
* Initializes the latest snapshot and then creates all services needed to run an IOTA node.
Expand All @@ -130,7 +132,7 @@ public Iota(IotaConfig configuration, SpentAddressesProvider spentAddressesProvi
TransactionRequester transactionRequester, NeighborRouter neighborRouter,
TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester,
TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb,
CacheManager cacheManager, TransactionSolidifier transactionSolidifier) {
CacheManager cacheManager, TransactionSolidifier transactionSolidifier, HeartbeatPulse heartbeatPulse) {
this.configuration = configuration;

this.ledgerService = ledgerService;
Expand Down Expand Up @@ -160,6 +162,7 @@ public Iota(IotaConfig configuration, SpentAddressesProvider spentAddressesProvi

this.tipsSelector = tipsSelector;
this.cacheManager = cacheManager;
this.heartbeatPulse = heartbeatPulse;
}

private void initDependencies() throws SnapshotException, SpentAddressesException {
Expand Down Expand Up @@ -223,6 +226,8 @@ public void init() throws Exception {
if (transactionPruner != null) {
transactionPruner.start();
}

heartbeatPulse.start();
}

private void rescanDb() throws Exception {
Expand Down Expand Up @@ -270,6 +275,7 @@ public void shutdown() throws Exception {
localSnapshotManager.shutdown();
}

heartbeatPulse.shutdown();
tipsRequester.shutdown();
txPipeline.shutdown();
neighborRouter.shutdown();
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/com/iota/iri/MainInjectionConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import com.iota.iri.cache.impl.CacheManagerImpl;
import com.iota.iri.conf.IotaConfig;
import com.iota.iri.controllers.TipsViewModel;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.TipsRequester;
import com.iota.iri.network.TransactionRequester;
import com.iota.iri.network.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.service.API;
import com.iota.iri.service.ledger.LedgerService;
Expand Down Expand Up @@ -100,6 +98,12 @@ LatestMilestoneTracker provideLatestMilestoneTracker(Tangle tangle, SnapshotProv
return new LatestMilestoneTrackerImpl(tangle, snapshotProvider, milestoneService, milestoneSolidifier, configuration);
}

@Singleton
@Provides
HeartbeatPulse providerHeartbeatPulse(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider, @Nullable LocalSnapshotManager localSnapshotManager){
return new HeartbeatPulseImpl(neighborRouter, snapshotProvider, configuration, localSnapshotManager);
}

@Singleton
@Provides
LatestSolidMilestoneTracker provideLatestSolidMilestoneTracker(Tangle tangle, SnapshotProvider snapshotProvider,
Expand Down Expand Up @@ -171,21 +175,21 @@ TipSelector provideTipSelector(Tangle tangle, SnapshotProvider snapshotProvider,
@Singleton
@Provides
Iota provideIota(SpentAddressesProvider spentAddressesProvider, SpentAddressesService spentAddressesService,
SnapshotProvider snapshotProvider, SnapshotService snapshotService,
@Nullable LocalSnapshotManager localSnapshotManager, MilestoneService milestoneService,
LatestMilestoneTracker latestMilestoneTracker, LatestSolidMilestoneTracker latestSolidMilestoneTracker,
SeenMilestonesRetriever seenMilestonesRetriever, LedgerService ledgerService,
@Nullable TransactionPruner transactionPruner, MilestoneSolidifier milestoneSolidifier,
BundleValidator bundleValidator, Tangle tangle, TransactionValidator transactionValidator,
TransactionRequester transactionRequester, NeighborRouter neighborRouter,
TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester,
TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb,
CacheManager cacheManager, TransactionSolidifier transactionSolidifier) {
SnapshotProvider snapshotProvider, SnapshotService snapshotService,
@Nullable LocalSnapshotManager localSnapshotManager, MilestoneService milestoneService,
LatestMilestoneTracker latestMilestoneTracker, LatestSolidMilestoneTracker latestSolidMilestoneTracker,
SeenMilestonesRetriever seenMilestonesRetriever, LedgerService ledgerService,
@Nullable TransactionPruner transactionPruner, MilestoneSolidifier milestoneSolidifier,
BundleValidator bundleValidator, Tangle tangle, TransactionValidator transactionValidator,
TransactionRequester transactionRequester, NeighborRouter neighborRouter,
TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester,
TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb,
CacheManager cacheManager, TransactionSolidifier transactionSolidifier, HeartbeatPulse heartbeatPulse) {
return new Iota(configuration, spentAddressesProvider, spentAddressesService, snapshotProvider, snapshotService,
localSnapshotManager, milestoneService, latestMilestoneTracker, latestSolidMilestoneTracker,
seenMilestonesRetriever, ledgerService, transactionPruner, milestoneSolidifier, bundleValidator, tangle,
transactionValidator, transactionRequester, neighborRouter, transactionProcessingPipeline,
tipsRequester, tipsViewModel, tipsSelector, localSnapshotsDb, cacheManager, transactionSolidifier);
tipsRequester, tipsViewModel, tipsSelector, localSnapshotsDb, cacheManager, transactionSolidifier, heartbeatPulse);
}

@Singleton
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/iota/iri/network/HeartbeatPulse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.iota.iri.network;

/**
* A background worker that sends {@link com.iota.iri.network.protocol.Heartbeat}s to neighbors.
*/
public interface HeartbeatPulse {
/**
* Starts the background worker that calls {@link #sendHeartbeat()} rhythmically.
*/
void start();

/**
* Stops the background worker that sends out heartbeats.
*/
void shutdown();

/**
* Sends {@link com.iota.iri.network.protocol.Heartbeat} to all neighbors.
*/
void sendHeartbeat();
}
91 changes: 91 additions & 0 deletions src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.iota.iri.network;

import com.iota.iri.conf.SnapshotConfig;
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.network.protocol.Heartbeat;
import com.iota.iri.service.snapshot.LocalSnapshotManager;
import com.iota.iri.service.snapshot.SnapshotProvider;
import com.iota.iri.service.transactionpruning.TransactionPruningException;
import com.iota.iri.utils.thread.DedicatedScheduledExecutorService;
import com.iota.iri.utils.thread.SilentScheduledExecutorService;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link HeartbeatPulse} interface.
*/
public class HeartbeatPulseImpl implements HeartbeatPulse {

private NeighborRouter neighborRouter;
private SnapshotProvider snapshotProvider;
private SnapshotConfig snapshotConfig;
private LocalSnapshotManager localSnapshotManager;

private static final Logger log = LoggerFactory.getLogger(HeartbeatPulseImpl.class);

/**
* The rate in milliseconds, at which heartbeats are sent out.
*/
private static final int HEARTBEAT_RATE_MILLIS = 60000;

/**
* Holds a reference to the manager of the background worker.
*/
private final SilentScheduledExecutorService executorService = new DedicatedScheduledExecutorService(
"Heartbeat Pulse");

/**
* Constructor for heartbeat pulse
*
* @param neighborRouter Neighbor router
* @param snapshotProvider Snapshot provider
* @param snapshotConfig Snapshot config
* @param localSnapshotManager local snapshot manager
*/
public HeartbeatPulseImpl(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider,
SnapshotConfig snapshotConfig, LocalSnapshotManager localSnapshotManager) {
this.neighborRouter = neighborRouter;
this.snapshotProvider = snapshotProvider;
this.snapshotConfig = snapshotConfig;
this.localSnapshotManager = localSnapshotManager;
}

@Override
public void start() {
executorService.silentScheduleWithFixedDelay(this::sendHeartbeat, 0, HEARTBEAT_RATE_MILLIS,
TimeUnit.MILLISECONDS);
}

@Override
public void shutdown() {
executorService.shutdownNow();
}

@Override
public void sendHeartbeat() {
int lastSolidMilestoneIndex = snapshotProvider.getLatestSnapshot().getIndex();
int firstSolidMilestoneIndex = snapshotProvider.getInitialSnapshot().getIndex();

if (snapshotConfig.getLocalSnapshotsPruningEnabled()) {
try {
firstSolidMilestoneIndex = localSnapshotManager.maxSnapshotPruningMilestone();
} catch (TransactionPruningException e) {
log.info("Failed to get first solid milestone with pruning" + e.getMessage());
firstSolidMilestoneIndex = snapshotProvider.getInitialSnapshot().getIndex();
}
}

Heartbeat heartbeat = new Heartbeat();
heartbeat.setFirstSolidMilestoneIndex(firstSolidMilestoneIndex);
heartbeat.setLastSolidMilestoneIndex(lastSolidMilestoneIndex);

Map<String, Neighbor> currentlyConnectedNeighbors = neighborRouter.getConnectedNeighbors();
for (Neighbor neighbor : currentlyConnectedNeighbors.values()) {
neighborRouter.gossipHeartbeatTo(neighbor, heartbeat);
}
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/iota/iri/network/NeighborRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl;
import com.iota.iri.network.protocol.Heartbeat;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,6 +97,14 @@ public interface NeighborRouter {
void gossipTransactionTo(Neighbor neighbor, TransactionViewModel tvm, boolean useHashOfTVM)
throws Exception;

/**
* Gossips the given heartbeat to the given neighbor.
*
* @param neighbor The {@link Neighbor} to gossip the heartbeat to
* @param heartbeat The {@link Heartbeat} to gossip
*/
void gossipHeartbeatTo(Neighbor neighbor, Heartbeat heartbeat);

/**
* Shut downs the {@link NeighborRouter} and all currently open connections.
*/
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/iota/iri/network/NeighborRouterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl;
import com.iota.iri.network.protocol.Handshake;
import com.iota.iri.network.protocol.Heartbeat;
import com.iota.iri.network.protocol.Protocol;
import com.iota.iri.utils.Converter;

Expand Down Expand Up @@ -909,6 +910,13 @@ public void gossipTransactionTo(Neighbor neighbor, TransactionViewModel tvm, boo
neighbor.getMetrics().incrSentTransactionsCount();
}

@Override
public void gossipHeartbeatTo(Neighbor neighbor, Heartbeat heartbeat) {
ByteBuffer packet = Protocol.createHeartbeatPacket(heartbeat);
neighbor.send(packet);
neighbor.getMetrics().incrSentHeartbeatCount();
}

@Override
public void shutdown() {
shutdown.set(true);
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/iota/iri/network/neighbor/NeighborMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,16 @@ public interface NeighborMetrics {
* @return the number of packets dropped from the neighbor's send queue
*/
long incrDroppedSendPacketsCount();

/**
* Increments the sent heartbeat count
* @return The number of heartbeat that have been sent
*/
long incrSentHeartbeatCount();

/**
* Gets the heartbeat count
* @return The heartbeat count
*/
long getHeartbeatCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class NeighborMetricsImpl implements NeighborMetrics {
private AtomicLong sentTxsCount = new AtomicLong();
private AtomicLong newTxsCount = new AtomicLong();
private AtomicLong droppedSendPacketsCount = new AtomicLong();
private AtomicLong heartbeatCount = new AtomicLong();

@Override
public long getAllTransactionsCount() {
Expand Down Expand Up @@ -86,4 +87,14 @@ public long getDroppedSendPacketsCount() {
public long incrDroppedSendPacketsCount() {
return droppedSendPacketsCount.incrementAndGet();
}

@Override
public long incrSentHeartbeatCount() {
return heartbeatCount.incrementAndGet();
}

@Override
public long getHeartbeatCount() {
return heartbeatCount.get();
}
}
11 changes: 10 additions & 1 deletion src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.iota.iri.controllers.TransactionViewModel;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.network.protocol.Heartbeat;
import com.iota.iri.service.validation.TransactionSolidifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,7 +53,15 @@ public ProcessingContext process(ProcessingContext ctx) {
continue;
}
try {
neighborRouter.gossipTransactionTo(neighbor, tvm);
boolean shouldGossip = true;
//neighbor supports STING. Else fall backwards
if(neighbor.getProtocolVersion() >= 2){
Heartbeat heartbeat = neighbor.heartbeat();
shouldGossip = tvm.snapshotIndex() >= heartbeat.getFirstSolidMilestoneIndex() && tvm.snapshotIndex() <= heartbeat.getLastSolidMilestoneIndex();
}
if(shouldGossip){
neighborRouter.gossipTransactionTo(neighbor, tvm);
}
} catch (Exception e) {
log.error(e.getMessage());
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/iota/iri/network/protocol/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Protocol {
*/
public final static byte[] SUPPORTED_PROTOCOL_VERSIONS = {
/* supports protocol version(s): 1 */
(byte) 0b00000001,
(byte) 0b00000011,
};
/**
* The amount of bytes dedicated for the message type in the packet header.
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/iota/iri/service/API.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public class API {
private final TransactionValidator transactionValidator;
private final TransactionSolidifier transactionSolidifier;
private final LatestMilestoneTracker latestMilestoneTracker;

private final int maxFindTxs;
private final int maxRequestList;
private final int maxGetTrytes;
Expand Down Expand Up @@ -198,11 +198,11 @@ public class API {
*
*/
public API(IotaConfig configuration, IXI ixi, TransactionRequester transactionRequester,
SpentAddressesService spentAddressesService, Tangle tangle, BundleValidator bundleValidator,
SnapshotProvider snapshotProvider, LedgerService ledgerService, NeighborRouter neighborRouter,
TipSelector tipsSelector, TipsViewModel tipsViewModel, TransactionValidator transactionValidator,
LatestMilestoneTracker latestMilestoneTracker, TransactionProcessingPipeline txPipeline,
TransactionSolidifier transactionSolidifier) {
SpentAddressesService spentAddressesService, Tangle tangle, BundleValidator bundleValidator,
SnapshotProvider snapshotProvider, LedgerService ledgerService, NeighborRouter neighborRouter,
TipSelector tipsSelector, TipsViewModel tipsViewModel, TransactionValidator transactionValidator,
LatestMilestoneTracker latestMilestoneTracker, TransactionProcessingPipeline txPipeline,
TransactionSolidifier transactionSolidifier) {
this.configuration = configuration;
this.ixi = ixi;

Expand All @@ -219,7 +219,7 @@ public API(IotaConfig configuration, IXI ixi, TransactionRequester transactionRe
this.transactionValidator = transactionValidator;
this.transactionSolidifier = transactionSolidifier;
this.latestMilestoneTracker = latestMilestoneTracker;

maxFindTxs = configuration.getMaxFindTransactions();
maxRequestList = configuration.getMaxRequestsList();
maxGetTrytes = configuration.getMaxGetTrytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.iota.iri.service.milestone.LatestMilestoneTracker;
import com.iota.iri.service.transactionpruning.PruningCondition;
import com.iota.iri.service.transactionpruning.TransactionPruningException;

/**
* Represents the manager for local {@link Snapshot}s that takes care of periodically creating a new {@link Snapshot}
Expand Down Expand Up @@ -41,4 +42,10 @@ public interface LocalSnapshotManager {
* @param conditions conditions on which we check to make a snapshot
*/
void addPruningConditions(PruningCondition... conditions);

/**
* Get the max pruning milestone for all conditions.
* @return Max pruning milestone
*/
int maxSnapshotPruningMilestone() throws TransactionPruningException;
}
Loading