442 lines
18 KiB
C#
442 lines
18 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Net.NetworkInformation;
|
|
|
|
namespace RoboidControl {
|
|
|
|
/// <summary>
|
|
/// A participant is used for communcation between things
|
|
/// </summary>
|
|
public class ParticipantUDP : Participant {
|
|
public byte[] buffer = new byte[1024];
|
|
public ulong publishInterval = 3000; // = 3 seconds
|
|
|
|
public string name = "Participant";
|
|
|
|
public bool isIsolated = false;
|
|
public Participant remoteSite;
|
|
|
|
public IPEndPoint endPoint = null;
|
|
public UdpClient udpClient = null;
|
|
public string broadcastIpAddress = "255.255.255.255";
|
|
|
|
public readonly ConcurrentQueue<IMessage> messageQueue = new ConcurrentQueue<IMessage>();
|
|
|
|
#region Init
|
|
|
|
/// <summary>
|
|
/// Create a participant with the give UDP port
|
|
/// </summary>
|
|
/// <param name="port">The port number on which to communicate</param>
|
|
public ParticipantUDP(int port = 0) : base("127.0.0.1", port) {
|
|
if (this.port == 0)
|
|
this.isIsolated = true;
|
|
Participant.AddParticipant(this);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a new participant for a site at the given address and port
|
|
/// </summary>
|
|
/// <param name="ipAddress">The ip address of the site server</param>
|
|
/// <param name="port">The port number of the site server</param>
|
|
public ParticipantUDP(string ipAddress, int port = 7681, int localPort = 7681) : base("127.0.0.1", localPort) {
|
|
if (this.port == 0)
|
|
this.isIsolated = true;
|
|
else
|
|
this.remoteSite = new Participant(ipAddress, port);
|
|
|
|
Participant.AddParticipant(this);
|
|
|
|
// Determine local IP address
|
|
IPAddress localIpAddress = null;
|
|
IPAddress subnetMask = null;
|
|
using (Socket socket = new(AddressFamily.InterNetwork, SocketType.Dgram, 0)) {
|
|
// Connect to a remote endpoint — we won't actually send anything
|
|
socket.Connect("1.1.1.1", 65530);
|
|
if (socket.LocalEndPoint is IPEndPoint endPoint) {
|
|
localIpAddress = endPoint.Address;
|
|
this.ipAddress = localIpAddress.ToString();
|
|
}
|
|
}
|
|
// Find subnet mask
|
|
foreach (NetworkInterface nwInterface in NetworkInterface.GetAllNetworkInterfaces()) {
|
|
if (nwInterface.OperationalStatus != OperationalStatus.Up)
|
|
continue;
|
|
|
|
foreach (UnicastIPAddressInformation unicastAddress in nwInterface.GetIPProperties().UnicastAddresses) {
|
|
if (unicastAddress.Address.AddressFamily == AddressFamily.InterNetwork && unicastAddress.Address.Equals(localIpAddress)
|
|
) {
|
|
subnetMask = unicastAddress.IPv4Mask;
|
|
}
|
|
}
|
|
}
|
|
if (localIpAddress != null && subnetMask != null)
|
|
GetBroadcastAddress(localIpAddress, subnetMask);
|
|
|
|
this.endPoint = new IPEndPoint(IPAddress.Any, localPort);
|
|
this.udpClient = new UdpClient(localPort);
|
|
this.udpClient.BeginReceive(new AsyncCallback(result => ReceiveUDP(result)), null);
|
|
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a participant using the given udp client
|
|
/// </summary>
|
|
/// <param name="udpClient">UDP client to use for communication</param>
|
|
/// <param name="port">The port number on which to communicate</param>
|
|
public ParticipantUDP(UdpClient udpClient, int port) : this() {
|
|
this.udpClient = udpClient;
|
|
this.port = port;
|
|
}
|
|
|
|
protected void GetBroadcastAddress(IPAddress ip, IPAddress subnetMask) {
|
|
byte[] ipBytes = ip.GetAddressBytes();
|
|
byte[] maskBytes = subnetMask.GetAddressBytes();
|
|
|
|
if (ipBytes.Length != maskBytes.Length)
|
|
throw new ArgumentException("IP address and subnet mask lengths do not match");
|
|
|
|
byte[] broadcastBytes = new byte[ipBytes.Length];
|
|
for (int i = 0; i < ipBytes.Length; i++) {
|
|
broadcastBytes[i] = (byte)(ipBytes[i] | (~maskBytes[i]));
|
|
}
|
|
|
|
IPAddress broadcastAddress = new(broadcastBytes);
|
|
this.broadcastIpAddress = broadcastAddress.ToString();
|
|
|
|
Console.WriteLine($"Subnet mask: {subnetMask.ToString()}");
|
|
Console.WriteLine($"Broadcast address: {this.broadcastIpAddress}");
|
|
}
|
|
|
|
private static ParticipantUDP isolatedParticipant = null;
|
|
public static ParticipantUDP Isolated() {
|
|
if (isolatedParticipant == null)
|
|
isolatedParticipant = new ParticipantUDP(0);
|
|
return isolatedParticipant;
|
|
}
|
|
#endregion Init
|
|
|
|
#region Update
|
|
|
|
protected ulong nextPublishMe = 0;
|
|
|
|
public override void Update(ulong currentTimeMS = 0) {
|
|
if (currentTimeMS == 0)
|
|
currentTimeMS = Thing.GetTimeMs();
|
|
|
|
if (this.isIsolated == false) {
|
|
if (this.publishInterval > 0 && currentTimeMS > this.nextPublishMe) {
|
|
ParticipantMsg msg = new ParticipantMsg(this.networkId);
|
|
if (this.remoteSite == null)
|
|
this.Publish(msg);
|
|
else
|
|
this.Send(this.remoteSite, msg);
|
|
|
|
this.nextPublishMe = currentTimeMS + this.publishInterval;
|
|
}
|
|
}
|
|
|
|
UpdateMyThings(currentTimeMS);
|
|
UpdateOtherThings(currentTimeMS);
|
|
}
|
|
|
|
protected virtual void UpdateMyThings(ulong currentTimeMS) {
|
|
foreach (Thing thing in this.things) {
|
|
if (thing == null)
|
|
continue;
|
|
|
|
if (thing.hierarchyChanged && !(this.isIsolated || this.networkId == 0)) {
|
|
ThingMsg thingMsg = new(this.networkId, thing);
|
|
this.Send(this.remoteSite, thingMsg);
|
|
}
|
|
|
|
// Why don't we do recursive?
|
|
// Because when a thing creates a thing in the update,
|
|
// that new thing is not sent out (because of hierarchyChanged)
|
|
// before it is updated itself: it is immediatedly updated!
|
|
thing.Update(currentTimeMS, false);
|
|
if (!(this.isIsolated || this.networkId == 0)) {
|
|
if (thing.terminate) {
|
|
DestroyMsg destroyMsg = new(this.networkId, thing);
|
|
this.Send(this.remoteSite, destroyMsg);
|
|
}
|
|
else {
|
|
// Send to remote site
|
|
PoseMsg poseMsg = new(thing.owner.networkId, thing);
|
|
this.Send(this.remoteSite, poseMsg);
|
|
BinaryMsg binaryMsg = new(thing.owner.networkId, thing);
|
|
this.Send(this.remoteSite, binaryMsg);
|
|
}
|
|
}
|
|
if (thing.terminate)
|
|
this.Remove(thing);
|
|
}
|
|
}
|
|
|
|
protected virtual void UpdateOtherThings(ulong currentTimeMS) {
|
|
for (int ownerIx = 0; ownerIx < Participant.participants.Count; ownerIx++) {
|
|
Participant participant = Participant.participants[ownerIx];
|
|
if (participant == null || participant == this)
|
|
continue;
|
|
|
|
participant.Update(currentTimeMS);
|
|
if (this.isIsolated)
|
|
continue;
|
|
|
|
for (int thingIx = 0; thingIx < participant.things.Count; thingIx++) {
|
|
Thing thing = participant.things[thingIx];
|
|
if (thing == null)
|
|
continue;
|
|
|
|
PoseMsg poseMsg = new(thing.owner.networkId, thing);
|
|
this.Send(participant, poseMsg);
|
|
BinaryMsg binaryMsg = new(thing.owner.networkId, thing);
|
|
this.Send(participant, binaryMsg);
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
#endregion Update
|
|
|
|
#region Send
|
|
|
|
public void SendThingInfo(Participant owner, Thing thing) {
|
|
// Console.WriteLine("Send thing info");
|
|
this.Send(owner, new ThingMsg(this.networkId, thing));
|
|
this.Send(owner, new NameMsg(this.networkId, thing));
|
|
this.Send(owner, new ModelUrlMsg(this.networkId, thing));
|
|
this.Send(owner, new BinaryMsg(this.networkId, thing));
|
|
}
|
|
|
|
public bool Send(Participant owner, IMessage msg) {
|
|
int bufferSize = msg.Serialize(ref this.buffer);
|
|
if (bufferSize <= 0)
|
|
return true;
|
|
|
|
IPEndPoint participantEndpoint = new IPEndPoint(IPAddress.Parse(owner.ipAddress), owner.port);
|
|
// Console.WriteLine($"msg to {participantEndpoint.Address.ToString()} {participantEndpoint.Port}");
|
|
this.udpClient?.Send(this.buffer, bufferSize, participantEndpoint);
|
|
return true;
|
|
}
|
|
|
|
public void PublishThingInfo(Thing thing) {
|
|
// Console.WriteLine("Publish thing info");
|
|
this.Publish(new ThingMsg(this.networkId, thing));
|
|
this.Publish(new NameMsg(this.networkId, thing));
|
|
this.Publish(new ModelUrlMsg(this.networkId, thing));
|
|
this.Publish(new BinaryMsg(this.networkId, thing));
|
|
}
|
|
|
|
public bool Publish(IMessage msg) {
|
|
int bufferSize = msg.Serialize(ref this.buffer);
|
|
if (bufferSize <= 0)
|
|
return true;
|
|
|
|
// Console.WriteLine($"publish to {broadcastIpAddress.ToString()} {this.port}");
|
|
this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
|
|
return true;
|
|
}
|
|
|
|
// public bool SendBuffer(int bufferSize) {
|
|
// //if (this.ipAddress == null)
|
|
// // return false;
|
|
|
|
// // UnityEngine.Debug.Log($"Send msg {buffer[0]} to {ipAddress}");
|
|
// //this.udpClient.Send(this.buffer, bufferSize, this.ipAddress, this.port);
|
|
// this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
|
|
// return true;
|
|
// }
|
|
|
|
// public bool PublishBuffer(int bufferSize) {
|
|
// if (this.broadcastIpAddress == null)
|
|
// return false;
|
|
|
|
// this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
|
|
// return true;
|
|
// }
|
|
|
|
#endregion
|
|
|
|
#region Receive
|
|
|
|
protected void ReceiveUDP(IAsyncResult result) {
|
|
// UnityEngine.Debug.Log("received");
|
|
if (this.udpClient == null) // || this.endPoint == null)
|
|
return;
|
|
|
|
byte[] data = this.udpClient.EndReceive(result, ref endPoint);
|
|
// This does not yet take multi-packet messages into account!
|
|
if (endPoint == null)
|
|
return;
|
|
|
|
// We can receive our own publish (broadcast) packages. How do we recognize them????
|
|
// It is hard to determine our source port
|
|
string ipAddress = endPoint.Address.ToString();
|
|
if (ipAddress != this.ipAddress) {
|
|
Participant remoteParticipant = GetParticipant(ipAddress, endPoint.Port);
|
|
remoteParticipant ??= AddParticipant(ipAddress, endPoint.Port);
|
|
|
|
ReceiveData(data, remoteParticipant);
|
|
}
|
|
|
|
udpClient.BeginReceive(new AsyncCallback(callbackResult => ReceiveUDP(callbackResult)), null);
|
|
}
|
|
|
|
public void ReceiveData(byte[] data, Participant sender) {
|
|
byte msgId = data[0];
|
|
if (msgId == 0xFF) {
|
|
// Timeout
|
|
return;
|
|
}
|
|
|
|
switch (msgId) {
|
|
case ParticipantMsg.Id: // 0xA0 / 160
|
|
this.Process(sender, new ParticipantMsg(data));
|
|
break;
|
|
case NetworkIdMsg.Id: // 0xA1 / 161
|
|
this.Process(sender, new NetworkIdMsg(data));
|
|
break;
|
|
case InvestigateMsg.Id: // 0x81
|
|
// result = await InvestigateMsg.Receive(dataStream, client, packetSize);
|
|
break;
|
|
case ThingMsg.id: // 0x80 / 128
|
|
this.Process(sender, new ThingMsg(data));
|
|
break;
|
|
case NameMsg.Id: // 0x91 / 145
|
|
this.Process(sender, new NameMsg(data));
|
|
break;
|
|
case ModelUrlMsg.Id: // 0x90 / 144
|
|
this.Process(sender, new ModelUrlMsg(data));
|
|
break;
|
|
case PoseMsg.Id: // 0x10 / 16
|
|
this.Process(sender, new PoseMsg(data));
|
|
// result = await PoseMsg.Receive(dataStream, client, packetSize);
|
|
break;
|
|
case BinaryMsg.Id: // 0xB1 / 177
|
|
this.Process(sender, new BinaryMsg(data));
|
|
break;
|
|
case TextMsg.Id: // 0xB0 / 176
|
|
// result = await TextMsg.Receive(dataStream, client, packetSize);
|
|
break;
|
|
case DestroyMsg.Id: // 0x20 / 32
|
|
this.Process(sender, new DestroyMsg(data));
|
|
// result = await DestroyMsg.Receive(dataStream, client, packetSize);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, ParticipantMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"{this.name} Process participantMsg {msg.networkId}");
|
|
#endif
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, NetworkIdMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"{this.name} Process SiteMsg {this.networkId} -> {msg.networkId}");
|
|
#endif
|
|
|
|
if (this.networkId != msg.networkId) {
|
|
this.networkId = msg.networkId;
|
|
foreach (Thing thing in this.things) //Thing.GetAllThings())
|
|
this.SendThingInfo(sender, thing);
|
|
}
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, InvestigateMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: InvestigateMsg [{msg.networkId}/{msg.thingId}]");
|
|
#endif
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, ThingMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process ThingMsg [{msg.networkId}/{msg.thingId}] {msg.thingType} {msg.parentId}");
|
|
#endif
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, NameMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process NameMsg [{msg.networkId}/{msg.thingId}] {msg.nameLength} {msg.name}");
|
|
#endif
|
|
|
|
Thing thing = sender.Get(msg.networkId, msg.thingId);
|
|
if (thing != null)
|
|
thing.name = msg.name;
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, ModelUrlMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process ModelUrlMsg [{msg.networkId}/{msg.thingId}] {msg.urlLength} {msg.url}");
|
|
#endif
|
|
|
|
Thing thing = sender.Get(msg.networkId, msg.thingId);
|
|
if (thing != null)
|
|
thing.modelUrl = msg.url;
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, PoseMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process PoseMsg [{msg.networkId}/{msg.thingId}] {msg.poseType}");
|
|
#endif
|
|
Thing thing = sender.Get(msg.networkId, msg.thingId);
|
|
if (thing != null) {
|
|
if ((msg.poseType & PoseMsg.Pose_Position) != 0)
|
|
thing.position = msg.position;
|
|
if ((msg.poseType & PoseMsg.Pose_Orientation) != 0)
|
|
thing.orientation = msg.orientation;
|
|
if ((msg.poseType & PoseMsg.Pose_LinearVelocity) != 0) {
|
|
thing.linearVelocity = msg.linearVelocity;
|
|
//Console.Write($"linear velocity = {thing.linearVelocity.ToVector3()}");
|
|
}
|
|
if ((msg.poseType & PoseMsg.Pose_AngularVelocity) != 0) {
|
|
thing.angularVelocity = msg.angularVelocity;
|
|
//Console.Write($"angular velocity = {thing.angularVelocity.ToVector3()}");
|
|
}
|
|
}
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, BinaryMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process BinaryMsg [{msg.networkId}/{msg.thingId}] {msg.dataLength}");
|
|
#endif
|
|
Thing thing = sender.Get(msg.networkId, msg.thingId);
|
|
thing?.ProcessBinary(msg.data);
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, TextMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process TextMsg {msg.textLength} {msg.text}");
|
|
#endif
|
|
|
|
}
|
|
|
|
protected virtual void Process(Participant sender, DestroyMsg msg) {
|
|
#if DEBUG
|
|
Console.WriteLine($"Participant: Process Destroy Msg [{msg.networkId}/{msg.thingId}]");
|
|
#endif
|
|
Thing thing = sender.Get(msg.networkId, msg.thingId);
|
|
if (thing != null)
|
|
this.Remove(thing);
|
|
thing.component.core = null;
|
|
}
|
|
|
|
private void ForwardMessage(IMessage msg) {
|
|
// foreach (Participant client in senders) {
|
|
// if (client == this)
|
|
// continue;
|
|
// //UnityEngine.Debug.Log($"---> {client.ipAddress}");
|
|
// //IMessage.SendMsg(client, msg);
|
|
// msg.Serialize(ref client.buffer);
|
|
// client.SendBuffer(client.buffer.Length);
|
|
// }
|
|
}
|
|
|
|
#endregion
|
|
}
|
|
} |