// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Diagnostics;
20 using System.IO;
21 using System.Threading;
22 using System.Threading.Tasks;
23 
24 namespace Thrift.Transport.Client
25 {
26     // ReSharper disable once InconsistentNaming
27     public class TMemoryBufferTransport : TEndpointTransport
28     {
29         private bool IsDisposed;
30         private byte[] Bytes;
31         private int _bytesUsed;
32 
TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048)33         public TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048)
34             : base(config)
35         {
36             Bytes = new byte[initialCapacity];
37         }
38 
TMemoryBufferTransport(byte[] buf, TConfiguration config)39         public TMemoryBufferTransport(byte[] buf, TConfiguration config)
40             :base(config)
41         {
42             Bytes = (byte[])buf.Clone();
43             _bytesUsed = Bytes.Length;
44             UpdateKnownMessageSize(_bytesUsed);
45         }
46 
47         public int Position { get; set; }
48 
49         public int Capacity
50         {
51             get
52             {
53                 Debug.Assert(_bytesUsed <= Bytes.Length);
54                 return Bytes.Length;
55             }
56             set
57             {
58                 Array.Resize(ref Bytes, value);
59                 _bytesUsed = value;
60             }
61         }
62 
63         public int Length
64         {
65             get {
66                 Debug.Assert(_bytesUsed <= Bytes.Length);
67                 return _bytesUsed;
68             }
69             set {
70                 if ((Bytes.Length < value) || (Bytes.Length > (10 * value)))
71                     Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25)));
72                 _bytesUsed = value;
73             }
74         }
75 
SetLength(int value)76         public void SetLength(int value)
77         {
78             Length = value;
79             Position = Math.Min(Position, value);
80         }
81 
82         public override bool IsOpen => true;
83 
OpenAsync(CancellationToken cancellationToken)84         public override Task OpenAsync(CancellationToken cancellationToken)
85         {
86             cancellationToken.ThrowIfCancellationRequested();
87             return Task.CompletedTask;
88         }
89 
Close()90         public override void Close()
91         {
92             /** do nothing **/
93         }
94 
Seek(int delta, SeekOrigin origin)95         public void Seek(int delta, SeekOrigin origin)
96         {
97             int newPos;
98             switch (origin)
99             {
100                 case SeekOrigin.Begin:
101                     newPos = delta;
102                     break;
103                 case SeekOrigin.Current:
104                     newPos = Position + delta;
105                     break;
106                 case SeekOrigin.End:
107                     newPos = _bytesUsed + delta;
108                     break;
109                 default:
110                     throw new ArgumentException(nameof(origin));
111             }
112 
113             if ((0 > newPos) || (newPos > _bytesUsed))
114                 throw new ArgumentException(nameof(origin));
115             Position = newPos;
116 
117             ResetConsumedMessageSize();
118             CountConsumedMessageBytes(Position);
119         }
120 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)121         public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
122         {
123             var count = Math.Min(Length - Position, length);
124             Buffer.BlockCopy(Bytes, Position, buffer, offset, count);
125             Position += count;
126             CountConsumedMessageBytes(count);
127             return new ValueTask<int>(count);
128         }
129 
WriteAsync(byte[] buffer, CancellationToken cancellationToken)130         public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
131         {
132             return WriteAsync(buffer, 0, buffer.Length, cancellationToken);
133         }
134 
WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)135         public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
136         {
137             var free = Length - Position;
138             Length = Length + count - free;
139             Buffer.BlockCopy(buffer, offset, Bytes, Position, count);
140             Position += count;
141             return Task.CompletedTask;
142         }
143 
FlushAsync(CancellationToken cancellationToken)144         public override Task FlushAsync(CancellationToken cancellationToken)
145         {
146             cancellationToken.ThrowIfCancellationRequested();
147             ResetConsumedMessageSize();
148             return Task.CompletedTask;
149         }
150 
GetBuffer()151         public byte[] GetBuffer()
152         {
153             var retval = new byte[Length];
154             Buffer.BlockCopy(Bytes, 0, retval, 0, Length);
155             return retval;
156         }
157 
TryGetBuffer(out ArraySegment<byte> bufSegment)158         internal bool TryGetBuffer(out ArraySegment<byte> bufSegment)
159         {
160             bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed);
161             return true;
162         }
163 
164         // IDisposable
Dispose(bool disposing)165         protected override void Dispose(bool disposing)
166         {
167             if (!IsDisposed)
168             {
169                 if (disposing)
170                 {
171                     // nothing to do
172                 }
173             }
174             IsDisposed = true;
175         }
176     }
177 }
178