using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;

namespace RoboidControl {

    /// <summary>
    /// A participant is used for communication between things
    /// </summary>
    public class Participant : RemoteParticipant {
        /// <summary>
        /// Scratch buffer into which outgoing messages are serialized before they are sent
        /// </summary>
        public byte[] buffer = new byte[1024];
        /// <summary>
        /// Minimum time in milliseconds between two publications done by Update.
        /// A value of 0 disables publishing.
        /// </summary>
        public ulong publishInterval = 3000; // = 3 seconds

        /// <summary>
        /// The name of the participant, used in log output
        /// </summary>
        public string name = "Participant";

        /// <summary>
        /// Destination used by Send(IMessage) and SendBuffer.
        /// ReceiveUDP overwrites it with the source of the most recently received datagram.
        /// </summary>
        public IPEndPoint endPoint = null;
        /// <summary>
        /// The UDP client used for sending and receiving. Sending is silently skipped when it is null.
        /// </summary>
        public UdpClient udpClient = null;
        /// <summary>
        /// The address to which Publish and PublishBuffer send their data
        /// </summary>
        public string broadcastIpAddress = "255.255.255.255";

        // NOTE(review): the element type was lost in this copy of the file;
        // IMessage is assumed because it is the message abstraction used everywhere else here - confirm.
        public readonly ConcurrentQueue<IMessage> messageQueue = new ConcurrentQueue<IMessage>();

        #region Init

        /// <summary>
        /// Create a participant
        /// </summary>
        public Participant() {
            //senders.Add(this);
        }

        // /// <summary>
        // /// Create a participant with the given UDP port
        // /// </summary>
        // /// <param name="port">The port number on which to communicate</param>
        // public Participant(int port) : this() {
        //     this.port = port;
        // }

        /// <summary>
        /// Create a new participant for a site at the given address and port
        /// </summary>
        /// <param name="ipAddress">The ip address of the site server</param>
        /// <param name="port">The port number of the site server</param>
        /// <remarks>
        /// A UDP client is created, bound to the given port on any local address
        /// and asynchronous receiving is started.
        /// </remarks>
        public Participant(string ipAddress = "0.0.0.0", int port = 7681) : this() {
            this.ipAddress = ipAddress;
            this.port = port;

            this.udpClient = new UdpClient();
            this.udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, port)); // local port

