1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/processor/PeekProcessor.h>
21 
22 using namespace apache::thrift::transport;
23 using namespace apache::thrift::protocol;
24 using namespace apache::thrift;
25 
26 namespace apache {
27 namespace thrift {
28 namespace processor {
29 
PeekProcessor()30 PeekProcessor::PeekProcessor() {
31   memoryBuffer_.reset(new TMemoryBuffer());
32   targetTransport_ = memoryBuffer_;
33 }
34 PeekProcessor::~PeekProcessor() = default;
35 
initialize(std::shared_ptr<TProcessor> actualProcessor,std::shared_ptr<TProtocolFactory> protocolFactory,std::shared_ptr<TPipedTransportFactory> transportFactory)36 void PeekProcessor::initialize(std::shared_ptr<TProcessor> actualProcessor,
37                                std::shared_ptr<TProtocolFactory> protocolFactory,
38                                std::shared_ptr<TPipedTransportFactory> transportFactory) {
39   actualProcessor_ = actualProcessor;
40   pipedProtocol_ = protocolFactory->getProtocol(targetTransport_);
41   transportFactory_ = transportFactory;
42   transportFactory_->initializeTargetTransport(targetTransport_);
43 }
44 
getPipedTransport(std::shared_ptr<TTransport> in)45 std::shared_ptr<TTransport> PeekProcessor::getPipedTransport(std::shared_ptr<TTransport> in) {
46   return transportFactory_->getTransport(in);
47 }
48 
setTargetTransport(std::shared_ptr<TTransport> targetTransport)49 void PeekProcessor::setTargetTransport(std::shared_ptr<TTransport> targetTransport) {
50   targetTransport_ = targetTransport;
51   if (std::dynamic_pointer_cast<TMemoryBuffer>(targetTransport_)) {
52     memoryBuffer_ = std::dynamic_pointer_cast<TMemoryBuffer>(targetTransport);
53   } else if (std::dynamic_pointer_cast<TPipedTransport>(targetTransport_)) {
54     memoryBuffer_ = std::dynamic_pointer_cast<TMemoryBuffer>(
55         std::dynamic_pointer_cast<TPipedTransport>(targetTransport_)->getTargetTransport());
56   }
57 
58   if (!memoryBuffer_) {
59     throw TException(
60         "Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer");
61   }
62 }
63 
process(std::shared_ptr<TProtocol> in,std::shared_ptr<TProtocol> out,void * connectionContext)64 bool PeekProcessor::process(std::shared_ptr<TProtocol> in,
65                             std::shared_ptr<TProtocol> out,
66                             void* connectionContext) {
67 
68   std::string fname;
69   TMessageType mtype;
70   int32_t seqid;
71   in->readMessageBegin(fname, mtype, seqid);
72 
73   if (mtype != T_CALL && mtype != T_ONEWAY) {
74     throw TException("Unexpected message type");
75   }
76 
77   // Peek at the name
78   peekName(fname);
79 
80   TType ftype;
81   int16_t fid;
82   while (true) {
83     in->readFieldBegin(fname, ftype, fid);
84     if (ftype == T_STOP) {
85       break;
86     }
87 
88     // Peek at the variable
89     peek(in, ftype, fid);
90     in->readFieldEnd();
91   }
92   in->readMessageEnd();
93   in->getTransport()->readEnd();
94 
95   //
96   // All the data is now in memoryBuffer_ and ready to be processed
97   //
98 
99   // Let's first take a peek at the full data in memory
100   uint8_t* buffer;
101   uint32_t size;
102   memoryBuffer_->getBuffer(&buffer, &size);
103   peekBuffer(buffer, size);
104 
105   // Done peeking at variables
106   peekEnd();
107 
108   bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext);
109   memoryBuffer_->resetBuffer();
110   return ret;
111 }
112 
peekName(const std::string & fname)113 void PeekProcessor::peekName(const std::string& fname) {
114   (void)fname;
115 }
116 
peekBuffer(uint8_t * buffer,uint32_t size)117 void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
118   (void)buffer;
119   (void)size;
120 }
121 
peek(std::shared_ptr<TProtocol> in,TType ftype,int16_t fid)122 void PeekProcessor::peek(std::shared_ptr<TProtocol> in, TType ftype, int16_t fid) {
123   (void)fid;
124   in->skip(ftype);
125 }
126 
peekEnd()127 void PeekProcessor::peekEnd() {
128 }
129 }
130 }
131 }
132