Skip to content

Commit d130e99

Browse files
Merge branch 'develop' into feature/rpcqueuerevisited
2 parents cadac6a + 51b9e2e commit d130e99

6 files changed

Lines changed: 284 additions & 61 deletions

File tree

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
using MLAPI.Serialization.Pooled;
2+
using MLAPI.Serialization;
3+
using MLAPI.Configuration;
4+
using MLAPI.Profiling;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System;
8+
using System.IO;
9+
10+
namespace MLAPI
11+
{
12+
class MessageBatcher
13+
{
14+
public class SendStream
15+
{
16+
public FrameQueueItem Item;
17+
public PooledBitStream Stream = PooledBitStream.Get();
18+
public bool Empty = true;
19+
}
20+
21+
// Stores the stream of batched RPC to send to each client, by ClientId
22+
private Dictionary<ulong, SendStream> SendDict = new Dictionary<ulong, SendStream>();
23+
private PooledBitWriter Writer = new PooledBitWriter(PooledBitStream.Get());
24+
25+
// Used to store targets, internally
26+
private ulong[] TargetList = new ulong[0];
27+
28+
// Used to mark longer lengths. Works because we can't have zero-sized messages
29+
private const byte LongLenMarker = 0;
30+
31+
private void PushLength(int length, ref PooledBitWriter writer)
32+
{
33+
// If length is single byte we write it
34+
if (length < 256)
35+
{
36+
writer.WriteByte((byte)length); // write the amounts of bytes that are coming up
37+
}
38+
else
39+
{
40+
// otherwise we write a two-byte length
41+
writer.WriteByte(LongLenMarker); // mark larger size
42+
writer.WriteByte((byte)(length % 256)); // write the length modulo 256
43+
writer.WriteByte((byte)(length / 256)); // write the length divided by 256
44+
}
45+
}
46+
47+
private int PopLength(in BitStream messageStream)
48+
{
49+
int read = messageStream.ReadByte();
50+
// if we read a non-zero value, we have a single byte length
51+
// or a -1 error we can return
52+
if (read != LongLenMarker)
53+
{
54+
return read;
55+
}
56+
// otherwise, a two-byte length follows. We'll read in len1, len2
57+
int len1 = messageStream.ReadByte();
58+
if (len1 < 0)
59+
{
60+
// pass errors back to caller
61+
return len1;
62+
}
63+
int len2 = messageStream.ReadByte();
64+
if (len2 < 0)
65+
{
66+
// pass errors back to caller
67+
return len2;
68+
}
69+
70+
return len1 + len2 * 256;
71+
}
72+
73+
/// <summary>
74+
/// FillTargetList
75+
/// Fills a list with the ClientId's an item is targeted to
76+
/// </summary>
77+
/// <param name="item">the FrameQueueItem we want targets for</param>
78+
/// <param name="list">the list to fill</param>
79+
private static void FillTargetList(in FrameQueueItem item, ref ulong[] list)
80+
{
81+
switch (item.QueueItemType)
82+
{
83+
case RPCQueueManager.QueueItemType.ServerRpc:
84+
Array.Resize(ref list, 1);
85+
list[0] = item.NetworkId;
86+
break;
87+
case RPCQueueManager.QueueItemType.ClientRpc:
88+
// copy the list
89+
list = item.ClientIds.ToArray();
90+
break;
91+
default:
92+
break;
93+
}
94+
}
95+
96+
/// <summary>
97+
/// QueueItem
98+
/// Add a FrameQueueItem to be sent
99+
/// </summary>queueItem
100+
/// <param name="item">the threshold in bytes</param>
101+
public void QueueItem(in FrameQueueItem item)
102+
{
103+
FillTargetList(item, ref TargetList);
104+
105+
foreach (ulong clientId in TargetList)
106+
{
107+
if (!SendDict.ContainsKey(clientId))
108+
{
109+
// todo: consider what happens if many clients join and leave the game consecutively
110+
// we probably need a cleanup mechanism at some point
111+
SendDict[clientId] = new SendStream();
112+
}
113+
114+
if (SendDict[clientId].Empty)
115+
{
116+
SendDict[clientId].Empty = false;
117+
SendDict[clientId].Item = item;
118+
Writer.SetStream(SendDict[clientId].Stream);
119+
120+
Writer.WriteBit(false); // Encrypted
121+
Writer.WriteBit(false); // Authenticated
122+
123+
switch (item.QueueItemType)
124+
{
125+
// 6 bits are used for the message type, which is an MLAPIConstants
126+
case RPCQueueManager.QueueItemType.ServerRpc:
127+
Writer.WriteBits(MLAPIConstants.MLAPI_SERVER_RPC, 6); // MessageType
128+
break;
129+
case RPCQueueManager.QueueItemType.ClientRpc:
130+
Writer.WriteBits(MLAPIConstants.MLAPI_CLIENT_RPC, 6); // MessageType
131+
break;
132+
}
133+
}
134+
135+
// write the amounts of bytes that are coming up
136+
PushLength(item.MessageData.Count, ref Writer);
137+
138+
// write the message to send
139+
// todo: is there a faster alternative to .ToArray()
140+
Writer.WriteBytes(item.MessageData.ToArray(), item.MessageData.Count);
141+
142+
ProfilerStatManager.bytesSent.Record((int)item.MessageData.Count);
143+
ProfilerStatManager.rpcsSent.Record();
144+
}
145+
}
146+
147+
public delegate void SendCallbackType(ulong clientId, SendStream messageStream);
148+
public delegate void ReceiveCallbackType(BitStream messageStream, MLAPI.RPCQueueManager.QueueItemType messageType, ulong clientId, float time);
149+
150+
/// <summary>
151+
/// SendItems
152+
/// Send any batch of RPC that are of length above threshold
153+
/// </summary>
154+
/// <param name="threshold"> the threshold in bytes</param>
155+
/// <param name="sendCallback"> the function to call for sending the batch</param>
156+
public void SendItems(int threshold, SendCallbackType sendCallback)
157+
{
158+
foreach (KeyValuePair<ulong, SendStream> entry in SendDict)
159+
{
160+
if (!entry.Value.Empty)
161+
{
162+
// read the queued message
163+
int length = (int)SendDict[entry.Key].Stream.Length;
164+
165+
if (length >= threshold)
166+
{
167+
sendCallback(entry.Key, entry.Value);
168+
ProfilerStatManager.rpcBatchesSent.Record();
169+
170+
// clear the batch that was sent from the SendDict
171+
entry.Value.Stream.SetLength(0);
172+
entry.Value.Stream.Position = 0;
173+
entry.Value.Empty = true;
174+
}
175+
}
176+
}
177+
}
178+
179+
/// <summary>
180+
/// ReceiveItems
181+
/// Process the messageStream and call the callback with individual RPC messages
182+
/// </summary>
183+
/// <param name="messageStream"> the messageStream containing the batched RPC</param>
184+
/// <param name="receiveCallback"> the callback to call has type int f(message, type, clientId, time) </param>
185+
/// <param name="messageType"> the message type to pass back to callback</param>
186+
/// <param name="clientId"> the clientId to pass back to callback</param>
187+
/// <param name="receiveTime"> the packet receive time to pass back to callback</param>
188+
public int ReceiveItems(in BitStream messageStream, ReceiveCallbackType receiveCallback, MLAPI.RPCQueueManager.QueueItemType messageType, ulong clientId, float receiveTime)
189+
{
190+
do
191+
{
192+
// read the length of the next RPC
193+
int rpcSize = PopLength(messageStream);
194+
195+
if (rpcSize < 0)
196+
{
197+
// abort if there's an error reading lengths
198+
return 0;
199+
}
200+
201+
// copy what comes after current stream position
202+
long pos = messageStream.Position;
203+
BitStream copy = PooledBitStream.Get();
204+
copy.SetLength(rpcSize);
205+
copy.Position = 0;
206+
Buffer.BlockCopy(messageStream.GetBuffer(), (int)pos, copy.GetBuffer(), 0, rpcSize);
207+
208+
receiveCallback(copy, messageType, clientId, receiveTime);
209+
210+
// seek over the RPC
211+
// RPCReceiveQueueItem peeks at content, it doesn't advance
212+
messageStream.Seek(rpcSize, SeekOrigin.Current);
213+
} while (messageStream.Position < messageStream.Length);
214+
return 0;
215+
}
216+
}
217+
}

