Skip to content

Commit 5d75987

Browse files
Merge branch 'feature/rpcqueuerevisited' into feature/networkupdate
2 parents 39f0502 + e1dc42f commit 5d75987

16 files changed

Lines changed: 684 additions & 271 deletions
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
using MLAPI.Serialization.Pooled;
2+
using MLAPI.Serialization;
3+
using MLAPI.Configuration;
4+
using MLAPI.Profiling;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System;
8+
using System.IO;
9+
10+
namespace MLAPI
11+
{
12+
class MessageBatcher
13+
{
14+
public class SendStream
15+
{
16+
public FrameQueueItem Item;
17+
public PooledBitStream Stream = PooledBitStream.Get();
18+
public bool Empty = true;
19+
}
20+
21+
// Stores the stream of batched RPC to send to each client, by ClientId
22+
private Dictionary<ulong, SendStream> SendDict = new Dictionary<ulong, SendStream>();
23+
private PooledBitWriter Writer = new PooledBitWriter(PooledBitStream.Get());
24+
25+
// Used to store targets, internally
26+
private ulong[] TargetList = new ulong[0];
27+
28+
// Used to mark longer lengths. Works because we can't have zero-sized messages
29+
private const byte LongLenMarker = 0;
30+
31+
private void PushLength(int length, ref PooledBitWriter writer)
32+
{
33+
// If length is single byte we write it
34+
if (length < 256)
35+
{
36+
writer.WriteByte((byte)length); // write the amounts of bytes that are coming up
37+
}
38+
else
39+
{
40+
// otherwise we write a two-byte length
41+
writer.WriteByte(LongLenMarker); // mark larger size
42+
writer.WriteByte((byte)(length % 256)); // write the length modulo 256
43+
writer.WriteByte((byte)(length / 256)); // write the length divided by 256
44+
}
45+
}
46+
47+
private int PopLength(in BitStream messageStream)
48+
{
49+
int read = messageStream.ReadByte();
50+
// if we read a non-zero value, we have a single byte length
51+
// or a -1 error we can return
52+
if (read != LongLenMarker)
53+
{
54+
return read;
55+
}
56+
// otherwise, a two-byte length follows. We'll read in len1, len2
57+
int len1 = messageStream.ReadByte();
58+
if (len1 < 0)
59+
{
60+
// pass errors back to caller
61+
return len1;
62+
}
63+
int len2 = messageStream.ReadByte();
64+
if (len2 < 0)
65+
{
66+
// pass errors back to caller
67+
return len2;
68+
}
69+
70+
return len1 + len2 * 256;
71+
}
72+
73+
/// <summary>
74+
/// FillTargetList
75+
/// Fills a list with the ClientId's an item is targeted to
76+
/// </summary>
77+
/// <param name="item">the FrameQueueItem we want targets for</param>
78+
/// <param name="list">the list to fill</param>
79+
private static void FillTargetList(in FrameQueueItem item, ref ulong[] list)
80+
{
81+
switch (item.queueItemType)
82+
{
83+
case RpcQueueContainer.QueueItemType.ServerRpc:
84+
Array.Resize(ref list, 1);
85+
list[0] = item.networkId;
86+
break;
87+
case RpcQueueContainer.QueueItemType.ClientRpc:
88+
// copy the list
89+
list = item.clientIds.ToArray();
90+
break;
91+
default:
92+
break;
93+
}
94+
}
95+
96+
/// <summary>
97+
/// QueueItem
98+
/// Add a FrameQueueItem to be sent
99+
/// </summary>queueItem
100+
/// <param name="item">the threshold in bytes</param>
101+
public void QueueItem(in FrameQueueItem item)
102+
{
103+
FillTargetList(item, ref TargetList);
104+
105+
foreach (ulong clientId in TargetList)
106+
{
107+
if (!SendDict.ContainsKey(clientId))
108+
{
109+
// todo: consider what happens if many clients join and leave the game consecutively
110+
// we probably need a cleanup mechanism at some point
111+
SendDict[clientId] = new SendStream();
112+
}
113+
114+
if (SendDict[clientId].Empty)
115+
{
116+
SendDict[clientId].Empty = false;
117+
SendDict[clientId].Item = item;
118+
Writer.SetStream(SendDict[clientId].Stream);
119+
120+
Writer.WriteBit(false); // Encrypted
121+
Writer.WriteBit(false); // Authenticated
122+
123+
switch (item.queueItemType)
124+
{
125+
// 6 bits are used for the message type, which is an MLAPIConstants
126+
case RpcQueueContainer.QueueItemType.ServerRpc:
127+
Writer.WriteBits(MLAPIConstants.MLAPI_SERVER_RPC, 6); // MessageType
128+
break;
129+
case RpcQueueContainer.QueueItemType.ClientRpc:
130+
Writer.WriteBits(MLAPIConstants.MLAPI_CLIENT_RPC, 6); // MessageType
131+
break;
132+
}
133+
}
134+
135+
// write the amounts of bytes that are coming up
136+
PushLength(item.messageData.Count, ref Writer);
137+
138+
// write the message to send
139+
// todo: is there a faster alternative to .ToArray()
140+
Writer.WriteBytes(item.messageData.ToArray(), item.messageData.Count);
141+
142+
ProfilerStatManager.bytesSent.Record((int)item.messageData.Count);
143+
ProfilerStatManager.rpcsSent.Record();
144+
}
145+
}
146+
147+
public delegate void SendCallbackType(ulong clientId, SendStream messageStream);
148+
public delegate void ReceiveCallbackType(BitStream messageStream, MLAPI.RpcQueueContainer.QueueItemType messageType, ulong clientId, float time);
149+
150+
/// <summary>
151+
/// SendItems
152+
/// Send any batch of RPC that are of length above threshold
153+
/// </summary>
154+
/// <param name="threshold"> the threshold in bytes</param>
155+
/// <param name="sendCallback"> the function to call for sending the batch</param>
156+
public void SendItems(int threshold, SendCallbackType sendCallback)
157+
{
158+
foreach (KeyValuePair<ulong, SendStream> entry in SendDict)
159+
{
160+
if (!entry.Value.Empty)
161+
{
162+
// read the queued message
163+
int length = (int)SendDict[entry.Key].Stream.Length;
164+
165+
if (length >= threshold)
166+
{
167+
sendCallback(entry.Key, entry.Value);
168+
ProfilerStatManager.rpcBatchesSent.Record();
169+
170+
// clear the batch that was sent from the SendDict
171+
entry.Value.Stream.SetLength(0);
172+
entry.Value.Stream.Position = 0;
173+
entry.Value.Empty = true;
174+
}
175+
}
176+
}
177+
}
178+
179+
/// <summary>
180+
/// ReceiveItems
181+
/// Process the messageStream and call the callback with individual RPC messages
182+
/// </summary>
183+
/// <param name="messageStream"> the messageStream containing the batched RPC</param>
184+
/// <param name="receiveCallback"> the callback to call has type int f(message, type, clientId, time) </param>
185+
/// <param name="messageType"> the message type to pass back to callback</param>
186+
/// <param name="clientId"> the clientId to pass back to callback</param>
187+
/// <param name="receiveTime"> the packet receive time to pass back to callback</param>
188+
public int ReceiveItems(in BitStream messageStream, ReceiveCallbackType receiveCallback, MLAPI.RpcQueueContainer.QueueItemType messageType, ulong clientId, float receiveTime)
189+
{
190+
do
191+
{
192+
// read the length of the next RPC
193+
int rpcSize = PopLength(messageStream);
194+
195+
if (rpcSize < 0)
196+
{
197+
// abort if there's an error reading lengths
198+
return 0;
199+
}
200+
201+
// copy what comes after current stream position
202+
long pos = messageStream.Position;
203+
BitStream copy = PooledBitStream.Get();
204+
copy.SetLength(rpcSize);
205+
copy.Position = 0;
206+
Buffer.BlockCopy(messageStream.GetBuffer(), (int)pos, copy.GetBuffer(), 0, rpcSize);
207+
208+
receiveCallback(copy, messageType, clientId, receiveTime);
209+
210+
// seek over the RPC
211+
// RPCReceiveQueueItem peeks at content, it doesn't advance
212+
messageStream.Seek(rpcSize, SeekOrigin.Current);
213+
} while (messageStream.Position < messageStream.Length);
214+
return 0;
215+
}
216+
}
217+
}

