RoboidControl-csharp/Participant.cs
2025-01-02 09:30:33 +01:00

325 lines
12 KiB
C#

using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.IO;
using System.Threading.Tasks;
namespace Passer.Control.Core {
public class Participant {
//public ConnectionMethod connection;
public IPEndPoint? endPoint = null;
public UdpClient? udpClient = null;
public string ipAddress = "0.0.0.0";
public string broadcastIpAddress = "255.255.255.255";
public int port;
// public Stream dataStream;
public byte[] buffer = new byte[256];
public byte networkId = 0;
public string name = "Participant";
public readonly ConcurrentQueue<IMessage> messageQueue = new();
public List<Participant> others = new List<Participant>();
public Participant? GetParticipant(string ipAddress, int port) {
foreach (Participant c in others) {
if (c.ipAddress == ipAddress && c.port == port)
return c;
}
return null;
}
public Participant AddParticipant(string ipAddress, int port) {
Participant participant = new Participant(ipAddress, port);
participant.networkId = (byte)this.others.Count;
//others.Add();
return participant;
}
#region Init
public Participant() {
// this.dataStream = new EchoStream();
others.Add(this);
}
/// <summary>
/// Create a new participant
/// </summary>
/// <param name="ipAddress">The ip address of the site server</param>
/// <param name="port">The port number of the site server</param>
public Participant(string ipAddress = "0.0.0.0", int port = 7681) : this() {
this.ipAddress = ipAddress;
this.port = port;
this.endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); // for sending
this.udpClient = new UdpClient(); // for receiving
this.udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, 0));
this.udpClient.BeginReceive(new AsyncCallback(result => ReceiveUDP(result)), null);
}
public Participant(UdpClient udpClient, int port) : this() {
this.udpClient = udpClient;
this.port = port;
//this.dataStream = new EchoStream();
//clients.Add(this);
}
#endregion Init
#region Update
protected async void ReceiveUDP(IAsyncResult result) {
if (udpClient == null || this.endPoint == null)
return;
try {
// Console.WriteLine($"{this.name} received");
byte[] data = udpClient.EndReceive(result, ref this.endPoint);
// This does not yet take multi-packet messages into account!
Participant remoteParticipant = this.GetParticipant(endPoint.Address.ToString(), endPoint.Port);
if (remoteParticipant == null) {
remoteParticipant = this.AddParticipant(endPoint.Address.ToString(), endPoint.Port);
}
await ReceiveData(data, remoteParticipant);
// this.dataStream.WriteByte((byte)data.Length);
// this.dataStream.Write(data, 0, data.Length);
//Task task = Task.Run(() => ReceiveData());
}
catch (Exception _) {
Console.WriteLine("connection error");
}
udpClient.BeginReceive(new AsyncCallback(result => ReceiveUDP(result)), null);
}
private static readonly long publishInterval = 3000;
private long nextPublishMe = 0;
public virtual void Update(long currentTimeMs) {
if (currentTimeMs > this.nextPublishMe) {
this.Publish(new ClientMsg(this.networkId));
Console.WriteLine($"{this.name} Sent ClientMsg {this.networkId}");
this.nextPublishMe = currentTimeMs + Participant.publishInterval;
}
for (int ix = 0; ix < this.others.Count; ix++) {
Participant client = this.others[ix];
if (client == null)
continue;
this.ProcessMessages(client);
}
Thing.UpdateAll(currentTimeMs);
}
#endregion Update
#region Send
public void SendThingInfo(Thing thing) {
this.Send(new ThingMsg(this.networkId, thing));
this.Send(new NameMsg(this.networkId, thing));
this.Send(new ModelUrlMsg(this.networkId, thing));
}
public bool Send(IMessage msg) {
int bufferSize = msg.Serialize(ref this.buffer);
if (bufferSize <= 0)
return true;
Console.WriteLine($"msg to {endPoint.Address.ToString()} {endPoint.Port}");
this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
return true;
}
public bool Publish(IMessage msg) {
int bufferSize = msg.Serialize(ref this.buffer);
if (bufferSize <= 0)
return true;
// Console.WriteLine($"publish to {broadcastIpAddress.ToString()} {this.port}");
this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
return true;
}
public bool SendBuffer(int bufferSize) {
//if (this.ipAddress == null)
// return false;
// UnityEngine.Debug.Log($"Send msg {buffer[0]} to {ipAddress}");
//this.udpClient.Send(this.buffer, bufferSize, this.ipAddress, this.port);
this.udpClient?.Send(this.buffer, bufferSize, this.endPoint);
return true;
}
public bool PublishBuffer(int bufferSize) {
if (this.broadcastIpAddress == null)
return false;
this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port);
return true;
}
#endregion
#region Receive
// public async Task ReceiveData_old() {
// while (true) {
// byte packetSize = (byte)this.dataStream.ReadByte();
// if (packetSize != 0xFF)
// await ReceiveData(this.dataStream, this, packetSize);
// // else timeout
// }
// }
// public static async Task ReceiveData(Stream dataStream, Participant client, byte packetSize) {
public async Task ReceiveData(byte[] data, Participant remoteParticipant) {
// byte msgId = (byte)dataStream.ReadByte();
byte msgId = data[0];
if (msgId == 0xFF) {
// Timeout
return;
}
// Console.WriteLine($"R {msgId} from {remoteParticipant.ipAddress}");
bool result = false;
switch (msgId) {
case ClientMsg.Id: // 0xA0 / 160
//result = await ClientMsg.Receive(dataStream, client, packetSize);
this.ProcessClientMsg(remoteParticipant, new ClientMsg(data));
break;
case NetworkIdMsg.Id: // 0xA1 / 161
//result = await NetworkIdMsg.Receive(dataStream, client, packetSize);
this.ProcessNetworkIdMsg(remoteParticipant, new NetworkIdMsg(data));
break;
case InvestigateMsg.Id: // 0x81
// result = await InvestigateMsg.Receive(dataStream, client, packetSize);
break;
case ThingMsg.id: // 0x80 / 128
// result = await ThingMsg.Receive(dataStream, client, packetSize);
break;
case NameMsg.Id: // 0x91 / 145
// result = await NameMsg.Receive(dataStream, client, packetSize);
break;
case ModelUrlMsg.Id: // 0x90 / 144
// result = await ModelUrlMsg.Receive(dataStream, client, packetSize);
break;
case PoseMsg.Id: // 0x10 / 16
// result = await PoseMsg.Receive(dataStream, client, packetSize);
break;
case CustomMsg.Id: // 0xB1 / 177
// result = await CustomMsg.Receive(dataStream, client, packetSize);
break;
case TextMsg.Id: // 0xB0 / 176
// result = await TextMsg.Receive(dataStream, client, packetSize);
break;
case DestroyMsg.Id: // 0x20 / 32
// result = await DestroyMsg.Receive(dataStream, client, packetSize);
break;
default:
break;
}
// if (result == false) {
// packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte
// await ReceiveData(dataStream, client, packetSize);
// }
}
#endregion
#region Process
public virtual void ProcessMessages(Participant remoteParticipant) {
while (this.messageQueue.TryDequeue(out IMessage msg))
ProcessMessage(remoteParticipant, msg);
}
public void ProcessMessage(Participant remoteParticipant, IMessage msg) {
switch (msg) {
case ClientMsg clientMsg:
ProcessClientMsg(remoteParticipant, clientMsg);
break;
case NetworkIdMsg networkId:
ProcessNetworkIdMsg(remoteParticipant, networkId);
break;
case InvestigateMsg investigate:
ProcessInvestigateMsg(investigate);
break;
case ThingMsg thing:
ProcessThingMsg(thing);
break;
case NameMsg name:
//UnityEngine.Debug.Log($"Name [{name.networkId}/{name.thingId}] {name.name}");
ProcessNameMsg(name);
break;
case ModelUrlMsg modelUrl:
ProcessModelUrlMsg(modelUrl);
break;
case PoseMsg pose:
ProcessPoseMsg(pose);
break;
case CustomMsg custom:
ProcessCustomMsg(custom);
break;
case TextMsg text:
ProcessTextMsg(text);
break;
case DestroyMsg destroy:
ProcessDestroyMsg(destroy);
break;
default:
return;
}
ForwardMessage(msg);
}
protected virtual void ProcessClientMsg(Participant sender, ClientMsg msg) { }
protected virtual void ProcessNetworkIdMsg(Participant sender, NetworkIdMsg msg) {
Console.WriteLine($"{this.name} receive network id {this.networkId} {msg.networkId}");
if (this.networkId != msg.networkId) {
this.networkId = msg.networkId;
foreach (Thing thing in Thing.GetAllThings())
this.SendThingInfo(thing);
}
}
protected virtual void ProcessInvestigateMsg(InvestigateMsg msg) { }
protected virtual void ProcessThingMsg(ThingMsg msg) {
Console.WriteLine($"received thing {msg.thingId}");
}
protected virtual void ProcessNameMsg(NameMsg msg) {
Console.WriteLine($"received name {msg.name}");
}
protected virtual void ProcessModelUrlMsg(ModelUrlMsg msg) {
Console.WriteLine($"received name {msg.url}");
}
protected virtual void ProcessPoseMsg(PoseMsg msg) { }
protected virtual void ProcessCustomMsg(CustomMsg msg) { }
protected virtual void ProcessTextMsg(TextMsg temsgxt) { }
protected virtual void ProcessDestroyMsg(DestroyMsg msg) { }
private void ForwardMessage(IMessage msg) {
foreach (Participant client in others) {
if (client == this)
continue;
//UnityEngine.Debug.Log($"---> {client.ipAddress}");
IMessage.SendMsg(client, msg);
}
}
#endregion
}
}