From 92c1ff960042407a3640d5d8b359f62197f230c3 Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 26 Apr 2024 15:03:14 -0300 Subject: [PATCH 1/4] refactor: move subscriptions directly to operations_collector --- .../beacon/beacon_node.ex | 1 - .../beacon/sync_blocks.ex | 1 + .../p2p/gossip/operations_collector.ex | 124 +++++++++++++++++- 3 files changed, 124 insertions(+), 2 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index 21305e09c..fd8a3b490 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -59,7 +59,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do LambdaEthereumConsensus.P2P.IncomingRequests, LambdaEthereumConsensus.Beacon.PendingBlocks, LambdaEthereumConsensus.Beacon.SyncBlocks, - LambdaEthereumConsensus.P2P.GossipSub, LambdaEthereumConsensus.P2P.Gossip.Attestation, LambdaEthereumConsensus.P2P.Gossip.BeaconBlock, LambdaEthereumConsensus.P2P.Gossip.BlobSideCar diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index dd3e6d452..e27fbb6b3 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -75,6 +75,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do Logger.info("[Optimistic Sync] Sync completed") Gossip.BeaconBlock.start() Gossip.BlobSideCar.start() + Gossip.OperationsCollector.start() else Process.sleep(1000) perform_sync(remaining_chunks) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 441a83539..c65ae4a09 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -4,7 +4,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do """ use GenServer + alias LambdaEthereumConsensus.Beacon.BeaconChain + alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.StateTransition.Misc + alias LambdaEthereumConsensus.Utils.BitField alias Types.Attestation alias Types.AttesterSlashing alias Types.BeaconBlock @@ -12,6 +15,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do alias Types.SignedBLSToExecutionChange alias Types.SignedVoluntaryExit + require Logger + @operations [ :bls_to_execution_change, :attester_slashing, @@ -20,8 +25,20 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do :attestation ] + @topic_msgs [ + "beacon_aggregate_and_proof", + "voluntary_exit", + "proposer_slashing", + "attester_slashing", + "bls_to_execution_change" + ] + def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + def start() do + GenServer.call(__MODULE__, :start) + end + @spec notify_bls_to_execution_change_gossip(SignedBLSToExecutionChange.t()) :: :ok def notify_bls_to_execution_change_gossip(%SignedBLSToExecutionChange{} = msg) do GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) @@ -87,10 +104,19 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do @impl GenServer def init(_init_arg) do - state = Map.new(@operations, &{&1, []}) |> Map.put(:slot, nil) + topics = get_topic_names() + Enum.each(topics, &Libp2pPort.join_topic/1) + + state = Map.new(@operations, &{&1, []}) |> Map.put(:slot, nil) |> Map.put(:topics, topics) {:ok, state} end + @impl true + def handle_call(:start, _from, %{topics: topics} = state) do + Enum.each(topics, &Libp2pPort.subscribe_to_topic/1) + {:reply, :ok, state} + end + @impl GenServer def handle_call({:get, operation, count}, _from, state) when operation in @operations do # NOTE: we don't remove these from the state, since after a block is built @@ -113,6 +139,102 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do {:noreply, filter_messages(state, slot, operations)} end + @impl true + def handle_info( + {:gossipsub, + {<<_::binary-size(15)>> <> "beacon_aggregate_and_proof" <> _, _msg_id, message}}, + state + ) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, + %Types.SignedAggregateAndProof{message: %Types.AggregateAndProof{aggregate: aggregate}}} <- + Ssz.from_ssz(uncompressed, Types.SignedAggregateAndProof) do + votes = BitField.count(aggregate.aggregation_bits) + slot = aggregate.data.slot + root = aggregate.data.beacon_block_root |> Base.encode16() + + # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. + # ForkChoice.on_attestation(aggregate) + notify_attestation_gossip(aggregate) + + Logger.debug( + "[Gossip] Aggregate decoded. Total attestations: #{votes}", + slot: slot, + root: root + ) + + {:noreply, state} + end + end + + @impl true + def handle_info( + {:gossipsub, {<<_::binary-size(15)>> <> "voluntary_exit" <> _, _msg_id, message}}, + state + ) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, %Types.SignedVoluntaryExit{} = signed_voluntary_exit} <- + Ssz.from_ssz(uncompressed, Types.SignedVoluntaryExit) do + notify_voluntary_exit_gossip(signed_voluntary_exit) + + {:noreply, state} + end + end + + @impl true + def handle_info( + {:gossipsub, {<<_::binary-size(15)>> <> "proposer_slashing" <> _, _msg_id, message}}, + state + ) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, %Types.ProposerSlashing{} = proposer_slashing} <- + Ssz.from_ssz(uncompressed, Types.ProposerSlashing) do + notify_proposer_slashing_gossip(proposer_slashing) + + {:noreply, state} + end + end + + @impl true + def handle_info( + {:gossipsub, {<<_::binary-size(15)>> <> "attester_slashing" <> _, _msg_id, message}}, + state + ) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, %Types.AttesterSlashing{} = attester_slashing} <- + Ssz.from_ssz(uncompressed, Types.AttesterSlashing) do + notify_attester_slashing_gossip(attester_slashing) + + {:noreply, state} + end + end + + @impl true + def handle_info( + {:gossipsub, + {<<_::binary-size(15)>> <> "bls_to_execution_change" <> _, _msg_id, message}}, + state + ) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, %Types.SignedBLSToExecutionChange{} = bls_to_execution_change} <- + Ssz.from_ssz(uncompressed, Types.SignedBLSToExecutionChange) do + notify_bls_to_execution_change_gossip(bls_to_execution_change) + + {:noreply, state} + end + end + + defp get_topic_names() do + fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower) + + topics = + Enum.map(@topic_msgs, fn topic_msg -> + "/eth2/#{fork_context}/#{topic_msg}/ssz_snappy" + end) + + topics + end + defp filter_messages(state, slot, operations) do indices = operations.bls_to_execution_changes From 27cbe013aebb63da17d302e36eaa0f6d531ccf74 Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 26 Apr 2024 15:37:12 -0300 Subject: [PATCH 2/4] refactor: remove wrapper functions --- .../beacon/beacon_node.ex | 3 +- .../beacon/sync_blocks.ex | 10 ++++-- .../p2p/gossip/operations_collector.ex | 35 +++---------------- 3 files changed, 14 insertions(+), 34 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index fd8a3b490..47bd47b24 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -61,7 +61,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do LambdaEthereumConsensus.Beacon.SyncBlocks, LambdaEthereumConsensus.P2P.Gossip.Attestation, LambdaEthereumConsensus.P2P.Gossip.BeaconBlock, - LambdaEthereumConsensus.P2P.Gossip.BlobSideCar + LambdaEthereumConsensus.P2P.Gossip.BlobSideCar, + LambdaEthereumConsensus.P2P.Gossip.OperationsCollector ] ++ validator_children Supervisor.init(children, strategy: :one_for_all) diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index e27fbb6b3..99e7a8fb4 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -73,15 +73,19 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do if Enum.empty?(chunks) do Logger.info("[Optimistic Sync] Sync completed") - Gossip.BeaconBlock.start() - Gossip.BlobSideCar.start() - Gossip.OperationsCollector.start() + start_subscriptions() else Process.sleep(1000) perform_sync(remaining_chunks) end end + defp start_subscriptions() do + Gossip.BeaconBlock.start() + Gossip.BlobSideCar.start() + Gossip.OperationsCollector.start() + end + @spec fetch_blocks_by_slot(Types.slot(), non_neg_integer()) :: {:ok, [SignedBeaconBlock.t()]} | {:error, String.t()} def fetch_blocks_by_slot(from, count) do diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index c65ae4a09..0c5df248a 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -39,51 +39,26 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do GenServer.call(__MODULE__, :start) end - @spec notify_bls_to_execution_change_gossip(SignedBLSToExecutionChange.t()) :: :ok - def notify_bls_to_execution_change_gossip(%SignedBLSToExecutionChange{} = msg) do - GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) - end - @spec get_bls_to_execution_changes(non_neg_integer()) :: list(SignedBLSToExecutionChange.t()) def get_bls_to_execution_changes(count) do GenServer.call(__MODULE__, {:get, :bls_to_execution_change, count}) end - @spec notify_attester_slashing_gossip(AttesterSlashing.t()) :: :ok - def notify_attester_slashing_gossip(%AttesterSlashing{} = msg) do - GenServer.cast(__MODULE__, {:attester_slashing, msg}) - end - @spec get_attester_slashings(non_neg_integer()) :: list(AttesterSlashing.t()) def get_attester_slashings(count) do GenServer.call(__MODULE__, {:get, :attester_slashing, count}) end - @spec notify_proposer_slashing_gossip(ProposerSlashing.t()) :: :ok - def notify_proposer_slashing_gossip(%ProposerSlashing{} = msg) do - GenServer.cast(__MODULE__, {:proposer_slashing, msg}) - end - @spec get_proposer_slashings(non_neg_integer()) :: list(ProposerSlashing.t()) def get_proposer_slashings(count) do GenServer.call(__MODULE__, {:get, :proposer_slashing, count}) end - @spec notify_voluntary_exit_gossip(SignedVoluntaryExit.t()) :: :ok - def notify_voluntary_exit_gossip(%SignedVoluntaryExit{} = msg) do - GenServer.cast(__MODULE__, {:voluntary_exit, msg}) - end - @spec get_voluntary_exits(non_neg_integer()) :: list(SignedVoluntaryExit.t()) def get_voluntary_exits(count) do GenServer.call(__MODULE__, {:get, :voluntary_exit, count}) end - @spec notify_attestation_gossip(Attestation.t()) :: :ok - def notify_attestation_gossip(%Attestation{} = msg) do - GenServer.cast(__MODULE__, {:attestation, msg}) - end - @spec get_attestations(non_neg_integer()) :: list(Attestation.t()) def get_attestations(count) do GenServer.call(__MODULE__, {:get, :attestation, count}) @@ -155,7 +130,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. # ForkChoice.on_attestation(aggregate) - notify_attestation_gossip(aggregate) + GenServer.cast(__MODULE__, {:attestation, aggregate}) Logger.debug( "[Gossip] Aggregate decoded. Total attestations: #{votes}", @@ -175,7 +150,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.SignedVoluntaryExit{} = signed_voluntary_exit} <- Ssz.from_ssz(uncompressed, Types.SignedVoluntaryExit) do - notify_voluntary_exit_gossip(signed_voluntary_exit) + GenServer.cast(__MODULE__, {:voluntary_exit, signed_voluntary_exit}) {:noreply, state} end @@ -189,7 +164,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.ProposerSlashing{} = proposer_slashing} <- Ssz.from_ssz(uncompressed, Types.ProposerSlashing) do - notify_proposer_slashing_gossip(proposer_slashing) + GenServer.cast(__MODULE__, {:proposer_slashing, proposer_slashing}) {:noreply, state} end @@ -203,7 +178,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.AttesterSlashing{} = attester_slashing} <- Ssz.from_ssz(uncompressed, Types.AttesterSlashing) do - notify_attester_slashing_gossip(attester_slashing) + GenServer.cast(__MODULE__, {:attester_slashing, attester_slashing}) {:noreply, state} end @@ -218,7 +193,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.SignedBLSToExecutionChange{} = bls_to_execution_change} <- Ssz.from_ssz(uncompressed, Types.SignedBLSToExecutionChange) do - notify_bls_to_execution_change_gossip(bls_to_execution_change) + GenServer.cast(__MODULE__, {:bls_to_execution_change, bls_to_execution_change}) {:noreply, state} end From 4483f3c086367b2d812c93d12e55e16b82317e30 Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 26 Apr 2024 15:54:23 -0300 Subject: [PATCH 3/4] refactor: reduce GenServer calls --- .../p2p/gossip/operations_collector.ex | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 0c5df248a..bc72c3920 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -103,13 +103,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end @impl GenServer - # TODO: filter duplicates - def handle_cast({operation, msg}, state) - when operation in @operations do - new_msgs = [msg | Map.fetch!(state, operation)] - {:noreply, Map.replace!(state, operation, new_msgs)} - end - def handle_cast({:new_block, slot, operations}, state) do {:noreply, filter_messages(state, slot, operations)} end @@ -128,17 +121,15 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do slot = aggregate.data.slot root = aggregate.data.beacon_block_root |> Base.encode16() - # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. - # ForkChoice.on_attestation(aggregate) - GenServer.cast(__MODULE__, {:attestation, aggregate}) - Logger.debug( "[Gossip] Aggregate decoded. Total attestations: #{votes}", slot: slot, root: root ) - {:noreply, state} + # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. + # ForkChoice.on_attestation(aggregate) + handle_msg({:attestation, aggregate}, state) end end @@ -150,9 +141,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.SignedVoluntaryExit{} = signed_voluntary_exit} <- Ssz.from_ssz(uncompressed, Types.SignedVoluntaryExit) do - GenServer.cast(__MODULE__, {:voluntary_exit, signed_voluntary_exit}) - - {:noreply, state} + handle_msg({:voluntary_exit, signed_voluntary_exit}, state) end end @@ -164,9 +153,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.ProposerSlashing{} = proposer_slashing} <- Ssz.from_ssz(uncompressed, Types.ProposerSlashing) do - GenServer.cast(__MODULE__, {:proposer_slashing, proposer_slashing}) - - {:noreply, state} + handle_msg({:proposer_slashing, proposer_slashing}, state) end end @@ -178,9 +165,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.AttesterSlashing{} = attester_slashing} <- Ssz.from_ssz(uncompressed, Types.AttesterSlashing) do - GenServer.cast(__MODULE__, {:attester_slashing, attester_slashing}) - - {:noreply, state} + handle_msg({:attester_slashing, attester_slashing}, state) end end @@ -193,9 +178,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.SignedBLSToExecutionChange{} = bls_to_execution_change} <- Ssz.from_ssz(uncompressed, Types.SignedBLSToExecutionChange) do - GenServer.cast(__MODULE__, {:bls_to_execution_change, bls_to_execution_change}) - - {:noreply, state} + handle_msg({:bls_to_execution_change, bls_to_execution_change}, state) end end @@ -210,6 +193,13 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do topics end + # TODO: filter duplicates + defp handle_msg({operation, msg}, state) + when operation in @operations do + new_msgs = [msg | Map.fetch!(state, operation)] + {:noreply, Map.replace!(state, operation, new_msgs)} + end + defp filter_messages(state, slot, operations) do indices = operations.bls_to_execution_changes From f9ad4b1e0f707d668b391a9a5ea6cd541cfc4b1c Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 26 Apr 2024 15:56:32 -0300 Subject: [PATCH 4/4] refactor: remove gossipsub, consumer and handler files --- .../p2p/gossip/consumer.ex | 57 ------------- .../p2p/gossip/gossipsub.ex | 42 ---------- .../p2p/gossip/handler.ex | 79 ------------------- 3 files changed, 178 deletions(-) delete mode 100644 lib/lambda_ethereum_consensus/p2p/gossip/consumer.ex delete mode 100644 lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex delete mode 100644 lib/lambda_ethereum_consensus/p2p/gossip/handler.ex diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/consumer.ex b/lib/lambda_ethereum_consensus/p2p/gossip/consumer.ex deleted file mode 100644 index 133c94e7c..000000000 --- a/lib/lambda_ethereum_consensus/p2p/gossip/consumer.ex +++ /dev/null @@ -1,57 +0,0 @@ -defmodule LambdaEthereumConsensus.P2P.Gossip.Consumer do - @moduledoc """ - This module consumes events created by Subscriber. - """ - require Logger - use Broadway - - def start_link(%{topic: topic_name, ssz_type: _, handler: _} = opts) - when is_binary(topic_name) do - Broadway.start_link(__MODULE__, - name: get_id(topic_name), - context: opts, - producer: [ - module: {LambdaEthereumConsensus.P2P.Subscriber, %{topic: topic_name}}, - concurrency: 1 - ], - processors: [ - default: [concurrency: 8, max_demand: 1] - ] - ) - end - - def child_spec(%{topic: topic_name} = arg) do - %{ - id: get_id(topic_name), - start: {__MODULE__, :start_link, [arg]} - } - end - - @impl true - def handle_message(_, %Broadway.Message{data: data} = message, %{ - topic: topic, - ssz_type: ssz_type, - handler: handler - }) do - with {:ok, decompressed} <- :snappyer.decompress(data), - {:ok, res} <- Ssz.from_ssz(decompressed, ssz_type), - :ok <- handler.(res) do - message - else - {:error, reason} -> - data - |> Base.encode16() - |> then(&"[#{topic}] (err: #{reason}) raw: '#{&1}'") - |> Logger.warning() - - Broadway.Message.failed(message, reason) - end - end - - defp get_id(topic_name) do - __MODULE__ - |> Atom.to_string() - |> then(&Enum.join([&1, ".", topic_name])) - |> String.to_atom() - end -end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex deleted file mode 100644 index 2664833b0..000000000 --- a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex +++ /dev/null @@ -1,42 +0,0 @@ -defmodule LambdaEthereumConsensus.P2P.GossipSub do - @moduledoc """ - Supervises topic subscribers. - """ - use Supervisor - - alias LambdaEthereumConsensus.Beacon.BeaconChain - alias LambdaEthereumConsensus.P2P.Gossip.Consumer - alias LambdaEthereumConsensus.P2P.Gossip.Handler - - def start_link(opts) do - Supervisor.start_link(__MODULE__, opts, name: __MODULE__) - end - - @impl true - def init(_opts) do - topics = [ - {"beacon_aggregate_and_proof", Types.SignedAggregateAndProof, - &Handler.handle_beacon_aggregate_and_proof/1}, - {"voluntary_exit", Types.SignedVoluntaryExit, &Handler.handle_voluntary_exit/1}, - {"proposer_slashing", Types.ProposerSlashing, &Handler.handle_proposer_slashing/1}, - {"attester_slashing", Types.AttesterSlashing, &Handler.handle_attester_slashing/1}, - {"bls_to_execution_change", Types.SignedBLSToExecutionChange, - &Handler.handle_bls_to_execution_change/1} - - # {"sync_committee_contribution_and_proof", Types.SignedContributionAndProof}, - # {"sync_committee_0", Types.SyncCommitteeMessage} - ] - - fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower) - - children = - for {topic_msg, ssz_type, handler} <- topics do - topic = "/eth2/#{fork_context}/#{topic_msg}/ssz_snappy" - {Consumer, %{topic: topic, ssz_type: ssz_type, handler: handler}} - end - - children = children ++ [LambdaEthereumConsensus.P2P.Gossip.OperationsCollector] - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex deleted file mode 100644 index 3ddacf2c1..000000000 --- a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex +++ /dev/null @@ -1,79 +0,0 @@ -defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do - @moduledoc """ - Module that implements the handle_message callback, - used in the GossipConsumer module to handle messages. - """ - require Logger - - alias LambdaEthereumConsensus.Beacon.BeaconChain - alias LambdaEthereumConsensus.Beacon.PendingBlocks - alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector - alias LambdaEthereumConsensus.Store.BlobDb - alias LambdaEthereumConsensus.Utils.BitField - alias Types.AggregateAndProof - alias Types.AttesterSlashing - alias Types.BlobSidecar - alias Types.ProposerSlashing - alias Types.SignedAggregateAndProof - alias Types.SignedBeaconBlock - alias Types.SignedBLSToExecutionChange - alias Types.SignedVoluntaryExit - - def handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block) do - current_slot = BeaconChain.get_current_slot() - - if block.slot > current_slot - ChainSpec.get("SLOTS_PER_EPOCH") do - Logger.info("[Gossip] Block received", slot: block.slot) - - PendingBlocks.add_block(signed_block) - end - - :ok - end - - def handle_beacon_aggregate_and_proof(%SignedAggregateAndProof{ - message: %AggregateAndProof{aggregate: aggregate} - }) do - votes = BitField.count(aggregate.aggregation_bits) - slot = aggregate.data.slot - root = aggregate.data.beacon_block_root |> Base.encode16() - - # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. - # ForkChoice.on_attestation(aggregate) - OperationsCollector.notify_attestation_gossip(aggregate) - - Logger.debug( - "[Gossip] Aggregate decoded. Total attestations: #{votes}", - slot: slot, - root: root - ) - end - - def handle_bls_to_execution_change(%SignedBLSToExecutionChange{} = message) do - # TODO: validate message first - OperationsCollector.notify_bls_to_execution_change_gossip(message) - end - - def handle_attester_slashing(%AttesterSlashing{} = message) do - # TODO: validate message first - OperationsCollector.notify_attester_slashing_gossip(message) - end - - def handle_proposer_slashing(%ProposerSlashing{} = message) do - # TODO: validate message first - OperationsCollector.notify_proposer_slashing_gossip(message) - end - - def handle_voluntary_exit(%SignedVoluntaryExit{} = message) do - # TODO: validate message first - OperationsCollector.notify_voluntary_exit_gossip(message) - end - - def handle_blob_sidecar(%BlobSidecar{index: blob_index} = blob, blob_index) do - BlobDb.store_blob(blob) - Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}") - end - - # Ignore blobs with mismatched indices - def handle_blob_sidecar(_, _), do: :ok -end