1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 package org.apache.thrift.async; 20 21 import java.io.IOException; 22 import java.nio.ByteBuffer; 23 import java.nio.channels.SelectionKey; 24 import java.nio.channels.Selector; 25 import java.util.concurrent.atomic.AtomicLong; 26 27 import org.apache.thrift.TException; 28 import org.apache.thrift.protocol.TProtocol; 29 import org.apache.thrift.protocol.TProtocolFactory; 30 import org.apache.thrift.transport.layered.TFramedTransport; 31 import org.apache.thrift.transport.TMemoryBuffer; 32 import org.apache.thrift.transport.TNonblockingTransport; 33 import org.apache.thrift.transport.TTransportException; 34 35 /** 36 * Encapsulates an async method call. 37 * <p> 38 * Need to generate: 39 * <ul> 40 * <li>protected abstract void write_args(TProtocol protocol)</li> 41 * <li>protected abstract T getResult() throws <Exception_1>, <Exception_2>, ...</li> 42 * </ul> 43 * 44 * @param <T> The return type of the encapsulated method call. 45 */ 46 public abstract class TAsyncMethodCall<T> { 47 48 private static final int INITIAL_MEMORY_BUFFER_SIZE = 128; 49 private static AtomicLong sequenceIdCounter = new AtomicLong(0); 50 51 public static enum State { 52 CONNECTING, 53 WRITING_REQUEST_SIZE, 54 WRITING_REQUEST_BODY, 55 READING_RESPONSE_SIZE, 56 READING_RESPONSE_BODY, 57 RESPONSE_READ, 58 ERROR; 59 } 60 61 /** 62 * Next step in the call, initialized by start() 63 */ 64 private State state = null; 65 66 protected final TNonblockingTransport transport; 67 private final TProtocolFactory protocolFactory; 68 protected final TAsyncClient client; 69 private final AsyncMethodCallback<T> callback; 70 private final boolean isOneway; 71 private long sequenceId; 72 private final long timeout; 73 74 private ByteBuffer sizeBuffer; 75 private final byte[] sizeBufferArray = new byte[4]; 76 private ByteBuffer frameBuffer; 77 78 private long startTime = System.currentTimeMillis(); 79 TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway)80 protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) { 81 this.transport = transport; 82 this.callback = callback; 83 this.protocolFactory = protocolFactory; 84 this.client = client; 85 this.isOneway = isOneway; 86 this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement(); 87 this.timeout = client.getTimeout(); 88 } 89 getState()90 protected State getState() { 91 return state; 92 } 93 isFinished()94 protected boolean isFinished() { 95 return state == State.RESPONSE_READ; 96 } 97 getStartTime()98 protected long getStartTime() { 99 return startTime; 100 } 101 getSequenceId()102 protected long getSequenceId() { 103 return sequenceId; 104 } 105 getClient()106 public TAsyncClient getClient() { 107 return client; 108 } 109 hasTimeout()110 public boolean hasTimeout() { 111 return timeout > 0; 112 } 113 getTimeoutTimestamp()114 public long getTimeoutTimestamp() { 115 return timeout + startTime; 116 } 117 write_args(TProtocol protocol)118 protected abstract void write_args(TProtocol protocol) throws TException; 119 getResult()120 protected abstract T getResult() throws Exception; 121 122 /** 123 * Initialize buffers. 124 * @throws TException if buffer initialization fails 125 */ prepareMethodCall()126 protected void prepareMethodCall() throws TException { 127 TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE); 128 TProtocol protocol = protocolFactory.getProtocol(memoryBuffer); 129 write_args(protocol); 130 131 int length = memoryBuffer.length(); 132 frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length); 133 134 TFramedTransport.encodeFrameSize(length, sizeBufferArray); 135 sizeBuffer = ByteBuffer.wrap(sizeBufferArray); 136 } 137 138 /** 139 * Register with selector and start first state, which could be either connecting or writing. 140 * @throws IOException if register or starting fails 141 */ start(Selector sel)142 void start(Selector sel) throws IOException { 143 SelectionKey key; 144 if (transport.isOpen()) { 145 state = State.WRITING_REQUEST_SIZE; 146 key = transport.registerSelector(sel, SelectionKey.OP_WRITE); 147 } else { 148 state = State.CONNECTING; 149 key = transport.registerSelector(sel, SelectionKey.OP_CONNECT); 150 151 // non-blocking connect can complete immediately, 152 // in which case we should not expect the OP_CONNECT 153 if (transport.startConnect()) { 154 registerForFirstWrite(key); 155 } 156 } 157 158 key.attach(this); 159 } 160 registerForFirstWrite(SelectionKey key)161 void registerForFirstWrite(SelectionKey key) throws IOException { 162 state = State.WRITING_REQUEST_SIZE; 163 key.interestOps(SelectionKey.OP_WRITE); 164 } 165 getFrameBuffer()166 protected ByteBuffer getFrameBuffer() { 167 return frameBuffer; 168 } 169 170 /** 171 * Transition to next state, doing whatever work is required. Since this 172 * method is only called by the selector thread, we can make changes to our 173 * select interests without worrying about concurrency. 174 * @param key 175 */ transition(SelectionKey key)176 void transition(SelectionKey key) { 177 // Ensure key is valid 178 if (!key.isValid()) { 179 key.cancel(); 180 Exception e = new TTransportException("Selection key not valid!"); 181 onError(e); 182 return; 183 } 184 185 // Transition function 186 try { 187 switch (state) { 188 case CONNECTING: 189 doConnecting(key); 190 break; 191 case WRITING_REQUEST_SIZE: 192 doWritingRequestSize(); 193 break; 194 case WRITING_REQUEST_BODY: 195 doWritingRequestBody(key); 196 break; 197 case READING_RESPONSE_SIZE: 198 doReadingResponseSize(); 199 break; 200 case READING_RESPONSE_BODY: 201 doReadingResponseBody(key); 202 break; 203 default: // RESPONSE_READ, ERROR, or bug 204 throw new IllegalStateException("Method call in state " + state 205 + " but selector called transition method. Seems like a bug..."); 206 } 207 } catch (Exception e) { 208 key.cancel(); 209 key.attach(null); 210 onError(e); 211 } 212 } 213 onError(Exception e)214 protected void onError(Exception e) { 215 client.onError(e); 216 callback.onError(e); 217 state = State.ERROR; 218 } 219 doReadingResponseBody(SelectionKey key)220 private void doReadingResponseBody(SelectionKey key) throws TTransportException { 221 if (transport.read(frameBuffer) < 0) { 222 throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed"); 223 } 224 if (frameBuffer.remaining() == 0) { 225 cleanUpAndFireCallback(key); 226 } 227 } 228 cleanUpAndFireCallback(SelectionKey key)229 private void cleanUpAndFireCallback(SelectionKey key) { 230 state = State.RESPONSE_READ; 231 key.interestOps(0); 232 // this ensures that the TAsyncMethod instance doesn't hang around 233 key.attach(null); 234 try { 235 T result = this.getResult(); 236 client.onComplete(); 237 callback.onComplete(result); 238 } catch (Exception e) { 239 key.cancel(); 240 onError(e); 241 } 242 } 243 doReadingResponseSize()244 private void doReadingResponseSize() throws TTransportException { 245 if (transport.read(sizeBuffer) < 0) { 246 throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed"); 247 } 248 if (sizeBuffer.remaining() == 0) { 249 state = State.READING_RESPONSE_BODY; 250 frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray)); 251 } 252 } 253 doWritingRequestBody(SelectionKey key)254 private void doWritingRequestBody(SelectionKey key) throws TTransportException { 255 if (transport.write(frameBuffer) < 0) { 256 throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed"); 257 } 258 if (frameBuffer.remaining() == 0) { 259 if (isOneway) { 260 cleanUpAndFireCallback(key); 261 } else { 262 state = State.READING_RESPONSE_SIZE; 263 sizeBuffer.rewind(); // Prepare to read incoming frame size 264 key.interestOps(SelectionKey.OP_READ); 265 } 266 } 267 } 268 doWritingRequestSize()269 private void doWritingRequestSize() throws TTransportException { 270 if (transport.write(sizeBuffer) < 0) { 271 throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed"); 272 } 273 if (sizeBuffer.remaining() == 0) { 274 state = State.WRITING_REQUEST_BODY; 275 } 276 } 277 doConnecting(SelectionKey key)278 private void doConnecting(SelectionKey key) throws IOException { 279 if (!key.isConnectable() || !transport.finishConnect()) { 280 throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT"); 281 } 282 registerForFirstWrite(key); 283 } 284 } 285