diff --git a/ClientMsg.cs b/ClientMsg.cs index 020a975..56f5063 100644 --- a/ClientMsg.cs +++ b/ClientMsg.cs @@ -9,7 +9,9 @@ namespace Passer.Control.Core { this.networkId = networkId; } - public ClientMsg(byte[] buffer) : base(buffer) { } + public ClientMsg(byte[] buffer) : base(buffer) { + this.networkId = buffer[1]; + } public override byte Serialize(ref byte[] buffer) { byte ix = 0; diff --git a/NetworkIdMsg.cs b/NetworkIdMsg.cs index 5c749ca..c730c40 100644 --- a/NetworkIdMsg.cs +++ b/NetworkIdMsg.cs @@ -8,7 +8,9 @@ namespace Passer.Control.Core { public NetworkIdMsg(byte networkId) { this.networkId = networkId; } - NetworkIdMsg(byte[] buffer) : base(buffer) { } + public NetworkIdMsg(byte[] buffer) : base(buffer) { + this.networkId = buffer[1]; + } public override byte Serialize(ref byte[] buffer) { buffer[0] = NetworkIdMsg.Id; diff --git a/Participant.cs b/Participant.cs index a5a1547..697c825 100644 --- a/Participant.cs +++ b/Participant.cs @@ -14,27 +14,35 @@ namespace Passer.Control.Core { public string ipAddress = "0.0.0.0"; public string broadcastIpAddress = "255.255.255.255"; public int port; - public Stream dataStream; + // public Stream dataStream; public byte[] buffer = new byte[256]; public byte networkId = 0; + public string name = "Participant"; public readonly ConcurrentQueue messageQueue = new(); - public Participant? GetClient(string ipAddress, int port) { + public List others = new List(); + + public Participant? GetParticipant(string ipAddress, int port) { foreach (Participant c in others) { if (c.ipAddress == ipAddress && c.port == port) return c; } return null; } - public List others = new List(); + public Participant AddParticipant(string ipAddress, int port) { + Participant participant = new Participant(ipAddress, port); + participant.networkId = (byte)this.others.Count; + //others.Add(); + return participant; + } #region Init public Participant() { - this.dataStream = new EchoStream(); + // this.dataStream = new EchoStream(); others.Add(this); } @@ -54,7 +62,6 @@ namespace Passer.Control.Core { public Participant(UdpClient udpClient, int port) : this() { this.udpClient = udpClient; - this.ipAddress = null; this.port = port; //this.dataStream = new EchoStream(); //clients.Add(this); @@ -64,15 +71,23 @@ namespace Passer.Control.Core { #region Update - protected void ReceiveUDP(IAsyncResult result) { + protected async void ReceiveUDP(IAsyncResult result) { if (udpClient == null || this.endPoint == null) return; try { + // Console.WriteLine($"{this.name} received"); byte[] data = udpClient.EndReceive(result, ref this.endPoint); - this.dataStream.WriteByte((byte)data.Length); - this.dataStream.Write(data, 0, data.Length); - Task task = Task.Run(() => ReceiveData()); + // This does not yet take multi-packet messages into account! + + Participant remoteParticipant = this.GetParticipant(endPoint.Address.ToString(), endPoint.Port); + if (remoteParticipant == null) { + remoteParticipant = this.AddParticipant(endPoint.Address.ToString(), endPoint.Port); + } + await ReceiveData(data, remoteParticipant); + // this.dataStream.WriteByte((byte)data.Length); + // this.dataStream.Write(data, 0, data.Length); + //Task task = Task.Run(() => ReceiveData()); } catch (Exception _) { Console.WriteLine("connection error"); @@ -87,7 +102,7 @@ namespace Passer.Control.Core { public virtual void Update(long currentTimeMs) { if (currentTimeMs > this.nextPublishMe) { this.Publish(new ClientMsg(this.networkId)); - Console.WriteLine($"Sent ClientMsg {this.networkId}"); + Console.WriteLine($"{this.name} Sent ClientMsg {this.networkId}"); this.nextPublishMe = currentTimeMs + Participant.publishInterval; } @@ -96,7 +111,7 @@ namespace Passer.Control.Core { if (client == null) continue; - client.ProcessMessages(); + this.ProcessMessages(client); } Thing.UpdateAll(currentTimeMs); } @@ -126,7 +141,7 @@ namespace Passer.Control.Core { if (bufferSize <= 0) return true; - //Console.WriteLine($"publish to {broadcastIpAddress.ToString()} {this.port}"); + // Console.WriteLine($"publish to {broadcastIpAddress.ToString()} {this.port}"); this.udpClient?.Send(this.buffer, bufferSize, this.broadcastIpAddress, this.port); return true; } @@ -153,105 +168,109 @@ namespace Passer.Control.Core { #region Receive - public async Task ReceiveData() { - while (true) { - byte packetSize = (byte)this.dataStream.ReadByte(); - if (packetSize != 0xFF) - await ReceiveData(this.dataStream, this, packetSize); - // else timeout - } - } + // public async Task ReceiveData_old() { + // while (true) { + // byte packetSize = (byte)this.dataStream.ReadByte(); + // if (packetSize != 0xFF) + // await ReceiveData(this.dataStream, this, packetSize); + // // else timeout + // } + // } - public static async Task ReceiveData(Stream dataStream, Participant client, byte packetSize) { - byte msgId = (byte)dataStream.ReadByte(); + // public static async Task ReceiveData(Stream dataStream, Participant client, byte packetSize) { + public async Task ReceiveData(byte[] data, Participant remoteParticipant) { + // byte msgId = (byte)dataStream.ReadByte(); + byte msgId = data[0]; if (msgId == 0xFF) { // Timeout return; } - //UnityEngine.Debug.Log($"R {msgId} from {client.ipAddress}"); + // Console.WriteLine($"R {msgId} from {remoteParticipant.ipAddress}"); bool result = false; switch (msgId) { case ClientMsg.Id: // 0xA0 / 160 - result = await ClientMsg.Receive(dataStream, client, packetSize); + //result = await ClientMsg.Receive(dataStream, client, packetSize); + this.ProcessClientMsg(remoteParticipant, new ClientMsg(data)); break; case NetworkIdMsg.Id: // 0xA1 / 161 - result = await NetworkIdMsg.Receive(dataStream, client, packetSize); + //result = await NetworkIdMsg.Receive(dataStream, client, packetSize); + this.ProcessNetworkIdMsg(remoteParticipant, new NetworkIdMsg(data)); break; case InvestigateMsg.Id: // 0x81 - result = await InvestigateMsg.Receive(dataStream, client, packetSize); + // result = await InvestigateMsg.Receive(dataStream, client, packetSize); break; case ThingMsg.id: // 0x80 / 128 - result = await ThingMsg.Receive(dataStream, client, packetSize); + // result = await ThingMsg.Receive(dataStream, client, packetSize); break; case NameMsg.Id: // 0x91 / 145 - result = await NameMsg.Receive(dataStream, client, packetSize); + // result = await NameMsg.Receive(dataStream, client, packetSize); break; case ModelUrlMsg.Id: // 0x90 / 144 - result = await ModelUrlMsg.Receive(dataStream, client, packetSize); + // result = await ModelUrlMsg.Receive(dataStream, client, packetSize); break; case PoseMsg.Id: // 0x10 / 16 - result = await PoseMsg.Receive(dataStream, client, packetSize); + // result = await PoseMsg.Receive(dataStream, client, packetSize); break; case CustomMsg.Id: // 0xB1 / 177 - result = await CustomMsg.Receive(dataStream, client, packetSize); + // result = await CustomMsg.Receive(dataStream, client, packetSize); break; case TextMsg.Id: // 0xB0 / 176 - result = await TextMsg.Receive(dataStream, client, packetSize); + // result = await TextMsg.Receive(dataStream, client, packetSize); break; case DestroyMsg.Id: // 0x20 / 32 - result = await DestroyMsg.Receive(dataStream, client, packetSize); + // result = await DestroyMsg.Receive(dataStream, client, packetSize); break; default: break; } - if (result == false) { - packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte - await ReceiveData(dataStream, client, packetSize); - } + // if (result == false) { + // packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte + // await ReceiveData(dataStream, client, packetSize); + // } } #endregion #region Process - public virtual void ProcessMessages() { + public virtual void ProcessMessages(Participant remoteParticipant) { while (this.messageQueue.TryDequeue(out IMessage msg)) - ProcessMessage(msg); + ProcessMessage(remoteParticipant, msg); } - public void ProcessMessage(IMessage msg) { + public void ProcessMessage(Participant remoteParticipant, IMessage msg) { switch (msg) { case ClientMsg clientMsg: - ProcessClient(clientMsg); + ProcessClientMsg(remoteParticipant, clientMsg); break; case NetworkIdMsg networkId: - ProcessNetworkId(networkId); + ProcessNetworkIdMsg(remoteParticipant, networkId); break; case InvestigateMsg investigate: - ProcessInvestigate(investigate); + ProcessInvestigateMsg(investigate); break; case ThingMsg thing: - ProcessThing(thing); + ProcessThingMsg(thing); break; case NameMsg name: //UnityEngine.Debug.Log($"Name [{name.networkId}/{name.thingId}] {name.name}"); - ProcessName(name); + ProcessNameMsg(name); break; case ModelUrlMsg modelUrl: - ProcessModelUrl(modelUrl); + ProcessModelUrlMsg(modelUrl); break; case PoseMsg pose: - ProcessPose(pose); + ProcessPoseMsg(pose); break; case CustomMsg custom: - ProcessCustom(custom); + ProcessCustomMsg(custom); break; case TextMsg text: - ProcessText(text); + ProcessTextMsg(text); break; case DestroyMsg destroy: - ProcessDestroy(destroy); + ProcessDestroyMsg(destroy); break; default: return; @@ -259,38 +278,38 @@ namespace Passer.Control.Core { ForwardMessage(msg); } - protected virtual void ProcessClient(ClientMsg msg) { } + protected virtual void ProcessClientMsg(Participant sender, ClientMsg msg) { } - protected virtual void ProcessNetworkId(NetworkIdMsg msg) { + protected virtual void ProcessNetworkIdMsg(Participant sender, NetworkIdMsg msg) { + Console.WriteLine($"{this.name} receive network id {this.networkId} {msg.networkId}"); if (this.networkId != msg.networkId) { this.networkId = msg.networkId; - Console.WriteLine($"receive network id {msg.networkId}"); foreach (Thing thing in Thing.GetAllThings()) this.SendThingInfo(thing); } } - protected virtual void ProcessInvestigate(InvestigateMsg msg) { } + protected virtual void ProcessInvestigateMsg(InvestigateMsg msg) { } - protected virtual void ProcessThing(ThingMsg msg) { + protected virtual void ProcessThingMsg(ThingMsg msg) { Console.WriteLine($"received thing {msg.thingId}"); } - protected virtual void ProcessName(NameMsg msg) { + protected virtual void ProcessNameMsg(NameMsg msg) { Console.WriteLine($"received name {msg.name}"); } - protected virtual void ProcessModelUrl(ModelUrlMsg msg) { + protected virtual void ProcessModelUrlMsg(ModelUrlMsg msg) { Console.WriteLine($"received name {msg.url}"); } - protected virtual void ProcessPose(PoseMsg msg) { } + protected virtual void ProcessPoseMsg(PoseMsg msg) { } - protected virtual void ProcessCustom(CustomMsg msg) { } + protected virtual void ProcessCustomMsg(CustomMsg msg) { } - protected virtual void ProcessText(TextMsg temsgxt) { } + protected virtual void ProcessTextMsg(TextMsg temsgxt) { } - protected virtual void ProcessDestroy(DestroyMsg msg) { } + protected virtual void ProcessDestroyMsg(DestroyMsg msg) { } private void ForwardMessage(IMessage msg) { foreach (Participant client in others) { diff --git a/SiteServer.cs b/SiteServer.cs index 7caa93d..52c90c7 100644 --- a/SiteServer.cs +++ b/SiteServer.cs @@ -12,76 +12,25 @@ namespace Passer.Control.Core { this.udpClient = new UdpClient(); // for receiving this.udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, port)); this.udpClient.BeginReceive(new AsyncCallback(result => ReceiveUDP(result)), null); - + this.name = "Site Server"; } - protected override void ProcessClient(ClientMsg msg) { - if (msg.networkId == 0) { - this.Send(new NetworkIdMsg(this.networkId)); - Console.WriteLine($"############################ New Client -> {this.networkId}"); + public override void Update(long currentTimeMs) { + for (int ix = 0; ix < this.others.Count; ix++) { + Participant client = this.others[ix]; + if (client == null) + continue; + + this.ProcessMessages(client); + } + Thing.UpdateAll(currentTimeMs); + } + + protected override void ProcessClientMsg(Participant sender, ClientMsg msg) { + if (msg.networkId == 0) { + Console.WriteLine($"{this.name} received New Client -> {sender.networkId}"); + sender.Send(new NetworkIdMsg(sender.networkId)); } - else - Console.WriteLine($"############################ New Client"); } } - // public static class SiteServer { - - // public static async Task ReceiveData(Stream dataStream, Participant client) { - // while (true) { - // byte packetSize = (byte)dataStream.ReadByte(); - // if (packetSize != 0xFF) - // await ReceiveData(dataStream, client, packetSize); - // // else timeout - // } - // } - - // public static async Task ReceiveData(Stream dataStream, Participant client, byte packetSize) { - // byte msgId = (byte)dataStream.ReadByte(); - // if (msgId == 0xFF) { - // // Timeout - // return; - // } - - // //UnityEngine.Debug.Log($"R {msgId} from {client.ipAddress}"); - // bool result = false; - // switch (msgId) { - // case ClientMsg.Id: // 0xA0 / 160 - // result = await ClientMsg.Receive(dataStream, client, packetSize); - // break; - // case NetworkIdMsg.Id: // 0xA1 / 161 - // result = await NetworkIdMsg.Receive(dataStream, client, packetSize); - // break; - // case InvestigateMsg.Id: // 0x81 - // result = await InvestigateMsg.Receive(dataStream, client, packetSize); - // break; - // case ThingMsg.id: // 0x80 / 128 - // result = await ThingMsg.Receive(dataStream, client, packetSize); - // break; - // case NameMsg.Id: // 0x91 / 145 - // result = await NameMsg.Receive(dataStream, client, packetSize); - // break; - // case ModelUrlMsg.Id: // 0x90 / 144 - // result = await ModelUrlMsg.Receive(dataStream, client, packetSize); - // break; - // case PoseMsg.Id: // 0x10 / 16 - // result = await PoseMsg.Receive(dataStream, client, packetSize); - // break; - // case CustomMsg.Id: // 0xB1 / 177 - // result = await CustomMsg.Receive(dataStream, client, packetSize); - // break; - // case TextMsg.Id: // 0xB0 / 176 - // result = await TextMsg.Receive(dataStream, client, packetSize); - // break; - // case DestroyMsg.Id: // 0x20 / 32 - // result = await DestroyMsg.Receive(dataStream, client, packetSize); - // break; - // default: - // break; - // } - // if (result == false) { - // packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte - // await ReceiveData(dataStream, client, packetSize); - // } - // } - // } } \ No newline at end of file diff --git a/test/UnitTest1.cs b/test/UnitTest1.cs index e6e0d3e..f369c56 100644 --- a/test/UnitTest1.cs +++ b/test/UnitTest1.cs @@ -8,15 +8,16 @@ public class Tests { } [Test] - public void Test_Client() { + public void Test_Participant() { Participant participant = new("127.0.0.1", 7681); long milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); long startTime = milliseconds; while (milliseconds < startTime + 7000) { participant.Update(milliseconds); - milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + Thread.Sleep(100); + milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } Assert.Pass(); @@ -30,8 +31,27 @@ public class Tests { long startTime = milliseconds; while (milliseconds < startTime + 7000) { siteServer.Update(milliseconds); - milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + Thread.Sleep(100); + milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + Assert.Pass(); + } + + [Test] + public void Test_SiteParticipant() { + SiteServer siteServer = new(7681); + Participant participant = new("127.0.0.1", 7681); + + long milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + long startTime = milliseconds; + while (milliseconds < startTime + 7000) { + siteServer.Update(milliseconds); + participant.Update(milliseconds); + + Thread.Sleep(100); + milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } Assert.Pass(); @@ -49,8 +69,9 @@ public class Tests { long startTime = milliseconds; while (milliseconds < startTime + 7000) { participant.Update(milliseconds); - milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + Thread.Sleep(100); + milliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } Assert.Pass();