1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20 
21 import java.io.IOException;
22 import java.io.FileNotFoundException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.ReentrantLock;
28 import java.util.concurrent.LinkedTransferQueue;
29 import java.util.concurrent.TimeUnit;
30 import java.util.Arrays;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.LinkedList;
36 import java.util.Set;
37 
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FSDataOutputStream;
42 import org.apache.hadoop.fs.FileAlreadyExistsException;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.fs.PathFilter;
47 import org.apache.hadoop.hbase.classification.InterfaceAudience;
48 import org.apache.hadoop.hbase.classification.InterfaceStability;
49 import org.apache.hadoop.hbase.procedure2.Procedure;
50 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
51 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
52 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
53 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
54 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
55 import org.apache.hadoop.hbase.util.Threads;
56 import org.apache.hadoop.ipc.RemoteException;
57 
58 import com.google.common.annotations.VisibleForTesting;
59 
60 /**
61  * WAL implementation of the ProcedureStore.
62  */
63 @InterfaceAudience.Private
64 @InterfaceStability.Evolving
65 public class WALProcedureStore extends ProcedureStoreBase {
  private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);

  /**
   * Callback invoked by initOldLogs() (and therefore by recoverLease()) for every existing
   * state-log file, before that file is opened for reading.
   * NOTE(review): implementations are expected to recover the file-system lease held by a
   * previous writer; the exact fencing guarantees depend on the implementation — confirm.
   */
  public interface LeaseRecovery {
    void recoverFileLease(FileSystem fs, Path path) throws IOException;
  }
71 
  // Consecutive failed sync attempts (syncSlots) before the writer is rolled.
  private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  // Base delay passed to Threads.sleepWithoutInterrupt between roll attempts in
  // rollWriterOrDie(); attempt i sleeps waitBeforeRoll * i (presumably milliseconds).
  private static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  // Number of roll attempts made by rollWriterOrDie() before aborting.
  private static final String ROLL_RETRIES_CONF_KEY =
    "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  // Number of log rolls triggered by sync failures that syncSlots() tolerates before
  // sending the abort signal.
  private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  // Interval in milliseconds after the last roll at which periodicRoll() rolls the log;
  // a value <= 0 disables the time-based roll (see getMillisToNextPeriodicRoll()).
  private static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  // How long the sync thread waits for more slots to fill before syncing a partial batch.
  private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  // true: each sync calls hsync(); false: each sync calls hflush().
  private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  // Bytes synced into the current log after which periodicRoll() rolls it.
  private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