com.unity.multiplayer.mlapi/Runtime/Core/MessageBatcher.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

com.unity.multiplayer.mlapi/Runtime/Core/NetworkedBehaviour.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ internal BitWriter BeginSendServerRpc(ServerRpcSendParams sendParams, bool isRel
5353
{
5454
var rpcQueueMananger = NetworkingManager.Singleton.RpcQueueManager;
5555

56-
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueContainer.QueueItemType.ServerRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkingManager.Singleton.ServerClientId, null);
57-
writer.WriteBit(false); // Encrypted
58-
writer.WriteBit(false); // Authenticated
59-
writer.WriteBits(MLAPIConstants.MLAPI_SERVER_RPC, 6); // MessageType
56+
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ServerRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkingManager.Singleton.ServerClientId, null);
6057
writer.WriteUInt64Packed(NetworkId); // NetworkObjectId
6158
writer.WriteUInt16Packed(GetBehaviourId()); // NetworkBehaviourId
6259
return writer;
@@ -77,10 +74,7 @@ internal BitWriter BeginSendClientRpc(ClientRpcSendParams sendParams, bool isRel
7774
//This will start a new queue item entry and will then return the writer to the current frame's stream
7875
var rpcQueueMananger = NetworkingManager.Singleton.RpcQueueManager;
7976

80-
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueContainer.QueueItemType.ClientRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkId, sendParams.TargetClientIds ?? NetworkingManager.Singleton.ConnectedClientsList.Select(c => c.ClientId).ToArray());
81-
writer.WriteBit(false); // Encrypted
82-
writer.WriteBit(false); // Authenticated
83-
writer.WriteBits(MLAPIConstants.MLAPI_CLIENT_RPC, 6); // MessageType
77+
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ClientRpc, Time.realtimeSinceStartup, StandardRpc_ChannelName, 0, NetworkId, sendParams.TargetClientIds ?? NetworkingManager.Singleton.ConnectedClientsList.Select(c => c.ClientId).ToArray());
8478
writer.WriteUInt64Packed(NetworkId); // NetworkObjectId
8579
writer.WriteUInt16Packed(GetBehaviourId()); // NetworkBehaviourId
8680
return writer;

