1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.namenode;
19 
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Comparator;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
27 import org.apache.hadoop.io.IOUtils;
28 
29 import com.google.common.base.Preconditions;
30 import com.google.common.primitives.Longs;
31 
32 /**
33  * A merged input stream that handles failover between different edit logs.
34  *
35  * We will currently try each edit log stream exactly once.  In other words, we
36  * don't handle the "ping pong" scenario where different edit logs contain a
37  * different subset of the available edits.
38  */
39 class RedundantEditLogInputStream extends EditLogInputStream {
40   public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName());
41   private int curIdx;
42   private long prevTxId;
43   private final EditLogInputStream[] streams;
44 
45   /**
46    * States that the RedundantEditLogInputStream can be in.
47    *
48    * <pre>
49    *                   start (if no streams)
50    *                           |
51    *                           V
52    * PrematureEOFException  +----------------+
53    *        +-------------->| EOF            |<--------------+
54    *        |               +----------------+               |
55    *        |                                                |
56    *        |          start (if there are streams)          |
57    *        |                  |                             |
58    *        |                  V                             | EOF
59    *        |   resync      +----------------+ skipUntil  +---------+
60    *        |   +---------->| SKIP_UNTIL     |----------->|  OK     |
61    *        |   |           +----------------+            +---------+
62    *        |   |                | IOE   ^ fail over to      | IOE
63    *        |   |                V       | next stream       |
64    * +----------------------+   +----------------+           |
65    * | STREAM_FAILED_RESYNC |   | STREAM_FAILED  |<----------+
66    * +----------------------+   +----------------+
67    *                  ^   Recovery mode    |
68    *                  +--------------------+
69    * </pre>
70    */
71   static private enum State {
72     /** We need to skip until prevTxId + 1 */
73     SKIP_UNTIL,
74     /** We're ready to read opcodes out of the current stream */
75     OK,
76     /** The current stream has failed. */
77     STREAM_FAILED,
78     /** The current stream has failed, and resync() was called.  */
79     STREAM_FAILED_RESYNC,
80     /** There are no more opcodes to read from this
81      * RedundantEditLogInputStream */
82     EOF;
83   }
84 
85   private State state;
86   private IOException prevException;
87 
RedundantEditLogInputStream(Collection<EditLogInputStream> streams, long startTxId)88   RedundantEditLogInputStream(Collection<EditLogInputStream> streams,
89       long startTxId) {
90     this.curIdx = 0;
91     this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ?
92       HdfsConstants.INVALID_TXID : (startTxId - 1);
93     this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL;
94     this.prevException = null;
95     // EditLogInputStreams in a RedundantEditLogInputStream must be finalized,
96     // and can't be pre-transactional.
97     EditLogInputStream first = null;
98     for (EditLogInputStream s : streams) {
99       Preconditions.checkArgument(s.getFirstTxId() !=
100           HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s);
101       Preconditions.checkArgument(s.getLastTxId() !=
102           HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s);
103       if (first == null) {
104         first = s;
105       } else {
106         Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(),
107           "All streams in the RedundantEditLogInputStream must have the same " +
108           "start transaction ID!  " + first + " had start txId " +
109           first.getFirstTxId() + ", but " + s + " had start txId " +
110           s.getFirstTxId());
111       }
112     }
113 
114     this.streams = streams.toArray(new EditLogInputStream[0]);
115 
116     // We sort the streams here so that the streams that end later come first.
117     Arrays.sort(this.streams, new Comparator<EditLogInputStream>() {
118       @Override
119       public int compare(EditLogInputStream a, EditLogInputStream b) {
120         return Longs.compare(b.getLastTxId(), a.getLastTxId());
121       }
122     });
123   }
124 
125   @Override
getCurrentStreamName()126   public String getCurrentStreamName() {
127     return streams[curIdx].getCurrentStreamName();
128   }
129 
130   @Override
getName()131   public String getName() {
132     StringBuilder bld = new StringBuilder();
133     String prefix = "";
134     for (EditLogInputStream elis : streams) {
135       bld.append(prefix);
136       bld.append(elis.getName());
137       prefix = ", ";
138     }
139     return bld.toString();
140   }
141 
142   @Override
getFirstTxId()143   public long getFirstTxId() {
144     return streams[curIdx].getFirstTxId();
145   }
146 
147   @Override
getLastTxId()148   public long getLastTxId() {
149     return streams[curIdx].getLastTxId();
150   }
151 
152   @Override
close()153   public void close() throws IOException {
154     IOUtils.cleanup(LOG,  streams);
155   }
156 
157   @Override
nextValidOp()158   protected FSEditLogOp nextValidOp() {
159     try {
160       if (state == State.STREAM_FAILED) {
161         state = State.STREAM_FAILED_RESYNC;
162       }
163       return nextOp();
164     } catch (IOException e) {
165       return null;
166     }
167   }
168 
169   @Override
nextOp()170   protected FSEditLogOp nextOp() throws IOException {
171     while (true) {
172       switch (state) {
173       case SKIP_UNTIL:
174        try {
175           if (prevTxId != HdfsConstants.INVALID_TXID) {
176             LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
177                 "' to transaction ID " + (prevTxId + 1));
178             streams[curIdx].skipUntil(prevTxId + 1);
179           }
180         } catch (IOException e) {
181           prevException = e;
182           state = State.STREAM_FAILED;
183         }
184         state = State.OK;
185         break;
186       case OK:
187         try {
188           FSEditLogOp op = streams[curIdx].readOp();
189           if (op == null) {
190             state = State.EOF;
191             if (streams[curIdx].getLastTxId() == prevTxId) {
192               return null;
193             } else {
194               throw new PrematureEOFException("got premature end-of-file " +
195                   "at txid " + prevTxId + "; expected file to go up to " +
196                   streams[curIdx].getLastTxId());
197             }
198           }
199           prevTxId = op.getTransactionId();
200           return op;
201         } catch (IOException e) {
202           prevException = e;
203           state = State.STREAM_FAILED;
204         }
205         break;
206       case STREAM_FAILED:
207         if (curIdx + 1 == streams.length) {
208           throw prevException;
209         }
210         long oldLast = streams[curIdx].getLastTxId();
211         long newLast = streams[curIdx + 1].getLastTxId();
212         if (newLast < oldLast) {
213           throw new IOException("We encountered an error reading " +
214               streams[curIdx].getName() + ".  During automatic edit log " +
215               "failover, we noticed that all of the remaining edit log " +
216               "streams are shorter than the current one!  The best " +
217               "remaining edit log ends at transaction " +
218               newLast + ", but we thought we could read up to transaction " +
219               oldLast + ".  If you continue, metadata will be lost forever!");
220         }
221         LOG.error("Got error reading edit log input stream " +
222           streams[curIdx].getName() + "; failing over to edit log " +
223           streams[curIdx + 1].getName(), prevException);
224         curIdx++;
225         state = State.SKIP_UNTIL;
226         break;
227       case STREAM_FAILED_RESYNC:
228         if (curIdx + 1 == streams.length) {
229           if (prevException instanceof PrematureEOFException) {
230             // bypass early EOF check
231             state = State.EOF;
232           } else {
233             streams[curIdx].resync();
234             state = State.SKIP_UNTIL;
235           }
236         } else {
237           LOG.error("failing over to edit log " +
238               streams[curIdx + 1].getName());
239           curIdx++;
240           state = State.SKIP_UNTIL;
241         }
242         break;
243       case EOF:
244         return null;
245       }
246     }
247   }
248 
249   @Override
getVersion(boolean verifyVersion)250   public int getVersion(boolean verifyVersion) throws IOException {
251     return streams[curIdx].getVersion(verifyVersion);
252   }
253 
254   @Override
getPosition()255   public long getPosition() {
256     return streams[curIdx].getPosition();
257   }
258 
259   @Override
length()260   public long length() throws IOException {
261     return streams[curIdx].length();
262   }
263 
264   @Override
isInProgress()265   public boolean isInProgress() {
266     return streams[curIdx].isInProgress();
267   }
268 
269   static private final class PrematureEOFException extends IOException {
270     private static final long serialVersionUID = 1L;
PrematureEOFException(String msg)271     PrematureEOFException(String msg) {
272       super(msg);
273     }
274   }
275 
276   @Override
setMaxOpSize(int maxOpSize)277   public void setMaxOpSize(int maxOpSize) {
278     for (EditLogInputStream elis : streams) {
279       elis.setMaxOpSize(maxOpSize);
280     }
281   }
282 
283   @Override
isLocalLog()284   public boolean isLocalLog() {
285     return streams[curIdx].isLocalLog();
286   }
287 }
288