Skip to content

Commit d7adbea

Browse files
NoelStephensUnityjeffreyrainy
authored andcommitted
NETC-8
Cleaning up the RPCQueue branch for PR. (first pass)
1 parent dc35020 commit d7adbea

5 files changed

Lines changed: 295 additions & 12 deletions

File tree

com.unity.multiplayer.mlapi/Runtime/Core/NetworkedBehaviour.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,22 @@ internal enum NExec
4848
// RuntimeAccessModifiersILPP will make this `protected`
4949
internal BitWriter BeginSendServerRpc(ServerRpcSendParams sendParams, bool isReliable)
5050
{
51+
// @mfatihmar (Unity) Begin: Temporary, placeholder implementation
5152
RPCQueueManager rpcQueueMananger = NetworkingManager.Singleton.GetRPCQueueManager();
5253
if(rpcQueueMananger != null)
5354
{
5455
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ServerRPC, Time.realtimeSinceStartup, StandardRPC_Channel,0, NetworkingManager.Singleton.ServerClientId,null);
5556

5657
writer.WriteBit(false); // Encrypted
5758
writer.WriteBit(false); // Authenticated
58-
writer.WriteBits(MLAPIConstants.MLAPI_SERVER_RPC, 6); // MessageType
59+
writer.WriteBits(MLAPIConstants.MLAPI_STD_SERVER_RPC, 6); // MessageType
5960
writer.WriteUInt64Packed(NetworkId); // NetworkObjectId
6061
writer.WriteUInt16Packed(GetBehaviourId()); // NetworkBehaviourId
6162

6263
return writer;
6364
}
6465
return null;
66+
// @mfatihmar (Unity) End: Temporary, placeholder implementation
6567
}
6668

