1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.network.protocol; 19 20 import com.google.common.base.Objects; 21 import io.netty.buffer.ByteBuf; 22 23 import org.apache.spark.network.buffer.ManagedBuffer; 24 import org.apache.spark.network.buffer.NettyManagedBuffer; 25 26 /** 27 * A generic RPC which is handled by a remote {@link org.apache.spark.network.server.RpcHandler}. 28 * This will correspond to a single 29 * {@link org.apache.spark.network.protocol.ResponseMessage} (either success or failure). 30 */ 31 public final class RpcRequest extends AbstractMessage implements RequestMessage { 32 /** Used to link an RPC request with its response. */ 33 public final long requestId; 34 RpcRequest(long requestId, ManagedBuffer message)35 public RpcRequest(long requestId, ManagedBuffer message) { 36 super(message, true); 37 this.requestId = requestId; 38 } 39 40 @Override type()41 public Type type() { return Type.RpcRequest; } 42 43 @Override encodedLength()44 public int encodedLength() { 45 // The integer (a.k.a. the body size) is not really used, since that information is already 46 // encoded in the frame length. But this maintains backwards compatibility with versions of 47 // RpcRequest that use Encoders.ByteArrays. 48 return 8 + 4; 49 } 50 51 @Override encode(ByteBuf buf)52 public void encode(ByteBuf buf) { 53 buf.writeLong(requestId); 54 // See comment in encodedLength(). 55 buf.writeInt((int) body().size()); 56 } 57 decode(ByteBuf buf)58 public static RpcRequest decode(ByteBuf buf) { 59 long requestId = buf.readLong(); 60 // See comment in encodedLength(). 61 buf.readInt(); 62 return new RpcRequest(requestId, new NettyManagedBuffer(buf.retain())); 63 } 64 65 @Override hashCode()66 public int hashCode() { 67 return Objects.hashCode(requestId, body()); 68 } 69 70 @Override equals(Object other)71 public boolean equals(Object other) { 72 if (other instanceof RpcRequest) { 73 RpcRequest o = (RpcRequest) other; 74 return requestId == o.requestId && super.equals(o); 75 } 76 return false; 77 } 78 79 @Override toString()80 public String toString() { 81 return Objects.toStringHelper(this) 82 .add("requestId", requestId) 83 .add("body", body()) 84 .toString(); 85 } 86 } 87