1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package test 15 16import ( 17 "fmt" 18 "strconv" 19 "strings" 20 "sync" 21) 22 23// OrderingSender generates strings containing a message index to use for 24// verifying message ordering. It is used on conjunction with Publishers. 25type OrderingSender struct { 26 TotalMsgCount int64 27} 28 29// NewOrderingSender creats a new OrderingSender. 30func NewOrderingSender() *OrderingSender { 31 return new(OrderingSender) 32} 33 34// Next generates the next string to publish. 35func (os *OrderingSender) Next(prefix string) string { 36 os.TotalMsgCount++ 37 return fmt.Sprintf("%s/%d", prefix, os.TotalMsgCount) 38} 39 40// OrderingReceiver consumes a message string generated by OrderingSender and 41// verifies that messages in a partition are ordered. It is used in conjunction 42// with Subscribers. 43type OrderingReceiver struct { 44 mu sync.Mutex 45 // Map of key and last received message index. Messages are only guaranteed to 46 // be received in order within a partition. 47 received map[string]int64 48} 49 50// NewOrderingReceiver creates a new OrderingReceiver. 51func NewOrderingReceiver() *OrderingReceiver { 52 return &OrderingReceiver{ 53 received: make(map[string]int64), 54 } 55} 56 57func parseMsgIndex(msg string) int64 { 58 pos := strings.LastIndex(msg, "/") 59 if pos >= 0 { 60 if n, err := strconv.ParseInt(msg[pos+1:], 10, 64); err == nil { 61 return n 62 } 63 } 64 return -1 65} 66 67// Receive checks the given message data and key and returns an error if 68// unordered messages are detected. 69// 70// Note: a normal scenario resulting in unordered messages is when the Publish 71// stream breaks while there are in-flight batches, which are resent upon 72// stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a 73// test. 74func (or *OrderingReceiver) Receive(data, key string) error { 75 or.mu.Lock() 76 defer or.mu.Unlock() 77 78 idx := parseMsgIndex(data) 79 if idx < 0 { 80 return fmt.Errorf("failed to parse index from message: %q", data) 81 } 82 83 // Verify increasing ordering. 84 lastIdx, exists := or.received[key] 85 if exists && idx <= lastIdx { 86 return fmt.Errorf("message ordering failed for key %s, expected message idx > %d, got %d", key, lastIdx, idx) 87 } 88 or.received[key] = idx 89 return nil 90} 91 92var void struct{} 93 94type msgMetadata struct { 95 offsets map[int64]struct{} 96} 97 98func newMsgMetadata() *msgMetadata { 99 return &msgMetadata{ 100 offsets: make(map[int64]struct{}), 101 } 102} 103 104func (mm *msgMetadata) ContainsOffset(offset int64) bool { 105 _, exists := mm.offsets[offset] 106 return exists 107} 108 109func (mm *msgMetadata) AddOffset(offset int64) { 110 mm.offsets[offset] = void 111} 112 113// DuplicateMsgDetector can be used to detect duplicate messages, either due to 114// duplicate publishes or receives. 115type DuplicateMsgDetector struct { 116 mu sync.Mutex 117 // Map of Pub/Sub message data and associated metadata. 118 msgs map[string]*msgMetadata 119 duplicatePublishCount int64 120 duplicateReceiveCount int64 121} 122 123// NewDuplicateMsgDetector creates a new DuplicateMsgDetector. 124func NewDuplicateMsgDetector() *DuplicateMsgDetector { 125 return &DuplicateMsgDetector{ 126 msgs: make(map[string]*msgMetadata), 127 } 128} 129 130// Receive checks the given message data and offset. 131func (dm *DuplicateMsgDetector) Receive(data string, offset int64) { 132 dm.mu.Lock() 133 defer dm.mu.Unlock() 134 135 if metadata, exists := dm.msgs[data]; exists { 136 if metadata.ContainsOffset(offset) { 137 // If the message contains the same offset, it means it was received 138 // multiple times. This is not expected within a single test run. But it 139 // is normal when processes are stopped & restarted without committing 140 // cursors. 141 dm.duplicateReceiveCount++ 142 } else { 143 // If the message contains a different offset, it means a message was 144 // republished, which can occur when a publish stream reconnects with 145 // in-flight published messages. 146 dm.duplicatePublishCount++ 147 metadata.AddOffset(offset) 148 } 149 } else { 150 metadata = newMsgMetadata() 151 metadata.AddOffset(offset) 152 dm.msgs[data] = metadata 153 } 154} 155 156// Status returns a non-empty status string if there were duplicates detected. 157func (dm *DuplicateMsgDetector) Status() string { 158 dm.mu.Lock() 159 defer dm.mu.Unlock() 160 161 if (dm.duplicateReceiveCount + dm.duplicatePublishCount) == 0 { 162 return "" 163 } 164 return fmt.Sprintf("duplicate publish count = %d, receive count = %d", dm.duplicatePublishCount, dm.duplicateReceiveCount) 165} 166 167// HasPublishDuplicates returns true if duplicate published messages were 168// detected. 169func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool { 170 dm.mu.Lock() 171 defer dm.mu.Unlock() 172 return dm.duplicatePublishCount > 0 173} 174 175// HasReceiveDuplicates returns true if duplicate received messages were 176// detected. 177func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool { 178 dm.mu.Lock() 179 defer dm.mu.Unlock() 180 return dm.duplicateReceiveCount > 0 181} 182