Skip to content
This repository has been archived by the owner on Jan 8, 2021. It is now read-only.

Commit

Permalink
Merge pull request #133 from radixdlt/integration/fix-epoch-change-ed…
Browse files Browse the repository at this point in the history
…ge-cases+RPNV1-458

Integration/fix epoch change edge cases+rpnv1 458
  • Loading branch information
talekhinezh authored Jul 7, 2020
2 parents 91bd0e7 + 60ca09a commit 1641e68
Show file tree
Hide file tree
Showing 71 changed files with 2,009 additions and 450 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.radixdlt.EpochChangeSender;
import com.radixdlt.consensus.BFTEventSender;
import com.radixdlt.consensus.CommittedStateSync;
import com.radixdlt.consensus.GetVerticesResponse;
import com.radixdlt.consensus.EpochChange;
import com.radixdlt.consensus.bft.GetVerticesErrorResponse;
import com.radixdlt.consensus.bft.GetVerticesResponse;
import com.radixdlt.consensus.NewView;
import com.radixdlt.consensus.Proposal;
import com.radixdlt.consensus.QuorumCertificate;
import com.radixdlt.consensus.SyncVerticesRPCSender;
import com.radixdlt.consensus.Vertex;
import com.radixdlt.consensus.VertexStore.GetVerticesRequest;
import com.radixdlt.consensus.VertexStore.VertexStoreEventSender;
import com.radixdlt.consensus.bft.VertexStore.GetVerticesRequest;
import com.radixdlt.consensus.bft.VertexStore.VertexStoreEventSender;
import com.radixdlt.consensus.Vote;
import com.radixdlt.crypto.ECPublicKey;
import com.radixdlt.crypto.Hash;
Expand All @@ -43,11 +46,11 @@
*
* This class is not thread safe.
*/
public final class ControlledBFTNetwork {
public final class ControlledNetwork {
private final ImmutableList<ECPublicKey> nodes;
private final ImmutableMap<ChannelId, LinkedList<ControlledMessage>> messageQueue;

ControlledBFTNetwork(ImmutableList<ECPublicKey> nodes) {
ControlledNetwork(ImmutableList<ECPublicKey> nodes) {
this.nodes = nodes;
this.messageQueue = nodes.stream()
.flatMap(n0 -> nodes.stream().map(n1 -> new ChannelId(n0, n1)))
Expand Down Expand Up @@ -158,13 +161,18 @@ public Hash getVertexId() {
public int getCount() {
return count;
}

@Override
public String toString() {
return String.format("%s{count=%s}", this.getClass().getSimpleName(), count);
}
}

public ControlledSender getSender(ECPublicKey sender) {
return new ControlledSender(sender);
}

public final class ControlledSender implements BFTEventSender, VertexStoreEventSender, SyncVerticesRPCSender {
public final class ControlledSender implements BFTEventSender, VertexStoreEventSender, SyncVerticesRPCSender, EpochChangeSender {
private final ECPublicKey sender;

private ControlledSender(ECPublicKey sender) {
Expand All @@ -184,7 +192,15 @@ public void sendGetVerticesResponse(GetVerticesRequest originalRequest, Immutabl
}

@Override
public void syncedVertex(Vertex vertex) {
public void sendGetVerticesErrorResponse(GetVerticesRequest originalRequest, QuorumCertificate highestQC,
QuorumCertificate highestCommittedQC) {
ControlledGetVerticesRequest request = (ControlledGetVerticesRequest) originalRequest;
GetVerticesErrorResponse response = new GetVerticesErrorResponse(request.getVertexId(), highestQC, highestCommittedQC, request.opaque);
putMesssage(new ControlledMessage(sender, request.requestor, response));
}

@Override
public void sendSyncedVertex(Vertex vertex) {
putMesssage(new ControlledMessage(sender, sender, vertex.getId()));
}

Expand All @@ -210,7 +226,12 @@ public void committedStateSync(CommittedStateSync committedStateSync) {
}

@Override
public void committedVertex(Vertex vertex) {
public void epochChange(EpochChange epochChange) {
putMesssage(new ControlledMessage(sender, sender, epochChange));
}

@Override
public void sendCommittedVertex(Vertex vertex) {
// Ignore committed vertex signal
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,35 @@

import static org.mockito.Mockito.mock;

import com.radixdlt.consensus.BFTEventPreprocessor;
import com.radixdlt.consensus.BFTEventProcessor;
import com.radixdlt.consensus.BFTEventReducer;
import com.radixdlt.consensus.BFTFactory;
import com.radixdlt.consensus.CommittedStateSync;
import com.radixdlt.consensus.ConsensusEvent;
import com.radixdlt.consensus.DefaultHasher;
import com.radixdlt.consensus.EmptySyncEpochsRPCSender;
import com.radixdlt.consensus.EmptySyncVerticesRPCSender;
import com.radixdlt.consensus.GetVerticesResponse;
import com.radixdlt.consensus.EpochChange;
import com.radixdlt.consensus.EpochManager;
import com.radixdlt.consensus.PendingVotes;
import com.radixdlt.consensus.bft.GetVerticesErrorResponse;
import com.radixdlt.consensus.bft.GetVerticesResponse;
import com.radixdlt.consensus.LocalTimeout;
import com.radixdlt.consensus.ProposerElectionFactory;
import com.radixdlt.consensus.VertexMetadata;
import com.radixdlt.consensus.VertexStore.GetVerticesRequest;
import com.radixdlt.consensus.bft.VertexStore.GetVerticesRequest;
import com.radixdlt.consensus.Hasher;
import com.radixdlt.consensus.NewView;
import com.radixdlt.consensus.PendingVotes;
import com.radixdlt.consensus.Proposal;
import com.radixdlt.consensus.QuorumCertificate;
import com.radixdlt.consensus.BFTEventReducer;
import com.radixdlt.consensus.SyncQueues;
import com.radixdlt.consensus.SyncedStateComputer;
import com.radixdlt.consensus.Vertex;
import com.radixdlt.consensus.VertexStore;
import com.radixdlt.consensus.bft.VertexStore;
import com.radixdlt.consensus.SyncVerticesRPCSender;
import com.radixdlt.consensus.View;
import com.radixdlt.consensus.Vote;
import com.radixdlt.consensus.deterministic.ControlledBFTNetwork.ControlledSender;
import com.radixdlt.consensus.VertexStoreFactory;
import com.radixdlt.consensus.deterministic.ControlledNetwork.ControlledSender;
import com.radixdlt.consensus.liveness.FixedTimeoutPacemaker;
import com.radixdlt.consensus.liveness.FixedTimeoutPacemaker.TimeoutSender;
import com.radixdlt.consensus.liveness.MempoolProposalGenerator;
import com.radixdlt.consensus.liveness.Pacemaker;
import com.radixdlt.consensus.liveness.ProposalGenerator;
import com.radixdlt.consensus.liveness.ProposerElection;
import com.radixdlt.consensus.liveness.ScheduledTimeoutSender;
import com.radixdlt.consensus.safety.SafetyRules;
import com.radixdlt.consensus.safety.SafetyState;
import com.radixdlt.consensus.validators.Validator;
import com.radixdlt.consensus.validators.ValidatorSet;
import com.radixdlt.counters.SystemCounters;
import com.radixdlt.crypto.ECKeyPair;
Expand All @@ -59,29 +57,30 @@
import com.radixdlt.mempool.Mempool;
import com.radixdlt.middleware2.CommittedAtom;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

/**
* Controlled BFT Node where its state machine is managed by a synchronous
* Controlled Node where its state machine is managed by a synchronous
* processNext() call.
*/
class ControlledBFTNode {
private final BFTEventProcessor ec;
class ControlledNode {
private final EpochManager epochManager;
private final SystemCounters systemCounters;
private final VertexStore vertexStore;
private final ValidatorSet initialValidatorSet;
private final ControlledSender controlledSender;

ControlledBFTNode(
ControlledNode(
ECKeyPair key,
ControlledSender sender,
ProposerElection proposerElection,
ValidatorSet validatorSet,
ProposerElectionFactory proposerElectionFactory,
ValidatorSet initialValidatorSet,
boolean enableGetVerticesRPC,
BooleanSupplier syncedSupplier
) {
this.systemCounters = SystemCounters.getInstance();
Vertex genesisVertex = Vertex.createGenesis();
QuorumCertificate genesisQC = QuorumCertificate.ofGenesis(genesisVertex);
this.controlledSender = Objects.requireNonNull(sender);
this.initialValidatorSet = Objects.requireNonNull(initialValidatorSet);

SyncedStateComputer<CommittedAtom> stateComputer = new SyncedStateComputer<CommittedAtom>() {
@Override
Expand All @@ -105,40 +104,41 @@ public void execute(CommittedAtom instruction) {
};

SyncVerticesRPCSender syncVerticesRPCSender = enableGetVerticesRPC ? sender : EmptySyncVerticesRPCSender.INSTANCE;
this.vertexStore = new VertexStore(genesisVertex, genesisQC, stateComputer, syncVerticesRPCSender, sender, systemCounters);
Mempool mempool = new EmptyMempool();
ProposalGenerator proposalGenerator = new MempoolProposalGenerator(vertexStore, mempool);
TimeoutSender timeoutSender = mock(TimeoutSender.class);
// Timeout doesn't matter here
Pacemaker pacemaker = new FixedTimeoutPacemaker(1, timeoutSender);
Hasher hasher = new DefaultHasher();
SafetyRules safetyRules = new SafetyRules(key, SafetyState.initialState(), hasher);
PendingVotes pendingVotes = new PendingVotes(hasher);
BFTEventReducer reducer = new BFTEventReducer(
proposalGenerator,
mempool,
sender,
safetyRules,
pacemaker,
vertexStore,
pendingVotes,
proposerElection,
key,
validatorSet,
systemCounters
);
SyncQueues syncQueues = new SyncQueues(
validatorSet.getValidators().stream().map(Validator::nodeKey).collect(Collectors.toSet()),
systemCounters
);
VertexStoreFactory vertexStoreFactory = (vertex, qc, syncedStateComputer) ->
new VertexStore(vertex, qc, syncedStateComputer, syncVerticesRPCSender, sender, systemCounters);
BFTFactory bftFactory =
(pacemaker, vertexStore, proposerElection, validatorSet) -> {
final ProposalGenerator proposalGenerator = new MempoolProposalGenerator(vertexStore, mempool);
final SafetyRules safetyRules = new SafetyRules(key, SafetyState.initialState(), hasher);
final PendingVotes pendingVotes = new PendingVotes(hasher);

this.ec = new BFTEventPreprocessor(
return new BFTEventReducer(
proposalGenerator,
mempool,
controlledSender,
safetyRules,
pacemaker,
vertexStore,
pendingVotes,
proposerElection,
key,
validatorSet,
systemCounters
);
};

this.epochManager = new EpochManager(
stateComputer,
EmptySyncEpochsRPCSender.INSTANCE,
mock(ScheduledTimeoutSender.class),
timeoutSender -> new FixedTimeoutPacemaker(1, timeoutSender),
vertexStoreFactory,
proposerElectionFactory,
bftFactory,
key.getPublicKey(),
reducer,
pacemaker,
vertexStore,
proposerElection,
syncQueues
systemCounters
);
}

Expand All @@ -147,26 +147,27 @@ SystemCounters getSystemCounters() {
}

void start() {
ec.start();
EpochChange epochChange = new EpochChange(VertexMetadata.ofGenesisAncestor(), this.initialValidatorSet);
controlledSender.epochChange(epochChange);
}

void processNext(Object msg) {
if (msg instanceof GetVerticesRequest) {
vertexStore.processGetVerticesRequest((GetVerticesRequest) msg);
if (msg instanceof EpochChange) {
this.epochManager.processEpochChange((EpochChange) msg);
} else if (msg instanceof GetVerticesRequest) {
this.epochManager.processGetVerticesRequest((GetVerticesRequest) msg);
} else if (msg instanceof GetVerticesResponse) {
vertexStore.processGetVerticesResponse((GetVerticesResponse) msg);
this.epochManager.processGetVerticesResponse((GetVerticesResponse) msg);
} else if (msg instanceof GetVerticesErrorResponse) {
this.epochManager.processGetVerticesErrorResponse((GetVerticesErrorResponse) msg);
} else if (msg instanceof CommittedStateSync) {
vertexStore.processCommittedStateSync((CommittedStateSync) msg);
} else if (msg instanceof View) {
ec.processLocalTimeout((View) msg);
} else if (msg instanceof NewView) {
ec.processNewView((NewView) msg);
} else if (msg instanceof Proposal) {
ec.processProposal((Proposal) msg);
} else if (msg instanceof Vote) {
ec.processVote((Vote) msg);
this.epochManager.processCommittedStateSync((CommittedStateSync) msg);
} else if (msg instanceof LocalTimeout) {
this.epochManager.processLocalTimeout((LocalTimeout) msg);
} else if (msg instanceof ConsensusEvent) {
this.epochManager.processConsensusEvent((ConsensusEvent) msg);
} else if (msg instanceof Hash) {
ec.processLocalSync((Hash) msg);
this.epochManager.processLocalSync((Hash) msg);
} else {
throw new IllegalStateException("Unknown msg: " + msg);
}
Expand Down
Loading

0 comments on commit 1641e68

Please sign in to comment.