Skip to content

Commit

Permalink
Add a deduplication utility for raw buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
Julian Kent committed Feb 3, 2021
1 parent 5c19097 commit 8fb83ee
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ mavlink_routerd_SOURCES = \
src/common/conf_file.cpp \
src/common/conf_file.h \
src/common/dbg.h \
src/mavlink-router/dedup.cpp \
src/common/mavlink.h \
src/mavlink-router/endpoint.cpp \
src/mavlink-router/endpoint.h \
Expand Down
80 changes: 80 additions & 0 deletions src/mavlink-router/dedup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* This file is part of the MAVLink Router project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


#include "dedup.h"

#include <chrono>
#include <string>
#include <queue>
#include <unordered_set>

class DedupImpl {
using hash_t = uint64_t;
using time_t = uint64_t;

public:

bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms)
{
bool already_in_buffer = false;
using namespace std::chrono;
time_t timestamp = duration_cast<milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// pop data from front queue, delete corresponding data from multiset
while (_time_hash_queue.size() > 0 && _time_hash_queue.front().first > timestamp + dedup_period_ms) {
hash_t hash_to_delete = _time_hash_queue.front().second;
_multiset.erase(_multiset.find(hash_to_delete)); // NOTE: don't call erase on key, it will delete all
_time_hash_queue.pop();
}

// hash buffer
// TODO: with C++17 use a string_view instead, or use a custom hash function
_hash_buffer.assign((const char*)buffer, (uint64_t)size);
hash_t hash = std::hash<std::string>{}(_hash_buffer);

if (_multiset.find(hash) != _multiset.end()) {
already_in_buffer = true;
}

// add hash and timestamp to back of queue, and add another copy of hash to multiset
_multiset.insert(hash);
_time_hash_queue.emplace(timestamp, hash);

return !already_in_buffer;
}

private:
std::queue<std::pair<time_t, hash_t>> _time_hash_queue;
std::unordered_multiset<hash_t> _multiset;
std::string _hash_buffer;
};

Dedup::Dedup(uint32_t dedup_period_ms) : _dedup_period_ms(dedup_period_ms), _impl(new DedupImpl())
{
}

Dedup::~Dedup()
{
}


Dedup::PacketStatus Dedup::add_check_packet(const uint8_t* buffer, uint32_t size)
{
if (_impl->add_check_packet(buffer, size, _dedup_period_ms)) {
return PacketStatus::NEW_PACKET_OR_TIMED_OUT;
}
return PacketStatus::ALREADY_EXISTS_IN_BUFFER;
}
40 changes: 40 additions & 0 deletions src/mavlink-router/dedup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* This file is part of the MAVLink Router project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



#pragma once

#include <memory>

class DedupImpl;
class Dedup {
public:

enum class PacketStatus {
NEW_PACKET_OR_TIMED_OUT,
ALREADY_EXISTS_IN_BUFFER
};

Dedup(uint32_t dedup_period_ms);
~Dedup();

PacketStatus add_check_packet(const uint8_t* buffer, uint32_t size);
private:

uint32_t _dedup_period_ms;
std::unique_ptr<DedupImpl> _impl;
};

0 comments on commit 8fb83ee

Please sign in to comment.