RoboidControl-csharp/Participant.cs
2025-02-25 16:45:29 +01:00

356 lines
14 KiB
C#

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
namespace Passer.RoboidControl {
/// <summary>
/// A participant is used for communcation between things
/// </summary>
public class Participant : RemoteParticipant {
public byte[] buffer = new byte[1024];
public ulong publishInterval = 3000; // = 3 seconds
public string name = "Participant";
public IPEndPoint endPoint = null;
public UdpClient udpClient = null;
public string broadcastIpAddress = "255.255.255.255";
public readonly ConcurrentQueue<IMessage> messageQueue = new ConcurrentQueue<IMessage>();
#region Init
/// <summary>
/// Create a porticiapnt
/// </summary>
public Participant() {
//senders.Add(this);
}
/// <summary>
/// Create a participant with the give UDP port
/// </summary>
/// <param name="port">The port number on which to communicate</param>
// public Participant(int port) : this() {
// this.port = port;
// }
/// <summary>
/// Create a new participant for a site at the given address and port
/// </summary>
/// <param name="ipAddress">The ip address of the site server</param>
/// <param name="port">The port number of the site server</param>
public Participant(string ipAddress = "0.0.0.0", int port = 7681) : this() {
this.ipAddress = ipAddress;
this.port = port;
this.udpClient = new UdpClient();
this.udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, port)); // local port
// this.endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); // for sending
// this.udpClient = new UdpClient(port); // for receiving
// this.udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, port));
this.udpClient.BeginReceive(new AsyncCallback(result => ReceiveUDP(result)), null);
}
/// <summary>
/// Create a participant using the given udp client
/// </summary>
/// <param name="udpClient">UDP client to use for communication</param>
/// <param name="port">The port number on which to communicate</param>
public Participant(UdpClient udpClient, int port) : this() {
this.udpClient = udpClient;
this.port = port;
}
public List<RemoteParticipant> senders = new List<RemoteParticipant>();
public RemoteParticipant GetParticipant(string ipAddress, int port) {
//Console.WriteLine($"Get Participant {ipAddress}:{port}");
foreach (RemoteParticipant sender in senders) {
if (sender.ipAddress == ipAddress && sender.port == port)
return sender;
}
return null;
}
public RemoteParticipant AddParticipant(string ipAddress, int port) {
Console.WriteLine($"New Participant {ipAddress}:{port}");
RemoteParticipant participant = new RemoteParticipant(ipAddress, port) {
networkId = (byte)this.senders.Count
};
senders.Add(participant);
return participant;
}
protected readonly Dictionary<byte, Func<RemoteParticipant, byte, byte, Thing>> thingMsgProcessors = new Dictionary<byte, Func<RemoteParticipant, byte, byte, Thing>>();
public delegate Thing ThingConstructor(RemoteParticipant sender, byte networkId, byte thingId);
public void Register(byte thingType, ThingConstructor constr) {
thingMsgProcessors[thingType] = new Func<RemoteParticipant, byte, byte, Thing>(constr);
}
public void Register<ThingClass>(Thing.Type thingType) where ThingClass : Thing {
Register<ThingClass>((byte)thingType);
}
public void Register<ThingClass>(byte thingType) where ThingClass : Thing {
thingMsgProcessors[thingType] = (RemoteParticipant sender, byte networkId, byte thingId) =>
Activator.CreateInstance(typeof(ThingClass), sender, networkId, thingId) as ThingClass;
Console.WriteLine($"Registering {typeof(ThingClass)} for thing type {thingType}");
}
#endregion Init
#region Update
protected void ReceiveUDP(IAsyncResult result) {
if (this.udpClient == null || this.endPoint == null)
return;
byte[] data = this.udpClient.EndReceive(result, ref this.endPoint);
// This does not yet take multi-packet messages into account!
if (this.endPoint == null)
return;
// We can receive our own publish (broadcast) packages. How do we recognize them????
// It is hard to determine our source port
string ipAddress = this.endPoint.Address.ToString();
RemoteParticipant remoteParticipant = GetParticipant(ipAddress, this.endPoint.Port);
if (remoteParticipant == null)
remoteParticipant = AddParticipant(ipAddress, this.endPoint.Port);
ReceiveData(data, remoteParticipant);
udpClient.BeginReceive(new AsyncCallback(callbackResult => ReceiveUDP(callbackResult)), null);
}
protected ulong nextPublishMe = 0;
public virtual void Update(ulong currentTimeMS = 0) {
if (currentTimeMS == 0) {
#if UNITY_5_3_OR_NEWER
currentTimeMS = (ulong)(UnityEngine.Time.time * 1000);
#endif
}
if (this.publishInterval > 0 && currentTimeMS > this.nextPublishMe) {
Publish();
// Console.WriteLine($"{this.name} Publish ClientMsg {this.networkId}");
this.nextPublishMe = currentTimeMS + this.publishInterval;
}
int n = this.things.Count;
for (int ix = 0; ix < n; ix++) {
Thing thing = this.things[ix];
if (thing != null) {
thing.Update(currentTimeMS);
BinaryMsg binaryMsg = new(this.networkId, thing);
foreach (RemoteParticipant sender in this.senders)
this.Send(sender, binaryMsg);
}
}
}
public virtual void Publish() {
this.Publish(new ParticipantMsg(this.networkId));
}
#endregion Update
#region Send
public void SendThingInfo(RemoteParticipant remoteParticipant, Thing thing) {
Console.WriteLine("Send thing info");
this.Send(remoteParticipant, new ThingMsg(this.networkId, thing));
this.Send(remoteParticipant, new NameMsg(this.networkId, thing));
this.Send(remoteParticipant, new ModelUrlMsg(this.networkId, thing));
this.Send(remoteParticipant, new BinaryMsg(this.networkId, thing));
}
public bool Send(IMessage msg) {
int bufferSize = msg.Serialize(ref this.buffer);
if (bufferSize <= 0)
return true;
// Console.WriteLine($"msg to {endPoint.Address.ToString()} {endPoint.Port}");
this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
return true;
}
public bool Send(RemoteParticipant remoteParticipant, IMessage msg) {
int bufferSize = msg.Serialize(ref this.buffer);
if (bufferSize <= 0)
return true;
IPEndPoint participantEndpoint = new IPEndPoint(IPAddress.Parse(remoteParticipant.ipAddress), remoteParticipant.port);
Console.WriteLine($"msg to {participantEndpoint.Address.ToString()} {participantEndpoint.Port}");
this.udpClient?.Send(this.buffer, bufferSize, participantEndpoint);
return true;
}
public void PublishThingInfo(Thing thing) {
//Console.WriteLine("Publish thing info");
this.Publish(new ThingMsg(this.networkId, thing));
this.Publish(new NameMsg(this.networkId, thing));
this.Publish(new ModelUrlMsg(this.networkId, thing));
this.Publish(new BinaryMsg(this.networkId, thing));
}
public bool Publish(IMessage msg) {
int bufferSize = msg.Serialize(ref this.buffer);
if (bufferSize <= 0)
return true;
// Console.WriteLine($"publish to {broadcastIpAddress.ToString()} {this.port}");
this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
return true;
}
public bool SendBuffer(int bufferSize) {
//if (this.ipAddress == null)
// return false;
// UnityEngine.Debug.Log($"Send msg {buffer[0]} to {ipAddress}");
//this.udpClient.Send(this.buffer, bufferSize, this.ipAddress, this.port);
this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
return true;
}
public bool PublishBuffer(int bufferSize) {
if (this.broadcastIpAddress == null)
return false;
this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
return true;
}
#endregion
#region Receive
public void ReceiveData(byte[] data, RemoteParticipant remoteParticipant) {
byte msgId = data[0];
if (msgId == 0xFF) {
// Timeout
return;
}
switch (msgId) {
case ParticipantMsg.Id: // 0xA0 / 160
this.Process(remoteParticipant, new ParticipantMsg(data));
break;
case NetworkIdMsg.Id: // 0xA1 / 161
this.Process(remoteParticipant, new NetworkIdMsg(data));
break;
case InvestigateMsg.Id: // 0x81
// result = await InvestigateMsg.Receive(dataStream, client, packetSize);
break;
case ThingMsg.id: // 0x80 / 128
this.Process(remoteParticipant, new ThingMsg(data));
break;
case NameMsg.Id: // 0x91 / 145
this.Process(remoteParticipant, new NameMsg(data));
break;
case ModelUrlMsg.Id: // 0x90 / 144
this.Process(remoteParticipant, new ModelUrlMsg(data));
break;
case PoseMsg.Id: // 0x10 / 16
this.Process(remoteParticipant, new PoseMsg(data));
// result = await PoseMsg.Receive(dataStream, client, packetSize);
break;
case BinaryMsg.Id: // 0xB1 / 177
this.Process(remoteParticipant, new BinaryMsg(data));
break;
case TextMsg.Id: // 0xB0 / 176
// result = await TextMsg.Receive(dataStream, client, packetSize);
break;
case DestroyMsg.Id: // 0x20 / 32
// result = await DestroyMsg.Receive(dataStream, client, packetSize);
break;
default:
break;
}
}
#endregion
#region Process
protected virtual void Process(RemoteParticipant sender, ParticipantMsg msg) { }
protected virtual void Process(RemoteParticipant sender, NetworkIdMsg msg) {
Console.WriteLine($"{this.name} receive network id {this.networkId} {msg.networkId}");
if (this.networkId != msg.networkId) {
this.networkId = msg.networkId;
foreach (Thing thing in this.things) //Thing.GetAllThings())
this.SendThingInfo(sender, thing);
}
}
protected virtual void Process(InvestigateMsg msg) { }
protected virtual void Process(RemoteParticipant sender, ThingMsg msg) {
Console.WriteLine($"Participant: Process thing [{msg.networkId}/{msg.thingId}]");
}
protected virtual void Process(RemoteParticipant sender, NameMsg msg) {
// Console.WriteLine($"Participant: Process name [{msg.networkId}/{msg.thingId}] {msg.name}");
Thing thing = sender.Get(msg.networkId, msg.thingId);
if (thing != null)
thing.name = msg.name;
}
protected virtual void Process(RemoteParticipant sender, ModelUrlMsg msg) {
Console.WriteLine($"Participant: Process model [{msg.networkId}/{msg.thingId}] {msg.url}");
Thing thing = sender.Get(msg.networkId, msg.thingId);
if (thing != null)
thing.modelUrl = msg.url;
}
protected virtual void Process(RemoteParticipant sender, PoseMsg msg) {
//Console.WriteLine($"Participant: Process pose [{msg.networkId}/{msg.thingId}] {msg.poseType}");
Thing thing = sender.Get(msg.networkId, msg.thingId);
if (thing != null) {
thing.hasPosition = false;
if ((msg.poseType & PoseMsg.Pose_Position) != 0) {
thing.position = msg.position;
thing.hasPosition = true;
}
if ((msg.poseType & PoseMsg.Pose_Orientation) != 0)
thing.orientation = msg.orientation;
else
thing.orientation = null;
if ((msg.poseType & PoseMsg.Pose_LinearVelocity) != 0)
thing.linearVelocity = msg.linearVelocity;
if ((msg.poseType & PoseMsg.Pose_AngularVelocity) != 0)
thing.angularVelocity = msg.angularVelocity;
}
}
protected virtual void Process(RemoteParticipant sender, BinaryMsg msg) {
// Console.WriteLine($"Participant: Process binary [{msg.networkId}/{msg.thingId}]");
Thing thing = sender.Get(msg.networkId, msg.thingId);
thing?.ProcessBinary(msg.bytes);
}
protected virtual void Process(TextMsg temsgxt) { }
protected virtual void Process(DestroyMsg msg) { }
private void ForwardMessage(IMessage msg) {
foreach (Participant client in senders) {
if (client == this)
continue;
//UnityEngine.Debug.Log($"---> {client.ipAddress}");
//IMessage.SendMsg(client, msg);
msg.Serialize(ref client.buffer);
client.SendBuffer(client.buffer.Length);
}
}
#endregion
}
}