1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.procedure2.store.wal; 20 21 import java.io.IOException; 22 import java.io.FileNotFoundException; 23 import java.util.concurrent.atomic.AtomicBoolean; 24 import java.util.concurrent.atomic.AtomicLong; 25 import java.util.concurrent.atomic.AtomicReference; 26 import java.util.concurrent.locks.Condition; 27 import java.util.concurrent.locks.ReentrantLock; 28 import java.util.concurrent.LinkedTransferQueue; 29 import java.util.concurrent.TimeUnit; 30 import java.util.Arrays; 31 import java.util.ArrayList; 32 import java.util.Collections; 33 import java.util.HashSet; 34 import java.util.Iterator; 35 import java.util.LinkedList; 36 import java.util.Set; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.hadoop.conf.Configuration; 41 import org.apache.hadoop.fs.FSDataOutputStream; 42 import org.apache.hadoop.fs.FileAlreadyExistsException; 43 import org.apache.hadoop.fs.FileStatus; 44 import org.apache.hadoop.fs.FileSystem; 45 import org.apache.hadoop.fs.Path; 46 import org.apache.hadoop.fs.PathFilter; 47 import org.apache.hadoop.hbase.classification.InterfaceAudience; 48 import org.apache.hadoop.hbase.classification.InterfaceStability; 49 import org.apache.hadoop.hbase.procedure2.Procedure; 50 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 51 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 52 import org.apache.hadoop.hbase.procedure2.util.ByteSlot; 53 import org.apache.hadoop.hbase.procedure2.util.StringUtils; 54 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 55 import org.apache.hadoop.hbase.util.Threads; 56 import org.apache.hadoop.ipc.RemoteException; 57 58 import com.google.common.annotations.VisibleForTesting; 59 60 /** 61 * WAL implementation of the ProcedureStore. 62 */ 63 @InterfaceAudience.Private 64 @InterfaceStability.Evolving 65 public class WALProcedureStore extends ProcedureStoreBase { 66 private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); 67 68 public interface LeaseRecovery { recoverFileLease(FileSystem fs, Path path)69 void recoverFileLease(FileSystem fs, Path path) throws IOException; 70 } 71 72 private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = 73 "hbase.procedure.store.wal.max.retries.before.roll"; 74 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; 75 76 private static final String WAIT_BEFORE_ROLL_CONF_KEY = 77 "hbase.procedure.store.wal.wait.before.roll"; 78 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; 79 80 private static final String ROLL_RETRIES_CONF_KEY = 81 "hbase.procedure.store.wal.max.roll.retries"; 82 private static final int DEFAULT_ROLL_RETRIES = 3; 83 84 private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = 85 "hbase.procedure.store.wal.sync.failure.roll.max"; 86 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; 87 88 private static final String PERIODIC_ROLL_CONF_KEY = 89 "hbase.procedure.store.wal.periodic.roll.msec"; 90 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h 91 92 private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; 93 private static final int DEFAULT_SYNC_WAIT_MSEC = 100; 94 95 private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; 96 private static final boolean DEFAULT_USE_HSYNC = true; 97 98 private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; 99 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M 100 101 private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>(); 102 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); 103 private final ReentrantLock lock = new ReentrantLock(); 104 private final Condition waitCond = lock.newCondition(); 105 private final Condition slotCond = lock.newCondition(); 106 private final Condition syncCond = lock.newCondition(); 107 108 private final LeaseRecovery leaseRecovery; 109 private final Configuration conf; 110 private final FileSystem fs; 111 private final Path logDir; 112 113 private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>(); 114 private final AtomicBoolean loading = new AtomicBoolean(true); 115 private final AtomicBoolean inSync = new AtomicBoolean(false); 116 private final AtomicLong totalSynced = new AtomicLong(0); 117 private final AtomicLong lastRollTs = new AtomicLong(0); 118 119 private LinkedTransferQueue<ByteSlot> slotsCache = null; 120 private Set<ProcedureWALFile> corruptedLogs = null; 121 private FSDataOutputStream stream = null; 122 private long flushLogId = 0; 123 private int slotIndex = 0; 124 private Thread syncThread; 125 private ByteSlot[] slots; 126 127 private int maxRetriesBeforeRoll; 128 private int maxSyncFailureRoll; 129 private int waitBeforeRoll; 130 private int rollRetries; 131 private int periodicRollMsec; 132 private long rollThreshold; 133 private boolean useHsync; 134 private int syncWaitMsec; 135 WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, final LeaseRecovery leaseRecovery)136 public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, 137 final LeaseRecovery leaseRecovery) { 138 this.fs = fs; 139 this.conf = conf; 140 this.logDir = logDir; 141 this.leaseRecovery = leaseRecovery; 142 } 143 144 @Override start(int numSlots)145 public void start(int numSlots) throws IOException { 146 if (!setRunning(true)) { 147 return; 148 } 149 150 // Init buffer slots 151 loading.set(true); 152 slots = new ByteSlot[numSlots]; 153 slotsCache = new LinkedTransferQueue(); 154 while (slotsCache.size() < numSlots) { 155 slotsCache.offer(new ByteSlot()); 156 } 157 158 // Tunings 159 maxRetriesBeforeRoll = 160 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); 161 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); 162 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); 163 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); 164 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); 165 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL); 166 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); 167 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); 168 169 // Init sync thread 170 syncThread = new Thread("WALProcedureStoreSyncThread") { 171 @Override 172 public void run() { 173 try { 174 syncLoop(); 175 } catch (Throwable e) { 176 LOG.error("Got an exception from the sync-loop", e); 177 if (!isSyncAborted()) { 178 sendAbortProcessSignal(); 179 } 180 } 181 } 182 }; 183 syncThread.start(); 184 } 185 186 @Override stop(boolean abort)187 public void stop(boolean abort) { 188 if (!setRunning(false)) { 189 return; 190 } 191 192 LOG.info("Stopping the WAL Procedure Store"); 193 sendStopSignal(); 194 195 if (!abort) { 196 try { 197 while (syncThread.isAlive()) { 198 sendStopSignal(); 199 syncThread.join(250); 200 } 201 } catch (InterruptedException e) { 202 LOG.warn("join interrupted", e); 203 Thread.currentThread().interrupt(); 204 } 205 } 206 207 // Close the writer 208 closeStream(); 209 210 // Close the old logs 211 // they should be already closed, this is just in case the load fails 212 // and we call start() and then stop() 213 for (ProcedureWALFile log: logs) { 214 log.close(); 215 } 216 logs.clear(); 217 } 218 sendStopSignal()219 private void sendStopSignal() { 220 if (lock.tryLock()) { 221 try { 222 waitCond.signalAll(); 223 syncCond.signalAll(); 224 } finally { 225 lock.unlock(); 226 } 227 } 228 } 229 230 @Override getNumThreads()231 public int getNumThreads() { 232 return slots == null ? 0 : slots.length; 233 } 234 getStoreTracker()235 public ProcedureStoreTracker getStoreTracker() { 236 return storeTracker; 237 } 238 getActiveLogs()239 public ArrayList<ProcedureWALFile> getActiveLogs() { 240 lock.lock(); 241 try { 242 return new ArrayList<ProcedureWALFile>(logs); 243 } finally { 244 lock.unlock(); 245 } 246 } 247 getCorruptedLogs()248 public Set<ProcedureWALFile> getCorruptedLogs() { 249 return corruptedLogs; 250 } 251 252 @Override recoverLease()253 public void recoverLease() throws IOException { 254 lock.lock(); 255 try { 256 LOG.info("Starting WAL Procedure Store lease recovery"); 257 FileStatus[] oldLogs = getLogFiles(); 258 while (isRunning()) { 259 // Get Log-MaxID and recover lease on old logs 260 flushLogId = initOldLogs(oldLogs); 261 262 // Create new state-log 263 if (!rollWriter(flushLogId + 1)) { 264 // someone else has already created this log 265 LOG.debug("someone else has already created log " + flushLogId); 266 continue; 267 } 268 269 // We have the lease on the log 270 oldLogs = getLogFiles(); 271 if (getMaxLogId(oldLogs) > flushLogId) { 272 if (LOG.isDebugEnabled()) { 273 LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); 274 } 275 logs.getLast().removeFile(); 276 continue; 277 } 278 279 LOG.info("Lease acquired for flushLogId: " + flushLogId); 280 break; 281 } 282 } finally { 283 lock.unlock(); 284 } 285 } 286 287 @Override load(final ProcedureLoader loader)288 public void load(final ProcedureLoader loader) throws IOException { 289 if (logs.isEmpty()) { 290 throw new RuntimeException("recoverLease() must be called before loading data"); 291 } 292 293 // Nothing to do, If we have only the current log. 294 if (logs.size() == 1) { 295 if (LOG.isDebugEnabled()) { 296 LOG.debug("No state logs to replay."); 297 } 298 loader.setMaxProcId(0); 299 loading.set(false); 300 return; 301 } 302 303 // Load the old logs 304 Iterator<ProcedureWALFile> it = logs.descendingIterator(); 305 it.next(); // Skip the current log 306 try { 307 ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { 308 @Override 309 public void setMaxProcId(long maxProcId) { 310 loader.setMaxProcId(maxProcId); 311 } 312 313 @Override 314 public void load(ProcedureIterator procIter) throws IOException { 315 loader.load(procIter); 316 } 317 318 @Override 319 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 320 loader.handleCorrupted(procIter); 321 } 322 323 @Override 324 public void markCorruptedWAL(ProcedureWALFile log, IOException e) { 325 if (corruptedLogs == null) { 326 corruptedLogs = new HashSet<ProcedureWALFile>(); 327 } 328 corruptedLogs.add(log); 329 // TODO: sideline corrupted log 330 } 331 }); 332 } finally { 333 loading.set(false); 334 } 335 } 336 337 @Override insert(final Procedure proc, final Procedure[] subprocs)338 public void insert(final Procedure proc, final Procedure[] subprocs) { 339 if (LOG.isTraceEnabled()) { 340 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); 341 } 342 343 ByteSlot slot = acquireSlot(); 344 try { 345 // Serialize the insert 346 long[] subProcIds = null; 347 if (subprocs != null) { 348 ProcedureWALFormat.writeInsert(slot, proc, subprocs); 349 subProcIds = new long[subprocs.length]; 350 for (int i = 0; i < subprocs.length; ++i) { 351 subProcIds[i] = subprocs[i].getProcId(); 352 } 353 } else { 354 assert !proc.hasParent(); 355 ProcedureWALFormat.writeInsert(slot, proc); 356 } 357 358 // Push the transaction data and wait until it is persisted 359 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); 360 } catch (IOException e) { 361 // We are not able to serialize the procedure. 362 // this is a code error, and we are not able to go on. 363 LOG.fatal("Unable to serialize one of the procedure: proc=" + proc + 364 ", subprocs=" + Arrays.toString(subprocs), e); 365 throw new RuntimeException(e); 366 } finally { 367 releaseSlot(slot); 368 } 369 } 370 371 @Override update(final Procedure proc)372 public void update(final Procedure proc) { 373 if (LOG.isTraceEnabled()) { 374 LOG.trace("Update " + proc); 375 } 376 377 ByteSlot slot = acquireSlot(); 378 try { 379 // Serialize the update 380 ProcedureWALFormat.writeUpdate(slot, proc); 381 382 // Push the transaction data and wait until it is persisted 383 pushData(PushType.UPDATE, slot, proc.getProcId(), null); 384 } catch (IOException e) { 385 // We are not able to serialize the procedure. 386 // this is a code error, and we are not able to go on. 387 LOG.fatal("Unable to serialize the procedure: " + proc, e); 388 throw new RuntimeException(e); 389 } finally { 390 releaseSlot(slot); 391 } 392 } 393 394 @Override delete(final long procId)395 public void delete(final long procId) { 396 if (LOG.isTraceEnabled()) { 397 LOG.trace("Delete " + procId); 398 } 399 400 ByteSlot slot = acquireSlot(); 401 try { 402 // Serialize the delete 403 ProcedureWALFormat.writeDelete(slot, procId); 404 405 // Push the transaction data and wait until it is persisted 406 pushData(PushType.DELETE, slot, procId, null); 407 } catch (IOException e) { 408 // We are not able to serialize the procedure. 409 // this is a code error, and we are not able to go on. 410 LOG.fatal("Unable to serialize the procedure: " + procId, e); 411 throw new RuntimeException(e); 412 } finally { 413 releaseSlot(slot); 414 } 415 } 416 acquireSlot()417 private ByteSlot acquireSlot() { 418 ByteSlot slot = slotsCache.poll(); 419 return slot != null ? slot : new ByteSlot(); 420 } 421 releaseSlot(final ByteSlot slot)422 private void releaseSlot(final ByteSlot slot) { 423 slot.reset(); 424 slotsCache.offer(slot); 425 } 426 427 private enum PushType { INSERT, UPDATE, DELETE }; 428 pushData(final PushType type, final ByteSlot slot, final long procId, final long[] subProcIds)429 private long pushData(final PushType type, final ByteSlot slot, 430 final long procId, final long[] subProcIds) { 431 if (!isRunning()) { 432 throw new RuntimeException("the store must be running before inserting data"); 433 } 434 if (logs.isEmpty()) { 435 throw new RuntimeException("recoverLease() must be called before inserting data"); 436 } 437 438 long logId = -1; 439 lock.lock(); 440 try { 441 // Wait for the sync to be completed 442 while (true) { 443 if (!isRunning()) { 444 throw new RuntimeException("store no longer running"); 445 } else if (isSyncAborted()) { 446 throw new RuntimeException("sync aborted", syncException.get()); 447 } else if (inSync.get()) { 448 syncCond.await(); 449 } else if (slotIndex == slots.length) { 450 slotCond.signal(); 451 syncCond.await(); 452 } else { 453 break; 454 } 455 } 456 457 updateStoreTracker(type, procId, subProcIds); 458 slots[slotIndex++] = slot; 459 logId = flushLogId; 460 461 // Notify that there is new data 462 if (slotIndex == 1) { 463 waitCond.signal(); 464 } 465 466 // Notify that the slots are full 467 if (slotIndex == slots.length) { 468 waitCond.signal(); 469 slotCond.signal(); 470 } 471 472 syncCond.await(); 473 } catch (InterruptedException e) { 474 Thread.currentThread().interrupt(); 475 sendAbortProcessSignal(); 476 throw new RuntimeException(e); 477 } finally { 478 lock.unlock(); 479 if (isSyncAborted()) { 480 throw new RuntimeException("sync aborted", syncException.get()); 481 } 482 } 483 return logId; 484 } 485 updateStoreTracker(final PushType type, final long procId, final long[] subProcIds)486 private void updateStoreTracker(final PushType type, 487 final long procId, final long[] subProcIds) { 488 switch (type) { 489 case INSERT: 490 if (subProcIds == null) { 491 storeTracker.insert(procId); 492 } else { 493 storeTracker.insert(procId, subProcIds); 494 } 495 break; 496 case UPDATE: 497 storeTracker.update(procId); 498 break; 499 case DELETE: 500 storeTracker.delete(procId); 501 break; 502 default: 503 throw new RuntimeException("invalid push type " + type); 504 } 505 } 506 isSyncAborted()507 private boolean isSyncAborted() { 508 return syncException.get() != null; 509 } 510 syncLoop()511 private void syncLoop() throws Throwable { 512 inSync.set(false); 513 lock.lock(); 514 try { 515 while (isRunning()) { 516 try { 517 // Wait until new data is available 518 if (slotIndex == 0) { 519 if (!loading.get()) { 520 periodicRoll(); 521 } 522 523 if (LOG.isTraceEnabled()) { 524 float rollTsSec = getMillisFromLastRoll() / 1000.0f; 525 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", 526 StringUtils.humanSize(totalSynced.get()), 527 StringUtils.humanSize(totalSynced.get() / rollTsSec))); 528 } 529 530 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); 531 if (slotIndex == 0) { 532 // no data.. probably a stop() or a periodic roll 533 continue; 534 } 535 } 536 537 // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing 538 long syncWaitSt = System.currentTimeMillis(); 539 if (slotIndex != slots.length) { 540 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); 541 } 542 long syncWaitMs = System.currentTimeMillis() - syncWaitSt; 543 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { 544 float rollSec = getMillisFromLastRoll() / 1000.0f; 545 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", 546 StringUtils.humanTimeDiff(syncWaitMs), slotIndex, 547 StringUtils.humanSize(totalSynced.get()), 548 StringUtils.humanSize(totalSynced.get() / rollSec))); 549 } 550 551 inSync.set(true); 552 totalSynced.addAndGet(syncSlots()); 553 slotIndex = 0; 554 inSync.set(false); 555 } catch (InterruptedException e) { 556 Thread.currentThread().interrupt(); 557 sendAbortProcessSignal(); 558 syncException.compareAndSet(null, e); 559 throw e; 560 } catch (Throwable t) { 561 syncException.compareAndSet(null, t); 562 throw t; 563 } finally { 564 syncCond.signalAll(); 565 } 566 } 567 } finally { 568 lock.unlock(); 569 } 570 } 571 syncSlots()572 private long syncSlots() throws Throwable { 573 int retry = 0; 574 int logRolled = 0; 575 long totalSynced = 0; 576 do { 577 try { 578 totalSynced = syncSlots(stream, slots, 0, slotIndex); 579 break; 580 } catch (Throwable e) { 581 LOG.warn("unable to sync slots, retry=" + retry); 582 if (++retry >= maxRetriesBeforeRoll) { 583 if (logRolled >= maxSyncFailureRoll) { 584 LOG.error("Sync slots after log roll failed, abort.", e); 585 sendAbortProcessSignal(); 586 throw e; 587 } 588 589 if (!rollWriterOrDie()) { 590 throw e; 591 } 592 593 logRolled++; 594 retry = 0; 595 } 596 } 597 } while (isRunning()); 598 return totalSynced; 599 } 600 syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)601 protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) 602 throws IOException { 603 long totalSynced = 0; 604 for (int i = 0; i < count; ++i) { 605 ByteSlot data = slots[offset + i]; 606 data.writeTo(stream); 607 totalSynced += data.size(); 608 } 609 610 if (useHsync) { 611 stream.hsync(); 612 } else { 613 stream.hflush(); 614 } 615 sendPostSyncSignal(); 616 617 if (LOG.isTraceEnabled()) { 618 LOG.trace("Sync slots=" + count + '/' + slots.length + 619 ", flushed=" + StringUtils.humanSize(totalSynced)); 620 } 621 return totalSynced; 622 } 623 rollWriterOrDie()624 private boolean rollWriterOrDie() { 625 for (int i = 0; i < rollRetries; ++i) { 626 if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); 627 628 try { 629 if (rollWriter()) { 630 return true; 631 } 632 } catch (IOException e) { 633 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); 634 } 635 } 636 LOG.fatal("Unable to roll the log"); 637 sendAbortProcessSignal(); 638 throw new RuntimeException("unable to roll the log"); 639 } 640 tryRollWriter()641 private boolean tryRollWriter() { 642 try { 643 return rollWriter(); 644 } catch (IOException e) { 645 LOG.warn("Unable to roll the log", e); 646 return false; 647 } 648 } 649 getMillisToNextPeriodicRoll()650 private long getMillisToNextPeriodicRoll() { 651 if (lastRollTs.get() > 0 && periodicRollMsec > 0) { 652 return periodicRollMsec - getMillisFromLastRoll(); 653 } 654 return Long.MAX_VALUE; 655 } 656 getMillisFromLastRoll()657 private long getMillisFromLastRoll() { 658 return (System.currentTimeMillis() - lastRollTs.get()); 659 } 660 661 @VisibleForTesting periodicRollForTesting()662 protected void periodicRollForTesting() throws IOException { 663 lock.lock(); 664 try { 665 periodicRoll(); 666 } finally { 667 lock.unlock(); 668 } 669 } 670 671 @VisibleForTesting rollWriterForTesting()672 protected boolean rollWriterForTesting() throws IOException { 673 lock.lock(); 674 try { 675 return rollWriter(); 676 } finally { 677 lock.unlock(); 678 } 679 } 680 periodicRoll()681 private void periodicRoll() throws IOException { 682 if (storeTracker.isEmpty()) { 683 if (LOG.isTraceEnabled()) { 684 LOG.trace("no active procedures"); 685 } 686 tryRollWriter(); 687 removeAllLogs(flushLogId - 1); 688 } else { 689 if (storeTracker.isUpdated()) { 690 if (LOG.isTraceEnabled()) { 691 LOG.trace("all the active procedures are in the latest log"); 692 } 693 removeAllLogs(flushLogId - 1); 694 } 695 696 // if the log size has exceeded the roll threshold 697 // or the periodic roll timeout is expired, try to roll the wal. 698 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { 699 tryRollWriter(); 700 } 701 702 removeInactiveLogs(); 703 } 704 } 705 rollWriter()706 private boolean rollWriter() throws IOException { 707 // Create new state-log 708 if (!rollWriter(flushLogId + 1)) { 709 LOG.warn("someone else has already created log " + flushLogId); 710 return false; 711 } 712 713 // We have the lease on the log, 714 // but we should check if someone else has created new files 715 if (getMaxLogId(getLogFiles()) > flushLogId) { 716 LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); 717 logs.getLast().removeFile(); 718 return false; 719 } 720 721 // We have the lease on the log 722 return true; 723 } 724 rollWriter(final long logId)725 private boolean rollWriter(final long logId) throws IOException { 726 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; 727 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked(); 728 729 ProcedureWALHeader header = ProcedureWALHeader.newBuilder() 730 .setVersion(ProcedureWALFormat.HEADER_VERSION) 731 .setType(ProcedureWALFormat.LOG_TYPE_STREAM) 732 .setMinProcId(storeTracker.getMinProcId()) 733 .setLogId(logId) 734 .build(); 735 736 FSDataOutputStream newStream = null; 737 Path newLogFile = null; 738 long startPos = -1; 739 newLogFile = getLogFilePath(logId); 740 try { 741 newStream = fs.create(newLogFile, false); 742 } catch (FileAlreadyExistsException e) { 743 LOG.error("Log file with id=" + logId + " already exists", e); 744 return false; 745 } catch (RemoteException re) { 746 LOG.warn("failed to create log file with id=" + logId, re); 747 return false; 748 } 749 try { 750 ProcedureWALFormat.writeHeader(newStream, header); 751 startPos = newStream.getPos(); 752 } catch (IOException ioe) { 753 LOG.warn("Encountered exception writing header", ioe); 754 newStream.close(); 755 return false; 756 } 757 758 closeStream(); 759 760 storeTracker.resetUpdates(); 761 stream = newStream; 762 flushLogId = logId; 763 totalSynced.set(0); 764 lastRollTs.set(System.currentTimeMillis()); 765 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); 766 767 if (LOG.isDebugEnabled()) { 768 LOG.debug("Roll new state log: " + logId); 769 } 770 return true; 771 } 772 closeStream()773 private void closeStream() { 774 try { 775 if (stream != null) { 776 try { 777 ProcedureWALFile log = logs.getLast(); 778 log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); 779 ProcedureWALFormat.writeTrailer(stream, storeTracker); 780 } catch (IOException e) { 781 LOG.warn("Unable to write the trailer: " + e.getMessage()); 782 } 783 stream.close(); 784 } 785 } catch (IOException e) { 786 LOG.error("Unable to close the stream", e); 787 } finally { 788 stream = null; 789 } 790 } 791 792 // ========================================================================== 793 // Log Files cleaner helpers 794 // ========================================================================== removeInactiveLogs()795 private void removeInactiveLogs() { 796 // Verify if the ProcId of the first oldest is still active. if not remove the file. 797 while (logs.size() > 1) { 798 ProcedureWALFile log = logs.getFirst(); 799 if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { 800 break; 801 } 802 removeLogFile(log); 803 } 804 } 805 removeAllLogs(long lastLogId)806 private void removeAllLogs(long lastLogId) { 807 if (logs.size() <= 1) return; 808 809 if (LOG.isDebugEnabled()) { 810 LOG.debug("Remove all state logs with ID less than " + lastLogId); 811 } 812 while (logs.size() > 1) { 813 ProcedureWALFile log = logs.getFirst(); 814 if (lastLogId < log.getLogId()) { 815 break; 816 } 817 removeLogFile(log); 818 } 819 } 820 removeLogFile(final ProcedureWALFile log)821 private boolean removeLogFile(final ProcedureWALFile log) { 822 try { 823 if (LOG.isDebugEnabled()) { 824 LOG.debug("Remove log: " + log); 825 } 826 log.removeFile(); 827 logs.remove(log); 828 LOG.info("Remove log: " + log); 829 LOG.info("Removed logs: " + logs); 830 if (logs.size() == 0) { LOG.error("Expected at least one log"); } 831 assert logs.size() > 0 : "expected at least one log"; 832 } catch (IOException e) { 833 LOG.error("Unable to remove log: " + log, e); 834 return false; 835 } 836 return true; 837 } 838 839 // ========================================================================== 840 // FileSystem Log Files helpers 841 // ========================================================================== getLogDir()842 public Path getLogDir() { 843 return this.logDir; 844 } 845 getFileSystem()846 public FileSystem getFileSystem() { 847 return this.fs; 848 } 849 getLogFilePath(final long logId)850 protected Path getLogFilePath(final long logId) throws IOException { 851 return new Path(logDir, String.format("state-%020d.log", logId)); 852 } 853 getLogIdFromName(final String name)854 private static long getLogIdFromName(final String name) { 855 int end = name.lastIndexOf(".log"); 856 int start = name.lastIndexOf('-') + 1; 857 while (start < end) { 858 if (name.charAt(start) != '0') 859 break; 860 start++; 861 } 862 return Long.parseLong(name.substring(start, end)); 863 } 864 getLogFiles()865 private FileStatus[] getLogFiles() throws IOException { 866 try { 867 return fs.listStatus(logDir, new PathFilter() { 868 @Override 869 public boolean accept(Path path) { 870 String name = path.getName(); 871 return name.startsWith("state-") && name.endsWith(".log"); 872 } 873 }); 874 } catch (FileNotFoundException e) { 875 LOG.warn("Log directory not found: " + e.getMessage()); 876 return null; 877 } 878 } 879 880 private static long getMaxLogId(final FileStatus[] logFiles) { 881 long maxLogId = 0; 882 if (logFiles != null && logFiles.length > 0) { 883 for (int i = 0; i < logFiles.length; ++i) { 884 maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName())); 885 } 886 } 887 return maxLogId; 888 } 889 890 /** 891 * @return Max-LogID of the specified log file set 892 */ 893 private long initOldLogs(final FileStatus[] logFiles) throws IOException { 894 this.logs.clear(); 895 896 long maxLogId = 0; 897 if (logFiles != null && logFiles.length > 0) { 898 for (int i = 0; i < logFiles.length; ++i) { 899 final Path logPath = logFiles[i].getPath(); 900 leaseRecovery.recoverFileLease(fs, logPath); 901 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); 902 903 ProcedureWALFile log = initOldLog(logFiles[i]); 904 if (log != null) { 905 this.logs.add(log); 906 } 907 } 908 Collections.sort(this.logs); 909 initTrackerFromOldLogs(); 910 } 911 return maxLogId; 912 } 913 914 private void initTrackerFromOldLogs() { 915 // TODO: Load the most recent tracker available 916 if (!logs.isEmpty()) { 917 ProcedureWALFile log = logs.getLast(); 918 try { 919 log.readTracker(storeTracker); 920 } catch (IOException e) { 921 LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); 922 // try the next one... 923 storeTracker.reset(); 924 storeTracker.setPartialFlag(true); 925 } 926 } 927 } 928 929 private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { 930 ProcedureWALFile log = new ProcedureWALFile(fs, logFile); 931 if (logFile.getLen() == 0) { 932 LOG.warn("Remove uninitialized log: " + logFile); 933 log.removeFile(); 934 return null; 935 } 936 if (LOG.isDebugEnabled()) { 937 LOG.debug("Opening state-log: " + logFile); 938 } 939 try { 940 log.open(); 941 } catch (ProcedureWALFormat.InvalidWALDataException e) { 942 LOG.warn("Remove uninitialized log: " + logFile, e); 943 log.removeFile(); 944 return null; 945 } catch (IOException e) { 946 String msg = "Unable to read state log: " + logFile; 947 LOG.error(msg, e); 948 throw new IOException(msg, e); 949 } 950 951 if (log.isCompacted()) { 952 try { 953 log.readTrailer(); 954 } catch (IOException e) { 955 LOG.warn("Unfinished compacted log: " + logFile, e); 956 log.removeFile(); 957 return null; 958 } 959 } 960 return log; 961 } 962 } 963