#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import zmq
from cStringIO import StringIO
from thrift.transport.TTransport import TTransportBase, CReadableTransport


class TZmqClient(TTransportBase, CReadableTransport):
    """Thrift client transport that exchanges whole messages over a ZeroMQ socket.

    Outgoing bytes are accumulated in a write buffer and sent as a single
    socket message by ``flush()``. Incoming bytes are served from the most
    recently received socket message; a new message is fetched only once the
    current one has been fully consumed.
    """

    def __init__(self, ctx, endpoint, sock_type):
        """Create the socket and the empty read/write buffers.

        ctx       -- object whose ``socket(sock_type)`` method creates the socket
        endpoint  -- address handed to the socket's ``connect()`` in ``open()``
        sock_type -- socket type passed through to ``ctx.socket()``
        """
        self._sock = ctx.socket(sock_type)
        self._endpoint = endpoint
        self._wbuf = StringIO()
        self._rbuf = StringIO()

    def open(self):
        """Connect the socket to the endpoint given at construction time."""
        self._sock.connect(self._endpoint)

    def read(self, size):
        """Return up to ``size`` bytes of the incoming stream.

        Data left over from the current message is returned first. When the
        current message is exhausted, one new message is received from the
        socket and the read is served from it.
        """
        ret = self._rbuf.read(size)
        if ret:
            return ret
        # Current message fully consumed: fetch the next one and retry once.
        self._read_message()
        return self._rbuf.read(size)

    def _read_message(self):
        """Receive one message from the socket and make it the read buffer."""
        msg = self._sock.recv()
        self._rbuf = StringIO(msg)

    def write(self, buf):
        """Append ``buf`` to the pending outgoing message (not sent until flush)."""
        self._wbuf.write(buf)

    def flush(self):
        """Send everything written since the last flush as one socket message."""
        msg = self._wbuf.getvalue()
        # Reset the write buffer before sending so a failing send() does not
        # leave the old payload queued for the next flush.
        self._wbuf = StringIO()
        self._sock.send(msg)

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        """The current read buffer, exposed for the CReadableTransport interface."""
        return self._rbuf

    # NOTE: The original author flagged this method as unverified
    # ("This will probably not actually work"); treat it as best-effort.
    def cstringio_refill(self, prefix, reqlen):
        """Rebuild the read buffer so that it holds at least ``reqlen`` bytes.

        prefix -- bytes already consumed by the caller that must be kept at
                  the front of the new buffer
        reqlen -- minimum number of bytes the new buffer must contain

        Whole messages are received and appended to ``prefix`` until the
        requested length is reached. Returns the new read buffer.
        """
        while len(prefix) < reqlen:
            # The helper is the private _read_message(); the previous call to
            # self.read_message() raised AttributeError on every refill.
            self._read_message()
            prefix += self._rbuf.getvalue()
        self._rbuf = StringIO(prefix)
        return self._rbuf