From d8fc41f1c4657a762d4827d05d4006348ae53210 Mon Sep 17 00:00:00 2001 From: Pascal Serrarens Date: Fri, 6 Dec 2024 15:39:36 +0100 Subject: [PATCH] First step for ControlCore support --- EchoStream.cs | 173 ++++++++++++++++++++++++++++++++++++++++++++ EchoStream.cs.meta | 2 + Messages.cs | 175 ++++++++++++++++++++++++++++++++++++++------- SiteServer.cs | 85 ++++++++++++++++++++++ SiteServer.cs.meta | 2 + 5 files changed, 410 insertions(+), 27 deletions(-) create mode 100644 EchoStream.cs create mode 100644 EchoStream.cs.meta create mode 100644 SiteServer.cs create mode 100644 SiteServer.cs.meta diff --git a/EchoStream.cs b/EchoStream.cs new file mode 100644 index 0000000..c9ae949 --- /dev/null +++ b/EchoStream.cs @@ -0,0 +1,173 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using System.Threading; +using System.Collections.Concurrent; + +public class EchoStream : Stream { + public override bool CanTimeout { get; } = true; + public override int ReadTimeout { get; set; } = Timeout.Infinite; + public override int WriteTimeout { get; set; } = Timeout.Infinite; + public override bool CanRead { get; } = true; + public override bool CanSeek { get; } = false; + public override bool CanWrite { get; } = true; + + public bool CopyBufferOnWrite { get; set; } = false; + + private readonly object _lock = new object(); + + // Default underlying mechanism for BlockingCollection is ConcurrentQueue, which is what we want + private readonly BlockingCollection _Buffers; + private int _maxQueueDepth = 10; + + private byte[] m_buffer = null; + private int m_offset = 0; + private int m_count = 0; + + private bool m_Closed = false; + private bool m_FinalZero = false; //after the stream is closed, set to true after returning a 0 for read() + public override void Close() { + m_Closed = true; + + // release any waiting writes + _Buffers.CompleteAdding(); + } + + public bool DataAvailable { + get { + return _Buffers.Count > 0; + } + } + + private long _Length = 0L; + public override long Length { + get { + return _Length; + } + } + + private long _Position = 0L; + public override long Position { + get { + return _Position; + } + set { + throw new NotImplementedException(); + } + } + + public EchoStream() : this(10) { + } + + public EchoStream(int maxQueueDepth) { + _maxQueueDepth = maxQueueDepth; + _Buffers = new BlockingCollection(_maxQueueDepth); + } + + // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once + public new Task WriteAsync(byte[] buffer, int offset, int count) { + return Task.Run(() => Write(buffer, offset, count)); + } + + // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once + public new Task ReadAsync(byte[] buffer, int offset, int count) { + return Task.Run(() => { + return Read(buffer, offset, count); + }); + } + + public override void Write(byte[] buffer, int offset, int count) { + if (m_Closed || buffer.Length - offset < count || count <= 0) + return; + + byte[] newBuffer; + if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length) + newBuffer = buffer; + else { + newBuffer = new byte[count]; + System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count); + } + if (!_Buffers.TryAdd(newBuffer, WriteTimeout)) + throw new TimeoutException("EchoStream Write() Timeout"); + + _Length += count; + } + + public override int Read(byte[] buffer, int offset, int count) { + if (count == 0) + return 0; + lock (_lock) { + if (m_count == 0 && _Buffers.Count == 0) { + if (m_Closed) { + if (!m_FinalZero) { + m_FinalZero = true; + return 0; + } + else { + return -1; + } + } + + if (_Buffers.TryTake(out m_buffer, ReadTimeout)) { + m_offset = 0; + m_count = m_buffer.Length; + } + else { + if (m_Closed) { + if (!m_FinalZero) { + m_FinalZero = true; + return 0; + } + else { + return -1; + } + } + else { + return 0; + } + } + } + + int returnBytes = 0; + while (count > 0) { + if (m_count == 0) { + if (_Buffers.TryTake(out m_buffer, 0)) { + m_offset = 0; + m_count = m_buffer.Length; + } + else + break; + } + + var bytesToCopy = (count < m_count) ? count : m_count; + System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy); + m_offset += bytesToCopy; + m_count -= bytesToCopy; + offset += bytesToCopy; + count -= bytesToCopy; + + returnBytes += bytesToCopy; + } + + _Position += returnBytes; + + return returnBytes; + } + } + + public override int ReadByte() { + byte[] returnValue = new byte[1]; + return (Read(returnValue, 0, 1) <= 0 ? -1 : (int)returnValue[0]); + } + + public override void Flush() { + } + + public override long Seek(long offset, SeekOrigin origin) { + throw new NotImplementedException(); + } + + public override void SetLength(long value) { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/EchoStream.cs.meta b/EchoStream.cs.meta new file mode 100644 index 0000000..063a4f2 --- /dev/null +++ b/EchoStream.cs.meta @@ -0,0 +1,2 @@ +fileFormatVersion: 2 +guid: 422516a56cbf14d46aaa0b1bc09115bf \ No newline at end of file diff --git a/Messages.cs b/Messages.cs index b517f4d..acbecad 100644 --- a/Messages.cs +++ b/Messages.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.IO; using System.Net.Sockets; using System.Threading.Tasks; +using System; namespace Passer.Control { @@ -76,16 +77,25 @@ namespace Passer.Control { #region Client public class ClientMsg : IMessage { + public const byte Id = 0xA0; public const byte length = 2; - public byte clientId; + public byte networkId; + public ClientMsg(byte networkId) { + this.networkId = networkId; + } public ClientMsg(byte[] data) : base(data) { } + public override void Deserialize(byte[] data) { base.Deserialize(data); uint ix = 0; - clientId = data[ix++]; + networkId = data[ix]; } + public static bool Send(Client client, byte networkId) { + ClientMsg msg = new(networkId); + return SendMsg(client, msg); + } public static async Task Receive(Stream dataStream, Client client, byte packetSize) { if (packetSize != length) return false; @@ -95,12 +105,12 @@ namespace Passer.Control { if (client.networkId == 0) { client.networkId = (byte)(Client.clients.Count); - NetworkIdMsg.Send(client); + NetworkIdMsg.Send(client, client.networkId); //if (string.IsNullOrEmpty(sceneUrl) == false) //SendModelUrl(client, sceneUrl); } - else if (msg.clientId == 0) { - NetworkIdMsg.Send(client); + else if (msg.networkId == 0) { + NetworkIdMsg.Send(client, client.networkId); //if (string.IsNullOrEmpty(sceneUrl) == false) //SendModelUrl(client, sceneUrl); } @@ -116,20 +126,92 @@ namespace Passer.Control { public class NetworkIdMsg : IMessage { public const byte Id = 0xA1; public const byte length = 2; + public byte networkId; - public static bool Send(Client client) { - byte[] data = new byte[NetworkIdMsg.length]; - data[0] = NetworkIdMsg.Id; - data[1] = client.networkId; - return SendMsg(client, data); + NetworkIdMsg(byte networkId) { + this.networkId = networkId; + } + NetworkIdMsg(byte[] data) : base(data) { } + + public override byte[] Serialize() { + byte[] data = new byte[NetworkIdMsg.length]; + data[0] = NetworkIdMsg.Id; + data[1] = this.networkId; + return data; + } + public override void Deserialize(byte[] data) { + uint ix = 0; + this.networkId = data[ix]; + } + + public static bool Send(Client client, byte networkId) { + NetworkIdMsg msg = new(networkId); + return SendMsg(client, msg); + //byte[] data = new byte[NetworkIdMsg.length]; + //data[0] = NetworkIdMsg.Id; + //data[1] = client.networkId; + //return SendMsg(client, data); } + public static async Task Receive(Stream dataStream, Client client, byte packetSize) { + if (packetSize != length) + return false; + + byte[] buffer = await Receive(dataStream, packetSize); + NetworkIdMsg msg = new(buffer); + client.messageQueue.Enqueue(msg); + return true; + } } - #endregion Network Id + #endregion Network Id - #region Thing + #region Investigate - public class ThingMsg : IMessage { + class InvestigateMsg : IMessage { + public const byte Id = 0x81; + public const byte length = 3; + public byte networkId; + public byte thingId; + + public InvestigateMsg(byte networkId, byte thingId) { + this.networkId = networkId; + this.thingId = thingId; + } + public InvestigateMsg(byte[] data) : base(data) { } + + public override byte[] Serialize() { + byte[] buffer = new byte[InvestigateMsg.length]; + buffer[0] = InvestigateMsg.Id; + buffer[1] = this.networkId; + buffer[2] = this.thingId; + return buffer; + } + public override void Deserialize(byte[] data) { + uint ix = 0; + this.networkId = data[ix++]; + this.thingId = data[ix++]; + } + + public static bool Send(Client client, byte thingId) { + InvestigateMsg msg = new(client.networkId, thingId); + return SendMsg(client, msg); + } + public static async Task Receive(Stream dataStream, Client client, byte packetSize) { + if (packetSize != length) + return false; + + byte[] buffer = await Receive(dataStream, packetSize); + InvestigateMsg msg = new(buffer); + client.messageQueue.Enqueue(msg); + return true; + + } + } + + #endregion Investigate + #region Thing + + public class ThingMsg : IMessage { public const byte length = 4; public const byte Id = 0x80; public byte objectId; @@ -144,13 +226,13 @@ namespace Passer.Control { parentId = data[ix]; } - public static bool Send(Client client, byte networkId, byte thingId, byte thingType) { + public static bool Send(Client client, byte thingId, byte thingType, byte parentId) { byte[] data = new byte[4]; data[0] = ThingMsg.Id; - data[1] = networkId; + data[1] = client.networkId; data[2] = thingId; data[3] = thingType; - data[4] = 0x00; // parent not supported yet + data[4] = parentId; return SendMsg(client, data); } public static async Task Receive(Stream dataStream, Client client, byte packetSize) { @@ -170,19 +252,39 @@ namespace Passer.Control { #region Name public class NameMsg : IMessage { + public const byte Id = 0x91; // 145 + public const byte length = 3; public byte networkId = 0; - public byte objectId; + public byte thingId; public byte len; public string name; + public NameMsg(byte thingId, string name) { + this.thingId = thingId; + this.name = name; + } public NameMsg(byte[] data) : base(data) { } - public override void Deserialize(byte[] data) { + + public override byte[] Serialize() { + byte[] data = new byte[length + this.name.Length]; + data[0] = NameMsg.Id; + data[1] = this.thingId; + data[2] = (byte)this.name.Length; + for (int ix = 0; ix < this.name.Length; ix++) + data[3 + ix] = (byte)this.name[ix]; + return data; + } + public override void Deserialize(byte[] data) { uint ix = 0; - this.objectId = data[ix++]; + this.thingId = data[ix++]; int strlen = data[ix++]; this.name = System.Text.Encoding.UTF8.GetString(data, (int)ix, strlen); } + public static bool Send(Client client, byte thingId, string name) { + NameMsg msg = new NameMsg(thingId, name); + return SendMsg(client, msg); + } public static async Task Receive(Stream dataStream, Client client, byte packetSize) { byte[] buffer = await Receive(dataStream, packetSize); NameMsg msg = new(buffer); @@ -198,12 +300,13 @@ namespace Passer.Control { public class ModelUrlMsg : IMessage { public const byte Id = 0x90; // (144) Model URL - public byte objectId; + public byte thingId; public Spherical position; public float scale; public string url; - public ModelUrlMsg(string url, float scale = 1) { + public ModelUrlMsg(byte thingId, string url, float scale = 1) { + this.thingId = thingId; this.url = url; this.scale = scale; this.position = Spherical.zero; @@ -213,7 +316,7 @@ namespace Passer.Control { public override byte[] Serialize() { byte[] data = new byte[this.url.Length + 9]; data[0] = ModelUrlMsg.Id; - data[1] = 0x00; // Thing Id + data[1] = this.thingId; // Thing Id // data[2]..[5] == position 0, 0, 0 data[6] = 0x3C; // Dummy float16 value 1 data[7] = 0x00; @@ -225,15 +328,15 @@ namespace Passer.Control { } public override void Deserialize(byte[] data) { uint ix = 0; - this.objectId = data[ix++]; + this.thingId = data[ix++]; this.position = LowLevelMessages.ReceiveSpherical(data, ref ix); this.scale = LowLevelMessages.ReceiveFloat16(data, ref ix); int strlen = data[ix++]; url = System.Text.Encoding.UTF8.GetString(data, (int)ix, strlen); } - public static bool Send(Client client, string modelUrl) { - ModelUrlMsg msg = new(modelUrl); + public static bool Send(Client client, byte thingId, string modelUrl) { + ModelUrlMsg msg = new(thingId, modelUrl); return SendMsg(client, msg); } public static async Task Receive(Stream dataStream, Client client, byte packetSize) { @@ -249,14 +352,28 @@ namespace Passer.Control { #region Pose public class PoseMsg : IMessage { + public const byte Id = 0x10; public const byte length = 3 + 4 + 4; public byte thingId; + public byte poseType; + public const byte Pose_Position = 0x01; + public const byte Pose_Orientation = 0x02; public Spherical position; public Quat32 orientation; - public PoseMsg(byte[] data) : base(data) { } + public PoseMsg(byte thingId, Spherical position, Quat32 orientation) { + this.thingId = thingId; + this.position = position; + this.orientation = orientation; + this.poseType = 0; + if (this.position != null) + this.poseType |= Pose_Position; + if (this.orientation != null) + this.poseType |= Pose_Orientation; + } + public PoseMsg(byte[] data) : base(data) { } public override void Deserialize(byte[] data) { uint ix = 0; @@ -269,6 +386,10 @@ namespace Passer.Control { orientation = LowLevelMessages.ReceiveQuat32(data, ref ix); } + public static bool Send(Client client, byte thingId, Spherical position, Quat32 orientation) { + PoseMsg msg = new(thingId, position, orientation); + return SendMsg(client, msg); + } public static async Task Receive(Stream dataStream, Client client, byte packetSize) { if (packetSize != length) return false; @@ -278,7 +399,6 @@ namespace Passer.Control { client.messageQueue.Enqueue(msg); return true; - } } @@ -339,6 +459,7 @@ namespace Passer.Control { #region Destroy public class DestroyMsg : IMessage { + public const byte Id = 0x20; public const byte length = 2; public byte objectId; diff --git a/SiteServer.cs b/SiteServer.cs new file mode 100644 index 0000000..cd01666 --- /dev/null +++ b/SiteServer.cs @@ -0,0 +1,85 @@ + +using System.IO; +using System.Threading.Tasks; + +namespace Passer.Control { + + public static class SiteServer { + public static async Task ReceiveData(Stream dataStream, Client client) { + while (true) { + byte packetSize = (byte)dataStream.ReadByte(); + if (packetSize == 0xFF) + //Debug.Log("Receive timeout"); + // Timeout + ; + else + await ReceiveData(dataStream, client, packetSize); + } + } + + public static async Task ReceiveData(Stream dataStream, Client client, byte packetSize) { + byte msgId = (byte)dataStream.ReadByte(); + if (msgId == 0xFF) { + // Timeout + return; + } + + bool result = false; + switch (msgId) { + case PoseMsg.Id: // Object Pose (16) + result = await PoseMsg.Receive(dataStream, client, packetSize); + break; + case DestroyMsg.Id: // Destroy object (32) + result = await DestroyMsg.Receive(dataStream, client, packetSize); + break; + case ThingMsg.Id: + result = await ThingMsg.Receive(dataStream, client, packetSize); + break; + case InvestigateMsg.Id: + result = await InvestigateMsg.Receive(dataStream, client, packetSize); + break; + case ModelUrlMsg.Id: // Model URL (144) + result = await BytesMsg.Receive(dataStream, client, packetSize); + break; + case NameMsg.Id: // Object Name (145) + result = await NameMsg.Receive(dataStream, client, packetSize); + break; + case ClientMsg.Id: + result = await ClientMsg.Receive(dataStream, client, packetSize); + break; + case NetworkIdMsg.Id: + result = await NetworkIdMsg.Receive(dataStream, client, packetSize); + break; + //case TextMsg.Id: // Text (176) + // result = await TextMsg.Receive(dataStream, client, packetSize); + // break; + case BytesMsg.Id: + result = await BytesMsg.Receive(dataStream, client, packetSize); + break; + default: + break; + } + if (result == false) { + packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte + await ReceiveData(dataStream, client, packetSize); + } + } + + public static void ProcessMessage(ISiteServer site, Client client, IMessage msg) { + switch (msg) { + case NetworkIdMsg networkId: + site.ProcessNetworkId(client, networkId); + break; + case ModelUrlMsg modelUrl: + site.ProcessModelUrl(client, modelUrl); + break; + } + } + } + + public interface ISiteServer { + + public void ProcessNetworkId(Client client, NetworkIdMsg networkId); + public void ProcessModelUrl(Client client, ModelUrlMsg modelUrl); + } +} \ No newline at end of file diff --git a/SiteServer.cs.meta b/SiteServer.cs.meta new file mode 100644 index 0000000..02fd171 --- /dev/null +++ b/SiteServer.cs.meta @@ -0,0 +1,2 @@ +fileFormatVersion: 2 +guid: 53345abb9310d344baa67c19a8d8249f \ No newline at end of file