Skip to content

Commit

Permalink
Added RTT network tables protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Gold872 committed Dec 11, 2023
1 parent 490c162 commit d38fe3f
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 56 deletions.
183 changes: 128 additions & 55 deletions lib/services/nt4.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Written by Michael Jansen from Team 3015, Ranger Robotics
// Additional inspiration taken from Jonah from Team 6328, Mechanical Advantage

import 'dart:async';
import 'dart:convert';
Expand All @@ -13,8 +14,11 @@ import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:elastic_dashboard/services/log.dart';

class NT4Client {
// TODO: Change to 1000 for 2024 NT4.1 updates
static int pingTimeoutMs = 5000;
static const int _pingIntervalMsV40 = 1000;
static const int _pingIntervalMsV41 = 200;

static const int _pingTimeoutMsV40 = 5000;
static const int _pingTimeoutMsV41 = 1000;

String serverBaseAddress;
final VoidCallback? onConnect;
Expand All @@ -29,18 +33,24 @@ class NT4Client {
final Map<String, NT4Topic> _clientPublishedTopics = {};
final Map<int, NT4Topic> announcedTopics = {};
int _clientId = 0;
bool _serverConnectionActive = false;
int _serverTimeOffsetUS = 0;
int _latencyMs = 0;

WebSocketChannel? _mainWebsocket;
// TODO: Uncomment for 2024 NT4.1 updates
// WebSocketChannel? _rttWebsocket;
WebSocketChannel? _rttWebsocket;

Timer? _pingTimer;
Timer? _pongTimer;

int _pingInterval = _pingIntervalMsV40;
int _timeoutInterval = _pingTimeoutMsV40;

Timer? pingTimer;
Timer? pongTimer;
bool _serverConnectionActive = false;
bool _rttConnectionActive = false;

bool _useRTT = false;

int lastPongTime = 0;
int _lastPongTime = 0;

Map<int, NT4Subscription> get subscriptions => _subscriptions;
Set<NT4Subscription> get subscribedTopics => _subscribedTopics;
Expand Down Expand Up @@ -268,8 +278,11 @@ class NT4Client {
var rawData =
serialize([timeTopic.pubUID, 0, timeTopic.getTypeId(), timeToSend]);

// TODO: Change this to _rttWebsocket for 2024 NT4.1 updates
_mainWebsocket?.sink.add(rawData);
if (_useRTT) {
_rttWebsocket?.sink.add(rawData);
} else {
_mainWebsocket?.sink.add(rawData);
}
}
}

Expand All @@ -280,7 +293,7 @@ class NT4Client {
int serverTimeAtRx = (serverTimestamp - rtt / 2.0).round();
_serverTimeOffsetUS = serverTimeAtRx - rxTime;

lastPongTime = rxTime;
_lastPongTime = rxTime;

_latencyMs = (rtt / 2) ~/ 1000;
}
Expand Down Expand Up @@ -327,32 +340,41 @@ class NT4Client {

String mainServerAddr = 'ws://$serverBaseAddress:5810/nt/elastic';

// TODO: Uncomment for 2024 NT4.1 updates
// String rttServerAddr = 'ws://$serverBaseAddress:5810/nt/elastic';

_mainWebsocket = WebSocketChannel.connect(Uri.parse(mainServerAddr),
protocols: ['networktables.first.wpi.edu']);

// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket = WebSocketChannel.connect(Uri.parse(rttServerAddr),
// protocols: ['rtt.networktables.first.wpi.edu']);
_mainWebsocket =
WebSocketChannel.connect(Uri.parse(mainServerAddr), protocols: [
'networktables.first.wpi.edu',
'v4.1.networktables.first.wpi.edu',
]);

try {
await _mainWebsocket!.ready;
// TODO: Uncomment for 2024 NT4.1 updates
// await _rttWebsocket!.ready;
} catch (e) {
// Failed to connect... try again
Future.delayed(const Duration(seconds: 1), wsConnect);
return;
}

pingTimer ??= Timer.periodic(const Duration(milliseconds: 200), (timer) {
_rttSendTimestamp();
});
// TODO: Uncomment for 2024 NT4.1 updates
// pongTimer ??=
// Timer.periodic(const Duration(milliseconds: 1000), _checkPingStatus);
if (!mainServerAddr.contains(serverBaseAddress)) {
return;
}

if (_pingTimer != null) {
_pingTimer!.cancel();
}
if (_pongTimer != null) {
_pongTimer!.cancel();
}

if (_mainWebsocket!.protocol == 'v4.1.networktables.first.wpi.edu') {
_useRTT = true;
_pingInterval = _pingIntervalMsV41;
_timeoutInterval = _pingTimeoutMsV41;
_rttConnect();
} else {
_useRTT = false;
_pingInterval = _pingIntervalMsV40;
_timeoutInterval = _pingTimeoutMsV40;
}

_mainWebsocket!.stream.listen(
(data) {
Expand All @@ -377,25 +399,6 @@ class NT4Client {
},
);

// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket!.stream.listen(
// (data) {
// if (data is! List<int>) {
// return;
// }

// var msg = Unpacker.fromList(data).unpackList();

// int topicID = msg[0] as int;
// int timestampUS = msg[1] as int;
// var value = msg[3];

// if (topicID == -1) {
// _rttHandleRecieveTimestamp(timestampUS, value as int);
// }
// },
// );

NT4Topic timeTopic = NT4Topic(
name: "Time",
type: NT4TypeStr.kInt,
Expand All @@ -404,8 +407,15 @@ class NT4Client {
properties: {});
announcedTopics[timeTopic.id] = timeTopic;

_lastPongTime = 0;
_rttSendTimestamp();

_pingTimer = Timer.periodic(Duration(milliseconds: _pingInterval), (timer) {
_rttSendTimestamp();
});
_pongTimer =
Timer.periodic(Duration(milliseconds: _pingInterval), _checkPingStatus);

for (NT4Topic topic in _clientPublishedTopics.values) {
_wsPublish(topic);
_wsSetProperties(topic);
Expand All @@ -416,17 +426,80 @@ class NT4Client {
}
}

void _rttConnect() async {
if (!_useRTT || _rttConnectionActive) {
return;
}

String rttServerAddr = 'ws://$serverBaseAddress:5810/nt/elastic';
_rttWebsocket = WebSocketChannel.connect(Uri.parse(rttServerAddr),
protocols: ['rtt.networktables.first.wpi.edu']);

try {
await _rttWebsocket!.ready;
} catch (e) {
Future.delayed(const Duration(seconds: 1), _rttConnect);
return;
}

if (!rttServerAddr.contains(serverBaseAddress)) {
return;
}

_rttWebsocket!.stream.listen(
(data) {
if (!_rttConnectionActive) {
logger.info('RTT protocol connected on $serverBaseAddress');
_rttConnectionActive = true;
}

if (data is! List<int>) {
return;
}

var msg = Unpacker.fromList(data).unpackList();

int topicID = msg[0] as int;
int timestampUS = msg[1] as int;
var value = msg[3];

if (value is! int) {
return;
}

if (topicID == -1) {
_rttHandleRecieveTimestamp(timestampUS, value);
}
},
onDone: _rttOnClose,
);
}

void _rttOnClose() {
_rttWebsocket?.sink.close();
_rttWebsocket = null;

_lastPongTime = 0;
_rttConnectionActive = false;

logger.debug('[RTT] Connection closed');
}

void _wsOnClose() {
_mainWebsocket?.sink.close();
// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket?.sink.close();
_rttWebsocket?.sink.close();

_pingTimer?.cancel();
_pongTimer?.cancel();

_mainWebsocket = null;
// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket = null;
_rttWebsocket = null;

_serverConnectionActive = false;
_rttConnectionActive = false;
_useRTT = false;

lastPongTime = 0;
_lastPongTime = 0;
_latencyMs = 0;

onDisconnect?.call();
Expand Down Expand Up @@ -519,7 +592,7 @@ class NT4Client {
sub.updateValue(value);
}
}
} else if (topicID == -1) {
} else if (topicID == -1 && !_useRTT) {
_rttHandleRecieveTimestamp(timestampUS, value as int);
} else {
logger.debug('[NT4] ignoring binary data, invalid topic ID');
Expand All @@ -532,13 +605,13 @@ class NT4Client {
}

void _checkPingStatus(Timer timer) {
if (!_serverConnectionActive || lastPongTime == 0) {
if (!_serverConnectionActive || _lastPongTime == 0) {
return;
}

int currentTime = DateTime.now().millisecondsSinceEpoch;

if (currentTime - lastPongTime > pingTimeoutMs) {
if (currentTime - _lastPongTime > _timeoutInterval) {
_wsOnClose();
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/services/nt4_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class NT4Connection {
serverBaseAddress: ipAddress,
onConnect: () {
logger.info(
'Network Tables connected on IP address ${Settings.ipAddress}');
'Network Tables connected on IP address ${_ntClient.serverBaseAddress}');
_ntConnected = true;

for (VoidCallback callback in onConnectedListeners) {
Expand Down

0 comments on commit d38fe3f

Please sign in to comment.