Skip to content

Commit

Permalink
initial gateway v5 support
Browse files Browse the repository at this point in the history
  • Loading branch information
alula committed Apr 27, 2022
1 parent d61af5d commit 07e2855
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 19 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# koe

Tiny, minimal dependency and embeddable library implementing Discord voice protocol built on [Netty](https://netty.io), aiming for high performance and reduced GC usage.
Tiny, minimal dependency and embeddable library implementing Discord media server protocols, built on [Netty](https://netty.io), aiming for high performance and reduced GC usage.

[Get it on JitPack](https://jitpack.io/#moe.kyokobot.koe/core)

Expand All @@ -27,8 +27,9 @@ dependencies {

#### Features

- Support for voice gateway v4.
- Supports voice gateway v4 and v5.
- Easily extendable for stuff such as support for codecs other than Opus or video sending, if Discord ever decides to support it on bots.
- Experimental video support.
- Basic RTCP support for measuring packet loss and other stuff.

#### Non-goals / won't do
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ subprojects {
apply plugin: 'java'

group = 'moe.kyokobot'
version = '1.0.0-SNAPSHOT'
version = '1.1.0-SNAPSHOT'

sourceCompatibility = 11
targetCompatibility = 11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
Expand Down Expand Up @@ -80,7 +75,8 @@ public CompletableFuture<Void> start() {

var chFuture = bootstrap.connect(websocketURI.getHost(), websocketURI.getPort() == -1 ? 443 : websocketURI.getPort());
chFuture.addListener(new NettyFutureWrapper<>(future));
future.thenAccept(v -> this.channel = chFuture.channel());
future.thenAccept(v -> this.channel = chFuture.channel())
.thenAccept(v -> this.identify());

return connectFuture;
}
Expand All @@ -107,6 +103,8 @@ public boolean isOpen() {
return open;
}

protected abstract void identify();

protected abstract void handlePayload(JsonObject object);

protected void onClose(int code, @Nullable String reason, boolean remote) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import moe.kyokobot.koe.internal.VoiceConnectionImpl;

public enum GatewayVersion {
V3(null),
V4(VoiceGatewayV4Connection::new);
V4(VoiceGatewayV4Connection::new),
V5(VoiceGatewayV5Connection::new);

private final VoiceGatewayConnectionFactory factory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ public VoiceGatewayV4Connection(VoiceConnectionImpl connection, VoiceServerInfo
super(connection, voiceServerInfo, 4);
}

@Override
protected void identify() {
logger.debug("Identifying...");
sendInternalPayload(Op.IDENTIFY, new JsonObject()
.addAsString("server_id", connection.getGuildId())
.addAsString("user_id", connection.getClient().getClientId())
.add("session_id", voiceServerInfo.getSessionId())
.add("token", voiceServerInfo.getToken()));
}

@Override
protected void handlePayload(JsonObject object) {
var op = object.getInt("op");
Expand All @@ -52,13 +62,6 @@ protected void handlePayload(JsonObject object) {

logger.debug("Received HELLO, heartbeat interval: {}", interval);
setupHeartbeats(interval);

logger.debug("Identifying...");
sendInternalPayload(Op.IDENTIFY, new JsonObject()
.addAsString("server_id", connection.getGuildId())
.addAsString("user_id", connection.getClient().getClientId())
.add("session_id", voiceServerInfo.getSessionId())
.add("token", voiceServerInfo.getToken()));
break;
}
case Op.READY: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package moe.kyokobot.koe.gateway;

import moe.kyokobot.koe.VoiceServerInfo;
import moe.kyokobot.koe.crypto.EncryptionMode;
import moe.kyokobot.koe.internal.VoiceConnectionImpl;
import moe.kyokobot.koe.internal.handler.DiscordUDPConnection;
import moe.kyokobot.koe.internal.json.JsonArray;
import moe.kyokobot.koe.internal.json.JsonObject;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class VoiceGatewayV5Connection extends AbstractVoiceGatewayConnection {
private static final Logger logger = LoggerFactory.getLogger(VoiceGatewayV5Connection.class);
private static final JsonArray SUPPORTED_CODECS;

static {
SUPPORTED_CODECS = new JsonArray();
SUPPORTED_CODECS.add(new JsonObject()
.add("name", "opus")
.add("type", "audio")
.add("priority", 1000)
.add("payload_type", 120));
}

private int ssrc;
private SocketAddress address;
private List<String> encryptionModes;
private UUID rtcConnectionId;
private ScheduledFuture heartbeatFuture;

public VoiceGatewayV5Connection(VoiceConnectionImpl connection, VoiceServerInfo voiceServerInfo) {
super(connection, voiceServerInfo, 5);
}

@Override
protected void identify() {
logger.debug("Identifying...");
sendInternalPayload(Op.IDENTIFY, new JsonObject()
.addAsString("server_id", connection.getGuildId())
.addAsString("user_id", connection.getClient().getClientId())
.add("session_id", voiceServerInfo.getSessionId())
.add("token", voiceServerInfo.getToken())
.add("video", true));
}

@Override
protected void handlePayload(JsonObject object) {
var op = object.getInt("op");

switch (op) {
case Op.HELLO: {
var data = object.getObject("d");
int interval = data.getInt("heartbeat_interval");

logger.debug("Received HELLO, heartbeat interval: {}", interval);
setupHeartbeats(interval);
break;
}
case Op.READY: {
var data = object.getObject("d");
var port = data.getInt("port");
var ip = data.getString("ip");
ssrc = data.getInt("ssrc");
encryptionModes = data.getArray("modes")
.stream()
.map(o -> (String) o)
.collect(Collectors.toList());
address = new InetSocketAddress(ip, port);

connection.getDispatcher().gatewayReady((InetSocketAddress) address, ssrc);
logger.debug("Voice READY, ssrc: {}", ssrc);
selectProtocol("udp");
break;
}
case Op.SESSION_DESCRIPTION: {
var data = object.getObject("d");
logger.debug("Got session description: {}", data);

if (connection.getConnectionHandler() == null) {
logger.warn("Received session description before protocol selection? (connection id = {})",
this.rtcConnectionId);
break;
}

connection.getDispatcher().sessionDescription(data);
connection.getConnectionHandler().handleSessionDescription(data);
break;
}
case Op.CLIENT_CONNECT: {
var data = object.getObject("d");
var user = data.getString("user_id");
var audioSsrc = data.getInt("audio_ssrc", 0);
var videoSsrc = data.getInt("video_ssrc", 0);
connection.getDispatcher().userConnected(user, audioSsrc, videoSsrc);
break;
}
case Op.CLIENT_DISCONNECT: {
var data = object.getObject("d");
var user = data.getString("user_id");
connection.getDispatcher().userDisconnected(user);
break;
}
default:
break;
}
}

@Override
protected void onClose(int code, @Nullable String reason, boolean remote) {
super.onClose(code, reason, remote);
if (this.heartbeatFuture != null) {
heartbeatFuture.cancel(true);
}
}

@Override
public void updateSpeaking(int mask) {
sendInternalPayload(Op.SPEAKING, new JsonObject()
.add("speaking", mask)
.add("delay", 0)
.add("ssrc", ssrc));
}

private void setupHeartbeats(int interval) {
if (eventExecutor != null) {
heartbeatFuture = eventExecutor.scheduleAtFixedRate(this::heartbeat, interval, interval,
TimeUnit.MILLISECONDS);
}
}

private void heartbeat() {
sendInternalPayload(Op.HEARTBEAT, System.currentTimeMillis());
}

private void selectProtocol(String protocol) {
var mode = EncryptionMode.select(encryptionModes);
logger.debug("Selected preferred encryption mode: {}", mode);

rtcConnectionId = UUID.randomUUID();
logger.debug("Generated new connection id: {}", rtcConnectionId);

// known values: ["udp", "webrtc"]
if (protocol.equals("udp")) {
var conn = new DiscordUDPConnection(connection, address, ssrc);
conn.connect().thenAccept(ourAddress -> {
logger.debug("Connected, our external address is: {}", ourAddress);
connection.getDispatcher().externalIPDiscovered(ourAddress);

var udpInfo = new JsonObject()
.add("address", ourAddress.getAddress().getHostAddress())
.add("port", ourAddress.getPort())
.add("mode", mode);

sendInternalPayload(Op.SELECT_PROTOCOL, new JsonObject()
.add("protocol", "udp")
.add("codecs", SUPPORTED_CODECS)
.add("rtc_connection_id", rtcConnectionId.toString())
.add("data", udpInfo)
.combine(udpInfo));

sendInternalPayload(Op.CLIENT_CONNECT, new JsonObject()
.add("audio_ssrc", ssrc)
.add("video_ssrc", 0)
.add("rtx_ssrc", 0));
});

connection.setConnectionHandler(conn);
logger.debug("Waiting for session description...");
} else if (protocol.equals("webrtc")) {
// do ICE and then generate SDP with info like above?
throw new IllegalArgumentException("WebRTC protocol is not supported yet!");
}
}
}

0 comments on commit 07e2855

Please sign in to comment.