com.unity.multiplayer.mlapi/Runtime/Core/NetworkingManager.cs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,7 @@ private void HandleRawTransportPoll(NetEventType eventType, ulong clientId, stri
10461046
}
10471047

10481048
private readonly BitStream inputStreamWrapper = new BitStream(new byte[0]);
1049+
private MessageBatcher batcher = new MessageBatcher();
10491050

10501051
internal void HandleIncomingData(ulong clientId, string channelName, ArraySegment<byte> data, float receiveTime, bool allowBuffer)
10511052
{
@@ -1164,32 +1165,18 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
11641165
{
11651166
if (IsServer)
11661167
{
1167-
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1168-
s_MLAPIServerSTDRPCQueued.Begin();
1169-
#endif
1170-
1171-
InternalMessageHandler.RPCReceiveQueueItem(clientId, messageStream, receiveTime,RPCQueueContainer.QueueItemType.ServerRpc);
1172-
1173-
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1174-
s_MLAPIServerSTDRPCQueued.End();
1175-
#endif
1168+
batcher.ReceiveItems(messageStream, ReceiveCallback, RPCQueueManager.QueueItemType.ServerRpc, clientId, receiveTime);
11761169
}
1170+
ProfilerStatManager.rpcBatchesRcvd.Record();
11771171
break;
11781172
}
11791173
case MLAPIConstants.MLAPI_CLIENT_RPC:
11801174
{
11811175
if (IsClient)
11821176
{
1183-
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1184-
s_MLAPIClientSTDRPCQueued.Begin();
1185-
#endif
1186-
1187-
InternalMessageHandler.RPCReceiveQueueItem(clientId, messageStream,receiveTime,RPCQueueContainer.QueueItemType.ClientRpc);
1188-
1189-
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1190-
s_MLAPIClientSTDRPCQueued.End();
1191-
#endif
1177+
batcher.ReceiveItems(messageStream, ReceiveCallback, RPCQueueManager.QueueItemType.ClientRpc, clientId, receiveTime);
11921178
}
1179+
ProfilerStatManager.rpcBatchesRcvd.Record();
11931180
break;
11941181
}
11951182
default:
@@ -1205,6 +1192,31 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
12051192
#endif
12061193
}
12071194

1195+
private static void ReceiveCallback(BitStream messageStream, MLAPI.RPCQueueManager.QueueItemType messageType, ulong clientId, float receiveTime)
1196+
{
1197+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1198+
if (messageType == RPCQueueManager.QueueItemType.ServerRpc)
1199+
{
1200+
s_MLAPIServerSTDRPCQueued.Begin();
1201+
}
1202+
else
1203+
{
1204+
s_MLAPIClientSTDRPCQueued.Begin();
1205+
}
1206+
#endif
1207+
InternalMessageHandler.RPCReceiveQueueItem(clientId, messageStream, receiveTime, messageType);
1208+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1209+
if (messageType == RPCQueueManager.QueueItemType.ServerRpc)
1210+
{
1211+
s_MLAPIServerSTDRPCQueued.End();
1212+
}
1213+
else
1214+
{
1215+
s_MLAPIClientSTDRPCQueued.End();
1216+
}
1217+
#endif
1218+
}
1219+
12081220
/// <summary>
12091221
/// InvokeRPC
12101222
/// Called when an inbound queued RPC is invoked

0 commit comments

Comments
 (0)