1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.stream; 9 10 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_ACK_MESSAGES; 11 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_GROUPED_ACKS; 12 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_GROUP_ACK_MESSAGES; 13 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_MAX_GROUPED_ACKS; 14 15 import java.nio.ByteBuffer; 16 import java.util.Arrays; 17 import java.util.UUID; 18 19 import com.sleepycat.je.DatabaseException; 20 import com.sleepycat.je.Durability.SyncPolicy; 21 import com.sleepycat.je.EnvironmentFailureException; 22 import com.sleepycat.je.JEVersion; 23 import com.sleepycat.je.log.LogEntryType; 24 import com.sleepycat.je.log.LogUtils; 25 import com.sleepycat.je.rep.NodeType; 26 import com.sleepycat.je.rep.impl.RepGroupImpl; 27 import com.sleepycat.je.rep.impl.RepNodeImpl; 28 import com.sleepycat.je.rep.impl.node.NameIdPair; 29 import com.sleepycat.je.rep.impl.node.RepNode; 30 import com.sleepycat.je.rep.utilint.BinaryProtocol; 31 import com.sleepycat.je.rep.utilint.LongMaxZeroStat; 32 import com.sleepycat.je.utilint.LongMaxStat; 33 import com.sleepycat.je.utilint.LongStat; 34 import com.sleepycat.je.utilint.VLSN; 35 36 37 /** 38 * Defines the messages used to set up a feeder-replica replication stream. 39 * 40 * From Feeder to Replica 41 * 42 * Heartbeat -> HeartbeatResponse 43 * Commit -> Ack 44 * Commit+ -> GroupAck 45 * Entry 46 * ShutdownRequest -> ShutdownResponse 47 * 48 * Note: in the future, we may want to support bulk entry messages 49 * 50 * From Replica to Feeder 51 * 52 * The following subset of messages represents the handshake protocol that 53 * precedes the transmission of replication log entries. 
*
 * ReplicaProtocolVersion -> FeederProtocolVersion | DuplicateNodeReject
 * ReplicaJEVersions      -> FeederJEVersions | JEVersionsReject
 * NodeGroupInfo          -> NodeGroupInfoOK | NodeGroupInfoReject
 * SNTPRequest            -> SNTPResponse
 *                        -> HeartbeatResponse
 *
 * A HeartbeatResponse is not strictly a response message and may also be sent
 * spontaneously if there is no output activity in a heartbeat interval. This
 * spontaneous generation of a HeartbeatResponse ensures that a socket is not
 * timed out if the feeder or the replica replay are otherwise busy.
 *
 * Note that there may be multiple SNTPRequest/SNTPResponse message pairs that
 * are exchanged as part of a single handshake. So a successful handshake
 * request sequence generated by the Replica looks like:
 *
 * ReplicaProtocolVersion ReplicaJEVersions MembershipInfo [SNTPRequest]+
 *
 * (MembershipInfo is the NodeGroupInfo message; its op is MEMBERSHIP_INFO.)
 *
 * The following messages constitute the syncup and the transmission of log
 * entries.
 *
 * EntryRequest   -> Entry | EntryNotFound | AlternateMatchpoint
 * RestoreRequest -> RestoreResponse
 * StartStream
 *
 * The Protocol instance has local state in terms of buffers that are reused
 * across multiple messages. A Protocol instance is expected to be used in
 * strictly serial fashion. Consequently, there is an instance for each Replica
 * to Feeder connection, and two instances per Feeder to Replica connection:
 * one for the InputThread and one for the OutputThread.
 */
public class Protocol extends BinaryProtocol {

    /*
     * Note that the GROUP_ACK response message was introduced in version 5,
     * but is disabled by default via RepParams.REPLICA_ENABLE_GROUP_ACKS.
     *
     * It can be enabled when we can increase the protocol version number.
     */

    /* The default (highest) version supported by the Protocol code. */
    public static final int MAX_VERSION = 5;

    /* The minimum version we're willing to interact with.
*/
    static final int MIN_VERSION = 3;

    /* Version added in JE 6.0.1 to support RepGroupImpl version 3. */
    public static final int VERSION_5 = 5;
    public static final JEVersion VERSION_5_JE_VERSION =
        new JEVersion("6.0.1");

    /*
     * Version in which HEARTBEAT_RESPONSE added a second field. We can manage
     * without this optional additional information if we have to, so we can
     * still interact with the previous protocol version. (JE 5.0.58)
     */
    static final int VERSION_4 = 4;
    public static final JEVersion VERSION_4_JE_VERSION =
        new JEVersion("5.0.58");

    /* Version added in JE 4.0.50 to address byte order issues. */
    static final int VERSION_3 = 3;
    public static final JEVersion VERSION_3_JE_VERSION =
        new JEVersion("4.0.50");

    /* The replication node that's communicating via this protocol. */
    private final RepNode repNode;

    /** The log version of the format used to write log entries. */
    private final int writeLogVersion;

    /* Count of all singleton ACK messages. */
    private final LongStat nAckMessages;

    /* Count of all group ACK messages. */
    private final LongStat nGroupAckMessages;

    /* Sum of all acks sent via group ACK messages. */
    private final LongStat nGroupedAcks;

    /* Max number of acks sent via a single group ACK message. */
    private final LongMaxStat nMaxGroupedAcks;


    /**
     * Returns a Protocol object that implements the specified
     * (supported) protocol version.
*
     * @param repNode the node using the protocol; it is only null during
     * test usage
     *
     * @param protocolVersion the version of the protocol that must be
     * implemented by this object
     *
     * @param maxProtocolVersion the highest supported protocol version, which
     * may be lower than the code version, for testing purposes
     *
     * @param writeLogVersion the log version of the format used to write log
     * entries
     */
    private Protocol(final RepNode repNode,
                     final int protocolVersion,
                     final int maxProtocolVersion,
                     final int writeLogVersion) {
        /*
         * A null repNode falls back to NameIdPair.NULL and a null RepImpl.
         * Note that the superclass receives the maximum version before the
         * protocol version.
         */
        super((repNode != null) ? repNode.getNameIdPair() : NameIdPair.NULL,
              maxProtocolVersion,
              protocolVersion,
              (repNode != null) ? repNode.getRepImpl() : null);

        /* repNode is only null during test usage. */
        this.repNode = repNode;
        this.configuredVersion = protocolVersion;
        this.writeLogVersion = writeLogVersion;

        /*
         * Create the ack-related statistics. NOTE(review): "stats" is not
         * declared in the visible part of this class; presumably it is
         * inherited from BinaryProtocol -- confirm.
         */
        nAckMessages = new LongStat(stats, N_ACK_MESSAGES);
        nGroupAckMessages = new LongStat(stats, N_GROUP_ACK_MESSAGES);
        nGroupedAcks = new LongStat(stats, N_GROUPED_ACKS);
        nMaxGroupedAcks = new LongMaxZeroStat(stats, N_MAX_GROUPED_ACKS);

        /* Hand every message op defined by this class to the superclass. */
        initializeMessageOps(new MessageOp[] {
            REPLICA_PROTOCOL_VERSION,
            FEEDER_PROTOCOL_VERSION,
            DUP_NODE_REJECT,
            REPLICA_JE_VERSIONS,
            FEEDER_JE_VERSIONS,
            JE_VERSIONS_REJECT,
            MEMBERSHIP_INFO,
            MEMBERSHIP_INFO_OK,
            MEMBERSHIP_INFO_REJECT,
            SNTP_REQUEST,
            SNTP_RESPONSE,
            ENTRY,
            START_STREAM,
            HEARTBEAT,
            HEARTBEAT_RESPONSE,
            COMMIT,
            ACK,
            ENTRY_REQUEST,
            ENTRY_NOTFOUND,
            RESTORE_REQUEST,
            RESTORE_RESPONSE,
            ALT_MATCHPOINT,
            SHUTDOWN_REQUEST,
            SHUTDOWN_RESPONSE,
            GROUP_ACK
        });
    }

