From 5cc8446f070dcb7d9822ab90837edc3231ca30bd Mon Sep 17 00:00:00 2001 From: Xytabich Date: Tue, 9 Feb 2021 23:30:35 +0300 Subject: [PATCH] removed connection confirmation now unreliable flush works always reliable sequence sent rework --- UNet/NetworkManager.cs | 101 +++++------ UNet/Socket.cs | 368 +++++++++++++---------------------------- 2 files changed, 158 insertions(+), 311 deletions(-) diff --git a/UNet/NetworkManager.cs b/UNet/NetworkManager.cs index aa756eb..5e2b5c1 100644 --- a/UNet/NetworkManager.cs +++ b/UNet/NetworkManager.cs @@ -13,14 +13,14 @@ public class NetworkManager : UdonSharpBehaviour private const byte MODE_RELIABLE = 1; private const byte MODE_RELIABLE_SEQUENCED = 2; private const byte RELIABLE_ACK = 3; - private const byte CONNECTION_REGISTRATION = 4; - private const byte CONNECTION_ACK = 5; private const byte TARGET_ALL = 0; private const byte TARGET_MASTER = 1; private const byte TARGET_SINGLE = 2; private const byte TARGET_MULTIPLE = 3; + private const byte MSG_TYPE_MASK = 3; + [UdonSynced] private int masterConnection = -1; @@ -43,8 +43,6 @@ public class NetworkManager : UdonSharpBehaviour private bool hasMaster = false; private bool isInitComplete = false; - private uint connectedBeforeInit = 0; - void Start() { var playersList = new VRCPlayerApi[VRCPlayerApi.GetPlayerCount()]; @@ -163,8 +161,8 @@ public void HandlePacket(int connection, byte[] dataBuffer, int dataBufferLength while(index < dataBufferLength) { int header = dataBuffer[index]; - int type = header & 7; - int target = (header >> 3) & 3; + int type = header & MSG_TYPE_MASK; + int target = (header >> 2) & 3; index++; if(type == MODE_UNRELIABLE) @@ -210,25 +208,27 @@ public void HandlePacket(int connection, byte[] dataBuffer, int dataBufferLength } else if(type == MODE_RELIABLE_SEQUENCED) { - int id = dataBuffer[index] << 8 | dataBuffer[index + 1]; - index += 2; - int sequence = dataBuffer[index]; - index++; - int targetIndex = index; - index += GetTargetHeaderSize(target); + if(ImTarget(target, dataBuffer, index + 3)) + { + int id = dataBuffer[index] << 8 | dataBuffer[index + 1]; + index += 2; + int sequence = dataBuffer[index]; + index++; + index += GetTargetHeaderSize(target); - int len = dataBuffer[index]; - index++; + int len = dataBuffer[index]; + index++; - if(ImTarget(target, dataBuffer, targetIndex)) - { socket.OnReceiveReliableSequenced(connection, id, sequence, dataBuffer, index, len); + index += len; } - else if(type == MODE_RELIABLE_SEQUENCED) + else { - socket.MarkReliableSequence(connection, id, sequence); + index += 3; + index += GetTargetHeaderSize(target); + index += dataBuffer[index]; + index += 1; } - index += len; } else if(type == RELIABLE_ACK) { @@ -248,36 +248,6 @@ public void HandlePacket(int connection, byte[] dataBuffer, int dataBufferLength index += GetTargetHeaderSize(target); } } - else if(type == CONNECTION_REGISTRATION) - { - if(ImTarget(target, dataBuffer, index)) - { - socket.ConnectionAck(connection); - if((connectionsMask & (1u << connection)) == 0) - { - connectionsMask |= 1u << connection; - if(isInitComplete && eventListeners != null) - { - int owner = allConnections[connection].owner; - for(var i = 0; i < eventListenersCount; i++) - { - var listener = eventListeners[i]; - listener.SetProgramVariable("OnUNetConnected_playerId", owner); - listener.SendCustomEvent("OnUNetConnected"); - } - } - } - } - index += GetTargetHeaderSize(target); - } - else if(type == CONNECTION_ACK) - { - if(ImTarget(target, dataBuffer, index)) - { - socket.ConnectionConfirmed(connection); - } - index += GetTargetHeaderSize(target); - } } } @@ -307,18 +277,29 @@ public void OnOwnerReceived(int index, int playerId) socket.SetProgramVariable("manager", this); socket.Init(); - socket.ConnectionRegAll(connectedBeforeInit); - connectedBeforeInit = 0; - hasLocal = true; } else { - if(hasLocal) socket.ConnectionReg(index); - else connectedBeforeInit |= 1u << index; + connectionsMask |= 1u << index; } - if(!isInitComplete && hasLocal && hasMaster) Init(); + if(isInitComplete) + { + if(eventListeners != null) + { + for(var i = 0; i < eventListenersCount; i++) + { + var listener = eventListeners[i]; + listener.SetProgramVariable("OnUNetConnected_playerId", playerId); + listener.SendCustomEvent("OnUNetConnected"); + } + } + } + else + { + if(hasLocal && hasMaster) Init(); + } } } @@ -412,7 +393,7 @@ public bool SendTargets(int mode, byte[] data, int dataLength, int[] targetPlaye return socket.SendTarget(mode, data, dataLength, targetPlayerIds[0]); } int len = targetPlayerIds.Length; - int[] indices = new int[len]; + uint targetsMask = 0; for(var i = 0; i < len; i++) { int targetConnection = targetPlayerIds[i]; @@ -426,9 +407,9 @@ public bool SendTargets(int mode, byte[] data, int dataLength, int[] targetPlaye } } if(index < 0) return false; - indices[i] = index; + targetsMask = 1u << index; } - return socket.SendTargets(mode, data, dataLength, indices); + return socket.SendTargets(mode, data, dataLength, targetsMask); } private void Init() @@ -445,9 +426,9 @@ private void Init() int localId = Networking.LocalPlayer.playerId; for(var i = 0; i < totalConnectionsCount; i++) { - if((connectionsMask & (1u << i)) != 0) + int owner = allConnections[i].owner; + if(owner >= 0 && owner != localId) { - int owner = allConnections[i].owner; for(var j = 0; j < eventListenersCount; j++) { var listener = eventListeners[j]; diff --git a/UNet/Socket.cs b/UNet/Socket.cs index 6397cf9..85a1e73 100644 --- a/UNet/Socket.cs +++ b/UNet/Socket.cs @@ -1,4 +1,5 @@ -using UdonSharp; +using System; +using UdonSharp; using UnityEngine; namespace UNet @@ -12,17 +13,15 @@ public class Socket : UdonSharpBehaviour private const byte MODE_RELIABLE = 1; private const byte MODE_RELIABLE_SEQUENCED = 2; private const byte RELIABLE_ACK = 3; - private const byte CONNECTION_REGISTRATION = 4; - private const byte CONNECTION_ACK = 5; private const byte TARGET_ALL = 0; - private const byte TARGET_MASTER = 1 << 3; - private const byte TARGET_SINGLE = 2 << 3; - private const byte TARGET_MULTIPLE = 3 << 3; + private const byte TARGET_MASTER = 1 << 2; + private const byte TARGET_SINGLE = 2 << 2; + private const byte TARGET_MULTIPLE = 3 << 2; + + private const byte MSG_TYPE_MASK = 3; private const byte RELIABLE_ACK_MSG_HEADER = RELIABLE_ACK | TARGET_SINGLE; - private const byte CONNECTION_REG_MSG_HEADER = CONNECTION_REGISTRATION | TARGET_MULTIPLE; - private const byte CONNECTION_ACK_MSG_HEADER = CONNECTION_ACK | TARGET_MULTIPLE; private const int MAX_PACKET_SIZE = 144; @@ -41,22 +40,15 @@ public class Socket : UdonSharpBehaviour private const int RELIABLE_ACK_DATA_LENGTH = 6;//header + id(2 bytes) + mask(2 bytes) + connection - /// - /// Flush unreliable data buffers, if data doesn't fit into the packet - /// - public bool flushNotFitUnreliable = false; - private NetworkManager manager = null; private Connection connection = null; - #region reliable send private int reliableStartId = 0; private uint reliableExpectMask = 0; private bool updateReliable = false; - private int reliableSequence = 0; - private uint reliableSendMask = 0; + private uint reliableAttemptsMask = 0; private int reliableBufferIndex = 0; private int reliableBufferedCount = 0; @@ -65,17 +57,22 @@ public class Socket : UdonSharpBehaviour private int[] reliableLengths; #endregion + #region reliable sequenced + private int reliableSequenceStartIndex = 0; + private uint reliableSequenceTargets = 0; + #endregion + #region reliable ack private int[] sendAckStartIds; private uint[] sendAckMasks; #endregion - #region reliable receive - private int[] receiveSequencedBufferIndices; + #region reliable sequenced receive + private int[] receiveSequencedStartIndices; private byte[][][] receiveSequencedBuffer; - private uint[] receiveSequencesIgnoreMask; + #endregion - private int[] receivedReliableSequences; + #region reliable receive private int[] receivedReliableStartIds; private uint[] receivedReliableMasks; #endregion @@ -92,17 +89,11 @@ public class Socket : UdonSharpBehaviour private byte[] dataBuffer; #endregion - #region connection confirmation - private uint connectionsAck = 0; - private uint connectionReg = 0; - #endregion - private byte[] tmpDataBuffer; public void Init() { int connectionsCount = manager.totalConnectionsCount; - dataBufferLength = 0; dataBuffer = new byte[MAX_PACKET_SIZE]; unreliableBuffer = new byte[UNRELIABLE_BUFFER_SIZE][]; @@ -114,19 +105,16 @@ public void Init() sendAckStartIds = new int[connectionsCount]; sendAckMasks = new uint[connectionsCount]; - tmpDataBuffer = new byte[Mathf.Max(RELIABLE_ACK_DATA_LENGTH, 5)];//5 is max size of connection confirmation message + tmpDataBuffer = new byte[RELIABLE_ACK_DATA_LENGTH]; receivedReliableMasks = new uint[connectionsCount]; receivedReliableStartIds = new int[connectionsCount]; - receivedReliableSequences = new int[connectionsCount]; for(var i = 0; i < connectionsCount; i++) { receivedReliableStartIds[i] = -1; - receivedReliableSequences[i] = -1; } - receiveSequencesIgnoreMask = new uint[connectionsCount]; - receiveSequencedBufferIndices = new int[connectionsCount]; + receiveSequencedStartIndices = new int[connectionsCount]; receiveSequencedBuffer = new byte[connectionsCount][][]; for(var i = 0; i < connectionsCount; i++) { @@ -144,12 +132,10 @@ public void OnConnectionRelease(int connectionIndex) updateReliable = true; receivedReliableStartIds[connectionIndex] = -1; - receivedReliableSequences[connectionIndex] = -1; receivedReliableMasks[connectionIndex] = 0; sendAckStartIds[connectionIndex] = 0; sendAckMasks[connectionIndex] = 0; - receiveSequencesIgnoreMask[connectionIndex] = 0; - receiveSequencedBufferIndices[connectionIndex] = 0; + receiveSequencedStartIndices[connectionIndex] = 0; var buffer = receiveSequencedBuffer[connectionIndex]; for(var i = 0; i < RELIABLE_BUFFER_SIZE; i++) { @@ -160,74 +146,46 @@ public void OnConnectionRelease(int connectionIndex) #region add to buffer public bool SendAll(int mode, byte[] data, int count) { - byte[] buffer = TryAddModeData(mode, 1, data, count); + byte[] buffer = TryAddModeData(mode, manager.connectionsMask, 1, data, count); if(buffer == null) return false; buffer[0] = (byte)(mode | TARGET_ALL); - if(mode == MODE_RELIABLE || mode == MODE_RELIABLE_SEQUENCED) - { - int index = (reliableBufferIndex + reliableBufferedCount - 1) % RELIABLE_BUFFER_SIZE; - reliableExpectedAcks[index] = manager.connectionsMask; - } - return true; } public bool SendMaster(int mode, byte[] data, int count) { - byte[] buffer = TryAddModeData(mode, 1, data, count); + byte[] buffer = TryAddModeData(mode, MASTER_EXPECT_MASK, 1, data, count); if(buffer == null) return false; buffer[0] = (byte)(mode | TARGET_MASTER); - if(mode == MODE_RELIABLE || mode == MODE_RELIABLE_SEQUENCED) - { - int index = (reliableBufferIndex + reliableBufferedCount - 1) % RELIABLE_BUFFER_SIZE; - reliableExpectedAcks[index] = mode == MODE_RELIABLE_SEQUENCED ? manager.connectionsMask : MASTER_EXPECT_MASK; - } - return true; } public bool SendTarget(int mode, byte[] data, int count, int targetConnection) { - byte[] buffer = TryAddModeData(mode, 2, data, count); + byte[] buffer = TryAddModeData(mode, 1u << targetConnection, 2, data, count); if(buffer == null) return false; buffer[0] = (byte)(mode | TARGET_SINGLE); int targetIndex = 1; - if(mode == MODE_RELIABLE || mode == MODE_RELIABLE_SEQUENCED) - { - targetIndex += 2 + (mode == MODE_RELIABLE_SEQUENCED ? 1 : 0); - int index = (reliableBufferIndex + reliableBufferedCount - 1) % RELIABLE_BUFFER_SIZE; - reliableExpectedAcks[index] = mode == MODE_RELIABLE_SEQUENCED ? manager.connectionsMask : (1u << targetConnection); - } + if(mode == MODE_RELIABLE) targetIndex += 2; + if(mode == MODE_RELIABLE_SEQUENCED) targetIndex += 3; buffer[targetIndex] = (byte)targetConnection; return true; } - public bool SendTargets(int mode, byte[] data, int count, int[] targetConnections) + public bool SendTargets(int mode, byte[] data, int count, uint connectionsMask) { int maskSize = manager.connectionsMaskBytesCount; - byte[] buffer = TryAddModeData(mode, 1 + maskSize, data, count); + byte[] buffer = TryAddModeData(mode, connectionsMask, 1 + maskSize, data, count); if(buffer == null) return false; - buffer[0] = (byte)(mode | TARGET_MULTIPLE); - uint connectionsMask = 0; - int len = targetConnections.Length; - for(var i = 0; i < len; i++) - { - connectionsMask |= 1u << targetConnections[i]; - } - int targetIndex = 1; - if(mode == MODE_RELIABLE || mode == MODE_RELIABLE_SEQUENCED) - { - targetIndex += 2 + (mode == MODE_RELIABLE_SEQUENCED ? 1 : 0); - int index = (reliableBufferIndex + reliableBufferedCount - 1) % RELIABLE_BUFFER_SIZE; - reliableExpectedAcks[index] = mode == MODE_RELIABLE_SEQUENCED ? manager.connectionsMask : connectionsMask; - } + if(mode == MODE_RELIABLE) targetIndex += 2; + if(mode == MODE_RELIABLE_SEQUENCED) targetIndex += 3; buffer[targetIndex] = (byte)(connectionsMask & 255); if(maskSize > 1) @@ -249,7 +207,7 @@ public bool SendTargets(int mode, byte[] data, int count, int[] targetConnection return true; } - private byte[] TryAddModeData(int mode, int addSize, byte[] data, int dataSize) + private byte[] TryAddModeData(int mode, uint targets, int addSize, byte[] data, int dataSize) { if(mode == MODE_UNRELIABLE) { @@ -258,7 +216,7 @@ private byte[] TryAddModeData(int mode, int addSize, byte[] data, int dataSize) int fullMsgSize = dataSize + 1 + addSize; if(fullMsgSize > MAX_PACKET_SIZE) { - UnityEngine.Debug.LogErrorFormat("Message is too long: {0} max size: {1}", fullMsgSize, MAX_PACKET_SIZE); + Debug.LogErrorFormat("Message is too long: {0} max size: {1}", fullMsgSize, MAX_PACKET_SIZE); return null; } @@ -275,11 +233,13 @@ private byte[] TryAddModeData(int mode, int addSize, byte[] data, int dataSize) if(reliableBufferedCount >= RELIABLE_BUFFER_SIZE) return null; - int reliableHeadSize = 2 + (mode == MODE_RELIABLE_SEQUENCED ? 1 : 0); + int reliableHeadSize = 2; + if(mode == MODE_RELIABLE_SEQUENCED) reliableHeadSize++; + int fullMsgSize = dataSize + 1 + addSize + reliableHeadSize; if(fullMsgSize > MAX_PACKET_SIZE) { - UnityEngine.Debug.LogErrorFormat("Message is too long: {0} max size: {1}", fullMsgSize, MAX_PACKET_SIZE); + Debug.LogErrorFormat("Message is too long: {0} max size: {1}", fullMsgSize, MAX_PACKET_SIZE); return null; } @@ -289,12 +249,19 @@ private byte[] TryAddModeData(int mode, int addSize, byte[] data, int dataSize) int id = (reliableStartId + reliableBufferedCount) % RELIABLE_IDS_COUNT; buffer[1] = (byte)(id >> 8 & 255); buffer[2] = (byte)(id & 255); + reliableExpectMask = reliableExpectMask | (1u << reliableBufferedCount); + + reliableExpectedAcks[index] = manager.connectionsMask; if(mode == MODE_RELIABLE_SEQUENCED) { - buffer[3] = (byte)reliableSequence; - reliableSequence = (reliableSequence + 1) % RELIABLE_SEQUENCES_COUNT; + if(reliableSequenceTargets == 0 || reliableSequenceTargets != targets || reliableSequenceStartIndex >= RELIABLE_BUFFER_SIZE) + { + reliableSequenceTargets = targets; + reliableSequenceStartIndex = 0; + } + buffer[3] = (byte)reliableSequenceStartIndex; + reliableSequenceStartIndex++; } - reliableExpectMask = reliableExpectMask | (1u << reliableBufferedCount); reliableLengths[index] = fullMsgSize; reliableBufferedCount++; @@ -320,7 +287,7 @@ private byte[] FillMessageData(byte[][] targetBuffer, int targetIndex, byte[] da #region reliable ack public void OnReceivedAck(int connection, int idStart, uint mask) { - if(IdGreaterThan(idStart, reliableStartId)) mask <<= (idStart - reliableStartId + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; + if(IdDiff(idStart, reliableStartId) > 0) mask <<= (idStart - reliableStartId + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; else mask >>= (reliableStartId - idStart + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; int index = reliableBufferIndex; @@ -368,89 +335,25 @@ private void UpdateReliable() reliableExpectMask |= 1u << i; } } - reliableSendMask &= reliableExpectMask; + reliableAttemptsMask &= reliableExpectMask; if(count > 0) { reliableStartId = (reliableStartId + count) % RELIABLE_IDS_COUNT; reliableBufferedCount -= count; reliableBufferIndex = (reliableBufferIndex + count) % RELIABLE_BUFFER_SIZE; - reliableSendMask >>= count; + reliableAttemptsMask >>= count; reliableExpectMask >>= count; } } #endregion - #region connection confirmation - public void ConnectionAck(int connectionIndex) - { - connectionsAck |= 1u << connectionIndex; - } - - public void ConnectionConfirmed(int connectionIndex) - { - connectionReg &= (1u << connectionIndex) ^ 0xFFFFFFFF; - } - - public void ConnectionReg(int connectionIndex) - { - connectionReg |= 1u << connectionIndex; - } - - public void ConnectionRegAll(uint connections) - { - connectionReg |= connections; - } - #endregion - #region send public void PrepareSendStream() { if(updateReliable) UpdateReliable(); dataBufferLength = 0; - if(connectionsAck != 0) - { - int maskSize = manager.connectionsMaskBytesCount; - tmpDataBuffer[0] = CONNECTION_ACK_MSG_HEADER; - tmpDataBuffer[1] = (byte)(connectionsAck & 255); - if(maskSize > 1) - { - tmpDataBuffer[2] = (byte)(connectionsAck >> 8 & 255); - if(maskSize > 2) - { - tmpDataBuffer[3] = (byte)(connectionsAck >> 16 & 255); - if(maskSize > 3) - { - tmpDataBuffer[4] = (byte)(connectionsAck >> 24 & 255); - } - } - } - if(TryAddToBuffer(tmpDataBuffer, maskSize + 1)) - { - connectionsAck = 0; - } - } - if(connectionReg != 0) - { - int maskSize = manager.connectionsMaskBytesCount; - tmpDataBuffer[0] = CONNECTION_REG_MSG_HEADER; - tmpDataBuffer[1] = (byte)(connectionReg & 255); - if(maskSize > 1) - { - tmpDataBuffer[2] = (byte)(connectionReg >> 8 & 255); - if(maskSize > 2) - { - tmpDataBuffer[3] = (byte)(connectionReg >> 16 & 255); - if(maskSize > 3) - { - tmpDataBuffer[4] = (byte)(connectionReg >> 24 & 255); - } - } - } - TryAddToBuffer(tmpDataBuffer, maskSize + 1); - } - int len = sendAckMasks.Length; for(var i = 0; i < len; i++) { @@ -472,54 +375,57 @@ public void PrepareSendStream() } } - bool sendUnreliable = true; - bool sendOrder = false; + bool sendUnreliable = false; int reliableSendIndex = 0; - while(true) + int prevSequence = -1; + bool sendSequenced = true; + while(unreliableBufferedCount > 0 || reliableSendIndex < reliableBufferedCount) { - if(sendOrder && sendUnreliable) + if(sendUnreliable && unreliableBufferedCount > 0) { - if(unreliableBufferedCount > 0) - { - var buffer = unreliableBuffer[unreliableBufferIndex]; - var bufferLength = unreliableLengths[unreliableBufferIndex]; - if(TryAddToBuffer(buffer, bufferLength)) - { - unreliableBufferIndex = (unreliableBufferIndex + 1) % UNRELIABLE_BUFFER_SIZE; - unreliableBufferedCount--; - } - else sendUnreliable = false; - } - else sendUnreliable = false; + TryAddToBuffer(unreliableBuffer[unreliableBufferIndex], unreliableLengths[unreliableBufferIndex]); + unreliableBufferIndex = (unreliableBufferIndex + 1) % UNRELIABLE_BUFFER_SIZE; + unreliableBufferedCount--; } else if(reliableSendIndex < reliableBufferedCount) { uint mask = 1u << reliableSendIndex; - if((reliableExpectMask & mask) == mask && (reliableSendMask & mask) == 0) + if((reliableExpectMask & mask) != 0) { + bool sendReliable = true; + int index = (reliableBufferIndex + reliableSendIndex) % RELIABLE_BUFFER_SIZE; var buffer = reliableBuffer[index]; - var bufferLength = reliableLengths[index]; - if(TryAddToBuffer(buffer, bufferLength)) + if((buffer[0] & MSG_TYPE_MASK) == MODE_RELIABLE_SEQUENCED) + { + if(sendSequenced) + { + int sequence = buffer[3]; + // If the sequence is less than or equal to the previous value, then a new group has started + // But only one group can be sent at a time + if(sequence > prevSequence) + { + prevSequence = sequence; + } + else sendSequenced = false; + } + + sendReliable = sendSequenced; + } + + if(sendReliable && (reliableAttemptsMask & mask) == 0) { - reliableSendMask |= mask; + TryAddToBuffer(buffer, reliableLengths[index]); } + reliableAttemptsMask |= mask; } reliableSendIndex++; } - else if(!sendUnreliable) break; - - sendOrder = !sendOrder; + sendUnreliable = !sendUnreliable; } - if(reliableSendMask == reliableExpectMask) reliableSendMask = 0; + if(reliableAttemptsMask == reliableExpectMask) reliableAttemptsMask = 0; connection.SetProgramVariable("dataBufferLength", dataBufferLength); connection.SetProgramVariable("dataBuffer", dataBuffer); - - if(flushNotFitUnreliable) - { - unreliableBufferIndex = 0; - unreliableBufferedCount = 0; - } } private bool TryAddToBuffer(byte[] data, int count) @@ -552,71 +458,32 @@ public void OnReceiveReliableSequenced(int connectionIndex, int id, int sequence { if(IsNewReliable(connectionIndex, id)) { - int lastSequence = receivedReliableSequences[connectionIndex]; - if(lastSequence < 0 || sequence == (lastSequence + 1) % RELIABLE_SEQUENCES_COUNT) + int sequenceStartIndex = receiveSequencedStartIndices[connectionIndex]; + if(sequence < sequenceStartIndex) sequenceStartIndex = 0; + + var connectionBuffer = receiveSequencedBuffer[connectionIndex]; + if(sequence == sequenceStartIndex) { OnDataReceived(connectionIndex, dataBuffer, index, len); - int count = ApplyBufferedSequences(connectionIndex, 0); - receivedReliableSequences[connectionIndex] = (sequence + count) % RELIABLE_SEQUENCES_COUNT; - } - else - { - int bufferIndex = (sequence - lastSequence + RELIABLE_SEQUENCES_COUNT) % RELIABLE_SEQUENCES_COUNT; - bufferIndex = (receiveSequencedBufferIndices[connectionIndex] + bufferIndex) % RELIABLE_BUFFER_SIZE; - - var data = new byte[len]; - for(var i = 0; i < len; i++) + sequenceStartIndex = sequence + 1; + while(sequenceStartIndex < RELIABLE_BUFFER_SIZE) { - data[i] = dataBuffer[index + i]; - } - receiveSequencedBuffer[connectionIndex][bufferIndex] = data; - } - } - } + var buffer = connectionBuffer[sequenceStartIndex]; + if(buffer == null) break; + connectionBuffer[sequenceStartIndex] = null; + sequenceStartIndex++; - public void MarkReliableSequence(int connectionIndex, int id, int sequence) - { - if(IsNewReliable(connectionIndex, id)) - { - int lastSequence = receivedReliableSequences[connectionIndex]; - if(lastSequence < 0 || sequence == (lastSequence + 1) % RELIABLE_SEQUENCES_COUNT) - { - int count = ApplyBufferedSequences(connectionIndex, 0); - receivedReliableSequences[connectionIndex] = (sequence + count) % RELIABLE_SEQUENCES_COUNT; + OnDataReceived(connectionIndex, buffer, 0, buffer.Length); + } } else { - receiveSequencesIgnoreMask[connectionIndex] |= 1u << (lastSequence - sequence); + // bad way but there is no better method for copying arrays + connectionBuffer[sequence] = Convert.FromBase64String(Convert.ToBase64String(dataBuffer, index, len)); } - } - } - - private int ApplyBufferedSequences(int connectionIndex, int readedCount) - { - var buffer = receiveSequencedBuffer[connectionIndex]; - int bufferIndex = receiveSequencedBufferIndices[connectionIndex]; - bufferIndex = (bufferIndex + readedCount) % RELIABLE_BUFFER_SIZE; - uint mask = receiveSequencesIgnoreMask[connectionIndex]; - mask >>= readedCount; - - uint bit = (mask & 1u); - byte[] data = null; - while(bit == 1 || (data = buffer[bufferIndex]) != null) - { - if(bit == 0) - { - OnDataReceived(connectionIndex, data, 0, data.Length); - buffer[bufferIndex] = null; - } - mask >>= 1; - bit = (mask & 1u); - bufferIndex = (bufferIndex + 1) % RELIABLE_BUFFER_SIZE; - readedCount++; + receiveSequencedStartIndices[connectionIndex] = sequenceStartIndex; } - receiveSequencedBufferIndices[connectionIndex] = bufferIndex; - receiveSequencesIgnoreMask[connectionIndex] = mask; - return readedCount; } private bool IsNewReliable(int connectionIndex, int id) @@ -631,43 +498,40 @@ private bool IsNewReliable(int connectionIndex, int id) else { minIndex = sendAckStartIds[connectionIndex]; - if(id < minIndex) + if(IdDiff(id, minIndex) < 0) { mask <<= minIndex - id; minIndex = id; sendAckStartIds[connectionIndex] = id; } } - uint bit = 1u << (id - minIndex); + uint bit = 1u << IdDiff(id, minIndex); sendAckMasks[connectionIndex] = mask | bit; int startId = receivedReliableStartIds[connectionIndex]; - if(startId < 0 || IdGreaterThan(id, startId)) + if(startId < 0) startId = id; + int offset = IdDiff(id, startId); + if(offset > -RELIABLE_BUFFER_SIZE) { mask = receivedReliableMasks[connectionIndex]; - int bitIndex; - if(startId < 0) + if(offset < 0) { - bitIndex = 0; + mask <<= -offset; + offset = 0; startId = id; } - else + else if(offset >= RELIABLE_BUFFER_SIZE) { - bitIndex = (id - startId + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; - - if(bitIndex >= RELIABLE_BUFFER_SIZE) - { - bitIndex = RELIABLE_BUFFER_SIZE - 1; - int newStart = (id - bitIndex + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; - int shift = (newStart - startId + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; - if(shift < RELIABLE_BUFFER_SIZE) mask >>= shift; - startId = newStart; - } + offset = RELIABLE_BUFFER_SIZE - 1; + int newStart = (id - offset + RELIABLE_IDS_COUNT) % RELIABLE_IDS_COUNT; + int shift = IdDiff(newStart, startId); + if(shift < RELIABLE_BUFFER_SIZE) mask >>= shift; + startId = newStart; } - if((mask & 1u << bitIndex) == 0) + if((mask & 1u << offset) == 0) { - mask |= 1u << bitIndex; + mask |= 1u << offset; receivedReliableMasks[connectionIndex] = mask; receivedReliableStartIds[connectionIndex] = startId; return true; @@ -676,9 +540,11 @@ private bool IsNewReliable(int connectionIndex, int id) return false; } - private bool IdGreaterThan(int id, int reference) + private int IdDiff(int id, int reference) { - return (id > reference && (id - reference) < RELIABLE_IDS_COUNT_HALF) || (id < reference && ((reference - id) >= RELIABLE_IDS_COUNT_HALF)); + if(id < RELIABLE_IDS_COUNT_HALF) id += RELIABLE_IDS_COUNT; + if(reference < RELIABLE_IDS_COUNT_HALF) reference += RELIABLE_IDS_COUNT; + return id - reference; } private void OnDataReceived(int connectionIndex, byte[] dataBuffer, int index, int length)