HumanoidControl_Free/EchoStream.cs
Pascal Serrarens 7781fcf2de Squashed 'Runtime/HumanoidControl/Scripts/Networking/Roboid/ControlCore/' changes from 9919aa6..aebe4c0
aebe4c0 Arduino Ant now works.
d3cb4c1 Merge commit 'fbeed8e80922152c3404fbd5d2b243ae95792ec1' into V2
fbeed8e Used Client override for processing messages
394dc22 Merge commit '355dd5c1c519cf07cfb6b9f9200f7f7311e68f20' into V2
355dd5c Fixed ThingMsg format
becb194 Merge commit 'f35d60369daf41a4fcd987ef8b31bd384b9536ba' into V2
9b53eee Merge commit 'a48ae12fc2f6d4a99119c128e78bf4b103e607c3' into V2
f35d603 Further improvements
a48ae12 ControlCore mostly works (but I don't see a model on the site server yet)
d8fc41f First step for ControlCore support

git-subtree-dir: Runtime/HumanoidControl/Scripts/Networking/Roboid/ControlCore
git-subtree-split: aebe4c0f8e805259a5aea4a4cb6b72343d73257a
2024-12-09 12:12:08 +01:00

173 lines
5.3 KiB
C#

using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
/// <summary>
/// An in-memory, forward-only stream in which everything passed to
/// <see cref="Write(byte[], int, int)"/> becomes available to
/// <see cref="Read(byte[], int, int)"/> in the same order.
/// </summary>
/// <remarks>
/// Written chunks are held in a bounded <see cref="BlockingCollection{T}"/>.
/// A write blocks (up to <see cref="WriteTimeout"/>) while the queue is full and
/// a read blocks (up to <see cref="ReadTimeout"/>) while no data is available.
/// Reads are serialized with a private lock; seeking is not supported.
/// </remarks>
public class EchoStream : Stream {
    public override bool CanTimeout { get; } = true;
    public override int ReadTimeout { get; set; } = Timeout.Infinite;
    public override int WriteTimeout { get; set; } = Timeout.Infinite;
    public override bool CanRead { get; } = true;
    public override bool CanSeek { get; } = false;
    public override bool CanWrite { get; } = true;

    /// <summary>
    /// When false (the default), a write that covers the whole caller array
    /// (offset 0, count == buffer.Length) enqueues that array by reference, so the
    /// caller must not modify it afterwards. When true, every write is copied.
    /// </summary>
    public bool CopyBufferOnWrite { get; set; } = false;

    // Serializes readers; protects m_buffer, m_offset, m_count and m_FinalZero.
    private readonly object _lock = new object();
    // Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
    private readonly BlockingCollection<byte[]> _Buffers;
    private readonly int _maxQueueDepth = 10;
    // The chunk currently being consumed by Read, and the unread window inside it.
    private byte[] m_buffer = null;
    private int m_offset = 0;
    private int m_count = 0;
    // Set by Close() on one thread and polled by readers/writers on others.
    private volatile bool m_Closed = false;
    private bool m_FinalZero = false; //after the stream is closed, set to true after returning a 0 for read()

    /// <summary>
    /// Marks the stream as closed and completes the queue for adding.
    /// Already queued data can still be read.
    /// </summary>
    public override void Close() {
        m_Closed = true;
        // release any waiting writes
        _Buffers.CompleteAdding();
    }

    /// <summary>
    /// True when at least one byte can be read without waiting: either unread
    /// bytes remain in the chunk currently being consumed, or a chunk is queued.
    /// </summary>
    public bool DataAvailable {
        get {
            // m_count is read without taking _lock on purpose: Read() holds that lock
            // while it waits for data, so locking here would block the caller.
            return Volatile.Read(ref m_count) > 0 || _Buffers.Count > 0;
        }
    }

    private long _Length = 0L;
    /// <summary>Total number of bytes accepted by Write so far.</summary>
    public override long Length {
        get {
            return Interlocked.Read(ref _Length);
        }
    }

    private long _Position = 0L;
    /// <summary>Total number of bytes returned by Read so far. Setting it is not implemented.</summary>
    public override long Position {
        get {
            return Interlocked.Read(ref _Position);
        }
        set {
            throw new NotImplementedException();
        }
    }

    /// <summary>Creates a stream whose queue holds at most 10 pending chunks.</summary>
    public EchoStream() : this(10) {
    }

    /// <summary>Creates a stream whose queue holds at most <paramref name="maxQueueDepth"/> pending chunks.</summary>
    /// <param name="maxQueueDepth">Bounded capacity passed to the underlying BlockingCollection.</param>
    public EchoStream(int maxQueueDepth) {
        _maxQueueDepth = maxQueueDepth;
        _Buffers = new BlockingCollection<byte[]>(_maxQueueDepth);
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    /// <summary>Runs <see cref="Write(byte[], int, int)"/> on the thread pool.</summary>
    public new Task WriteAsync(byte[] buffer, int offset, int count) {
        return Task.Run(() => Write(buffer, offset, count));
    }

    /// <summary>
    /// Override of the virtual overload, so that calls made through a
    /// <see cref="Stream"/> reference (or with a cancellation token) also bypass the
    /// base implementation.
    /// </summary>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        return Task.Run(() => Write(buffer, offset, count), cancellationToken);
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    /// <summary>Runs <see cref="Read(byte[], int, int)"/> on the thread pool.</summary>
    public new Task<int> ReadAsync(byte[] buffer, int offset, int count) {
        return Task.Run(() => {
            return Read(buffer, offset, count);
        });
    }

    /// <summary>
    /// Override of the virtual overload, so that calls made through a
    /// <see cref="Stream"/> reference (or with a cancellation token) also bypass the
    /// base implementation.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        return Task.Run(() => Read(buffer, offset, count), cancellationToken);
    }

    /// <summary>
    /// Enqueues <paramref name="count"/> bytes of <paramref name="buffer"/> starting at
    /// <paramref name="offset"/> as one chunk.
    /// </summary>
    /// <remarks>
    /// The call is silently ignored when the stream is closed, when
    /// <paramref name="count"/> is not positive, or when the requested range does not
    /// fit in <paramref name="buffer"/>.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null (and the stream is open).</exception>
    /// <exception cref="TimeoutException">The queue stayed full for longer than <see cref="WriteTimeout"/>.</exception>
    public override void Write(byte[] buffer, int offset, int count) {
        if (m_Closed)
            return;
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (buffer.Length - offset < count || count <= 0)
            return;

        byte[] newBuffer;
        if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length)
            newBuffer = buffer;
        else {
            newBuffer = new byte[count];
            System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
        }

        try {
            if (!_Buffers.TryAdd(newBuffer, WriteTimeout))
                throw new TimeoutException("EchoStream Write() Timeout");
        }
        catch (InvalidOperationException) when (m_Closed) {
            // Close() completed the queue while this write was pending; treat it
            // like any other write on a closed stream and drop the data.
            return;
        }

        // Writers are not serialized, so the running total must be updated atomically.
        Interlocked.Add(ref _Length, count);
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> queued bytes into <paramref name="buffer"/>
    /// starting at <paramref name="offset"/>, waiting up to <see cref="ReadTimeout"/>
    /// for the first chunk when nothing is available.
    /// </summary>
    /// <returns>
    /// The number of bytes copied. Returns 0 when <paramref name="count"/> is 0 or when
    /// the wait timed out on an open stream. Once the stream is closed and drained, the
    /// first call returns 0 and every later call returns -1.
    /// </returns>
    public override int Read(byte[] buffer, int offset, int count) {
        if (count == 0)
            return 0;

        lock (_lock) {
            if (m_count == 0 && _Buffers.Count == 0) {
                if (m_Closed)
                    return EndOfStreamResult();

                // Nothing buffered: wait for a writer (or for Close / the timeout).
                if (_Buffers.TryTake(out m_buffer, ReadTimeout)) {
                    m_offset = 0;
                    m_count = m_buffer.Length;
                }
                else {
                    return m_Closed ? EndOfStreamResult() : 0;
                }
            }

            int returnBytes = 0;
            while (count > 0) {
                if (m_count == 0) {
                    // Current chunk exhausted: continue with the next one only if it
                    // is already queued; never wait once some data is in hand.
                    if (_Buffers.TryTake(out m_buffer, 0)) {
                        m_offset = 0;
                        m_count = m_buffer.Length;
                    }
                    else
                        break;
                }

                var bytesToCopy = (count < m_count) ? count : m_count;
                System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
                m_offset += bytesToCopy;
                m_count -= bytesToCopy;
                offset += bytesToCopy;
                count -= bytesToCopy;
                returnBytes += bytesToCopy;
            }

            Interlocked.Add(ref _Position, returnBytes);
            return returnBytes;
        }
    }

    // Result of a Read on a closed, drained stream: 0 the first time, -1 afterwards.
    // NOTE: the -1 is kept for backward compatibility with existing callers.
    // Must be called while holding _lock.
    private int EndOfStreamResult() {
        if (!m_FinalZero) {
            m_FinalZero = true;
            return 0;
        }
        return -1;
    }

    /// <summary>Reads one byte, or returns -1 when Read yields no data.</summary>
    public override int ReadByte() {
        byte[] returnValue = new byte[1];
        return (Read(returnValue, 0, 1) <= 0 ? -1 : (int)returnValue[0]);
    }

    /// <summary>Does nothing; written data is available to readers immediately.</summary>
    public override void Flush() {
    }

    /// <summary>Not implemented; the stream is forward-only.</summary>
    public override long Seek(long offset, SeekOrigin origin) {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    public override void SetLength(long value) {
        throw new NotImplementedException();
    }
}