1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package test
15
16import (
17	"fmt"
18	"strconv"
19	"strings"
20	"sync"
21)
22
// OrderingSender generates strings containing a message index to use for
// verifying message ordering. It is used in conjunction with Publishers.
//
// Starting from the zero value, indexes begin at 1 and increase by one on
// every call to Next, regardless of the prefix passed. An OrderingSender has
// no internal synchronization and is not safe for concurrent use.
type OrderingSender struct {
	// TotalMsgCount is the number of messages generated so far. It is also the
	// index embedded in the most recently generated message.
	TotalMsgCount int64
}

// NewOrderingSender creates a new OrderingSender.
func NewOrderingSender() *OrderingSender {
	return new(OrderingSender)
}

// Next generates the next string to publish, formatted as "<prefix>/<index>".
// The index follows the last "/" so that it can be parsed back out even when
// prefix itself contains slashes.
func (s *OrderingSender) Next(prefix string) string {
	s.TotalMsgCount++
	return fmt.Sprintf("%s/%d", prefix, s.TotalMsgCount)
}
39
// OrderingReceiver validates that message strings produced by OrderingSender
// arrive with strictly increasing indexes for each key. It is intended for use
// with Subscribers. Receive serializes access with a mutex, so a single
// OrderingReceiver may be shared across goroutines.
type OrderingReceiver struct {
	mu sync.Mutex
	// received maps each key to the index of the last message accepted for it.
	// Ordering is only guaranteed within a partition, so it is tracked per key.
	received map[string]int64
}

// NewOrderingReceiver returns an OrderingReceiver with no recorded messages.
func NewOrderingReceiver() *OrderingReceiver {
	r := new(OrderingReceiver)
	r.received = map[string]int64{}
	return r
}

// parseMsgIndex extracts the integer that follows the final "/" in msg.
// It returns -1 when msg contains no "/" or when the suffix is not a valid
// base-10 int64.
func parseMsgIndex(msg string) int64 {
	slash := strings.LastIndexByte(msg, '/')
	if slash < 0 {
		return -1
	}
	n, err := strconv.ParseInt(msg[slash+1:], 10, 64)
	if err != nil {
		return -1
	}
	return n
}

// Receive parses the index out of data and returns an error if it cannot be
// parsed, or if it is not greater than the last index accepted for key. On
// success the index becomes the new high-water mark for key; on an ordering
// error the previous mark is kept.
//
// Note: out-of-order messages occur normally when the Publish stream breaks
// with batches in flight, because those batches are resent once the stream
// reconnects. Use DuplicateMsgDetector if failing a test is undesirable.
func (r *OrderingReceiver) Receive(data, key string) error {
	// Parsing touches no shared state, so do it before taking the lock.
	idx := parseMsgIndex(data)
	if idx < 0 {
		return fmt.Errorf("failed to parse index from message: %q", data)
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if last, seen := r.received[key]; seen && idx <= last {
		return fmt.Errorf("message ordering failed for key %s, expected message idx > %d, got %d", key, last, idx)
	}
	r.received[key] = idx
	return nil
}
91
// void is the zero-size value stored in set-like maps.
var void struct{}

// msgMetadata is the set of offsets at which a particular message payload has
// been observed.
type msgMetadata struct {
	offsets map[int64]struct{}
}

// newMsgMetadata returns a msgMetadata with an empty offset set.
func newMsgMetadata() *msgMetadata {
	mm := new(msgMetadata)
	mm.offsets = map[int64]struct{}{}
	return mm
}

// ContainsOffset reports whether offset has already been recorded.
func (mm *msgMetadata) ContainsOffset(offset int64) bool {
	if _, found := mm.offsets[offset]; found {
		return true
	}
	return false
}

// AddOffset records offset. Adding an offset that is already present has no
// effect.
func (mm *msgMetadata) AddOffset(offset int64) {
	mm.offsets[offset] = void
}
112
113// DuplicateMsgDetector can be used to detect duplicate messages, either due to
114// duplicate publishes or receives.
115type DuplicateMsgDetector struct {
116	mu sync.Mutex
117	// Map of Pub/Sub message data and associated metadata.
118	msgs                  map[string]*msgMetadata
119	duplicatePublishCount int64
120	duplicateReceiveCount int64
121}
122
123// NewDuplicateMsgDetector creates a new DuplicateMsgDetector.
124func NewDuplicateMsgDetector() *DuplicateMsgDetector {
125	return &DuplicateMsgDetector{
126		msgs: make(map[string]*msgMetadata),
127	}
128}
129
130// Receive checks the given message data and offset.
131func (dm *DuplicateMsgDetector) Receive(data string, offset int64) {
132	dm.mu.Lock()
133	defer dm.mu.Unlock()
134
135	if metadata, exists := dm.msgs[data]; exists {
136		if metadata.ContainsOffset(offset) {
137			// If the message contains the same offset, it means it was received
138			// multiple times. This is not expected within a single test run. But it
139			// is normal when processes are stopped & restarted without committing
140			// cursors.
141			dm.duplicateReceiveCount++
142		} else {
143			// If the message contains a different offset, it means a message was
144			// republished, which can occur when a publish stream reconnects with
145			// in-flight published messages.
146			dm.duplicatePublishCount++
147			metadata.AddOffset(offset)
148		}
149	} else {
150		metadata = newMsgMetadata()
151		metadata.AddOffset(offset)
152		dm.msgs[data] = metadata
153	}
154}
155
156// Status returns a non-empty status string if there were duplicates detected.
157func (dm *DuplicateMsgDetector) Status() string {
158	dm.mu.Lock()
159	defer dm.mu.Unlock()
160
161	if (dm.duplicateReceiveCount + dm.duplicatePublishCount) == 0 {
162		return ""
163	}
164	return fmt.Sprintf("duplicate publish count = %d, receive count = %d", dm.duplicatePublishCount, dm.duplicateReceiveCount)
165}
166
167// HasPublishDuplicates returns true if duplicate published messages were
168// detected.
169func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool {
170	dm.mu.Lock()
171	defer dm.mu.Unlock()
172	return dm.duplicatePublishCount > 0
173}
174
175// HasReceiveDuplicates returns true if duplicate received messages were
176// detected.
177func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool {
178	dm.mu.Lock()
179	defer dm.mu.Unlock()
180	return dm.duplicateReceiveCount > 0
181}
182