-
Notifications
You must be signed in to change notification settings - Fork 461
feat: Dynamically size the UnityTransport send queues [MTT-2816] #2212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
1913c91
2fa18a9
056dc8e
669eedb
258f87f
1cf4d94
7e9ecb7
819ed97
52147d4
4248189
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,26 +8,33 @@ namespace Unity.Netcode.Transports.UTP | |
| /// <summary>Queue for batched messages meant to be sent through UTP.</summary> | ||
| /// <remarks> | ||
| /// Messages should be pushed on the queue with <see cref="PushMessage"/>. To send batched | ||
| /// messages, call <see cref="FillWriter"> with the <see cref="DataStreamWriter"/> obtained from | ||
| /// <see cref="NetworkDriver.BeginSend"/>. This will fill the writer with as many messages as | ||
| /// possible. If the send is successful, call <see cref="Consume"/> to remove the data from the | ||
| /// queue. | ||
| /// messages, call <see cref="FillWriterWithMessages"/> or <see cref="FillWriterWithBytes"/> | ||
| /// with the <see cref="DataStreamWriter"/> obtained from <see cref="NetworkDriver.BeginSend"/>. | ||
| /// This will fill the writer with as many messages/bytes as possible. If the send is | ||
| /// successful, call <see cref="Consume"/> to remove the data from the queue. | ||
| /// | ||
| /// This is meant as a companion to <see cref="BatchedReceiveQueue"/>, which should be used to | ||
| /// read messages sent with this queue. | ||
| /// </remarks> | ||
| internal struct BatchedSendQueue : IDisposable | ||
| { | ||
| private NativeArray<byte> m_Data; | ||
| private NativeList<byte> m_Data; | ||
| private NativeArray<int> m_HeadTailIndices; | ||
| private int m_MaximumCapacity; | ||
| private int m_MinimumCapacity; | ||
|
|
||
| /// <summary>Overhead that is added to each message in the queue.</summary> | ||
| public const int PerMessageOverhead = sizeof(int); | ||
|
|
||
| internal const int MinimumMinimumCapacity = 4096; | ||
|
|
||
| // Indices into m_HeadTailIndicies. | ||
| private const int k_HeadInternalIndex = 0; | ||
| private const int k_TailInternalIndex = 1; | ||
|
|
||
| // Only used for testing purposes. | ||
| internal int BackingListLength => m_Data.Length; | ||
|
|
||
| /// <summary>Index of the first byte of the oldest data in the queue.</summary> | ||
| private int HeadIndex | ||
| { | ||
|
|
@@ -43,18 +50,32 @@ private int TailIndex | |
| } | ||
|
|
||
| public int Length => TailIndex - HeadIndex; | ||
|
|
||
| public bool IsEmpty => HeadIndex == TailIndex; | ||
|
|
||
| public bool IsCreated => m_Data.IsCreated; | ||
|
|
||
| /// <summary>Construct a new empty send queue.</summary> | ||
| /// <param name="capacity">Maximum capacity of the send queue.</param> | ||
| public BatchedSendQueue(int capacity) | ||
| { | ||
| m_Data = new NativeArray<byte>(capacity, Allocator.Persistent); | ||
| // Make sure the maximum capacity will be even. | ||
| m_MaximumCapacity = capacity + (capacity & 1); | ||
|
|
||
| // We pick the minimum capacity such that if we keep doubling it, we'll eventually hit | ||
| // the maximum capacity exactly. The alternative would be to use capacities that are | ||
| // powers of 2, but this can lead to over-allocating quite a bit of memory (especially | ||
| // since we expect maximum capacities to be in the megabytes range). The approach taken | ||
| // here avoids this issue, at the cost of not having allocations of nice round sizes. | ||
| m_MinimumCapacity = m_MaximumCapacity; | ||
| while (m_MinimumCapacity / 2 >= MinimumMinimumCapacity) | ||
| { | ||
| m_MinimumCapacity /= 2; | ||
| } | ||
|
|
||
| m_Data = new NativeList<byte>(m_MinimumCapacity, Allocator.Persistent); | ||
| m_HeadTailIndices = new NativeArray<int>(2, Allocator.Persistent); | ||
|
|
||
| m_Data.ResizeUninitialized(m_MinimumCapacity); | ||
|
|
||
| HeadIndex = 0; | ||
| TailIndex = 0; | ||
| } | ||
|
|
@@ -68,6 +89,16 @@ public void Dispose() | |
| } | ||
| } | ||
|
|
||
| /// <summary>Write a raw buffer to a DataStreamWriter.</summary> | ||
| private unsafe void WriteBytes(ref DataStreamWriter writer, byte* data, int length) | ||
| { | ||
| #if UTP_TRANSPORT_2_0_ABOVE | ||
| writer.WriteBytesUnsafe(data, length); | ||
| #else | ||
| writer.WriteBytes(data, length); | ||
| #endif | ||
| } | ||
|
|
||
| /// <summary>Append data at the tail of the queue. No safety checks.</summary> | ||
| private void AppendDataAtTail(ArraySegment<byte> data) | ||
| { | ||
|
|
@@ -79,11 +110,7 @@ private void AppendDataAtTail(ArraySegment<byte> data) | |
|
|
||
| fixed (byte* dataPtr = data.Array) | ||
| { | ||
| #if UTP_TRANSPORT_2_0_ABOVE | ||
| writer.WriteBytesUnsafe(dataPtr + data.Offset, data.Count); | ||
| #else | ||
| writer.WriteBytes(dataPtr + data.Offset, data.Count); | ||
| #endif | ||
| WriteBytes(ref writer, dataPtr + data.Offset, data.Count); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -110,23 +137,49 @@ public bool PushMessage(ArraySegment<byte> message) | |
| return true; | ||
| } | ||
|
|
||
| // Check if there would be enough room if we moved data at the beginning of m_Data. | ||
| if (m_Data.Length - TailIndex + HeadIndex >= sizeof(int) + message.Count) | ||
| // Move the data at the beginning of of m_Data. Either it will leave enough space for | ||
| // the message, or we'll grow m_Data and will want the data at the beginning anyway. | ||
| if (HeadIndex > 0 && Length > 0) | ||
| { | ||
| // Move the data back at the beginning of m_Data. | ||
| unsafe | ||
| { | ||
| UnsafeUtility.MemMove(m_Data.GetUnsafePtr(), (byte*)m_Data.GetUnsafePtr() + HeadIndex, Length); | ||
| } | ||
|
|
||
| TailIndex = Length; | ||
| HeadIndex = 0; | ||
| } | ||
|
|
||
| // If there's enough space left at the end for the message, now is a good time to trim | ||
| // the capacity of m_Data if it got very large. We define "very large" here as having | ||
| // more than 75% of m_Data unused after adding the new message. | ||
| if (m_Data.Length - TailIndex >= sizeof(int) + message.Count) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a big issue, but when moving from The array doesn't have the concept of capacity, so that's why a For this to work as expected, I'd assume then that
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, you're correct, the list's length always matches its capacity. I'll add a comment about that since that's a non-obvious but important part of how the code is meant to work. Originally I had planned on getting rid of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After having tried it, we can't rely on the list's capacity as the capacity of our queue here, because |
||
| { | ||
| AppendDataAtTail(message); | ||
|
|
||
| while (TailIndex < m_Data.Length / 4 && m_Data.Length > m_MinimumCapacity) | ||
|
NoelStephensUnity marked this conversation as resolved.
Outdated
|
||
| { | ||
| m_Data.ResizeUninitialized(m_Data.Length / 2); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| return false; | ||
| // If we get here we need to grow m_Data until the data fits (or it's too large). | ||
| while (m_Data.Length - TailIndex < sizeof(int) + message.Count) | ||
| { | ||
| // Can't grow m_Data anymore. Message simply won't fit. | ||
| if (m_Data.Length * 2 > m_MaximumCapacity) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| m_Data.ResizeUninitialized(m_Data.Length * 2); | ||
| } | ||
|
|
||
| // If we get here we know there's now enough room for the message. | ||
| AppendDataAtTail(message); | ||
| return true; | ||
| } | ||
|
|
||
| /// <summary> | ||
|
|
@@ -153,11 +206,13 @@ public int FillWriterWithMessages(ref DataStreamWriter writer) | |
|
|
||
| unsafe | ||
| { | ||
| var dataPtr = (byte*)m_Data.GetUnsafePtr() + HeadIndex; | ||
|
|
||
| #if UTP_TRANSPORT_2_0_ABOVE | ||
| var slice = m_Data.GetSubArray(HeadIndex, Length); | ||
| var slice = NativeArray.ConvertExistingDataToNativeArray<byte>(dataPtr, Length, Allocator.None); | ||
| var reader = new DataStreamReader(slice); | ||
| #else | ||
| var reader = new DataStreamReader((byte*)m_Data.GetUnsafePtr() + HeadIndex, Length); | ||
| var reader = new DataStreamReader(dataPtr, Length); | ||
| #endif | ||
|
|
||
| var writerAvailable = writer.Capacity; | ||
|
|
@@ -177,11 +232,7 @@ public int FillWriterWithMessages(ref DataStreamWriter writer) | |
| writer.WriteInt(messageLength); | ||
|
|
||
| var messageOffset = HeadIndex + reader.GetBytesRead(); | ||
| #if UTP_TRANSPORT_2_0_ABOVE | ||
| writer.WriteBytesUnsafe((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength); | ||
| #else | ||
| writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength); | ||
| #endif | ||
| WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength); | ||
|
|
||
| writerAvailable -= sizeof(int) + messageLength; | ||
| readerOffset += sizeof(int) + messageLength; | ||
|
|
@@ -218,11 +269,7 @@ public int FillWriterWithBytes(ref DataStreamWriter writer) | |
|
|
||
| unsafe | ||
| { | ||
| #if UTP_TRANSPORT_2_0_ABOVE | ||
| writer.WriteBytesUnsafe((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength); | ||
| #else | ||
| writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength); | ||
| #endif | ||
| WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength); | ||
| } | ||
|
|
||
| return copyLength; | ||
|
|
@@ -236,10 +283,14 @@ public int FillWriterWithBytes(ref DataStreamWriter writer) | |
| /// <param name="size">Number of bytes to consume from the queue.</param> | ||
| public void Consume(int size) | ||
| { | ||
| // Adjust the head/tail indices such that we consume the given size. | ||
| if (size >= Length) | ||
| { | ||
| HeadIndex = 0; | ||
| TailIndex = 0; | ||
|
|
||
| // This is a no-op if m_Data is already at minimum capacity. | ||
| m_Data.ResizeUninitialized(m_MinimumCapacity); | ||
| } | ||
| else | ||
| { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.