using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;

namespace RoboidControl {

    /// <summary>
    /// A participant using UDP communication
    /// </summary>
    /// <remarks>
    /// A local participant is the local device which can communicate with
    /// other participants. It manages all local things and communication with other
    /// participants. Each application has a local participant which is usually
    /// explicit in the code. A participant can be isolated. In that case it is
    /// standalone and does not communicate with other participants.
    ///
    /// It is possible to work with a hidden participant by creating things without
    /// specifying a participant in the constructor. In that case a hidden isolated
    /// participant is created which can be obtained using <see cref="Isolated"/>.
    /// </remarks>
    /// <seealso cref="Thing"/>
    public class ParticipantUDP : Participant {

        #region Init

        /// <summary>
        /// Create a participant without connecting to a site
        /// </summary>
        /// <param name="port">The port number on which to communicate</param>
        /// <remarks>
        /// These participants typically broadcast Participant messages to let site
        /// servers on the local network know their presence. Alternatively they can
        /// broadcast information which can be used directly by other participants.
        /// A port of 0 marks the participant as isolated.
        /// </remarks>
        public ParticipantUDP(int port = 0) : base("127.0.0.1", port) {
            if (this.port == 0) {
                this.isIsolated = true;
            }
            Participant.AddParticipant(this);
        }

        /// <summary>
        /// Create a participant which will try to connect to a site.
        /// </summary>
        /// <param name="ipAddress">The ip address of the site server</param>
        /// <param name="port">The port number of the site server</param>
        /// <param name="localPort">The port used by the local participant</param>
        /// <exception cref="SocketException">The local UDP port could not be opened</exception>
        public ParticipantUDP(string ipAddress, int port = 7681, int localPort = 7681) : base("127.0.0.1", localPort) {
            if (this.port == 0) {
                this.isIsolated = true;
            }
            else {
                this.remoteSite = new Participant(ipAddress, port);
            }
            Participant.AddParticipant(this);

            // Determine the local IP address. When this fails (no network),
            // the "127.0.0.1" passed to the base constructor and the default
            // broadcast address stay in effect.
            IPAddress localIpAddress = DetermineLocalIpAddress();
            if (localIpAddress != null) {
                this.ipAddress = localIpAddress.ToString();

                IPAddress subnetMask = FindSubnetMask(localIpAddress);
                if (subnetMask != null) {
                    GetBroadcastAddress(localIpAddress, subnetMask);
                }
            }

            this.endPoint = new IPEndPoint(IPAddress.Any, localPort);
            // UdpClient only enables broadcasting automatically for 255.255.255.255.
            // Publish() sends to the subnet-directed broadcast address computed above,
            // so broadcasting has to be enabled explicitly.
            this.udpClient = new UdpClient(localPort) { EnableBroadcast = true };
            this.udpClient.BeginReceive(ReceiveUDP, null);
        }

        /// <summary>
        /// Determine the IPv4 address of the local interface used for outgoing traffic
        /// </summary>
        /// <returns>The local address, or null when it could not be determined</returns>
        private static IPAddress DetermineLocalIpAddress() {
            try {
                using (Socket socket = new(AddressFamily.InterNetwork, SocketType.Dgram, 0)) {
                    // Connect to a remote endpoint — we won't actually send anything
                    socket.Connect("1.1.1.1", 65530);
                    if (socket.LocalEndPoint is IPEndPoint localEndPoint) {
                        return localEndPoint.Address;
                    }
                }
            }
            catch (SocketException) {
                // No usable route (for example no network connection):
                // the caller continues with its defaults.
            }
            return null;
        }

        /// <summary>
        /// Find the IPv4 subnet mask belonging to the given local address
        /// </summary>
        /// <param name="localIpAddress">The local address to look up</param>
        /// <returns>The subnet mask, or null when no active interface has this address</returns>
        private static IPAddress FindSubnetMask(IPAddress localIpAddress) {
            IPAddress subnetMask = null;
            foreach (NetworkInterface nwInterface in NetworkInterface.GetAllNetworkInterfaces()) {
                if (nwInterface.OperationalStatus != OperationalStatus.Up) {
                    continue;
                }

                foreach (UnicastIPAddressInformation unicastAddress in nwInterface.GetIPProperties().UnicastAddresses) {
                    bool isIPv4 = unicastAddress.Address.AddressFamily == AddressFamily.InterNetwork;
                    if (isIPv4 && unicastAddress.Address.Equals(localIpAddress)) {
                        // No early exit: when several interfaces match, the last one wins
                        subnetMask = unicastAddress.IPv4Mask;
                    }
                }
            }
            return subnetMask;
        }

