1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.impl; 9 10 import static com.sleepycat.je.rep.NoConsistencyRequiredPolicy.NO_CONSISTENCY; 11 import static com.sleepycat.je.rep.impl.RepParams.NODE_NAME; 12 import static com.sleepycat.je.rep.impl.RepParams.TEST_JE_VERSION; 13 import static com.sleepycat.je.rep.impl.RepParams.VLSN_MAX_DIST; 14 import static com.sleepycat.je.rep.impl.RepParams.VLSN_MAX_MAP; 15 import static com.sleepycat.je.rep.impl.RepParams.VLSN_STRIDE; 16 17 import java.io.File; 18 import java.io.IOException; 19 import java.io.PrintWriter; 20 import java.net.InetSocketAddress; 21 import java.util.ArrayList; 22 import java.util.Collection; 23 import java.util.HashSet; 24 import java.util.List; 25 import java.util.Map; 26 import java.util.NavigableSet; 27 import java.util.Set; 28 import java.util.SortedSet; 29 import java.util.StringTokenizer; 30 import java.util.UUID; 31 import java.util.concurrent.CountDownLatch; 32 import java.util.concurrent.TimeUnit; 33 import java.util.concurrent.locks.ReentrantLock; 34 import java.util.concurrent.locks.ReentrantReadWriteLock; 35 import java.util.logging.Formatter; 36 import java.util.logging.Level; 37 38 import com.sleepycat.je.CheckpointConfig; 39 import com.sleepycat.je.Database; 40 import com.sleepycat.je.DatabaseConfig; 41 import com.sleepycat.je.DatabaseException; 42 import com.sleepycat.je.DatabaseNotFoundException; 43 import com.sleepycat.je.Durability; 44 import com.sleepycat.je.Durability.ReplicaAckPolicy; 45 import com.sleepycat.je.Durability.SyncPolicy; 46 import com.sleepycat.je.Environment; 47 import com.sleepycat.je.EnvironmentConfig; 48 import com.sleepycat.je.EnvironmentFailureException; 49 import com.sleepycat.je.EnvironmentLockedException; 50 import com.sleepycat.je.EnvironmentNotFoundException; 51 import com.sleepycat.je.JEVersion; 52 import 
com.sleepycat.je.ProgressListener; 53 import com.sleepycat.je.ReplicaConsistencyPolicy; 54 import com.sleepycat.je.StatsConfig; 55 import com.sleepycat.je.ThreadInterruptedException; 56 import com.sleepycat.je.TransactionConfig; 57 import com.sleepycat.je.TransactionTimeoutException; 58 import com.sleepycat.je.dbi.DatabaseId; 59 import com.sleepycat.je.dbi.DatabaseImpl; 60 import com.sleepycat.je.dbi.DbConfigManager; 61 import com.sleepycat.je.dbi.DbTree; 62 import com.sleepycat.je.dbi.DbType; 63 import com.sleepycat.je.dbi.EnvironmentFailureReason; 64 import com.sleepycat.je.dbi.EnvironmentImpl; 65 import com.sleepycat.je.dbi.RepConfigProxy; 66 import com.sleepycat.je.dbi.StartupTracker.Phase; 67 import com.sleepycat.je.log.LogEntryHeader; 68 import com.sleepycat.je.log.LogEntryType; 69 import com.sleepycat.je.log.LogItem; 70 import com.sleepycat.je.recovery.RecoveryInfo; 71 import com.sleepycat.je.recovery.VLSNRecoveryProxy; 72 import com.sleepycat.je.rep.DatabasePreemptedException; 73 import com.sleepycat.je.rep.InsufficientAcksException; 74 import com.sleepycat.je.rep.InsufficientReplicasException; 75 import com.sleepycat.je.rep.LockPreemptedException; 76 import com.sleepycat.je.rep.LogFileRewriteListener; 77 import com.sleepycat.je.rep.LogOverwriteException; 78 import com.sleepycat.je.rep.QuorumPolicy; 79 import com.sleepycat.je.rep.RepInternal; 80 import com.sleepycat.je.rep.RepStatManager; 81 import com.sleepycat.je.rep.ReplicaConsistencyException; 82 import com.sleepycat.je.rep.ReplicaWriteException; 83 import com.sleepycat.je.rep.ReplicatedEnvironment; 84 import com.sleepycat.je.rep.ReplicatedEnvironmentStats; 85 import com.sleepycat.je.rep.ReplicationConfig; 86 import com.sleepycat.je.rep.ReplicationMutableConfig; 87 import com.sleepycat.je.rep.ReplicationNetworkConfig; 88 import com.sleepycat.je.rep.RestartRequiredException; 89 import com.sleepycat.je.rep.RollbackException; 90 import com.sleepycat.je.rep.StateChangeEvent; 91 import 
com.sleepycat.je.rep.StateChangeListener; 92 import com.sleepycat.je.rep.SyncupProgress; 93 import com.sleepycat.je.rep.UnknownMasterException; 94 import com.sleepycat.je.rep.impl.node.LocalCBVLSNUpdater; 95 import com.sleepycat.je.rep.impl.node.MasterTransfer; 96 import com.sleepycat.je.rep.impl.node.NameIdPair; 97 import com.sleepycat.je.rep.impl.node.NodeState; 98 import com.sleepycat.je.rep.impl.node.RepNode; 99 import com.sleepycat.je.rep.impl.node.Replay; 100 import com.sleepycat.je.rep.net.DataChannelFactory; 101 import com.sleepycat.je.rep.stream.FeederReader; 102 import com.sleepycat.je.rep.stream.FeederTxns; 103 import com.sleepycat.je.rep.txn.MasterThreadLocker; 104 import com.sleepycat.je.rep.txn.MasterTxn; 105 import com.sleepycat.je.rep.txn.ReadonlyTxn; 106 import com.sleepycat.je.rep.txn.ReplayTxn; 107 import com.sleepycat.je.rep.txn.ReplicaThreadLocker; 108 import com.sleepycat.je.rep.utilint.HostPortPair; 109 import com.sleepycat.je.rep.utilint.RepUtils; 110 import com.sleepycat.je.rep.utilint.ReplicationFormatter; 111 import com.sleepycat.je.rep.utilint.StatCaptureRepDefinitions; 112 import com.sleepycat.je.rep.utilint.net.DataChannelFactoryBuilder; 113 import com.sleepycat.je.rep.vlsn.VLSNIndex; 114 import com.sleepycat.je.rep.vlsn.VLSNRange; 115 import com.sleepycat.je.rep.vlsn.VLSNRecoveryTracker; 116 import com.sleepycat.je.statcap.StatManager; 117 import com.sleepycat.je.txn.Locker; 118 import com.sleepycat.je.txn.ThreadLocker; 119 import com.sleepycat.je.txn.Txn; 120 import com.sleepycat.je.util.DbBackup; 121 import com.sleepycat.je.utilint.BooleanStat; 122 import com.sleepycat.je.utilint.DbLsn; 123 import com.sleepycat.je.utilint.LoggerUtils; 124 import com.sleepycat.je.utilint.StatGroup; 125 import com.sleepycat.je.utilint.StringStat; 126 import com.sleepycat.je.utilint.VLSN; 127 128 public class RepImpl 129 extends EnvironmentImpl 130 implements RepEnvConfigObserver { 131 132 private VLSNIndex vlsnIndex; 133 /* VLSNIndexAccess coordinates 
the closing of the vlsn index */ 134 private final VLSNIndexAccess vlsnIndexAccess = new VLSNIndexAccess(); 135 136 private final FeederTxns feederTxns; 137 138 /* 139 * The repNode is only non-null when the replicated environment has joined 140 * a group. It's null otherwise. 141 */ 142 private volatile RepNode repNode; 143 private Replay replay; 144 145 /* 146 * This is the canonical nameIdPair instance used by the node. The internal 147 * Id part of the pair will be updated when the node actually joins the 148 * group. 149 */ 150 private NameIdPair nameIdPair; 151 152 private final NodeState nodeState; 153 154 /* 155 * The clockskew used by this environment in ms. It's only used by testing 156 * to inject clock skew between ReplicatedEnvironments. 157 */ 158 private static int clockSkewMs = 0; 159 160 /* 161 * A handle to the group database. This handle is initialized lazily when 162 * the contents of the database are first required. It's set to null upon 163 * shutdown. The handle must be initialized lazily because the database is 164 * created by the master, and we only know master identity later. The 165 * RepImpl manages the rep group database, so that the lifetime of the 166 * databaseImpl handle can be managed more easily to mesh with the opening 167 * and closing of the RepImpl. 168 */ 169 private DatabaseImpl groupDbImpl = null; 170 171 /* The status presents whether this replica is doing rollback. */ 172 private boolean backupProhibited = false; 173 174 /* 175 * Represents whether this Environment is allowed to convert a 176 * non-replicated Environment to replicated. 177 */ 178 private boolean allowConvert = false; 179 180 /** Config params for preserving and caching the VLSN. */ 181 private boolean preserveVLSN; 182 private boolean cacheVLSN; 183 private int cachedVLSNMinLength; 184 185 /* Keep an eye on the ongoing DbBackups. 
*/ 186 private final Set<DbBackup> backups = new HashSet<DbBackup>(); 187 188 /* 189 * The list of observers who are notified when a mutable rep param changes. 190 */ 191 private final List<RepEnvConfigObserver> repConfigObservers; 192 193 /* 194 * Lock used to control access and lazy initialization of groupDbImpl, 195 * ensuring that there is exactly one database made. A mutex is used rather 196 * than synchronization to allow us to probe for contention on the 197 * groupDbImpl. 198 */ 199 private final ReentrantLock groupDbLock = new ReentrantLock(); 200 201 private int replicaAckTimeout; 202 private int insufficientReplicasTimeout; 203 private int replayTxnTimeout; 204 private ReplicaConsistencyPolicy defaultConsistencyPolicy; 205 206 /* 207 * NodeStats are currently not public, but we may want to evaluate 208 * and decide if they would be useful, perhaps as a debugging aid. 209 */ 210 private final StatGroup nodeStats; 211 private final BooleanStat hardRecoveryStat; 212 private final StringStat hardRecoveryInfoStat; 213 214 /* 215 * Used to block transaction commit/abort execution just before completing 216 * a Master Transfer operation. 217 */ 218 private volatile CountDownLatch blockTxnLatch = new CountDownLatch(0); 219 220 /** 221 * A lock used to coordinate access to {@link #blockTxnLatch}. 222 * <p> 223 * When a Master Transfer operation completes Phase 1, it sets a new {@code 224 * CountDownLatch} in order to block the completion of transactions at the 225 * commit or abort stage. We must avoid having it do so at an awkward 226 * moment. There are two (unrelated) cases: 227 * <ol> 228 * <li>There is a brief period between the time a transaction "awaits" the 229 * latch (in {@code checkBlock()}) and the time it publishes its VLSN. We 230 * must avoid having Master Transfer read its "ultimate goal" VLSN during 231 * that interval. 232 * <li>The Feeder input thread occasionally updates the GroupDB, upon 233 * receiving a Heartbeat response. 
That happens in a transaction, like any 234 * other, so it could be subject to the normal blockage in Phase 2. But 235 * the Feeder input thread is of course also the thread that we rely on for 236 * making progress towards the goal of Master Transfer; so blocking it is 237 * counterproductive. 238 * </ol> 239 * 240 * @see MasterTransfer 241 * @see ReplicatedEnvironment#transferMaster 242 */ 243 private final ReentrantReadWriteLock blockLatchLock = 244 new ReentrantReadWriteLock(true); 245 246 /* application listener for syncups. */ 247 private final ProgressListener<SyncupProgress> syncupProgressListener; 248 249 /* Application callback to be notified before we overwrite log files. */ 250 private final LogFileRewriteListener logRewriteListener; 251 252 /* Configuration for ServiceDispatcher communication */ 253 private final ReplicationNetworkConfig repNetConfig; 254 255 /* 256 * Factory for creating channel instances. Not available until 257 * initializeChannelFactory is called. 258 */ 259 private volatile DataChannelFactory channelFactory; 260 RepImpl(File envHome, EnvironmentConfig envConfig, EnvironmentImpl sharedCacheEnv, RepConfigProxy repConfigProxy)261 public RepImpl(File envHome, 262 EnvironmentConfig envConfig, 263 EnvironmentImpl sharedCacheEnv, 264 RepConfigProxy repConfigProxy) 265 throws EnvironmentNotFoundException, EnvironmentLockedException { 266 267 super(envHome, envConfig, sharedCacheEnv, repConfigProxy); 268 269 allowConvert = 270 RepInternal.getAllowConvert(((ReplicationConfig) repConfigProxy)); 271 feederTxns = new FeederTxns(this); 272 replay = new Replay(this, nameIdPair); 273 nodeState = new NodeState(nameIdPair, this); 274 repConfigObservers = new ArrayList<RepEnvConfigObserver>(); 275 addRepConfigObserver(this); 276 277 nodeStats = new StatGroup(RepImplStatDefinition.GROUP_NAME, 278 RepImplStatDefinition.GROUP_DESC); 279 hardRecoveryStat = new BooleanStat(nodeStats, 280 RepImplStatDefinition.HARD_RECOVERY); 281 hardRecoveryInfoStat = 
282 new StringStat(nodeStats, RepImplStatDefinition.HARD_RECOVERY_INFO, 283 "This node did not incur a hard recovery."); 284 285 syncupProgressListener = 286 ((ReplicationConfig)repConfigProxy).getSyncupProgressListener(); 287 logRewriteListener = 288 ((ReplicationConfig)repConfigProxy).getLogFileRewriteListener(); 289 repNetConfig = 290 ((ReplicationConfig)repConfigProxy).getRepNetConfig(); 291 } 292 293 /** 294 * Called by the EnvironmentImpl constructor. Some rep params, 295 * preserveVLSN for example, are accessed by the EnvironmentImpl via 296 * methods (getPreserveVLSN for example), so they need to be initialized 297 * early. 298 */ 299 @Override initConfigParams(EnvironmentConfig envConfig, RepConfigProxy repConfigProxy)300 protected void initConfigParams(EnvironmentConfig envConfig, 301 RepConfigProxy repConfigProxy) { 302 303 /* Init standalone config params first. */ 304 super.initConfigParams(envConfig, repConfigProxy); 305 306 /* Init rep config params. */ 307 replicaAckTimeout = 308 configManager.getDuration(RepParams.REPLICA_ACK_TIMEOUT); 309 insufficientReplicasTimeout = 310 configManager.getDuration(RepParams.INSUFFICIENT_REPLICAS_TIMEOUT); 311 replayTxnTimeout = 312 configManager.getDuration(RepParams.REPLAY_TXN_LOCK_TIMEOUT); 313 defaultConsistencyPolicy = RepUtils.getReplicaConsistencyPolicy 314 (configManager.get(RepParams.CONSISTENCY_POLICY)); 315 preserveVLSN = 316 configManager.getBoolean(RepParams.PRESERVE_RECORD_VERSION); 317 cacheVLSN = 318 configManager.getBoolean(RepParams.CACHE_RECORD_VERSION); 319 cachedVLSNMinLength = 320 configManager.getInt(RepParams.CACHED_RECORD_VERSION_MIN_LENGTH); 321 } 322 323 @Override initFormatter()324 protected Formatter initFormatter() { 325 326 /* 327 * The nameIdPair field is assigned here rather than in the constructor 328 * because of base class/subclass dependencies. initFormatter() is 329 * called by the base class constructor, and nameIdPair must be 330 * available at that time. 
331 */ 332 nameIdPair = new NameIdPair(configManager.get(NODE_NAME)); 333 return new ReplicationFormatter(nameIdPair); 334 } 335 336 @Override getMonitorClassName()337 public String getMonitorClassName() { 338 return "com.sleepycat.je.rep.jmx.RepJEMonitor"; 339 } 340 341 @Override getDiagnosticsClassName()342 public String getDiagnosticsClassName() { 343 return "com.sleepycat.je.rep.jmx.RepJEDiagnostics"; 344 } 345 346 /** 347 * @see super#initConfigManager 348 */ 349 @Override 350 protected DbConfigManager initConfigManager(EnvironmentConfig envConfig, RepConfigProxy repConfigProxy)351 initConfigManager(EnvironmentConfig envConfig, 352 RepConfigProxy repConfigProxy) { 353 return new RepConfigManager(envConfig, repConfigProxy); 354 } 355 356 @Override getAllowRepConvert()357 public boolean getAllowRepConvert() { 358 return allowConvert; 359 } 360 361 /** 362 * @see super#resetConfigManager 363 */ 364 @Override resetConfigManager(EnvironmentConfig newConfig)365 protected DbConfigManager resetConfigManager(EnvironmentConfig newConfig) { 366 /* Save all the replication related properties. */ 367 RepConfigManager repConfigManager = (RepConfigManager) configManager; 368 ReplicationConfig repConfig = repConfigManager.makeReplicationConfig(); 369 return new RepConfigManager(newConfig, repConfig); 370 } 371 cloneRepConfig()372 public ReplicationConfig cloneRepConfig() { 373 RepConfigManager repConfigManager = (RepConfigManager) configManager; 374 return repConfigManager.makeReplicationConfig(); 375 } 376 377 /* Make an ReplicatedEnvironment handle for this RepImpl. 
*/ makeEnvironment()378 public ReplicatedEnvironment makeEnvironment() { 379 return new ReplicatedEnvironment(getEnvironmentHome(), 380 cloneRepConfig(), 381 cloneConfig()); 382 } 383 cloneRepMutableConfig()384 public ReplicationMutableConfig cloneRepMutableConfig() { 385 RepConfigManager repConfigManager = (RepConfigManager) configManager; 386 return repConfigManager.makeReplicationConfig(); 387 } 388 setRepMutableConfig(ReplicationMutableConfig config)389 public void setRepMutableConfig(ReplicationMutableConfig config) 390 throws DatabaseException { 391 392 /* Clone the current config. */ 393 RepConfigManager repConfigManager = (RepConfigManager) configManager; 394 ReplicationConfig newConfig = repConfigManager.makeReplicationConfig(); 395 396 /* Copy in the mutable props. */ 397 config.copyMutablePropsTo(newConfig); 398 repConfigManager = new RepConfigManager 399 (configManager.getEnvironmentConfig(), newConfig); 400 401 /* 402 * Update the current config and notify observers. The config manager 403 * is replaced with a new instance that uses the new configuration. 404 * This avoids synchronization issues: other threads that have a 405 * reference to the old configuration object are not impacted. 406 * 407 * Notify listeners in reverse order of registration so that the 408 * environment listener is notified last and can start daemon threads 409 * after they are configured. 
410 */ 411 for (int i = repConfigObservers.size() - 1; i >= 0; i -= 1) { 412 RepEnvConfigObserver o = repConfigObservers.get(i); 413 o.repEnvConfigUpdate(repConfigManager, newConfig); 414 } 415 } 416 417 @Override repEnvConfigUpdate(RepConfigManager configMgr, ReplicationMutableConfig newConfig)418 public void repEnvConfigUpdate(RepConfigManager configMgr, 419 ReplicationMutableConfig newConfig) 420 throws DatabaseException { 421 422 repNode.getArbiter().processConfigChange(newConfig); 423 424 repNode.getElectionQuorum().setElectableGroupSizeOverride 425 (newConfig.getElectableGroupSizeOverride()); 426 427 repNode.configLogFlusher(configMgr); 428 429 repNode.getReplica().getDbCache().setConfig(configMgr); 430 } 431 addRepConfigObserver(RepEnvConfigObserver o)432 public synchronized void addRepConfigObserver(RepEnvConfigObserver o) { 433 repConfigObservers.add(o); 434 } 435 436 /** 437 * The VLSNIndex must be created, merged and flushed before the recovery 438 * checkpoint. This method should be called even if there is no recovery 439 * checkpoint, because it sets up needed data structures. 440 * 441 * On the face of it, it seems that one could flush the VLSNIndex cache 442 * after the recovery checkpoint, before the Replicator constructor returns 443 * and before any user level HA operations can start. That's not sufficient 444 * because the recovery checkpoint is shortening the recovery interval for 445 * future recoveries, and any information that has been garnered must be 446 * persisted. Here's an example of what might happen after a series of 447 * recoveries if we fail to flush VLSNIndex as part of the recovery 448 * checkpoint: 449 * 450 * Environment recovers for first time, brand new environment 451 * recovery did not find any VLSNs in log, because log is brand new 452 * recovery logs ckpt 1start 453 * recovery logs ckpt 1 end 454 * 455 * VLSN 1 logged 456 * VLSN 2 logged 457 * VLSN 3 logged 458 * 459 * crash .... 
Environment recovers 460 * recovery crawls log from ckpt 1 start onward, finds VLSNs 1-3 461 * recovery logs ckpt 2 start 462 * recovery logs ckpt 2 end 463 * VLSN index instantiated, VLSNs 1-3 added in but not written too disk 464 * 465 * crash ... Environment recovers 466 * recovery crawls log from ckpt start 2 start onward, finds no VLSNs. 467 * 468 * Instead, the flushed VLSN has to be logged before the checkpoint end 469 * record that is used for the next recovery. 470 */ 471 @Override preRecoveryCheckpointInit(RecoveryInfo recoveryInfo)472 public void preRecoveryCheckpointInit(RecoveryInfo recoveryInfo) { 473 474 int stride = configManager.getInt(VLSN_STRIDE); 475 int maxMappings = configManager.getInt(VLSN_MAX_MAP); 476 int maxDist = configManager.getInt(VLSN_MAX_DIST); 477 478 /* 479 * Our local nameIdPair field isn't set yet because we haven't finished 480 * our initialization, so get it from the config manager. 481 */ 482 NameIdPair useNameIdPair = 483 new NameIdPair(configManager.get(NODE_NAME)); 484 485 vlsnIndex = new VLSNIndex(this, DbType.VLSN_MAP.getInternalName(), 486 useNameIdPair, stride, maxMappings, maxDist, 487 recoveryInfo); 488 replay.preRecoveryCheckpointInit(recoveryInfo); 489 } 490 491 /** 492 * Returns the current state associated with this ReplicatedEnvironment 493 * 494 * @return the externally visible ReplicatedEnvironment state 495 */ getState()496 public ReplicatedEnvironment.State getState() { 497 return nodeState.getRepEnvState(); 498 } 499 500 /** 501 * Returns the state change event that transitioned the 502 * ReplicatedEnviroment to its current state. 503 */ getStateChangeEvent()504 public StateChangeEvent getStateChangeEvent() { 505 return nodeState.getStateChangeEvent(); 506 } 507 508 /** 509 * Wait for this node to join a replication group and return whether it is 510 * a MASTER or REPLICA. Note that any method that creates or clears the 511 * repNode field must be synchronized. 
512 */ 513 public synchronized ReplicatedEnvironment.State joinGroup(ReplicaConsistencyPolicy consistency, QuorumPolicy initialElectionPolicy)514 joinGroup(ReplicaConsistencyPolicy consistency, 515 QuorumPolicy initialElectionPolicy) 516 throws ReplicaConsistencyException, DatabaseException { 517 518 startupTracker.start(Phase.TOTAL_JOIN_GROUP); 519 try { 520 if (repNode == null) { 521 repNode = new RepNode(this, replay, nodeState); 522 } 523 524 return repNode.joinGroup(consistency, initialElectionPolicy); 525 } catch (IOException ioe) { 526 throw EnvironmentFailureException.unexpectedException 527 (this, "Problem attempting to join on " + getSocket(), ioe); 528 } finally { 529 startupTracker.stop(Phase.TOTAL_JOIN_GROUP); 530 } 531 } 532 533 /** 534 * Initialize the DataChannelFactory in our configuration for use. 535 * This is public to allow access by the ReplicatedEnvironment constructor. 536 * @throws IllegalArgumentException if the ReplicationNetworkConfig 537 * is invalid. 538 */ initializeChannelFactory()539 public void initializeChannelFactory() { 540 if (channelFactory != null) { 541 return; 542 } 543 544 synchronized (this) { 545 if (channelFactory == null) { 546 channelFactory = 547 DataChannelFactoryBuilder.construct( 548 repNetConfig, 549 DataChannelFactoryBuilder.makeLoggerFactory(this)); 550 } 551 } 552 } 553 554 @Override createInternalEnvironment()555 protected Environment createInternalEnvironment() { 556 return new InternalReplicatedEnvironment 557 (getEnvironmentHome(), cloneRepConfig(), cloneConfig(), this); 558 } 559 560 /** 561 * @see EnvironmentImpl#setupClose 562 * Release all replication resources that can be released before the 563 * checkpoint. Note that any method that creates or clears the repNode 564 * field must be called from a synchronized caller. 
565 * 566 * Note that the vlsnIndex is closed as a callback, from 567 * postCheckpointPreEnvClose() 568 * @throws DatabaseException 569 * 570 */ 571 @Override setupClose(PrintWriter errors)572 protected synchronized void setupClose(PrintWriter errors) 573 throws DatabaseException { 574 575 if (groupDbImpl != null) { 576 getDbTree().releaseDb(groupDbImpl); 577 groupDbImpl = null; 578 LoggerUtils.fine 579 (envLogger, this, "Group member database shutdown"); 580 } 581 582 try { 583 if (repNode != null) { 584 repNode.shutdown(); 585 repNode = null; 586 } 587 } catch (InterruptedException e) { 588 appendException(errors, e, "shutting down node " + nameIdPair); 589 } 590 } 591 592 /** 593 * Close any resources that need to be closed after the closing checkpoint. 594 * Note that since Replay.close closes open transactions, it must be 595 * invoked after the checkpoint has been completed, so that the checkpoint 596 * operation can correctly account for the open transactions. 597 */ 598 @Override postCheckpointClose(boolean checkpointed)599 protected synchronized void postCheckpointClose(boolean checkpointed) 600 throws DatabaseException { 601 602 if (replay != null) { 603 replay.close(); 604 replay = null; 605 } 606 607 vlsnIndexAccess.closeVLSNIndex(checkpointed); 608 } 609 610 /** 611 * @see EnvironmentImpl#setupClose 612 * 613 * Note: this conversion process will iterate over all user created 614 * databases in the environment, which could be potentially be a costly 615 * affair. However, let's opt for simplicity and defer any optimizations 616 * until we see whether this is an important use case. 617 */ 618 @Override postRecoveryConversion()619 protected void postRecoveryConversion() { 620 621 super.postRecoveryConversion(); 622 623 if (needRepConvert) { 624 /* Set NameDb to replicated. 
*/ 625 DatabaseImpl nameDb = null; 626 try { 627 nameDb = dbMapTree.getDb(DbTree.NAME_DB_ID); 628 if (!nameDb.isReplicated()) { 629 nameDb.setIsReplicatedBit(); 630 nameDb.setDirty(); 631 } 632 } finally { 633 if (nameDb != null) { 634 dbMapTree.releaseDb(nameDb); 635 } 636 } 637 638 /* Set user defined databases to replicated. */ 639 Map<DatabaseId, String> idNameMap = dbMapTree.getDbNamesAndIds(); 640 for (DatabaseId id : idNameMap.keySet()) { 641 DatabaseImpl db = null; 642 try { 643 db = dbMapTree.getDb(id); 644 if (db != null && 645 !DbTree.isReservedDbName(idNameMap.get(id))) { 646 647 db.setIsReplicatedBit(); 648 db.setDirty(); 649 } 650 } finally { 651 if (db != null) { 652 dbMapTree.releaseDb(db); 653 } 654 } 655 } 656 657 /* 658 * Do a checkpointer to flush dirty datbaseImpls that are converted 659 * to replicated and write the current VLSNRange to the log. 660 */ 661 CheckpointConfig ckptConfig = new CheckpointConfig(); 662 ckptConfig.setForce(true); 663 ckptConfig.setMinimizeRecoveryTime(true); 664 invokeCheckpoint(ckptConfig, "Environment conversion"); 665 } 666 } 667 668 /* 669 * Close enough resources to support reopening the environment in the same 670 * JVM. 671 * @see EnvironmentImpl#doCloseAfterInvalid() 672 */ 673 @Override doCloseAfterInvalid()674 public synchronized void doCloseAfterInvalid() { 675 676 try { 677 /* Release the repNode, in order to release sockets. */ 678 if (repNode != null) { 679 repNode.shutdown(); 680 repNode = null; 681 } 682 } catch (Exception ignore) { 683 } 684 685 super.doCloseAfterInvalid(); 686 } 687 688 /** 689 * Used by error handling to forcibly close an environment, and by tests to 690 * close an environment to simulate a crash. Database handles do not have 691 * to be closed before calling this method. A checkpoint is not performed. 692 * The various thread pools will be shutdown abruptly. 
693 * 694 * @throws DatabaseException 695 */ 696 @Override abnormalClose()697 public void abnormalClose() 698 throws DatabaseException { 699 700 /* 701 * Shutdown the daemons, and the checkpointer in particular, before 702 * nulling out the vlsnIndex. 703 */ 704 shutdownDaemons(); 705 706 try { 707 if (repNode != null) { 708 709 /* 710 * Don't fire a LeaveGroupEvent if it's an abnormal close, 711 * otherwise an EnvironmentFailureException would be thrown 712 * because daemons of this Environment have been shutdown. 713 */ 714 repNode.getMonitorEventManager().disableLeaveGroupEvent(); 715 repNode.shutdown(); 716 repNode = null; 717 } 718 } catch (InterruptedException ignore) { 719 /* ignore */ 720 } 721 722 try { 723 vlsnIndexAccess.abnormalCloseVLSNIndex(); 724 } catch (DatabaseException ignore) { 725 /* ignore */ 726 } 727 728 try { 729 super.abnormalClose(); 730 } catch (DatabaseException ignore) { 731 /* ignore */ 732 } 733 } 734 735 /** 736 * A replicated log entry has been written on this node. Update the 737 * VLSN->LSN mapping. Called outside the log write latch. 738 * @throws DatabaseException 739 */ 740 @Override registerVLSN(LogItem logItem)741 public void registerVLSN(LogItem logItem) { 742 LogEntryHeader header = logItem.getHeader(); 743 VLSN vlsn = header.getVLSN(); 744 745 /* 746 * Although the very first replicated entry of the system is never a 747 * syncable log entry type, the first GlobalCBVLSN of the system must 748 * start at 1. If we only track the first syncable entry, the 749 * GlobalCBVLSN will start a a value > 1, and replicas that are 750 * starting up from VLSN 1 will be caught in spurious network restores 751 * because VLSN 1 < the GlobalCBVLSN. Therefore treat the VLSN 1 as a 752 * syncable entry for the sake of the GlobalCBVLSN. 
753 */ 754 if (LogEntryType.isSyncPoint(header.getType()) || 755 VLSN.FIRST_VLSN.equals(vlsn)) { 756 repNode.trackSyncableVLSN(vlsn, logItem.getNewLsn()); 757 } 758 vlsnIndex.put(logItem); 759 } 760 761 /** 762 * Generate the next VLSN. 763 */ 764 @Override bumpVLSN()765 public VLSN bumpVLSN() { 766 return vlsnIndex.bump(); 767 } 768 769 /** 770 * If the log entry wasn't successfully logged, decrement the VLSN to 771 * reclaim the slot. 772 */ 773 @Override decrementVLSN()774 public void decrementVLSN() { 775 vlsnIndex.decrement(); 776 } 777 778 /** 779 * Flush any information that needs to go out at checkpoint. Specifically, 780 * write any in-memory VLSN->LSN mappings to the VLSNIndex database so we 781 * are guaranteed that the VLSNIndex database will recover properly. 782 * This must be committed with noSync because 783 * - the ensuing checkpoint end record will be logged with an fsync and 784 * will effectively force this out 785 * - it's important to minmize lock contention on the vlsn index and 786 * any fsync done during a checkpoint will be expensive, as there may 787 * be quite a lot to push to disk. We don't want to incur that cost 788 * while holding locks on the vlsn index. [#20702] 789 */ 790 @Override preCheckpointEndFlush()791 public void preCheckpointEndFlush() 792 throws DatabaseException { 793 794 if (vlsnIndex != null) { 795 vlsnIndex.flushToDatabase(Durability.COMMIT_NO_SYNC); 796 } 797 } 798 799 @Override isMaster()800 public boolean isMaster() { 801 802 /* 803 * The volatile repNode field might be modified by joinGroup(), 804 * leaveGroup, or close(), which are synchronized. Keep this method 805 * unsynchronized, assign to a temporary field to guard against a 806 * change. 
807 */ 808 RepNode useNode = repNode; 809 if (useNode == null) { 810 return false; 811 } 812 return useNode.isMaster(); 813 } 814 setChangeListener(StateChangeListener listener)815 public void setChangeListener(StateChangeListener listener) { 816 StateChangeListener prevListener = nodeState.getChangeListener(); 817 nodeState.setChangeListener(listener); 818 819 /* 820 * Call back so that it's aware of the last state change event and 821 * the application can initialize itself correctly as a master or 822 * replica. 823 */ 824 final StateChangeEvent stateChangeEvent = 825 nodeState.getStateChangeEvent(); 826 try { 827 /* Invoke application code and handle any app exceptions. */ 828 listener.stateChange(stateChangeEvent); 829 } catch (Exception e) { 830 /* Revert the change. */ 831 nodeState.setChangeListener(prevListener); 832 LoggerUtils.severe 833 (envLogger, this, 834 "State Change listener exception: " + e.getMessage()); 835 /* An application error. */ 836 throw new EnvironmentFailureException 837 (this, EnvironmentFailureReason.LISTENER_EXCEPTION, e); 838 } 839 } 840 getChangeListener()841 public StateChangeListener getChangeListener() { 842 return nodeState.getChangeListener(); 843 } 844 getVLSNIndex()845 public VLSNIndex getVLSNIndex() { 846 return vlsnIndex; 847 } 848 getFeederTxns()849 public FeederTxns getFeederTxns() { 850 return feederTxns; 851 } 852 getStats(StatsConfig config)853 public ReplicatedEnvironmentStats getStats(StatsConfig config) { 854 return getStats(config, statKey); 855 } 856 857 @Override getRepStatGroups(StatsConfig config, Integer statKey1)858 public Collection<StatGroup> getRepStatGroups(StatsConfig config, 859 Integer statKey1) { 860 ReplicatedEnvironmentStats res = getStats(config, statKey1); 861 return (res == null) ? 
null : res.getStatGroups(); 862 } 863 864 @Override getStatCaptureProjections()865 public SortedSet<String> getStatCaptureProjections() { 866 return new StatCaptureRepDefinitions().getStatisticProjections(); 867 } 868 869 @Override createStatManager()870 public StatManager createStatManager() { 871 return new RepStatManager(this); 872 } 873 getStatsInternal(StatsConfig config)874 public ReplicatedEnvironmentStats getStatsInternal(StatsConfig config) { 875 if (repNode == null) { 876 return null; 877 } 878 return repNode.getStats(config); 879 } 880 getStats( StatsConfig config, Integer contextKey)881 public ReplicatedEnvironmentStats getStats( 882 StatsConfig config, 883 Integer contextKey) { 884 return ((RepStatManager)statManager).getRepStats(config, contextKey); 885 } 886 getReplay()887 public Replay getReplay() { 888 return replay; 889 } 890 891 /** 892 * Ensures that the environment is currently a Master before proceeding 893 * with an operation that requires it to be the master. 894 * 895 * @throws UnknownMasterException if the node is disconnected 896 * @throws ReplicaWriteException if the node is currently a replica 897 */ checkIfMaster(Locker locker)898 public void checkIfMaster(Locker locker) 899 throws UnknownMasterException, ReplicaWriteException { 900 901 final StateChangeEvent event = nodeState.getStateChangeEvent(); 902 903 switch (nodeState.getRepEnvState()) { 904 case MASTER: 905 break; 906 907 case REPLICA: 908 throw new ReplicaWriteException(locker, event); 909 910 case UNKNOWN: 911 throw new UnknownMasterException(locker, event); 912 913 case DETACHED: 914 throw new UnknownMasterException(locker, event); 915 916 default: 917 throw EnvironmentFailureException.unexpectedState 918 ("Unexpected state: " + nodeState.getRepEnvState()); 919 } 920 } 921 922 /** 923 * @return the repNode. May return null. 924 */ getRepNode()925 public RepNode getRepNode() { 926 return repNode; 927 } 928 929 /** 930 * Create an appropriate type of ThreadLocker. 
Specifically, it creates an 931 * MasterThreadLocker if the node is currently a Master, and a 932 * ReplicaThreadLocker otherwise, that is, if the node is a Replica, or 933 * it's currently in a DETACHED state. 934 * 935 * @return an instance of MasterThreadLocker or ReplicaThreadLocker 936 */ 937 @Override createRepThreadLocker()938 public ThreadLocker createRepThreadLocker() { 939 return (isMaster() ? 940 new MasterThreadLocker(this) : 941 new ReplicaThreadLocker(this)); 942 } 943 944 /** 945 * Create an appropriate type of Replicated transaction. Specifically, 946 * it creates a MasterTxn, if the node is currently a Master, a ReadonlyTxn 947 * otherwise, that is, if the node is a Replica, or it's currently in a 948 * DETACHED state. 949 * 950 * Note that a ReplicaTxn, used for transaction replay on a Replica is not 951 * created on this path. It's created explicitly in the Replay loop by a 952 * Replica. 953 * 954 * @param config the transaction configuration 955 * 956 * @return an instance of MasterTxn or ReadonlyTxn 957 * @throws DatabaseException 958 */ 959 @Override createRepUserTxn(TransactionConfig config)960 public Txn createRepUserTxn(TransactionConfig config) 961 throws DatabaseException { 962 963 return (isMaster() && 964 !config.getReadOnly() && 965 !config.getLocalWrite()) ? 966 MasterTxn.create(this, config, nameIdPair) : 967 new ReadonlyTxn(this, config); 968 } 969 970 /** 971 * Ensure that a sufficient number of feeders are available before 972 * proceeding with a master transaction begin. 973 * 974 * @param txn the master transaction being initiated. 975 * 976 * @throws InterruptedException 977 * @throws DatabaseException if there were insufficient Replicas after the 978 * timeout period. 
979 */ txnBeginHook(MasterTxn txn)980 public void txnBeginHook(MasterTxn txn) 981 throws InterruptedException, 982 DatabaseException { 983 984 checkIfInvalid(); 985 repNode.getDurabilityQuorum().ensureReplicasForCommit( 986 txn, insufficientReplicasTimeout); 987 } 988 989 /** 990 * Installs the commit-blocking latch that is used to halt the commit/abort 991 * of transactions, in the final phase of a master transfer. 992 * 993 * @see #updateCBVLSN(LocalCBVLSNUpdater) 994 */ blockTxnCompletion(CountDownLatch blocker)995 public void blockTxnCompletion(CountDownLatch blocker) 996 throws InterruptedException { 997 998 ReentrantReadWriteLock.WriteLock lock = blockLatchLock.writeLock(); 999 lock.lockInterruptibly(); 1000 try { 1001 blockTxnLatch = blocker; 1002 } finally { 1003 lock.unlock(); 1004 } 1005 } 1006 1007 /** 1008 * Updates the CBVLSN on behalf of a Feeder input thread (or FeederManager 1009 * running in the RepNode thread), while avoiding the possibility that any 1010 * resulting GroupDB update may get blocked behind the final phase of a 1011 * master transfer. 1012 * <p> 1013 * We skip the update if we're at the point of blocking new transactions 1014 * for a master transfer. And we use a read/write lock in order to be able 1015 * to examine that state safely. 1016 */ updateCBVLSN(LocalCBVLSNUpdater updater)1017 public void updateCBVLSN(LocalCBVLSNUpdater updater) { 1018 ReentrantReadWriteLock.ReadLock lock = blockLatchLock.readLock(); 1019 lock.lock(); 1020 try { 1021 if (blockTxnLatch.getCount() > 0) { 1022 return; 1023 } 1024 updater.update(); 1025 } finally { 1026 lock.unlock(); 1027 } 1028 } 1029 1030 /** 1031 * Releases the transaction block latch. 
1032 */ unblockTxnCompletion()1033 public void unblockTxnCompletion() { 1034 LoggerUtils.info(envLogger, this, "Releasing commit block latch"); 1035 blockTxnLatch.countDown(); 1036 } 1037 1038 /** 1039 * This hook is used primarily to perform the final checks before allowing 1040 * the commit operation to proceed. The following checks are performed 1041 * here: 1042 * 1043 * 1) Check for master 1044 * 2) Check for sufficient Feeder connections to ensure that the commit 1045 * policy could be implemented. There is no guarantee that they will all 1046 * ack the commit request. 1047 * 1048 * The method also associates a latch with the transaction. The latch is 1049 * used to delay the commit operation until a sufficient number of commits 1050 * have been received. 1051 * 1052 * In addition, when mastership transfers are done, and this node is the 1053 * original master, commits and aborts are blocked so as to avoid hard 1054 * recovery after electing a new master, see [#18081]. 1055 * 1056 * @param txn the master transaction being committed 1057 * 1058 * @throws InsufficientReplicasException if the feeder is not in contact 1059 * with enough replicas. 1060 * @throws RestartRequiredException if the environment is invalid. 1061 * @throws UnknownMasterException if the current master is unknown. 1062 * @throws ReplicaWriteException if the node transitioned to a Replica 1063 * after the transaction was initiated. 
1064 */ preLogCommitHook(MasterTxn txn)1065 public void preLogCommitHook(MasterTxn txn) 1066 throws InsufficientReplicasException, 1067 RestartRequiredException, 1068 UnknownMasterException, 1069 ReplicaWriteException, 1070 EnvironmentFailureException { 1071 1072 checkIfInvalid(); 1073 checkIfMaster(txn); 1074 checkBlock(txn); 1075 1076 /* Still a master, check for a sufficient number of connections */ 1077 int activeReplicaCount = 1078 repNode.feederManager().activeReplicaCount(); 1079 ReplicaAckPolicy ackPolicy = 1080 txn.getCommitDurability().getReplicaAck(); 1081 int requiredAckCount = txn.getRequiredAckCount(); 1082 if (envLogger.isLoggable(Level.FINE)) { 1083 LoggerUtils.fine(envLogger, this, 1084 "Txn " + txn.getId() + " requires: " + 1085 requiredAckCount + " active: " + 1086 activeReplicaCount + 1087 " replica acks. Commit Policy: " + ackPolicy); 1088 } 1089 if (requiredAckCount > activeReplicaCount) { 1090 /* Check for possible activation of Primary */ 1091 if (ackPolicy.equals(ReplicaAckPolicy.SIMPLE_MAJORITY) && 1092 repNode.getArbiter().activateArbitration()) { 1093 txn.resetRequiredAckCount(); 1094 } else { 1095 InsufficientReplicasException ire = 1096 new InsufficientReplicasException 1097 (txn, ackPolicy, requiredAckCount, 1098 repNode.feederManager().activeAckReplicas()); 1099 LoggerUtils.info(envLogger, this, ire.getMessage()); 1100 throw ire; 1101 } 1102 } 1103 feederTxns.setupForAcks(txn); 1104 } 1105 1106 /* 1107 * Block transaction commits/aborts if this node is the original master 1108 * and we're doing Master Transfer. 1109 */ checkBlock(MasterTxn txn)1110 private void checkBlock(MasterTxn txn) { 1111 try { 1112 1113 /* 1114 * Lock out the setting of the block latch by Master Transfer in 1115 * the interval between waiting on the latch and setting the VLSN 1116 * for the commit: Master Transfer needs to get a coherent idea of 1117 * the final VLSN when it sets the latch. 
This lock will be 1118 * released by the {@code postLogXxxHook()} functions, one of which 1119 * is guaranteed to be called, unless an Environment-invalidating 1120 * exception occurs. 1121 */ 1122 if (txn.lockOnce()) { 1123 blockLatchLock.readLock().lockInterruptibly(); 1124 } 1125 1126 if (blockTxnLatch.getCount() > 0) { 1127 LoggerUtils.info(envLogger, this, 1128 "Block transaction: " + txn.getId() + 1129 " pending master transfer. Write locks = " + 1130 txn.getWriteLockIds()); 1131 } 1132 1133 final long txnTimeout = txn.getTxnTimeout(); 1134 if (txnTimeout <= 0) { 1135 blockTxnLatch.await(); 1136 } else if (! blockTxnLatch.await(txnTimeout, 1137 TimeUnit.MILLISECONDS)) { 1138 1139 final String message = 1140 "Timed out waiting for master transfer. " + 1141 "Configured transaction timeout:" + txnTimeout + "ms"; 1142 1143 throw new TransactionTimeoutException(txn, message); 1144 } 1145 1146 checkIfInvalid(); 1147 1148 /* 1149 * Check again, after the block! The block may be a result of a 1150 * master->replica transfer, and if this node transitions from 1151 * master to replica, this node will be disqualified from being 1152 * able to commit transactions. 1153 */ 1154 checkIfMaster(txn); 1155 1156 } catch (InterruptedException e) { 1157 throw new ThreadInterruptedException(this, e); 1158 } 1159 } 1160 1161 /** 1162 * It ensures that the feeder obtains the requisite number of 1163 * acknowledgments required for a successful commit. 1164 * 1165 * @param txn The MasterTxn that was committed locally. 1166 * 1167 * @throws InterruptedException if the thread was interrupted while 1168 * waiting for acknowledgments. 1169 * @throws InsufficientAcksException if the master received an insufficient 1170 * number of commit acknowledgments within the replica commit timeout 1171 * period. 
1172 * @throws EnvironmentFailureException 1173 */ postLogCommitHook(MasterTxn txn)1174 public void postLogCommitHook(MasterTxn txn) 1175 throws InsufficientAcksException, 1176 InterruptedException, 1177 EnvironmentFailureException { 1178 1179 if (txn.unlockOnce()) { 1180 blockLatchLock.readLock().unlock(); 1181 } 1182 checkIfInvalid(); 1183 /* Don't do master check, the transaction has already been committed */ 1184 try { 1185 feederTxns.awaitReplicaAcks(txn, replicaAckTimeout); 1186 } catch (InsufficientAcksException e) { 1187 LoggerUtils.info(envLogger, this, e.getMessage()); 1188 throw e; 1189 } 1190 } 1191 1192 /** 1193 * Invoked before aborting a MasterTxn, this happens when the master is 1194 * going to be a replica because of mastership transfer. We do this to make 1195 * sure that the replica going to be the master has the most recent log and 1196 * no hard recovery would happen after its election, see SR [#18081]. 1197 * 1198 * @param txn The MasterTxn that was aborted locally. 1199 * 1200 * @throws ReplicaWriteException if the node transitioned to a Replica 1201 * after the transaction was initiated. 1202 * @throws UnknownMasterException if the current master is unknown. 1203 * @throws EnvironmentFailureException 1204 */ preLogAbortHook(MasterTxn txn)1205 public void preLogAbortHook(MasterTxn txn) 1206 throws EnvironmentFailureException, 1207 ReplicaWriteException, 1208 UnknownMasterException { 1209 1210 checkIfInvalid(); 1211 checkIfMaster(txn); 1212 checkBlock(txn); 1213 } 1214 1215 /** 1216 * Releases the block latch lock, if held. This hook is called in the 1217 * normal course of Txn.abort(), once the abort log record has been written 1218 * and the associated VLSN stored. 
1219 */ postLogAbortHook(MasterTxn txn)1220 public void postLogAbortHook(MasterTxn txn) { 1221 if (txn.unlockOnce()) { 1222 blockLatchLock.readLock().unlock(); 1223 } 1224 } 1225 1226 /** 1227 * Removes any pending acknowledgments that were registered by the 1228 * preLogCommitHook. This hook is called only when a {@code commit()} 1229 * fails and therefore must be aborted. 1230 */ postLogCommitAbortHook(MasterTxn txn)1231 public void postLogCommitAbortHook(MasterTxn txn) { 1232 LoggerUtils.info(envLogger, this, 1233 "post log abort hook for txn: " + txn.getId()); 1234 if (txn.unlockOnce()) { 1235 blockLatchLock.readLock().unlock(); 1236 } 1237 feederTxns.clearTransactionAcks(txn); 1238 } 1239 1240 /** 1241 * Create a ReplayTxn for recovery processing. 1242 */ 1243 @Override createReplayTxn(long txnId)1244 public Txn createReplayTxn(long txnId) 1245 throws DatabaseException { 1246 1247 return 1248 new ReplayTxn(this, TransactionConfig.DEFAULT, txnId, envLogger); 1249 } 1250 1251 /** 1252 * Used by environment recovery to get a tracker to collect VLSN-LSN 1253 * mappings that are within the recovery part of the log. These might 1254 * not be reflected in the persistent mapping db. 1255 */ 1256 @Override getVLSNProxy()1257 public VLSNRecoveryProxy getVLSNProxy() { 1258 int stride = configManager.getInt(RepParams.VLSN_STRIDE); 1259 int maxMappings = configManager.getInt(RepParams.VLSN_MAX_MAP); 1260 int maxDist = configManager.getInt(RepParams.VLSN_MAX_DIST); 1261 1262 return new VLSNRecoveryTracker(this, stride, maxMappings, maxDist); 1263 } 1264 getUUID()1265 public UUID getUUID() { 1266 return repNode.getUUID(); 1267 } 1268 1269 /** 1270 * Used during testing to introduce artificial clock skews. 
1271 */ setSkewMs(int skewMs)1272 public static void setSkewMs(int skewMs) { 1273 clockSkewMs = skewMs; 1274 } 1275 getClockSkewMs()1276 public static int getClockSkewMs() { 1277 return clockSkewMs; 1278 } 1279 1280 /** 1281 * Delete from the first VLSN in the range to lastVLSN, inclusive. This 1282 * will be fsynced to guarantee that the persistent vlsn index does not 1283 * refer to any deleted files. [#20702] 1284 * 1285 * @param lastVLSN was cleaned by the cleaner 1286 * @param deleteFileNum was the file that was deleted by the cleaner. 1287 */ 1288 @Override vlsnHeadTruncate(VLSN lastVLSN, long deleteFileNum)1289 public void vlsnHeadTruncate(VLSN lastVLSN, long deleteFileNum) { 1290 1291 vlsnIndex.truncateFromHead(lastVLSN, deleteFileNum); 1292 } 1293 getNodeId()1294 public int getNodeId() { 1295 return nameIdPair.getId(); 1296 } 1297 getNameIdPair()1298 public NameIdPair getNameIdPair() { 1299 return nameIdPair; 1300 } 1301 1302 @Override getReplayTxnTimeout()1303 public long getReplayTxnTimeout() { 1304 return replayTxnTimeout; 1305 } 1306 1307 /* Return the default consistency policy. */ 1308 @Override getDefaultConsistencyPolicy()1309 public ReplicaConsistencyPolicy getDefaultConsistencyPolicy() { 1310 return defaultConsistencyPolicy; 1311 } 1312 1313 /** 1314 * The default consistency is not currently mutable in the API, but can be 1315 * set for testing purposes. 1316 * 1317 * TODO: Make it mutable in the API, since Durability is mutable. 1318 */ setDefaultConsistencyPolicy(ReplicaConsistencyPolicy policy)1319 public void setDefaultConsistencyPolicy(ReplicaConsistencyPolicy policy) { 1320 defaultConsistencyPolicy = policy; 1321 } 1322 1323 /* Returns the on disk LSN for VLSN. */ 1324 @Override getLsnForVLSN(VLSN vlsn, int readBufferSize)1325 public long getLsnForVLSN(VLSN vlsn, int readBufferSize) { 1326 /* Returns the file number which is nearest to the vlsn. 
*/ 1327 long fileNumber = vlsnIndex.getLTEFileNumber(vlsn); 1328 1329 /* Start reading from the nearest file. */ 1330 FeederReader feederReader = 1331 new FeederReader(this, 1332 vlsnIndex, 1333 DbLsn.makeLsn(fileNumber, 0), 1334 readBufferSize, 1335 nameIdPair); 1336 1337 try { 1338 feederReader.initScan(vlsn); 1339 1340 /* 1341 * Go on scan the log until FeederReader find the target VLSN, 1342 * thrown out an EnvironmentFailureException if it can't be found. 1343 */ 1344 if (!feederReader.readNextEntry()) { 1345 throw EnvironmentFailureException.unexpectedState 1346 ("VLSN not found: " + vlsn); 1347 } 1348 } catch (IOException e) { 1349 throw EnvironmentFailureException.unexpectedException(e); 1350 } 1351 1352 return feederReader.getLastLsn(); 1353 } 1354 1355 /* Returns the end of the log. */ 1356 @Override getEndOfLog()1357 public long getEndOfLog() { 1358 return vlsnIndex.getRange().getLast().getSequence(); 1359 } 1360 1361 /* Return the durable VLSN for the replication group. */ 1362 @Override getGroupDurableVLSN()1363 public VLSN getGroupDurableVLSN() { 1364 return repNode.getGroupCBVLSN(); 1365 } 1366 1367 /* Disable the LocalCBVLSN changes on a replicator. */ 1368 @Override freezeLocalCBVLSN()1369 public void freezeLocalCBVLSN() { 1370 repNode.freezeLocalCBVLSN(); 1371 } 1372 1373 /* Enable the LocalCBVLSN changes on a replicator. */ 1374 @Override unfreezeLocalCBVLSN()1375 public void unfreezeLocalCBVLSN() { 1376 repNode.unfreezeLocalCBVLSN(); 1377 } 1378 1379 /** 1380 * Returns true if the VLSN is preserved as the record version. 1381 */ 1382 @Override getPreserveVLSN()1383 public boolean getPreserveVLSN() { 1384 return preserveVLSN; 1385 } 1386 1387 /** 1388 * Returns true if the VLSN is both preserved and cached. 1389 */ 1390 @Override getCacheVLSN()1391 public boolean getCacheVLSN() { 1392 return preserveVLSN && cacheVLSN; 1393 } 1394 1395 /** 1396 * Returns the number of initial bytes per VLSN in the vlsnCache. 
1397 */ 1398 @Override getCachedVLSNMinLength()1399 public int getCachedVLSNMinLength() { 1400 return cachedVLSNMinLength; 1401 } 1402 1403 /** 1404 * @see EnvironmentImpl#getName 1405 */ 1406 @Override getName()1407 public String getName() { 1408 return nameIdPair + ":" + super.getName(); 1409 } 1410 1411 /** 1412 * Return true if this environment is part of a replication group. 1413 */ 1414 @Override isReplicated()1415 public boolean isReplicated() { 1416 return true; 1417 } 1418 1419 /** 1420 * Check whether this environment can be opened on an existing environment 1421 * directory. 1422 */ 1423 @Override checkRulesForExistingEnv(boolean dbTreeReplicatedBit, boolean dbTreePreserveVLSN)1424 public void checkRulesForExistingEnv(boolean dbTreeReplicatedBit, 1425 boolean dbTreePreserveVLSN) 1426 throws UnsupportedOperationException { 1427 1428 if (!dbTreeReplicatedBit) { 1429 1430 /* 1431 * We are attempting to open an existing, non-replicated 1432 * environment. 1433 */ 1434 throw new UnsupportedOperationException 1435 ("This environment must be converted for replication." + 1436 " using com.sleepycat.je.rep.util.DbEnableReplication."); 1437 } 1438 1439 /* The preserveVLSN setting is forever immutable. */ 1440 if (dbTreePreserveVLSN != getPreserveVLSN()) { 1441 throw new IllegalArgumentException 1442 (RepParams.PRESERVE_RECORD_VERSION.getName() + 1443 " parameter may not be changed." + 1444 " Previous value: " + dbTreePreserveVLSN + 1445 " New value: " + getPreserveVLSN()); 1446 } 1447 } 1448 1449 /** 1450 * Returns the hostname associated with this node. 1451 * 1452 * @return the hostname 1453 */ getHostName()1454 public String getHostName() { 1455 String hostAndPort = configManager.get(RepParams.NODE_HOST_PORT); 1456 int colonToken = hostAndPort.indexOf(":"); 1457 return (colonToken >= 0) ? 1458 hostAndPort.substring(0, colonToken) : 1459 hostAndPort; 1460 } 1461 1462 /** 1463 * Returns the port used by the replication node. 
1464 * 1465 * @return the port number 1466 */ getPort()1467 public int getPort() { 1468 String hostAndPort = configManager.get(RepParams.NODE_HOST_PORT); 1469 int colonToken = hostAndPort.indexOf(":"); 1470 return (colonToken >= 0) ? 1471 Integer.parseInt(hostAndPort.substring(colonToken + 1)) : 1472 configManager.getInt(RepParams.DEFAULT_PORT); 1473 } 1474 1475 /* Convenience method for returning replication sockets. */ getSocket()1476 public InetSocketAddress getSocket() { 1477 return new InetSocketAddress(getHostName(), getPort()); 1478 } 1479 1480 /** 1481 * Returns the JE version that is currently running on this node, 1482 * consulting the TEST_JE_VERSION configuration parameter for a test 1483 * override. 1484 */ getCurrentJEVersion()1485 public JEVersion getCurrentJEVersion() { 1486 final String testJEVersion = configManager.get(TEST_JE_VERSION); 1487 return testJEVersion.isEmpty() ? 1488 JEVersion.CURRENT_VERSION : 1489 new JEVersion(testJEVersion); 1490 } 1491 1492 /** 1493 * Returns the set of sockets associated with helper nodes. 1494 * 1495 * @return the set of helper sockets, returns an empty set if there 1496 * are no helpers. 1497 */ getHelperSockets()1498 public Set<InetSocketAddress> getHelperSockets() { 1499 Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); 1500 String helperHosts = configManager.get(RepParams.HELPER_HOSTS); 1501 if (helperHosts == null) { 1502 return helpers; 1503 } 1504 for (StringTokenizer tokenizer = 1505 new StringTokenizer(helperHosts, ","); 1506 tokenizer.hasMoreTokens();) { 1507 String hostPortPair = tokenizer.nextToken(); 1508 helpers.add(HostPortPair.getSocket(hostPortPair)); 1509 } 1510 return helpers; 1511 } 1512 1513 /** 1514 * Called when a node has identified itself as the master, which is when 1515 * the RepNode.selfElect is called. The database should not exist at 1516 * this point. 
1517 * 1518 * Lock hierarchy: GroupDbLock -> sync on EnvironmentImpl 1519 * @throws DatabaseException 1520 */ createGroupDb()1521 public DatabaseImpl createGroupDb() 1522 throws DatabaseException { 1523 1524 assert isMaster(); 1525 1526 try { 1527 groupDbLock.lockInterruptibly(); 1528 } catch (InterruptedException e) { 1529 throw EnvironmentFailureException.unexpectedException(e); 1530 } 1531 1532 try { 1533 if (groupDbImpl != null) { 1534 throw EnvironmentFailureException.unexpectedState 1535 ("GroupDb should not exist."); 1536 } 1537 1538 DatabaseImpl newDbImpl = null; 1539 Txn txn = null; 1540 try { 1541 TransactionConfig txnConfig = new TransactionConfig(); 1542 txnConfig.setDurability(new Durability(SyncPolicy.SYNC, 1543 SyncPolicy.SYNC, 1544 ReplicaAckPolicy.NONE)); 1545 txnConfig.setConsistencyPolicy(NO_CONSISTENCY); 1546 txn = new MasterTxn(this, 1547 txnConfig, 1548 getNameIdPair()); 1549 1550 /* Database should not exist yet, create it now */ 1551 DatabaseConfig dbConfig = new DatabaseConfig(); 1552 dbConfig.setAllowCreate(true); 1553 dbConfig.setTransactional(true); 1554 dbConfig.setExclusiveCreate(true); 1555 dbConfig.setReplicated(true); 1556 1557 newDbImpl = getDbTree().createInternalDb 1558 (txn, DbType.REP_GROUP.getInternalName(), dbConfig); 1559 txn.commit(); 1560 txn = null; 1561 } finally { 1562 if (txn != null) { 1563 txn.abort(); 1564 } 1565 } 1566 1567 groupDbImpl = newDbImpl; 1568 } finally { 1569 groupDbLock.unlock(); 1570 } 1571 return groupDbImpl; 1572 } 1573 1574 /** 1575 * Removes files that ought to be protected from deletion. 1576 * <p> 1577 * This implementation considers both Sync operations and the replication 1578 * CBVLSN. 
1579 * 1580 * @returns null if file deletion is prohibited 1581 */ 1582 @Override getUnprotectedFileSet(NavigableSet<Long> files)1583 public NavigableSet<Long> getUnprotectedFileSet(NavigableSet<Long> files) { 1584 if (repNode == null) { 1585 return null; 1586 } 1587 files = super.getUnprotectedFileSet(files); 1588 if (files.isEmpty()) { 1589 return files; 1590 } 1591 return repNode.getUnprotectedFileSet(files, getCleaner()); 1592 } 1593 1594 /** 1595 * Open the group db, which should exist already, using NO_CONSISTENCY. 1596 */ getGroupDb()1597 public DatabaseImpl getGroupDb() 1598 throws DatabaseNotFoundException, 1599 DatabaseException { 1600 1601 return openGroupDb(false /* doLockProbe */); 1602 } 1603 1604 /** 1605 * Open the group db, which should exist already, using NO_CONSISTENCY. Do 1606 * not wait on the group db lock, return null if the databaseImpl hasn't 1607 * been created and we can't obtain it. 1608 * 1609 * Lock hierarchy: GroupDbLock -> sync on EnvironmentImpl 1610 */ probeGroupDb()1611 public DatabaseImpl probeGroupDb() 1612 throws DatabaseException { 1613 1614 try { 1615 return openGroupDb(true /* doLockProbe */); 1616 } catch (DatabaseNotFoundException e) { 1617 /* Should never happen, DB should exist. */ 1618 throw EnvironmentFailureException.unexpectedException(e); 1619 } 1620 } 1621 1622 /** 1623 * Do the work of creating the lock and then assigning the groupDbImpl 1624 * field, using NO_CONSISTENCY. 1625 * 1626 * @throws DatabaseException 1627 * @throws DatabaseNotFoundException 1628 */ openGroupDb(final boolean doLockProbe)1629 private DatabaseImpl openGroupDb(final boolean doLockProbe) 1630 throws DatabaseNotFoundException, DatabaseException { 1631 1632 /* Acquire the lock. */ 1633 try { 1634 if (doLockProbe) { 1635 if (!groupDbLock.tryLock(1, TimeUnit.MILLISECONDS)) { 1636 /* Contention, try later. 
*/ 1637 return null; 1638 } 1639 } else { 1640 groupDbLock.lockInterruptibly(); 1641 } 1642 } catch(InterruptedException e) { 1643 throw EnvironmentFailureException.unexpectedException(e); 1644 } 1645 1646 Txn txn = null; 1647 try { 1648 if (groupDbImpl != null) { 1649 return groupDbImpl; 1650 } 1651 1652 DatabaseImpl newDbImpl = null; 1653 TransactionConfig txnConfig = new TransactionConfig(); 1654 txnConfig.setConsistencyPolicy(NO_CONSISTENCY); 1655 txn = new ReadonlyTxn(this, txnConfig); 1656 1657 newDbImpl = getDbTree().getDb(txn, 1658 DbType.REP_GROUP.getInternalName(), 1659 null /* databaseHandle */); 1660 if (newDbImpl == null) { 1661 throw new DatabaseNotFoundException 1662 (DbType.REP_GROUP.getInternalName()); 1663 } 1664 txn.commit(); 1665 txn = null; 1666 1667 groupDbImpl = newDbImpl; 1668 return groupDbImpl; 1669 } finally { 1670 if (txn != null) { 1671 txn.abort(); 1672 } 1673 groupDbLock.unlock(); 1674 } 1675 } 1676 1677 /** 1678 * Return true if the node has been configured as a Designated Primary. 1679 * This does not necessarily mean that the node is actively operating in 1680 * designated primary mode. See 1681 * {@link com.sleepycat.je.rep.arbitration.Arbiter#isActive} 1682 */ isDesignatedPrimary()1683 public boolean isDesignatedPrimary() { 1684 return getConfigManager().getBoolean(RepParams.DESIGNATED_PRIMARY); 1685 } 1686 1687 @Override addDbBackup(DbBackup backup)1688 public boolean addDbBackup(DbBackup backup) { 1689 synchronized (backups) { 1690 if (backupProhibited) { 1691 return false; 1692 } 1693 assert backups.add(backup); 1694 } 1695 1696 super.addDbBackup(backup); 1697 return true; 1698 } 1699 1700 @Override removeDbBackup(DbBackup backup)1701 public void removeDbBackup(DbBackup backup) { 1702 synchronized (backups) { 1703 assert backups.remove(backup); 1704 } 1705 super.removeDbBackup(backup); 1706 } 1707 1708 /* Invalidate all the on going DbBackups, used in Replay.rollback(). 
*/ invalidateBackups(long fileNumber)1709 public void invalidateBackups(long fileNumber) { 1710 synchronized (backups) { 1711 for (DbBackup backup : backups) { 1712 backup.invalidate(fileNumber); 1713 } 1714 } 1715 } 1716 1717 /* Set the backupProhibited status, used in Replay.rollback(). */ setBackupProhibited(boolean backupProhibited)1718 public void setBackupProhibited(boolean backupProhibited) { 1719 synchronized (backups) { 1720 this.backupProhibited = backupProhibited; 1721 } 1722 } 1723 1724 /* For creating a rep exception from standalone code. */ 1725 @Override 1726 public LockPreemptedException createLockPreemptedException(Locker locker, Throwable cause)1727 createLockPreemptedException(Locker locker, Throwable cause) { 1728 return new LockPreemptedException(locker, cause); 1729 } 1730 1731 /* For creating a rep exception from standalone code. */ 1732 @Override 1733 public DatabasePreemptedException createDatabasePreemptedException(String msg, String dbName, Database db)1734 createDatabasePreemptedException(String msg, 1735 String dbName, 1736 Database db) { 1737 return new DatabasePreemptedException(msg, dbName, db); 1738 } 1739 1740 /* For creating a rep exception from standalone code. */ 1741 @Override createLogOverwriteException(String msg)1742 public LogOverwriteException createLogOverwriteException(String msg) { 1743 return new LogOverwriteException(msg); 1744 } 1745 1746 /** 1747 * Sets up the environment for group shutdown when the environment is 1748 * closed. 
1749 * 1750 * @see ReplicatedEnvironment#shutdownGroup(long, TimeUnit) 1751 */ shutdownGroupSetup(long timeoutMs)1752 public void shutdownGroupSetup(long timeoutMs) { 1753 final int openCount = getAppOpenCount(); 1754 if (openCount > 1) { 1755 throw new IllegalStateException 1756 ("Environment has " + (openCount - 1) + 1757 " additional open handles."); 1758 } 1759 1760 final int backupCount = getBackupCount(); 1761 if (backupCount > 0) { 1762 throw new IllegalStateException 1763 ("Environment has " + backupCount + 1764 " DbBackups in progress."); 1765 } 1766 1767 repNode.shutdownGroupOnClose(timeoutMs); 1768 } 1769 transferMaster(Set<String> replicas, long timeout, boolean force)1770 public String transferMaster(Set<String> replicas, 1771 long timeout, 1772 boolean force) { 1773 return repNode.transferMaster(replicas, timeout, force); 1774 } 1775 1776 /** 1777 * Dump interesting aspects of the node's state. Currently for debugging 1778 * use, possibly useful for field support. 1779 */ dumpState()1780 public String dumpState() { 1781 StringBuilder sb = new StringBuilder(); 1782 1783 sb.append(getNameIdPair()); 1784 sb.append("[").append(getState()).append("] " ); 1785 1786 if (repNode != null) { 1787 sb.append(repNode.dumpState()); 1788 } 1789 1790 if (vlsnIndex != null) { 1791 sb.append("vlsnRange="); 1792 sb.append(vlsnIndex.getRange()).append("\n"); 1793 } 1794 1795 if (replay != null) { 1796 sb.append(replay.dumpState()); 1797 } 1798 1799 return sb.toString(); 1800 } 1801 1802 /** 1803 * Dumps the state associated with all active Feeders that supply 1804 * acknowledgments, along with identifying information about the node and 1805 * its current HA state. 1806 */ dumpAckFeederState()1807 public String dumpAckFeederState() { 1808 return getNameIdPair() + "[" + getState() + "]" + 1809 repNode.dumpAckFeederState() ; 1810 } 1811 1812 /** 1813 * If this node was started with a hard recovery, preserve that 1814 * information. 
1815 */ setHardRecoveryInfo(RollbackException e)1816 public void setHardRecoveryInfo(RollbackException e) { 1817 hardRecoveryStat.set(true); 1818 hardRecoveryInfoStat.set(e.getMessage()); 1819 } 1820 getNodeStats()1821 public StatGroup getNodeStats() { 1822 return nodeStats; 1823 } 1824 1825 /** 1826 * Ensure that the in-memory vlsn index encompasses all logged entries 1827 * before it is flushed to disk. A No-Op for non-replicated systems. 1828 * [#19754] 1829 */ 1830 @Override awaitVLSNConsistency()1831 public void awaitVLSNConsistency() { 1832 vlsnIndex.awaitConsistency(); 1833 } 1834 setSyncupProgress(SyncupProgress progress)1835 public void setSyncupProgress(SyncupProgress progress) { 1836 setSyncupProgress(progress, 0, -1); 1837 } 1838 setSyncupProgress(SyncupProgress progress, long n, long total)1839 public void setSyncupProgress(SyncupProgress progress, long n, long total) { 1840 if (syncupProgressListener == null) { 1841 return; 1842 } 1843 1844 if (!(syncupProgressListener.progress(progress, n, total))) { 1845 throw new EnvironmentFailureException 1846 (this, EnvironmentFailureReason.PROGRESS_LISTENER_HALT, 1847 "ReplicatedEnvironmentConfig.syncupProgressListener: "); 1848 } 1849 } 1850 getLogRewriteListener()1851 public LogFileRewriteListener getLogRewriteListener() { 1852 return logRewriteListener; 1853 } 1854 getRepNetConfig()1855 public ReplicationNetworkConfig getRepNetConfig() { 1856 return repNetConfig; 1857 } 1858 getChannelFactory()1859 public DataChannelFactory getChannelFactory() { 1860 initializeChannelFactory(); 1861 return channelFactory; 1862 } 1863 1864 @Override invalidate(EnvironmentFailureException e)1865 public void invalidate(EnvironmentFailureException e) { 1866 super.invalidate(e); 1867 unblockTxnCompletion(); 1868 } 1869 getLastTxnEnd()1870 public VLSN getLastTxnEnd() { 1871 return vlsnIndexAccess.getLastTxnEnd(); 1872 } 1873 1874 /** 1875 * Private class to prevent used of the close() method by the application 1876 * on an 
internal handle. 1877 */ 1878 private static class InternalReplicatedEnvironment 1879 extends ReplicatedEnvironment { 1880 InternalReplicatedEnvironment(File environmentHome, ReplicationConfig cloneRepConfig, EnvironmentConfig cloneConfig, RepImpl envImpl)1881 public InternalReplicatedEnvironment(File environmentHome, 1882 ReplicationConfig cloneRepConfig, 1883 EnvironmentConfig cloneConfig, 1884 RepImpl envImpl) { 1885 super(environmentHome, cloneRepConfig, cloneConfig, 1886 null /*consistencyPolicy*/, null /*initialElectionPolicy*/, 1887 false /*joinGroup*/, envImpl); 1888 } 1889 1890 @Override isInternalHandle()1891 protected boolean isInternalHandle() { 1892 return true; 1893 } 1894 1895 @Override close()1896 public synchronized void close() { 1897 throw EnvironmentFailureException.unexpectedState 1898 ("close() not permitted on an internal environment handle"); 1899 } 1900 } 1901 1902 /** 1903 * Peruse the environment wide transaction table, and return a set of 1904 * all existing MasterTxns. 1905 */ getExistingMasterTxns()1906 public Set<MasterTxn> getExistingMasterTxns() { 1907 Set<Txn> txns = getTxnManager().getMasterTxns(); 1908 1909 /* 1910 * Need to down cast because we're using the non-HA method, 1911 * txnManager.getTxnSet() 1912 */ 1913 Set<MasterTxn> masterTxns = new HashSet<MasterTxn>(); 1914 for (Txn t: txns) { 1915 masterTxns.add((MasterTxn) t); 1916 } 1917 return masterTxns; 1918 } 1919 1920 /** 1921 * RepImpl supplies the last txn abort or commit vlsn for use cases such as 1922 * determining how caught up a feeder or master transfer is. This info is 1923 * usually obtained from the VLSNRange via the VLSNIndex, but in some types 1924 * of environment shutdowns, the VLSNIndex may need to be nulled out. When 1925 * that happens, VLSNIndexAccess will switch over from using the VLSNIndex 1926 * to obtain the range, to using a reference to the last known 1927 * VLSNRange. 
Note that the VLSNRange instance held within VLSNIndex is
     * constantly being replaced while the replication stream is active. That
     * is why VLSNIndexAccess normally obtains the range through the VLSNIndex
     * rather than keeping a reference to a VLSNRange instance.
     */
    private class VLSNIndexAccess {

        /* The range captured when the vlsnIndex was closed and nulled out. */
        private VLSNRange savedRange;

        /**
         * Returns the last txn end, using the live index when it is still
         * open and the saved range otherwise.
         */
        synchronized VLSN getLastTxnEnd() {
            final VLSNRange range =
                (vlsnIndex != null) ? vlsnIndex.getRange() : savedRange;
            return range.getLastTxnEnd();
        }

        /**
         * Closes the vlsnIndex, saves its last range so that the lastTxnEnd
         * value stays available, and nulls out the vlsnIndex. Does nothing
         * if the index is already null.
         */
        synchronized void closeVLSNIndex(boolean checkpointed) {
            if (vlsnIndex == null) {
                return;
            }
            vlsnIndex.close(checkpointed);
            savedRange = vlsnIndex.getRange();
            vlsnIndex = null;
        }

        /**
         * Like closeVLSNIndex, but uses the index's abnormal-close path.
         * Saves the last range so that the lastTxnEnd value stays available,
         * then nulls out the vlsnIndex.
         */
        synchronized void abnormalCloseVLSNIndex() {
            if (vlsnIndex == null) {
                return;
            }
            vlsnIndex.abnormalClose();
            savedRange = vlsnIndex.getRange();
            vlsnIndex = null;
        }
    }
}