    /**
     * Returns a protocol object that supports the specific requested protocol
     * version, or null if the version is not supported.
204 */ get(final RepNode repNode, final int protocolVersion)205 public static Protocol get(final RepNode repNode, 206 final int protocolVersion) { 207 return get(repNode, protocolVersion, protocolVersion); 208 } 209 210 /** 211 * Returns a protocol object that supports the specific requested protocol 212 * version, which must not be higher than the specified maximum version, or 213 * null if no such version is supported. 214 */ get(final RepNode repNode, final int protocolVersion, final int maxProtocolVersion)215 public static Protocol get(final RepNode repNode, 216 final int protocolVersion, 217 final int maxProtocolVersion) { 218 return get(repNode, protocolVersion, maxProtocolVersion, 219 LogEntryType.LOG_VERSION); 220 } 221 222 /** 223 * Returns a protocol object that supports the specified protocol, which 224 * must be less than the specified maximum version, and writes log entries 225 * in the specified log version format. Returns null if no such version is 226 * supported. 227 */ get(final RepNode repNode, final int protocolVersion, final int maxProtocolVersion, final int writeLogVersion)228 public static Protocol get(final RepNode repNode, 229 final int protocolVersion, 230 final int maxProtocolVersion, 231 final int writeLogVersion) { 232 assert repNode != null; 233 234 /* 235 * If the RepGroupImpl has been upgraded to version 3, then require 236 * protocol version 5, which is required to support that RepGroupImpl 237 * version. This check prevents new facilities that depend on 238 * RepGroupImpl version 3 from being seen by non-upgraded replicas. 
239 */ 240 int minProtocolVersion = MIN_VERSION; 241 final RepGroupImpl group = repNode.getGroup(); 242 if (group == null) { 243 throw EnvironmentFailureException.unexpectedState( 244 "Group is null"); 245 } 246 final int groupFormatVersion = group.getFormatVersion(); 247 if (groupFormatVersion >= RepGroupImpl.FORMAT_VERSION_3) { 248 minProtocolVersion = VERSION_5; 249 } 250 251 return get(repNode, protocolVersion, minProtocolVersion, 252 maxProtocolVersion, writeLogVersion); 253 } 254 255 /** 256 * Returns a protocol object using the specified minimum and maximum 257 * values, returning null if no supported version is found. Use this 258 * method for testing when the RepGroupImpl object is not available. 259 */ get(final RepNode repNode, final int protocolVersion, final int minProtocolVersion, final int maxProtocolVersion, final int writeLogVersion)260 static Protocol get(final RepNode repNode, 261 final int protocolVersion, 262 final int minProtocolVersion, 263 final int maxProtocolVersion, 264 final int writeLogVersion) { 265 266 if (!isSupportedVersion(protocolVersion, minProtocolVersion, 267 maxProtocolVersion)) { 268 return null; 269 } 270 271 /* 272 * Future code will do what is appropriate in support of the version 273 * depending on the nature of the incompatibility. 274 */ 275 return new Protocol(repNode, protocolVersion, maxProtocolVersion, 276 writeLogVersion); 277 } 278 279 /** 280 * Returns a protocol object using the specified protocol version. 281 */ getProtocol(final RepNode repNode, final int protocolVersion)282 static Protocol getProtocol(final RepNode repNode, 283 final int protocolVersion) { 284 285 assert repNode != null; 286 return new Protocol(repNode, protocolVersion, protocolVersion, 287 LogEntryType.LOG_VERSION); 288 } 289 290 /** 291 * Returns true if the code can support the version. 
292 * 293 * @param protocolVersion protocol version being queried 294 * @param minProtocolVersion minimum protocol version supported 295 * @param maxProtocolVersion maximum protocol version supported 296 * 297 * @return true if the protocol version is supported by this implementation 298 * of the protocol 299 */ isSupportedVersion(final int protocolVersion, final int minProtocolVersion, final int maxProtocolVersion)300 private static boolean isSupportedVersion(final int protocolVersion, 301 final int minProtocolVersion, 302 final int maxProtocolVersion) { 303 if (protocolVersion == Integer.MIN_VALUE) { 304 /* For testing purposes. */ 305 return false; 306 } 307 308 /* 309 * Version compatibility check: for now, a simple range check. We can 310 * make this fancier in the future if necessary. 311 */ 312 return minProtocolVersion <= protocolVersion && 313 protocolVersion <= maxProtocolVersion; 314 } 315 316 /** 317 * Gets the JE version that corresponds to the specified protocol version, 318 * for use in creating error messages that explain protocol version errors 319 * in terms of JE versions. Returns null if the associated version is not 320 * known. 321 */ getProtocolJEVersion(final int protocolVersion)322 static JEVersion getProtocolJEVersion(final int protocolVersion) { 323 switch (protocolVersion) { 324 case VERSION_5: 325 return VERSION_5_JE_VERSION; 326 case VERSION_4: 327 return VERSION_4_JE_VERSION; 328 case VERSION_3: 329 return VERSION_3_JE_VERSION; 330 default: 331 return null; 332 } 333 } 334 335 /** 336 * Gets the protocol version that corresponds to the specified JE version, 337 * throwing an IllegalArgumentException if the version is not supported. 
338 */ getJEVersionProtocolVersion(final JEVersion jeVersion)339 static int getJEVersionProtocolVersion(final JEVersion jeVersion) { 340 if (jeVersion == null) { 341 return VERSION_5; 342 } else if (jeVersion.compareTo(VERSION_5_JE_VERSION) >= 0) { 343 return VERSION_5; 344 } else if (jeVersion.compareTo(VERSION_4_JE_VERSION) >= 0) { 345 return VERSION_4; 346 } else if (jeVersion.compareTo(VERSION_3_JE_VERSION) >= 0) { 347 return VERSION_3; 348 } else { 349 throw new IllegalArgumentException( 350 "JE version not supported: " + jeVersion); 351 } 352 } 353 354 /** 355 * Write an entry output wire record to the message buffer using the write 356 * log version format and increment nEntriesWrittenOldVersion if the entry 357 * format was changed. 358 */ writeOutputWireRecord(final OutputWireRecord record, final ByteBuffer messageBuffer)359 void writeOutputWireRecord(final OutputWireRecord record, 360 final ByteBuffer messageBuffer) { 361 final boolean changedFormat = 362 record.writeToWire(messageBuffer, writeLogVersion); 363 if (changedFormat) { 364 nEntriesWrittenOldVersion.increment(); 365 } 366 } 367 368 public final static MessageOp REPLICA_PROTOCOL_VERSION = 369 new MessageOp((short) 1, ReplicaProtocolVersion.class); 370 371 public final static MessageOp FEEDER_PROTOCOL_VERSION = 372 new MessageOp((short) 2, FeederProtocolVersion.class); 373 374 public final static MessageOp DUP_NODE_REJECT = 375 new MessageOp((short) 3, DuplicateNodeReject.class); 376 377 public final static MessageOp REPLICA_JE_VERSIONS = 378 new MessageOp((short) 4, ReplicaJEVersions.class); 379 380 public final static MessageOp FEEDER_JE_VERSIONS = 381 new MessageOp((short) 5, FeederJEVersions.class); 382 383 public final static MessageOp JE_VERSIONS_REJECT = 384 new MessageOp((short) 6, JEVersionsReject.class); 385 386 public final static MessageOp MEMBERSHIP_INFO = 387 new MessageOp((short) 7, NodeGroupInfo.class); 388 389 public final static MessageOp MEMBERSHIP_INFO_OK = 390 new 
MessageOp((short) 8, NodeGroupInfoOK.class); 391 392 public final static MessageOp MEMBERSHIP_INFO_REJECT = 393 new MessageOp((short) 9, NodeGroupInfoReject.class); 394 395 public final static MessageOp SNTP_REQUEST = 396 new MessageOp((short)10, SNTPRequest.class); 397 398 public final static MessageOp SNTP_RESPONSE = 399 new MessageOp((short)11, SNTPResponse.class); 400 401 /* Core Replication Stream post-handshake messages */ 402 public final static MessageOp ENTRY = 403 new MessageOp((short) 101, Entry.class); 404 405 public final static MessageOp START_STREAM = 406 new MessageOp((short) 102, StartStream.class); 407 408 public final static MessageOp HEARTBEAT = 409 new MessageOp((short) 103, Heartbeat.class); 410 411 public final static MessageOp HEARTBEAT_RESPONSE = 412 new MessageOp((short) 104, HeartbeatResponse.class); 413 414 public final static MessageOp COMMIT = 415 new MessageOp((short) 105, Commit.class); 416 417 public final static MessageOp ACK = 418 new MessageOp((short) 106, Ack.class); 419 420 public final static MessageOp ENTRY_REQUEST = 421 new MessageOp((short) 107, EntryRequest.class); 422 423 public final static MessageOp ENTRY_NOTFOUND = 424 new MessageOp((short) 108, EntryNotFound.class); 425 426 public final static MessageOp ALT_MATCHPOINT = 427 new MessageOp((short) 109, AlternateMatchpoint.class); 428 429 public final static MessageOp RESTORE_REQUEST = 430 new MessageOp((short) 110, RestoreRequest.class); 431 432 public final static MessageOp RESTORE_RESPONSE = 433 new MessageOp((short) 111, RestoreResponse.class); 434 435 public final static MessageOp SHUTDOWN_REQUEST = 436 new MessageOp((short) 112, ShutdownRequest.class); 437 438 public final static MessageOp SHUTDOWN_RESPONSE = 439 new MessageOp((short) 113, ShutdownResponse.class); 440 441 public final static MessageOp GROUP_ACK = 442 new MessageOp((short) 114, GroupAck.class); 443 444 /** 445 * Base class for all protocol handshake messages. 
446 */ 447 abstract class HandshakeMessage extends SimpleMessage { 448 } 449 450 /** 451 * Version broadcasts the sending node's protocol version. 452 */ 453 abstract class ProtocolVersion extends HandshakeMessage { 454 private final int version; 455 456 @SuppressWarnings("hiding") 457 private final NameIdPair nameIdPair; 458 ProtocolVersion(int version)459 public ProtocolVersion(int version) { 460 super(); 461 this.version = version; 462 this.nameIdPair = Protocol.this.nameIdPair; 463 } 464 465 @Override wireFormat()466 public ByteBuffer wireFormat() { 467 return wireFormat(version, nameIdPair); 468 } 469 ProtocolVersion(ByteBuffer buffer)470 public ProtocolVersion(ByteBuffer buffer) { 471 version = LogUtils.readInt(buffer); 472 nameIdPair = getNameIdPair(buffer); 473 } 474 475 /** 476 * @return the version 477 */ getVersion()478 int getVersion() { 479 return version; 480 } 481 482 /** 483 * The nodeName of the sender 484 * 485 * @return nodeName 486 */ getNameIdPair()487 NameIdPair getNameIdPair() { 488 return nameIdPair; 489 } 490 } 491 492 /** 493 * The replica sends the feeder its protocol version. 494 * 495 * IMPORTANT: This message must not change. 496 */ 497 public class ReplicaProtocolVersion extends ProtocolVersion { 498 ReplicaProtocolVersion()499 public ReplicaProtocolVersion() { 500 super(configuredVersion); 501 } 502 ReplicaProtocolVersion(ByteBuffer buffer)503 public ReplicaProtocolVersion(ByteBuffer buffer) { 504 super(buffer); 505 } 506 507 @Override getOp()508 public MessageOp getOp() { 509 return REPLICA_PROTOCOL_VERSION; 510 } 511 } 512 513 /** 514 * The feeder sends the replica its proposed version. 515 * 516 * IMPORTANT: This message must not change. 
517 */ 518 public class FeederProtocolVersion extends ProtocolVersion { 519 FeederProtocolVersion(int proposedVersion)520 public FeederProtocolVersion(int proposedVersion) { 521 super(proposedVersion); 522 } 523 FeederProtocolVersion(ByteBuffer buffer)524 public FeederProtocolVersion(ByteBuffer buffer) { 525 super(buffer); 526 } 527 528 @Override getOp()529 public MessageOp getOp() { 530 return FEEDER_PROTOCOL_VERSION; 531 } 532 } 533 534 /* Reject response to a ReplicaProtocolVersion request */ 535 public class DuplicateNodeReject extends RejectMessage { 536 DuplicateNodeReject(String errorMessage)537 DuplicateNodeReject(String errorMessage) { 538 super(errorMessage); 539 } 540 DuplicateNodeReject(ByteBuffer buffer)541 public DuplicateNodeReject(ByteBuffer buffer) { 542 super(buffer); 543 } 544 545 @Override getOp()546 public MessageOp getOp() { 547 return DUP_NODE_REJECT; 548 } 549 } 550 551 public class SNTPRequest extends HandshakeMessage { 552 553 private final long originateTimestamp; 554 555 /* Set by the receiver at the time the message is recreated. */ 556 private long receiveTimestamp = -1; 557 558 /* 559 * Determines whether this is the last in a consecutive stream of 560 * requests to determine the skew. 
561 */ 562 private boolean isLast = true; 563 SNTPRequest(boolean isLast)564 public SNTPRequest(boolean isLast) { 565 super(); 566 this.isLast = isLast; 567 originateTimestamp = repNode.getClock().currentTimeMillis(); 568 } 569 570 @Override wireFormat()571 public ByteBuffer wireFormat() { 572 return wireFormat(originateTimestamp, isLast); 573 } 574 SNTPRequest(ByteBuffer buffer)575 public SNTPRequest(ByteBuffer buffer) { 576 this.originateTimestamp = LogUtils.readLong(buffer); 577 this.isLast = getBoolean(buffer); 578 this.receiveTimestamp = repNode.getClock().currentTimeMillis(); 579 } 580 581 @Override getOp()582 public MessageOp getOp() { 583 return SNTP_REQUEST; 584 } 585 getOriginateTimestamp()586 public long getOriginateTimestamp() { 587 return originateTimestamp; 588 } 589 getReceiveTimestamp()590 public long getReceiveTimestamp() { 591 return receiveTimestamp; 592 } 593 isLast()594 public boolean isLast() { 595 return isLast; 596 } 597 } 598 599 public class SNTPResponse extends HandshakeMessage { 600 601 /* These fields have the standard SNTP interpretation */ 602 private final long originateTimestamp; // time request sent by client 603 private final long receiveTimestamp; // time request received by server 604 605 /* 606 * Initialized when the message is serialized to ensure it's as 607 * accurate as possible. 608 */ 609 private long transmitTimestamp = -1; // time reply sent by server 610 611 /* Initialized at de-serialization for similar reasons. 
*/ 612 private long destinationTimestamp = -1; //time reply received by client 613 SNTPResponse(SNTPRequest request)614 public SNTPResponse(SNTPRequest request) { 615 this.originateTimestamp = request.originateTimestamp; 616 this.receiveTimestamp = request.receiveTimestamp; 617 } 618 619 @Override wireFormat()620 public ByteBuffer wireFormat() { 621 transmitTimestamp = repNode.getClock().currentTimeMillis(); 622 return wireFormat(originateTimestamp, 623 receiveTimestamp, 624 transmitTimestamp); 625 } 626 SNTPResponse(ByteBuffer buffer)627 public SNTPResponse(ByteBuffer buffer) { 628 originateTimestamp = LogUtils.readLong(buffer); 629 receiveTimestamp = LogUtils.readLong(buffer); 630 transmitTimestamp = LogUtils.readLong(buffer); 631 destinationTimestamp = repNode.getClock().currentTimeMillis(); 632 } 633 634 @Override getOp()635 public MessageOp getOp() { 636 return SNTP_RESPONSE; 637 } 638 getOriginateTimestamp()639 public long getOriginateTimestamp() { 640 return originateTimestamp; 641 } 642 getReceiveTimestamp()643 public long getReceiveTimestamp() { 644 return receiveTimestamp; 645 } 646 getTransmitTimestamp()647 public long getTransmitTimestamp() { 648 return transmitTimestamp; 649 } 650 getDestinationTimestamp()651 public long getDestinationTimestamp() { 652 return destinationTimestamp; 653 } 654 getDelay()655 public long getDelay() { 656 assert(destinationTimestamp != -1); 657 return (destinationTimestamp - originateTimestamp) - 658 (transmitTimestamp - receiveTimestamp); 659 } 660 getDelta()661 public long getDelta() { 662 assert(destinationTimestamp != -1); 663 return ((receiveTimestamp - originateTimestamp) + 664 (transmitTimestamp - destinationTimestamp))/2; 665 } 666 } 667 668 /** 669 * Abstract message used as the basis for the exchange of software versions 670 * between replicated nodes 671 */ 672 abstract class JEVersions extends HandshakeMessage { 673 private final JEVersion version; 674 675 private final int logVersion; 676 JEVersions(JEVersion 
version, int logVersion)677 public JEVersions(JEVersion version, int logVersion) { 678 this.version = version; 679 this.logVersion = logVersion; 680 } 681 682 @Override wireFormat()683 public ByteBuffer wireFormat() { 684 return wireFormat(version.getVersionString(), logVersion); 685 } 686 JEVersions(ByteBuffer buffer)687 public JEVersions(ByteBuffer buffer) { 688 this.version = new JEVersion(getString(buffer)); 689 this.logVersion = LogUtils.readInt(buffer); 690 } 691 getVersion()692 public JEVersion getVersion() { 693 return version; 694 } 695 getLogVersion()696 public byte getLogVersion() { 697 return (byte)logVersion; 698 } 699 } 700 701 public class ReplicaJEVersions extends JEVersions { 702 ReplicaJEVersions(JEVersion version, int logVersion)703 ReplicaJEVersions(JEVersion version, int logVersion) { 704 super(version, logVersion); 705 } 706 ReplicaJEVersions(ByteBuffer buffer)707 public ReplicaJEVersions(ByteBuffer buffer) { 708 super(buffer); 709 } 710 711 @Override getOp()712 public MessageOp getOp() { 713 return REPLICA_JE_VERSIONS; 714 } 715 716 } 717 718 public class FeederJEVersions extends JEVersions { 719 FeederJEVersions(JEVersion version, int logVersion)720 FeederJEVersions(JEVersion version, int logVersion) { 721 super(version, logVersion); 722 } 723 FeederJEVersions(ByteBuffer buffer)724 public FeederJEVersions(ByteBuffer buffer) { 725 super(buffer); 726 } 727 728 @Override getOp()729 public MessageOp getOp() { 730 return FEEDER_JE_VERSIONS; 731 } 732 } 733 734 /* Reject response to a ReplicaJEVersions request */ 735 public class JEVersionsReject extends RejectMessage { 736 JEVersionsReject(String errorMessage)737 public JEVersionsReject(String errorMessage) { 738 super(errorMessage); 739 } 740 JEVersionsReject(ByteBuffer buffer)741 public JEVersionsReject(ByteBuffer buffer) { 742 super(buffer); 743 } 744 745 @Override getOp()746 public MessageOp getOp() { 747 return JE_VERSIONS_REJECT; 748 } 749 } 750 751 public class NodeGroupInfo extends 
HandshakeMessage { 752 private final String groupName; 753 private final UUID uuid; 754 755 @SuppressWarnings("hiding") 756 private final NameIdPair nameIdPair; 757 private final String hostName; 758 private final int port; 759 private final NodeType nodeType; 760 private final boolean designatedPrimary; 761 762 /** 763 * A string version of the JE version running on this node, or the 764 * empty string if not known. 765 */ 766 private final String jeVersion; 767 NodeGroupInfo(final String groupName, final UUID uuid, final NameIdPair nameIdPair, final String hostName, final int port, final NodeType nodeType, final boolean designatedPrimary, final JEVersion jeVersion)768 NodeGroupInfo(final String groupName, 769 final UUID uuid, 770 final NameIdPair nameIdPair, 771 final String hostName, 772 final int port, 773 final NodeType nodeType, 774 final boolean designatedPrimary, 775 final JEVersion jeVersion) { 776 777 this.groupName = groupName; 778 this.uuid = uuid; 779 this.nameIdPair = nameIdPair; 780 this.hostName = hostName; 781 this.port = port; 782 this.nodeType = nodeType; 783 this.designatedPrimary = designatedPrimary; 784 this.jeVersion = (jeVersion != null) ? 785 jeVersion.getNumericVersionString() : 786 ""; 787 } 788 789 @Override getOp()790 public MessageOp getOp() { 791 return MEMBERSHIP_INFO; 792 } 793 794 @Override wireFormat()795 public ByteBuffer wireFormat() { 796 final boolean repGroupV3 = (getVersion() >= VERSION_5); 797 if (!repGroupV3 && nodeType.compareTo(NodeType.ELECTABLE) > 0) { 798 throw new IllegalStateException( 799 "Node type not supported before group version 3: " + 800 nodeType); 801 } 802 final Object[] args = new Object[repGroupV3 ? 
9 : 8]; 803 args[0] = groupName; 804 args[1] = uuid.getMostSignificantBits(); 805 args[2] = uuid.getLeastSignificantBits(); 806 args[3] = nameIdPair; 807 args[4] = hostName; 808 args[5] = port; 809 args[6] = nodeType; 810 args[7] = designatedPrimary; 811 if (repGroupV3) { 812 args[8] = jeVersion; 813 } 814 return wireFormat(args); 815 } 816 NodeGroupInfo(ByteBuffer buffer)817 public NodeGroupInfo(ByteBuffer buffer) { 818 this.groupName = getString(buffer); 819 this.uuid = new UUID(LogUtils.readLong(buffer), 820 LogUtils.readLong(buffer)); 821 this.nameIdPair = getNameIdPair(buffer); 822 this.hostName = getString(buffer); 823 this.port = LogUtils.readInt(buffer); 824 this.nodeType = getEnum(NodeType.class, buffer); 825 this.designatedPrimary = getBoolean(buffer); 826 jeVersion = (getVersion() >= VERSION_5) ? getString(buffer) : ""; 827 } 828 getGroupName()829 public String getGroupName() { 830 return groupName; 831 } 832 getUUID()833 public UUID getUUID() { 834 return uuid; 835 } 836 getNodeName()837 public String getNodeName() { 838 return nameIdPair.getName(); 839 } 840 getNodeId()841 public int getNodeId() { 842 return nameIdPair.getId(); 843 } 844 getHostName()845 public String getHostName() { 846 return hostName; 847 } 848 getNameIdPair()849 public NameIdPair getNameIdPair() { 850 return nameIdPair; 851 } 852 port()853 public int port() { 854 return port; 855 } getNodeType()856 public NodeType getNodeType() { 857 return nodeType; 858 } 859 isDesignatedPrimary()860 public boolean isDesignatedPrimary() { 861 return designatedPrimary; 862 } 863 864 /** 865 * Returns the JE version most recently noted running on the associated 866 * node, or null if not known. 867 */ getJEVersion()868 public JEVersion getJEVersion() { 869 return !jeVersion.isEmpty() ? new JEVersion(jeVersion) : null; 870 } 871 } 872 873 /** 874 * Response to a NodeGroupInfo request that was successful. The object 875 * contains the group's UUID and the replica's NameIdPair. 
The group UUID 876 * is used to update the replica's notion of the group UUID on first 877 * joining. The NameIdPair is used to update the replica's node ID for a 878 * secondary node, which is not available in the RepGroupDB. 879 */ 880 public class NodeGroupInfoOK extends HandshakeMessage { 881 882 private final UUID uuid; 883 @SuppressWarnings("hiding") 884 private final NameIdPair nameIdPair; 885 NodeGroupInfoOK(UUID uuid, NameIdPair nameIdPair)886 public NodeGroupInfoOK(UUID uuid, NameIdPair nameIdPair) { 887 super(); 888 this.uuid = uuid; 889 this.nameIdPair = nameIdPair; 890 } 891 NodeGroupInfoOK(ByteBuffer buffer)892 public NodeGroupInfoOK(ByteBuffer buffer) { 893 uuid = new UUID(LogUtils.readLong(buffer), 894 LogUtils.readLong(buffer)); 895 nameIdPair = getNameIdPair(buffer); 896 } 897 898 @Override wireFormat()899 public ByteBuffer wireFormat() { 900 return wireFormat(uuid.getMostSignificantBits(), 901 uuid.getLeastSignificantBits(), 902 nameIdPair); 903 } 904 905 @Override getOp()906 public MessageOp getOp() { 907 return MEMBERSHIP_INFO_OK; 908 } 909 getNameIdPair()910 public NameIdPair getNameIdPair() { 911 return nameIdPair; 912 } 913 getUUID()914 public UUID getUUID() { 915 return uuid; 916 } 917 } 918 919 public class NodeGroupInfoReject extends RejectMessage { 920 NodeGroupInfoReject(String errorMessage)921 NodeGroupInfoReject(String errorMessage) { 922 super(errorMessage); 923 } 924 925 @Override getOp()926 public MessageOp getOp() { 927 return MEMBERSHIP_INFO_REJECT; 928 } 929 930 @Override wireFormat()931 public ByteBuffer wireFormat() { 932 return wireFormat(errorMessage); 933 } 934 NodeGroupInfoReject(ByteBuffer buffer)935 public NodeGroupInfoReject(ByteBuffer buffer) { 936 super(buffer); 937 } 938 } 939 940 /** 941 * Base class for messages which contain only a VLSN 942 */ 943 abstract class VLSNMessage extends Message { 944 protected final VLSN vlsn; 945 VLSNMessage(VLSN vlsn)946 VLSNMessage(VLSN vlsn) { 947 super(); 948 this.vlsn = vlsn; 949 
} 950 VLSNMessage(ByteBuffer buffer)951 public VLSNMessage(ByteBuffer buffer) { 952 long vlsnSequence = LogUtils.readLong(buffer); 953 vlsn = new VLSN(vlsnSequence); 954 } 955 956 @Override wireFormat()957 public ByteBuffer wireFormat() { 958 int bodySize = 8; 959 ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); 960 LogUtils.writeLong(messageBuffer, vlsn.getSequence()); 961 messageBuffer.flip(); 962 return messageBuffer; 963 } 964 getVLSN()965 VLSN getVLSN() { 966 return vlsn; 967 } 968 969 @Override toString()970 public String toString() { 971 return super.toString() + " " + vlsn; 972 } 973 } 974 975 /** 976 * A message containing a log entry in the replication stream. 977 */ 978 public class Entry extends Message { 979 980 /* 981 * InputWireRecord is set when this Message had been received at this 982 * node. OutputWireRecord is set when this message is created for 983 * sending from this node. 984 */ 985 final protected InputWireRecord inputWireRecord; 986 protected OutputWireRecord outputWireRecord; 987 Entry(final OutputWireRecord outputWireRecord)988 public Entry(final OutputWireRecord outputWireRecord) { 989 inputWireRecord = null; 990 this.outputWireRecord = outputWireRecord; 991 } 992 993 @Override getOp()994 public MessageOp getOp() { 995 return ENTRY; 996 } 997 998 @Override wireFormat()999 public ByteBuffer wireFormat() { 1000 final int bodySize = getWireSize(); 1001 final ByteBuffer messageBuffer = 1002 allocateInitializedBuffer(bodySize); 1003 writeOutputWireRecord(outputWireRecord, messageBuffer); 1004 messageBuffer.flip(); 1005 return messageBuffer; 1006 } 1007 getWireSize()1008 protected int getWireSize() { 1009 return outputWireRecord.getWireSize(writeLogVersion); 1010 } 1011 Entry(final ByteBuffer buffer)1012 public Entry(final ByteBuffer buffer) 1013 throws DatabaseException { 1014 1015 inputWireRecord = 1016 new InputWireRecord(repNode.getRepImpl(), buffer); 1017 } 1018 getWireRecord()1019 public InputWireRecord getWireRecord() { 
1020 return inputWireRecord; 1021 } 1022 1023 @Override toString()1024 public String toString() { 1025 1026 final StringBuilder sb = new StringBuilder(); 1027 sb.append(super.toString()); 1028 1029 if (inputWireRecord != null) { 1030 sb.append(" "); 1031 sb.append(inputWireRecord); 1032 } 1033 1034 if (outputWireRecord != null) { 1035 sb.append(" "); 1036 sb.append(outputWireRecord); 1037 } 1038 1039 return sb.toString(); 1040 } 1041 1042 /* For unit test support */ 1043 @Override match(Message other)1044 public boolean match(Message other) { 1045 1046 /* 1047 * This message was read in, but we need to compare it to a message 1048 * that was sent out. 1049 */ 1050 if (outputWireRecord == null) { 1051 outputWireRecord = new OutputWireRecord(repNode.getRepImpl(), 1052 inputWireRecord); 1053 } 1054 return super.match(other); 1055 } 1056 1057 /* True if the log entry is a TxnAbort or TxnCommit. */ isTxnEnd()1058 public boolean isTxnEnd() { 1059 final byte entryType = getWireRecord().getEntryType(); 1060 if (LogEntryType.LOG_TXN_COMMIT.equalsType(entryType) || 1061 LogEntryType.LOG_TXN_ABORT.equalsType(entryType)) { 1062 return true; 1063 } 1064 1065 return false; 1066 } 1067 } 1068 1069 /** 1070 * StartStream indicates that the replica would like the feeder to start 1071 * the replication stream at the proposed vlsn. 
1072 */ 1073 public class StartStream extends VLSNMessage { 1074 StartStream(VLSN startVLSN)1075 StartStream(VLSN startVLSN) { 1076 super(startVLSN); 1077 } 1078 StartStream(ByteBuffer buffer)1079 public StartStream(ByteBuffer buffer) { 1080 super(buffer); 1081 } 1082 1083 @Override getOp()1084 public MessageOp getOp() { 1085 return START_STREAM; 1086 } 1087 } 1088 1089 public class Heartbeat extends Message { 1090 1091 private final long masterNow; 1092 private final long currentTxnEndVLSN; 1093 Heartbeat(long masterNow, long currentTxnEndVLSN)1094 public Heartbeat(long masterNow, long currentTxnEndVLSN) { 1095 this.masterNow = masterNow; 1096 this.currentTxnEndVLSN = currentTxnEndVLSN; 1097 } 1098 1099 @Override getOp()1100 public MessageOp getOp() { 1101 return HEARTBEAT; 1102 } 1103 1104 @Override wireFormat()1105 public ByteBuffer wireFormat() { 1106 int bodySize = 8 * 2 /* masterNow + currentTxnEndVLSN */; 1107 ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); 1108 LogUtils.writeLong(messageBuffer, masterNow); 1109 LogUtils.writeLong(messageBuffer, currentTxnEndVLSN); 1110 messageBuffer.flip(); 1111 return messageBuffer; 1112 } 1113 Heartbeat(ByteBuffer buffer)1114 public Heartbeat(ByteBuffer buffer) { 1115 masterNow = LogUtils.readLong(buffer); 1116 currentTxnEndVLSN = LogUtils.readLong(buffer); 1117 } 1118 getMasterNow()1119 public long getMasterNow() { 1120 return masterNow; 1121 } 1122 getCurrentTxnEndVLSN()1123 public long getCurrentTxnEndVLSN() { 1124 return currentTxnEndVLSN; 1125 } 1126 1127 @Override toString()1128 public String toString() { 1129 return super.toString() + " masterNow=" + masterNow + 1130 " currentCommit=" + currentTxnEndVLSN; 1131 } 1132 } 1133 1134 public class HeartbeatResponse extends Message { 1135 /* The latest syncupVLSN */ 1136 private final VLSN syncupVLSN; 1137 private final VLSN txnEndVLSN; 1138 HeartbeatResponse(VLSN syncupVLSN, VLSN ackedVLSN)1139 public HeartbeatResponse(VLSN syncupVLSN, VLSN ackedVLSN) { 
1140 super(); 1141 this.syncupVLSN = syncupVLSN; 1142 this.txnEndVLSN = ackedVLSN; 1143 } 1144 HeartbeatResponse(ByteBuffer buffer)1145 public HeartbeatResponse(ByteBuffer buffer) { 1146 syncupVLSN = new VLSN(LogUtils.readLong(buffer)); 1147 txnEndVLSN = 1148 getVersion() >= VERSION_4 ? 1149 new VLSN(LogUtils.readLong(buffer)) : 1150 null; 1151 } 1152 1153 @Override getOp()1154 public MessageOp getOp() { 1155 return HEARTBEAT_RESPONSE; 1156 } 1157 1158 @Override wireFormat()1159 public ByteBuffer wireFormat() { 1160 boolean includeTxnEndVLSN = getVersion() >= VERSION_4; 1161 int bodySize = includeTxnEndVLSN ? 1162 8 * 2 : 1163 8; 1164 ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); 1165 LogUtils.writeLong(messageBuffer, syncupVLSN.getSequence()); 1166 if (includeTxnEndVLSN) { 1167 LogUtils.writeLong(messageBuffer, txnEndVLSN.getSequence()); 1168 } 1169 messageBuffer.flip(); 1170 return messageBuffer; 1171 } 1172 getSyncupVLSN()1173 public VLSN getSyncupVLSN() { 1174 return syncupVLSN; 1175 } 1176 getTxnEndVLSN()1177 public VLSN getTxnEndVLSN() { 1178 return txnEndVLSN; 1179 } 1180 1181 @Override toString()1182 public String toString() { 1183 return super.toString() + " syncupVLSN=" + syncupVLSN; 1184 } 1185 } 1186 1187 /** 1188 * Message used to shutdown a node 1189 */ 1190 public class ShutdownRequest extends SimpleMessage { 1191 /* The time that the shutdown was initiated on the master. 
*/ 1192 private final long shutdownTimeMs; 1193 ShutdownRequest(long shutdownTimeMs)1194 public ShutdownRequest(long shutdownTimeMs) { 1195 super(); 1196 this.shutdownTimeMs = shutdownTimeMs; 1197 } 1198 1199 @Override getOp()1200 public MessageOp getOp() { 1201 return SHUTDOWN_REQUEST; 1202 } 1203 ShutdownRequest(ByteBuffer buffer)1204 public ShutdownRequest(ByteBuffer buffer) { 1205 shutdownTimeMs = LogUtils.readLong(buffer); 1206 } 1207 1208 @Override wireFormat()1209 public ByteBuffer wireFormat() { 1210 return wireFormat(shutdownTimeMs); 1211 } 1212 getShutdownTimeMs()1213 public long getShutdownTimeMs() { 1214 return shutdownTimeMs; 1215 } 1216 } 1217 1218 /** 1219 * Message in response to a shutdown request. 1220 */ 1221 public class ShutdownResponse extends Message { 1222 ShutdownResponse()1223 public ShutdownResponse() { 1224 super(); 1225 } 1226 1227 @Override getOp()1228 public MessageOp getOp() { 1229 return SHUTDOWN_RESPONSE; 1230 } 1231 ShutdownResponse(@uppressWarningsR) ByteBuffer buffer)1232 public ShutdownResponse(@SuppressWarnings("unused") ByteBuffer buffer) { 1233 } 1234 } 1235 1236 public class Commit extends Entry { 1237 private final boolean needsAck; 1238 private final SyncPolicy replicaSyncPolicy; 1239 Commit(final boolean needsAck, final SyncPolicy replicaSyncPolicy, final OutputWireRecord wireRecord)1240 public Commit(final boolean needsAck, 1241 final SyncPolicy replicaSyncPolicy, 1242 final OutputWireRecord wireRecord) { 1243 super(wireRecord); 1244 this.needsAck = needsAck; 1245 this.replicaSyncPolicy = replicaSyncPolicy; 1246 } 1247 1248 @Override getOp()1249 public MessageOp getOp() { 1250 return COMMIT; 1251 } 1252 1253 @Override wireFormat()1254 public ByteBuffer wireFormat() { 1255 final int bodySize = super.getWireSize() + 1256 1 /* needsAck */ + 1257 1 /* replica sync policy */; 1258 final ByteBuffer messageBuffer = 1259 allocateInitializedBuffer(bodySize); 1260 messageBuffer.put((byte) (needsAck ? 
1 : 0)); 1261 messageBuffer.put((byte) replicaSyncPolicy.ordinal()); 1262 writeOutputWireRecord(outputWireRecord, messageBuffer); 1263 messageBuffer.flip(); 1264 return messageBuffer; 1265 } 1266 Commit(final ByteBuffer buffer)1267 public Commit(final ByteBuffer buffer) 1268 throws DatabaseException { 1269 1270 this(getByteNeedsAck(buffer.get()), 1271 getByteReplicaSyncPolicy(buffer.get()), 1272 buffer); 1273 } 1274 Commit(final boolean needsAck, final SyncPolicy replicaSyncPolicy, final ByteBuffer buffer)1275 private Commit(final boolean needsAck, 1276 final SyncPolicy replicaSyncPolicy, 1277 final ByteBuffer buffer) 1278 throws DatabaseException { 1279 1280 super(buffer); 1281 this.needsAck = needsAck; 1282 this.replicaSyncPolicy = replicaSyncPolicy; 1283 } 1284 getNeedsAck()1285 public boolean getNeedsAck() { 1286 return needsAck; 1287 } 1288 getReplicaSyncPolicy()1289 public SyncPolicy getReplicaSyncPolicy() { 1290 return replicaSyncPolicy; 1291 } 1292 } 1293 1294 /** 1295 * Returns whether the byte value specifies that an acknowledgment is 1296 * needed. 1297 */ getByteNeedsAck(final byte needsAckByte)1298 private static boolean getByteNeedsAck(final byte needsAckByte) { 1299 switch (needsAckByte) { 1300 case 0: 1301 return false; 1302 case 1: 1303 return true; 1304 default: 1305 throw EnvironmentFailureException.unexpectedState( 1306 "Invalid bool ordinal: " + needsAckByte); 1307 } 1308 } 1309 1310 /** Returns the sync policy specified by the argument. 
*/ getByteReplicaSyncPolicy( final byte syncPolicyByte)1311 private static SyncPolicy getByteReplicaSyncPolicy( 1312 final byte syncPolicyByte) { 1313 1314 for (final SyncPolicy p : SyncPolicy.values()) { 1315 if (p.ordinal() == syncPolicyByte) { 1316 return p; 1317 } 1318 } 1319 throw EnvironmentFailureException.unexpectedState( 1320 "Invalid sync policy ordinal: " + syncPolicyByte); 1321 } 1322 1323 public class Ack extends Message { 1324 1325 private final long txnId; 1326 Ack(long txnId)1327 public Ack(long txnId) { 1328 super(); 1329 this.txnId = txnId; 1330 nAckMessages.increment(); 1331 } 1332 1333 @Override getOp()1334 public MessageOp getOp() { 1335 return ACK; 1336 } 1337 1338 @Override wireFormat()1339 public ByteBuffer wireFormat() { 1340 int bodySize = 8; 1341 ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); 1342 LogUtils.writeLong(messageBuffer, txnId); 1343 messageBuffer.flip(); 1344 return messageBuffer; 1345 } 1346 Ack(ByteBuffer buffer)1347 public Ack(ByteBuffer buffer) { 1348 txnId = LogUtils.readLong(buffer); 1349 } 1350 getTxnId()1351 public long getTxnId() { 1352 return txnId; 1353 } 1354 1355 @Override toString()1356 public String toString() { 1357 return super.toString() + " txn " + txnId; 1358 } 1359 } 1360 1361 public class GroupAck extends Message { 1362 1363 private final long txnIds[]; 1364 GroupAck(long txnIds[])1365 public GroupAck(long txnIds[]) { 1366 super(); 1367 this.txnIds = txnIds; 1368 nGroupAckMessages.increment(); 1369 nGroupedAcks.add(txnIds.length); 1370 nMaxGroupedAcks.setMax(txnIds.length); 1371 } 1372 1373 @Override getOp()1374 public MessageOp getOp() { 1375 return GROUP_ACK; 1376 } 1377 1378 @Override wireFormat()1379 public ByteBuffer wireFormat() { 1380 1381 final int bodySize = 4 + 8 * txnIds.length; 1382 final ByteBuffer messageBuffer = 1383 allocateInitializedBuffer(bodySize); 1384 1385 putLongArray(messageBuffer, txnIds); 1386 messageBuffer.flip(); 1387 1388 return messageBuffer; 1389 } 1390 
GroupAck(ByteBuffer buffer)1391 public GroupAck(ByteBuffer buffer) { 1392 txnIds = readLongArray(buffer); 1393 } 1394 getTxnIds()1395 public long[] getTxnIds() { 1396 return txnIds; 1397 } 1398 1399 @Override toString()1400 public String toString() { 1401 return super.toString() + " txn " + Arrays.toString(txnIds); 1402 } 1403 } 1404 1405 putLongArray(ByteBuffer buffer, long[] la)1406 private void putLongArray(ByteBuffer buffer, long[] la) { 1407 LogUtils.writeInt(buffer, la.length); 1408 1409 for (long l : la) { 1410 LogUtils.writeLong(buffer, l); 1411 } 1412 } 1413 readLongArray(ByteBuffer buffer)1414 private long[] readLongArray(ByteBuffer buffer) { 1415 final long la[] = new long[LogUtils.readInt(buffer)]; 1416 1417 for (int i=0; i < la.length; i++) { 1418 la[i] = LogUtils.readLong(buffer); 1419 } 1420 1421 return la; 1422 } 1423 1424 /** 1425 * A replica node asks a feeder for the log entry at this VLSN. 1426 */ 1427 public class EntryRequest extends VLSNMessage { 1428 EntryRequest(VLSN matchpoint)1429 EntryRequest(VLSN matchpoint) { 1430 super(matchpoint); 1431 } 1432 EntryRequest(ByteBuffer buffer)1433 public EntryRequest(ByteBuffer buffer) { 1434 super(buffer); 1435 } 1436 1437 @Override getOp()1438 public MessageOp getOp() { 1439 return ENTRY_REQUEST; 1440 } 1441 } 1442 1443 /** 1444 * Response when the EntryRequest asks for a VLSN that is below the VLSN 1445 * range covered by the Feeder. 
1446 */ 1447 public class EntryNotFound extends Message { 1448 EntryNotFound()1449 public EntryNotFound() { 1450 } 1451 EntryNotFound(@uppressWarningsR) ByteBuffer buffer)1452 public EntryNotFound(@SuppressWarnings("unused") ByteBuffer buffer) { 1453 super(); 1454 } 1455 1456 @Override getOp()1457 public MessageOp getOp() { 1458 return ENTRY_NOTFOUND; 1459 } 1460 } 1461 1462 public class AlternateMatchpoint extends Message { 1463 1464 private final InputWireRecord alternateInput; 1465 private OutputWireRecord alternateOutput = null; 1466 AlternateMatchpoint(final OutputWireRecord alternate)1467 AlternateMatchpoint(final OutputWireRecord alternate) { 1468 alternateInput = null; 1469 this.alternateOutput = alternate; 1470 } 1471 1472 @Override getOp()1473 public MessageOp getOp() { 1474 return ALT_MATCHPOINT; 1475 } 1476 1477 @Override wireFormat()1478 public ByteBuffer wireFormat() { 1479 final int bodySize = alternateOutput.getWireSize(writeLogVersion); 1480 final ByteBuffer messageBuffer = 1481 allocateInitializedBuffer(bodySize); 1482 writeOutputWireRecord(alternateOutput, messageBuffer); 1483 messageBuffer.flip(); 1484 return messageBuffer; 1485 } 1486 AlternateMatchpoint(final ByteBuffer buffer)1487 public AlternateMatchpoint(final ByteBuffer buffer) 1488 throws DatabaseException { 1489 alternateInput = new InputWireRecord(repNode.getRepImpl(), buffer); 1490 } 1491 getAlternateWireRecord()1492 public InputWireRecord getAlternateWireRecord() { 1493 return alternateInput; 1494 } 1495 1496 /* For unit test support */ 1497 @Override match(Message other)1498 public boolean match(Message other) { 1499 1500 /* 1501 * This message was read in, but we need to compare it to a message 1502 * that was sent out. 
1503 */ 1504 if (alternateOutput == null) { 1505 alternateOutput = 1506 new OutputWireRecord(repNode.getRepImpl(), alternateInput); 1507 } 1508 return super.match(other); 1509 } 1510 } 1511 1512 /** 1513 * Request from the replica to the feeder for sufficient information to 1514 * start a network restore. 1515 */ 1516 public class RestoreRequest extends VLSNMessage { 1517 RestoreRequest(VLSN failedMatchpoint)1518 RestoreRequest(VLSN failedMatchpoint) { 1519 super(failedMatchpoint); 1520 } 1521 RestoreRequest(ByteBuffer buffer)1522 public RestoreRequest(ByteBuffer buffer) { 1523 super(buffer); 1524 } 1525 1526 @Override getOp()1527 public MessageOp getOp() { 1528 return RESTORE_REQUEST; 1529 } 1530 } 1531 1532 /** 1533 * Response when the replica needs information to instigate a network 1534 * restore. The message contains two pieces of information. One is a set of 1535 * nodes that could be used as the basis for a NetworkBackup so that the 1536 * request node can become current again. The second is a suitable low vlsn 1537 * for the replica, which will be registered as this replica's local 1538 * CBVLSN. This will contribute to the global CBVLSN calculation. 1539 * 1540 * The feeder when sending this response must, if it is also the master, 1541 * update the membership table to set the local CBVLSN for the requesting 1542 * node thus ensuring that it can continue the replication stream at this 1543 * VLSN (or higher) when it retries the syncup operation. 
*/
    public class RestoreResponse extends SimpleMessage {
        private final VLSN cbvlsn;

        private final RepNodeImpl[] logProviders;

        public RestoreResponse(VLSN cbvlsn, RepNodeImpl[] logProviders) {
            this.cbvlsn = cbvlsn;
            this.logProviders = logProviders;
        }

        /*
         * Reads the CBVLSN sequence followed by the array of log providers,
         * the same order in which wireFormat() supplies them.
         */
        public RestoreResponse(ByteBuffer buffer) {
            cbvlsn = new VLSN(LogUtils.readLong(buffer));
            logProviders = getRepNodeImplArray(buffer);
        }

        @Override
        public ByteBuffer wireFormat() {
            return wireFormat(cbvlsn.getSequence(), logProviders);
        }

        @Override
        public MessageOp getOp() {
            return RESTORE_RESPONSE;
        }

        RepNodeImpl[] getLogProviders() {
            return logProviders;
        }

        VLSN getCBVLSN() {
            return cbvlsn;
        }

        /* Add support for RepNodeImpl arrays. */

        /*
         * Writes RepNodeImpl arrays itself and delegates every other kind of
         * object to the superclass.
         */
        @Override
        protected void putWireFormat(final ByteBuffer buffer,
                                     final Object obj) {
            if (obj.getClass() != RepNodeImpl[].class) {
                super.putWireFormat(buffer, obj);
                return;
            }
            putRepNodeImplArray(buffer, (RepNodeImpl[]) obj);
        }

        /*
         * Sizes RepNodeImpl arrays itself and delegates every other kind of
         * object to the superclass.
         */
        @Override
        protected int wireFormatSize(final Object obj) {
            if (obj.getClass() != RepNodeImpl[].class) {
                return super.wireFormatSize(obj);
            }
            return getRepNodeImplArraySize((RepNodeImpl[]) obj);
        }

        /*
         * Writes the element count followed by each node as a byte array, in
         * the group format selected by getGroupFormatVersion().
         */
        private void putRepNodeImplArray(final ByteBuffer buffer,
                                         final RepNodeImpl[] ra) {
            LogUtils.writeInt(buffer, ra.length);
            final int formatVersion = getGroupFormatVersion();
            for (final RepNodeImpl node : ra) {
                putByteArray(
                    buffer,
                    RepGroupImpl.serializeBytes(node, formatVersion));
            }
        }

        /* Reads an array written by putRepNodeImplArray. */
        private RepNodeImpl[] getRepNodeImplArray(final ByteBuffer buffer) {
            final int count = LogUtils.readInt(buffer);
            final RepNodeImpl[] nodes = new RepNodeImpl[count];
            final int formatVersion = getGroupFormatVersion();
            for (int i = 0; i < nodes.length; i++) {
                nodes[i] = RepGroupImpl.deserializeNode(
                    getByteArray(buffer), formatVersion);
            }
            return nodes;
        }

        /*
         * Computes the number of bytes putRepNodeImplArray will write. Each
         * node is serialized here in order to learn its length.
         */
        private int getRepNodeImplArraySize(RepNodeImpl[] ra) {
            final int formatVersion = getGroupFormatVersion();
            int total = 4; /* array length */
            for (final RepNodeImpl node : ra) {
                total += 4; /* Node size */
                total +=
                    RepGroupImpl.serializeBytes(node, formatVersion).length;
            }
            return total;
        }

        /**
         * Returns the RepGroupImpl version to use for the currently configured
         * protocol version.
         */
        private int getGroupFormatVersion() {
            if (getVersion() < VERSION_5) {
                return RepGroupImpl.FORMAT_VERSION_2;
            }
            return RepGroupImpl.MAX_FORMAT_VERSION;
        }
    }
}