        private static ParticipantUDP isolatedParticipant = null;

        /// <summary>
        /// Isolated participant is used when the application is run without networking
        /// </summary>
        /// <returns>A participant without networking support</returns>
        public static ParticipantUDP Isolated() {
            // Created on first use
            return isolatedParticipant ??= new ParticipantUDP(0);
        }

        #endregion Init

        /// <summary>
        /// The name of the participant
        /// </summary>
        public string name = "ParticipantUDP";

        /// <summary>
        /// True if the participant is running isolated.
        /// </summary>
        /// <remarks>Isolated participants do not communicate with other participants</remarks>
        public bool isIsolated = false;

        /// <summary>
        /// The remote site when this participant is connected to a site
        /// </summary>
        public Participant remoteSite = null;

        /// <summary>
        /// The interval in milliseconds for publishing (broadcasting) data on the local network
        /// </summary>
        public ulong publishInterval = 3000; // = 3 seconds

        /// <summary>
        /// The endpoint passed to the UDP receive call; it is updated with the
        /// sender of the most recently received datagram
        /// </summary>
        public IPEndPoint endPoint = null;

        /// <summary>
        /// The UDP client used for sending and receiving; null when no socket was opened
        /// </summary>
        public UdpClient udpClient = null;

        /// <summary>
        /// The address used by <see cref="Publish"/>
        /// </summary>
        public string broadcastIpAddress = "255.255.255.255";

        // NOTE(review): the element type is assumed to be IMessage, the only message
        // abstraction used in this file — confirm against the other users of this queue.
        public readonly ConcurrentQueue<IMessage> messageQueue = new ConcurrentQueue<IMessage>();

        /// <summary>
        /// Compute the subnet-directed broadcast address and store it in
        /// <see cref="broadcastIpAddress"/>
        /// </summary>
        /// <param name="ip">The local IP address</param>
        /// <param name="subnetMask">The subnet mask belonging to the IP address</param>
        /// <exception cref="ArgumentException">The address and the mask differ in length</exception>
        protected void GetBroadcastAddress(IPAddress ip, IPAddress subnetMask) {
            byte[] ipBytes = ip.GetAddressBytes();
            byte[] maskBytes = subnetMask.GetAddressBytes();
            if (ipBytes.Length != maskBytes.Length) {
                throw new ArgumentException("IP address and subnet mask lengths do not match");
            }

            byte[] broadcastBytes = new byte[ipBytes.Length];
            for (int i = 0; i < ipBytes.Length; i++) {
                // ~ promotes the byte to a (negative) int; masking with 0xFF keeps
                // the value inside the byte range so the cast is also valid in
                // checked builds.
                broadcastBytes[i] = (byte)(ipBytes[i] | (~maskBytes[i] & 0xFF));
            }

            IPAddress broadcastAddress = new(broadcastBytes);
            this.broadcastIpAddress = broadcastAddress.ToString();

            Console.WriteLine($"Subnet mask: {subnetMask}");
            Console.WriteLine($"Broadcast address: {this.broadcastIpAddress}");
        }

        #region Update

        /// <summary>
        /// The time in milliseconds after which the next Participant message is sent
        /// </summary>
        protected ulong nextPublishMe = 0;

