-
Notifications
You must be signed in to change notification settings - Fork 461
Expand file tree
/
Copy pathMessageBatcher.cs
More file actions
217 lines (192 loc) · 8.45 KB
/
MessageBatcher.cs
File metadata and controls
217 lines (192 loc) · 8.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
using MLAPI.Serialization.Pooled;
using MLAPI.Serialization;
using MLAPI.Configuration;
using MLAPI.Profiling;
using System.Collections.Generic;
using System.Linq;
using System;
using System.IO;
namespace MLAPI
{
class MessageBatcher
{
/// <summary>
/// SendStream
/// The batch of RPC messages being accumulated for one target client.
/// </summary>
public class SendStream
{
// Channel taken from the item that started the current batch (set in QueueItem)
public string channel;
// Pooled stream that receives the batch header followed by the length-prefixed messages
public PooledBitStream Stream = PooledBitStream.Get();
// True until something is queued; SendItems sets it back to true after sending the batch
public bool Empty = true;
}
// Stores the stream of batched RPC to send to each client, by ClientId.
// Entries are created on first use by QueueItem and are reset, not removed, by SendItems.
private Dictionary<ulong, SendStream> SendDict = new Dictionary<ulong, SendStream>();
// Single writer reused for every target; QueueItem redirects it to a client's stream with SetStream.
// NOTE(review): nothing in this class disposes the writer or the pooled stream it is created with -
// confirm whether they have to be returned to the pool.
private PooledBitWriter Writer = new PooledBitWriter(PooledBitStream.Get());
// Scratch array holding the targets of the item being queued; refilled by FillTargetList on each QueueItem call
private ulong[] TargetList = new ulong[0];
// First byte of a length prefix; this value means a two-byte length follows instead of a one-byte length.
// Works because we can't have zero-sized messages, so a one-byte length is never 0
private const byte LongLenMarker = 0;
/// <summary>
/// PushLength
/// Writes the length prefix that precedes a batched message.
/// Lengths 1 to 255 are written as a single byte. Lengths 256 to 65535 are written as
/// three bytes: LongLenMarker, then the low byte, then the high byte of the length.
/// </summary>
/// <param name="length">the size in bytes of the message that follows, from 1 to 65535</param>
/// <param name="writer">the writer to write the prefix to</param>
/// <exception cref="ArgumentOutOfRangeException">the length cannot be represented by the prefix format</exception>
private void PushLength(int length, ref PooledBitWriter writer)
{
    // A length of 0 would be written as LongLenMarker itself, and anything above 65535 would
    // lose its upper bits in the byte casts below. Either way the receiver would misread the
    // rest of the batch, so refuse to write such a prefix at all.
    if (length <= 0 || length > ushort.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(length), length, "Batched message length must be between 1 and 65535 bytes.");
    }

    // If length is single byte we write it
    if (length <= byte.MaxValue)
    {
        writer.WriteByte((byte)length);
        return;
    }

