Skip to content

Commit

Permalink
Enhance on keep alive
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 718908756
  • Loading branch information
ggli-google authored and copybara-github committed Jan 24, 2025
1 parent 639581f commit 03b74b5
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 216 deletions.
2 changes: 2 additions & 0 deletions connections/implementation/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ cc_library(
deps = [
":internal",
"//connections:core_types",
"//connections/implementation/analytics",
"//connections/implementation/flags:connections_flags",
"//connections/v3:v3_types",
"//internal/flags:nearby_flags",
Expand All @@ -188,6 +189,7 @@ cc_library(
"@com_google_absl//absl/functional:any_invocable",
"@com_google_absl//absl/functional:bind_front",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_for_library_testonly",
],
)
Expand Down
5 changes: 5 additions & 0 deletions connections/implementation/base_endpoint_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ void BaseEndpointChannel::CloseIo() {
}
}

uint32_t BaseEndpointChannel::GetNextKeepAliveSeqNo() const {
MutexLock lock(&keep_alive_mutex_);
return next_keep_alive_seq_no_++;
}

void BaseEndpointChannel::SetAnalyticsRecorder(
analytics::AnalyticsRecorder* analytics_recorder,
const std::string& endpoint_id) {
Expand Down
6 changes: 6 additions & 0 deletions connections/implementation/base_endpoint_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef CORE_INTERNAL_BASE_ENDPOINT_CHANNEL_H_
#define CORE_INTERNAL_BASE_ENDPOINT_CHANNEL_H_

#include <cstdint>
#include <memory>
#include <string>

Expand Down Expand Up @@ -84,6 +85,7 @@ class BaseEndpointChannel : public EndpointChannel {
ABSL_LOCKS_EXCLUDED(last_read_mutex_) override;
absl::Time GetLastWriteTimestamp() const
ABSL_LOCKS_EXCLUDED(last_write_mutex_) override;
uint32_t GetNextKeepAliveSeqNo() const override;
void SetAnalyticsRecorder(analytics::AnalyticsRecorder* analytics_recorder,
const std::string& endpoint_id) override;

Expand Down Expand Up @@ -117,6 +119,10 @@ class BaseEndpointChannel : public EndpointChannel {
absl::Time last_write_timestamp_ ABSL_GUARDED_BY(last_write_mutex_) =
absl::InfinitePast();

mutable Mutex keep_alive_mutex_;
mutable uint32_t next_keep_alive_seq_no_ ABSL_GUARDED_BY(keep_alive_mutex_) =
0;

const std::string service_id_;
const std::string channel_name_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "connections/implementation/connections_authentication_transport.h"

#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -75,6 +76,7 @@ class MockEndpointChannel : public EndpointChannel {
MOCK_METHOD(void, Resume, (), (override));
MOCK_METHOD(absl::Time, GetLastReadTimestamp, (), (const, override));
MOCK_METHOD(absl::Time, GetLastWriteTimestamp, (), (const, override));
MOCK_METHOD(uint32_t, GetNextKeepAliveSeqNo, (), (const, override));
MOCK_METHOD(void, SetAnalyticsRecorder,
(analytics::AnalyticsRecorder*, const std::string&), (override));

Expand Down
5 changes: 5 additions & 0 deletions connections/implementation/encryption_runner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "connections/implementation/encryption_runner.h"

#include <cstddef>
#include <cstdint>
#include <string>

#include "gtest/gtest.h"
Expand Down Expand Up @@ -103,6 +104,9 @@ class FakeEndpointChannel : public EndpointChannel {
void Resume() override {}
absl::Time GetLastReadTimestamp() const override { return read_timestamp_; }
absl::Time GetLastWriteTimestamp() const override { return write_timestamp_; }
uint32_t GetNextKeepAliveSeqNo() const override {
return next_keep_alive_seq_no_++;
}
void SetAnalyticsRecorder(analytics::AnalyticsRecorder* analytics_recorder,
const std::string& endpoint_id) override {}

Expand All @@ -111,6 +115,7 @@ class FakeEndpointChannel : public EndpointChannel {
OutputStream* out_ = nullptr;
absl::Time read_timestamp_ = absl::InfinitePast();
absl::Time write_timestamp_ = absl::InfinitePast();
mutable uint32_t next_keep_alive_seq_no_ = 0;
};

struct User {
Expand Down
8 changes: 7 additions & 1 deletion connections/implementation/endpoint_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
#ifndef CORE_INTERNAL_ENDPOINT_CHANNEL_H_
#define CORE_INTERNAL_ENDPOINT_CHANNEL_H_

#include <cstdint>
#include <memory>
#include <string>

#include "securegcm/d2d_connection_context_v1.h"
#include "absl/time/time.h"
#include "connections/implementation/analytics/analytics_recorder.h"
#include "connections/implementation/analytics/packet_meta_data.h"
#include "internal/platform/byte_array.h"
Expand Down Expand Up @@ -125,13 +128,16 @@ class EndpointChannel {
// writes have occurred.
virtual absl::Time GetLastWriteTimestamp() const = 0;

// Returns the next sequence number to be used for a KeepAlive frame.
virtual uint32_t GetNextKeepAliveSeqNo() const = 0;

// Sets the AnalyticsRecorder instance for analytics.
virtual void SetAnalyticsRecorder(
analytics::AnalyticsRecorder* analytics_recorder,
const std::string& endpoint_id) = 0;

// Enables the multiplex socket on the EndpointChannel.
virtual bool EnableMultiplexSocket() {return false;}
virtual bool EnableMultiplexSocket() { return false; }
};

inline bool operator==(const EndpointChannel& lhs, const EndpointChannel& rhs) {
Expand Down
Loading

0 comments on commit 03b74b5

Please sign in to comment.