        /// <summary>
        /// Announce this participant when the publish interval has passed and
        /// update the things of this and the other participants
        /// </summary>
        /// <param name="currentTimeMS">The current time in milliseconds; 0 means: use the current time</param>
        public override void Update(ulong currentTimeMS = 0) {
            if (currentTimeMS == 0) {
                currentTimeMS = Thing.GetTimeMs();
            }

            if (!this.isIsolated && this.publishInterval > 0 && currentTimeMS > this.nextPublishMe) {
                ParticipantMsg msg = new ParticipantMsg(this.networkId);
                // Without a site the presence is broadcast on the local network
                if (this.remoteSite == null) {
                    this.Publish(msg);
                }
                else {
                    this.remoteSite.Send(msg);
                }

                this.nextPublishMe = currentTimeMS + this.publishInterval;
            }

            UpdateMyThings(currentTimeMS);
            UpdateOtherThings(currentTimeMS);
        }

        /// <summary>
        /// Update the things of this participant and send their state to the remote site
        /// </summary>
        /// <param name="currentTimeMS">The current time in milliseconds</param>
        protected virtual void UpdateMyThings(ulong currentTimeMS) {
            // Iterate over a snapshot: terminated things are removed below and a
            // thing may create new things during its update, both of which would
            // invalidate an enumeration of the list itself.
            List<Thing> snapshot = new(this.things);
            foreach (Thing thing in snapshot) {
                if (thing == null) {
                    continue;
                }

                // remoteSite is null for participants created without a site,
                // hence the null-conditional sends below.
                if (thing.hierarchyChanged && !(this.isIsolated || this.networkId == 0)) {
                    ThingMsg thingMsg = new(this.networkId, thing);
                    this.remoteSite?.Send(thingMsg);
                }

                // Why don't we do recursive?
                // Because when a thing creates a thing in the update,
                // that new thing is not sent out (because of hierarchyChanged)
                // before it is updated itself: it is immediately updated!
                thing.Update(currentTimeMS, false);

                if (!(this.isIsolated || this.networkId == 0)) {
                    if (thing.terminate) {
                        DestroyMsg destroyMsg = new(this.networkId, thing);
                        this.remoteSite?.Send(destroyMsg);
                    }
                    else {
                        // Send to remote site
                        this.remoteSite?.Send(new PoseMsg(thing.owner.networkId, thing));
                        this.remoteSite?.Send(new BinaryMsg(thing.owner.networkId, thing));
                    }
                }

                if (thing.terminate) {
                    this.Remove(thing);
                }
            }
        }

        /// <summary>
        /// Update all other known participants and send them the state of their things
        /// </summary>
        /// <param name="currentTimeMS">The current time in milliseconds</param>
        protected virtual void UpdateOtherThings(ulong currentTimeMS) {
            for (int ownerIx = 0; ownerIx < Participant.participants.Count; ownerIx++) {
                Participant participant = Participant.participants[ownerIx];
                if (participant == null || participant == this) {
                    continue;
                }

                participant.Update(currentTimeMS);
                if (this.isIsolated) {
                    continue;
                }

                for (int thingIx = 0; thingIx < participant.things.Count; thingIx++) {
                    Thing thing = participant.things[thingIx];
                    if (thing == null) {
                        continue;
                    }

                    participant.Send(new PoseMsg(thing.owner.networkId, thing));
                    participant.Send(new BinaryMsg(thing.owner.networkId, thing));
                }
            }
        }

        #endregion Update

        #region Send

        /// <summary>
        /// Send the thing, name, model url, pose and binary messages of a thing to a participant
        /// </summary>
        /// <param name="owner">The participant to which the messages are sent</param>
        /// <param name="thing">The thing to describe</param>
        public void SendThingInfo(Participant owner, Thing thing) {
            owner.Send(new ThingMsg(this.networkId, thing));
            owner.Send(new NameMsg(this.networkId, thing));
            owner.Send(new ModelUrlMsg(this.networkId, thing));
            owner.Send(new PoseMsg(this.networkId, thing));
            owner.Send(new BinaryMsg(this.networkId, thing));
        }

        /// <summary>
        /// Publish the thing, name, model url, pose and binary messages of a thing
        /// </summary>
        /// <param name="thing">The thing to describe</param>
        public void PublishThingInfo(Thing thing) {
            this.Publish(new ThingMsg(this.networkId, thing));
            this.Publish(new NameMsg(this.networkId, thing));
            this.Publish(new ModelUrlMsg(this.networkId, thing));
            this.Publish(new PoseMsg(this.networkId, thing));
            this.Publish(new BinaryMsg(this.networkId, thing));
        }

