1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
22 
23 #include <assert.h>
24 #include <string.h>
25 
26 #include "absl/types/optional.h"
27 
28 #include <grpc/compression.h>
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/compression/algorithm_metadata.h"
35 #include "src/core/lib/compression/compression_args.h"
36 #include "src/core/lib/compression/compression_internal.h"
37 #include "src/core/lib/compression/message_compress.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/manual_constructor.h"
40 #include "src/core/lib/profiling/timers.h"
41 #include "src/core/lib/slice/slice_internal.h"
42 #include "src/core/lib/slice/slice_string_helpers.h"
43 #include "src/core/lib/surface/call.h"
44 #include "src/core/lib/transport/static_metadata.h"
45 
46 namespace {
47 
48 class ChannelData {
49  public:
ChannelData(grpc_channel_element_args * args)50   explicit ChannelData(grpc_channel_element_args* args) {
51     // Get the enabled and the default algorithms from channel args.
52     enabled_compression_algorithms_bitset_ =
53         grpc_channel_args_compression_algorithm_get_states(args->channel_args);
54     default_compression_algorithm_ =
55         grpc_channel_args_get_channel_default_compression_algorithm(
56             args->channel_args);
57     // Make sure the default is enabled.
58     if (size_t(default_compression_algorithm_) >= 32 ||
59         !grpc_core::GetBit(enabled_compression_algorithms_bitset_,
60                            default_compression_algorithm_)) {
61       const char* name;
62       if (!grpc_compression_algorithm_name(default_compression_algorithm_,
63                                            &name)) {
64         name = "<unknown>";
65       }
66       gpr_log(GPR_ERROR,
67               "default compression algorithm %s not enabled: switching to none",
68               name);
69       default_compression_algorithm_ = GRPC_COMPRESS_NONE;
70     }
71     enabled_message_compression_algorithms_bitset_ =
72         grpc_compression_bitset_to_message_bitset(
73             enabled_compression_algorithms_bitset_);
74     enabled_stream_compression_algorithms_bitset_ =
75         grpc_compression_bitset_to_stream_bitset(
76             enabled_compression_algorithms_bitset_);
77     GPR_ASSERT(!args->is_last);
78   }
79 
default_compression_algorithm() const80   grpc_compression_algorithm default_compression_algorithm() const {
81     return default_compression_algorithm_;
82   }
83 
enabled_compression_algorithms_bitset() const84   uint32_t enabled_compression_algorithms_bitset() const {
85     return enabled_compression_algorithms_bitset_;
86   }
87 
enabled_message_compression_algorithms_bitset() const88   uint32_t enabled_message_compression_algorithms_bitset() const {
89     return enabled_message_compression_algorithms_bitset_;
90   }
91 
enabled_stream_compression_algorithms_bitset() const92   uint32_t enabled_stream_compression_algorithms_bitset() const {
93     return enabled_stream_compression_algorithms_bitset_;
94   }
95 
96  private:
97   /** The default, channel-level, compression algorithm */
98   grpc_compression_algorithm default_compression_algorithm_;
99   /** Bitset of enabled compression algorithms */
100   uint32_t enabled_compression_algorithms_bitset_;
101   /** Bitset of enabled message compression algorithms */
102   uint32_t enabled_message_compression_algorithms_bitset_;
103   /** Bitset of enabled stream compression algorithms */
104   uint32_t enabled_stream_compression_algorithms_bitset_;
105 };
106 
107 class CallData {
108  public:
CallData(grpc_call_element * elem,const grpc_call_element_args & args)109   CallData(grpc_call_element* elem, const grpc_call_element_args& args)
110       : call_combiner_(args.call_combiner) {
111     ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
112     // The call's message compression algorithm is set to channel's default
113     // setting. It can be overridden later by initial metadata.
114     if (GPR_LIKELY(
115             grpc_core::GetBit(channeld->enabled_compression_algorithms_bitset(),
116                               channeld->default_compression_algorithm()))) {
117       message_compression_algorithm_ =
118           grpc_compression_algorithm_to_message_compression_algorithm(
119               channeld->default_compression_algorithm());
120     }
121     GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner_,
122                       StartSendMessageBatch, elem, grpc_schedule_on_exec_ctx);
123   }
124 
~CallData()125   ~CallData() {
126     if (state_initialized_) {
127       grpc_slice_buffer_destroy_internal(&slices_);
128     }
129     GRPC_ERROR_UNREF(cancel_error_);
130   }
131 
132   void CompressStartTransportStreamOpBatch(
133       grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
134 
135  private:
136   bool SkipMessageCompression();
137   void InitializeState(grpc_call_element* elem);
138 
139   grpc_error_handle ProcessSendInitialMetadata(
140       grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
141 
142   // Methods for processing a send_message batch
143   static void StartSendMessageBatch(void* elem_arg, grpc_error_handle unused);
144   static void OnSendMessageNextDone(void* elem_arg, grpc_error_handle error);
145   grpc_error_handle PullSliceFromSendMessage();
146   void ContinueReadingSendMessage(grpc_call_element* elem);
147   void FinishSendMessage(grpc_call_element* elem);
148   void SendMessageBatchContinue(grpc_call_element* elem);
149   static void FailSendMessageBatchInCallCombiner(void* calld_arg,
150                                                  grpc_error_handle error);
151 
152   static void SendMessageOnComplete(void* calld_arg, grpc_error_handle error);
153 
154   grpc_core::CallCombiner* call_combiner_;
155   grpc_message_compression_algorithm message_compression_algorithm_ =
156       GRPC_MESSAGE_COMPRESS_NONE;
157   grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
158   grpc_transport_stream_op_batch* send_message_batch_ = nullptr;
159   bool seen_initial_metadata_ = false;
160   /* Set to true, if the fields below are initialized. */
161   bool state_initialized_ = false;
162   grpc_closure start_send_message_batch_in_call_combiner_;
163   /* The fields below are only initialized when we compress the payload.
164    * Keep them at the bottom of the struct, so they don't pollute the
165    * cache-lines. */
166   grpc_linked_mdelem message_compression_algorithm_storage_;
167   grpc_linked_mdelem stream_compression_algorithm_storage_;
168   grpc_linked_mdelem accept_encoding_storage_;
169   grpc_linked_mdelem accept_stream_encoding_storage_;
170   grpc_slice_buffer slices_; /**< Buffers up input slices to be compressed */
171   // Allocate space for the replacement stream
172   std::aligned_storage<sizeof(grpc_core::SliceBufferByteStream),
173                        alignof(grpc_core::SliceBufferByteStream)>::type
174       replacement_stream_;
175   grpc_closure* original_send_message_on_complete_ = nullptr;
176   grpc_closure send_message_on_complete_;
177   grpc_closure on_send_message_next_done_;
178 };
179 
180 // Returns true if we should skip message compression for the current message.
SkipMessageCompression()181 bool CallData::SkipMessageCompression() {
182   // If the flags of this message indicate that it shouldn't be compressed, we
183   // skip message compression.
184   uint32_t flags =
185       send_message_batch_->payload->send_message.send_message->flags();
186   if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
187     return true;
188   }
189   // If this call doesn't have any message compression algorithm set, skip
190   // message compression.
191   return message_compression_algorithm_ == GRPC_MESSAGE_COMPRESS_NONE;
192 }
193 
194 // Determines the compression algorithm from the initial metadata and the
195 // channel's default setting.
FindCompressionAlgorithm(grpc_metadata_batch * initial_metadata,ChannelData * channeld)196 grpc_compression_algorithm FindCompressionAlgorithm(
197     grpc_metadata_batch* initial_metadata, ChannelData* channeld) {
198   if (initial_metadata->legacy_index()->named.grpc_internal_encoding_request ==
199       nullptr) {
200     return channeld->default_compression_algorithm();
201   }
202   grpc_compression_algorithm compression_algorithm;
203   // Parse the compression algorithm from the initial metadata.
204   grpc_mdelem md = initial_metadata->legacy_index()
205                        ->named.grpc_internal_encoding_request->md;
206   GPR_ASSERT(grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
207                                               &compression_algorithm));
208   // Remove this metadata since it's an internal one (i.e., it won't be
209   // transmitted out).
210   initial_metadata->Remove(GRPC_BATCH_GRPC_INTERNAL_ENCODING_REQUEST);
211   // Check if that algorithm is enabled. Note that GRPC_COMPRESS_NONE is always
212   // enabled.
213   // TODO(juanlishen): Maybe use channel default or abort() if the algorithm
214   // from the initial metadata is disabled.
215   if (GPR_LIKELY(
216           grpc_core::GetBit(channeld->enabled_compression_algorithms_bitset(),
217                             compression_algorithm))) {
218     return compression_algorithm;
219   }
220   const char* algorithm_name;
221   GPR_ASSERT(
222       grpc_compression_algorithm_name(compression_algorithm, &algorithm_name));
223   gpr_log(GPR_ERROR,
224           "Invalid compression algorithm from initial metadata: '%s' "
225           "(previously disabled). "
226           "Will not compress.",
227           algorithm_name);
228   return GRPC_COMPRESS_NONE;
229 }
230 
InitializeState(grpc_call_element * elem)231 void CallData::InitializeState(grpc_call_element* elem) {
232   GPR_DEBUG_ASSERT(!state_initialized_);
233   state_initialized_ = true;
234   grpc_slice_buffer_init(&slices_);
235   GRPC_CLOSURE_INIT(&send_message_on_complete_, SendMessageOnComplete, this,
236                     grpc_schedule_on_exec_ctx);
237   GRPC_CLOSURE_INIT(&on_send_message_next_done_, OnSendMessageNextDone, elem,
238                     grpc_schedule_on_exec_ctx);
239 }
240 
ProcessSendInitialMetadata(grpc_call_element * elem,grpc_metadata_batch * initial_metadata)241 grpc_error_handle CallData::ProcessSendInitialMetadata(
242     grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
243   ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
244   // Find the compression algorithm.
245   grpc_compression_algorithm compression_algorithm =
246       FindCompressionAlgorithm(initial_metadata, channeld);
247   // Note that at most one of the following algorithms can be set.
248   message_compression_algorithm_ =
249       grpc_compression_algorithm_to_message_compression_algorithm(
250           compression_algorithm);
251   grpc_stream_compression_algorithm stream_compression_algorithm =
252       grpc_compression_algorithm_to_stream_compression_algorithm(
253           compression_algorithm);
254   // Hint compression algorithm.
255   grpc_error_handle error = GRPC_ERROR_NONE;
256   if (message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) {
257     InitializeState(elem);
258     error = grpc_metadata_batch_add_tail(
259         initial_metadata, &message_compression_algorithm_storage_,
260         grpc_message_compression_encoding_mdelem(
261             message_compression_algorithm_),
262         GRPC_BATCH_GRPC_ENCODING);
263   } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
264     InitializeState(elem);
265     error = grpc_metadata_batch_add_tail(
266         initial_metadata, &stream_compression_algorithm_storage_,
267         grpc_stream_compression_encoding_mdelem(stream_compression_algorithm),
268         GRPC_BATCH_CONTENT_ENCODING);
269   }
270   if (error != GRPC_ERROR_NONE) return error;
271   // Convey supported compression algorithms.
272   error = grpc_metadata_batch_add_tail(
273       initial_metadata, &accept_encoding_storage_,
274       GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
275           channeld->enabled_message_compression_algorithms_bitset()),
276       GRPC_BATCH_GRPC_ACCEPT_ENCODING);
277   if (error != GRPC_ERROR_NONE) return error;
278   // Do not overwrite accept-encoding header if it already presents (e.g., added
279   // by some proxy).
280   if (!initial_metadata->legacy_index()->named.accept_encoding) {
281     error = grpc_metadata_batch_add_tail(
282         initial_metadata, &accept_stream_encoding_storage_,
283         GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS(
284             channeld->enabled_stream_compression_algorithms_bitset()),
285         GRPC_BATCH_ACCEPT_ENCODING);
286   }
287   return error;
288 }
289 
SendMessageOnComplete(void * calld_arg,grpc_error_handle error)290 void CallData::SendMessageOnComplete(void* calld_arg, grpc_error_handle error) {
291   CallData* calld = static_cast<CallData*>(calld_arg);
292   grpc_slice_buffer_reset_and_unref_internal(&calld->slices_);
293   grpc_core::Closure::Run(DEBUG_LOCATION,
294                           calld->original_send_message_on_complete_,
295                           GRPC_ERROR_REF(error));
296 }
297 
SendMessageBatchContinue(grpc_call_element * elem)298 void CallData::SendMessageBatchContinue(grpc_call_element* elem) {
299   // Note: The call to grpc_call_next_op() results in yielding the
300   // call combiner, so we need to clear send_message_batch_ before we do that.
301   grpc_transport_stream_op_batch* send_message_batch = send_message_batch_;
302   send_message_batch_ = nullptr;
303   grpc_call_next_op(elem, send_message_batch);
304 }
305 
FinishSendMessage(grpc_call_element * elem)306 void CallData::FinishSendMessage(grpc_call_element* elem) {
307   GPR_DEBUG_ASSERT(message_compression_algorithm_ !=
308                    GRPC_MESSAGE_COMPRESS_NONE);
309   // Compress the data if appropriate.
310   grpc_slice_buffer tmp;
311   grpc_slice_buffer_init(&tmp);
312   uint32_t send_flags =
313       send_message_batch_->payload->send_message.send_message->flags();
314   bool did_compress =
315       grpc_msg_compress(message_compression_algorithm_, &slices_, &tmp);
316   if (did_compress) {
317     if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
318       const char* algo_name;
319       const size_t before_size = slices_.length;
320       const size_t after_size = tmp.length;
321       const float savings_ratio = 1.0f - static_cast<float>(after_size) /
322                                              static_cast<float>(before_size);
323       GPR_ASSERT(grpc_message_compression_algorithm_name(
324           message_compression_algorithm_, &algo_name));
325       gpr_log(GPR_INFO,
326               "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
327               " bytes (%.2f%% savings)",
328               algo_name, before_size, after_size, 100 * savings_ratio);
329     }
330     grpc_slice_buffer_swap(&slices_, &tmp);
331     send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
332   } else {
333     if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
334       const char* algo_name;
335       GPR_ASSERT(grpc_message_compression_algorithm_name(
336           message_compression_algorithm_, &algo_name));
337       gpr_log(GPR_INFO,
338               "Algorithm '%s' enabled but decided not to compress. Input size: "
339               "%" PRIuPTR,
340               algo_name, slices_.length);
341     }
342   }
343   grpc_slice_buffer_destroy_internal(&tmp);
344   // Swap out the original byte stream with our new one and send the
345   // batch down.
346   new (&replacement_stream_)
347       grpc_core::SliceBufferByteStream(&slices_, send_flags);
348   send_message_batch_->payload->send_message.send_message.reset(
349       reinterpret_cast<grpc_core::SliceBufferByteStream*>(
350           &replacement_stream_));
351   original_send_message_on_complete_ = send_message_batch_->on_complete;
352   send_message_batch_->on_complete = &send_message_on_complete_;
353   SendMessageBatchContinue(elem);
354 }
355 
FailSendMessageBatchInCallCombiner(void * calld_arg,grpc_error_handle error)356 void CallData::FailSendMessageBatchInCallCombiner(void* calld_arg,
357                                                   grpc_error_handle error) {
358   CallData* calld = static_cast<CallData*>(calld_arg);
359   if (calld->send_message_batch_ != nullptr) {
360     grpc_transport_stream_op_batch_finish_with_failure(
361         calld->send_message_batch_, GRPC_ERROR_REF(error),
362         calld->call_combiner_);
363     calld->send_message_batch_ = nullptr;
364   }
365 }
366 
367 // Pulls a slice from the send_message byte stream and adds it to slices_.
PullSliceFromSendMessage()368 grpc_error_handle CallData::PullSliceFromSendMessage() {
369   grpc_slice incoming_slice;
370   grpc_error_handle error =
371       send_message_batch_->payload->send_message.send_message->Pull(
372           &incoming_slice);
373   if (error == GRPC_ERROR_NONE) {
374     grpc_slice_buffer_add(&slices_, incoming_slice);
375   }
376   return error;
377 }
378 
379 // Reads as many slices as possible from the send_message byte stream.
380 // If all data has been read, invokes FinishSendMessage().  Otherwise,
381 // an async call to ByteStream::Next() has been started, which will
382 // eventually result in calling OnSendMessageNextDone().
ContinueReadingSendMessage(grpc_call_element * elem)383 void CallData::ContinueReadingSendMessage(grpc_call_element* elem) {
384   if (slices_.length ==
385       send_message_batch_->payload->send_message.send_message->length()) {
386     FinishSendMessage(elem);
387     return;
388   }
389   while (send_message_batch_->payload->send_message.send_message->Next(
390       ~static_cast<size_t>(0), &on_send_message_next_done_)) {
391     grpc_error_handle error = PullSliceFromSendMessage();
392     if (error != GRPC_ERROR_NONE) {
393       // Closure callback; does not take ownership of error.
394       FailSendMessageBatchInCallCombiner(this, error);
395       GRPC_ERROR_UNREF(error);
396       return;
397     }
398     if (slices_.length ==
399         send_message_batch_->payload->send_message.send_message->length()) {
400       FinishSendMessage(elem);
401       break;
402     }
403   }
404 }
405 
406 // Async callback for ByteStream::Next().
OnSendMessageNextDone(void * elem_arg,grpc_error_handle error)407 void CallData::OnSendMessageNextDone(void* elem_arg, grpc_error_handle error) {
408   grpc_call_element* elem = static_cast<grpc_call_element*>(elem_arg);
409   CallData* calld = static_cast<CallData*>(elem->call_data);
410   if (error != GRPC_ERROR_NONE) {
411     // Closure callback; does not take ownership of error.
412     FailSendMessageBatchInCallCombiner(calld, error);
413     return;
414   }
415   error = calld->PullSliceFromSendMessage();
416   if (error != GRPC_ERROR_NONE) {
417     // Closure callback; does not take ownership of error.
418     FailSendMessageBatchInCallCombiner(calld, error);
419     GRPC_ERROR_UNREF(error);
420     return;
421   }
422   if (calld->slices_.length == calld->send_message_batch_->payload->send_message
423                                    .send_message->length()) {
424     calld->FinishSendMessage(elem);
425   } else {
426     calld->ContinueReadingSendMessage(elem);
427   }
428 }
429 
StartSendMessageBatch(void * elem_arg,grpc_error_handle)430 void CallData::StartSendMessageBatch(void* elem_arg,
431                                      grpc_error_handle /*unused*/) {
432   grpc_call_element* elem = static_cast<grpc_call_element*>(elem_arg);
433   CallData* calld = static_cast<CallData*>(elem->call_data);
434   if (calld->SkipMessageCompression()) {
435     calld->SendMessageBatchContinue(elem);
436   } else {
437     calld->ContinueReadingSendMessage(elem);
438   }
439 }
440 
CompressStartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)441 void CallData::CompressStartTransportStreamOpBatch(
442     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
443   GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0);
444   // Handle cancel_stream.
445   if (batch->cancel_stream) {
446     GRPC_ERROR_UNREF(cancel_error_);
447     cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
448     if (send_message_batch_ != nullptr) {
449       if (!seen_initial_metadata_) {
450         GRPC_CALL_COMBINER_START(
451             call_combiner_,
452             GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, this,
453                                 grpc_schedule_on_exec_ctx),
454             GRPC_ERROR_REF(cancel_error_), "failing send_message op");
455       } else {
456         send_message_batch_->payload->send_message.send_message->Shutdown(
457             GRPC_ERROR_REF(cancel_error_));
458       }
459     }
460   } else if (cancel_error_ != GRPC_ERROR_NONE) {
461     grpc_transport_stream_op_batch_finish_with_failure(
462         batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
463     return;
464   }
465   // Handle send_initial_metadata.
466   if (batch->send_initial_metadata) {
467     GPR_ASSERT(!seen_initial_metadata_);
468     grpc_error_handle error = ProcessSendInitialMetadata(
469         elem, batch->payload->send_initial_metadata.send_initial_metadata);
470     if (error != GRPC_ERROR_NONE) {
471       grpc_transport_stream_op_batch_finish_with_failure(batch, error,
472                                                          call_combiner_);
473       return;
474     }
475     seen_initial_metadata_ = true;
476     // If we had previously received a batch containing a send_message op,
477     // handle it now.  Note that we need to re-enter the call combiner
478     // for this, since we can't send two batches down while holding the
479     // call combiner, since the connected_channel filter (at the bottom of
480     // the call stack) will release the call combiner for each batch it sees.
481     if (send_message_batch_ != nullptr) {
482       GRPC_CALL_COMBINER_START(
483           call_combiner_, &start_send_message_batch_in_call_combiner_,
484           GRPC_ERROR_NONE, "starting send_message after send_initial_metadata");
485     }
486   }
487   // Handle send_message.
488   if (batch->send_message) {
489     GPR_ASSERT(send_message_batch_ == nullptr);
490     send_message_batch_ = batch;
491     // If we have not yet seen send_initial_metadata, then we have to
492     // wait.  We save the batch and then drop the call combiner, which we'll
493     // have to pick up again later when we get send_initial_metadata.
494     if (!seen_initial_metadata_) {
495       GRPC_CALL_COMBINER_STOP(
496           call_combiner_, "send_message batch pending send_initial_metadata");
497       return;
498     }
499     StartSendMessageBatch(elem, GRPC_ERROR_NONE);
500   } else {
501     // Pass control down the stack.
502     grpc_call_next_op(elem, batch);
503   }
504 }
505 
CompressStartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)506 void CompressStartTransportStreamOpBatch(
507     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
508   CallData* calld = static_cast<CallData*>(elem->call_data);
509   calld->CompressStartTransportStreamOpBatch(elem, batch);
510 }
511 
512 /* Constructor for call_data */
CompressInitCallElem(grpc_call_element * elem,const grpc_call_element_args * args)513 grpc_error_handle CompressInitCallElem(grpc_call_element* elem,
514                                        const grpc_call_element_args* args) {
515   new (elem->call_data) CallData(elem, *args);
516   return GRPC_ERROR_NONE;
517 }
518 
519 /* Destructor for call_data */
CompressDestroyCallElem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)520 void CompressDestroyCallElem(grpc_call_element* elem,
521                              const grpc_call_final_info* /*final_info*/,
522                              grpc_closure* /*ignored*/) {
523   CallData* calld = static_cast<CallData*>(elem->call_data);
524   calld->~CallData();
525 }
526 
527 /* Constructor for ChannelData */
CompressInitChannelElem(grpc_channel_element * elem,grpc_channel_element_args * args)528 grpc_error_handle CompressInitChannelElem(grpc_channel_element* elem,
529                                           grpc_channel_element_args* args) {
530   new (elem->channel_data) ChannelData(args);
531   return GRPC_ERROR_NONE;
532 }
533 
534 /* Destructor for channel data */
CompressDestroyChannelElem(grpc_channel_element * elem)535 void CompressDestroyChannelElem(grpc_channel_element* elem) {
536   ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
537   channeld->~ChannelData();
538 }
539 
540 }  // namespace
541 
542 const grpc_channel_filter grpc_message_compress_filter = {
543     CompressStartTransportStreamOpBatch,
544     grpc_channel_next_op,
545     sizeof(CallData),
546     CompressInitCallElem,
547     grpc_call_stack_ignore_set_pollset_or_pollset_set,
548     CompressDestroyCallElem,
549     sizeof(ChannelData),
550     CompressInitChannelElem,
551     CompressDestroyChannelElem,
552     grpc_channel_next_get_info,
553     "message_compress"};
554