1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.spark.network.protocol;
19 
20 import com.google.common.base.Objects;
21 import io.netty.buffer.ByteBuf;
22 
23 import org.apache.spark.network.buffer.ManagedBuffer;
24 import org.apache.spark.network.buffer.NettyManagedBuffer;
25 
26 /**
27  * Response to {@link ChunkFetchRequest} when a chunk exists and has been successfully fetched.
28  *
29  * Note that the server-side encoding of this message does NOT include the buffer itself, as this
30  * may be written by Netty in a more efficient manner (i.e., zero-copy write).
31  * Similarly, the client-side decoding will reuse the Netty ByteBuf as the buffer.
32  */
33 public final class ChunkFetchSuccess extends AbstractResponseMessage {
34   public final StreamChunkId streamChunkId;
35 
ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer)36   public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer) {
37     super(buffer, true);
38     this.streamChunkId = streamChunkId;
39   }
40 
41   @Override
type()42   public Type type() { return Type.ChunkFetchSuccess; }
43 
44   @Override
encodedLength()45   public int encodedLength() {
46     return streamChunkId.encodedLength();
47   }
48 
49   /** Encoding does NOT include 'buffer' itself. See {@link MessageEncoder}. */
50   @Override
encode(ByteBuf buf)51   public void encode(ByteBuf buf) {
52     streamChunkId.encode(buf);
53   }
54 
55   @Override
createFailureResponse(String error)56   public ResponseMessage createFailureResponse(String error) {
57     return new ChunkFetchFailure(streamChunkId, error);
58   }
59 
60   /** Decoding uses the given ByteBuf as our data, and will retain() it. */
decode(ByteBuf buf)61   public static ChunkFetchSuccess decode(ByteBuf buf) {
62     StreamChunkId streamChunkId = StreamChunkId.decode(buf);
63     buf.retain();
64     NettyManagedBuffer managedBuf = new NettyManagedBuffer(buf.duplicate());
65     return new ChunkFetchSuccess(streamChunkId, managedBuf);
66   }
67 
68   @Override
hashCode()69   public int hashCode() {
70     return Objects.hashCode(streamChunkId, body());
71   }
72 
73   @Override
equals(Object other)74   public boolean equals(Object other) {
75     if (other instanceof ChunkFetchSuccess) {
76       ChunkFetchSuccess o = (ChunkFetchSuccess) other;
77       return streamChunkId.equals(o.streamChunkId) && super.equals(o);
78     }
79     return false;
80   }
81 
82   @Override
toString()83   public String toString() {
84     return Objects.toStringHelper(this)
85       .add("streamChunkId", streamChunkId)
86       .add("buffer", body())
87       .toString();
88   }
89 }
90