Skip to content

Commit

Permalink
udp-queue extension
Browse files Browse the repository at this point in the history
  • Loading branch information
alula committed Apr 9, 2020
1 parent fc65cb6 commit a2b8d6a
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 5 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ Tiny, minimal dependency and embeddable library implementing Discord voice proto
- Encoding - Koe only implements voice server communication, not voice handling itself, so it only accepts Opus frames, you have set up an encoder yourself, use [lavaplayer](https://github.com/sedmelluq/lavaplayer), libav/ffmpeg or anything else.
- Voice receiving support - [it's not supported by Discord anyway](https://github.com/discordapp/discord-api-docs/issues/808#issuecomment-458863743), although someone could implement it by registering hooks.

#### Extensions

- [UDP-Queue](https://github.com/KyokoBot/koe/tree/master/ext-udpqueue)

#### Credits

[@TheAkio](https://github.com/TheAkio) for name idea.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.SecureRandom;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -103,23 +102,34 @@ public void handleSessionDescription(JsonObject object) {

@Override
public void sendFrame(byte payloadType, int timestamp, ByteBuf data, int len) {
var buf = createPacket(payloadType, timestamp, data, len);
if (buf != null) {
//logger.debug("Sent frame PT = {}, TS = {}", payloadType, timestamp);
channel.writeAndFlush(buf);
}
}

public ByteBuf createPacket(byte payloadType, int timestamp, ByteBuf data, int len) {
if (secretKey == null) {
return;
return null;
}

var buf = allocator.buffer();
buf.clear();
RTPHeaderWriter.writeV2(buf, payloadType, nextSeq(), timestamp, ssrc);
if (encryptionMode.box(data, len, buf, secretKey)) {
//logger.debug("Sent frame PT = {}, TS = {}", payloadType, timestamp);
channel.writeAndFlush(buf);
return buf;
} else {
logger.debug("Encryption failed!");
buf.release();
// handle failed encryption?
}

return null;
}

private char nextSeq() {
public char nextSeq() {
if ((seq + 1) > 0xffff) {
seq = 0;
} else {
Expand All @@ -129,6 +139,22 @@ private char nextSeq() {
return seq;
}

public byte[] getSecretKey() {
return secretKey;
}

public int getSsrc() {
return ssrc;
}

public EncryptionMode getEncryptionMode() {
return encryptionMode;
}

public SocketAddress getServerAddress() {
return serverAddress;
}

private static class Initializer extends ChannelInitializer<DatagramChannel> {
private final DiscordUDPConnection connection;
private final CompletableFuture<InetSocketAddress> future;
Expand Down
17 changes: 17 additions & 0 deletions ext-udpqueue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Koe - Native UDP-Queue extension

An extension that provides an implementation of [JDA-NAS](https://github.com/sedmelluq/jda-nas) in Koe,
which moves packet sending/scheduling logic outside JVM, therefore audio packets can be sent during GC pauses (as long as there's enough audio data in the queue).

Note that custom codec support is limited, adds additional latency and proper usage of Netty already
limits GC pressure because of much smaller number of allocations.

### Usage

Just add it to KoeOptions :^)

```java
var Koe = Koe.koe(KoeOptions.builder()
.setFramePollerFactory(new UdpQueueFramePollerFactory())
.create());
```
5 changes: 5 additions & 0 deletions ext-udpqueue/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
dependencies {
compileOnly project(':core')
compileOnly group: 'org.jetbrains', name: 'annotations', version: '13.0'
implementation 'com.sedmelluq:udp-queue:1.1.0-linux64'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package moe.kyokobot.koe.codec.udpqueue;

import com.sedmelluq.discord.lavaplayer.udpqueue.natives.UdpQueueManager;
import com.sedmelluq.lava.common.tools.DaemonThreadFactory;
import com.sedmelluq.lava.common.tools.ExecutorTools;
import moe.kyokobot.koe.VoiceConnection;
import moe.kyokobot.koe.codec.Codec;
import moe.kyokobot.koe.codec.FramePoller;
import moe.kyokobot.koe.codec.FramePollerFactory;
import moe.kyokobot.koe.codec.OpusCodec;
import org.jetbrains.annotations.Nullable;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class UdpQueueFramePollerFactory implements FramePollerFactory {
private static final int DEFAULT_BUFFER_DURATION = 400;
private static final int PACKET_INTERVAL = 20;
private static final int MAXIMUM_PACKET_SIZE = 4096;

private final int bufferDuration;
private final AtomicLong identifierCounter = new AtomicLong();
private final Object lock = new Object();
private final KeySetView<UdpQueueOpusFramePoller, Boolean> pollers = ConcurrentHashMap.newKeySet();
private UdpQueueManager queueManager;
private ScheduledExecutorService scheduler;

public UdpQueueFramePollerFactory() {
this(DEFAULT_BUFFER_DURATION);
}

public UdpQueueFramePollerFactory(int bufferDuration) {
this.bufferDuration = bufferDuration;
}

private void initialiseQueueManager() {
scheduler = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("native-udp"));
queueManager = new UdpQueueManager(bufferDuration / PACKET_INTERVAL,
TimeUnit.MILLISECONDS.toNanos(PACKET_INTERVAL), MAXIMUM_PACKET_SIZE);

scheduler.scheduleWithFixedDelay(this::populateQueues, 0, 40, TimeUnit.MILLISECONDS);

Thread thread = new Thread(process(queueManager));
thread.setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2);
thread.setDaemon(true);
thread.start();
}

private ScheduledExecutorService shutdownQueueManager() {
queueManager.close();
queueManager = null;

ScheduledExecutorService currentScheduler = scheduler;
scheduler = null;
return currentScheduler;
}

void addInstance(UdpQueueOpusFramePoller poller) {
synchronized (lock) {
pollers.add(poller);

if (queueManager == null) {
initialiseQueueManager();
}
}
}

void removeInstance(UdpQueueOpusFramePoller poller) {
ScheduledExecutorService schedulerToShutDown = null;

synchronized (lock) {
if (pollers.remove(poller) && pollers.isEmpty() && queueManager != null) {
schedulerToShutDown = shutdownQueueManager();
}
}

if (schedulerToShutDown != null) {
ExecutorTools.shutdownExecutor(schedulerToShutDown, "native udp queue populator");
}
}

private void populateQueues() {
UdpQueueManager manager = queueManager; /* avoid getfield opcode */

if (manager != null) {
for (var system : pollers) {
system.populateQueue(manager);
}
}
}

private static Runnable process(UdpQueueManager unbake) {
return unbake::process;
}

@Override
@Nullable
public FramePoller createFramePoller(Codec codec, VoiceConnection connection) {
if (OpusCodec.INSTANCE.equals(codec)) {
return new UdpQueueOpusFramePoller(identifierCounter.incrementAndGet(), this, connection);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package moe.kyokobot.koe.codec.udpqueue;

import com.sedmelluq.discord.lavaplayer.udpqueue.natives.UdpQueueManager;
import moe.kyokobot.koe.VoiceConnection;
import moe.kyokobot.koe.codec.AbstractFramePoller;
import moe.kyokobot.koe.codec.OpusCodec;
import moe.kyokobot.koe.internal.handler.DiscordUDPConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

public class UdpQueueOpusFramePoller extends AbstractFramePoller {
private static final Logger logger = LoggerFactory.getLogger(UdpQueueOpusFramePoller.class);

private final long queueKey;
private final UdpQueueFramePollerFactory factory;
private AtomicInteger timestamp;

public UdpQueueOpusFramePoller(long queueKey, UdpQueueFramePollerFactory factory, VoiceConnection connection) {
super(connection);
this.queueKey = queueKey;
this.factory = factory;
this.timestamp = new AtomicInteger();
}

@Override
public void start() {
if (this.polling) {
throw new IllegalStateException("Polling already started!");
}

factory.addInstance(this);
this.polling = true;
}

@Override
public void stop() {
if (this.polling) {
factory.removeInstance(this);
}
this.polling = false;
}

void populateQueue(UdpQueueManager queueManager) {
int remaining = queueManager.getRemainingCapacity(queueKey);
//boolean emptyQueue = queueManager.getCapacity() - remaining > 0;

var handler = (DiscordUDPConnection) connection.getConnectionHandler();
var sender = connection.getSender();

for (int i = 0; i < remaining; i++) {
if (sender != null && handler != null && sender.canSendFrame()) {
var buf = allocator.buffer();
int start = buf.writerIndex();
sender.retrieve(OpusCodec.INSTANCE, buf);
int len = buf.writerIndex() - start;
//handler.sendFrame(OpusCodec.PAYLOAD_TYPE, timestamp.getAndAdd(960), buf, len);
var packet = handler.createPacket(OpusCodec.PAYLOAD_TYPE, timestamp.getAndAdd(960), buf, len);
if (packet != null) {
queueManager.queuePacket(queueKey, packet.nioBuffer(),
(InetSocketAddress) handler.getServerAddress());
}
buf.release();
}
}
}
}
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
rootProject.name = 'koe'

include ':core'
include ':ext-udpqueue'
include ':testbot'

1 change: 1 addition & 0 deletions testbot/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {
compile project(':core')
compile project(':ext-udpqueue')
compile group: 'com.mewna', name: 'catnip', version: '1.3.3'
compile group: 'com.sedmelluq', name: 'lavaplayer', version: '1.3.38'
runtime group: 'ch.qos.logback', name: 'logback-classic', version: '1.3.0-alpha4'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package moe.kyokobot.koe.testbot;

import moe.kyokobot.koe.KoeOptions;

import java.util.ArrayList;

public class GCPressureGenerator {
@SuppressWarnings("squid:S1215")
private static Thread pressureThread = new Thread(() -> {
try {
while (true) {
try {
{
var l = new ArrayList<int[]>();
for (int i = 0; i < 10000; i++) {
l.add(new int[1024]);
}
l.stream().map(String::valueOf).count();
}
{
for (int i = 0; i < 25000; i++) {
var arr = "malksmdlkamsldmalksmdlkmasldmlkam32908092930180928308290488209830928081028013sldmlkamslkdmlakmsldkmlakmsldkmalsmdalksmldaads".split(String.valueOf(i));
}
}
{
for (int i = 0; i < 25000; i++) {
new TestBot(null);
}
}

long pre = System.currentTimeMillis();
System.gc();
System.out.printf("GC took %dms\n", System.currentTimeMillis() - pre);
} catch (OutOfMemoryError e) {
long pre = System.currentTimeMillis();
System.gc();
System.out.printf("OOM! GC took %dms\n", System.currentTimeMillis() - pre);
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Koe TestBot - GC Pressure Tester");

public static boolean toggle() {
if (pressureThread.isAlive()) {
pressureThread.interrupt();
return false;
} else {
pressureThread.start();
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package moe.kyokobot.koe.testbot;

public class Main {
/**
* Starts a Koe test bot instance with default configurations, without any extensions and etc.
*/
public class KoeTestBotLauncher {
public static void main(String... args) {
var bot = new TestBot(System.getenv("TOKEN"));
Runtime.getRuntime().addShutdownHook(new Thread(bot::stop));
Expand Down
8 changes: 8 additions & 0 deletions testbot/src/main/java/moe/kyokobot/koe/testbot/TestBot.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public void start() {
resolve(message.guild(), message.channel().asTextChannel(), message.content().substring(6));
});


catnip.observe(DiscordEvent.MESSAGE_CREATE)
.filter(message -> message.guildIdAsLong() != 0
&& !message.author().bot()
&& message.content().startsWith("!gcpress"))
.subscribe(message -> message.channel()
.sendMessage("GC pressure generator enabled = " + GCPressureGenerator.toggle()));

catnip.connect();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package moe.kyokobot.koe.testbot;

import moe.kyokobot.koe.Koe;
import moe.kyokobot.koe.KoeOptions;
import moe.kyokobot.koe.codec.udpqueue.UdpQueueFramePollerFactory;

public class UdpQueueTestBotLauncher {
public static void main(String[] args) {
var bot = new TestBot(System.getenv("TOKEN")) {
@Override
public Koe createKoe() {
return Koe.koe(KoeOptions.builder()
.setFramePollerFactory(new UdpQueueFramePollerFactory())
.create());
}
};
Runtime.getRuntime().addShutdownHook(new Thread(bot::stop));
bot.start();
}
}

0 comments on commit a2b8d6a

Please sign in to comment.