1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.network.protocol; 19 20 import com.google.common.base.Objects; 21 import io.netty.buffer.ByteBuf; 22 23 import org.apache.spark.network.buffer.ManagedBuffer; 24 import org.apache.spark.network.buffer.NettyManagedBuffer; 25 26 /** 27 * Response to {@link ChunkFetchRequest} when a chunk exists and has been successfully fetched. 28 * 29 * Note that the server-side encoding of this messages does NOT include the buffer itself, as this 30 * may be written by Netty in a more efficient manner (i.e., zero-copy write). 31 * Similarly, the client-side decoding will reuse the Netty ByteBuf as the buffer. 32 */ 33 public final class ChunkFetchSuccess extends AbstractResponseMessage { 34 public final StreamChunkId streamChunkId; 35 ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer)36 public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer) { 37 super(buffer, true); 38 this.streamChunkId = streamChunkId; 39 } 40 41 @Override type()42 public Type type() { return Type.ChunkFetchSuccess; } 43 44 @Override encodedLength()45 public int encodedLength() { 46 return streamChunkId.encodedLength(); 47 } 48 49 /** Encoding does NOT include 'buffer' itself. See {@link MessageEncoder}. */ 50 @Override encode(ByteBuf buf)51 public void encode(ByteBuf buf) { 52 streamChunkId.encode(buf); 53 } 54 55 @Override createFailureResponse(String error)56 public ResponseMessage createFailureResponse(String error) { 57 return new ChunkFetchFailure(streamChunkId, error); 58 } 59 60 /** Decoding uses the given ByteBuf as our data, and will retain() it. */ decode(ByteBuf buf)61 public static ChunkFetchSuccess decode(ByteBuf buf) { 62 StreamChunkId streamChunkId = StreamChunkId.decode(buf); 63 buf.retain(); 64 NettyManagedBuffer managedBuf = new NettyManagedBuffer(buf.duplicate()); 65 return new ChunkFetchSuccess(streamChunkId, managedBuf); 66 } 67 68 @Override hashCode()69 public int hashCode() { 70 return Objects.hashCode(streamChunkId, body()); 71 } 72 73 @Override equals(Object other)74 public boolean equals(Object other) { 75 if (other instanceof ChunkFetchSuccess) { 76 ChunkFetchSuccess o = (ChunkFetchSuccess) other; 77 return streamChunkId.equals(o.streamChunkId) && super.equals(o); 78 } 79 return false; 80 } 81 82 @Override toString()83 public String toString() { 84 return Objects.toStringHelper(this) 85 .add("streamChunkId", streamChunkId) 86 .add("buffer", body()) 87 .toString(); 88 } 89 } 90