/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.protocol;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server-side encoder that turns outgoing {@link Message}s into their wire representation.
 * The class keeps no per-instance state, which is why a single {@link #INSTANCE} is marked
 * {@link ChannelHandler.Sharable} and can be shared across threads.
 */
@ChannelHandler.Sharable
public final class MessageEncoder extends MessageToMessageEncoder<Message> {

  private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);

  /** Number of bytes taken by the frame-length prefix (written with {@code writeLong}). */
  private static final int FRAME_LENGTH_SIZE = 8;

  public static final MessageEncoder INSTANCE = new MessageEncoder();

  private MessageEncoder() {}

  /**
   * Encodes a Message by delegating to its own {@code encode()} method.
   *
   * <p>A header ByteBuf is always produced; it holds the total frame length, the message type
   * and the encoded message. For a message without a body, that header alone is appended to
   * {@code out}. For a message with a body (e.g. a ChunkFetchSuccess), the header and the
   * Netty form of the body are wrapped in a {@link MessageWithHeader} so that the payload can
   * be transferred zero-copy.
   *
   * <p>If preparing the body fails, the body is released. A response message is then re-encoded
   * as its failure response carrying the error text; for any other message the exception is
   * rethrown.
   *
   * @param ctx channel context, used for buffer allocation and for the remote address in logs
   * @param in the message to encode
   * @param out list receiving the encoded output object
   * @throws Exception if body preparation fails for a message that is not a response
   */
  @Override
  public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
    Object nettyBody = null;
    long payloadSize = 0;
    boolean payloadInFrame = false;

    // Pull the body out of the message (when present) so the payload can go zero-copy.
    if (in.body() != null) {
      try {
        payloadSize = in.body().size();
        nettyBody = in.body().convertToNetty();
        payloadInFrame = in.isBodyInFrame();
      } catch (Exception e) {
        in.body().release();
        if (!(in instanceof AbstractResponseMessage)) {
          throw e;
        }
        // A response can be replaced by a failure response describing what went wrong.
        String reason = e.getMessage();
        if (reason == null) {
          reason = "null";
        }
        String logLine = String.format("Error processing %s for client %s",
          in, ctx.channel().remoteAddress());
        logger.error(logLine, e);
        encode(ctx, ((AbstractResponseMessage) in).createFailureResponse(reason), out);
        return;
      }
    }

    Message.Type type = in.type();
    // Every frame carries the frame length, the message type and the message itself. Whether
    // the frame length also counts the body bytes depends on the message being sent.
    int headerLength = FRAME_LENGTH_SIZE + type.encodedLength() + in.encodedLength();
    long frameLength = headerLength;
    if (payloadInFrame) {
      frameLength += payloadSize;
    }

    ByteBuf headerBuf = ctx.alloc().heapBuffer(headerLength);
    headerBuf.writeLong(frameLength);
    type.encode(headerBuf);
    in.encode(headerBuf);
    assert headerBuf.writableBytes() == 0;

    if (nettyBody == null) {
      out.add(headerBuf);
      return;
    }

    // Ownership of the reference on in.body() moves to MessageWithHeader, which frees it
    // when MessageWithHeader.deallocate() is called.
    out.add(new MessageWithHeader(in.body(), headerBuf, nettyBody, payloadSize));
  }

}