1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.stream;
9 
10 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_ACK_MESSAGES;
11 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_GROUPED_ACKS;
12 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_GROUP_ACK_MESSAGES;
13 import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_MAX_GROUPED_ACKS;
14 
15 import java.nio.ByteBuffer;
16 import java.util.Arrays;
17 import java.util.UUID;
18 
19 import com.sleepycat.je.DatabaseException;
20 import com.sleepycat.je.Durability.SyncPolicy;
21 import com.sleepycat.je.EnvironmentFailureException;
22 import com.sleepycat.je.JEVersion;
23 import com.sleepycat.je.log.LogEntryType;
24 import com.sleepycat.je.log.LogUtils;
25 import com.sleepycat.je.rep.NodeType;
26 import com.sleepycat.je.rep.impl.RepGroupImpl;
27 import com.sleepycat.je.rep.impl.RepNodeImpl;
28 import com.sleepycat.je.rep.impl.node.NameIdPair;
29 import com.sleepycat.je.rep.impl.node.RepNode;
30 import com.sleepycat.je.rep.utilint.BinaryProtocol;
31 import com.sleepycat.je.rep.utilint.LongMaxZeroStat;
32 import com.sleepycat.je.utilint.LongMaxStat;
33 import com.sleepycat.je.utilint.LongStat;
34 import com.sleepycat.je.utilint.VLSN;
35 
36 
37 /**
38  * Defines the messages used to set up a feeder-replica replication stream.
39  *
40  * From Feeder to Replica
41  *
42  *    Heartbeat -> HeartbeatResponse
43  *    Commit -> Ack
44  *    Commit+ -> GroupAck
45  *    Entry
46  *    ShutdownRequest -> ShutdownResponse
47  *
48  * Note: in the future, we may want to support bulk entry messages
49  *
50  * From Replica to Feeder
51  *
52  * The following subset of messages represents the handshake protocol that
53  * precedes the transmission of replication log entries.
54  *
55  *    ReplicaProtocolVersion -> FeederProtocolVersion | DuplicateNodeReject
56  *    ReplicaJEVersions -> FeederJEVersions | JEVersionsReject
57  *    NodeGroupInfo -> NodeGroupInfoOK | NodeGroupInfoReject
58  *    SNTPRequest -> SNTPResponse
59  *    -> HeartbeatResponse
60  *
 * A HeartbeatResponse is not strictly a response message and may also be sent
 * spontaneously if there is no output activity in a heartbeat interval. This
 * spontaneous generation of a HeartbeatResponse ensures that a socket is not
 * timed out if the feeder or the replica replay is otherwise busy.
65  *
 * Note that multiple SNTPRequest/SNTPResponse message pairs may be exchanged
 * as part of a single handshake. Consequently, the sequence of requests sent
 * by the Replica during a successful handshake looks like:
 *
 * ReplicaProtocolVersion ReplicaJEVersions NodeGroupInfo [SNTPRequest]+
71  *
72  * The following messages constitute the syncup and the transmission of log
73  * entries.
74  *
75  *    EntryRequest -> Entry | EntryNotFound | AlternateMatchpoint
76  *    RestoreRequest -> RestoreResponse
77  *    StartStream
78  *
79  * The Protocol instance has local state in terms of buffers that are reused
80  * across multiple messages. A Protocol instance is expected to be used in
81  * strictly serial fashion. Consequently, there is an instance for each Replica
82  * to Feeder connection, and two instances per Feeder to Replica connection:
83  * one for the InputThread and one for the OutputThread.
84  */
85 public class Protocol extends BinaryProtocol {
86 
    /*
     * Note that the GROUP_ACK response message was introduced in version 5,
     * but is disabled by default via RepParams.REPLICA_ENABLE_GROUP_ACKS.
     *
     * It can be enabled when we can increase the protocol version number.
     */

    /* The default (highest) version supported by the Protocol code. */
    public static final int MAX_VERSION = 5;

    /* The minimum version we're willing to interact with. */
    static final int MIN_VERSION = 3;

    /*
     * Version added in JE 6.0.1 to support RepGroupImpl version 3.  Once a
     * group's format version is 3 or later, this becomes the minimum protocol
     * version accepted by get(RepNode, int, int, int).
     */
    public static final int VERSION_5 = 5;
    public static final JEVersion VERSION_5_JE_VERSION =
        new JEVersion("6.0.1");

    /*
     * Version in which HEARTBEAT_RESPONSE added a second field.  We can manage
     * without this optional additional information if we have to, so we can
     * still interact with the previous protocol version.  (JE 5.0.58)
     */
    static final int VERSION_4 = 4;
    public static final JEVersion VERSION_4_JE_VERSION =
        new JEVersion("5.0.58");

    /* Version added in JE 4.0.50 to address byte order issues. */
    static final int VERSION_3 = 3;
    public static final JEVersion VERSION_3_JE_VERSION =
        new JEVersion("4.0.50");

    /*
     * The replication node that's communicating via this protocol.  It is
     * null only during test usage.
     */
    private final RepNode repNode;

    /** The log version of the format used to write log entries. */
    private final int writeLogVersion;

    /* Count of all singleton ACK messages. */
    private final LongStat nAckMessages;

    /* Count of all group ACK messages. */
    private final LongStat nGroupAckMessages;

    /* Sum of all acks sent via group ACK messages. */
    private final LongStat nGroupedAcks;

    /*
     * Max number of acks sent via a single group ACK message.  The
     * constructor instantiates it as a LongMaxZeroStat.
     */
    private final LongMaxStat nMaxGroupedAcks;
136 
137 
138     /**
139      * Returns a Protocol object configured that implements the specified
140      * (supported) protocol version.
141      *
142      * @param repNode the node using the protocol
143      *
144      * @param protocolVersion the version of the protocol that must be
145      *        implemented by this object
146      *
147      * @param maxProtocolVersion the highest supported protocol version, which
148      *        may be lower than the code version, for testing purposes
149      *
150      * @param writeLogVersion the log version of the format used to write log
151      *        entries
152      */
Protocol(final RepNode repNode, final int protocolVersion, final int maxProtocolVersion, final int writeLogVersion)153     private Protocol(final RepNode repNode,
154                      final int protocolVersion,
155                      final int maxProtocolVersion,
156                      final int writeLogVersion) {
157         super((repNode != null) ? repNode.getNameIdPair() : NameIdPair.NULL,
158               maxProtocolVersion,
159               protocolVersion,
160               (repNode != null) ? repNode.getRepImpl() : null);
161 
162         /* repNode is only null during test usage. */
163         this.repNode = repNode;
164         this.configuredVersion = protocolVersion;
165         this.writeLogVersion = writeLogVersion;
166 
167         nAckMessages = new LongStat(stats, N_ACK_MESSAGES);
168         nGroupAckMessages = new LongStat(stats, N_GROUP_ACK_MESSAGES);
169         nGroupedAcks = new LongStat(stats, N_GROUPED_ACKS);
170         nMaxGroupedAcks = new LongMaxZeroStat(stats, N_MAX_GROUPED_ACKS);
171 
172         initializeMessageOps(new MessageOp[] {
173             REPLICA_PROTOCOL_VERSION,
174             FEEDER_PROTOCOL_VERSION,
175             DUP_NODE_REJECT,
176             REPLICA_JE_VERSIONS,
177             FEEDER_JE_VERSIONS,
178             JE_VERSIONS_REJECT,
179             MEMBERSHIP_INFO,
180             MEMBERSHIP_INFO_OK,
181             MEMBERSHIP_INFO_REJECT,
182             SNTP_REQUEST,
183             SNTP_RESPONSE,
184             ENTRY,
185             START_STREAM,
186             HEARTBEAT,
187             HEARTBEAT_RESPONSE,
188             COMMIT,
189             ACK,
190             ENTRY_REQUEST,
191             ENTRY_NOTFOUND,
192             RESTORE_REQUEST,
193             RESTORE_RESPONSE,
194             ALT_MATCHPOINT,
195             SHUTDOWN_REQUEST,
196             SHUTDOWN_RESPONSE,
197             GROUP_ACK
198         });
199     }
200 
201     /**
202      * Returns a protocol object that supports the specific requested protocol
203      * version, or null if the version is not supported.
204      */
get(final RepNode repNode, final int protocolVersion)205     public static Protocol get(final RepNode repNode,
206                                final int protocolVersion) {
207         return get(repNode, protocolVersion, protocolVersion);
208     }
209 
210     /**
211      * Returns a protocol object that supports the specific requested protocol
212      * version, which must not be higher than the specified maximum version, or
213      * null if no such version is supported.
214      */
get(final RepNode repNode, final int protocolVersion, final int maxProtocolVersion)215     public static Protocol get(final RepNode repNode,
216                                final int protocolVersion,
217                                final int maxProtocolVersion) {
218         return get(repNode, protocolVersion, maxProtocolVersion,
219                    LogEntryType.LOG_VERSION);
220     }
221 
222     /**
223      * Returns a protocol object that supports the specified protocol, which
224      * must be less than the specified maximum version, and writes log entries
225      * in the specified log version format.  Returns null if no such version is
226      * supported.
227      */
get(final RepNode repNode, final int protocolVersion, final int maxProtocolVersion, final int writeLogVersion)228     public static Protocol get(final RepNode repNode,
229                                final int protocolVersion,
230                                final int maxProtocolVersion,
231                                final int writeLogVersion) {
232         assert repNode != null;
233 
234         /*
235          * If the RepGroupImpl has been upgraded to version 3, then require
236          * protocol version 5, which is required to support that RepGroupImpl
237          * version.  This check prevents new facilities that depend on
238          * RepGroupImpl version 3 from being seen by non-upgraded replicas.
239          */
240         int minProtocolVersion = MIN_VERSION;
241         final RepGroupImpl group = repNode.getGroup();
242         if (group == null) {
243             throw EnvironmentFailureException.unexpectedState(
244                 "Group is null");
245         }
246         final int groupFormatVersion = group.getFormatVersion();
247         if (groupFormatVersion >= RepGroupImpl.FORMAT_VERSION_3) {
248             minProtocolVersion = VERSION_5;
249         }
250 
251         return get(repNode, protocolVersion, minProtocolVersion,
252                    maxProtocolVersion, writeLogVersion);
253     }
254 
255     /**
256      * Returns a protocol object using the specified minimum and maximum
257      * values, returning null if no supported version is found.  Use this
258      * method for testing when the RepGroupImpl object is not available.
259      */
get(final RepNode repNode, final int protocolVersion, final int minProtocolVersion, final int maxProtocolVersion, final int writeLogVersion)260     static Protocol get(final RepNode repNode,
261                         final int protocolVersion,
262                         final int minProtocolVersion,
263                         final int maxProtocolVersion,
264                         final int writeLogVersion) {
265 
266         if (!isSupportedVersion(protocolVersion, minProtocolVersion,
267                                 maxProtocolVersion)) {
268             return null;
269         }
270 
271         /*
272          * Future code will do what is appropriate in support of the version
273          * depending on the nature of the incompatibility.
274          */
275         return new Protocol(repNode, protocolVersion, maxProtocolVersion,
276                             writeLogVersion);
277     }
278 
279     /**
280      * Returns a protocol object using the specified protocol version.
281      */
getProtocol(final RepNode repNode, final int protocolVersion)282     static Protocol getProtocol(final RepNode repNode,
283                                 final int protocolVersion) {
284 
285         assert repNode != null;
286         return new Protocol(repNode, protocolVersion, protocolVersion,
287                             LogEntryType.LOG_VERSION);
288     }
289 
290     /**
291      * Returns true if the code can support the version.
292      *
293      * @param protocolVersion protocol version being queried
294      * @param minProtocolVersion minimum protocol version supported
295      * @param maxProtocolVersion maximum protocol version supported
296      *
297      * @return true if the protocol version is supported by this implementation
298      *         of the protocol
299      */
isSupportedVersion(final int protocolVersion, final int minProtocolVersion, final int maxProtocolVersion)300     private static boolean isSupportedVersion(final int protocolVersion,
301                                               final int minProtocolVersion,
302                                               final int maxProtocolVersion) {
303         if (protocolVersion == Integer.MIN_VALUE) {
304             /* For testing purposes. */
305             return false;
306         }
307 
308         /*
309          * Version compatibility check: for now, a simple range check.  We can
310          * make this fancier in the future if necessary.
311          */
312         return minProtocolVersion <= protocolVersion &&
313             protocolVersion <= maxProtocolVersion;
314     }
315 
316     /**
317      * Gets the JE version that corresponds to the specified protocol version,
318      * for use in creating error messages that explain protocol version errors
319      * in terms of JE versions.  Returns null if the associated version is not
320      * known.
321      */
getProtocolJEVersion(final int protocolVersion)322     static JEVersion getProtocolJEVersion(final int protocolVersion) {
323         switch (protocolVersion) {
324         case VERSION_5:
325             return VERSION_5_JE_VERSION;
326         case VERSION_4:
327             return VERSION_4_JE_VERSION;
328         case VERSION_3:
329             return VERSION_3_JE_VERSION;
330         default:
331             return null;
332         }
333     }
334 
335     /**
336      * Gets the protocol version that corresponds to the specified JE version,
337      * throwing an IllegalArgumentException if the version is not supported.
338      */
getJEVersionProtocolVersion(final JEVersion jeVersion)339     static int getJEVersionProtocolVersion(final JEVersion jeVersion) {
340         if (jeVersion == null) {
341             return VERSION_5;
342         } else if (jeVersion.compareTo(VERSION_5_JE_VERSION) >= 0) {
343             return VERSION_5;
344         } else if (jeVersion.compareTo(VERSION_4_JE_VERSION) >= 0) {
345             return VERSION_4;
346         } else if (jeVersion.compareTo(VERSION_3_JE_VERSION) >= 0) {
347             return VERSION_3;
348         } else {
349             throw new IllegalArgumentException(
350                 "JE version not supported: " + jeVersion);
351         }
352     }
353 
354     /**
355      * Write an entry output wire record to the message buffer using the write
356      * log version format and increment nEntriesWrittenOldVersion if the entry
357      * format was changed.
358      */
writeOutputWireRecord(final OutputWireRecord record, final ByteBuffer messageBuffer)359     void writeOutputWireRecord(final OutputWireRecord record,
360                                final ByteBuffer messageBuffer) {
361         final boolean changedFormat =
362             record.writeToWire(messageBuffer, writeLogVersion);
363         if (changedFormat) {
364             nEntriesWrittenOldVersion.increment();
365         }
366     }
367 
368     public final static MessageOp REPLICA_PROTOCOL_VERSION =
369         new MessageOp((short) 1, ReplicaProtocolVersion.class);
370 
371     public final static MessageOp FEEDER_PROTOCOL_VERSION =
372         new MessageOp((short) 2, FeederProtocolVersion.class);
373 
374     public final static MessageOp DUP_NODE_REJECT =
375         new MessageOp((short) 3, DuplicateNodeReject.class);
376 
377     public final static MessageOp REPLICA_JE_VERSIONS =
378         new MessageOp((short) 4, ReplicaJEVersions.class);
379 
380     public final static MessageOp FEEDER_JE_VERSIONS =
381         new MessageOp((short) 5, FeederJEVersions.class);
382 
383     public final static MessageOp JE_VERSIONS_REJECT =
384         new MessageOp((short) 6, JEVersionsReject.class);
385 
386     public final static MessageOp MEMBERSHIP_INFO =
387         new MessageOp((short) 7, NodeGroupInfo.class);
388 
389     public final static MessageOp MEMBERSHIP_INFO_OK =
390         new MessageOp((short) 8, NodeGroupInfoOK.class);
391 
392     public final static MessageOp MEMBERSHIP_INFO_REJECT =
393         new MessageOp((short) 9, NodeGroupInfoReject.class);
394 
395     public final static MessageOp SNTP_REQUEST =
396         new MessageOp((short)10, SNTPRequest.class);
397 
398     public final static MessageOp SNTP_RESPONSE =
399         new MessageOp((short)11, SNTPResponse.class);
400 
401         /* Core Replication Stream post-handshake messages */
402     public final static MessageOp ENTRY =
403         new MessageOp((short) 101, Entry.class);
404 
405     public final static MessageOp START_STREAM =
406         new MessageOp((short) 102, StartStream.class);
407 
408     public final static MessageOp HEARTBEAT =
409         new MessageOp((short) 103, Heartbeat.class);
410 
411     public final static MessageOp HEARTBEAT_RESPONSE =
412         new MessageOp((short) 104, HeartbeatResponse.class);
413 
414     public final static MessageOp COMMIT =
415         new MessageOp((short) 105, Commit.class);
416 
417     public final static MessageOp ACK =
418         new MessageOp((short) 106, Ack.class);
419 
420     public final static MessageOp ENTRY_REQUEST =
421         new MessageOp((short) 107, EntryRequest.class);
422 
423     public final static MessageOp ENTRY_NOTFOUND =
424         new MessageOp((short) 108, EntryNotFound.class);
425 
426     public final static MessageOp ALT_MATCHPOINT =
427         new MessageOp((short) 109, AlternateMatchpoint.class);
428 
429     public final static MessageOp RESTORE_REQUEST =
430         new MessageOp((short) 110, RestoreRequest.class);
431 
432     public final static MessageOp RESTORE_RESPONSE =
433         new MessageOp((short) 111, RestoreResponse.class);
434 
435     public final static MessageOp SHUTDOWN_REQUEST =
436         new MessageOp((short) 112, ShutdownRequest.class);
437 
438     public final static MessageOp SHUTDOWN_RESPONSE =
439         new MessageOp((short) 113, ShutdownResponse.class);
440 
441     public final static MessageOp GROUP_ACK =
442         new MessageOp((short) 114, GroupAck.class);
443 
444     /**
445      * Base class for all protocol handshake messages.
446      */
447     abstract class HandshakeMessage extends SimpleMessage {
448     }
449 
450     /**
451      * Version broadcasts the sending node's protocol version.
452      */
453     abstract class ProtocolVersion extends HandshakeMessage {
454         private final int version;
455 
456         @SuppressWarnings("hiding")
457         private final NameIdPair nameIdPair;
458 
ProtocolVersion(int version)459         public ProtocolVersion(int version) {
460             super();
461             this.version = version;
462             this.nameIdPair = Protocol.this.nameIdPair;
463         }
464 
465         @Override
wireFormat()466         public ByteBuffer wireFormat() {
467             return wireFormat(version, nameIdPair);
468         }
469 
ProtocolVersion(ByteBuffer buffer)470         public ProtocolVersion(ByteBuffer buffer) {
471             version = LogUtils.readInt(buffer);
472             nameIdPair = getNameIdPair(buffer);
473         }
474 
475         /**
476          * @return the version
477          */
getVersion()478         int getVersion() {
479             return version;
480         }
481 
482         /**
483          * The nodeName of the sender
484          *
485          * @return nodeName
486          */
getNameIdPair()487         NameIdPair getNameIdPair() {
488             return nameIdPair;
489         }
490     }
491 
492     /**
493      * The replica sends the feeder its protocol version.
494      *
495      * IMPORTANT: This message must not change.
496      */
497     public class ReplicaProtocolVersion extends ProtocolVersion {
498 
ReplicaProtocolVersion()499         public ReplicaProtocolVersion() {
500             super(configuredVersion);
501         }
502 
ReplicaProtocolVersion(ByteBuffer buffer)503         public ReplicaProtocolVersion(ByteBuffer buffer) {
504             super(buffer);
505         }
506 
507         @Override
getOp()508         public MessageOp getOp() {
509             return REPLICA_PROTOCOL_VERSION;
510         }
511     }
512 
513     /**
514      * The feeder sends the replica its proposed version.
515      *
516      * IMPORTANT: This message must not change.
517      */
518     public class FeederProtocolVersion extends ProtocolVersion {
519 
FeederProtocolVersion(int proposedVersion)520         public FeederProtocolVersion(int proposedVersion) {
521             super(proposedVersion);
522         }
523 
FeederProtocolVersion(ByteBuffer buffer)524         public FeederProtocolVersion(ByteBuffer buffer) {
525             super(buffer);
526         }
527 
528         @Override
getOp()529         public MessageOp getOp() {
530             return FEEDER_PROTOCOL_VERSION;
531         }
532     }
533 
534     /* Reject response to a ReplicaProtocolVersion request */
535     public class DuplicateNodeReject extends RejectMessage {
536 
DuplicateNodeReject(String errorMessage)537         DuplicateNodeReject(String errorMessage) {
538             super(errorMessage);
539         }
540 
DuplicateNodeReject(ByteBuffer buffer)541         public DuplicateNodeReject(ByteBuffer buffer) {
542             super(buffer);
543         }
544 
545         @Override
getOp()546         public MessageOp getOp() {
547             return DUP_NODE_REJECT;
548         }
549     }
550 
551     public class SNTPRequest extends HandshakeMessage {
552 
553         private final long originateTimestamp;
554 
555         /* Set by the receiver at the time the message is recreated. */
556         private long receiveTimestamp = -1;
557 
558         /*
559          * Determines whether this is the last in a consecutive stream of
560          * requests to determine the skew.
561          */
562         private boolean isLast = true;
563 
SNTPRequest(boolean isLast)564         public SNTPRequest(boolean isLast) {
565             super();
566             this.isLast = isLast;
567             originateTimestamp = repNode.getClock().currentTimeMillis();
568         }
569 
570         @Override
wireFormat()571         public ByteBuffer wireFormat() {
572             return wireFormat(originateTimestamp, isLast);
573         }
574 
SNTPRequest(ByteBuffer buffer)575         public SNTPRequest(ByteBuffer buffer) {
576             this.originateTimestamp = LogUtils.readLong(buffer);
577             this.isLast = getBoolean(buffer);
578             this.receiveTimestamp = repNode.getClock().currentTimeMillis();
579         }
580 
581         @Override
getOp()582         public MessageOp getOp() {
583             return SNTP_REQUEST;
584         }
585 
getOriginateTimestamp()586         public long getOriginateTimestamp() {
587             return originateTimestamp;
588         }
589 
getReceiveTimestamp()590         public long getReceiveTimestamp() {
591             return receiveTimestamp;
592         }
593 
isLast()594         public boolean isLast() {
595             return isLast;
596         }
597     }
598 
599     public class SNTPResponse extends HandshakeMessage {
600 
601         /* These fields have the standard SNTP interpretation */
602         private final long originateTimestamp; // time request sent by client
603         private final long receiveTimestamp; // time request received by server
604 
605         /*
606          * Initialized when the message is serialized to ensure it's as
607          * accurate as possible.
608          */
609         private long transmitTimestamp = -1; // time reply sent by server
610 
611         /* Initialized at de-serialization for similar reasons. */
612         private long destinationTimestamp = -1; //time reply received by client
613 
SNTPResponse(SNTPRequest request)614         public SNTPResponse(SNTPRequest request) {
615             this.originateTimestamp = request.originateTimestamp;
616             this.receiveTimestamp = request.receiveTimestamp;
617         }
618 
619         @Override
wireFormat()620         public ByteBuffer wireFormat() {
621             transmitTimestamp = repNode.getClock().currentTimeMillis();
622             return wireFormat(originateTimestamp,
623                               receiveTimestamp,
624                               transmitTimestamp);
625         }
626 
SNTPResponse(ByteBuffer buffer)627         public SNTPResponse(ByteBuffer buffer) {
628             originateTimestamp = LogUtils.readLong(buffer);
629             receiveTimestamp = LogUtils.readLong(buffer);
630             transmitTimestamp = LogUtils.readLong(buffer);
631             destinationTimestamp = repNode.getClock().currentTimeMillis();
632         }
633 
634         @Override
getOp()635         public MessageOp getOp() {
636             return SNTP_RESPONSE;
637         }
638 
getOriginateTimestamp()639         public long getOriginateTimestamp() {
640             return originateTimestamp;
641         }
642 
getReceiveTimestamp()643         public long getReceiveTimestamp() {
644             return receiveTimestamp;
645         }
646 
getTransmitTimestamp()647         public long getTransmitTimestamp() {
648             return transmitTimestamp;
649         }
650 
getDestinationTimestamp()651         public long getDestinationTimestamp() {
652             return destinationTimestamp;
653         }
654 
getDelay()655         public long getDelay() {
656             assert(destinationTimestamp != -1);
657             return (destinationTimestamp - originateTimestamp) -
658                     (transmitTimestamp - receiveTimestamp);
659         }
660 
getDelta()661         public long getDelta() {
662             assert(destinationTimestamp != -1);
663             return ((receiveTimestamp - originateTimestamp) +
664                     (transmitTimestamp - destinationTimestamp))/2;
665         }
666     }
667 
668     /**
669      * Abstract message used as the basis for the exchange of software versions
670      * between replicated nodes
671      */
672     abstract class JEVersions extends HandshakeMessage {
673         private final JEVersion version;
674 
675         private final int logVersion;
676 
JEVersions(JEVersion version, int logVersion)677         public JEVersions(JEVersion version, int logVersion) {
678             this.version = version;
679             this.logVersion = logVersion;
680         }
681 
682         @Override
wireFormat()683         public ByteBuffer wireFormat() {
684             return wireFormat(version.getVersionString(), logVersion);
685         }
686 
JEVersions(ByteBuffer buffer)687         public JEVersions(ByteBuffer buffer) {
688             this.version = new JEVersion(getString(buffer));
689             this.logVersion = LogUtils.readInt(buffer);
690         }
691 
getVersion()692         public JEVersion getVersion() {
693             return version;
694         }
695 
getLogVersion()696         public byte getLogVersion() {
697             return (byte)logVersion;
698         }
699     }
700 
701     public class ReplicaJEVersions extends JEVersions {
702 
ReplicaJEVersions(JEVersion version, int logVersion)703         ReplicaJEVersions(JEVersion version, int logVersion) {
704             super(version, logVersion);
705         }
706 
ReplicaJEVersions(ByteBuffer buffer)707         public ReplicaJEVersions(ByteBuffer buffer) {
708             super(buffer);
709         }
710 
711         @Override
getOp()712         public MessageOp getOp() {
713             return REPLICA_JE_VERSIONS;
714         }
715 
716     }
717 
718     public class FeederJEVersions extends JEVersions {
719 
FeederJEVersions(JEVersion version, int logVersion)720         FeederJEVersions(JEVersion version, int logVersion) {
721             super(version, logVersion);
722         }
723 
FeederJEVersions(ByteBuffer buffer)724         public FeederJEVersions(ByteBuffer buffer) {
725             super(buffer);
726         }
727 
728         @Override
getOp()729         public MessageOp getOp() {
730             return FEEDER_JE_VERSIONS;
731         }
732     }
733 
734     /* Reject response to a ReplicaJEVersions request */
735     public class JEVersionsReject extends RejectMessage {
736 
JEVersionsReject(String errorMessage)737         public JEVersionsReject(String errorMessage) {
738             super(errorMessage);
739         }
740 
JEVersionsReject(ByteBuffer buffer)741         public JEVersionsReject(ByteBuffer buffer) {
742             super(buffer);
743         }
744 
745         @Override
getOp()746         public MessageOp getOp() {
747             return JE_VERSIONS_REJECT;
748         }
749     }
750 
751     public class NodeGroupInfo extends HandshakeMessage {
752         private final String groupName;
753         private final UUID uuid;
754 
755         @SuppressWarnings("hiding")
756         private final NameIdPair nameIdPair;
757         private final String hostName;
758         private final int port;
759         private final NodeType nodeType;
760         private final boolean designatedPrimary;
761 
762         /**
763          * A string version of the JE version running on this node, or the
764          * empty string if not known.
765          */
766         private final String jeVersion;
767 
NodeGroupInfo(final String groupName, final UUID uuid, final NameIdPair nameIdPair, final String hostName, final int port, final NodeType nodeType, final boolean designatedPrimary, final JEVersion jeVersion)768         NodeGroupInfo(final String groupName,
769                       final UUID uuid,
770                       final NameIdPair nameIdPair,
771                       final String hostName,
772                       final int port,
773                       final NodeType nodeType,
774                       final boolean designatedPrimary,
775                       final JEVersion jeVersion) {
776 
777             this.groupName = groupName;
778             this.uuid = uuid;
779             this.nameIdPair = nameIdPair;
780             this.hostName = hostName;
781             this.port = port;
782             this.nodeType = nodeType;
783             this.designatedPrimary = designatedPrimary;
784             this.jeVersion = (jeVersion != null) ?
785                 jeVersion.getNumericVersionString() :
786                 "";
787         }
788 
789         @Override
getOp()790         public MessageOp getOp() {
791             return MEMBERSHIP_INFO;
792         }
793 
794         @Override
wireFormat()795         public ByteBuffer wireFormat() {
796             final boolean repGroupV3 = (getVersion() >= VERSION_5);
797             if (!repGroupV3 && nodeType.compareTo(NodeType.ELECTABLE) > 0) {
798                 throw new IllegalStateException(
799                     "Node type not supported before group version 3: " +
800                     nodeType);
801             }
802             final Object[] args = new Object[repGroupV3 ? 9 : 8];
803             args[0] = groupName;
804             args[1] = uuid.getMostSignificantBits();
805             args[2] = uuid.getLeastSignificantBits();
806             args[3] = nameIdPair;
807             args[4] = hostName;
808             args[5] = port;
809             args[6] = nodeType;
810             args[7] = designatedPrimary;
811             if (repGroupV3) {
812                 args[8] = jeVersion;
813             }
814             return wireFormat(args);
815         }
816 
NodeGroupInfo(ByteBuffer buffer)817         public NodeGroupInfo(ByteBuffer buffer) {
818             this.groupName = getString(buffer);
819             this.uuid = new UUID(LogUtils.readLong(buffer),
820                                  LogUtils.readLong(buffer));
821             this.nameIdPair = getNameIdPair(buffer);
822             this.hostName = getString(buffer);
823             this.port = LogUtils.readInt(buffer);
824             this.nodeType = getEnum(NodeType.class, buffer);
825             this.designatedPrimary = getBoolean(buffer);
826             jeVersion = (getVersion() >= VERSION_5) ? getString(buffer) : "";
827         }
828 
getGroupName()829         public String getGroupName() {
830             return groupName;
831         }
832 
getUUID()833         public UUID getUUID() {
834             return uuid;
835         }
836 
getNodeName()837         public String getNodeName() {
838             return nameIdPair.getName();
839         }
840 
getNodeId()841         public int getNodeId() {
842             return nameIdPair.getId();
843         }
844 
getHostName()845         public String getHostName() {
846             return hostName;
847         }
848 
getNameIdPair()849         public NameIdPair getNameIdPair() {
850             return nameIdPair;
851         }
852 
port()853         public int port() {
854             return port;
855         }
getNodeType()856         public NodeType getNodeType() {
857             return nodeType;
858         }
859 
isDesignatedPrimary()860         public boolean isDesignatedPrimary() {
861             return designatedPrimary;
862         }
863 
864         /**
865          * Returns the JE version most recently noted running on the associated
866          * node, or null if not known.
867          */
getJEVersion()868         public JEVersion getJEVersion() {
869             return !jeVersion.isEmpty() ? new JEVersion(jeVersion) : null;
870         }
871     }
872 
873     /**
874      * Response to a NodeGroupInfo request that was successful.  The object
875      * contains the group's UUID and the replica's NameIdPair.  The group UUID
876      * is used to update the replica's notion of the group UUID on first
877      * joining.  The NameIdPair is used to update the replica's node ID for a
878      * secondary node, which is not available in the RepGroupDB.
879      */
880     public class NodeGroupInfoOK extends HandshakeMessage {
881 
882         private final UUID uuid;
883         @SuppressWarnings("hiding")
884         private final NameIdPair nameIdPair;
885 
NodeGroupInfoOK(UUID uuid, NameIdPair nameIdPair)886         public NodeGroupInfoOK(UUID uuid, NameIdPair nameIdPair) {
887             super();
888             this.uuid = uuid;
889             this.nameIdPair = nameIdPair;
890         }
891 
NodeGroupInfoOK(ByteBuffer buffer)892         public NodeGroupInfoOK(ByteBuffer buffer) {
893             uuid = new UUID(LogUtils.readLong(buffer),
894                             LogUtils.readLong(buffer));
895             nameIdPair = getNameIdPair(buffer);
896         }
897 
898         @Override
wireFormat()899         public ByteBuffer wireFormat() {
900             return wireFormat(uuid.getMostSignificantBits(),
901                               uuid.getLeastSignificantBits(),
902                               nameIdPair);
903         }
904 
905         @Override
getOp()906         public MessageOp getOp() {
907             return MEMBERSHIP_INFO_OK;
908         }
909 
getNameIdPair()910         public NameIdPair getNameIdPair() {
911             return nameIdPair;
912         }
913 
getUUID()914         public UUID getUUID() {
915             return uuid;
916         }
917     }
918 
919     public class NodeGroupInfoReject extends RejectMessage {
920 
NodeGroupInfoReject(String errorMessage)921         NodeGroupInfoReject(String errorMessage) {
922             super(errorMessage);
923         }
924 
925         @Override
getOp()926         public MessageOp getOp() {
927             return MEMBERSHIP_INFO_REJECT;
928         }
929 
930         @Override
wireFormat()931         public ByteBuffer wireFormat() {
932             return wireFormat(errorMessage);
933         }
934 
NodeGroupInfoReject(ByteBuffer buffer)935         public NodeGroupInfoReject(ByteBuffer buffer) {
936             super(buffer);
937         }
938     }
939 
940     /**
941      * Base class for messages which contain only a VLSN
942      */
943     abstract class VLSNMessage extends Message {
944         protected final VLSN vlsn;
945 
VLSNMessage(VLSN vlsn)946         VLSNMessage(VLSN vlsn) {
947             super();
948             this.vlsn = vlsn;
949         }
950 
VLSNMessage(ByteBuffer buffer)951         public VLSNMessage(ByteBuffer buffer) {
952             long vlsnSequence = LogUtils.readLong(buffer);
953             vlsn = new VLSN(vlsnSequence);
954         }
955 
956         @Override
wireFormat()957         public ByteBuffer wireFormat() {
958             int bodySize = 8;
959             ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize);
960             LogUtils.writeLong(messageBuffer, vlsn.getSequence());
961             messageBuffer.flip();
962             return messageBuffer;
963         }
964 
getVLSN()965         VLSN getVLSN() {
966             return vlsn;
967         }
968 
969         @Override
toString()970         public String toString() {
971             return super.toString() + " " + vlsn;
972         }
973     }
974 
975     /**
976      * A message containing a log entry in the replication stream.
977      */
978     public class Entry extends Message {
979 
980         /*
981          * InputWireRecord is set when this Message had been received at this
982          * node. OutputWireRecord is set when this message is created for
983          * sending from this node.
984          */
985         final protected InputWireRecord inputWireRecord;
986         protected OutputWireRecord outputWireRecord;
987 
Entry(final OutputWireRecord outputWireRecord)988         public Entry(final OutputWireRecord outputWireRecord) {
989             inputWireRecord = null;
990             this.outputWireRecord = outputWireRecord;
991         }
992 
993         @Override
getOp()994         public MessageOp getOp() {
995             return ENTRY;
996         }
997 
998         @Override
wireFormat()999         public ByteBuffer wireFormat() {
1000             final int bodySize = getWireSize();
1001             final ByteBuffer messageBuffer =
1002                 allocateInitializedBuffer(bodySize);
1003             writeOutputWireRecord(outputWireRecord, messageBuffer);
1004             messageBuffer.flip();
1005             return messageBuffer;
1006         }
1007 
getWireSize()1008         protected int getWireSize() {
1009             return outputWireRecord.getWireSize(writeLogVersion);
1010         }
1011 
Entry(final ByteBuffer buffer)1012         public Entry(final ByteBuffer buffer)
1013             throws DatabaseException {
1014 
1015             inputWireRecord =
1016                 new InputWireRecord(repNode.getRepImpl(), buffer);
1017         }
1018 
getWireRecord()1019         public InputWireRecord getWireRecord() {
1020             return inputWireRecord;
1021         }
1022 
1023         @Override
toString()1024         public String toString() {
1025 
1026             final StringBuilder sb = new StringBuilder();
1027             sb.append(super.toString());
1028 
1029             if (inputWireRecord != null) {
1030                 sb.append(" ");
1031                 sb.append(inputWireRecord);
1032             }
1033 
1034             if (outputWireRecord != null) {
1035                 sb.append(" ");
1036                 sb.append(outputWireRecord);
1037             }
1038 
1039             return sb.toString();
1040         }
1041 
1042         /* For unit test support */
1043         @Override
match(Message other)1044         public boolean match(Message other) {
1045 
1046             /*
1047              * This message was read in, but we need to compare it to a message
1048              * that was sent out.
1049              */
1050             if (outputWireRecord == null) {
1051                 outputWireRecord = new OutputWireRecord(repNode.getRepImpl(),
1052                                                         inputWireRecord);
1053             }
1054             return super.match(other);
1055         }
1056 
1057         /* True if the log entry is a TxnAbort or TxnCommit. */
isTxnEnd()1058         public boolean isTxnEnd() {
1059             final byte entryType = getWireRecord().getEntryType();
1060             if (LogEntryType.LOG_TXN_COMMIT.equalsType(entryType) ||
1061                 LogEntryType.LOG_TXN_ABORT.equalsType(entryType)) {
1062                 return true;
1063             }
1064 
1065             return false;
1066         }
1067     }
1068 
1069     /**
1070      * StartStream indicates that the replica would like the feeder to start
1071      * the replication stream at the proposed vlsn.
1072      */
1073     public class StartStream extends VLSNMessage {
1074 
StartStream(VLSN startVLSN)1075         StartStream(VLSN startVLSN) {
1076             super(startVLSN);
1077         }
1078 
StartStream(ByteBuffer buffer)1079         public StartStream(ByteBuffer buffer) {
1080             super(buffer);
1081         }
1082 
1083         @Override
getOp()1084         public MessageOp getOp() {
1085             return START_STREAM;
1086         }
1087     }
1088 
1089     public class Heartbeat extends Message {
1090 
1091         private final long masterNow;
1092         private final long currentTxnEndVLSN;
1093 
Heartbeat(long masterNow, long currentTxnEndVLSN)1094         public Heartbeat(long masterNow, long currentTxnEndVLSN) {
1095             this.masterNow = masterNow;
1096             this.currentTxnEndVLSN = currentTxnEndVLSN;
1097         }
1098 
1099         @Override
getOp()1100         public MessageOp getOp() {
1101             return HEARTBEAT;
1102         }
1103 
1104         @Override
wireFormat()1105         public ByteBuffer wireFormat() {
1106             int bodySize = 8 * 2 /* masterNow + currentTxnEndVLSN */;
1107             ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize);
1108             LogUtils.writeLong(messageBuffer, masterNow);
1109             LogUtils.writeLong(messageBuffer, currentTxnEndVLSN);
1110             messageBuffer.flip();
1111             return messageBuffer;
1112         }
1113 
Heartbeat(ByteBuffer buffer)1114         public Heartbeat(ByteBuffer buffer) {
1115             masterNow = LogUtils.readLong(buffer);
1116             currentTxnEndVLSN = LogUtils.readLong(buffer);
1117         }
1118 
getMasterNow()1119         public long getMasterNow() {
1120             return masterNow;
1121         }
1122 
getCurrentTxnEndVLSN()1123         public long getCurrentTxnEndVLSN() {
1124             return currentTxnEndVLSN;
1125         }
1126 
1127         @Override
toString()1128         public String toString() {
1129             return super.toString() + " masterNow=" + masterNow +
1130                 " currentCommit=" + currentTxnEndVLSN;
1131         }
1132     }
1133 
1134     public class HeartbeatResponse extends Message {
1135         /* The latest syncupVLSN */
1136         private final VLSN syncupVLSN;
1137         private final VLSN txnEndVLSN;
1138 
HeartbeatResponse(VLSN syncupVLSN, VLSN ackedVLSN)1139         public HeartbeatResponse(VLSN syncupVLSN, VLSN ackedVLSN) {
1140             super();
1141             this.syncupVLSN = syncupVLSN;
1142             this.txnEndVLSN = ackedVLSN;
1143         }
1144 
HeartbeatResponse(ByteBuffer buffer)1145         public HeartbeatResponse(ByteBuffer buffer) {
1146             syncupVLSN = new VLSN(LogUtils.readLong(buffer));
1147             txnEndVLSN =
1148                 getVersion() >= VERSION_4 ?
1149                 new VLSN(LogUtils.readLong(buffer)) :
1150                 null;
1151         }
1152 
1153         @Override
getOp()1154         public MessageOp getOp() {
1155             return HEARTBEAT_RESPONSE;
1156         }
1157 
1158         @Override
wireFormat()1159         public ByteBuffer wireFormat() {
1160             boolean includeTxnEndVLSN = getVersion() >= VERSION_4;
1161             int bodySize = includeTxnEndVLSN ?
1162                            8 * 2 :
1163                            8;
1164             ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize);
1165             LogUtils.writeLong(messageBuffer, syncupVLSN.getSequence());
1166             if (includeTxnEndVLSN) {
1167                 LogUtils.writeLong(messageBuffer, txnEndVLSN.getSequence());
1168             }
1169             messageBuffer.flip();
1170             return messageBuffer;
1171         }
1172 
getSyncupVLSN()1173         public VLSN getSyncupVLSN() {
1174             return syncupVLSN;
1175         }
1176 
getTxnEndVLSN()1177         public VLSN getTxnEndVLSN() {
1178             return txnEndVLSN;
1179         }
1180 
1181         @Override
toString()1182         public String toString() {
1183             return super.toString() + " syncupVLSN=" + syncupVLSN;
1184         }
1185     }
1186 
1187     /**
1188      * Message used to shutdown a node
1189      */
1190     public class ShutdownRequest extends SimpleMessage {
1191         /* The time that the shutdown was initiated on the master. */
1192         private final long shutdownTimeMs;
1193 
ShutdownRequest(long shutdownTimeMs)1194         public ShutdownRequest(long shutdownTimeMs) {
1195             super();
1196             this.shutdownTimeMs = shutdownTimeMs;
1197         }
1198 
1199         @Override
getOp()1200         public MessageOp getOp() {
1201             return SHUTDOWN_REQUEST;
1202         }
1203 
ShutdownRequest(ByteBuffer buffer)1204         public ShutdownRequest(ByteBuffer buffer) {
1205             shutdownTimeMs = LogUtils.readLong(buffer);
1206         }
1207 
1208         @Override
wireFormat()1209         public ByteBuffer wireFormat() {
1210             return wireFormat(shutdownTimeMs);
1211         }
1212 
getShutdownTimeMs()1213         public long getShutdownTimeMs() {
1214             return shutdownTimeMs;
1215         }
1216     }
1217 
1218     /**
1219      * Message in response to a shutdown request.
1220      */
1221     public class ShutdownResponse extends Message {
1222 
ShutdownResponse()1223         public ShutdownResponse() {
1224             super();
1225         }
1226 
1227         @Override
getOp()1228         public MessageOp getOp() {
1229             return SHUTDOWN_RESPONSE;
1230         }
1231 
ShutdownResponse(@uppressWarningsR) ByteBuffer buffer)1232         public ShutdownResponse(@SuppressWarnings("unused") ByteBuffer buffer) {
1233         }
1234     }
1235 
1236     public class Commit extends Entry {
1237         private final boolean needsAck;
1238         private final SyncPolicy replicaSyncPolicy;
1239 
Commit(final boolean needsAck, final SyncPolicy replicaSyncPolicy, final OutputWireRecord wireRecord)1240         public Commit(final boolean needsAck,
1241                       final SyncPolicy replicaSyncPolicy,
1242                       final OutputWireRecord wireRecord) {
1243             super(wireRecord);
1244             this.needsAck = needsAck;
1245             this.replicaSyncPolicy = replicaSyncPolicy;
1246         }
1247 
1248         @Override
getOp()1249         public MessageOp getOp() {
1250             return COMMIT;
1251         }
1252 
1253         @Override
wireFormat()1254         public ByteBuffer wireFormat() {
1255             final int bodySize = super.getWireSize() +
1256                 1 /* needsAck */ +
1257                 1 /* replica sync policy */;
1258             final ByteBuffer messageBuffer =
1259                 allocateInitializedBuffer(bodySize);
1260             messageBuffer.put((byte) (needsAck ? 1 : 0));
1261             messageBuffer.put((byte) replicaSyncPolicy.ordinal());
1262             writeOutputWireRecord(outputWireRecord, messageBuffer);
1263             messageBuffer.flip();
1264             return messageBuffer;
1265         }
1266 
Commit(final ByteBuffer buffer)1267         public Commit(final ByteBuffer buffer)
1268             throws DatabaseException {
1269 
1270             this(getByteNeedsAck(buffer.get()),
1271                  getByteReplicaSyncPolicy(buffer.get()),
1272                  buffer);
1273         }
1274 
Commit(final boolean needsAck, final SyncPolicy replicaSyncPolicy, final ByteBuffer buffer)1275         private Commit(final boolean needsAck,
1276                        final SyncPolicy replicaSyncPolicy,
1277                        final ByteBuffer buffer)
1278             throws DatabaseException {
1279 
1280             super(buffer);
1281             this.needsAck = needsAck;
1282             this.replicaSyncPolicy = replicaSyncPolicy;
1283         }
1284 
getNeedsAck()1285         public boolean getNeedsAck() {
1286             return needsAck;
1287         }
1288 
getReplicaSyncPolicy()1289         public SyncPolicy getReplicaSyncPolicy() {
1290             return replicaSyncPolicy;
1291         }
1292     }
1293 
1294     /**
1295      * Returns whether the byte value specifies that an acknowledgment is
1296      * needed.
1297      */
getByteNeedsAck(final byte needsAckByte)1298     private static boolean getByteNeedsAck(final byte needsAckByte) {
1299         switch (needsAckByte) {
1300         case 0:
1301             return false;
1302         case 1:
1303             return true;
1304         default:
1305             throw EnvironmentFailureException.unexpectedState(
1306                 "Invalid bool ordinal: " + needsAckByte);
1307         }
1308     }
1309 
1310     /** Returns the sync policy specified by the argument. */
getByteReplicaSyncPolicy( final byte syncPolicyByte)1311     private static SyncPolicy getByteReplicaSyncPolicy(
1312         final byte syncPolicyByte) {
1313 
1314         for (final SyncPolicy p : SyncPolicy.values()) {
1315             if (p.ordinal() == syncPolicyByte) {
1316                 return p;
1317             }
1318         }
1319         throw EnvironmentFailureException.unexpectedState(
1320             "Invalid sync policy ordinal: " + syncPolicyByte);
1321     }
1322 
1323     public class Ack extends Message {
1324 
1325         private final long txnId;
1326 
Ack(long txnId)1327         public Ack(long txnId) {
1328             super();
1329             this.txnId = txnId;
1330             nAckMessages.increment();
1331         }
1332 
1333         @Override
getOp()1334         public MessageOp getOp() {
1335             return ACK;
1336         }
1337 
1338         @Override
wireFormat()1339         public ByteBuffer wireFormat() {
1340             int bodySize = 8;
1341             ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize);
1342             LogUtils.writeLong(messageBuffer, txnId);
1343             messageBuffer.flip();
1344             return messageBuffer;
1345         }
1346 
Ack(ByteBuffer buffer)1347         public Ack(ByteBuffer buffer) {
1348             txnId = LogUtils.readLong(buffer);
1349         }
1350 
getTxnId()1351         public long getTxnId() {
1352             return txnId;
1353         }
1354 
1355         @Override
toString()1356         public String toString() {
1357             return super.toString() + " txn " + txnId;
1358         }
1359     }
1360 
1361     public class GroupAck extends Message {
1362 
1363         private final long txnIds[];
1364 
GroupAck(long txnIds[])1365         public GroupAck(long txnIds[]) {
1366             super();
1367             this.txnIds = txnIds;
1368             nGroupAckMessages.increment();
1369             nGroupedAcks.add(txnIds.length);
1370             nMaxGroupedAcks.setMax(txnIds.length);
1371         }
1372 
1373         @Override
getOp()1374         public MessageOp getOp() {
1375             return GROUP_ACK;
1376         }
1377 
1378         @Override
wireFormat()1379         public ByteBuffer wireFormat() {
1380 
1381             final int bodySize = 4 + 8 *  txnIds.length;
1382             final ByteBuffer messageBuffer =
1383                 allocateInitializedBuffer(bodySize);
1384 
1385             putLongArray(messageBuffer, txnIds);
1386             messageBuffer.flip();
1387 
1388             return messageBuffer;
1389         }
1390 
GroupAck(ByteBuffer buffer)1391         public GroupAck(ByteBuffer buffer) {
1392             txnIds = readLongArray(buffer);
1393         }
1394 
getTxnIds()1395         public long[] getTxnIds() {
1396             return txnIds;
1397         }
1398 
1399         @Override
toString()1400         public String toString() {
1401             return super.toString() + " txn " + Arrays.toString(txnIds);
1402         }
1403     }
1404 
1405 
putLongArray(ByteBuffer buffer, long[] la)1406     private void putLongArray(ByteBuffer buffer, long[] la) {
1407         LogUtils.writeInt(buffer, la.length);
1408 
1409         for (long l : la) {
1410             LogUtils.writeLong(buffer, l);
1411         }
1412     }
1413 
readLongArray(ByteBuffer buffer)1414     private long[] readLongArray(ByteBuffer buffer) {
1415         final long la[] = new long[LogUtils.readInt(buffer)];
1416 
1417         for (int i=0;  i < la.length; i++) {
1418             la[i] = LogUtils.readLong(buffer);
1419         }
1420 
1421         return la;
1422     }
1423 
1424     /**
1425      * A replica node asks a feeder for the log entry at this VLSN.
1426      */
1427     public class EntryRequest extends VLSNMessage {
1428 
EntryRequest(VLSN matchpoint)1429         EntryRequest(VLSN matchpoint) {
1430             super(matchpoint);
1431         }
1432 
EntryRequest(ByteBuffer buffer)1433         public EntryRequest(ByteBuffer buffer) {
1434             super(buffer);
1435         }
1436 
1437         @Override
getOp()1438         public MessageOp getOp() {
1439             return ENTRY_REQUEST;
1440         }
1441     }
1442 
1443     /**
1444      * Response when the EntryRequest asks for a VLSN that is below the VLSN
1445      * range covered by the Feeder.
1446      */
1447     public class EntryNotFound extends Message {
1448 
EntryNotFound()1449         public EntryNotFound() {
1450         }
1451 
EntryNotFound(@uppressWarningsR) ByteBuffer buffer)1452         public EntryNotFound(@SuppressWarnings("unused") ByteBuffer buffer) {
1453             super();
1454         }
1455 
1456         @Override
getOp()1457         public MessageOp getOp() {
1458             return ENTRY_NOTFOUND;
1459         }
1460     }
1461 
1462     public class AlternateMatchpoint extends Message {
1463 
1464         private final InputWireRecord alternateInput;
1465         private OutputWireRecord alternateOutput = null;
1466 
AlternateMatchpoint(final OutputWireRecord alternate)1467         AlternateMatchpoint(final OutputWireRecord alternate) {
1468             alternateInput = null;
1469             this.alternateOutput = alternate;
1470         }
1471 
1472         @Override
getOp()1473         public MessageOp getOp() {
1474             return ALT_MATCHPOINT;
1475         }
1476 
1477         @Override
wireFormat()1478         public ByteBuffer wireFormat() {
1479             final int bodySize = alternateOutput.getWireSize(writeLogVersion);
1480             final ByteBuffer messageBuffer =
1481                 allocateInitializedBuffer(bodySize);
1482             writeOutputWireRecord(alternateOutput, messageBuffer);
1483             messageBuffer.flip();
1484             return messageBuffer;
1485         }
1486 
AlternateMatchpoint(final ByteBuffer buffer)1487         public AlternateMatchpoint(final ByteBuffer buffer)
1488             throws DatabaseException {
1489             alternateInput = new InputWireRecord(repNode.getRepImpl(), buffer);
1490         }
1491 
getAlternateWireRecord()1492         public InputWireRecord getAlternateWireRecord() {
1493             return alternateInput;
1494         }
1495 
1496         /* For unit test support */
1497         @Override
match(Message other)1498         public boolean match(Message other) {
1499 
1500             /*
1501              * This message was read in, but we need to compare it to a message
1502              * that was sent out.
1503              */
1504             if (alternateOutput == null) {
1505                 alternateOutput =
1506                     new OutputWireRecord(repNode.getRepImpl(), alternateInput);
1507             }
1508             return super.match(other);
1509         }
1510     }
1511 
1512     /**
1513      * Request from the replica to the feeder for sufficient information to
1514      * start a network restore.
1515      */
1516     public class RestoreRequest extends VLSNMessage {
1517 
RestoreRequest(VLSN failedMatchpoint)1518         RestoreRequest(VLSN failedMatchpoint) {
1519             super(failedMatchpoint);
1520         }
1521 
RestoreRequest(ByteBuffer buffer)1522         public RestoreRequest(ByteBuffer buffer) {
1523             super(buffer);
1524         }
1525 
1526         @Override
getOp()1527         public MessageOp getOp() {
1528             return RESTORE_REQUEST;
1529         }
1530     }
1531 
1532     /**
1533      * Response when the replica needs information to instigate a network
1534      * restore. The message contains two pieces of information. One is a set of
1535      * nodes that could be used as the basis for a NetworkBackup so that the
1536      * request node can become current again. The second is a suitable low vlsn
1537      * for the replica, which will be registered as this replica's local
1538      * CBVLSN. This will contribute to the global CBVLSN calculation.
1539      *
1540      * The feeder when sending this response must, if it is also the master,
1541      * update the membership table to set the local CBVLSN for the requesting
1542      * node thus ensuring that it can continue the replication stream at this
1543      * VLSN (or higher) when it retries the syncup operation.
1544      */
1545     public class RestoreResponse extends SimpleMessage {
1546         private final VLSN cbvlsn;
1547 
1548         private final RepNodeImpl[] logProviders;
1549 
RestoreResponse(VLSN cbvlsn, RepNodeImpl[] logProviders)1550         public RestoreResponse(VLSN cbvlsn, RepNodeImpl[] logProviders) {
1551             this.cbvlsn = cbvlsn;
1552             this.logProviders = logProviders;
1553         }
1554 
RestoreResponse(ByteBuffer buffer)1555         public RestoreResponse(ByteBuffer buffer) {
1556             long vlsnSequence = LogUtils.readLong(buffer);
1557             cbvlsn = new VLSN(vlsnSequence);
1558             logProviders = getRepNodeImplArray(buffer);
1559         }
1560 
1561         @Override
wireFormat()1562         public ByteBuffer wireFormat() {
1563             return wireFormat(cbvlsn.getSequence(), logProviders);
1564         }
1565 
1566         /* Add support for RepNodeImpl arrays. */
1567 
1568         @Override
putWireFormat(final ByteBuffer buffer, final Object obj)1569         protected void putWireFormat(final ByteBuffer buffer,
1570                                      final Object obj) {
1571             if (obj.getClass() == RepNodeImpl[].class) {
1572                 putRepNodeImplArray(buffer, (RepNodeImpl[]) obj);
1573             } else {
1574                 super.putWireFormat(buffer, obj);
1575             }
1576         }
1577 
1578         @Override
wireFormatSize(final Object obj)1579         protected int wireFormatSize(final Object obj) {
1580             if (obj.getClass() == RepNodeImpl[].class) {
1581                 return getRepNodeImplArraySize((RepNodeImpl[]) obj);
1582             }
1583             return super.wireFormatSize(obj);
1584         }
1585 
putRepNodeImplArray(final ByteBuffer buffer, final RepNodeImpl[] ra)1586         private void putRepNodeImplArray(final ByteBuffer buffer,
1587                                          final RepNodeImpl[] ra) {
1588             LogUtils.writeInt(buffer, ra.length);
1589             final int groupFormatVersion = getGroupFormatVersion();
1590             for (final RepNodeImpl node : ra) {
1591                 putByteArray(
1592                     buffer,
1593                     RepGroupImpl.serializeBytes(node, groupFormatVersion));
1594             }
1595         }
1596 
getRepNodeImplArray(final ByteBuffer buffer)1597         private RepNodeImpl[] getRepNodeImplArray(final ByteBuffer buffer) {
1598             final RepNodeImpl[] ra = new RepNodeImpl[LogUtils.readInt(buffer)];
1599             final int groupFormatVersion = getGroupFormatVersion();
1600             for (int i = 0; i < ra.length; i++) {
1601                 ra[i] = RepGroupImpl.deserializeNode(
1602                     getByteArray(buffer), groupFormatVersion);
1603             }
1604             return ra;
1605         }
1606 
getRepNodeImplArraySize(RepNodeImpl[] ra)1607         private int getRepNodeImplArraySize(RepNodeImpl[] ra) {
1608             int size = 4; /* array length */
1609             final int groupFormatVersion = getGroupFormatVersion();
1610             for (final RepNodeImpl node : ra) {
1611                 size += (4 /* Node size */ +
1612                          RepGroupImpl.serializeBytes(node, groupFormatVersion)
1613                          .length);
1614             }
1615             return size;
1616         }
1617 
1618         /**
1619          * Returns the RepGroupImpl version to use for the currently configured
1620          * protocol version.
1621          */
getGroupFormatVersion()1622         private int getGroupFormatVersion() {
1623             return (getVersion() < VERSION_5) ?
1624                 RepGroupImpl.FORMAT_VERSION_2 :
1625                 RepGroupImpl.MAX_FORMAT_VERSION;
1626         }
1627 
1628         @Override
getOp()1629         public MessageOp getOp() {
1630             return RESTORE_RESPONSE;
1631         }
1632 
getLogProviders()1633         RepNodeImpl[] getLogProviders() {
1634             return logProviders;
1635         }
1636 
getCBVLSN()1637         VLSN getCBVLSN() {
1638             return cbvlsn;
1639         }
1640     }
1641 }
1642