100 
  // State logs ordered oldest first; the last entry is the log currently being written.
  private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  // Guards slots/slotIndex/stream/logs between producers (pushData) and the sync thread.
  private final ReentrantLock lock = new ReentrantLock();
  // Signalled when the first slot of a batch is filled (or on stop) to wake the sync thread.
  private final Condition waitCond = lock.newCondition();
  // Signalled when every slot is filled, so the sync thread stops waiting for more data.
  private final Condition slotCond = lock.newCondition();
  // Signalled by the sync thread after each sync attempt (and on stop) to wake producers.
  private final Condition syncCond = lock.newCondition();

  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path logDir;

  // First error raised by the sync loop; non-null means the sync has been aborted.
  private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
  // true until load() finishes; periodic rolls are skipped while loading.
  private final AtomicBoolean loading = new AtomicBoolean(true);
  // true while the sync thread is writing the current batch.
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  // Bytes synced into the current log; reset to 0 on every roll.
  private final AtomicLong totalSynced = new AtomicLong(0);
  // Wall-clock time of the last successful roll; 0 before the first one.
  private final AtomicLong lastRollTs = new AtomicLong(0);

  // Pool of reusable serialization buffers.
  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  // Logs reported as corrupted during load(); null until the first one is found.
  private Set<ProcedureWALFile> corruptedLogs = null;
  // Output stream of the current log.
  private FSDataOutputStream stream = null;
  // Id of the current log.
  private long flushLogId = 0;
  // Number of filled entries in slots waiting to be synced.
  private int slotIndex = 0;
  private Thread syncThread;
  private ByteSlot[] slots;

  // Tunings read from the configuration in start(); see the *_CONF_KEY constants.
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;
135 
WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, final LeaseRecovery leaseRecovery)136   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
137       final LeaseRecovery leaseRecovery) {
138     this.fs = fs;
139     this.conf = conf;
140     this.logDir = logDir;
141     this.leaseRecovery = leaseRecovery;
142   }
143 
144   @Override
start(int numSlots)145   public void start(int numSlots) throws IOException {
146     if (!setRunning(true)) {
147       return;
148     }
149 
150     // Init buffer slots
151     loading.set(true);
152     slots = new ByteSlot[numSlots];
153     slotsCache = new LinkedTransferQueue();
154     while (slotsCache.size() < numSlots) {
155       slotsCache.offer(new ByteSlot());
156     }
157 
158     // Tunings
159     maxRetriesBeforeRoll =
160       conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
161     maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
162     waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
163     rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
164     rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
165     periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
166     syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
167     useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
168 
169     // Init sync thread
170     syncThread = new Thread("WALProcedureStoreSyncThread") {
171       @Override
172       public void run() {
173         try {
174           syncLoop();
175         } catch (Throwable e) {
176           LOG.error("Got an exception from the sync-loop", e);
177           if (!isSyncAborted()) {
178             sendAbortProcessSignal();
179           }
180         }
181       }
182     };
183     syncThread.start();
184   }
185 
186   @Override
stop(boolean abort)187   public void stop(boolean abort) {
188     if (!setRunning(false)) {
189       return;
190     }
191 
192     LOG.info("Stopping the WAL Procedure Store");
193     sendStopSignal();
194 
195     if (!abort) {
196       try {
197         while (syncThread.isAlive()) {
198           sendStopSignal();
199           syncThread.join(250);
200         }
201       } catch (InterruptedException e) {
202         LOG.warn("join interrupted", e);
203         Thread.currentThread().interrupt();
204       }
205     }
206 
207     // Close the writer
208     closeStream();
209 
210     // Close the old logs
211     // they should be already closed, this is just in case the load fails
212     // and we call start() and then stop()
213     for (ProcedureWALFile log: logs) {
214       log.close();
215     }
216     logs.clear();
217   }
218 
sendStopSignal()219   private void sendStopSignal() {
220     if (lock.tryLock()) {
221       try {
222         waitCond.signalAll();
223         syncCond.signalAll();
224       } finally {
225         lock.unlock();
226       }
227     }
228   }
229 
230   @Override
getNumThreads()231   public int getNumThreads() {
232     return slots == null ? 0 : slots.length;
233   }
234 
getStoreTracker()235   public ProcedureStoreTracker getStoreTracker() {
236     return storeTracker;
237   }
238 
getActiveLogs()239   public ArrayList<ProcedureWALFile> getActiveLogs() {
240     lock.lock();
241     try {
242       return new ArrayList<ProcedureWALFile>(logs);
243     } finally {
244       lock.unlock();
245     }
246   }
247 
getCorruptedLogs()248   public Set<ProcedureWALFile> getCorruptedLogs() {
249     return corruptedLogs;
250   }
251 
252   @Override
recoverLease()253   public void recoverLease() throws IOException {
254     lock.lock();
255     try {
256       LOG.info("Starting WAL Procedure Store lease recovery");
257       FileStatus[] oldLogs = getLogFiles();
258       while (isRunning()) {
259         // Get Log-MaxID and recover lease on old logs
260         flushLogId = initOldLogs(oldLogs);
261 
262         // Create new state-log
263         if (!rollWriter(flushLogId + 1)) {
264           // someone else has already created this log
265           LOG.debug("someone else has already created log " + flushLogId);
266           continue;
267         }
268 
269         // We have the lease on the log
270         oldLogs = getLogFiles();
271         if (getMaxLogId(oldLogs) > flushLogId) {
272           if (LOG.isDebugEnabled()) {
273             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
274           }
275           logs.getLast().removeFile();
276           continue;
277         }
278 
279         LOG.info("Lease acquired for flushLogId: " + flushLogId);
280         break;
281       }
282     } finally {
283       lock.unlock();
284     }
285   }
286 
287   @Override
load(final ProcedureLoader loader)288   public void load(final ProcedureLoader loader) throws IOException {
289     if (logs.isEmpty()) {
290       throw new RuntimeException("recoverLease() must be called before loading data");
291     }
292 
293     // Nothing to do, If we have only the current log.
294     if (logs.size() == 1) {
295       if (LOG.isDebugEnabled()) {
296         LOG.debug("No state logs to replay.");
297       }
298       loader.setMaxProcId(0);
299       loading.set(false);
300       return;
301     }
302 
303     // Load the old logs
304     Iterator<ProcedureWALFile> it = logs.descendingIterator();
305     it.next(); // Skip the current log
306     try {
307       ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
308         @Override
309         public void setMaxProcId(long maxProcId) {
310           loader.setMaxProcId(maxProcId);
311         }
312 
313         @Override
314         public void load(ProcedureIterator procIter) throws IOException {
315           loader.load(procIter);
316         }
317 
318         @Override
319         public void handleCorrupted(ProcedureIterator procIter) throws IOException {
320           loader.handleCorrupted(procIter);
321         }
322 
323         @Override
324         public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
325           if (corruptedLogs == null) {
326             corruptedLogs = new HashSet<ProcedureWALFile>();
327           }
328           corruptedLogs.add(log);
329           // TODO: sideline corrupted log
330         }
331       });
332     } finally {
333       loading.set(false);
334     }
335   }
336 
337   @Override
insert(final Procedure proc, final Procedure[] subprocs)338   public void insert(final Procedure proc, final Procedure[] subprocs) {
339     if (LOG.isTraceEnabled()) {
340       LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
341     }
342 
343     ByteSlot slot = acquireSlot();
344     try {
345       // Serialize the insert
346       long[] subProcIds = null;
347       if (subprocs != null) {
348         ProcedureWALFormat.writeInsert(slot, proc, subprocs);
349         subProcIds = new long[subprocs.length];
350         for (int i = 0; i < subprocs.length; ++i) {
351           subProcIds[i] = subprocs[i].getProcId();
352         }
353       } else {
354         assert !proc.hasParent();
355         ProcedureWALFormat.writeInsert(slot, proc);
356       }
357 
358       // Push the transaction data and wait until it is persisted
359       pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
360     } catch (IOException e) {
361       // We are not able to serialize the procedure.
362       // this is a code error, and we are not able to go on.
363       LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
364                 ", subprocs=" + Arrays.toString(subprocs), e);
365       throw new RuntimeException(e);
366     } finally {
367       releaseSlot(slot);
368     }
369   }
370 
371   @Override
update(final Procedure proc)372   public void update(final Procedure proc) {
373     if (LOG.isTraceEnabled()) {
374       LOG.trace("Update " + proc);
375     }
376 
377     ByteSlot slot = acquireSlot();
378     try {
379       // Serialize the update
380       ProcedureWALFormat.writeUpdate(slot, proc);
381 
382       // Push the transaction data and wait until it is persisted
383       pushData(PushType.UPDATE, slot, proc.getProcId(), null);
384     } catch (IOException e) {
385       // We are not able to serialize the procedure.
386       // this is a code error, and we are not able to go on.
387       LOG.fatal("Unable to serialize the procedure: " + proc, e);
388       throw new RuntimeException(e);
389     } finally {
390       releaseSlot(slot);
391     }
392   }
393 
394   @Override
delete(final long procId)395   public void delete(final long procId) {
396     if (LOG.isTraceEnabled()) {
397       LOG.trace("Delete " + procId);
398     }
399 
400     ByteSlot slot = acquireSlot();
401     try {
402       // Serialize the delete
403       ProcedureWALFormat.writeDelete(slot, procId);
404 
405       // Push the transaction data and wait until it is persisted
406       pushData(PushType.DELETE, slot, procId, null);
407     } catch (IOException e) {
408       // We are not able to serialize the procedure.
409       // this is a code error, and we are not able to go on.
410       LOG.fatal("Unable to serialize the procedure: " + procId, e);
411       throw new RuntimeException(e);
412     } finally {
413       releaseSlot(slot);
414     }
415   }
416 
acquireSlot()417   private ByteSlot acquireSlot() {
418     ByteSlot slot = slotsCache.poll();
419     return slot != null ? slot : new ByteSlot();
420   }
421 
releaseSlot(final ByteSlot slot)422   private void releaseSlot(final ByteSlot slot) {
423     slot.reset();
424     slotsCache.offer(slot);
425   }
426 
427   private enum PushType { INSERT, UPDATE, DELETE };
428 
pushData(final PushType type, final ByteSlot slot, final long procId, final long[] subProcIds)429   private long pushData(final PushType type, final ByteSlot slot,
430       final long procId, final long[] subProcIds) {
431     if (!isRunning()) {
432       throw new RuntimeException("the store must be running before inserting data");
433     }
434     if (logs.isEmpty()) {
435       throw new RuntimeException("recoverLease() must be called before inserting data");
436     }
437 
438     long logId = -1;
439     lock.lock();
440     try {
441       // Wait for the sync to be completed
442       while (true) {
443         if (!isRunning()) {
444           throw new RuntimeException("store no longer running");
445         } else if (isSyncAborted()) {
446           throw new RuntimeException("sync aborted", syncException.get());
447         } else if (inSync.get()) {
448           syncCond.await();
449         } else if (slotIndex == slots.length) {
450           slotCond.signal();
451           syncCond.await();
452         } else {
453           break;
454         }
455       }
456 
457       updateStoreTracker(type, procId, subProcIds);
458       slots[slotIndex++] = slot;
459       logId = flushLogId;
460 
461       // Notify that there is new data
462       if (slotIndex == 1) {
463         waitCond.signal();
464       }
465 
466       // Notify that the slots are full
467       if (slotIndex == slots.length) {
468         waitCond.signal();
469         slotCond.signal();
470       }
471 
472       syncCond.await();
473     } catch (InterruptedException e) {
474       Thread.currentThread().interrupt();
475       sendAbortProcessSignal();
476       throw new RuntimeException(e);
477     } finally {
478       lock.unlock();
479       if (isSyncAborted()) {
480         throw new RuntimeException("sync aborted", syncException.get());
481       }
482     }
483     return logId;
484   }
485 
updateStoreTracker(final PushType type, final long procId, final long[] subProcIds)486   private void updateStoreTracker(final PushType type,
487       final long procId, final long[] subProcIds) {
488     switch (type) {
489       case INSERT:
490         if (subProcIds == null) {
491           storeTracker.insert(procId);
492         } else {
493           storeTracker.insert(procId, subProcIds);
494         }
495         break;
496       case UPDATE:
497         storeTracker.update(procId);
498         break;
499       case DELETE:
500         storeTracker.delete(procId);
501         break;
502       default:
503         throw new RuntimeException("invalid push type " + type);
504     }
505   }
506 
isSyncAborted()507   private boolean isSyncAborted() {
508     return syncException.get() != null;
509   }
510 
syncLoop()511   private void syncLoop() throws Throwable {
512     inSync.set(false);
513     lock.lock();
514     try {
515       while (isRunning()) {
516         try {
517           // Wait until new data is available
518           if (slotIndex == 0) {
519             if (!loading.get()) {
520               periodicRoll();
521             }
522 
523             if (LOG.isTraceEnabled()) {
524               float rollTsSec = getMillisFromLastRoll() / 1000.0f;
525               LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
526                         StringUtils.humanSize(totalSynced.get()),
527                         StringUtils.humanSize(totalSynced.get() / rollTsSec)));
528             }
529 
530             waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
531             if (slotIndex == 0) {
532               // no data.. probably a stop() or a periodic roll
533               continue;
534             }
535           }
536 
537           // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
538           long syncWaitSt = System.currentTimeMillis();
539           if (slotIndex != slots.length) {
540             slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
541           }
542           long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
543           if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
544             float rollSec = getMillisFromLastRoll() / 1000.0f;
545             LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
546                       StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
547                       StringUtils.humanSize(totalSynced.get()),
548                       StringUtils.humanSize(totalSynced.get() / rollSec)));
549           }
550 
551           inSync.set(true);
552           totalSynced.addAndGet(syncSlots());
553           slotIndex = 0;
554           inSync.set(false);
555         } catch (InterruptedException e) {
556           Thread.currentThread().interrupt();
557           sendAbortProcessSignal();
558           syncException.compareAndSet(null, e);
559           throw e;
560         } catch (Throwable t) {
561           syncException.compareAndSet(null, t);
562           throw t;
563         } finally {
564           syncCond.signalAll();
565         }
566       }
567     } finally {
568       lock.unlock();
569     }
570   }
571 
syncSlots()572   private long syncSlots() throws Throwable {
573     int retry = 0;
574     int logRolled = 0;
575     long totalSynced = 0;
576     do {
577       try {
578         totalSynced = syncSlots(stream, slots, 0, slotIndex);
579         break;
580       } catch (Throwable e) {
581         LOG.warn("unable to sync slots, retry=" + retry);
582         if (++retry >= maxRetriesBeforeRoll) {
583           if (logRolled >= maxSyncFailureRoll) {
584             LOG.error("Sync slots after log roll failed, abort.", e);
585             sendAbortProcessSignal();
586             throw e;
587           }
588 
589           if (!rollWriterOrDie()) {
590             throw e;
591           }
592 
593           logRolled++;
594           retry = 0;
595         }
596       }
597     } while (isRunning());
598     return totalSynced;
599   }
600 
syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)601   protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
602       throws IOException {
603     long totalSynced = 0;
604     for (int i = 0; i < count; ++i) {
605       ByteSlot data = slots[offset + i];
606       data.writeTo(stream);
607       totalSynced += data.size();
608     }
609 
610     if (useHsync) {
611       stream.hsync();
612     } else {
613       stream.hflush();
614     }
615     sendPostSyncSignal();
616 
617     if (LOG.isTraceEnabled()) {
618       LOG.trace("Sync slots=" + count + '/' + slots.length +
619                 ", flushed=" + StringUtils.humanSize(totalSynced));
620     }
621     return totalSynced;
622   }
623 
rollWriterOrDie()624   private boolean rollWriterOrDie() {
625     for (int i = 0; i < rollRetries; ++i) {
626       if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
627 
628       try {
629         if (rollWriter()) {
630           return true;
631         }
632       } catch (IOException e) {
633         LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
634       }
635     }
636     LOG.fatal("Unable to roll the log");
637     sendAbortProcessSignal();
638     throw new RuntimeException("unable to roll the log");
639   }
640 
tryRollWriter()641   private boolean tryRollWriter() {
642     try {
643       return rollWriter();
644     } catch (IOException e) {
645       LOG.warn("Unable to roll the log", e);
646       return false;
647     }
648   }
649 
getMillisToNextPeriodicRoll()650   private long getMillisToNextPeriodicRoll() {
651     if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
652       return periodicRollMsec - getMillisFromLastRoll();
653     }
654     return Long.MAX_VALUE;
655   }
656 
getMillisFromLastRoll()657   private long getMillisFromLastRoll() {
658     return (System.currentTimeMillis() - lastRollTs.get());
659   }
660 
661   @VisibleForTesting
periodicRollForTesting()662   protected void periodicRollForTesting() throws IOException {
663     lock.lock();
664     try {
665       periodicRoll();
666     } finally {
667       lock.unlock();
668     }
669   }
670 
671   @VisibleForTesting
rollWriterForTesting()672   protected boolean rollWriterForTesting() throws IOException {
673     lock.lock();
674     try {
675       return rollWriter();
676     } finally {
677       lock.unlock();
678     }
679   }
680 
periodicRoll()681   private void periodicRoll() throws IOException {
682     if (storeTracker.isEmpty()) {
683       if (LOG.isTraceEnabled()) {
684         LOG.trace("no active procedures");
685       }
686       tryRollWriter();
687       removeAllLogs(flushLogId - 1);
688     } else {
689       if (storeTracker.isUpdated()) {
690         if (LOG.isTraceEnabled()) {
691           LOG.trace("all the active procedures are in the latest log");
692         }
693         removeAllLogs(flushLogId - 1);
694       }
695 
696       // if the log size has exceeded the roll threshold
697       // or the periodic roll timeout is expired, try to roll the wal.
698       if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
699         tryRollWriter();
700       }
701 
702       removeInactiveLogs();
703     }
704   }
705 
rollWriter()706   private boolean rollWriter() throws IOException {
707     // Create new state-log
708     if (!rollWriter(flushLogId + 1)) {
709       LOG.warn("someone else has already created log " + flushLogId);
710       return false;
711     }
712 
713     // We have the lease on the log,
714     // but we should check if someone else has created new files
715     if (getMaxLogId(getLogFiles()) > flushLogId) {
716       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
717       logs.getLast().removeFile();
718       return false;
719     }
720 
721     // We have the lease on the log
722     return true;
723   }
724 
rollWriter(final long logId)725   private boolean rollWriter(final long logId) throws IOException {
726     assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
727     assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
728 
729     ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
730       .setVersion(ProcedureWALFormat.HEADER_VERSION)
731       .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
732       .setMinProcId(storeTracker.getMinProcId())
733       .setLogId(logId)
734       .build();
735 
736     FSDataOutputStream newStream = null;
737     Path newLogFile = null;
738     long startPos = -1;
739     newLogFile = getLogFilePath(logId);
740     try {
741       newStream = fs.create(newLogFile, false);
742     } catch (FileAlreadyExistsException e) {
743       LOG.error("Log file with id=" + logId + " already exists", e);
744       return false;
745     } catch (RemoteException re) {
746       LOG.warn("failed to create log file with id=" + logId, re);
747       return false;
748     }
749     try {
750       ProcedureWALFormat.writeHeader(newStream, header);
751       startPos = newStream.getPos();
752     } catch (IOException ioe) {
753       LOG.warn("Encountered exception writing header", ioe);
754       newStream.close();
755       return false;
756     }
757 
758     closeStream();
759 
760     storeTracker.resetUpdates();
761     stream = newStream;
762     flushLogId = logId;
763     totalSynced.set(0);
764     lastRollTs.set(System.currentTimeMillis());
765     logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
766 
767     if (LOG.isDebugEnabled()) {
768       LOG.debug("Roll new state log: " + logId);
769     }
770     return true;
771   }
772 
closeStream()773   private void closeStream() {
774     try {
775       if (stream != null) {
776         try {
777           ProcedureWALFile log = logs.getLast();
778           log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
779           ProcedureWALFormat.writeTrailer(stream, storeTracker);
780         } catch (IOException e) {
781           LOG.warn("Unable to write the trailer: " + e.getMessage());
782         }
783         stream.close();
784       }
785     } catch (IOException e) {
786       LOG.error("Unable to close the stream", e);
787     } finally {
788       stream = null;
789     }
790   }
791 
792   // ==========================================================================
793   //  Log Files cleaner helpers
794   // ==========================================================================
removeInactiveLogs()795   private void removeInactiveLogs() {
796     // Verify if the ProcId of the first oldest is still active. if not remove the file.
797     while (logs.size() > 1) {
798       ProcedureWALFile log = logs.getFirst();
799       if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
800         break;
801       }
802       removeLogFile(log);
803     }
804   }
805 
removeAllLogs(long lastLogId)806   private void removeAllLogs(long lastLogId) {
807     if (logs.size() <= 1) return;
808 
809     if (LOG.isDebugEnabled()) {
810       LOG.debug("Remove all state logs with ID less than " + lastLogId);
811     }
812     while (logs.size() > 1) {
813       ProcedureWALFile log = logs.getFirst();
814       if (lastLogId < log.getLogId()) {
815         break;
816       }
817       removeLogFile(log);
818     }
819   }
820 
removeLogFile(final ProcedureWALFile log)821   private boolean removeLogFile(final ProcedureWALFile log) {
822     try {
823       if (LOG.isDebugEnabled()) {
824         LOG.debug("Remove log: " + log);
825       }
826       log.removeFile();
827       logs.remove(log);
828       LOG.info("Remove log: " + log);
829       LOG.info("Removed logs: " + logs);
830       if (logs.size() == 0) { LOG.error("Expected at least one log"); }
831       assert logs.size() > 0 : "expected at least one log";
832     } catch (IOException e) {
833       LOG.error("Unable to remove log: " + log, e);
834       return false;
835     }
836     return true;
837   }
838 
839   // ==========================================================================
840   //  FileSystem Log Files helpers
841   // ==========================================================================
getLogDir()842   public Path getLogDir() {
843     return this.logDir;
844   }
845 
getFileSystem()846   public FileSystem getFileSystem() {
847     return this.fs;
848   }
849 
getLogFilePath(final long logId)850   protected Path getLogFilePath(final long logId) throws IOException {
851     return new Path(logDir, String.format("state-%020d.log", logId));
852   }
853 
getLogIdFromName(final String name)854   private static long getLogIdFromName(final String name) {
855     int end = name.lastIndexOf(".log");
856     int start = name.lastIndexOf('-') + 1;
857     while (start < end) {
858       if (name.charAt(start) != '0')
859         break;
860       start++;
861     }
862     return Long.parseLong(name.substring(start, end));
863   }
864 
getLogFiles()865   private FileStatus[] getLogFiles() throws IOException {
866     try {
867       return fs.listStatus(logDir, new PathFilter() {
868         @Override
869         public boolean accept(Path path) {
870           String name = path.getName();
871           return name.startsWith("state-") && name.endsWith(".log");
872         }
873       });
874     } catch (FileNotFoundException e) {
875       LOG.warn("Log directory not found: " + e.getMessage());
876       return null;
877     }
878   }
879 
880   private static long getMaxLogId(final FileStatus[] logFiles) {
881     long maxLogId = 0;
882     if (logFiles != null && logFiles.length > 0) {
883       for (int i = 0; i < logFiles.length; ++i) {
884         maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
885       }
886     }
887     return maxLogId;
888   }
889 
890   /**
891    * @return Max-LogID of the specified log file set
892    */
893   private long initOldLogs(final FileStatus[] logFiles) throws IOException {
894     this.logs.clear();
895 
896     long maxLogId = 0;
897     if (logFiles != null && logFiles.length > 0) {
898       for (int i = 0; i < logFiles.length; ++i) {
899         final Path logPath = logFiles[i].getPath();
900         leaseRecovery.recoverFileLease(fs, logPath);
901         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
902 
903         ProcedureWALFile log = initOldLog(logFiles[i]);
904         if (log != null) {
905           this.logs.add(log);
906         }
907       }
908       Collections.sort(this.logs);
909       initTrackerFromOldLogs();
910     }
911     return maxLogId;
912   }
913 
914   private void initTrackerFromOldLogs() {
915     // TODO: Load the most recent tracker available
916     if (!logs.isEmpty()) {
917       ProcedureWALFile log = logs.getLast();
918       try {
919         log.readTracker(storeTracker);
920       } catch (IOException e) {
921         LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
922         // try the next one...
923         storeTracker.reset();
924         storeTracker.setPartialFlag(true);
925       }
926     }
927   }
928 
929   private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
930     ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
931     if (logFile.getLen() == 0) {
932       LOG.warn("Remove uninitialized log: " + logFile);
933       log.removeFile();
934       return null;
935     }
936     if (LOG.isDebugEnabled()) {
937       LOG.debug("Opening state-log: " + logFile);
938     }
939     try {
940       log.open();
941     } catch (ProcedureWALFormat.InvalidWALDataException e) {
942       LOG.warn("Remove uninitialized log: " + logFile, e);
943       log.removeFile();
944       return null;
945     } catch (IOException e) {
946       String msg = "Unable to read state log: " + logFile;
947       LOG.error(msg, e);
948       throw new IOException(msg, e);
949     }
950 
951     if (log.isCompacted()) {
952       try {
953         log.readTrailer();
954       } catch (IOException e) {
955         LOG.warn("Unfinished compacted log: " + logFile, e);
956         log.removeFile();
957         return null;
958       }
959     }
960     return log;
961   }
962 }
963