    // otherwise we write the marker followed by a two-byte length, low byte first
    writer.WriteByte(LongLenMarker);
    writer.WriteByte((byte)(length % 256));
    writer.WriteByte((byte)(length / 256));
}
/// <summary>
/// PopLength
/// Reads one length prefix from the stream, in either its one-byte or its three-byte form.
/// </summary>
/// <param name="messageStream">the stream to read the prefix from</param>
/// <returns>
/// the decoded length, or the negative value handed back by ReadByte when one of the reads
/// fails, passed through unchanged so the caller can detect the error
/// </returns>
private int PopLength(in BitStream messageStream)
{
    int firstByte = messageStream.ReadByte();
    if (firstByte == LongLenMarker)
    {
        // Long form: the marker is followed by the low byte and then the high byte
        int lowByte = messageStream.ReadByte();
        if (lowByte < 0)
        {
            return lowByte;
        }

        int highByte = messageStream.ReadByte();
        return highByte < 0 ? highByte : lowByte + highByte * 256;
    }

    // Short form: the byte is the length itself (or a negative read error)
    return firstByte;
}
/// <summary>
/// FillTargetList
/// Fills a list with the ClientId's an item is targeted to
/// </summary>
/// <param name="item">the FrameQueueItem we want targets for</param>
/// <param name="list">
/// the list to fill; it is replaced or resized so that on return it holds exactly the targets
/// of this item, and is empty when the item type has no known targets
/// </param>
private static void FillTargetList(in FrameQueueItem item, ref ulong[] list)
{
    switch (item.QueueItemType)
    {
        case RPCQueueManager.QueueItemType.ServerRpc:
            // a server RPC has a single target, taken from the item's NetworkId
            Array.Resize(ref list, 1);
            list[0] = item.NetworkId;
            break;
        case RPCQueueManager.QueueItemType.ClientRpc:
            // copy the list
            list = item.ClientIds.ToArray();
            break;
        default:
            // The list is reused between calls. Leaving it untouched here would hand the
            // caller the targets of the previous item, so return an empty list instead.
            Array.Resize(ref list, 0);
            break;
    }
}
/// <summary>
/// QueueItem
/// Add a FrameQueueItem to be sent. The item's message is appended, with a length prefix,
/// to the batch of every client it targets. A batch that was empty first receives a one-byte
/// header (encrypted bit, authenticated bit, 6-bit message type) and takes the item's channel;
/// items added to a batch afterwards do not change that channel.
/// </summary>
/// <param name="item">the item to append to the batch of each of its targets</param>
public void QueueItem(in FrameQueueItem item)
{
    FillTargetList(item, ref TargetList);
    if (TargetList.Length == 0)
    {
        return;
    }

    // The payload is identical for every target, so copy it out of the item only once
    // todo: is there a faster alternative to .ToArray()
    byte[] payload = item.MessageData.ToArray();
    int payloadLength = item.MessageData.Count;

    foreach (ulong clientId in TargetList)
    {
        if (!SendDict.TryGetValue(clientId, out SendStream sendStream))
        {
            // todo: consider what happens if many clients join and leave the game consecutively
            // we probably need a cleanup mechanism at some point
            sendStream = new SendStream();
            SendDict[clientId] = sendStream;
        }

        // The writer is shared by all clients, so it has to be pointed at this client's stream
        // on every pass. Doing it only when a batch starts would append this message to
        // whichever stream was selected last once the client's batch is no longer empty.
        Writer.SetStream(sendStream.Stream);

        if (sendStream.Empty)
        {
            sendStream.Empty = false;
            sendStream.channel = item.Channel;
            Writer.WriteBit(false); // Encrypted
            Writer.WriteBit(false); // Authenticated
            switch (item.QueueItemType)
            {
                // 6 bits are used for the message type, which is an MLAPIConstants
                case RPCQueueManager.QueueItemType.ServerRpc:
                    Writer.WriteBits(MLAPIConstants.MLAPI_SERVER_RPC, 6); // MessageType
                    break;
                case RPCQueueManager.QueueItemType.ClientRpc:
                    Writer.WriteBits(MLAPIConstants.MLAPI_CLIENT_RPC, 6); // MessageType
                    break;
            }
        }

        // write the amounts of bytes that are coming up
        PushLength(payloadLength, ref Writer);
        // write the message to send
        Writer.WriteBytes(payload, payloadLength);
        ProfilerStatManager.bytesSent.Record(payloadLength);
        ProfilerStatManager.rpcsSent.Record();
    }
}
/// <summary>
/// SendCallbackType
/// Callback invoked by SendItems to transmit the batch of one client.
/// </summary>
/// <param name="clientId">the client the batch is addressed to</param>
/// <param name="messageStream">the batch to send; SendItems clears it as soon as the callback returns</param>
public delegate void SendCallbackType(ulong clientId, SendStream messageStream);
/// <summary>
/// ReceiveCallbackType
/// Callback invoked by ReceiveItems once for each RPC message extracted from a batch.
/// </summary>
/// <param name="messageStream">a stream holding a copy of one message; ReceiveItems reuses the same stream for the next message</param>
/// <param name="messageType">the message type given to ReceiveItems</param>
/// <param name="clientId">the clientId given to ReceiveItems</param>
/// <param name="time">the receive time given to ReceiveItems</param>
public delegate void ReceiveCallbackType(BitStream messageStream, MLAPI.RPCQueueManager.QueueItemType messageType, ulong clientId, float time);
/// <summary>
/// SendItems
/// Hands every non-empty batch whose length is at or above the threshold to the callback,
/// then resets that batch so it can be filled again
/// </summary>
/// <param name="threshold"> the minimum batch length, in bytes, for a batch to be sent</param>
/// <param name="sendCallback"> the function to call for sending the batch</param>
public void SendItems(int threshold, SendCallbackType sendCallback)
{
    foreach (KeyValuePair<ulong, SendStream> pending in SendDict)
    {
        SendStream batch = pending.Value;
        if (batch.Empty)
        {
            // nothing was queued for this client since its last send
            continue;
        }

        int batchedBytes = (int)batch.Stream.Length;
        if (batchedBytes < threshold)
        {
            // keep accumulating until the batch is large enough
            continue;
        }

        sendCallback(pending.Key, batch);
        ProfilerStatManager.rpcBatchesSent.Record();

        // rewind and truncate the stream so the next batch starts from scratch
        batch.Stream.SetLength(0);
        batch.Stream.Position = 0;
        batch.Empty = true;
    }
}
/// <summary>
/// ReceiveItems
/// Process the messageStream and call the callback with individual RPC messages.
/// Processing stops at the first length prefix that cannot be read, is zero, or announces
/// more bytes than the stream still holds; messages before that point have already been
/// delivered to the callback.
/// </summary>
/// <param name="messageStream"> the messageStream containing the batched RPC</param>
/// <param name="receiveCallback"> the callback to call for each message, as void f(message, type, clientId, time) </param>
/// <param name="messageType"> the message type to pass back to callback</param>
/// <param name="clientId"> the clientId to pass back to callback</param>
/// <param name="receiveTime"> the packet receive time to pass back to callback</param>
/// <returns>always 0</returns>
public int ReceiveItems(in BitStream messageStream, ReceiveCallbackType receiveCallback, MLAPI.RPCQueueManager.QueueItemType messageType, ulong clientId, float receiveTime)
{
    // one scratch stream is reused for every message of the batch
    using PooledBitStream copy = PooledBitStream.Get();
    do
    {
        // read the length of the next RPC
        int rpcSize = PopLength(messageStream);
        if (rpcSize <= 0)
        {
            // abort if there's an error reading lengths; a zero length is never written
            // by the sender, so it is treated as a malformed batch as well
            return 0;
        }

        long pos = messageStream.Position;
        if (rpcSize > messageStream.Length - pos)
        {
            // The prefix announces more bytes than were received. Copying anyway would read
            // past the end of the valid data (or make BlockCopy throw), so drop the rest.
            return 0;
        }

        // copy what comes after current stream position
        copy.SetLength(rpcSize);
        copy.Position = 0;
        Buffer.BlockCopy(messageStream.GetBuffer(), (int)pos, copy.GetBuffer(), 0, rpcSize);
        receiveCallback(copy, messageType, clientId, receiveTime);

        // seek over the RPC
        // RPCReceiveQueueItem peeks at content, it doesn't advance
        messageStream.Seek(rpcSize, SeekOrigin.Current);
    } while (messageStream.Position < messageStream.Length);

    return 0;
}
}
}