1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.spark.network.protocol;
19 
20 import com.google.common.base.Objects;
21 import io.netty.buffer.ByteBuf;
22 
23 import org.apache.spark.network.buffer.ManagedBuffer;
24 import org.apache.spark.network.buffer.NettyManagedBuffer;
25 
26 /**
27  * A generic RPC which is handled by a remote {@link org.apache.spark.network.server.RpcHandler}.
28  * This will correspond to a single
29  * {@link org.apache.spark.network.protocol.ResponseMessage} (either success or failure).
30  */
31 public final class RpcRequest extends AbstractMessage implements RequestMessage {
32   /** Used to link an RPC request with its response. */
33   public final long requestId;
34 
RpcRequest(long requestId, ManagedBuffer message)35   public RpcRequest(long requestId, ManagedBuffer message) {
36     super(message, true);
37     this.requestId = requestId;
38   }
39 
40   @Override
type()41   public Type type() { return Type.RpcRequest; }
42 
43   @Override
encodedLength()44   public int encodedLength() {
45     // The integer (a.k.a. the body size) is not really used, since that information is already
46     // encoded in the frame length. But this maintains backwards compatibility with versions of
47     // RpcRequest that use Encoders.ByteArrays.
48     return 8 + 4;
49   }
50 
51   @Override
encode(ByteBuf buf)52   public void encode(ByteBuf buf) {
53     buf.writeLong(requestId);
54     // See comment in encodedLength().
55     buf.writeInt((int) body().size());
56   }
57 
decode(ByteBuf buf)58   public static RpcRequest decode(ByteBuf buf) {
59     long requestId = buf.readLong();
60     // See comment in encodedLength().
61     buf.readInt();
62     return new RpcRequest(requestId, new NettyManagedBuffer(buf.retain()));
63   }
64 
65   @Override
hashCode()66   public int hashCode() {
67     return Objects.hashCode(requestId, body());
68   }
69 
70   @Override
equals(Object other)71   public boolean equals(Object other) {
72     if (other instanceof RpcRequest) {
73       RpcRequest o = (RpcRequest) other;
74       return requestId == o.requestId && super.equals(o);
75     }
76     return false;
77   }
78 
79   @Override
toString()80   public String toString() {
81     return Objects.toStringHelper(this)
82       .add("requestId", requestId)
83       .add("body", body())
84       .toString();
85   }
86 }
87