Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement AudioPacketInterceptor #425

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 97 additions & 80 deletions src/main/java/net/dv8tion/jda/core/audio/AudioConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class AudioConnection
public static final int OPUS_FRAME_TIME_AMOUNT = 20;//This is 20 milliseconds. We are only dealing with 20ms opus packets.
public static final int OPUS_CHANNEL_COUNT = 2; //We want to use stereo. If the audio given is mono, the encoder promotes it
// to Left and Right mono (stereo that is the same on both sides)

private static final byte[] SILENCE_BYTES = new byte[] {(byte)0xF8, (byte)0xFF, (byte)0xFE};

private final TIntLongMap ssrcMap = new TIntLongHashMap();
private final TIntObjectMap<Decoder> opusDecoders = new TIntObjectHashMap<>();
private final HashMap<User, Queue<Pair<Long, short[]>>> combinedQueue = new HashMap<>();
Expand All @@ -67,6 +70,7 @@ public class AudioConnection
private VoiceChannel channel;
private volatile AudioSendHandler sendHandler = null;
private volatile AudioReceiveHandler receiveHandler = null;
private volatile AudioPacketInterceptor packetInterceptor = null;
private PointerByReference opusEncoder;
private ScheduledExecutorService combinedAudioExecutor;

Expand All @@ -79,7 +83,6 @@ public class AudioConnection

private volatile int silenceCounter = 0;
boolean sentSilenceOnConnect = false;
private final byte[] silenceBytes = new byte[] {(byte)0xF8, (byte)0xFF, (byte)0xFE};

