From 0b5eb20243a8850b5a2e670e10869a894dba5d50 Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Tue, 2 Feb 2021 16:22:19 +0100 Subject: [PATCH] Add a deduplication utility for raw buffers --- Makefile.am | 1 + src/mavlink-router/dedup.cpp | 77 ++++++++++++++++++++++++++++++++++++ src/mavlink-router/dedup.h | 40 +++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 src/mavlink-router/dedup.cpp create mode 100644 src/mavlink-router/dedup.h diff --git a/Makefile.am b/Makefile.am index 47fdd92b..9a478945 100644 --- a/Makefile.am +++ b/Makefile.am @@ -139,6 +139,7 @@ mavlink_routerd_SOURCES = \ src/common/conf_file.cpp \ src/common/conf_file.h \ src/common/dbg.h \ + src/mavlink-router/dedup.cpp \ src/common/mavlink.h \ src/mavlink-router/endpoint.cpp \ src/mavlink-router/endpoint.h \ diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp new file mode 100644 index 00000000..d60323c5 --- /dev/null +++ b/src/mavlink-router/dedup.cpp @@ -0,0 +1,77 @@ +/* + * This file is part of the MAVLink Router project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "dedup.h" + +#include +#include +#include +#include + +class DedupImpl { +public: + + bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms) + { + bool already_in_buffer = false; + using namespace std::chrono; + uint64_t timestamp = duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + // pop data from front queue, delete corresponding data from multiset + while (_time_hash_queue.size() > 0 && _time_hash_queue.front().first > timestamp + dedup_period_ms) { + uint64_t hash_to_delete = _time_hash_queue.front().second; + _multiset.erase(_multiset.find(hash_to_delete)); // NOTE: don't call erase on key, it will delete all + _time_hash_queue.pop(); + } + + // hash buffer + // TODO: with C++17 use a string_view instead, or use a custom hash function + _hash_buffer.assign((const char*)buffer, (uint64_t)size); + uint64_t hash = std::hash{}(_hash_buffer); + + if (_multiset.find(hash) != _multiset.end()) { + already_in_buffer = true; + } + + // add hash and timestamp to back of queue, and add another copy of hash to multiset + _multiset.insert(hash); + _time_hash_queue.emplace(timestamp, hash); + + return already_in_buffer; + } + + + std::queue> _time_hash_queue; + std::unordered_multiset _multiset; + std::string _hash_buffer; +}; + +Dedup::Dedup(uint32_t dedup_period_ms) : _dedup_period_ms(dedup_period_ms), _impl(new DedupImpl()) +{ +} + +Dedup::~Dedup() +{ +} + + +Dedup::PacketStatus Dedup::add_check_packet(const uint8_t* buffer, uint32_t size) +{ + if (_impl->add_check_packet(buffer, size, _dedup_period_ms)) { + return PacketStatus::NEW_PACKET_OR_TIMED_OUT; + } + return PacketStatus::ALREADY_EXISTS_IN_BUFFER; +} diff --git a/src/mavlink-router/dedup.h b/src/mavlink-router/dedup.h new file mode 100644 index 00000000..5dd5c8c5 --- /dev/null +++ b/src/mavlink-router/dedup.h @@ -0,0 +1,40 @@ +/* + * This file is part of the MAVLink Router project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +#pragma once + +#include + +class DedupImpl; +class Dedup { +public: + + enum class PacketStatus { + NEW_PACKET_OR_TIMED_OUT, + ALREADY_EXISTS_IN_BUFFER + }; + + Dedup(uint32_t dedup_period_ms); + ~Dedup(); + + PacketStatus add_check_packet(const uint8_t* buffer, uint32_t size); +private: + + uint32_t _dedup_period_ms; + std::unique_ptr _impl; +};