6769
// RuntimeAccessModifiersILPP will make this `protected`
@@ -86,7 +88,7 @@ internal BitWriter BeginSendClientRpc(ClientRpcSendParams sendParams, bool isRel
8688
var writer = rpcQueueMananger.BeginAddQueueItemToOutboundFrame(RPCQueueManager.QueueItemType.ClientRPC, Time.realtimeSinceStartup, StandardRPC_Channel,0, NetworkId,sendParams.TargetClientIds == null ? InternalMessageSender.GetAllClientIds().ToArray() : sendParams.TargetClientIds);
8789
writer.WriteBit(false); // Encrypted
8890
writer.WriteBit(false); // Authenticated
89-
writer.WriteBits(MLAPIConstants.MLAPI_CLIENT_RPC, 6); // MessageType
91+
writer.WriteBits(MLAPIConstants.MLAPI_STD_CLIENT_RPC, 6); // MessageType
9092
writer.WriteUInt64Packed(NetworkId); // NetworkObjectId
9193
writer.WriteUInt16Packed(GetBehaviourId()); // NetworkBehaviourId
9294

com.unity.multiplayer.mlapi/Runtime/Core/NetworkingManager.cs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,55 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
11781178
ReceiveTime = receiveTime
11791179
});
11801180
break;
1181+
case MLAPIConstants.MLAPI_SERVER_RPC:
1182+
{
1183+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1184+
s_MLAPIServerRPCQueued.Begin();
1185+
#endif
1186+
1187+
if (IsServer) InternalMessageHandler.RPCReceiveQueueItem(clientId, messageStream, receiveTime,RPCQueueManager.QueueItemType.ServerRPC);
1188+
1189+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1190+
s_MLAPIServerRPCQueued.End();
1191+
#endif
1192+
break;
1193+
}
1194+
case MLAPIConstants.MLAPI_SERVER_RPC_REQUEST:
1195+
if (IsServer) InternalMessageHandler.HandleServerRPCRequest(clientId, messageStream, channelName, security);
1196+
break;
1197+
case MLAPIConstants.MLAPI_SERVER_RPC_RESPONSE:
1198+
if (IsClient) InternalMessageHandler.HandleServerRPCResponse(clientId, messageStream);
1199+
break;
1200+
case MLAPIConstants.MLAPI_CLIENT_RPC:
1201+
{
1202+
if (IsClient)
1203+
{
1204+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1205+
s_MLAPIClientRPCQueued.Begin();
1206+
#endif
1207+
1208+
InternalMessageHandler.RPCReceiveQueueItem(clientId, messageStream,receiveTime,RPCQueueManager.QueueItemType.ClientRPC);
1209+
1210+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
1211+
s_MLAPIClientRPCQueued.End();
1212+
#endif
1213+
}
1214+
break;
1215+
}
1216+
case MLAPIConstants.MLAPI_CLIENT_RPC_REQUEST:
1217+
if (IsClient) InternalMessageHandler.HandleClientRPCRequest(clientId, messageStream, channelName, security, BufferCallback, new PreBufferPreset()
1218+
{
1219+
AllowBuffer = allowBuffer,
1220+
ChannelName = channelName,
1221+
ClientId = clientId,
1222+
Data = data,
1223+
MessageType = messageType,
1224+
ReceiveTime = receiveTime
1225+
});
1226+
break;
1227+
case MLAPIConstants.MLAPI_CLIENT_RPC_RESPONSE:
1228+
if (IsServer) InternalMessageHandler.HandleClientRPCResponse(clientId, messageStream);
1229+
break;
11811230
case MLAPIConstants.MLAPI_UNNAMED_MESSAGE:
11821231
InternalMessageHandler.HandleUnnamedMessage(clientId, messageStream);
11831232
break;
@@ -1201,7 +1250,7 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
12011250
case MLAPIConstants.MLAPI_SERVER_LOG:
12021251
if (IsServer && NetworkConfig.EnableNetworkLogs) InternalMessageHandler.HandleNetworkLog(clientId, messageStream);
12031252
break;
1204-
case MLAPIConstants.MLAPI_SERVER_RPC:
1253+
case MLAPIConstants.MLAPI_STD_SERVER_RPC:
12051254
{
12061255
if (IsServer)
12071256
{
@@ -1218,7 +1267,7 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
12181267
}
12191268
break;
12201269
}
1221-
case MLAPIConstants.MLAPI_CLIENT_RPC:
1270+
case MLAPIConstants.MLAPI_STD_CLIENT_RPC:
12221271
{
12231272
if (IsClient)
12241273
{

com.unity.multiplayer.mlapi/Runtime/Core/RPCMethods.meta

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

com.unity.multiplayer.mlapi/Runtime/Messaging/InternalMessageHandler.cs

Lines changed: 225 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,23 @@ internal static class InternalMessageHandler
5050

5151
static ProfilerMarker s_HandleNetworkedVarUpdate =
5252
new ProfilerMarker("InternalMessageHandler.HandleNetworkedVarUpdate");
53+
static ProfilerMarker s_HandleServerRPC =
54+
new ProfilerMarker("InternalMessageHandler.HandleServerRPC");
55+
56+
static ProfilerMarker s_HandleServerRPCRequest =
57+
new ProfilerMarker("InternalMessageHandler.HandleServerRPCRequest");
58+
59+
static ProfilerMarker s_HandleServerRPCResponse =
60+
new ProfilerMarker("InternalMessageHandler.HandleServerRPCResponse");
61+
62+
static ProfilerMarker s_HandleClientRPC =
63+
new ProfilerMarker("InternalMessageHandler.HandleClientRPC");
64+
65+
static ProfilerMarker s_HandleClientRPCRequest =
66+
new ProfilerMarker("InternalMessageHandler.HandleClientRPCRequest");
67+
68+
static ProfilerMarker s_HandleClientRPCResponse =
69+
new ProfilerMarker("InternalMessageHandler.HandleClientRPCResponse");
5370
static ProfilerMarker s_HandleUnnamedMessage =
5471
new ProfilerMarker("InternalMessageHandler.HandleUnnamedMessage");
5572

@@ -690,12 +707,219 @@ internal static void HandleNetworkedVarUpdate(ulong clientId, Stream stream, Act
690707
/// </summary>
691708
/// <param name="clientId"></param>
692709
/// <param name="stream"></param>
693-
internal static void RPCReceiveQueueItem(ulong clientId, Stream stream, float receiveTime, RPCQueueManager.QueueItemType queueItemType)
710+
internal static void RPCReceiveQueueItem(ulong clientId, Stream stream,float receiveTime,RPCQueueManager.QueueItemType queueItemType)
694711
{
695712
if (NetworkingManager.Singleton.IsServer && clientId == NetworkingManager.Singleton.ServerClientId)
696713
{
697714
return;
698715
}
716+
ProfilerStatManager.rpcsRcvd.Record();
717+
RPCQueueManager rpcQueueManager = NetworkingManager.Singleton.GetRPCQueueManager();
718+
if(rpcQueueManager != null)
719+
{
720+
rpcQueueManager.AddQueueItemToInboundFrame(queueItemType, receiveTime, clientId, (BitStream)stream);
721+
}
722+
}
723+
724+
725+
726+
727+
internal static void HandleServerRPC(ulong clientId, Stream stream)
728+
{
729+
ProfilerStatManager.rpcsRcvd.Record();
730+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
731+
s_HandleServerRPC.Begin();
732+
#endif
733+
using (PooledBitReader reader = PooledBitReader.Get(stream))
734+
{
735+
ulong networkId = reader.ReadUInt64Packed();
736+
ushort behaviourId = reader.ReadUInt16Packed();
737+
ulong hash = reader.ReadUInt64Packed();
738+
739+
if (SpawnManager.SpawnedObjects.ContainsKey(networkId))
740+
{
741+
NetworkedBehaviour behaviour = SpawnManager.SpawnedObjects[networkId].GetBehaviourAtOrderIndex(behaviourId);
742+
743+
if (behaviour == null)
744+
{
745+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ServerRPC message received for a non-existent behaviour. NetworkId: " + networkId + ", behaviourIndex: " + behaviourId);
746+
}
747+
else
748+
{
749+
behaviour.OnRemoteServerRPC(hash, clientId, stream);
750+
}
751+
}
752+
else if (NetworkingManager.Singleton.IsServer || !NetworkingManager.Singleton.NetworkConfig.EnableMessageBuffering)
753+
{
754+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ServerRPC message received for a non-existent object with id: " + networkId + ". This message is lost.");
755+
}
756+
}
757+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
758+
s_HandleServerRPC.End();
759+
#endif
760+
}
761+
762+
internal static void HandleServerRPCRequest(ulong clientId, Stream stream, string channelName, SecuritySendFlags security)
763+
{
764+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
765+
s_HandleServerRPCRequest.Begin();
766+
#endif
767+
using (PooledBitReader reader = PooledBitReader.Get(stream))
768+
{
769+
ulong networkId = reader.ReadUInt64Packed();
770+
ushort behaviourId = reader.ReadUInt16Packed();
771+
ulong hash = reader.ReadUInt64Packed();
772+
ulong responseId = reader.ReadUInt64Packed();
773+
774+
if (SpawnManager.SpawnedObjects.ContainsKey(networkId))
775+
{
776+
NetworkedBehaviour behaviour = SpawnManager.SpawnedObjects[networkId].GetBehaviourAtOrderIndex(behaviourId);
777+
778+
if (behaviour == null)
779+
{
780+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ServerRPCRequest message received for a non-existent behaviour. NetworkId: " + networkId + ", behaviourIndex: " + behaviourId);
781+
}
782+
else
783+
{
784+
object result = behaviour.OnRemoteServerRPC(hash, clientId, stream);
785+
786+
using (PooledBitStream responseStream = PooledBitStream.Get())
787+
{
788+
using (PooledBitWriter responseWriter = PooledBitWriter.Get(responseStream))
789+
{
790+
responseWriter.WriteUInt64Packed(responseId);
791+
responseWriter.WriteObjectPacked(result);
792+
}
793+
794+
InternalMessageSender.Send(clientId, MLAPIConstants.MLAPI_SERVER_RPC_RESPONSE, channelName, responseStream, security);
795+
ProfilerStatManager.rpcsSent.Record();
796+
}
797+
}
798+
}
799+
else
800+
{
801+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ServerRPCRequest message received for a non-existent object with id: " + networkId + ". This message is lost.");
802+
}
803+
}
804+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
805+
s_HandleServerRPCRequest.End();
806+
#endif
807+
}
808+
809+
internal static void HandleServerRPCResponse(ulong clientId, Stream stream)
810+
{
811+
ProfilerStatManager.rpcsRcvd.Record();
812+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
813+
s_HandleServerRPCResponse.Begin();
814+
#endif
815+
using (PooledBitReader reader = PooledBitReader.Get(stream))
816+
{
817+
ulong responseId = reader.ReadUInt64Packed();
818+
819+
if (ResponseMessageManager.ContainsKey(responseId))
820+
{
821+
RpcResponseBase responseBase = ResponseMessageManager.GetByKey(responseId);
822+
823+
ResponseMessageManager.Remove(responseId);
824+
825+
responseBase.IsDone = true;
826+
responseBase.Result = reader.ReadObjectPacked(responseBase.Type);
827+
responseBase.IsSuccessful = true;
828+
}
829+
else
830+
{
831+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ServerRPCResponse message received for a non-existent responseId: " + responseId + ". This response is lost.");
832+
}
833+
}
834+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
835+
s_HandleServerRPCResponse.End();
836+
#endif
837+
}
838+
839+
internal static void HandleClientRPC(ulong clientId, Stream stream, Action<ulong, PreBufferPreset> bufferCallback, PreBufferPreset bufferPreset)
840+
{
841+
ProfilerStatManager.rpcsRcvd.Record();
842+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
843+
s_HandleClientRPC.Begin();
844+
#endif
845+
using (PooledBitReader reader = PooledBitReader.Get(stream))
846+
{
847+
ulong networkId = reader.ReadUInt64Packed();
848+
ushort behaviourId = reader.ReadUInt16Packed();
849+
ulong hash = reader.ReadUInt64Packed();
850+
851+
if (SpawnManager.SpawnedObjects.ContainsKey(networkId))
852+
{
853+
NetworkedBehaviour behaviour = SpawnManager.SpawnedObjects[networkId].GetBehaviourAtOrderIndex(behaviourId);
854+
855+
if (behaviour == null)
856+
{
857+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPC message received for a non-existent behaviour. NetworkId: " + networkId + ", behaviourIndex: " + behaviourId);
858+
}
859+
else
860+
{
861+
behaviour.OnRemoteClientRPC(hash, clientId, stream);
862+
}
863+
}
864+
else if (NetworkingManager.Singleton.IsServer || !NetworkingManager.Singleton.NetworkConfig.EnableMessageBuffering)
865+
{
866+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPC message received for a non-existent object with id: " + networkId + ". This message is lost.");
867+
}
868+
else
869+
{
870+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPC message received for a non-existent object with id: " + networkId + ". This message will be buffered and might be recovered.");
871+
bufferCallback(networkId, bufferPreset);
872+
}
873+
}
874+
#if DEVELOPMENT_BUILD || UNITY_EDITOR
875+
s_HandleClientRPC.End();
876+
#endif
877+
}
878+
879+
internal static void HandleClientRPCRequest(ulong clientId, Stream stream, string channelName, SecuritySendFlags security, Action<ulong, PreBufferPreset> bufferCallback, PreBufferPreset bufferPreset)
880+
{
881+
if (NetworkingManager.Singleton.IsServer && clientId == NetworkingManager.Singleton.ServerClientId)
882+
{
883+
ulong networkId = reader.ReadUInt64Packed();
884+
ushort behaviourId = reader.ReadUInt16Packed();
885+
ulong hash = reader.ReadUInt64Packed();
886+
ulong responseId = reader.ReadUInt64Packed();
887+
888+
if (SpawnManager.SpawnedObjects.ContainsKey(networkId))
889+
{
890+
NetworkedBehaviour behaviour = SpawnManager.SpawnedObjects[networkId].GetBehaviourAtOrderIndex(behaviourId);
891+
892+
if (behaviour == null)
893+
{
894+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPCRequest message received for a non-existent behaviour. NetworkId: " + networkId + ", behaviourIndex: " + behaviourId);
895+
}
896+
else
897+
{
898+
object result = behaviour.OnRemoteClientRPC(hash, clientId, stream);
899+
900+
using (PooledBitStream responseStream = PooledBitStream.Get())
901+
{
902+
using (PooledBitWriter responseWriter = PooledBitWriter.Get(responseStream))
903+
{
904+
responseWriter.WriteUInt64Packed(responseId);
905+
responseWriter.WriteObjectPacked(result);
906+
}
907+
908+
InternalMessageSender.Send(clientId, MLAPIConstants.MLAPI_CLIENT_RPC_RESPONSE, channelName, responseStream, security);
909+
ProfilerStatManager.rpcsSent.Record();
910+
}
911+
}
912+
}
913+
else if (NetworkingManager.Singleton.IsServer || !NetworkingManager.Singleton.NetworkConfig.EnableMessageBuffering)
914+
{
915+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPCRequest message received for a non-existent object with id: " + networkId + ". This message is lost.");
916+
}
917+
else
918+
{
919+
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPCRequest message received for a non-existent object with id: " + networkId + ". This message will be buffered and might be recovered.");
920+
bufferCallback(networkId, bufferPreset);
921+
}
922+
}
699923

700924
ProfilerStatManager.rpcsRcvd.Record();
701925
RPCQueueManager rpcQueueManager = NetworkingManager.Singleton.GetRPCQueueManager();

0 commit comments

Comments
 (0)