Skip to content

Commit

Permalink
Catchain improvements (ton-blockchain#698)
Browse files Browse the repository at this point in the history
* Fix "sgn" in fift; fix marking infinite loops as noterurn in func

* TON-P1-1: Remove unused catchain queries

* TON-P1-15: Avoid synchronization with self

* TON-P1-3, TON-P1-17: Disallow more than one candidate per src per round (to prevent flood), add checks to process_broadcast

* TON-P1-10: Fix fast/slow attempts

* TON-P1-14: Add named constants

* TON-P1-18, TON-P1-19: Alloc temporary memory in the same way as persistent memory

* TON-P1-20: Add comment to choose_blocks_to_approve

* TON-P1-16: Avoid creating two catchain blocks on need_new_block

* TON-P1-8: Add some validation to validator-engine parameters

* TON-P1-6: Don't allow sending the same block many times

Many requests for the same block are not unusual (however, there's no need to answer them all)

* TON-P1-2: Enable prohibiting dependencies from blamed nodes (2.7.5 in CatChain doc), fix processing blame proofs

* Best practices

bp-6: Fix incorrect warning
bp-7: Remove unused code
bp-8: Bring back PerfWarningTimer logging (only when no callback)
bp-9: Remove unnecessary condition
bp-11: Remove commented-out code
bp-13: Divide code in validator-session-state
Adherence to Specification: Fix typo
  • Loading branch information
SpyCheese authored May 10, 2023
1 parent 7878578 commit 5abfe23
Show file tree
Hide file tree
Showing 27 changed files with 1,410 additions and 1,296 deletions.
1 change: 1 addition & 0 deletions catchain/catchain-receiver-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class CatChainReceiverInterface : public td::actor::Actor {
td::BufferSlice query, td::uint64 max_answer_size,
td::actor::ActorId<adnl::AdnlSenderInterface> via) = 0;
virtual void send_custom_message_data(const PublicKeyHash &dst, td::BufferSlice query) = 0;
virtual void on_blame_processed(td::uint32 source_id) = 0;

virtual void destroy() = 0;

Expand Down
27 changes: 18 additions & 9 deletions catchain/catchain-receiver-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ td::Result<std::unique_ptr<CatChainReceiverSource>> CatChainReceiverSource::crea

void CatChainReceiverSourceImpl::blame(td::uint32 fork, CatChainBlockHeight height) {
blame();
if (!blamed_heights_.empty()) {
if (blamed_heights_.size() <= fork) {
blamed_heights_.resize(fork + 1, 0);
}
if (blamed_heights_[fork] == 0 || blamed_heights_[fork] > height) {
VLOG(CATCHAIN_INFO) << this << ": blamed at " << fork << " " << height;
blamed_heights_[fork] = height;
}
// if (!blamed_heights_.empty()) {
if (blamed_heights_.size() <= fork) {
blamed_heights_.resize(fork + 1, 0);
}
if (blamed_heights_[fork] == 0 || blamed_heights_[fork] > height) {
VLOG(CATCHAIN_INFO) << this << ": blamed at " << fork << " " << height;
blamed_heights_[fork] = height;
}
// }
}

void CatChainReceiverSourceImpl::blame() {
Expand Down Expand Up @@ -144,7 +144,7 @@ void CatChainReceiverSourceImpl::on_new_block(CatChainReceivedBlock *block) {
on_found_fork_proof(create_serialize_tl_object<ton_api::catchain_block_data_fork>(block->export_tl_dep(),
it->second->export_tl_dep())
.as_slice());
chain_->add_prepared_event(fork_proof());
chain_->on_found_fork_proof(id_, fork_proof());
}
blame();
return;
Expand All @@ -162,6 +162,15 @@ void CatChainReceiverSourceImpl::on_found_fork_proof(const td::Slice &proof) {
}
}

bool CatChainReceiverSourceImpl::allow_send_block(CatChainBlockHash hash) {
td::uint32 count = ++block_requests_count_[hash];
if (count > MAX_BLOCK_REQUESTS) {
VLOG(CATCHAIN_INFO) << this << ": node requested block " << hash << " " << count << " times";
return false;
}
return true;
}

} // namespace catchain

} // namespace ton
3 changes: 3 additions & 0 deletions catchain/catchain-receiver-source.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class CatChainReceiverSource {
virtual td::BufferSlice fork_proof() const = 0;
virtual bool fork_is_found() const = 0;

// One block can be sent to one node only a limited number of times to prevent DoS
virtual bool allow_send_block(CatChainBlockHash hash) = 0;

static td::Result<std::unique_ptr<CatChainReceiverSource>> create(CatChainReceiver *chain, PublicKey pub_key,
adnl::AdnlNodeIdShort adnl_id, td::uint32 id);

Expand Down
7 changes: 7 additions & 0 deletions catchain/catchain-receiver-source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class CatChainReceiverSourceImpl : public CatChainReceiverSource {
return chain_;
}

bool allow_send_block(CatChainBlockHash hash) override;

CatChainReceiverSourceImpl(CatChainReceiver *chain, PublicKey source, adnl::AdnlNodeIdShort adnl_id, td::uint32 id);

private:
Expand All @@ -130,6 +132,11 @@ class CatChainReceiverSourceImpl : public CatChainReceiverSource {

CatChainBlockHeight delivered_height_ = 0;
CatChainBlockHeight received_height_ = 0;

std::map<CatChainBlockHash, td::uint32> block_requests_count_;
// One block can be sent to one node up to 5 times

static const td::uint32 MAX_BLOCK_REQUESTS = 5;
};

} // namespace catchain
Expand Down
154 changes: 51 additions & 103 deletions catchain/catchain-receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ void CatChainReceiverImpl::receive_message_from_overlay(adnl::AdnlNodeIdShort sr
return;
}

/*auto S = get_source_by_hash(src);
CHECK(S != nullptr);
if (S->blamed()) {
VLOG(CATCHAIN_INFO) << this << ": dropping block update from blamed source " << src;
return;
}*/
if (data.size() > opts_.max_serialized_block_size) {
VLOG(CATCHAIN_WARNING) << this << ": dropping broken block from " << src << ": too big (size="
<< data.size() << ", limit=" << opts_.max_serialized_block_size << ")";
Expand All @@ -197,19 +190,6 @@ void CatChainReceiverImpl::receive_broadcast_from_overlay(const PublicKeyHash &s
callback_->on_broadcast(src, std::move(data));
}

/*void CatChainReceiverImpl::send_block(const PublicKeyHash &src, tl_object_ptr<ton_api::catchain_block> block,
td::BufferSlice payload) {
CHECK(read_db_);
CHECK(src == local_id_);
validate_block_sync(block, payload.as_slice()).ensure();
auto B = create_block(std::move(block), td::SharedSlice{payload.as_slice()});
CHECK(B != nullptr);
run_scheduler();
CHECK(B->delivered());
}*/

CatChainReceivedBlock *CatChainReceiverImpl::create_block(tl_object_ptr<ton_api::catchain_block> block,
td::SharedSlice payload) {
if (block->height_ == 0) {
Expand Down Expand Up @@ -267,21 +247,16 @@ td::Status CatChainReceiverImpl::validate_block_sync(const tl_object_ptr<ton_api

td::Status CatChainReceiverImpl::validate_block_sync(const tl_object_ptr<ton_api::catchain_block> &block,
const td::Slice &payload) const {
//LOG(INFO) << ton_api::to_string(block);
TRY_STATUS_PREFIX(CatChainReceivedBlock::pre_validate_block(this, block, payload), "failed to validate block: ");
// After pre_validate_block, block->height_ > 0
auto id = CatChainReceivedBlock::block_id(this, block, payload);
td::BufferSlice B = serialize_tl_object(id, true);

if (block->height_ > 0) {
auto id = CatChainReceivedBlock::block_id(this, block, payload);
td::BufferSlice B = serialize_tl_object(id, true);

CatChainReceiverSource *S = get_source_by_hash(PublicKeyHash{id->src_});
CHECK(S != nullptr);
Encryptor *E = S->get_encryptor_sync();
CHECK(E != nullptr);
return E->check_signature(B.as_slice(), block->signature_.as_slice());
} else {
return td::Status::OK();
}
CatChainReceiverSource *S = get_source_by_hash(PublicKeyHash{id->src_});
CHECK(S != nullptr);
Encryptor *E = S->get_encryptor_sync();
CHECK(E != nullptr);
return E->check_signature(B.as_slice(), block->signature_.as_slice());
}

void CatChainReceiverImpl::run_scheduler() {
Expand Down Expand Up @@ -515,7 +490,6 @@ CatChainReceiverImpl::CatChainReceiverImpl(std::unique_ptr<Callback> callback,
}
CHECK(local_idx_ != static_cast<td::uint32>(ids.size()));

//std::sort(short_ids.begin(), short_ids.end());
auto F = create_tl_object<ton_api::catchain_firstblock>(unique_hash, std::move(short_ids));

overlay_full_id_ = overlay::OverlayIdFull{serialize_tl_object(F, true)};
Expand All @@ -527,6 +501,8 @@ CatChainReceiverImpl::CatChainReceiverImpl(std::unique_ptr<Callback> callback,
blocks_[root_block_->get_hash()] = std::move(R);
last_sent_block_ = root_block_;

blame_processed_.resize(sources_.size(), false);

choose_neighbours();
}

Expand Down Expand Up @@ -704,7 +680,6 @@ void CatChainReceiverImpl::receive_query_from_overlay(adnl::AdnlNodeIdShort src,
auto F = fetch_tl_object<ton_api::Function>(data.clone(), true);
if (F.is_error()) {
callback_->on_custom_query(get_source_by_adnl_id(src)->get_hash(), std::move(data), std::move(promise));
//LOG(WARNING) << this << ": unknown query from " << src;
return;
}
auto f = F.move_as_ok();
Expand All @@ -717,68 +692,15 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
if (it == blocks_.end() || it->second->get_height() == 0 || !it->second->initialized()) {
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_blockNotFound>(), true));
} else {
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_blockResult>(it->second->export_tl()),
true, it->second->get_payload().as_slice()));
}
}

void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlocks query,
td::Promise<td::BufferSlice> promise) {
if (query.blocks_.size() > MAX_QUERY_BLOCKS) {
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "too many blocks"));
return;
}
td::int32 cnt = 0;
for (const CatChainBlockHash &b : query.blocks_) {
auto it = blocks_.find(b);
if (it != blocks_.end() && it->second->get_height() > 0) {
auto block = create_tl_object<ton_api::catchain_blockUpdate>(it->second->export_tl());
CHECK(!it->second->get_payload().empty());
td::BufferSlice B = serialize_tl_object(block, true, it->second->get_payload().clone());
CHECK(B.size() <= opts_.max_serialized_block_size);
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(B));
cnt++;
CatChainReceiverSource *S = get_source_by_adnl_id(src);
CHECK(S != nullptr);
if (S->allow_send_block(it->second->get_hash())) {
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_blockResult>(it->second->export_tl()),
true, it->second->get_payload().as_slice()));
} else {
promise.set_error(td::Status::Error("block was requested too many times"));
}
}
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_sent>(cnt), true));
}

