1 /** @file 2 Licensed to the Apache Software Foundation (ASF) under one 3 or more contributor license agreements. See the NOTICE file 4 distributed with this work for additional information 5 regarding copyright ownership. The ASF licenses this file 6 to you under the Apache License, Version 2.0 (the 7 "License"); you may not use this file except in compliance 8 with the License. You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17 */ 18 19 #pragma once 20 21 #include "ts/ts.h" 22 23 #include "slice.h" 24 #include "util.h" 25 26 #include <cinttypes> 27 28 struct Channel { 29 TSVIO m_vio{nullptr}; 30 TSIOBuffer m_iobuf{nullptr}; 31 TSIOBufferReader m_reader{nullptr}; 32 ~ChannelChannel33 ~Channel() 34 { 35 if (nullptr != m_reader) { 36 TSIOBufferReaderFree(m_reader); 37 } 38 if (nullptr != m_iobuf) { 39 TSIOBufferDestroy(m_iobuf); 40 } 41 } 42 43 int64_t drainReaderChannel44 drainReader() 45 { 46 int64_t consumed = 0; 47 48 if (nullptr != m_reader && reader_avail_more_than(m_reader, 0)) { 49 int64_t const avail = TSIOBufferReaderAvail(m_reader); 50 TSIOBufferReaderConsume(m_reader, avail); 51 consumed = avail; 52 if (nullptr != m_vio) { 53 TSVIONDoneSet(m_vio, TSVIONDoneGet(m_vio) + consumed); 54 } 55 } 56 57 return consumed; 58 } 59 60 bool setForReadChannel61 setForRead(TSVConn vc, TSCont contp, int64_t const bytesin) 62 { 63 TSAssert(nullptr != vc); 64 if (nullptr == m_iobuf) { 65 m_iobuf = TSIOBufferCreate(); 66 m_reader = TSIOBufferReaderAlloc(m_iobuf); 67 } else { 68 int64_t const drained = drainReader(); 69 if (0 < drained) { 70 DEBUG_LOG("Drained from reader: %" PRId64, drained); 71 } 72 } 73 m_vio = TSVConnRead(vc, contp, m_iobuf, bytesin); 74 return nullptr != m_vio; 75 } 76 77 bool setForWriteChannel78 setForWrite(TSVConn vc, TSCont contp, int64_t const bytesout) 79 { 80 TSAssert(nullptr != vc); 81 if (nullptr == m_iobuf) { 82 m_iobuf = TSIOBufferCreate(); 83 m_reader = TSIOBufferReaderAlloc(m_iobuf); 84 } else { 85 int64_t const drained = drainReader(); 86 if (0 < drained) { 87 DEBUG_LOG("Drained from reader: %" PRId64, drained); 88 } 89 } 90 m_vio = TSVConnWrite(vc, contp, m_reader, bytesout); 91 return nullptr != m_vio; 92 } 93 94 void closeChannel95 close() 96 { 97 if (nullptr != m_reader) { 98 drainReader(); 99 } 100 m_vio = nullptr; 101 } 102 103 bool isOpenChannel104 isOpen() const 105 { 106 return nullptr != m_vio; 107 } 108 109 bool isDrainedChannel110 isDrained() const 111 { 112 return nullptr == m_reader || !reader_avail_more_than(m_reader, 0); 113 } 114 }; 115 116 struct Stage // upstream or downstream (server or client) 117 { 118 Stage(Stage const &) = delete; 119 Stage &operator=(Stage const &) = delete; 120 121 TSVConn m_vc{nullptr}; 122 Channel m_read; 123 Channel m_write; 124 StageStage125 Stage() {} ~StageStage126 ~Stage() 127 { 128 if (nullptr != m_vc) { 129 TSVConnClose(m_vc); 130 } 131 } 132 133 void setupConnectionStage134 setupConnection(TSVConn vc) 135 { 136 if (nullptr != m_vc) { 137 TSVConnClose(m_vc); 138 } 139 m_read.close(); 140 m_write.close(); 141 m_vc = vc; 142 } 143 144 void setupVioReadStage145 setupVioRead(TSCont contp, int64_t const bytesin) 146 { 147 m_read.setForRead(m_vc, contp, bytesin); 148 } 149 150 void setupVioWriteStage151 setupVioWrite(TSCont contp, int64_t const bytesout) 152 { 153 m_write.setForWrite(m_vc, contp, bytesout); 154 } 155 156 void abortStage157 abort() 158 { 159 if (nullptr != m_vc) { 160 TSVConnAbort(m_vc, TS_VC_CLOSE_ABORT); 161 m_vc = nullptr; 162 } 163 m_read.close(); 164 m_write.close(); 165 } 166 167 void closeStage168 close() 169 { 170 if (nullptr != m_vc) { 171 TSVConnClose(m_vc); 172 m_vc = nullptr; 173 } 174 m_read.close(); 175 m_write.close(); 176 } 177 178 bool isOpenStage179 isOpen() const 180 { 181 return nullptr != m_vc && (m_read.isOpen() || m_write.isOpen()); 182 } 183 }; 184