com.unity.multiplayer.mlapi/Runtime/Core/MessageBatcher.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

com.unity.multiplayer.mlapi/Runtime/Core/NetworkUpdateManager.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
/// About the Network Update Loop
2+
/// The NetworkUpdateEngine is a temporary solution for the network update loop implementation.
3+
/// This will be revised with a more robust and modular implementation in the near future.
4+
15
using System;
26
using System.Text;
37
using System.Collections.Generic;

com.unity.multiplayer.mlapi/Runtime/Core/NetworkedBehaviour.cs

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ internal enum NExec
3838
Client = 2
3939
}
4040

41+
/// <summary>
42+
/// This is a temporary solution for channel names and the below hardcoded value might not be mandatory in the future.
43+
/// </summary>
4144
private const string StandardRpc_ChannelName = "STDRPC";
4245

4346
#pragma warning disable 414
@@ -49,17 +52,8 @@ internal enum NExec
4952
internal BitWriter BeginSendServerRpc(ServerRpcSendParams sendParams, bool isReliable)
5053
{
5154
var rpcQueueMananger = NetworkingManager.Singleton.RpcQueueManager;
52-
if (rpcQueueMananger == null)
53-
{
54-
return null;
55-
}
56-
57-
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ServerRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkingManager.Singleton.ServerClientId, null);
58-
writer.WriteBit(false); // Encrypted
59-
writer.WriteBit(false); // Authenticated
60-
writer.WriteBits(MLAPIConstants.MLAPI_SERVER_RPC, 6); // MessageType
61-
6255