void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlockHistory query,
td::Promise<td::BufferSlice> promise) {
int64_t h = query.height_;
if (h <= 0) {
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "not-positive height"));
return;
}
if (h > MAX_QUERY_HEIGHT) {
h = MAX_QUERY_HEIGHT;
}
std::set<CatChainBlockHash> s{query.stop_if_.begin(), query.stop_if_.end()};

CatChainReceivedBlock *B = get_block(query.block_);
if (B == nullptr) {
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_sent>(0), true));
return;
}
if (static_cast<CatChainBlockHeight>(h) > B->get_height()) {
h = B->get_height();
}
td::uint32 cnt = 0;
while (h-- > 0) {
if (s.find(B->get_hash()) != s.end()) {
break;
}
auto block = create_tl_object<ton_api::catchain_blockUpdate>(B->export_tl());
CHECK(!B->get_payload().empty());
td::BufferSlice BB = serialize_tl_object(block, true, B->get_payload().as_slice());
CHECK(BB.size() <= opts_.max_serialized_block_size);
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(BB));
B = B->get_prev();
cnt++;
}
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_sent>(cnt), true));
}

void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getDifference query,
Expand Down Expand Up @@ -832,19 +754,23 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
}
}
CHECK(right > 0);
CatChainReceiverSource *S0 = get_source_by_adnl_id(src);
CHECK(S0 != nullptr);
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
if (vt[i] >= 0 && my_vt[i] > vt[i]) {
CatChainReceiverSource *S = get_source(i);
td::int32 t = (my_vt[i] - vt[i] > right) ? right : (my_vt[i] - vt[i]);
while (t-- > 0) {
CatChainReceivedBlock *M = S->get_block(++vt[i]);
CHECK(M != nullptr);
auto block = create_tl_object<ton_api::catchain_blockUpdate>(M->export_tl());
CHECK(!M->get_payload().empty());
td::BufferSlice BB = serialize_tl_object(block, true, M->get_payload().as_slice());
CHECK(BB.size() <= opts_.max_serialized_block_size);
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(BB));
if (S0->allow_send_block(M->get_hash())) {
auto block = create_tl_object<ton_api::catchain_blockUpdate>(M->export_tl());
CHECK(!M->get_payload().empty());
td::BufferSlice BB = serialize_tl_object(block, true, M->get_payload().as_slice());
CHECK(BB.size() <= opts_.max_serialized_block_size);
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(BB));
}
}
}
}
Expand Down Expand Up @@ -1031,10 +957,15 @@ void CatChainReceiverImpl::written_unsafe_root_block(CatChainReceivedBlock *bloc

void CatChainReceiverImpl::alarm() {
alarm_timestamp() = td::Timestamp::never();
if (next_sync_ && next_sync_.is_in_past()) {
if (next_sync_ && next_sync_.is_in_past() && get_sources_cnt() > 1) {
next_sync_ = td::Timestamp::in(td::Random::fast(SYNC_INTERVAL_MIN, SYNC_INTERVAL_MAX));
for (unsigned i = 0; i < SYNC_ITERATIONS; i++) {
CatChainReceiverSource *S = get_source(td::Random::fast(0, static_cast<td::int32>(get_sources_cnt()) - 1));
auto idx = td::Random::fast(1, static_cast<td::int32>(get_sources_cnt()) - 1);
if (idx == static_cast<td::int32>(local_idx_)) {
idx = 0;
}
// idx is a random number in [0, get_sources_cnt-1] not equal to local_idx
CatChainReceiverSource *S = get_source(idx);
CHECK(S != nullptr);
if (!S->blamed()) {
synchronize_with(S);
Expand Down Expand Up @@ -1117,6 +1048,23 @@ static void destroy_db(const std::string& name, td::uint32 attempt) {
}
}

void CatChainReceiverImpl::on_found_fork_proof(td::uint32 source_id, td::BufferSlice data) {
if (blame_processed_[source_id]) {
add_block(std::move(data), std::vector<CatChainBlockHash>());
} else {
pending_fork_proofs_[source_id] = std::move(data);
}
}

void CatChainReceiverImpl::on_blame_processed(td::uint32 source_id) {
blame_processed_[source_id] = true;
auto it = pending_fork_proofs_.find(source_id);
if (it != pending_fork_proofs_.end()) {
add_block(std::move(it->second), std::vector<CatChainBlockHash>());
pending_fork_proofs_.erase(it);
}
}

void CatChainReceiverImpl::destroy() {
auto name = db_root_ + "/catchainreceiver" + db_suffix_ + td::base64url_encode(as_slice(incarnation_));
delay_action([name]() { destroy_db(name, 0); }, td::Timestamp::in(DESTROY_DB_DELAY));
Expand Down
2 changes: 1 addition & 1 deletion catchain/catchain-receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CatChainReceiver : public CatChainReceiverInterface {
virtual void run_block(CatChainReceivedBlock *block) = 0;
virtual void deliver_block(CatChainReceivedBlock *block) = 0;
virtual td::uint32 add_fork() = 0;
virtual void add_prepared_event(td::BufferSlice data) = 0;
virtual void on_found_fork_proof(td::uint32 source_id, td::BufferSlice data) = 0;
virtual void on_blame(td::uint32 source_id) = 0;

virtual const CatChainOptions &opts() const = 0;
Expand Down
17 changes: 5 additions & 12 deletions catchain/catchain-receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ class CatChainReceiverImpl final : public CatChainReceiver {
return PrintId{incarnation_, local_id_};
}

void add_prepared_event(td::BufferSlice data) override {
add_block(std::move(data), std::vector<CatChainBlockHash>());
}
CatChainSessionId get_incarnation() const override {
return incarnation_;
}
Expand Down Expand Up @@ -73,23 +70,17 @@ class CatChainReceiverImpl final : public CatChainReceiver {
void receive_query_from_overlay(adnl::AdnlNodeIdShort src, td::BufferSlice data,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlock query, td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlocks query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlockHistory query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getDifference query,
td::Promise<td::BufferSlice> promise);
template <class T>
void process_query(adnl::AdnlNodeIdShort src, const T &query, td::Promise<td::BufferSlice> promise) {
//LOG(WARNING) << this << ": unknown query from " << src;
callback_->on_custom_query(get_source_by_adnl_id(src)->get_hash(), serialize_tl_object(&query, true),
std::move(promise));
}
void receive_broadcast_from_overlay(const PublicKeyHash &src, td::BufferSlice data);

void receive_block(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);
void receive_block_answer(adnl::AdnlNodeIdShort src, td::BufferSlice);
//void send_block(const PublicKeyHash &src, tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);

CatChainReceivedBlock *create_block(tl_object_ptr<ton_api::catchain_block> block, td::SharedSlice payload) override;
CatChainReceivedBlock *create_block(tl_object_ptr<ton_api::catchain_block_dep> block) override;
Expand Down Expand Up @@ -117,6 +108,8 @@ class CatChainReceiverImpl final : public CatChainReceiver {
void on_blame(td::uint32 src) override {
callback_->blame(src);
}
void on_found_fork_proof(td::uint32 source_id, td::BufferSlice data) override;
void on_blame_processed(td::uint32 source_id) override;
const CatChainOptions &opts() const override {
return opts_;
}
Expand Down Expand Up @@ -204,9 +197,6 @@ class CatChainReceiverImpl final : public CatChainReceiver {

std::vector<td::uint32> neighbours_;

//std::queue<tl_object_ptr<ton_api::catchain_block_inner_Data>> events_;
//std::queue<td::BufferSlice> raw_events_;

td::actor::ActorId<keyring::Keyring> keyring_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<overlay::Overlays> overlay_manager_;
Expand All @@ -231,6 +221,9 @@ class CatChainReceiverImpl final : public CatChainReceiver {
bool started_{false};

std::list<CatChainReceivedBlock *> to_run_;

std::vector<bool> blame_processed_;
std::map<td::uint32, td::BufferSlice> pending_fork_proofs_;
};

} // namespace catchain
Expand Down
Loading

0 comments on commit 5abfe23

Please sign in to comment.