diff --git a/EchoStream.cs b/EchoStream.cs new file mode 100644 index 0000000..c9ae949 --- /dev/null +++ b/EchoStream.cs @@ -0,0 +1,173 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using System.Threading; +using System.Collections.Concurrent; + +public class EchoStream : Stream { + public override bool CanTimeout { get; } = true; + public override int ReadTimeout { get; set; } = Timeout.Infinite; + public override int WriteTimeout { get; set; } = Timeout.Infinite; + public override bool CanRead { get; } = true; + public override bool CanSeek { get; } = false; + public override bool CanWrite { get; } = true; + + public bool CopyBufferOnWrite { get; set; } = false; + + private readonly object _lock = new object(); + + // Default underlying mechanism for BlockingCollection is ConcurrentQueue, which is what we want + private readonly BlockingCollection _Buffers; + private int _maxQueueDepth = 10; + + private byte[] m_buffer = null; + private int m_offset = 0; + private int m_count = 0; + + private bool m_Closed = false; + private bool m_FinalZero = false; //after the stream is closed, set to true after returning a 0 for read() + public override void Close() { + m_Closed = true; + + // release any waiting writes + _Buffers.CompleteAdding(); + } + + public bool DataAvailable { + get { + return _Buffers.Count > 0; + } + } + + private long _Length = 0L; + public override long Length { + get { + return _Length; + } + } + + private long _Position = 0L; + public override long Position { + get { + return _Position; + } + set { + throw new NotImplementedException(); + } + } + + public EchoStream() : this(10) { + } + + public EchoStream(int maxQueueDepth) { + _maxQueueDepth = maxQueueDepth; + _Buffers = new BlockingCollection(_maxQueueDepth); + } + + // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once + public new Task WriteAsync(byte[] buffer, int offset, int count) { + return Task.Run(() => Write(buffer, offset, count)); + } + + // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once + public new Task ReadAsync(byte[] buffer, int offset, int count) { + return Task.Run(() => { + return Read(buffer, offset, count); + }); + } + + public override void Write(byte[] buffer, int offset, int count) { + if (m_Closed || buffer.Length - offset < count || count <= 0) + return; + + byte[] newBuffer; + if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length) + newBuffer = buffer; + else { + newBuffer = new byte[count]; + System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count); + } + if (!_Buffers.TryAdd(newBuffer, WriteTimeout)) + throw new TimeoutException("EchoStream Write() Timeout"); + + _Length += count; + } + + public override int Read(byte[] buffer, int offset, int count) { + if (count == 0) + return 0; + lock (_lock) { + if (m_count == 0 && _Buffers.Count == 0) { + if (m_Closed) { + if (!m_FinalZero) { + m_FinalZero = true; + return 0; + } + else { + return -1; + } + } + + if (_Buffers.TryTake(out m_buffer, ReadTimeout)) { + m_offset = 0; + m_count = m_buffer.Length; + } + else { + if (m_Closed) { + if (!m_FinalZero) { + m_FinalZero = true; + return 0; + } + else { + return -1; + } + } + else { + return 0; + } + } + } + + int returnBytes = 0; + while (count > 0) { + if (m_count == 0) { + if (_Buffers.TryTake(out m_buffer, 0)) { + m_offset = 0; + m_count = m_buffer.Length; + } + else + break; + } + + var bytesToCopy = (count < m_count) ? count : m_count; + System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy); + m_offset += bytesToCopy; + m_count -= bytesToCopy; + offset += bytesToCopy; + count -= bytesToCopy; + + returnBytes += bytesToCopy; + } + + _Position += returnBytes; + + return returnBytes; + } + } + + public override int ReadByte() { + byte[] returnValue = new byte[1]; + return (Read(returnValue, 0, 1) <= 0 ? -1 : (int)returnValue[0]); + } + + public override void Flush() { + } + + public override long Seek(long offset, SeekOrigin origin) { + throw new NotImplementedException(); + } + + public override void SetLength(long value) { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/EchoStream.cs.meta b/EchoStream.cs.meta new file mode 100644 index 0000000..063a4f2 --- /dev/null +++ b/EchoStream.cs.meta @@ -0,0 +1,2 @@ +fileFormatVersion: 2 +guid: 422516a56cbf14d46aaa0b1bc09115bf \ No newline at end of file diff --git a/LowLevelMessages.cs b/LowLevelMessages.cs index a976ddf..f279dbc 100644 --- a/LowLevelMessages.cs +++ b/LowLevelMessages.cs @@ -1,6 +1,13 @@ using Passer; public class LowLevelMessages { + + public static void SendSpherical(byte[] buffer, ref uint ix, Spherical v) { + SendFloat16(buffer, ref ix, new float16(v.distance)); + SendAngle8(buffer, ref ix, v.horizontal); + SendAngle8(buffer, ref ix, v.vertical); + } + public static Spherical ReceiveSpherical(byte[] data, ref uint ix) { float horizontal = ReceiveAngle8(data, ref ix); float vertical = ReceiveAngle8(data, ref ix); @@ -9,6 +16,23 @@ public class LowLevelMessages { return v; } + public static void SendQuat32(byte[] buffer, ref uint ix, Quat32 q) { + int qx = (int)(q.x * 127 + 128); + int qy = (int)(q.y * 127 + 128); + int qz = (int)(q.z * 127 + 128); + int qw = (int)(q.w * 255); + if (q.w < 0) { + qx = -qx; + qy = -qy; + qz = -qz; + qw = -qw; + } + + buffer[ix++] = (byte)qx; + buffer[ix++] = (byte)qy; + buffer[ix++] = (byte)qz; + buffer[ix++] = (byte)qw; + } public static Quat32 ReceiveQuat32(byte[] data, ref uint ix) { Quat32 q = new( (data[ix++] - 128.0F) / 127.0F, @@ -18,11 +42,26 @@ public class LowLevelMessages { return q; } + public static void SendAngle8(byte[] buffer, ref uint ix, float angle) { + // Normalize angle + while (angle >= 180) + angle -= 360; + while (angle < -180) + angle += 360; + buffer[ix++] = (byte)((angle / 360.0f) * 256.0f); + } + public static float ReceiveAngle8(byte[] data, ref uint ix) { float value = (data[ix++] * 180) / 128.0F; return value; } + public static void SendFloat16(byte[] data, ref uint ix, float16 f) { + ushort binary = f.GetBinary(); + data[ix++] = (byte)(binary >> 8); + data[ix++] = (byte)(binary & 255); + } + public static float ReceiveFloat16(byte[] data, ref uint ix) { ushort value = (ushort)(data[ix++] << 8 | data[ix++]); float16 f16 = new(); @@ -30,6 +69,4 @@ public class LowLevelMessages { float f = f16.toFloat(); return f; } - - } diff --git a/SiteServer.cs b/SiteServer.cs new file mode 100644 index 0000000..cd01666 --- /dev/null +++ b/SiteServer.cs @@ -0,0 +1,85 @@ + +using System.IO; +using System.Threading.Tasks; + +namespace Passer.Control { + + public static class SiteServer { + public static async Task ReceiveData(Stream dataStream, Client client) { + while (true) { + byte packetSize = (byte)dataStream.ReadByte(); + if (packetSize == 0xFF) + //Debug.Log("Receive timeout"); + // Timeout + ; + else + await ReceiveData(dataStream, client, packetSize); + } + } + + public static async Task ReceiveData(Stream dataStream, Client client, byte packetSize) { + byte msgId = (byte)dataStream.ReadByte(); + if (msgId == 0xFF) { + // Timeout + return; + } + + bool result = false; + switch (msgId) { + case PoseMsg.Id: // Object Pose (16) + result = await PoseMsg.Receive(dataStream, client, packetSize); + break; + case DestroyMsg.Id: // Destroy object (32) + result = await DestroyMsg.Receive(dataStream, client, packetSize); + break; + case ThingMsg.Id: + result = await ThingMsg.Receive(dataStream, client, packetSize); + break; + case InvestigateMsg.Id: + result = await InvestigateMsg.Receive(dataStream, client, packetSize); + break; + case ModelUrlMsg.Id: // Model URL (144) + result = await BytesMsg.Receive(dataStream, client, packetSize); + break; + case NameMsg.Id: // Object Name (145) + result = await NameMsg.Receive(dataStream, client, packetSize); + break; + case ClientMsg.Id: + result = await ClientMsg.Receive(dataStream, client, packetSize); + break; + case NetworkIdMsg.Id: + result = await NetworkIdMsg.Receive(dataStream, client, packetSize); + break; + //case TextMsg.Id: // Text (176) + // result = await TextMsg.Receive(dataStream, client, packetSize); + // break; + case BytesMsg.Id: + result = await BytesMsg.Receive(dataStream, client, packetSize); + break; + default: + break; + } + if (result == false) { + packetSize = msgId; // skip 1 byte, msgId is possibly a packet size byte + await ReceiveData(dataStream, client, packetSize); + } + } + + public static void ProcessMessage(ISiteServer site, Client client, IMessage msg) { + switch (msg) { + case NetworkIdMsg networkId: + site.ProcessNetworkId(client, networkId); + break; + case ModelUrlMsg modelUrl: + site.ProcessModelUrl(client, modelUrl); + break; + } + } + } + + public interface ISiteServer { + + public void ProcessNetworkId(Client client, NetworkIdMsg networkId); + public void ProcessModelUrl(Client client, ModelUrlMsg modelUrl); + } +} \ No newline at end of file diff --git a/SiteServer.cs.meta b/SiteServer.cs.meta new file mode 100644 index 0000000..02fd171 --- /dev/null +++ b/SiteServer.cs.meta @@ -0,0 +1,2 @@ +fileFormatVersion: 2 +guid: 53345abb9310d344baa67c19a8d8249f \ No newline at end of file