        /// <summary>
        /// Send a message to the broadcast address, using the port of this participant
        /// </summary>
        /// <param name="msg">The message to publish</param>
        /// <returns>Always true</returns>
        /// <remarks>Nothing is sent when the message serializes to no data or when
        /// no UDP client is available</remarks>
        public bool Publish(IMessage msg) {
            int bufferSize = msg.Serialize(ref this.buffer);
            if (bufferSize <= 0) {
                return true;
            }

            this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
            return true;
        }

        #endregion

        #region Receive

        /// <summary>
        /// Callback for a completed UDP receive: process the datagram and start the next receive
        /// </summary>
        /// <param name="result">The result of the asynchronous receive</param>
        protected void ReceiveUDP(IAsyncResult result) {
            UdpClient client = this.udpClient;
            if (client == null) {
                return;
            }

            byte[] data = null;
            try {
                data = client.EndReceive(result, ref endPoint);
            }
            catch (ObjectDisposedException) {
                // The client has been closed: stop receiving
                return;
            }
            catch (SocketException) {
                // For example an ICMP 'port unreachable' reply to an earlier send.
                // There is no datagram to process, but receiving continues.
            }

            // This does not yet take multi-packet messages into account!
            if (endPoint == null) {
                return;
            }

            if (data != null) {
                // We can receive our own publish (broadcast) packages; they are
                // recognized by their source address. It is hard to determine
                // our source port.
                string ipAddress = endPoint.Address.ToString();
                if (ipAddress != this.ipAddress) {
                    Participant remoteParticipant = GetParticipant(ipAddress, endPoint.Port);
                    remoteParticipant ??= AddParticipant(ipAddress, endPoint.Port);

                    ReceiveData(data, remoteParticipant);
                }
            }

            try {
                client.BeginReceive(ReceiveUDP, null);
            }
            catch (ObjectDisposedException) {
                // The client was closed while the datagram was being processed
            }
        }

        /// <summary>
        /// Decode received data and pass the resulting message to the matching Process method
        /// </summary>
        /// <param name="data">The received data; the first byte identifies the message type</param>
        /// <param name="sender">The participant from which the data was received</param>
        /// <remarks>Empty data and unknown message types are ignored</remarks>
        public void ReceiveData(byte[] data, Participant sender) {
            // A zero-length UDP datagram is valid on the wire but carries no message id
            if (data == null || data.Length == 0) {
                return;
            }

            byte msgId = data[0];
            if (msgId == 0xFF) {
                // Timeout
                return;
            }

            switch (msgId) {
                case ParticipantMsg.Id: // 0xA0 / 160
                    this.Process(sender, new ParticipantMsg(data));
                    break;
                case NetworkIdMsg.Id: // 0xA1 / 161
                    this.Process(sender, new NetworkIdMsg(data));
                    break;
                case InvestigateMsg.Id: // 0x81
                    // Not processed yet
                    break;
                case ThingMsg.id: // 0x80 / 128
                    this.Process(sender, new ThingMsg(data));
                    break;
                case NameMsg.Id: // 0x91 / 145
                    this.Process(sender, new NameMsg(data));
                    break;
                case ModelUrlMsg.Id: // 0x90 / 144
                    this.Process(sender, new ModelUrlMsg(data));
                    break;
                case PoseMsg.Id: // 0x10 / 16
                    this.Process(sender, new PoseMsg(data));
                    break;
                case BinaryMsg.Id: // 0xB1 / 177
                    this.Process(sender, new BinaryMsg(data));
                    break;
                case TextMsg.Id: // 0xB0 / 176
                    // Not processed yet
                    break;
                case DestroyMsg.Id: // 0x20 / 32
                    this.Process(sender, new DestroyMsg(data));
                    break;
                default:
                    break;
            }
        }