public AudioConnection(AudioWebSocket webSocket, VoiceChannel channel)
{
Expand All @@ -100,7 +103,6 @@ public void run()
{
final long timeout = getGuild().getAudioManager().getConnectTimeout();

JDAImpl api = (JDAImpl) getJDA();
long started = System.currentTimeMillis();
boolean connectionTimeout = false;
while (!webSocket.isReady() && !connectionTimeout)
Expand Down Expand Up @@ -150,6 +152,12 @@ public void setReceivingHandler(AudioReceiveHandler handler)
setupReceiveSystem();
}

public void setPacketInterceptor(AudioPacketInterceptor packetInterceptor)
{
this.packetInterceptor = packetInterceptor;
setupReceiveSystem();
}

public void setQueueTimeout(long queueTimeout)
{
this.queueTimeout = queueTimeout;
Expand Down Expand Up @@ -258,28 +266,34 @@ else if (sendHandler == null && sendSystem != null)

private synchronized void setupReceiveSystem()
{
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation initializes the receive thread even if there is only an AudioPacketInterceptor, but no ReceiveHandler.

I have tried to cleanup this setup routine as much as I could, but I would appreciate a second pair of eyes checking on it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With your changes, if there is a "consumer" but the socket is either null or closed, nothing happens.
In the old implementation, if the socket was null/closed (no matter if "consumer" is set, there was cleanup (your else branch)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I did not spot that!

I think it does not make a difference in this case tough, the combined audio thread only gets setup when the udpSocket is not null, so there is nothing to clean up.

if (udpSocket != null && !udpSocket.isClosed() && receiveHandler != null && receiveThread == null)
boolean hasConsumer = packetInterceptor != null || receiveHandler != null;

if (hasConsumer && udpSocket != null && !udpSocket.isClosed())
{
// UDP connection open, and we are expecting packets
setupReceiveThread();
}
else if (receiveHandler == null && receiveThread != null)
else
{
receiveThread.interrupt();
receiveThread = null;

if (combinedAudioExecutor != null)
// Cleanup existing threads
if (receiveThread != null)
{
combinedAudioExecutor.shutdownNow();
combinedAudioExecutor = null;
receiveThread.interrupt();
receiveThread = null;
}

opusDecoders.valueCollection().forEach(Decoder::close);
opusDecoders.clear();
}
else if (receiveHandler != null && !receiveHandler.canReceiveCombined() && combinedAudioExecutor != null)

if (combinedAudioExecutor != null)
{
combinedAudioExecutor.shutdownNow();
combinedAudioExecutor = null;
// If the handler is gone/unable to receive combined audio, cleanup the thread
if (receiveHandler == null || !receiveHandler.canReceiveCombined())
{
combinedAudioExecutor.shutdownNow();
combinedAudioExecutor = null;
}
}
}

Expand Down Expand Up @@ -307,82 +321,85 @@ public void run()
{
udpSocket.receive(receivedPacket);

if (receiveHandler != null && (receiveHandler.canReceiveUser() || receiveHandler.canReceiveCombined()) && webSocket.getSecretKey() != null)
// Check if we have to do anything with the packet
boolean canHandle = receiveHandler != null && (receiveHandler.canReceiveCombined() || receiveHandler.canReceiveUser());
boolean canReceive = packetInterceptor != null || canHandle;

// If receiver state changes, queue silence packets (not sure why this is required)
if (couldReceive != canReceive)
{
if (!couldReceive)
{
couldReceive = true;
sendSilentPackets();
}
AudioPacket decryptedPacket = AudioPacket.decryptAudioPacket(receivedPacket, webSocket.getSecretKey());

int ssrc = decryptedPacket.getSSRC();
final long userId = ssrcMap.get(ssrc);
Decoder decoder = opusDecoders.get(ssrc);
if (userId == ssrcMap.getNoEntryValue())
{
byte[] audio = decryptedPacket.getEncodedAudio();

//If the bytes are silence, then this was caused by a User joining the voice channel,
// and as such, we haven't yet received information to pair the SSRC with the UserId.
if (!Arrays.equals(audio, silenceBytes))
LOG.debug("Received audio data with an unknown SSRC id. Ignoring");

continue;
}
if (decoder == null)
{
decoder = new Decoder(ssrc);
opusDecoders.put(ssrc, decoder);
}
if (!decoder.isInOrder(decryptedPacket.getSequence()))
couldReceive = canReceive;
sendSilentPackets();
}

if (!canReceive)
{
// No consumers, discard packet
continue;
}

AudioPacket decrypted = AudioPacket.decryptAudioPacket(receivedPacket, webSocket.getSecretKey());

// Resolve User instance for packet
final int ssrc = decrypted.getSSRC();
final long userId = ssrcMap.get(ssrc);

if (userId == ssrcMap.getNoEntryValue())
{
if (!Arrays.equals(decrypted.getEncodedAudio(), SILENCE_BYTES))
LOG.debug("Received audio data with an unknown SSRC id. Ignoring");

continue;
}

User user = getJDA().getUserById(userId);
if (user == null)
{
LOG.warn("Received audio data with a known SSRC, but the userId associate with the SSRC is unknown to JDA!");
continue;
}

// Allow interceptor to digest the raw packet, and suppress further processing
if (packetInterceptor != null && packetInterceptor.handleDecryptedPacket(decrypted, user))
continue;

// Pass to internal dispatch, only if there is a receive handler capable of handling it
if (canHandle)
{
Decoder decoder = opusDecoders.get(decrypted.getSSRC());

if (decoder == null)
opusDecoders.put(ssrc, decoder = new Decoder(ssrc));

if (!decoder.isInOrder(decrypted.getSequence()))
{
LOG.trace("Got out-of-order audio packet. Ignoring.");
continue;
}

User user = getJDA().getUserById(userId);
if (user == null)
LOG.warn("Received audio data with a known SSRC, but the userId associate with the SSRC is unknown to JDA!");
else

short[] decodedAudio = decoder.decodeFromOpus(decrypted);

if (decodedAudio == null)
{
LOG.trace("Received audio data but Opus failed to properly decode, instead it returned an error");
}
else
{
// if (decoder.wasPacketLost(decryptedPacket.getSequence()))
// {
// LOG.debug("Packet(s) missed. Using Opus packetloss-compensation.");
// short[] decodedAudio = decoder.decodeFromOpus(null);
// receiveHandler.handleUserAudio(new UserAudio(user, decodedAudio));
// }
short[] decodedAudio = decoder.decodeFromOpus(decryptedPacket);

//If decodedAudio is null, then the Opus decode failed, so throw away the packet.
if (decodedAudio == null)
if (receiveHandler.canReceiveUser())
{
LOG.trace("Received audio data but Opus failed to properly decode, instead it returned an error");
receiveHandler.handleUserAudio(new UserAudio(user, decodedAudio));
}
else
if (receiveHandler.canReceiveCombined())
{
if (receiveHandler.canReceiveUser())
{
receiveHandler.handleUserAudio(new UserAudio(user, decodedAudio));
}
if (receiveHandler.canReceiveCombined())
{
Queue<Pair<Long, short[]>> queue = combinedQueue.get(user);
if (queue == null)
{
queue = new ConcurrentLinkedQueue<>();
combinedQueue.put(user, queue);
}
queue.add(Pair.<Long, short[]>of(System.currentTimeMillis(), decodedAudio));
}
Queue<Pair<Long, short[]>> queue = combinedQueue.get(user);

if (queue == null)
combinedQueue.put(user, queue = new ConcurrentLinkedQueue<>());

queue.add(Pair.<Long, short[]>of(System.currentTimeMillis(), decodedAudio));
}
}
}
else if (couldReceive)
{
couldReceive = false;
sendSilentPackets();
}
}
catch (SocketTimeoutException e)
{
Expand All @@ -405,7 +422,7 @@ else if (couldReceive)
receiveThread.start();
}

if (receiveHandler.canReceiveCombined())
if (receiveHandler != null && receiveHandler.canReceiveCombined())
{
setupCombinedExecutor();
}
Expand Down Expand Up @@ -545,7 +562,7 @@ public DatagramPacket getNextPacket(boolean changeTalking)
}
else if (silenceCounter > -1)
{
AudioPacket packet = new AudioPacket(seq, timestamp, webSocket.getSSRC(), silenceBytes);
AudioPacket packet = new AudioPacket(seq, timestamp, webSocket.getSSRC(), SILENCE_BYTES);

nextPacket = packet.asEncryptedUdpPacket(webSocket.getAddress(), webSocket.getSecretKey());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2015-2017 Austin Keener & Michael Ritter & Florian Spieß
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dv8tion.jda.core.audio;

import net.dv8tion.jda.core.entities.User;

/**
* Semi-internal API, used to intercept decrypted AudioPackets before they get processed by the
* receiver thread of {@link net.dv8tion.jda.core.audio.AudioConnection AudioConnection}.
*/
public interface AudioPacketInterceptor
{

/**
* This method is called whenever the {@link net.dv8tion.jda.core.audio.AudioConnection AudioConnection} receive
* thread has successfully decrypted an audio packet, but before it is passed trough the OPUS decoder and the dispatch
* mechanism of JDA.
* For further documentation on the internals, refer to {@link net.dv8tion.jda.core.audio.AudioPacket AudioPacket}.
* <p>
* <b>Be vary that JDA does not guarantee the order and timing of incoming packets, implementors are responsible
* for reassembling audio streams themselves.</b>
* <p>
* The method is called from the network receive thread, just like {@link net.dv8tion.jda.core.audio.AudioReceiveHandler#handleUserAudio(UserAudio) AudioReceiveHandler.handleUserAudio}
* <p>
* Note that by returning true, JDA will discard the packet without further processing.
*
* @param decryptedPacket Audio packet that has been received, and successfully decrypted
* @param user User the packet originated from
*
* @return If true, the packet is discarded, and won't be decoded or passed to the registered {@link net.dv8tion.jda.core.audio.AudioReceiveHandler AudioReceiveHandler}
*/
boolean handleDecryptedPacket(AudioPacket decryptedPacket, User user);

}
27 changes: 27 additions & 0 deletions src/main/java/net/dv8tion/jda/core/managers/AudioManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package net.dv8tion.jda.core.managers;

import net.dv8tion.jda.core.JDA;
import net.dv8tion.jda.core.audio.AudioPacketInterceptor;
import net.dv8tion.jda.core.audio.AudioReceiveHandler;
import net.dv8tion.jda.core.audio.AudioSendHandler;
import net.dv8tion.jda.core.audio.hooks.ConnectionListener;
Expand Down Expand Up @@ -199,6 +200,32 @@ public interface AudioManager
*/
AudioReceiveHandler getReceiveHandler();

/**
* Sets the {@link net.dv8tion.jda.core.audio.AudioPacketInterceptor AudioPacketInterceptor}
* that the manager will pass decrypted audio packets before further processing.
*
* <p>The interceptor provided here will persist between audio connection connect and disconnects.
* Furthermore, you don't need to have an audio connection to set an interceptor, but it is highly
* recommended to set this before the connection is established. (See note)
* When JDA sets up a new audio connection it will use the interceptor provided here.
* <br>Setting this to null will remove the interceptor.
*
* Note that if you wish to entirely suppress creation of OPUS decoders by JDA, you will have to
* register your interceptor <b>before</b> the connection is established!
*
* @param packetInterceptor
* The {@link net.dv8tion.jda.core.audio.AudioPacketInterceptor AudioPacketInterceptor}
*/
void setPacketInterceptor(AudioPacketInterceptor packetInterceptor);

/**
* The currently set {@link net.dv8tion.jda.core.audio.AudioPacketInterceptor AudioPacketInterceptor}.
* If there is no interceptor currently set, this method will return {@code null}.
*
* @return The currently active {@link net.dv8tion.jda.core.audio.AudioPacketInterceptor AudioPacketInterceptor} or {@code null}.
*/
AudioPacketInterceptor getPacketInterceptor();

/**
* Sets the {@link net.dv8tion.jda.core.audio.hooks.ConnectionListener ConnectionListener} for this AudioManager.
* It will be informed about meta data of any audio connection established through this AudioManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.dv8tion.jda.core.Permission;
import net.dv8tion.jda.core.WebSocketCode;
import net.dv8tion.jda.core.audio.AudioConnection;
import net.dv8tion.jda.core.audio.AudioPacketInterceptor;
import net.dv8tion.jda.core.audio.AudioReceiveHandler;
import net.dv8tion.jda.core.audio.AudioSendHandler;
import net.dv8tion.jda.core.audio.hooks.ConnectionListener;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class AudioManagerImpl implements AudioManager

protected AudioSendHandler sendHandler;
protected AudioReceiveHandler receiveHandler;
protected AudioPacketInterceptor packetInterceptor;

protected ListenerProxy connectionListener = new ListenerProxy();
protected long queueTimeout = 100;
protected boolean shouldReconnect = true;
Expand Down Expand Up @@ -222,6 +225,21 @@ public AudioReceiveHandler getReceiveHandler()
return receiveHandler;
}

@Override
public void setPacketInterceptor(AudioPacketInterceptor packetInterceptor)
{
this.packetInterceptor = packetInterceptor;
if (audioConnection != null)
audioConnection.setPacketInterceptor(packetInterceptor);
}

@Override
public AudioPacketInterceptor getPacketInterceptor()
{
return packetInterceptor;
}


@Override
public void setConnectionListener(ConnectionListener listener)
{
Expand Down Expand Up @@ -304,6 +322,7 @@ public void setAudioConnection(AudioConnection audioConnection)
this.queuedAudioConnection = null;
audioConnection.setSendingHandler(sendHandler);
audioConnection.setReceivingHandler(receiveHandler);
audioConnection.setPacketInterceptor(packetInterceptor);
audioConnection.setQueueTimeout(queueTimeout);
}

Expand Down