1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 package org.apache.thrift.async;
20 
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.util.concurrent.atomic.AtomicLong;
26 
27 import org.apache.thrift.TException;
28 import org.apache.thrift.protocol.TProtocol;
29 import org.apache.thrift.protocol.TProtocolFactory;
30 import org.apache.thrift.transport.layered.TFramedTransport;
31 import org.apache.thrift.transport.TMemoryBuffer;
32 import org.apache.thrift.transport.TNonblockingTransport;
33 import org.apache.thrift.transport.TTransportException;
34 
35 /**
36  * Encapsulates an async method call.
37  * <p>
38  * Need to generate:
39  * <ul>
40  *   <li>protected abstract void write_args(TProtocol protocol)</li>
41  *   <li>protected abstract T getResult() throws &lt;Exception_1&gt;, &lt;Exception_2&gt;, ...</li>
42  * </ul>
43  *
44  * @param <T> The return type of the encapsulated method call.
45  */
46 public abstract class TAsyncMethodCall<T> {
47 
48   private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
49   private static AtomicLong sequenceIdCounter = new AtomicLong(0);
50 
51   public static enum State {
52     CONNECTING,
53     WRITING_REQUEST_SIZE,
54     WRITING_REQUEST_BODY,
55     READING_RESPONSE_SIZE,
56     READING_RESPONSE_BODY,
57     RESPONSE_READ,
58     ERROR;
59   }
60 
61   /**
62    * Next step in the call, initialized by start()
63    */
64   private State state = null;
65 
66   protected final TNonblockingTransport transport;
67   private final TProtocolFactory protocolFactory;
68   protected final TAsyncClient client;
69   private final AsyncMethodCallback<T> callback;
70   private final boolean isOneway;
71   private long sequenceId;
72   private final long timeout;
73 
74   private ByteBuffer sizeBuffer;
75   private final byte[] sizeBufferArray = new byte[4];
76   private ByteBuffer frameBuffer;
77 
78   private long startTime = System.currentTimeMillis();
79 
TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway)80   protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
81     this.transport = transport;
82     this.callback = callback;
83     this.protocolFactory = protocolFactory;
84     this.client = client;
85     this.isOneway = isOneway;
86     this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
87     this.timeout = client.getTimeout();
88   }
89 
getState()90   protected State getState() {
91     return state;
92   }
93 
isFinished()94   protected boolean isFinished() {
95     return state == State.RESPONSE_READ;
96   }
97 
getStartTime()98   protected long getStartTime() {
99     return startTime;
100   }
101 
getSequenceId()102   protected long getSequenceId() {
103     return sequenceId;
104   }
105 
getClient()106   public TAsyncClient getClient() {
107     return client;
108   }
109 
hasTimeout()110   public boolean hasTimeout() {
111     return timeout > 0;
112   }
113 
getTimeoutTimestamp()114   public long getTimeoutTimestamp() {
115     return timeout + startTime;
116   }
117 
write_args(TProtocol protocol)118   protected abstract void write_args(TProtocol protocol) throws TException;
119 
getResult()120   protected abstract T getResult() throws Exception;
121 
122   /**
123    * Initialize buffers.
124    * @throws TException if buffer initialization fails
125    */
prepareMethodCall()126   protected void prepareMethodCall() throws TException {
127     TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
128     TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);
129     write_args(protocol);
130 
131     int length = memoryBuffer.length();
132     frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length);
133 
134     TFramedTransport.encodeFrameSize(length, sizeBufferArray);
135     sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
136   }
137 
138   /**
139    * Register with selector and start first state, which could be either connecting or writing.
140    * @throws IOException if register or starting fails
141    */
start(Selector sel)142   void start(Selector sel) throws IOException {
143     SelectionKey key;
144     if (transport.isOpen()) {
145       state = State.WRITING_REQUEST_SIZE;
146       key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
147     } else {
148       state = State.CONNECTING;
149       key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);
150 
151       // non-blocking connect can complete immediately,
152       // in which case we should not expect the OP_CONNECT
153       if (transport.startConnect()) {
154         registerForFirstWrite(key);
155       }
156     }
157 
158     key.attach(this);
159   }
160 
registerForFirstWrite(SelectionKey key)161   void registerForFirstWrite(SelectionKey key) throws IOException {
162     state = State.WRITING_REQUEST_SIZE;
163     key.interestOps(SelectionKey.OP_WRITE);
164   }
165 
getFrameBuffer()166   protected ByteBuffer getFrameBuffer() {
167     return frameBuffer;
168   }
169 
170   /**
171    * Transition to next state, doing whatever work is required. Since this
172    * method is only called by the selector thread, we can make changes to our
173    * select interests without worrying about concurrency.
174    * @param key
175    */
transition(SelectionKey key)176   void transition(SelectionKey key) {
177     // Ensure key is valid
178     if (!key.isValid()) {
179       key.cancel();
180       Exception e = new TTransportException("Selection key not valid!");
181       onError(e);
182       return;
183     }
184 
185     // Transition function
186     try {
187       switch (state) {
188         case CONNECTING:
189           doConnecting(key);
190           break;
191         case WRITING_REQUEST_SIZE:
192           doWritingRequestSize();
193           break;
194         case WRITING_REQUEST_BODY:
195           doWritingRequestBody(key);
196           break;
197         case READING_RESPONSE_SIZE:
198           doReadingResponseSize();
199           break;
200         case READING_RESPONSE_BODY:
201           doReadingResponseBody(key);
202           break;
203         default: // RESPONSE_READ, ERROR, or bug
204           throw new IllegalStateException("Method call in state " + state
205               + " but selector called transition method. Seems like a bug...");
206       }
207     } catch (Exception e) {
208       key.cancel();
209       key.attach(null);
210       onError(e);
211     }
212   }
213 
onError(Exception e)214   protected void onError(Exception e) {
215     client.onError(e);
216     callback.onError(e);
217     state = State.ERROR;
218   }
219 
doReadingResponseBody(SelectionKey key)220   private void doReadingResponseBody(SelectionKey key) throws TTransportException {
221     if (transport.read(frameBuffer) < 0) {
222       throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
223     }
224     if (frameBuffer.remaining() == 0) {
225       cleanUpAndFireCallback(key);
226     }
227   }
228 
cleanUpAndFireCallback(SelectionKey key)229   private void cleanUpAndFireCallback(SelectionKey key) {
230     state = State.RESPONSE_READ;
231     key.interestOps(0);
232     // this ensures that the TAsyncMethod instance doesn't hang around
233     key.attach(null);
234     try {
235       T result = this.getResult();
236       client.onComplete();
237       callback.onComplete(result);
238     } catch (Exception e) {
239       key.cancel();
240       onError(e);
241     }
242   }
243 
doReadingResponseSize()244   private void doReadingResponseSize() throws TTransportException {
245     if (transport.read(sizeBuffer) < 0) {
246       throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed");
247     }
248     if (sizeBuffer.remaining() == 0) {
249       state = State.READING_RESPONSE_BODY;
250       frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray));
251     }
252   }
253 
doWritingRequestBody(SelectionKey key)254   private void doWritingRequestBody(SelectionKey key) throws TTransportException {
255     if (transport.write(frameBuffer) < 0) {
256       throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
257     }
258     if (frameBuffer.remaining() == 0) {
259       if (isOneway) {
260         cleanUpAndFireCallback(key);
261       } else {
262         state = State.READING_RESPONSE_SIZE;
263         sizeBuffer.rewind();  // Prepare to read incoming frame size
264         key.interestOps(SelectionKey.OP_READ);
265       }
266     }
267   }
268 
doWritingRequestSize()269   private void doWritingRequestSize() throws TTransportException {
270     if (transport.write(sizeBuffer) < 0) {
271       throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed");
272     }
273     if (sizeBuffer.remaining() == 0) {
274       state = State.WRITING_REQUEST_BODY;
275     }
276   }
277 
doConnecting(SelectionKey key)278   private void doConnecting(SelectionKey key) throws IOException {
279     if (!key.isConnectable() || !transport.finishConnect()) {
280       throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
281     }
282     registerForFirstWrite(key);
283   }
284 }
285