1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.spark.network.protocol;
19 
20 import java.util.List;
21 
22 import io.netty.buffer.ByteBuf;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.MessageToMessageEncoder;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 
29 /**
30  * Encoder used by the server side to encode server-to-client responses.
31  * This encoder is stateless so it is safe to be shared by multiple threads.
32  */
33 @ChannelHandler.Sharable
34 public final class MessageEncoder extends MessageToMessageEncoder<Message> {
35 
36   private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);
37 
38   public static final MessageEncoder INSTANCE = new MessageEncoder();
39 
MessageEncoder()40   private MessageEncoder() {}
41 
42   /***
43    * Encodes a Message by invoking its encode() method. For non-data messages, we will add one
44    * ByteBuf to 'out' containing the total frame length, the message type, and the message itself.
45    * In the case of a ChunkFetchSuccess, we will also add the ManagedBuffer corresponding to the
46    * data to 'out', in order to enable zero-copy transfer.
47    */
48   @Override
encode(ChannelHandlerContext ctx, Message in, List<Object> out)49   public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
50     Object body = null;
51     long bodyLength = 0;
52     boolean isBodyInFrame = false;
53 
54     // If the message has a body, take it out to enable zero-copy transfer for the payload.
55     if (in.body() != null) {
56       try {
57         bodyLength = in.body().size();
58         body = in.body().convertToNetty();
59         isBodyInFrame = in.isBodyInFrame();
60       } catch (Exception e) {
61         in.body().release();
62         if (in instanceof AbstractResponseMessage) {
63           AbstractResponseMessage resp = (AbstractResponseMessage) in;
64           // Re-encode this message as a failure response.
65           String error = e.getMessage() != null ? e.getMessage() : "null";
66           logger.error(String.format("Error processing %s for client %s",
67             in, ctx.channel().remoteAddress()), e);
68           encode(ctx, resp.createFailureResponse(error), out);
69         } else {
70           throw e;
71         }
72         return;
73       }
74     }
75 
76     Message.Type msgType = in.type();
77     // All messages have the frame length, message type, and message itself. The frame length
78     // may optionally include the length of the body data, depending on what message is being
79     // sent.
80     int headerLength = 8 + msgType.encodedLength() + in.encodedLength();
81     long frameLength = headerLength + (isBodyInFrame ? bodyLength : 0);
82     ByteBuf header = ctx.alloc().heapBuffer(headerLength);
83     header.writeLong(frameLength);
84     msgType.encode(header);
85     in.encode(header);
86     assert header.writableBytes() == 0;
87 
88     if (body != null) {
89       // We transfer ownership of the reference on in.body() to MessageWithHeader.
90       // This reference will be freed when MessageWithHeader.deallocate() is called.
91       out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
92     } else {
93       out.add(header);
94     }
95   }
96 
97 }
98