1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
22
23 #include <assert.h>
24 #include <string.h>
25
26 #include "absl/types/optional.h"
27
28 #include <grpc/compression.h>
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/compression/algorithm_metadata.h"
35 #include "src/core/lib/compression/compression_args.h"
36 #include "src/core/lib/compression/compression_internal.h"
37 #include "src/core/lib/compression/message_compress.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/manual_constructor.h"
40 #include "src/core/lib/profiling/timers.h"
41 #include "src/core/lib/slice/slice_internal.h"
42 #include "src/core/lib/slice/slice_string_helpers.h"
43 #include "src/core/lib/surface/call.h"
44 #include "src/core/lib/transport/static_metadata.h"
45
46 namespace {
47
48 class ChannelData {
49 public:
ChannelData(grpc_channel_element_args * args)50 explicit ChannelData(grpc_channel_element_args* args) {
51 // Get the enabled and the default algorithms from channel args.
52 enabled_compression_algorithms_bitset_ =
53 grpc_channel_args_compression_algorithm_get_states(args->channel_args);
54 default_compression_algorithm_ =
55 grpc_channel_args_get_channel_default_compression_algorithm(
56 args->channel_args);
57 // Make sure the default is enabled.
58 if (size_t(default_compression_algorithm_) >= 32 ||
59 !grpc_core::GetBit(enabled_compression_algorithms_bitset_,
60 default_compression_algorithm_)) {
61 const char* name;
62 if (!grpc_compression_algorithm_name(default_compression_algorithm_,
63 &name)) {
64 name = "<unknown>";
65 }
66 gpr_log(GPR_ERROR,
67 "default compression algorithm %s not enabled: switching to none",
68 name);
69 default_compression_algorithm_ = GRPC_COMPRESS_NONE;
70 }
71 enabled_message_compression_algorithms_bitset_ =
72 grpc_compression_bitset_to_message_bitset(
73 enabled_compression_algorithms_bitset_);
74 enabled_stream_compression_algorithms_bitset_ =
75 grpc_compression_bitset_to_stream_bitset(
76 enabled_compression_algorithms_bitset_);
77 GPR_ASSERT(!args->is_last);
78 }
79
default_compression_algorithm() const80 grpc_compression_algorithm default_compression_algorithm() const {
81 return default_compression_algorithm_;
82 }
83
enabled_compression_algorithms_bitset() const84 uint32_t enabled_compression_algorithms_bitset() const {
85 return enabled_compression_algorithms_bitset_;
86 }
87
enabled_message_compression_algorithms_bitset() const88 uint32_t enabled_message_compression_algorithms_bitset() const {
89 return enabled_message_compression_algorithms_bitset_;
90 }
91
enabled_stream_compression_algorithms_bitset() const92 uint32_t enabled_stream_compression_algorithms_bitset() const {
93 return enabled_stream_compression_algorithms_bitset_;
94 }
95
96 private:
97 /** The default, channel-level, compression algorithm */
98 grpc_compression_algorithm default_compression_algorithm_;
99 /** Bitset of enabled compression algorithms */
100 uint32_t enabled_compression_algorithms_bitset_;
101 /** Bitset of enabled message compression algorithms */
102 uint32_t enabled_message_compression_algorithms_bitset_;
103 /** Bitset of enabled stream compression algorithms */
104 uint32_t enabled_stream_compression_algorithms_bitset_;
105 };
106
107 class CallData {
108 public:
CallData(grpc_call_element * elem,const grpc_call_element_args & args)109 CallData(grpc_call_element* elem, const grpc_call_element_args& args)
110 : call_combiner_(args.call_combiner) {
111 ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
112 // The call's message compression algorithm is set to channel's default
113 // setting. It can be overridden later by initial metadata.
114 if (GPR_LIKELY(
115 grpc_core::GetBit(channeld->enabled_compression_algorithms_bitset(),
116 channeld->default_compression_algorithm()))) {
117 message_compression_algorithm_ =
118 grpc_compression_algorithm_to_message_compression_algorithm(
119 channeld->default_compression_algorithm());
120 }
121 GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner_,
122 StartSendMessageBatch, elem, grpc_schedule_on_exec_ctx);
123 }
124
~CallData()125 ~CallData() {
126 if (state_initialized_) {
127 grpc_slice_buffer_destroy_internal(&slices_);
128 }
129 GRPC_ERROR_UNREF(cancel_error_);
130 }
131
132 void CompressStartTransportStreamOpBatch(
133 grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
134
135 private:
136 bool SkipMessageCompression();
137 void InitializeState(grpc_call_element* elem);
138
139 grpc_error_handle ProcessSendInitialMetadata(
140 grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
141
142 // Methods for processing a send_message batch
143 static void StartSendMessageBatch(void* elem_arg, grpc_error_handle unused);
144 static void OnSendMessageNextDone(void* elem_arg, grpc_error_handle error);
145 grpc_error_handle PullSliceFromSendMessage();
146 void ContinueReadingSendMessage(grpc_call_element* elem);
147 void FinishSendMessage(grpc_call_element* elem);
148 void SendMessageBatchContinue(grpc_call_element* elem);
149 static void FailSendMessageBatchInCallCombiner(void* calld_arg,
150 grpc_error_handle error);
151
152 static void SendMessageOnComplete(void* calld_arg, grpc_error_handle error);
153
154 grpc_core::CallCombiner* call_combiner_;
155 grpc_message_compression_algorithm message_compression_algorithm_ =
156 GRPC_MESSAGE_COMPRESS_NONE;
157 grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
158 grpc_transport_stream_op_batch* send_message_batch_ = nullptr;
159 bool seen_initial_metadata_ = false;
160 /* Set to true, if the fields below are initialized. */
161 bool state_initialized_ = false;
162 grpc_closure start_send_message_batch_in_call_combiner_;
163 /* The fields below are only initialized when we compress the payload.
164 * Keep them at the bottom of the struct, so they don't pollute the
165 * cache-lines. */
166 grpc_linked_mdelem message_compression_algorithm_storage_;
167 grpc_linked_mdelem stream_compression_algorithm_storage_;
168 grpc_linked_mdelem accept_encoding_storage_;
169 grpc_linked_mdelem accept_stream_encoding_storage_;
170 grpc_slice_buffer slices_; /**< Buffers up input slices to be compressed */
171 // Allocate space for the replacement stream
172 std::aligned_storage<sizeof(grpc_core::SliceBufferByteStream),
173 alignof(grpc_core::SliceBufferByteStream)>::type
174 replacement_stream_;
175 grpc_closure* original_send_message_on_complete_ = nullptr;
176 grpc_closure send_message_on_complete_;
177 grpc_closure on_send_message_next_done_;
178 };
179
180 // Returns true if we should skip message compression for the current message.
SkipMessageCompression()181 bool CallData::SkipMessageCompression() {
182 // If the flags of this message indicate that it shouldn't be compressed, we
183 // skip message compression.
184 uint32_t flags =
185 send_message_batch_->payload->send_message.send_message->flags();
186 if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
187 return true;
188 }
189 // If this call doesn't have any message compression algorithm set, skip
190 // message compression.
191 return message_compression_algorithm_ == GRPC_MESSAGE_COMPRESS_NONE;
192 }
193
194 // Determines the compression algorithm from the initial metadata and the
195 // channel's default setting.
FindCompressionAlgorithm(grpc_metadata_batch * initial_metadata,ChannelData * channeld)196 grpc_compression_algorithm FindCompressionAlgorithm(
197 grpc_metadata_batch* initial_metadata, ChannelData* channeld) {
198 if (initial_metadata->legacy_index()->named.grpc_internal_encoding_request ==
199 nullptr) {
200 return channeld->default_compression_algorithm();
201 }
202 grpc_compression_algorithm compression_algorithm;
203 // Parse the compression algorithm from the initial metadata.
204 grpc_mdelem md = initial_metadata->legacy_index()
205 ->named.grpc_internal_encoding_request->md;
206 GPR_ASSERT(grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
207 &compression_algorithm));
208 // Remove this metadata since it's an internal one (i.e., it won't be
209 // transmitted out).
210 initial_metadata->Remove(GRPC_BATCH_GRPC_INTERNAL_ENCODING_REQUEST);
211 // Check if that algorithm is enabled. Note that GRPC_COMPRESS_NONE is always
212 // enabled.
213 // TODO(juanlishen): Maybe use channel default or abort() if the algorithm
214 // from the initial metadata is disabled.
215 if (GPR_LIKELY(
216 grpc_core::GetBit(channeld->enabled_compression_algorithms_bitset(),
217 compression_algorithm))) {
218 return compression_algorithm;
219 }
220 const char* algorithm_name;
221 GPR_ASSERT(
222 grpc_compression_algorithm_name(compression_algorithm, &algorithm_name));
223 gpr_log(GPR_ERROR,
224 "Invalid compression algorithm from initial metadata: '%s' "
225 "(previously disabled). "
226 "Will not compress.",
227 algorithm_name);
228 return GRPC_COMPRESS_NONE;
229 }
230
InitializeState(grpc_call_element * elem)231 void CallData::InitializeState(grpc_call_element* elem) {
232 GPR_DEBUG_ASSERT(!state_initialized_);
233 state_initialized_ = true;
234 grpc_slice_buffer_init(&slices_);
235 GRPC_CLOSURE_INIT(&send_message_on_complete_, SendMessageOnComplete, this,
236 grpc_schedule_on_exec_ctx);
237 GRPC_CLOSURE_INIT(&on_send_message_next_done_, OnSendMessageNextDone, elem,
238 grpc_schedule_on_exec_ctx);
239 }
240
ProcessSendInitialMetadata(grpc_call_element * elem,grpc_metadata_batch * initial_metadata)241 grpc_error_handle CallData::ProcessSendInitialMetadata(
242 grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
243 ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
244 // Find the compression algorithm.
245 grpc_compression_algorithm compression_algorithm =
246 FindCompressionAlgorithm(initial_metadata, channeld);
247 // Note that at most one of the following algorithms can be set.
248 message_compression_algorithm_ =
249 grpc_compression_algorithm_to_message_compression_algorithm(
250 compression_algorithm);
251 grpc_stream_compression_algorithm stream_compression_algorithm =
252 grpc_compression_algorithm_to_stream_compression_algorithm(
253 compression_algorithm);
254 // Hint compression algorithm.
255 grpc_error_handle error = GRPC_ERROR_NONE;
256 if (message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) {
257 InitializeState(elem);
258 error = grpc_metadata_batch_add_tail(
259 initial_metadata, &message_compression_algorithm_storage_,
260 grpc_message_compression_encoding_mdelem(
261 message_compression_algorithm_),
262 GRPC_BATCH_GRPC_ENCODING);
263 } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
264 InitializeState(elem);
265 error = grpc_metadata_batch_add_tail(
266 initial_metadata, &stream_compression_algorithm_storage_,
267 grpc_stream_compression_encoding_mdelem(stream_compression_algorithm),
268 GRPC_BATCH_CONTENT_ENCODING);
269 }
270 if (error != GRPC_ERROR_NONE) return error;
271 // Convey supported compression algorithms.
272 error = grpc_metadata_batch_add_tail(
273 initial_metadata, &accept_encoding_storage_,
274 GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
275 channeld->enabled_message_compression_algorithms_bitset()),
276 GRPC_BATCH_GRPC_ACCEPT_ENCODING);
277 if (error != GRPC_ERROR_NONE) return error;
278 // Do not overwrite accept-encoding header if it already presents (e.g., added
279 // by some proxy).
280 if (!initial_metadata->legacy_index()->named.accept_encoding) {
281 error = grpc_metadata_batch_add_tail(
282 initial_metadata, &accept_stream_encoding_storage_,
283 GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS(
284 channeld->enabled_stream_compression_algorithms_bitset()),
285 GRPC_BATCH_ACCEPT_ENCODING);
286 }
287 return error;
288 }
289
SendMessageOnComplete(void * calld_arg,grpc_error_handle error)290 void CallData::SendMessageOnComplete(void* calld_arg, grpc_error_handle error) {
291 CallData* calld = static_cast<CallData*>(calld_arg);
292 grpc_slice_buffer_reset_and_unref_internal(&calld->slices_);
293 grpc_core::Closure::Run(DEBUG_LOCATION,
294 calld->original_send_message_on_complete_,
295 GRPC_ERROR_REF(error));
296 }
297
SendMessageBatchContinue(grpc_call_element * elem)298 void CallData::SendMessageBatchContinue(grpc_call_element* elem) {
299 // Note: The call to grpc_call_next_op() results in yielding the
300 // call combiner, so we need to clear send_message_batch_ before we do that.
301 grpc_transport_stream_op_batch* send_message_batch = send_message_batch_;
302 send_message_batch_ = nullptr;
303 grpc_call_next_op(elem, send_message_batch);
304 }
305
FinishSendMessage(grpc_call_element * elem)306 void CallData::FinishSendMessage(grpc_call_element* elem) {
307 GPR_DEBUG_ASSERT(message_compression_algorithm_ !=
308 GRPC_MESSAGE_COMPRESS_NONE);
309 // Compress the data if appropriate.
310 grpc_slice_buffer tmp;
311 grpc_slice_buffer_init(&tmp);
312 uint32_t send_flags =
313 send_message_batch_->payload->send_message.send_message->flags();
314 bool did_compress =
315 grpc_msg_compress(message_compression_algorithm_, &slices_, &tmp);
316 if (did_compress) {
317 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
318 const char* algo_name;
319 const size_t before_size = slices_.length;
320 const size_t after_size = tmp.length;
321 const float savings_ratio = 1.0f - static_cast<float>(after_size) /
322 static_cast<float>(before_size);
323 GPR_ASSERT(grpc_message_compression_algorithm_name(
324 message_compression_algorithm_, &algo_name));
325 gpr_log(GPR_INFO,
326 "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
327 " bytes (%.2f%% savings)",
328 algo_name, before_size, after_size, 100 * savings_ratio);
329 }
330 grpc_slice_buffer_swap(&slices_, &tmp);
331 send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
332 } else {
333 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
334 const char* algo_name;
335 GPR_ASSERT(grpc_message_compression_algorithm_name(
336 message_compression_algorithm_, &algo_name));
337 gpr_log(GPR_INFO,
338 "Algorithm '%s' enabled but decided not to compress. Input size: "
339 "%" PRIuPTR,
340 algo_name, slices_.length);
341 }
342 }
343 grpc_slice_buffer_destroy_internal(&tmp);
344 // Swap out the original byte stream with our new one and send the
345 // batch down.
346 new (&replacement_stream_)
347 grpc_core::SliceBufferByteStream(&slices_, send_flags);
348 send_message_batch_->payload->send_message.send_message.reset(
349 reinterpret_cast<grpc_core::SliceBufferByteStream*>(
350 &replacement_stream_));
351 original_send_message_on_complete_ = send_message_batch_->on_complete;
352 send_message_batch_->on_complete = &send_message_on_complete_;
353 SendMessageBatchContinue(elem);
354 }
355
FailSendMessageBatchInCallCombiner(void * calld_arg,grpc_error_handle error)356 void CallData::FailSendMessageBatchInCallCombiner(void* calld_arg,
357 grpc_error_handle error) {
358 CallData* calld = static_cast<CallData*>(calld_arg);
359 if (calld->send_message_batch_ != nullptr) {
360 grpc_transport_stream_op_batch_finish_with_failure(
361 calld->send_message_batch_, GRPC_ERROR_REF(error),
362 calld->call_combiner_);
363 calld->send_message_batch_ = nullptr;
364 }
365 }
366
367 // Pulls a slice from the send_message byte stream and adds it to slices_.
PullSliceFromSendMessage()368 grpc_error_handle CallData::PullSliceFromSendMessage() {
369 grpc_slice incoming_slice;
370 grpc_error_handle error =
371 send_message_batch_->payload->send_message.send_message->Pull(
372 &incoming_slice);
373 if (error == GRPC_ERROR_NONE) {
374 grpc_slice_buffer_add(&slices_, incoming_slice);
375 }
376 return error;
377 }
378
379 // Reads as many slices as possible from the send_message byte stream.
380 // If all data has been read, invokes FinishSendMessage(). Otherwise,
381 // an async call to ByteStream::Next() has been started, which will
382 // eventually result in calling OnSendMessageNextDone().
ContinueReadingSendMessage(grpc_call_element * elem)383 void CallData::ContinueReadingSendMessage(grpc_call_element* elem) {
384 if (slices_.length ==
385 send_message_batch_->payload->send_message.send_message->length()) {
386 FinishSendMessage(elem);
387 return;
388 }
389 while (send_message_batch_->payload->send_message.send_message->Next(
390 ~static_cast<size_t>(0), &on_send_message_next_done_)) {
391 grpc_error_handle error = PullSliceFromSendMessage();
392 if (error != GRPC_ERROR_NONE) {
393 // Closure callback; does not take ownership of error.
394 FailSendMessageBatchInCallCombiner(this, error);
395 GRPC_ERROR_UNREF(error);
396 return;
397 }
398 if (slices_.length ==
399 send_message_batch_->payload->send_message.send_message->length()) {
400 FinishSendMessage(elem);
401 break;
402 }
403 }
404 }
405
406 // Async callback for ByteStream::Next().
OnSendMessageNextDone(void * elem_arg,grpc_error_handle error)407 void CallData::OnSendMessageNextDone(void* elem_arg, grpc_error_handle error) {
408 grpc_call_element* elem = static_cast<grpc_call_element*>(elem_arg);
409 CallData* calld = static_cast<CallData*>(elem->call_data);
410 if (error != GRPC_ERROR_NONE) {
411 // Closure callback; does not take ownership of error.
412 FailSendMessageBatchInCallCombiner(calld, error);
413 return;
414 }
415 error = calld->PullSliceFromSendMessage();
416 if (error != GRPC_ERROR_NONE) {
417 // Closure callback; does not take ownership of error.
418 FailSendMessageBatchInCallCombiner(calld, error);
419 GRPC_ERROR_UNREF(error);
420 return;
421 }
422 if (calld->slices_.length == calld->send_message_batch_->payload->send_message
423 .send_message->length()) {
424 calld->FinishSendMessage(elem);
425 } else {
426 calld->ContinueReadingSendMessage(elem);
427 }
428 }
429
StartSendMessageBatch(void * elem_arg,grpc_error_handle)430 void CallData::StartSendMessageBatch(void* elem_arg,
431 grpc_error_handle /*unused*/) {
432 grpc_call_element* elem = static_cast<grpc_call_element*>(elem_arg);
433 CallData* calld = static_cast<CallData*>(elem->call_data);
434 if (calld->SkipMessageCompression()) {
435 calld->SendMessageBatchContinue(elem);
436 } else {
437 calld->ContinueReadingSendMessage(elem);
438 }
439 }
440
CompressStartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)441 void CallData::CompressStartTransportStreamOpBatch(
442 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
443 GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0);
444 // Handle cancel_stream.
445 if (batch->cancel_stream) {
446 GRPC_ERROR_UNREF(cancel_error_);
447 cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
448 if (send_message_batch_ != nullptr) {
449 if (!seen_initial_metadata_) {
450 GRPC_CALL_COMBINER_START(
451 call_combiner_,
452 GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, this,
453 grpc_schedule_on_exec_ctx),
454 GRPC_ERROR_REF(cancel_error_), "failing send_message op");
455 } else {
456 send_message_batch_->payload->send_message.send_message->Shutdown(
457 GRPC_ERROR_REF(cancel_error_));
458 }
459 }
460 } else if (cancel_error_ != GRPC_ERROR_NONE) {
461 grpc_transport_stream_op_batch_finish_with_failure(
462 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
463 return;
464 }
465 // Handle send_initial_metadata.
466 if (batch->send_initial_metadata) {
467 GPR_ASSERT(!seen_initial_metadata_);
468 grpc_error_handle error = ProcessSendInitialMetadata(
469 elem, batch->payload->send_initial_metadata.send_initial_metadata);
470 if (error != GRPC_ERROR_NONE) {
471 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
472 call_combiner_);
473 return;
474 }
475 seen_initial_metadata_ = true;
476 // If we had previously received a batch containing a send_message op,
477 // handle it now. Note that we need to re-enter the call combiner
478 // for this, since we can't send two batches down while holding the
479 // call combiner, since the connected_channel filter (at the bottom of
480 // the call stack) will release the call combiner for each batch it sees.
481 if (send_message_batch_ != nullptr) {
482 GRPC_CALL_COMBINER_START(
483 call_combiner_, &start_send_message_batch_in_call_combiner_,
484 GRPC_ERROR_NONE, "starting send_message after send_initial_metadata");
485 }
486 }
487 // Handle send_message.
488 if (batch->send_message) {
489 GPR_ASSERT(send_message_batch_ == nullptr);
490 send_message_batch_ = batch;
491 // If we have not yet seen send_initial_metadata, then we have to
492 // wait. We save the batch and then drop the call combiner, which we'll
493 // have to pick up again later when we get send_initial_metadata.
494 if (!seen_initial_metadata_) {
495 GRPC_CALL_COMBINER_STOP(
496 call_combiner_, "send_message batch pending send_initial_metadata");
497 return;
498 }
499 StartSendMessageBatch(elem, GRPC_ERROR_NONE);
500 } else {
501 // Pass control down the stack.
502 grpc_call_next_op(elem, batch);
503 }
504 }
505
CompressStartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)506 void CompressStartTransportStreamOpBatch(
507 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
508 CallData* calld = static_cast<CallData*>(elem->call_data);
509 calld->CompressStartTransportStreamOpBatch(elem, batch);
510 }
511
512 /* Constructor for call_data */
CompressInitCallElem(grpc_call_element * elem,const grpc_call_element_args * args)513 grpc_error_handle CompressInitCallElem(grpc_call_element* elem,
514 const grpc_call_element_args* args) {
515 new (elem->call_data) CallData(elem, *args);
516 return GRPC_ERROR_NONE;
517 }
518
519 /* Destructor for call_data */
CompressDestroyCallElem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)520 void CompressDestroyCallElem(grpc_call_element* elem,
521 const grpc_call_final_info* /*final_info*/,
522 grpc_closure* /*ignored*/) {
523 CallData* calld = static_cast<CallData*>(elem->call_data);
524 calld->~CallData();
525 }
526
527 /* Constructor for ChannelData */
CompressInitChannelElem(grpc_channel_element * elem,grpc_channel_element_args * args)528 grpc_error_handle CompressInitChannelElem(grpc_channel_element* elem,
529 grpc_channel_element_args* args) {
530 new (elem->channel_data) ChannelData(args);
531 return GRPC_ERROR_NONE;
532 }
533
534 /* Destructor for channel data */
CompressDestroyChannelElem(grpc_channel_element * elem)535 void CompressDestroyChannelElem(grpc_channel_element* elem) {
536 ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
537 channeld->~ChannelData();
538 }
539
540 } // namespace
541
542 const grpc_channel_filter grpc_message_compress_filter = {
543 CompressStartTransportStreamOpBatch,
544 grpc_channel_next_op,
545 sizeof(CallData),
546 CompressInitCallElem,
547 grpc_call_stack_ignore_set_pollset_or_pollset_set,
548 CompressDestroyCallElem,
549 sizeof(ChannelData),
550 CompressInitChannelElem,
551 CompressDestroyChannelElem,
552 grpc_channel_next_get_info,
553 "message_compress"};
554