Skip to content

Commit

Permalink
[Aptos Data Client] Make the optimistic fetch lag configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 26, 2023
1 parent 35012f8 commit b73b414
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 53 deletions.
8 changes: 4 additions & 4 deletions aptos-node/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::network::ApplicationNetworkInterfaces;
use aptos_config::config::{NodeConfig, StateSyncConfig, StorageServiceConfig};
use aptos_config::config::{NodeConfig, StateSyncConfig};
use aptos_consensus_notifications::ConsensusNotifier;
use aptos_data_client::client::AptosDataClient;
use aptos_data_streaming_service::{
Expand Down Expand Up @@ -117,7 +117,7 @@ pub fn start_state_sync_and_get_notification_handles(

// Start the state sync storage service
let storage_service_runtime = setup_state_sync_storage_service(
node_config.state_sync.storage_service,
node_config.state_sync,
peers_and_metadata,
network_service_events,
&db_rw,
Expand Down Expand Up @@ -202,7 +202,7 @@ fn setup_aptos_data_client(

/// Sets up the state sync storage service runtime
fn setup_state_sync_storage_service(
config: StorageServiceConfig,
config: StateSyncConfig,
peers_and_metadata: Arc<PeersAndMetadata>,
network_service_events: NetworkServiceEvents<StorageServiceMessage>,
db_rw: &DbReaderWriter,
Expand All @@ -212,7 +212,7 @@ fn setup_state_sync_storage_service(
let storage_service_runtime = aptos_runtimes::spawn_named_runtime("stor-server".into(), None);

// Spawn the state sync storage service servers on the runtime
let storage_reader = StorageReader::new(config, Arc::clone(&db_rw.reader));
let storage_reader = StorageReader::new(config.storage_service, Arc::clone(&db_rw.reader));
let service = StorageServiceServer::new(
config,
storage_service_runtime.handle().clone(),
Expand Down
5 changes: 4 additions & 1 deletion config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ pub struct AptosDataClientConfig {
pub max_num_in_flight_regular_polls: u64,
/// Maximum number of output reductions before transactions are returned
pub max_num_output_reductions: u64,
/// Maximum version lag we'll tolerate when sending optimistic fetch requests
pub max_optimistic_fetch_version_lag: u64,
/// Maximum timeout (in ms) when waiting for a response (after exponential increases)
pub max_response_timeout_ms: u64,
/// Maximum number of state keys and values per chunk
Expand All @@ -272,7 +274,8 @@ impl Default for AptosDataClientConfig {
max_num_in_flight_priority_polls: 10,
max_num_in_flight_regular_polls: 10,
max_num_output_reductions: 0,
max_response_timeout_ms: 60_000, // 60 seconds
max_optimistic_fetch_version_lag: 50_000, // Assumes 5K TPS for 10 seconds, which should be plenty
max_response_timeout_ms: 60_000, // 60 seconds
max_state_chunk_size: MAX_STATE_CHUNK_SIZE,
max_transaction_chunk_size: MAX_TRANSACTION_CHUNK_SIZE,
max_transaction_output_chunk_size: MAX_TRANSACTION_OUTPUT_CHUNK_SIZE,
Expand Down
3 changes: 2 additions & 1 deletion state-sync/aptos-data-client/src/peer_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ impl PeerStates {
return true;
}

// Check if the peer can service the request
self.peer_to_state
.get(peer)
.and_then(PeerState::storage_summary_if_not_ignored)
.map(|summary| summary.can_service(request))
.map(|summary| summary.can_service(&self.data_client_config, request))
.unwrap_or(false)
}

Expand Down
24 changes: 14 additions & 10 deletions state-sync/aptos-data-client/src/tests/priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@ use crate::{
tests::{mock::MockNetwork, utils},
};
use aptos_config::{
config::{BaseConfig, RoleType},
config::{AptosDataClientConfig, BaseConfig, RoleType},
network_id::NetworkId,
};
use aptos_storage_service_types::{
requests::{
DataRequest, NewTransactionOutputsWithProofRequest, NewTransactionsWithProofRequest,
StorageServiceRequest, TransactionOutputsWithProofRequest,
},
responses::OPTIMISTIC_FETCH_VERSION_LAG,
use aptos_storage_service_types::requests::{
DataRequest, NewTransactionOutputsWithProofRequest, NewTransactionsWithProofRequest,
StorageServiceRequest, TransactionOutputsWithProofRequest,
};
use claims::assert_matches;

Expand Down Expand Up @@ -148,7 +145,14 @@ async fn prioritized_peer_request_selection() {
#[tokio::test]
async fn prioritized_peer_optimistic_fetch_selection() {
::aptos_logger::Logger::init_for_testing();
let (mut mock_network, _, client, _) = MockNetwork::new(None, None, None);

// Create a data client with a max version lag of 100
let max_optimistic_fetch_version_lag = 100;
let data_client_config = AptosDataClientConfig {
max_optimistic_fetch_version_lag,
..Default::default()
};
let (mut mock_network, _, client, _) = MockNetwork::new(None, Some(data_client_config), None);

// Create test data
let known_version = 10000000;
Expand Down Expand Up @@ -209,7 +213,7 @@ async fn prioritized_peer_optimistic_fetch_selection() {
// Update the priority peer to be too far behind and verify it is not selected
client.update_summary(
priority_peer_1,
utils::create_storage_summary(known_version - OPTIMISTIC_FETCH_VERSION_LAG),
utils::create_storage_summary(known_version - max_optimistic_fetch_version_lag),
);
assert_eq!(
client.choose_peer_for_request(&storage_request),
Expand All @@ -219,7 +223,7 @@ async fn prioritized_peer_optimistic_fetch_selection() {
// Update the regular peer to be too far behind and verify neither is selected
client.update_summary(
regular_peer_1,
utils::create_storage_summary(known_version - (OPTIMISTIC_FETCH_VERSION_LAG * 2)),
utils::create_storage_summary(known_version - (max_optimistic_fetch_version_lag * 2)),
);
assert_matches!(
client.choose_peer_for_request(&storage_request),
Expand Down
35 changes: 23 additions & 12 deletions state-sync/storage-service/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use crate::{
};
use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{config::StorageServiceConfig, network_id::PeerNetworkId};
use aptos_config::{
config::{StateSyncConfig, StorageServiceConfig},
network_id::PeerNetworkId,
};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_network::application::storage::PeersAndMetadata;
Expand Down Expand Up @@ -56,9 +59,9 @@ const CACHED_SUMMARY_UPDATE_CHANNEL_SIZE: usize = 1;
/// service requests from clients.
pub struct StorageServiceServer<T> {
bounded_executor: BoundedExecutor,
config: StorageServiceConfig,
network_requests: StorageServiceNetworkEvents,
storage: T,
storage_service_config: StorageServiceConfig,
time_service: TimeService,

// A cached storage server summary to avoid hitting the DB for every
Expand All @@ -82,35 +85,43 @@ pub struct StorageServiceServer<T> {

impl<T: StorageReaderInterface + Send + Sync> StorageServiceServer<T> {
pub fn new(
config: StorageServiceConfig,
config: StateSyncConfig,
executor: Handle,
storage: T,
time_service: TimeService,
peers_and_metadata: Arc<PeersAndMetadata>,
network_requests: StorageServiceNetworkEvents,
storage_service_listener: StorageServiceNotificationListener,
) -> Self {
let bounded_executor =
BoundedExecutor::new(config.max_concurrent_requests as usize, executor);
// Extract the individual component configs
let aptos_data_client_config = config.aptos_data_client;
let storage_service_config = config.storage_service;

// Create the required components
let bounded_executor = BoundedExecutor::new(
storage_service_config.max_concurrent_requests as usize,
executor,
);
let cached_storage_server_summary =
Arc::new(ArcSwap::from(Arc::new(StorageServerSummary::default())));
let optimistic_fetches = Arc::new(DashMap::new());
let lru_response_cache = Arc::new(Mutex::new(LruCache::new(
config.max_lru_cache_size as usize,
storage_service_config.max_lru_cache_size as usize,
)));
let request_moderator = Arc::new(RequestModerator::new(
aptos_data_client_config,
cached_storage_server_summary.clone(),
peers_and_metadata,
config,
storage_service_config,
time_service.clone(),
));
let storage_service_listener = Some(storage_service_listener);

Self {
config,
bounded_executor,
storage,
network_requests,
storage,
storage_service_config,
time_service,
cached_storage_server_summary,
lru_response_cache,
Expand Down Expand Up @@ -146,7 +157,7 @@ impl<T: StorageReaderInterface + Send + Sync> StorageServiceServer<T> {
) {
// Clone all required components for the task
let cached_storage_server_summary = self.cached_storage_server_summary.clone();
let config = self.config;
let config = self.storage_service_config;
let storage = self.storage.clone();
let time_service = self.time_service.clone();

Expand Down Expand Up @@ -211,7 +222,7 @@ impl<T: StorageReaderInterface + Send + Sync> StorageServiceServer<T> {
// Clone all required components for the task
let bounded_executor = self.bounded_executor.clone();
let cached_storage_server_summary = self.cached_storage_server_summary.clone();
let config = self.config;
let config = self.storage_service_config;
let optimistic_fetches = self.optimistic_fetches.clone();
let lru_response_cache = self.lru_response_cache.clone();
let request_moderator = self.request_moderator.clone();
Expand Down Expand Up @@ -271,7 +282,7 @@ impl<T: StorageReaderInterface + Send + Sync> StorageServiceServer<T> {
/// peer states in the request moderator.
async fn spawn_moderator_peer_refresher(&mut self) {
// Clone all required components for the task
let config = self.config;
let config = self.storage_service_config;
let request_moderator = self.request_moderator.clone();
let time_service = self.time_service.clone();

Expand Down
7 changes: 5 additions & 2 deletions state-sync/storage-service/server/src/moderator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{error::Error, logging::LogEntry, metrics, LogSchema};
use aptos_config::{
config::StorageServiceConfig,
config::{AptosDataClientConfig, StorageServiceConfig},
network_id::{NetworkId, PeerNetworkId},
};
use aptos_infallible::RwLock;
Expand Down Expand Up @@ -104,6 +104,7 @@ impl UnhealthyPeerState {
/// If a peer sends too many invalid requests, the moderator will mark the peer as
/// "unhealthy" and will ignore requests from that peer for some time.
pub struct RequestModerator {
aptos_data_client_config: AptosDataClientConfig,
cached_storage_server_summary: Arc<ArcSwap<StorageServerSummary>>,
peers_and_metadata: Arc<PeersAndMetadata>,
storage_service_config: StorageServiceConfig,
Expand All @@ -113,12 +114,14 @@ pub struct RequestModerator {

impl RequestModerator {
pub fn new(
aptos_data_client_config: AptosDataClientConfig,
cached_storage_server_summary: Arc<ArcSwap<StorageServerSummary>>,
peers_and_metadata: Arc<PeersAndMetadata>,
storage_service_config: StorageServiceConfig,
time_service: TimeService,
) -> Self {
Self {
aptos_data_client_config,
cached_storage_server_summary,
unhealthy_peer_states: Arc::new(RwLock::new(HashMap::new())),
peers_and_metadata,
Expand Down Expand Up @@ -148,7 +151,7 @@ impl RequestModerator {
let storage_server_summary = self.cached_storage_server_summary.load();

// Verify the request is serviceable using the current storage server summary
if !storage_server_summary.can_service(request) {
if !storage_server_summary.can_service(&self.aptos_data_client_config, request) {
// Increment the invalid request count for the peer
let mut unhealthy_peer_states = self.unhealthy_peer_states.write();
let unhealthy_peer_state = unhealthy_peer_states
Expand Down
24 changes: 16 additions & 8 deletions state-sync/storage-service/server/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::{
};
use anyhow::Result;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{config::StorageServiceConfig, network_id::NetworkId};
use aptos_config::{
config::{StateSyncConfig, StorageServiceConfig},
network_id::NetworkId,
};
use aptos_crypto::HashValue;
use aptos_network::{
application::{interface::NetworkServiceEvents, storage::PeersAndMetadata},
Expand Down Expand Up @@ -72,10 +75,14 @@ impl MockClient {
) {
utils::initialize_logger();

// Create the state sync config
let mut state_sync_config = StateSyncConfig::default();
let storage_service_config = storage_config.unwrap_or_default();
state_sync_config.storage_service = storage_service_config;

// Create the storage reader
let storage_config = storage_config.unwrap_or_default();
let storage_reader = StorageReader::new(
storage_config,
storage_service_config,
Arc::new(db_reader.unwrap_or_else(create_mock_db_reader)),
);

Expand All @@ -84,10 +91,11 @@ impl MockClient {
let mut network_and_events = HashMap::new();
let mut peer_manager_notifiers = HashMap::new();
for network_id in network_ids.clone() {
let queue_cfg =
aptos_channel::Config::new(storage_config.max_network_channel_size as usize)
.queue_style(QueueStyle::FIFO)
.counters(&metrics::PENDING_STORAGE_SERVER_NETWORK_EVENTS);
let queue_cfg = aptos_channel::Config::new(
storage_service_config.max_network_channel_size as usize,
)
.queue_style(QueueStyle::FIFO)
.counters(&metrics::PENDING_STORAGE_SERVER_NETWORK_EVENTS);
let (peer_manager_notifier, peer_manager_notification_receiver) = queue_cfg.build();
let (_, connection_notification_receiver) = queue_cfg.build();

Expand All @@ -111,7 +119,7 @@ impl MockClient {
let executor = tokio::runtime::Handle::current();
let mock_time_service = TimeService::mock();
let storage_server = StorageServiceServer::new(
storage_config,
state_sync_config,
executor,
storage_reader,
mock_time_service.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use crate::{
tests::{mock, utils},
};
use aptos_bounded_executor::BoundedExecutor;
use aptos_config::{config::StorageServiceConfig, network_id::PeerNetworkId};
use aptos_config::{
config::{AptosDataClientConfig, StorageServiceConfig},
network_id::PeerNetworkId,
};
use aptos_infallible::Mutex;
use aptos_storage_service_types::{
requests::{
Expand Down Expand Up @@ -69,6 +72,7 @@ async fn test_peers_with_ready_optimistic_fetches() {
Arc::new(ArcSwap::from(Arc::new(StorageServerSummary::default())));
let lru_response_cache = Arc::new(Mutex::new(LruCache::new(0)));
let request_moderator = Arc::new(RequestModerator::new(
AptosDataClientConfig::default(),
cached_storage_server_summary.clone(),
mock::create_peers_and_metadata(vec![]),
storage_service_config,
Expand Down Expand Up @@ -161,6 +165,7 @@ async fn test_remove_expired_optimistic_fetches() {
Arc::new(ArcSwap::from(Arc::new(StorageServerSummary::default())));
let lru_response_cache = Arc::new(Mutex::new(LruCache::new(0)));
let request_moderator = Arc::new(RequestModerator::new(
AptosDataClientConfig::default(),
cached_storage_server_summary.clone(),
mock::create_peers_and_metadata(vec![]),
storage_service_config,
Expand Down
Loading

0 comments on commit b73b414

Please sign in to comment.