1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.network.protocol; 19 20 import com.google.common.base.Objects; 21 import io.netty.buffer.ByteBuf; 22 23 import org.apache.spark.network.buffer.ManagedBuffer; 24 import org.apache.spark.network.buffer.NettyManagedBuffer; 25 26 /** Response to {@link RpcRequest} for a successful RPC. */ 27 public final class RpcResponse extends AbstractResponseMessage { 28 public final long requestId; 29 RpcResponse(long requestId, ManagedBuffer message)30 public RpcResponse(long requestId, ManagedBuffer message) { 31 super(message, true); 32 this.requestId = requestId; 33 } 34 35 @Override type()36 public Type type() { return Type.RpcResponse; } 37 38 @Override encodedLength()39 public int encodedLength() { 40 // The integer (a.k.a. the body size) is not really used, since that information is already 41 // encoded in the frame length. But this maintains backwards compatibility with versions of 42 // RpcRequest that use Encoders.ByteArrays. 43 return 8 + 4; 44 } 45 46 @Override encode(ByteBuf buf)47 public void encode(ByteBuf buf) { 48 buf.writeLong(requestId); 49 // See comment in encodedLength(). 50 buf.writeInt((int) body().size()); 51 } 52 53 @Override createFailureResponse(String error)54 public ResponseMessage createFailureResponse(String error) { 55 return new RpcFailure(requestId, error); 56 } 57 decode(ByteBuf buf)58 public static RpcResponse decode(ByteBuf buf) { 59 long requestId = buf.readLong(); 60 // See comment in encodedLength(). 61 buf.readInt(); 62 return new RpcResponse(requestId, new NettyManagedBuffer(buf.retain())); 63 } 64 65 @Override hashCode()66 public int hashCode() { 67 return Objects.hashCode(requestId, body()); 68 } 69 70 @Override equals(Object other)71 public boolean equals(Object other) { 72 if (other instanceof RpcResponse) { 73 RpcResponse o = (RpcResponse) other; 74 return requestId == o.requestId && super.equals(o); 75 } 76 return false; 77 } 78 79 @Override toString()80 public String toString() { 81 return Objects.toStringHelper(this) 82 .add("requestId", requestId) 83 .add("body", body()) 84 .toString(); 85 } 86 } 87