        /// <summary>
        /// Process a Participant message; only logs the message in DEBUG builds
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, ParticipantMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name} Process participantMsg {msg.networkId}");
#endif
        }

        /// <summary>
        /// Process a NetworkId message: adopt a changed network id and send the
        /// information of all local things to the sender
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, NetworkIdMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name} Process SiteMsg {this.networkId} -> {msg.networkId}");
#endif
            if (this.networkId == msg.networkId) {
                return;
            }

            this.networkId = msg.networkId;
            foreach (Thing thing in this.things) {
                this.SendThingInfo(sender, thing);
            }
        }

        /// <summary>
        /// Process an Investigate message; only logs the message in DEBUG builds
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, InvestigateMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name}: Process InvestigateMsg [{msg.networkId}/{msg.thingId}]");
#endif
        }

        /// <summary>
        /// Process a Thing message; only logs the message in DEBUG builds
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, ThingMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name}: Process ThingMsg [{msg.networkId}/{msg.thingId}] {msg.thingType} {msg.parentId}");
#endif
        }

        /// <summary>
        /// Process a Name message: set the name of the sender's thing when it is known
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, NameMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name}: Process NameMsg [{msg.networkId}/{msg.thingId}] {msg.nameLength} {msg.name}");
#endif
            Thing thing = sender.Get(msg.thingId);
            if (thing != null) {
                thing.name = msg.name;
            }
        }

        /// <summary>
        /// Process a ModelUrl message: set the model url of the sender's thing when it is known
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, ModelUrlMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name}: Process ModelUrlMsg [{msg.networkId}/{msg.thingId}] {msg.urlLength} {msg.url}");
#endif
            Thing thing = sender.Get(msg.thingId);
            if (thing != null) {
                thing.modelUrl = msg.url;
            }
        }

        /// <summary>
        /// Process a Pose message: apply the pose components present in the message
        /// to the thing of the participant identified by the network id in the message
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, PoseMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name}: Process PoseMsg [{msg.networkId}/{msg.thingId}] {msg.poseType}");
#endif
            // The owner is looked up by network id, not taken from the sender
            Participant owner = Participant.GetParticipant(msg.networkId);
            if (owner == null) {
                return;
            }

            Thing thing = owner.Get(msg.thingId);
            if (thing == null) {
                return;
            }

            if ((msg.poseType & PoseMsg.Pose_Position) != 0) {
                thing.position = msg.position;
            }
            if ((msg.poseType & PoseMsg.Pose_Orientation) != 0) {
                thing.orientation = msg.orientation;
            }
            if ((msg.poseType & PoseMsg.Pose_LinearVelocity) != 0) {
                thing.linearVelocity = msg.linearVelocity;
            }
            if ((msg.poseType & PoseMsg.Pose_AngularVelocity) != 0) {
                thing.angularVelocity = msg.angularVelocity;
            }
        }

        /// <summary>
        /// Process a Binary message: pass the data to the sender's thing when it is known
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, BinaryMsg msg) {
#if DEBUG
            Console.WriteLine($"{this.name}: Process BinaryMsg [{msg.networkId}/{msg.thingId}] {msg.dataLength}");
#endif
            Thing thing = sender.Get(msg.thingId);
            thing?.ProcessBinary(msg.data);
        }

        /// <summary>
        /// Process a Text message; only logs the message in DEBUG builds
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, TextMsg msg) {
#if DEBUG
            Console.WriteLine($"Participant: Process TextMsg {msg.textLength} {msg.text}");
#endif
        }

        /// <summary>
        /// Process a Destroy message: remove the sender's thing when it is known
        /// </summary>
        /// <param name="sender">The participant which sent the message</param>
        /// <param name="msg">The received message</param>
        protected virtual void Process(Participant sender, DestroyMsg msg) {
#if DEBUG
            Console.WriteLine($"Participant: Process Destroy Msg [{msg.networkId}/{msg.thingId}]");
#endif
            Thing thing = sender.Get(msg.thingId);
            // Unknown thing: nothing to remove, and nothing to detach below
            if (thing == null) {
                return;
            }

            this.Remove(thing);
#if UNITY_5_3_OR_NEWER
            thing.component.core = null;
#endif
        }

        /// <summary>
        /// Forward a message to other participants; not implemented yet
        /// </summary>
        /// <param name="msg">The message to forward</param>
        private void ForwardMessage(IMessage msg) {
        }

        #endregion
    }

}