
aebe4c0 Arduino Ant now works. d3cb4c1 Merge commit 'fbeed8e80922152c3404fbd5d2b243ae95792ec1' into V2 fbeed8e Used Client override for processing messages 394dc22 Merge commit '355dd5c1c519cf07cfb6b9f9200f7f7311e68f20' into V2 355dd5c Fixed ThingMsg format becb194 Merge commit 'f35d60369daf41a4fcd987ef8b31bd384b9536ba' into V2 9b53eee Merge commit 'a48ae12fc2f6d4a99119c128e78bf4b103e607c3' into V2 f35d603 Further improvements a48ae12 ControlCore mostly works (but I don't see a model on the site server yet) d8fc41f First step for ControlCore support git-subtree-dir: Runtime/HumanoidControl/Scripts/Networking/Roboid/ControlCore git-subtree-split: aebe4c0f8e805259a5aea4a4cb6b72343d73257a
173 lines
5.3 KiB
C#
173 lines
5.3 KiB
C#
using System;
|
|
using System.IO;
|
|
using System.Threading.Tasks;
|
|
using System.Threading;
|
|
using System.Collections.Concurrent;
|
|
|
|
/// <summary>
/// An in-memory, forward-only stream that hands back through <c>Read</c> exactly the
/// bytes that were passed to <c>Write</c>, in the same order.
/// </summary>
/// <remarks>
/// Written buffers are queued in a bounded <see cref="BlockingCollection{T}"/>; a writer
/// blocks (up to <see cref="WriteTimeout"/>) while the queue is full and a reader blocks
/// (up to <see cref="ReadTimeout"/>) while it is empty. Seeking and setting the length or
/// position are not implemented.
/// </remarks>
public class EchoStream : Stream {
    public override bool CanTimeout { get; } = true;
    public override int ReadTimeout { get; set; } = Timeout.Infinite;
    public override int WriteTimeout { get; set; } = Timeout.Infinite;
    public override bool CanRead { get; } = true;
    public override bool CanSeek { get; } = false;
    public override bool CanWrite { get; } = true;

    /// <summary>
    /// When false (the default), a write that covers the caller's whole array queues that
    /// array itself instead of a copy, so the caller must not modify it afterwards.
    /// When true, every write is copied into a fresh array.
    /// </summary>
    public bool CopyBufferOnWrite { get; set; } = false;

    // Serializes readers and guards m_buffer / m_offset / m_FinalZero.
    private readonly object _lock = new object();

    // Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
    private readonly BlockingCollection<byte[]> _Buffers;
    private int _maxQueueDepth = 10;

    // The buffer currently being drained by Read, the next index to read from it and the
    // number of bytes still unread in it.
    private byte[] m_buffer = null;
    private int m_offset = 0;
    // volatile: written under _lock by Read, but polled without the lock by DataAvailable.
    private volatile int m_count = 0;

    // volatile: set by Close and checked by Write/Read on other threads.
    private volatile bool m_Closed = false;
    private bool m_FinalZero = false; //after the stream is closed, set to true after returning a 0 for read()

    /// <summary>
    /// Marks the stream as closed. Later writes are ignored, writers blocked on a full
    /// queue are released, and readers can still drain whatever was already queued.
    /// </summary>
    public override void Close() {
        m_Closed = true;

        // release any waiting writes
        _Buffers.CompleteAdding();
    }

    /// <summary>
    /// True when a call to <c>Read</c> would return data without waiting: either a queued
    /// buffer exists or the buffer currently being drained still has unread bytes.
    /// </summary>
    public bool DataAvailable {
        get {
            // The lock is deliberately not taken: a reader may hold it while blocked
            // waiting for data, and this property must never block.
            return m_count > 0 || _Buffers.Count > 0;
        }
    }

    private long _Length = 0L;

    /// <summary>Total number of bytes accepted by <c>Write</c> so far.</summary>
    public override long Length {
        get {
            return Interlocked.Read(ref _Length);
        }
    }

    private long _Position = 0L;

    /// <summary>
    /// Total number of bytes returned by <c>Read</c> so far. Setting it is not implemented.
    /// </summary>
    public override long Position {
        get {
            return Interlocked.Read(ref _Position);
        }
        set {
            throw new NotImplementedException();
        }
    }

    /// <summary>Creates a stream that queues at most 10 written buffers.</summary>
    public EchoStream() : this(10) {
    }

    /// <summary>Creates a stream that queues at most <paramref name="maxQueueDepth"/> written buffers.</summary>
    /// <param name="maxQueueDepth">Bounded capacity of the internal buffer queue.</param>
    public EchoStream(int maxQueueDepth) {
        _maxQueueDepth = maxQueueDepth;
        _Buffers = new BlockingCollection<byte[]>(_maxQueueDepth);
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    /// <summary>Runs <c>Write</c> on a thread-pool thread.</summary>
    public new Task WriteAsync(byte[] buffer, int offset, int count) {
        return Task.Run(() => Write(buffer, offset, count));
    }

    /// <summary>
    /// Runs <c>Write</c> on a thread-pool thread. Overriding the virtual overload makes
    /// calls made through a <see cref="Stream"/> reference bypass the base implementation too.
    /// </summary>
    /// <remarks>The token only prevents the work from starting; a write that is already blocked is not interrupted.</remarks>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        return Task.Run(() => Write(buffer, offset, count), cancellationToken);
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    /// <summary>Runs <c>Read</c> on a thread-pool thread.</summary>
    public new Task<int> ReadAsync(byte[] buffer, int offset, int count) {
        return Task.Run(() => {
            return Read(buffer, offset, count);
        });
    }

    /// <summary>
    /// Runs <c>Read</c> on a thread-pool thread. Overriding the virtual overload makes
    /// calls made through a <see cref="Stream"/> reference bypass the base implementation too.
    /// </summary>
    /// <remarks>The token only prevents the work from starting; a read that is already blocked is not interrupted.</remarks>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        return Task.Run(() => Read(buffer, offset, count), cancellationToken);
    }

    /// <summary>
    /// Queues <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, for a later <c>Read</c>.
    /// </summary>
    /// <remarks>
    /// The call is silently ignored when the stream is closed, when <paramref name="count"/>
    /// is not positive, or when the requested range extends past the end of the array.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null (and the stream is open).</exception>
    /// <exception cref="TimeoutException">The queue stayed full for longer than <see cref="WriteTimeout"/>.</exception>
    public override void Write(byte[] buffer, int offset, int count) {
        if (m_Closed) {
            return;
        }
        if (buffer == null) {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (buffer.Length - offset < count || count <= 0) {
            return;
        }

        byte[] newBuffer;
        if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length) {
            // Whole-array write: queue the caller's array directly to avoid a copy.
            newBuffer = buffer;
        }
        else {
            newBuffer = new byte[count];
            System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
        }

        bool added;
        try {
            added = _Buffers.TryAdd(newBuffer, WriteTimeout);
        }
        catch (InvalidOperationException) when (m_Closed) {
            // Close() ran while this write was pending: CompleteAdding makes TryAdd throw.
            // Treat it like any other write on a closed stream and drop the data quietly.
            return;
        }

        if (!added) {
            throw new TimeoutException("EchoStream Write() Timeout");
        }

        // Writers are not serialized, so the running total must be updated atomically.
        Interlocked.Add(ref _Length, count);
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> queued bytes into <paramref name="buffer"/>
    /// starting at <paramref name="offset"/>.
    /// </summary>
    /// <remarks>
    /// When nothing is pending the call waits up to <see cref="ReadTimeout"/> for a write.
    /// Once at least one byte is available it never waits again: it drains as many already
    /// queued buffers as fit and returns.
    /// NOTE: after the stream has been closed and drained, the first call returns 0 and
    /// every later call returns -1. The -1 is not part of the usual <see cref="Stream"/>
    /// contract; it is kept because existing callers may rely on it.
    /// </remarks>
    /// <returns>
    /// The number of bytes copied; 0 when <paramref name="count"/> is 0, when the wait timed
    /// out, or on the first read past the end of a closed stream; -1 on later reads past the end.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null and <paramref name="count"/> is not 0.</exception>
    public override int Read(byte[] buffer, int offset, int count) {
        if (count == 0) {
            return 0;
        }
        if (buffer == null) {
            // Fail before waiting instead of blocking first and faulting in BlockCopy later.
            throw new ArgumentNullException(nameof(buffer));
        }

        lock (_lock) {
            if (m_count == 0 && _Buffers.Count == 0) {
                if (m_Closed) {
                    return EndOfStreamResult();
                }

                // Nothing pending: this is the only place a read is allowed to wait.
                if (_Buffers.TryTake(out m_buffer, ReadTimeout)) {
                    m_offset = 0;
                    m_count = m_buffer.Length;
                }
                else {
                    // Either the stream was closed while waiting or the wait timed out.
                    return m_Closed ? EndOfStreamResult() : 0;
                }
            }

            int returnBytes = 0;
            while (count > 0) {
                if (m_count == 0) {
                    // Current buffer exhausted: pick up the next one only if it is already queued.
                    if (_Buffers.TryTake(out m_buffer, 0)) {
                        m_offset = 0;
                        m_count = m_buffer.Length;
                    }
                    else {
                        break;
                    }
                }

                var bytesToCopy = (count < m_count) ? count : m_count;
                System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
                m_offset += bytesToCopy;
                m_count -= bytesToCopy;
                offset += bytesToCopy;
                count -= bytesToCopy;

                returnBytes += bytesToCopy;
            }

            Interlocked.Add(ref _Position, returnBytes);

            return returnBytes;
        }
    }

    // Result of a read on a closed, fully drained stream: 0 the first time, -1 afterwards.
    // Must be called while holding _lock.
    private int EndOfStreamResult() {
        if (!m_FinalZero) {
            m_FinalZero = true;
            return 0;
        }
        return -1;
    }

    /// <summary>Reads one byte, or returns -1 when <c>Read</c> produced no data.</summary>
    public override int ReadByte() {
        byte[] returnValue = new byte[1];
        return (Read(returnValue, 0, 1) <= 0 ? -1 : (int)returnValue[0]);
    }

    /// <summary>Does nothing; written data is queued immediately.</summary>
    public override void Flush() {
    }

    /// <summary>Not implemented; the stream is forward-only.</summary>
    public override long Seek(long offset, SeekOrigin origin) {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    public override void SetLength(long value) {
        throw new NotImplementedException();
    }
}