1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.spark.network.protocol;
19 
20 import com.google.common.base.Objects;
21 import io.netty.buffer.ByteBuf;
22 
23 import org.apache.spark.network.buffer.ManagedBuffer;
24 import org.apache.spark.network.buffer.NettyManagedBuffer;
25 
26 /** Response to {@link RpcRequest} for a successful RPC. */
27 public final class RpcResponse extends AbstractResponseMessage {
28   public final long requestId;
29 
RpcResponse(long requestId, ManagedBuffer message)30   public RpcResponse(long requestId, ManagedBuffer message) {
31     super(message, true);
32     this.requestId = requestId;
33   }
34 
35   @Override
type()36   public Type type() { return Type.RpcResponse; }
37 
38   @Override
encodedLength()39   public int encodedLength() {
40     // The integer (a.k.a. the body size) is not really used, since that information is already
41     // encoded in the frame length. But this maintains backwards compatibility with versions of
42     // RpcRequest that use Encoders.ByteArrays.
43     return 8 + 4;
44   }
45 
46   @Override
encode(ByteBuf buf)47   public void encode(ByteBuf buf) {
48     buf.writeLong(requestId);
49     // See comment in encodedLength().
50     buf.writeInt((int) body().size());
51   }
52 
53   @Override
createFailureResponse(String error)54   public ResponseMessage createFailureResponse(String error) {
55     return new RpcFailure(requestId, error);
56   }
57 
decode(ByteBuf buf)58   public static RpcResponse decode(ByteBuf buf) {
59     long requestId = buf.readLong();
60     // See comment in encodedLength().
61     buf.readInt();
62     return new RpcResponse(requestId, new NettyManagedBuffer(buf.retain()));
63   }
64 
65   @Override
hashCode()66   public int hashCode() {
67     return Objects.hashCode(requestId, body());
68   }
69 
70   @Override
equals(Object other)71   public boolean equals(Object other) {
72     if (other instanceof RpcResponse) {
73       RpcResponse o = (RpcResponse) other;
74       return requestId == o.requestId && super.equals(o);
75     }
76     return false;
77   }
78 
79   @Override
toString()80   public String toString() {
81     return Objects.toStringHelper(this)
82       .add("requestId", requestId)
83       .add("body", body())
84       .toString();
85   }
86 }
87