using System.Collections.Generic; using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using System.IO; using System.Threading.Tasks; namespace Passer.Control.Core { public class Participant { //public ConnectionMethod connection; public IPEndPoint endPoint; public UdpClient udpClient; public string ipAddress; public string broadcastIpAddress = "255.255.255.255"; public int port; public Stream dataStream; public byte[] buffer = new byte[256]; public byte networkId = 0; public readonly ConcurrentQueue messageQueue = new(); public Participant GetClient(string ipAddress, int port) { foreach (Participant c in others) { if (c.ipAddress == ipAddress && c.port == port) return c; } return null; } public List others = new List(); #region Init public Participant() { this.dataStream = new EchoStream(); others.Add(this); } public Participant(UdpClient udpClient, int port) : this() { this.udpClient = udpClient; this.ipAddress = null; this.port = port; //this.dataStream = new EchoStream(); //clients.Add(this); } #endregion Init #region Update private static readonly float publishInterval = 3.0f; private float nextPublishMe = 0; public virtual void Update(float currentTime) { if (currentTime > this.nextPublishMe) { this.PublishBuffer(ClientMsg.Serialized(this.buffer, this.networkId)); ClientMsg.Publish(this, this.networkId); this.nextPublishMe = currentTime + Participant.publishInterval; } for (int ix = 0; ix < this.others.Count; ix++) { Participant client = this.others[ix]; if (client == null) continue; client.ProcessMessages(); } Thing.UpdateAll(currentTime); } #endregion Update #region Send public bool SendBuffer(int bufferSize) { //if (this.ipAddress == null) // return false; // UnityEngine.Debug.Log($"Send msg {buffer[0]} to {ipAddress}"); //this.udpClient.Send(this.buffer, bufferSize, this.ipAddress, this.port); this.udpClient.Send(this.buffer, bufferSize, this.endPoint); return true; } public bool PublishBuffer(int bufferSize) { if (this.broadcastIpAddress == null) return false; this.udpClient.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port); return true; } #endregion #region Receive public async Task ReceiveData() { while (true) { byte packetSize = (byte)this.dataStream.ReadByte(); if (packetSize != 0xFF) await ReceiveData(this.dataStream, this, packetSize); // else timeout } } public static async Task ReceiveData(Stream dataStream, Participant client, byte packetSize) { byte msgId = (byte)dataStream.ReadByte(); if (msgId == 0xFF) { // Timeout return; } //UnityEngine.Debug.Log($"R {msgId} from {client.ipAddress}"); bool result = false; switch (msgId) { case ClientMsg.Id: // 0xA0 / 160 result = await ClientMsg.Receive(dataStream, client, packetSize); break; case NetworkIdMsg.Id: // 0xA1 / 161 result = await NetworkIdMsg.Receive(dataStream, client, packetSize); break; case InvestigateMsg.Id: // 0x81 result = await InvestigateMsg.Receive(dataStream, client, packetSize); break; case ThingMsg.id: // 0x80 / 128 result = await ThingMsg.Receive(dataStream, client, packetSize); break; case NameMsg.Id: // 0x91 / 145 result = await NameMsg.Receive(dataStream, client, packetSize); break; case ModelUrlMsg.Id: // 0x90 / 144 result = await ModelUrlMsg.Receive(dataStream, client, packetSize); break; case PoseMsg.Id: // 0x10 / 16 result = await PoseMsg.Receive(dataStream, client, packetSize); break; case CustomMsg.Id: // 0xB1 / 177 result = await CustomMsg.Receive(dataStream, client, packetSize); break; case TextMsg.Id: // 0xB0 / 176 result = await TextMsg.Receive(dataStream, client, packetSize); break; case DestroyMsg.Id: // 0x20 / 32 result = await DestroyMsg.Receive(dataStream, client, packetSize); break; default: break; } if (result == false) { packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte await ReceiveData(dataStream, client, packetSize); } } #endregion #region Process public virtual void ProcessMessages() { while (this.messageQueue.TryDequeue(out IMessage msg)) ProcessMessage(msg); } public void ProcessMessage(IMessage msg) { switch (msg) { case ClientMsg clientMsg: ProcessClient(clientMsg); break; case NetworkIdMsg networkId: ProcessNetworkId(networkId); break; case InvestigateMsg investigate: ProcessInvestigate(investigate); break; case ThingMsg thing: ProcessThing(thing); break; case NameMsg name: //UnityEngine.Debug.Log($"Name [{name.networkId}/{name.thingId}] {name.name}"); ProcessName(name); break; case ModelUrlMsg modelUrl: ProcessModelUrl(modelUrl); break; case PoseMsg pose: ProcessPose(pose); break; case CustomMsg custom: ProcessCustom(custom); break; case TextMsg text: ProcessText(text); break; case DestroyMsg destroy: ProcessDestroy(destroy); break; default: return; } ForwardMessage(msg); } protected virtual void ProcessClient(ClientMsg client) { } protected virtual void ProcessNetworkId(NetworkIdMsg networkId) { } protected virtual void ProcessInvestigate(InvestigateMsg investigate) { } protected virtual void ProcessThing(ThingMsg thing) { } protected virtual void ProcessName(NameMsg name) { } protected virtual void ProcessModelUrl(ModelUrlMsg modelUrl) { } protected virtual void ProcessPose(PoseMsg pose) { } protected virtual void ProcessCustom(CustomMsg custom) { } protected virtual void ProcessText(TextMsg text) { } protected virtual void ProcessDestroy(DestroyMsg destroy) { } private void ForwardMessage(IMessage msg) { foreach (Participant client in others) { if (client == this) continue; //UnityEngine.Debug.Log($"---> {client.ipAddress}"); IMessage.SendMsg(client, msg); } } #endregion } }