56+
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RpcQueueContainer.QueueItemType.ServerRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkingManager.Singleton.ServerClientId, null);
6357
writer.WriteUInt64Packed(NetworkId); // NetworkObjectId
6458
writer.WriteUInt16Packed(GetBehaviourId()); // NetworkBehaviourId
6559

@@ -90,30 +84,8 @@ internal BitWriter BeginSendClientRpc(ClientRpcSendParams sendParams, bool isRel
9084
{
9185
//This will start a new queue item entry and will then return the writer to the current frame's stream
9286
var rpcQueueMananger = NetworkingManager.Singleton.RpcQueueManager;
93-
if (rpcQueueMananger == null)
94-
{
95-
return null;
96-
}
97-
98-
ulong[] TargetIds = sendParams.TargetClientIds;
99-
if(TargetIds == null)
100-
{
101-
if((NetworkingManager.Singleton.ConnectedClientsList.Count > 0))
102-
{
103-
var FirstPass = NetworkingManager.Singleton.ConnectedClientsList.Select(c => c.ClientId).ToList();
104-
FirstPass.Remove(NetworkingManager.Singleton.ServerClientId);
105-
TargetIds = FirstPass.ToArray();
106-
}
107-
}
108-
109-
//var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ClientRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkId, sendParams.TargetClientIds ?? NetworkingManager.Singleton.ConnectedClientsList.Select(c => c.ClientId).ToArray());
110-
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ClientRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkId, TargetIds);
111-
writer.WriteBit(false); // Encrypted
112-
writer.WriteBit(false); // Authenticated
113-
writer.WriteBits(MLAPIConstants.MLAPI_CLIENT_RPC, 6); // MessageType
114-
115-
11687

88+
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RpcQueueContainer.QueueItemType.ClientRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkId, sendParams.TargetClientIds ?? NetworkingManager.Singleton.ConnectedClientsList.Select(c => c.ClientId).ToArray());
11789
writer.WriteUInt64Packed(NetworkId); // NetworkObjectId
11890
writer.WriteUInt16Packed(GetBehaviourId()); // NetworkBehaviourId
11991

0 commit comments

Comments
 (0)