1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.namenode; 19 20 import java.io.IOException; 21 import java.util.Arrays; 22 import java.util.Collection; 23 import java.util.Comparator; 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 27 import org.apache.hadoop.io.IOUtils; 28 29 import com.google.common.base.Preconditions; 30 import com.google.common.primitives.Longs; 31 32 /** 33 * A merged input stream that handles failover between different edit logs. 34 * 35 * We will currently try each edit log stream exactly once. In other words, we 36 * don't handle the "ping pong" scenario where different edit logs contain a 37 * different subset of the available edits. 38 */ 39 class RedundantEditLogInputStream extends EditLogInputStream { 40 public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName()); 41 private int curIdx; 42 private long prevTxId; 43 private final EditLogInputStream[] streams; 44 45 /** 46 * States that the RedundantEditLogInputStream can be in. 47 * 48 * <pre> 49 * start (if no streams) 50 * | 51 * V 52 * PrematureEOFException +----------------+ 53 * +-------------->| EOF |<--------------+ 54 * | +----------------+ | 55 * | | 56 * | start (if there are streams) | 57 * | | | 58 * | V | EOF 59 * | resync +----------------+ skipUntil +---------+ 60 * | +---------->| SKIP_UNTIL |----------->| OK | 61 * | | +----------------+ +---------+ 62 * | | | IOE ^ fail over to | IOE 63 * | | V | next stream | 64 * +----------------------+ +----------------+ | 65 * | STREAM_FAILED_RESYNC | | STREAM_FAILED |<----------+ 66 * +----------------------+ +----------------+ 67 * ^ Recovery mode | 68 * +--------------------+ 69 * </pre> 70 */ 71 static private enum State { 72 /** We need to skip until prevTxId + 1 */ 73 SKIP_UNTIL, 74 /** We're ready to read opcodes out of the current stream */ 75 OK, 76 /** The current stream has failed. */ 77 STREAM_FAILED, 78 /** The current stream has failed, and resync() was called. */ 79 STREAM_FAILED_RESYNC, 80 /** There are no more opcodes to read from this 81 * RedundantEditLogInputStream */ 82 EOF; 83 } 84 85 private State state; 86 private IOException prevException; 87 RedundantEditLogInputStream(Collection<EditLogInputStream> streams, long startTxId)88 RedundantEditLogInputStream(Collection<EditLogInputStream> streams, 89 long startTxId) { 90 this.curIdx = 0; 91 this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ? 92 HdfsConstants.INVALID_TXID : (startTxId - 1); 93 this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL; 94 this.prevException = null; 95 // EditLogInputStreams in a RedundantEditLogInputStream must be finalized, 96 // and can't be pre-transactional. 97 EditLogInputStream first = null; 98 for (EditLogInputStream s : streams) { 99 Preconditions.checkArgument(s.getFirstTxId() != 100 HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s); 101 Preconditions.checkArgument(s.getLastTxId() != 102 HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s); 103 if (first == null) { 104 first = s; 105 } else { 106 Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(), 107 "All streams in the RedundantEditLogInputStream must have the same " + 108 "start transaction ID! " + first + " had start txId " + 109 first.getFirstTxId() + ", but " + s + " had start txId " + 110 s.getFirstTxId()); 111 } 112 } 113 114 this.streams = streams.toArray(new EditLogInputStream[0]); 115 116 // We sort the streams here so that the streams that end later come first. 117 Arrays.sort(this.streams, new Comparator<EditLogInputStream>() { 118 @Override 119 public int compare(EditLogInputStream a, EditLogInputStream b) { 120 return Longs.compare(b.getLastTxId(), a.getLastTxId()); 121 } 122 }); 123 } 124 125 @Override getCurrentStreamName()126 public String getCurrentStreamName() { 127 return streams[curIdx].getCurrentStreamName(); 128 } 129 130 @Override getName()131 public String getName() { 132 StringBuilder bld = new StringBuilder(); 133 String prefix = ""; 134 for (EditLogInputStream elis : streams) { 135 bld.append(prefix); 136 bld.append(elis.getName()); 137 prefix = ", "; 138 } 139 return bld.toString(); 140 } 141 142 @Override getFirstTxId()143 public long getFirstTxId() { 144 return streams[curIdx].getFirstTxId(); 145 } 146 147 @Override getLastTxId()148 public long getLastTxId() { 149 return streams[curIdx].getLastTxId(); 150 } 151 152 @Override close()153 public void close() throws IOException { 154 IOUtils.cleanup(LOG, streams); 155 } 156 157 @Override nextValidOp()158 protected FSEditLogOp nextValidOp() { 159 try { 160 if (state == State.STREAM_FAILED) { 161 state = State.STREAM_FAILED_RESYNC; 162 } 163 return nextOp(); 164 } catch (IOException e) { 165 return null; 166 } 167 } 168 169 @Override nextOp()170 protected FSEditLogOp nextOp() throws IOException { 171 while (true) { 172 switch (state) { 173 case SKIP_UNTIL: 174 try { 175 if (prevTxId != HdfsConstants.INVALID_TXID) { 176 LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() + 177 "' to transaction ID " + (prevTxId + 1)); 178 streams[curIdx].skipUntil(prevTxId + 1); 179 } 180 } catch (IOException e) { 181 prevException = e; 182 state = State.STREAM_FAILED; 183 } 184 state = State.OK; 185 break; 186 case OK: 187 try { 188 FSEditLogOp op = streams[curIdx].readOp(); 189 if (op == null) { 190 state = State.EOF; 191 if (streams[curIdx].getLastTxId() == prevTxId) { 192 return null; 193 } else { 194 throw new PrematureEOFException("got premature end-of-file " + 195 "at txid " + prevTxId + "; expected file to go up to " + 196 streams[curIdx].getLastTxId()); 197 } 198 } 199 prevTxId = op.getTransactionId(); 200 return op; 201 } catch (IOException e) { 202 prevException = e; 203 state = State.STREAM_FAILED; 204 } 205 break; 206 case STREAM_FAILED: 207 if (curIdx + 1 == streams.length) { 208 throw prevException; 209 } 210 long oldLast = streams[curIdx].getLastTxId(); 211 long newLast = streams[curIdx + 1].getLastTxId(); 212 if (newLast < oldLast) { 213 throw new IOException("We encountered an error reading " + 214 streams[curIdx].getName() + ". During automatic edit log " + 215 "failover, we noticed that all of the remaining edit log " + 216 "streams are shorter than the current one! The best " + 217 "remaining edit log ends at transaction " + 218 newLast + ", but we thought we could read up to transaction " + 219 oldLast + ". If you continue, metadata will be lost forever!"); 220 } 221 LOG.error("Got error reading edit log input stream " + 222 streams[curIdx].getName() + "; failing over to edit log " + 223 streams[curIdx + 1].getName(), prevException); 224 curIdx++; 225 state = State.SKIP_UNTIL; 226 break; 227 case STREAM_FAILED_RESYNC: 228 if (curIdx + 1 == streams.length) { 229 if (prevException instanceof PrematureEOFException) { 230 // bypass early EOF check 231 state = State.EOF; 232 } else { 233 streams[curIdx].resync(); 234 state = State.SKIP_UNTIL; 235 } 236 } else { 237 LOG.error("failing over to edit log " + 238 streams[curIdx + 1].getName()); 239 curIdx++; 240 state = State.SKIP_UNTIL; 241 } 242 break; 243 case EOF: 244 return null; 245 } 246 } 247 } 248 249 @Override getVersion(boolean verifyVersion)250 public int getVersion(boolean verifyVersion) throws IOException { 251 return streams[curIdx].getVersion(verifyVersion); 252 } 253 254 @Override getPosition()255 public long getPosition() { 256 return streams[curIdx].getPosition(); 257 } 258 259 @Override length()260 public long length() throws IOException { 261 return streams[curIdx].length(); 262 } 263 264 @Override isInProgress()265 public boolean isInProgress() { 266 return streams[curIdx].isInProgress(); 267 } 268 269 static private final class PrematureEOFException extends IOException { 270 private static final long serialVersionUID = 1L; PrematureEOFException(String msg)271 PrematureEOFException(String msg) { 272 super(msg); 273 } 274 } 275 276 @Override setMaxOpSize(int maxOpSize)277 public void setMaxOpSize(int maxOpSize) { 278 for (EditLogInputStream elis : streams) { 279 elis.setMaxOpSize(maxOpSize); 280 } 281 } 282 283 @Override isLocalLog()284 public boolean isLocalLog() { 285 return streams[curIdx].isLocalLog(); 286 } 287 } 288