Skip to content

Commit

Permalink
remove unneeded volatiles & atomics, make things final pt 2
Browse files Browse the repository at this point in the history
  • Loading branch information
natanbc authored and alula committed Apr 27, 2022
1 parent 25ec16b commit e1689b6
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public abstract class AbstractFramePoller implements FramePoller {
protected final MediaConnection connection;
protected final ByteBufAllocator allocator;
protected final EventLoopGroup eventLoop;
protected volatile boolean polling = false;
protected boolean polling = false;

public AbstractFramePoller(MediaConnection connection) {
this.connection = connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.function.Function;

public class NettyFramePollerFactory implements FramePollerFactory {
private Map<Codec, Function<MediaConnection, FramePoller>> codecMap;
private final Map<Codec, Function<MediaConnection, FramePoller>> codecMap;

public NettyFramePollerFactory() {
codecMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
import moe.kyokobot.koe.MediaConnection;
import moe.kyokobot.koe.codec.AbstractFramePoller;
import moe.kyokobot.koe.codec.H264Codec;
import moe.kyokobot.koe.media.IntReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NettyH264FramePoller extends AbstractFramePoller {
private static final Logger logger = LoggerFactory.getLogger(NettyH264FramePoller.class);
/**
* Delay between frame polling attempts.
*/
private static final int FRAME_RATE = 1000 / 30;

public NettyH264FramePoller(MediaConnection connection) {
super(connection);
Expand All @@ -21,15 +25,11 @@ public NettyH264FramePoller(MediaConnection connection) {
*/
private long lastFrame = 0;

/**
* Delay between frame polling attempts.
*/
private AtomicInteger frameRate = new AtomicInteger(1000 / 30);

/**
* Current frame timestamp.
*/
private AtomicInteger timestamp = new AtomicInteger();
private final IntReference timestamp = new IntReference();

@Override
public void start() {
Expand Down Expand Up @@ -74,7 +74,7 @@ private void pollFrame() {
logger.error("Sending frame failed", e);
}

long frameDelay = frameRate.get() - (System.currentTimeMillis() - lastFrame);
long frameDelay = FRAME_RATE - (System.currentTimeMillis() - lastFrame);

if (frameDelay > 0) {
eventLoop.schedule(this::loop, frameDelay, TimeUnit.MILLISECONDS);
Expand All @@ -85,7 +85,7 @@ private void pollFrame() {

private void loop() {
if (System.currentTimeMillis() < lastFrame + 60) {
lastFrame += frameRate.get();
lastFrame += FRAME_RATE;
} else {
lastFrame = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import moe.kyokobot.koe.MediaConnection;
import moe.kyokobot.koe.codec.AbstractFramePoller;
import moe.kyokobot.koe.codec.OpusCodec;
import moe.kyokobot.koe.media.IntReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NettyOpusFramePoller extends AbstractFramePoller {
private static final Logger logger = LoggerFactory.getLogger(NettyOpusFramePoller.class);
Expand All @@ -24,7 +24,7 @@ public NettyOpusFramePoller(MediaConnection connection) {
/**
* Current frame timestamp.
*/
private AtomicInteger timestamp = new AtomicInteger();
private final IntReference timestamp = new IntReference();

@Override
public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class XSalsa20Poly1305EncryptionMode implements EncryptionMode {
private final byte[] extendedNonce = new byte[24];
private final byte[] m = new byte[984];
private final byte[] c = new byte[984];
private TweetNaclFastInstanced nacl = new TweetNaclFastInstanced();
private final TweetNaclFastInstanced nacl = new TweetNaclFastInstanced();

@Override
@SuppressWarnings("Duplicates")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import io.netty.buffer.ByteBuf;
import moe.kyokobot.koe.internal.crypto.TweetNaclFastInstanced;

import java.util.concurrent.atomic.AtomicInteger;

public class XSalsa20Poly1305LiteEncryptionMode implements EncryptionMode {
private final byte[] extendedNonce = new byte[24];
private final byte[] m = new byte[984];
private final byte[] c = new byte[984];
private AtomicInteger seq = new AtomicInteger(0x80000000);
private TweetNaclFastInstanced nacl = new TweetNaclFastInstanced();
private final TweetNaclFastInstanced nacl = new TweetNaclFastInstanced();
private int seq = 0x80000000;

@Override
@SuppressWarnings("Duplicates")
Expand All @@ -24,7 +22,7 @@ public boolean box(ByteBuf packet, int len, ByteBuf output, byte[] secretKey) {
m[i + 32] = packet.readByte();
}

int s = this.seq.getAndIncrement();
int s = this.seq++;
extendedNonce[0] = (byte) (s & 0xff);
extendedNonce[1] = (byte) ((s >> 8) & 0xff);
extendedNonce[2] = (byte) ((s >> 16) & 0xff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class XSalsa20Poly1305SuffixEncryptionMode implements EncryptionMode {
private final byte[] extendedNonce = new byte[24];
private final byte[] m = new byte[984];
private final byte[] c = new byte[984];
private TweetNaclFastInstanced nacl = new TweetNaclFastInstanced();
private final TweetNaclFastInstanced nacl = new TweetNaclFastInstanced();

@Override
@SuppressWarnings("Duplicates")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public abstract class AbstractMediaGatewayConnection implements MediaGatewayConn

protected EventExecutor eventExecutor;
protected Channel channel;
private volatile boolean open;
private volatile boolean closed = false;
private boolean open;
private boolean closed = false;

public AbstractMediaGatewayConnection(@NotNull MediaConnectionImpl connection,
@NotNull VoiceServerInfo voiceServerInfo,
Expand Down Expand Up @@ -188,7 +188,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (!connectFuture.isDone()) {
connectFuture.completeExceptionally(cause);
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/moe/kyokobot/koe/media/IntReference.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package moe.kyokobot.koe.media;

/**
* Mutable reference to an int value. Provides no atomicity guarantees
* and should not be shared between threads without external synchronization.
*/
public class IntReference {
private int value;

public int get() {
return value;
}

public void set(int value) {
this.value = value;
}

public void add(int amount) {
this.value += amount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import io.netty.buffer.ByteBuf;
import moe.kyokobot.koe.codec.Codec;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Base interface for media frame providers. Note that Koe doesn't handle stuff such as speaking state, silent frames
* or etc., these are implemented by codec-specific frame provider classes.
Expand Down Expand Up @@ -44,9 +42,10 @@ public interface MediaFrameProvider {
*
* @param codec {@link Codec} type this handler was registered with.
* @param buf {@link ByteBuf} the buffer where the media data should be written to.
* @param timestamp {@link AtomicInteger} reference to current frame timestamp, which must be updated with
* @param timestamp {@link IntReference} reference to current frame timestamp, which must be updated with
* timestamp of written frame.
* @return If true, Koe will immediately attempt to poll a next frame, this is meant for video transmissions.
*/
boolean retrieve(Codec codec, ByteBuf buf, AtomicInteger timestamp);
boolean retrieve(Codec codec, ByteBuf buf, IntReference timestamp);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,23 @@
import moe.kyokobot.koe.codec.Codec;
import moe.kyokobot.koe.codec.OpusCodec;
import moe.kyokobot.koe.gateway.SpeakingFlags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Implementation of {@link MediaFrameProvider} which automatically takes care of
* checking codec type, sending silent frames and updating speaking state.
*/
public abstract class OpusAudioFrameProvider implements MediaFrameProvider {
private static final Logger logger = LoggerFactory.getLogger(OpusAudioFrameProvider.class);
private static final int SILENCE_FRAME_COUNT = 5;
private final MediaConnection connection;
private final Op12HackListener hackListener;

// volatile because of multiple event loop threads accessing these fields.
private AtomicInteger counter = new AtomicInteger();
private volatile long lastFramePolled = 0;
private volatile boolean lastProvide = false;
private volatile boolean lastSpeaking = false;
private volatile boolean speaking = false;
private int counter;
private long lastFramePolled = 0;
private boolean lastProvide = false;
private boolean lastSpeaking = false;
private boolean speaking = false;
private int speakingMask = SpeakingFlags.NORMAL;

public OpusAudioFrameProvider(MediaConnection connection) {
Expand Down Expand Up @@ -60,7 +55,7 @@ public final boolean canSendFrame(Codec codec) {
return false;
}

if (counter.get() > 0) {
if (counter > 0) {
return true;
}

Expand All @@ -69,7 +64,7 @@ public final boolean canSendFrame(Codec codec) {
if (lastProvide != provide) {
lastProvide = provide;
if (!provide) {
counter.set(SILENCE_FRAME_COUNT);
counter = SILENCE_FRAME_COUNT;
return true;
}
}
Expand All @@ -78,20 +73,20 @@ public final boolean canSendFrame(Codec codec) {
}

@Override
public final boolean retrieve(Codec codec, ByteBuf buf, AtomicInteger timestamp) {
public final boolean retrieve(Codec codec, ByteBuf buf, IntReference timestamp) {
if (codec.getPayloadType() != OpusCodec.PAYLOAD_TYPE) {
return false;
}

if (counter.get() > 0) {
counter.decrementAndGet();
if (counter > 0) {
counter--;
buf.writeBytes(OpusCodec.SILENCE_FRAME);

if (speaking) {
setSpeaking(false);
}

timestamp.addAndGet(960);
timestamp.add(960);
return false;
}

Expand All @@ -104,7 +99,7 @@ public final boolean retrieve(Codec codec, ByteBuf buf, AtomicInteger timestamp)
}

if (!written) {
counter.set(SILENCE_FRAME_COUNT);
counter = SILENCE_FRAME_COUNT;
}

long now = System.currentTimeMillis();
Expand All @@ -114,7 +109,7 @@ public final boolean retrieve(Codec codec, ByteBuf buf, AtomicInteger timestamp)
setSpeaking(written);
}

timestamp.addAndGet(960);
timestamp.add(960);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class QueueManagerPool {
private final AtomicLong queueKeySeq;
private final UdpQueueManager[] managers;
private volatile boolean closed;
private boolean closed;

public QueueManagerPool(int size, int bufferDuration) {
if (size <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@
import moe.kyokobot.koe.codec.AbstractFramePoller;
import moe.kyokobot.koe.codec.OpusCodec;
import moe.kyokobot.koe.internal.handler.DiscordUDPConnection;
import moe.kyokobot.koe.media.IntReference;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class UdpQueueOpusFramePoller extends AbstractFramePoller {
private QueueManagerPool.UdpQueueWrapper manager;
private AtomicInteger timestamp;
private final QueueManagerPool.UdpQueueWrapper manager;
private final IntReference timestamp = new IntReference();
private long lastFrame;

public UdpQueueOpusFramePoller(QueueManagerPool.UdpQueueWrapper manager, MediaConnection connection) {
super(connection);
this.timestamp = new AtomicInteger();
this.manager = manager;
}

Expand Down

0 comments on commit e1689b6

Please sign in to comment.