            // this.endPoint is not assigned here: ReceiveUDP fills it in
            // as soon as the first datagram arrives.
            this.udpClient.BeginReceive(new AsyncCallback(result => ReceiveUDP(result)), null);
        }

        /// <summary>
        /// Create a participant using the given udp client
        /// </summary>
        /// <param name="udpClient">UDP client to use for communication</param>
        /// <param name="port">The port number on which to communicate</param>
        /// <remarks>This constructor does not start receiving on the client.</remarks>
        public Participant(UdpClient udpClient, int port) : this() {
            this.udpClient = udpClient;
            this.port = port;
        }

        /// <summary>
        /// The remote participants from which data has been received
        /// </summary>
        public List<RemoteParticipant> senders = new List<RemoteParticipant>();

        /// <summary>
        /// Find a known remote participant
        /// </summary>
        /// <param name="ipAddress">The ip address of the remote participant</param>
        /// <param name="port">The port number of the remote participant</param>
        /// <returns>The matching participant, or null when none is known</returns>
        public RemoteParticipant GetParticipant(string ipAddress, int port) {
            //Console.WriteLine($"Get Participant {ipAddress}:{port}");
            foreach (RemoteParticipant sender in senders) {
                if (sender.ipAddress == ipAddress && sender.port == port) {
                    return sender;
                }
            }
            return null;
        }

        /// <summary>
        /// Add a new remote participant to the list of senders
        /// </summary>
        /// <param name="ipAddress">The ip address of the remote participant</param>
        /// <param name="port">The port number of the remote participant</param>
        /// <returns>The newly created participant</returns>
        /// <remarks>
        /// The network id of the new participant is the number of senders known so far, cast to a byte.
        /// </remarks>
        public RemoteParticipant AddParticipant(string ipAddress, int port) {
            Console.WriteLine($"New Participant {ipAddress}:{port}");
            RemoteParticipant participant = new RemoteParticipant(ipAddress, port) {
                networkId = (byte)this.senders.Count
            };
            senders.Add(participant);
            return participant;
        }

        /// <summary>
        /// The functions which create a thing, indexed by thing type
        /// </summary>
        protected readonly Dictionary<byte, Func<RemoteParticipant, byte, byte, Thing>> thingMsgProcessors =
            new Dictionary<byte, Func<RemoteParticipant, byte, byte, Thing>>();

        /// <summary>
        /// Function signature for creating a thing
        /// </summary>
        /// <param name="sender">The participant passed to the constructor function</param>
        /// <param name="networkId">The network id passed to the constructor function</param>
        /// <param name="thingId">The thing id passed to the constructor function</param>
        /// <returns>The created thing</returns>
        public delegate Thing ThingConstructor(RemoteParticipant sender, byte networkId, byte thingId);

        /// <summary>
        /// Register a constructor function for a thing type.
        /// An earlier registration for the same type is replaced.
        /// </summary>
        /// <param name="thingType">The thing type</param>
        /// <param name="constr">The function which creates the thing</param>
        public void Register(byte thingType, ThingConstructor constr) {
            thingMsgProcessors[thingType] = new Func<RemoteParticipant, byte, byte, Thing>(constr);
        }

        /// <summary>
        /// Register a thing class for a thing type
        /// </summary>
        /// <typeparam name="ThingClass">The class to instantiate for this thing type</typeparam>
        /// <param name="thingType">The thing type</param>
        public void Register<ThingClass>(Thing.Type thingType) where ThingClass : Thing {
            Register<ThingClass>((byte)thingType);
        }

        /// <summary>
        /// Register a thing class for a thing type.
        /// An earlier registration for the same type is replaced.
        /// </summary>
        /// <typeparam name="ThingClass">
        /// The class to instantiate for this thing type. Instances are created through
        /// Activator.CreateInstance with the arguments (sender, networkId, thingId).
        /// </typeparam>
        /// <param name="thingType">The thing type</param>
        public void Register<ThingClass>(byte thingType) where ThingClass : Thing {
            thingMsgProcessors[thingType] = (RemoteParticipant sender, byte networkId, byte thingId) =>
                Activator.CreateInstance(typeof(ThingClass), sender, networkId, thingId) as ThingClass;
            Console.WriteLine($"Registering {typeof(ThingClass)} for thing type {thingType}");
        }

        #endregion Init

        #region Update

        /// <summary>
        /// Callback for a completed asynchronous receive on the UDP client
        /// </summary>
        /// <param name="result">The result of the asynchronous receive</param>
        /// <remarks>
        /// The source of the datagram is looked up in (or added to) the senders,
        /// the data is processed and the next receive is started.
        /// </remarks>
        protected void ReceiveUDP(IAsyncResult result) {
            // Do not require this.endPoint to be set here: EndReceive fills it in.
            // Returning early on a null endPoint would drop the first datagram
            // and never start the next receive.
            if (this.udpClient == null) {
                return;
            }

            byte[] data;
            try {
                data = this.udpClient.EndReceive(result, ref this.endPoint);
            }
            catch (ObjectDisposedException) {
                // The client was closed while a receive was pending: stop receiving
                return;
            }
            // This does not yet take multi-packet messages into account!
            if (this.endPoint == null) {
                return;
            }

            // We can receive our own publish (broadcast) packages. How do we recognize them????
            // It is hard to determine our source port
            string ipAddress = this.endPoint.Address.ToString();
            RemoteParticipant remoteParticipant = GetParticipant(ipAddress, this.endPoint.Port);
            if (remoteParticipant == null) {
                remoteParticipant = AddParticipant(ipAddress, this.endPoint.Port);
            }

            ReceiveData(data, remoteParticipant);

            udpClient.BeginReceive(new AsyncCallback(callbackResult => ReceiveUDP(callbackResult)), null);
        }

        /// <summary>
        /// The time in milliseconds after which the next publication is done by Update
        /// </summary>
        protected ulong nextPublishMe = 0;

        /// <summary>
        /// Publish this participant when the publish interval has passed,
        /// then update all things and send their binary state to all senders
        /// </summary>
        /// <param name="currentTimeMS">
        /// The current time in milliseconds. When 0, the time is taken from Unity when
        /// compiled for Unity and from the system clock otherwise.
        /// </param>
        public virtual void Update(ulong currentTimeMS = 0) {
            if (currentTimeMS == 0) {
#if UNITY_5_3_OR_NEWER
                currentTimeMS = (ulong)(UnityEngine.Time.time * 1000);
#else
                // Without a time source the time would stay 0 and nothing would ever be published
                currentTimeMS = (ulong)(DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond);
#endif
            }

            if (this.publishInterval > 0 && currentTimeMS > this.nextPublishMe) {
                Publish();
                // Console.WriteLine($"{this.name} Publish ClientMsg {this.networkId}");
                this.nextPublishMe = currentTimeMS + this.publishInterval;
            }

            // The count is taken once, so things added during this update are handled in the next one
            int n = this.things.Count;
            for (int ix = 0; ix < n; ix++) {
                Thing thing = this.things[ix];
                if (thing == null) {
                    continue;
                }

                thing.Update(currentTimeMS);
                BinaryMsg binaryMsg = new(this.networkId, thing);
                foreach (RemoteParticipant sender in this.senders) {
                    this.Send(sender, binaryMsg);
                }
            }
        }

        /// <summary>
        /// Publish a ParticipantMsg carrying the network id of this participant
        /// </summary>
        public virtual void Publish() {
            this.Publish(new ParticipantMsg(this.networkId));
        }

        #endregion Update

        #region Send

        /// <summary>
        /// Send the thing, name, model url and binary messages of a thing to a remote participant
        /// </summary>
        /// <param name="remoteParticipant">The participant to send to</param>
        /// <param name="thing">The thing to describe</param>
        public void SendThingInfo(RemoteParticipant remoteParticipant, Thing thing) {
            Console.WriteLine("Send thing info");
            this.Send(remoteParticipant, new ThingMsg(this.networkId, thing));
            this.Send(remoteParticipant, new NameMsg(this.networkId, thing));
            this.Send(remoteParticipant, new ModelUrlMsg(this.networkId, thing));
            this.Send(remoteParticipant, new BinaryMsg(this.networkId, thing));
        }

        /// <summary>
        /// Send a message to the current end point
        /// </summary>
        /// <param name="msg">The message to send</param>
        /// <returns>
        /// False when no end point is known yet. True otherwise, including when the
        /// message serialized to nothing or when there is no UDP client.
        /// </returns>
        public bool Send(IMessage msg) {
            int bufferSize = msg.Serialize(ref this.buffer);
            if (bufferSize <= 0) {
                return true;
            }
            if (this.endPoint == null) {
                // No destination known, consistent with PublishBuffer reporting a missing address
                return false;
            }

            // Console.WriteLine($"msg to {endPoint.Address.ToString()} {endPoint.Port}");
            this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
            return true;
        }

        /// <summary>
        /// Send a message to the address and port of a remote participant
        /// </summary>
        /// <param name="remoteParticipant">The participant to send to</param>
        /// <param name="msg">The message to send</param>
        /// <returns>Always true</returns>
        public bool Send(RemoteParticipant remoteParticipant, IMessage msg) {
            int bufferSize = msg.Serialize(ref this.buffer);
            if (bufferSize <= 0) {
                return true;
            }

            IPEndPoint participantEndpoint = new IPEndPoint(IPAddress.Parse(remoteParticipant.ipAddress), remoteParticipant.port);
            Console.WriteLine($"msg to {participantEndpoint.Address.ToString()} {participantEndpoint.Port}");
            this.udpClient?.Send(this.buffer, bufferSize, participantEndpoint);
            return true;
        }

        /// <summary>
        /// Publish the thing, name, model url and binary messages of a thing
        /// </summary>
        /// <param name="thing">The thing to describe</param>
        public void PublishThingInfo(Thing thing) {
            //Console.WriteLine("Publish thing info");
            this.Publish(new ThingMsg(this.networkId, thing));
            this.Publish(new NameMsg(this.networkId, thing));
            this.Publish(new ModelUrlMsg(this.networkId, thing));
            this.Publish(new BinaryMsg(this.networkId, thing));
        }

        /// <summary>
        /// Send a message to the broadcast address on the port of this participant
        /// </summary>
        /// <param name="msg">The message to publish</param>
        /// <returns>Always true</returns>
        public bool Publish(IMessage msg) {
            int bufferSize = msg.Serialize(ref this.buffer);
            if (bufferSize <= 0) {
                return true;
            }

            // Console.WriteLine($"publish to {broadcastIpAddress.ToString()} {this.port}");
            this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
            return true;
        }

        /// <summary>
        /// Send the first bytes of the buffer to the current end point
        /// </summary>
        /// <param name="bufferSize">The number of bytes to send</param>
        /// <returns>False when no end point is known yet, true otherwise</returns>
        public bool SendBuffer(int bufferSize) {
            if (this.endPoint == null) {
                // No destination known, consistent with PublishBuffer reporting a missing address
                return false;
            }

            // UnityEngine.Debug.Log($"Send msg {buffer[0]} to {ipAddress}");
            this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
            return true;
        }

        /// <summary>
        /// Send the first bytes of the buffer to the broadcast address on the port of this participant
        /// </summary>
        /// <param name="bufferSize">The number of bytes to send</param>
        /// <returns>False when there is no broadcast address, true otherwise</returns>
        public bool PublishBuffer(int bufferSize) {
            if (this.broadcastIpAddress == null) {
                return false;
            }

            this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
            return true;
        }

        #endregion

        #region Receive

        /// <summary>
        /// Decode received data and dispatch it to the matching Process function.
        /// The first byte of the data selects the message type.
        /// </summary>
        /// <param name="data">The received bytes</param>
        /// <param name="remoteParticipant">The participant from which the data was received</param>
        public void ReceiveData(byte[] data, RemoteParticipant remoteParticipant) {
            // An empty UDP datagram is valid on the wire but carries no message id
            if (data == null || data.Length == 0) {
                return;
            }

            byte msgId = data[0];
            if (msgId == 0xFF) {
                // Timeout
                return;
            }

            switch (msgId) {
                case ParticipantMsg.Id: // 0xA0 / 160
                    this.Process(remoteParticipant, new ParticipantMsg(data));
                    break;
                case NetworkIdMsg.Id: // 0xA1 / 161
                    this.Process(remoteParticipant, new NetworkIdMsg(data));
                    break;
                case InvestigateMsg.Id: // 0x81
                    // result = await InvestigateMsg.Receive(dataStream, client, packetSize);
                    break;
                case ThingMsg.id: // 0x80 / 128
                    this.Process(remoteParticipant, new ThingMsg(data));
                    break;
                case NameMsg.Id: // 0x91 / 145
                    this.Process(remoteParticipant, new NameMsg(data));
                    break;
                case ModelUrlMsg.Id: // 0x90 / 144
                    this.Process(remoteParticipant, new ModelUrlMsg(data));
                    break;
                case PoseMsg.Id: // 0x10 / 16
                    this.Process(remoteParticipant, new PoseMsg(data));
                    // result = await PoseMsg.Receive(dataStream, client, packetSize);
                    break;
                case BinaryMsg.Id: // 0xB1 / 177
                    this.Process(remoteParticipant, new BinaryMsg(data));
                    break;
                case TextMsg.Id: // 0xB0 / 176
                    // result = await TextMsg.Receive(dataStream, client, packetSize);
                    break;
                case DestroyMsg.Id: // 0x20 / 32
                    // result = await DestroyMsg.Receive(dataStream, client, packetSize);
                    break;
                default:
                    break;
            }
        }

        #endregion

        #region Process

        /// <summary>
        /// Process a participant message. Does nothing in this class.
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(RemoteParticipant sender, ParticipantMsg msg) { }

        /// <summary>
        /// Process a network id message.
        /// When the network id differs from the current one, it is taken over and
        /// the information on all things is sent to the sender.
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(RemoteParticipant sender, NetworkIdMsg msg) {
            Console.WriteLine($"{this.name} receive network id {this.networkId} {msg.networkId}");
            if (this.networkId == msg.networkId) {
                return;
            }

            this.networkId = msg.networkId;
            foreach (Thing thing in this.things) { //Thing.GetAllThings())
                // Update also guards against null entries in the things list
                if (thing == null) {
                    continue;
                }
                this.SendThingInfo(sender, thing);
            }
        }

        /// <summary>
        /// Process an investigate message. Does nothing in this class.
        /// </summary>
        /// <param name="msg">The received message</param>
        protected virtual void Process(InvestigateMsg msg) { }

        /// <summary>
        /// Process a thing message. This class only logs the network id and thing id.
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(RemoteParticipant sender, ThingMsg msg) {
            Console.WriteLine($"Participant: Process thing [{msg.networkId}/{msg.thingId}]");
        }

        /// <summary>
        /// Process a name message: set the name of the thing when the sender knows it
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(RemoteParticipant sender, NameMsg msg) {
            // Console.WriteLine($"Participant: Process name [{msg.networkId}/{msg.thingId}] {msg.name}");
            Thing thing = sender.Get(msg.networkId, msg.thingId);
            if (thing != null) {
                thing.name = msg.name;
            }
        }

        /// <summary>
        /// Process a model url message: set the model url of the thing when the sender knows it
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(RemoteParticipant sender, ModelUrlMsg msg) {
            //Console.WriteLine($"Participant: Process model [{msg.networkId}/{msg.thingId}] {msg.url}");
            Thing thing = sender.Get(msg.networkId, msg.thingId);
            if (thing != null) {
                thing.modelUrl = msg.url;
            }
        }

        /// <summary>
        /// Process a pose message: copy the pose components present in the message to the thing
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        /// <remarks>
        /// A missing position clears hasPosition and a missing orientation sets the orientation to null.
        /// Missing velocities leave the current velocities untouched.
        /// </remarks>
        protected virtual void Process(RemoteParticipant sender, PoseMsg msg) {
            //Console.WriteLine($"Participant: Process pose [{msg.networkId}/{msg.thingId}] {msg.poseType}");
            Thing thing = sender.Get(msg.networkId, msg.thingId);
            if (thing == null) {
                return;
            }

            thing.hasPosition = false;
            if ((msg.poseType & PoseMsg.Pose_Position) != 0) {
                thing.position = msg.position;
                thing.hasPosition = true;
            }

            if ((msg.poseType & PoseMsg.Pose_Orientation) != 0) {
                thing.orientation = msg.orientation;
            }
            else {
                thing.orientation = null;
            }

            if ((msg.poseType & PoseMsg.Pose_LinearVelocity) != 0) {
                thing.linearVelocity = msg.linearVelocity;
            }
            if ((msg.poseType & PoseMsg.Pose_AngularVelocity) != 0) {
                thing.angularVelocity = msg.angularVelocity;
            }
        }

        /// <summary>
        /// Process a binary message: pass the bytes to the thing when the sender knows it
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(RemoteParticipant sender, BinaryMsg msg) {
            // Console.WriteLine($"Participant: Process binary [{msg.networkId}/{msg.thingId}]");
            Thing thing = sender.Get(msg.networkId, msg.thingId);
            thing?.ProcessBinary(msg.bytes);
        }

        /// <summary>
        /// Process a text message. Does nothing in this class.
        /// </summary>
        /// <param name="temsgxt">The received message</param>
        protected virtual void Process(TextMsg temsgxt) { }

        /// <summary>
        /// Process a destroy message. Does nothing in this class.
        /// </summary>
        /// <param name="msg">The received message</param>
        protected virtual void Process(DestroyMsg msg) { }

        /// <summary>
        /// Send a message to every known sender except this participant itself
        /// </summary>
        /// <param name="msg">The message to forward</param>
        private void ForwardMessage(IMessage msg) {
            // The senders are RemoteParticipants: iterating them as Participant throws
            // an InvalidCastException for every entry created by AddParticipant.
            foreach (RemoteParticipant client in senders) {
                if (ReferenceEquals(client, this)) {
                    continue;
                }
                //UnityEngine.Debug.Log($"---> {client.ipAddress}");
                // Send transmits only the serialized bytes, not the whole buffer
                this.Send(client, msg);
            }
        }

        #endregion
    }
}