1 // Licensed to the Apache Software Foundation(ASF) under one 2 // or more contributor license agreements.See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership.The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 using System; 19 using System.Diagnostics; 20 using System.IO; 21 using System.Threading; 22 using System.Threading.Tasks; 23 24 namespace Thrift.Transport.Client 25 { 26 // ReSharper disable once InconsistentNaming 27 public class TMemoryBufferTransport : TEndpointTransport 28 { 29 private bool IsDisposed; 30 private byte[] Bytes; 31 private int _bytesUsed; 32 TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048)33 public TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048) 34 : base(config) 35 { 36 Bytes = new byte[initialCapacity]; 37 } 38 TMemoryBufferTransport(byte[] buf, TConfiguration config)39 public TMemoryBufferTransport(byte[] buf, TConfiguration config) 40 :base(config) 41 { 42 Bytes = (byte[])buf.Clone(); 43 _bytesUsed = Bytes.Length; 44 UpdateKnownMessageSize(_bytesUsed); 45 } 46 47 public int Position { get; set; } 48 49 public int Capacity 50 { 51 get 52 { 53 Debug.Assert(_bytesUsed <= Bytes.Length); 54 return Bytes.Length; 55 } 56 set 57 { 58 Array.Resize(ref Bytes, value); 59 _bytesUsed = value; 60 } 61 } 62 63 public int Length 64 { 65 get { 66 Debug.Assert(_bytesUsed <= Bytes.Length); 67 return _bytesUsed; 68 } 69 set { 70 if ((Bytes.Length < value) || (Bytes.Length > (10 * value))) 71 Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25))); 72 _bytesUsed = value; 73 } 74 } 75 SetLength(int value)76 public void SetLength(int value) 77 { 78 Length = value; 79 Position = Math.Min(Position, value); 80 } 81 82 public override bool IsOpen => true; 83 OpenAsync(CancellationToken cancellationToken)84 public override Task OpenAsync(CancellationToken cancellationToken) 85 { 86 cancellationToken.ThrowIfCancellationRequested(); 87 return Task.CompletedTask; 88 } 89 Close()90 public override void Close() 91 { 92 /** do nothing **/ 93 } 94 Seek(int delta, SeekOrigin origin)95 public void Seek(int delta, SeekOrigin origin) 96 { 97 int newPos; 98 switch (origin) 99 { 100 case SeekOrigin.Begin: 101 newPos = delta; 102 break; 103 case SeekOrigin.Current: 104 newPos = Position + delta; 105 break; 106 case SeekOrigin.End: 107 newPos = _bytesUsed + delta; 108 break; 109 default: 110 throw new ArgumentException(nameof(origin)); 111 } 112 113 if ((0 > newPos) || (newPos > _bytesUsed)) 114 throw new ArgumentException(nameof(origin)); 115 Position = newPos; 116 117 ResetConsumedMessageSize(); 118 CountConsumedMessageBytes(Position); 119 } 120 ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)121 public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) 122 { 123 var count = Math.Min(Length - Position, length); 124 Buffer.BlockCopy(Bytes, Position, buffer, offset, count); 125 Position += count; 126 CountConsumedMessageBytes(count); 127 return new ValueTask<int>(count); 128 } 129 WriteAsync(byte[] buffer, CancellationToken cancellationToken)130 public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) 131 { 132 return WriteAsync(buffer, 0, buffer.Length, cancellationToken); 133 } 134 WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)135 public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) 136 { 137 var free = Length - Position; 138 Length = Length + count - free; 139 Buffer.BlockCopy(buffer, offset, Bytes, Position, count); 140 Position += count; 141 return Task.CompletedTask; 142 } 143 FlushAsync(CancellationToken cancellationToken)144 public override Task FlushAsync(CancellationToken cancellationToken) 145 { 146 cancellationToken.ThrowIfCancellationRequested(); 147 ResetConsumedMessageSize(); 148 return Task.CompletedTask; 149 } 150 GetBuffer()151 public byte[] GetBuffer() 152 { 153 var retval = new byte[Length]; 154 Buffer.BlockCopy(Bytes, 0, retval, 0, Length); 155 return retval; 156 } 157 TryGetBuffer(out ArraySegment<byte> bufSegment)158 internal bool TryGetBuffer(out ArraySegment<byte> bufSegment) 159 { 160 bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed); 161 return true; 162 } 163 164 // IDisposable Dispose(bool disposing)165 protected override void Dispose(bool disposing) 166 { 167 if (!IsDisposed) 168 { 169 if (disposing) 170 { 171 // nothing to do 172 } 173 } 174 IsDisposed = true; 175 } 176 } 177 } 178