1 /** @file
2   Licensed to the Apache Software Foundation (ASF) under one
3   or more contributor license agreements.  See the NOTICE file
4   distributed with this work for additional information
5   regarding copyright ownership.  The ASF licenses this file
6   to you under the Apache License, Version 2.0 (the
7   "License"); you may not use this file except in compliance
8   with the License.  You may obtain a copy of the License at
9 
10       http://www.apache.org/licenses/LICENSE-2.0
11 
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17  */
18 
19 #pragma once
20 
21 #include "ts/ts.h"
22 
23 #include "slice.h"
24 #include "util.h"
25 
26 #include <cinttypes>
27 
28 struct Channel {
29   TSVIO m_vio{nullptr};
30   TSIOBuffer m_iobuf{nullptr};
31   TSIOBufferReader m_reader{nullptr};
32 
~ChannelChannel33   ~Channel()
34   {
35     if (nullptr != m_reader) {
36       TSIOBufferReaderFree(m_reader);
37     }
38     if (nullptr != m_iobuf) {
39       TSIOBufferDestroy(m_iobuf);
40     }
41   }
42 
43   int64_t
drainReaderChannel44   drainReader()
45   {
46     int64_t consumed = 0;
47 
48     if (nullptr != m_reader && reader_avail_more_than(m_reader, 0)) {
49       int64_t const avail = TSIOBufferReaderAvail(m_reader);
50       TSIOBufferReaderConsume(m_reader, avail);
51       consumed = avail;
52       if (nullptr != m_vio) {
53         TSVIONDoneSet(m_vio, TSVIONDoneGet(m_vio) + consumed);
54       }
55     }
56 
57     return consumed;
58   }
59 
60   bool
setForReadChannel61   setForRead(TSVConn vc, TSCont contp, int64_t const bytesin)
62   {
63     TSAssert(nullptr != vc);
64     if (nullptr == m_iobuf) {
65       m_iobuf  = TSIOBufferCreate();
66       m_reader = TSIOBufferReaderAlloc(m_iobuf);
67     } else {
68       int64_t const drained = drainReader();
69       if (0 < drained) {
70         DEBUG_LOG("Drained from reader: %" PRId64, drained);
71       }
72     }
73     m_vio = TSVConnRead(vc, contp, m_iobuf, bytesin);
74     return nullptr != m_vio;
75   }
76 
77   bool
setForWriteChannel78   setForWrite(TSVConn vc, TSCont contp, int64_t const bytesout)
79   {
80     TSAssert(nullptr != vc);
81     if (nullptr == m_iobuf) {
82       m_iobuf  = TSIOBufferCreate();
83       m_reader = TSIOBufferReaderAlloc(m_iobuf);
84     } else {
85       int64_t const drained = drainReader();
86       if (0 < drained) {
87         DEBUG_LOG("Drained from reader: %" PRId64, drained);
88       }
89     }
90     m_vio = TSVConnWrite(vc, contp, m_reader, bytesout);
91     return nullptr != m_vio;
92   }
93 
94   void
closeChannel95   close()
96   {
97     if (nullptr != m_reader) {
98       drainReader();
99     }
100     m_vio = nullptr;
101   }
102 
103   bool
isOpenChannel104   isOpen() const
105   {
106     return nullptr != m_vio;
107   }
108 
109   bool
isDrainedChannel110   isDrained() const
111   {
112     return nullptr == m_reader || !reader_avail_more_than(m_reader, 0);
113   }
114 };
115 
116 struct Stage // upstream or downstream (server or client)
117 {
118   Stage(Stage const &) = delete;
119   Stage &operator=(Stage const &) = delete;
120 
121   TSVConn m_vc{nullptr};
122   Channel m_read;
123   Channel m_write;
124 
StageStage125   Stage() {}
~StageStage126   ~Stage()
127   {
128     if (nullptr != m_vc) {
129       TSVConnClose(m_vc);
130     }
131   }
132 
133   void
setupConnectionStage134   setupConnection(TSVConn vc)
135   {
136     if (nullptr != m_vc) {
137       TSVConnClose(m_vc);
138     }
139     m_read.close();
140     m_write.close();
141     m_vc = vc;
142   }
143 
144   void
setupVioReadStage145   setupVioRead(TSCont contp, int64_t const bytesin)
146   {
147     m_read.setForRead(m_vc, contp, bytesin);
148   }
149 
150   void
setupVioWriteStage151   setupVioWrite(TSCont contp, int64_t const bytesout)
152   {
153     m_write.setForWrite(m_vc, contp, bytesout);
154   }
155 
156   void
abortStage157   abort()
158   {
159     if (nullptr != m_vc) {
160       TSVConnAbort(m_vc, TS_VC_CLOSE_ABORT);
161       m_vc = nullptr;
162     }
163     m_read.close();
164     m_write.close();
165   }
166 
167   void
closeStage168   close()
169   {
170     if (nullptr != m_vc) {
171       TSVConnClose(m_vc);
172       m_vc = nullptr;
173     }
174     m_read.close();
175     m_write.close();
176   }
177 
178   bool
isOpenStage179   isOpen() const
180   {
181     return nullptr != m_vc && (m_read.isOpen() || m_write.isOpen());
182   }
183 };
184