1package gocbcore
2
3import (
4	"strconv"
5	"sync"
6)
7
8type Mutation struct {
9	Expiry       uint32
10	Locktime     uint32
11	Cas          uint64
12	Value        []byte
13	CollectionID uint32
14	StreamID     uint16
15}
16
17type Deletion struct {
18	IsExpiration bool
19	DeleteTime   uint32
20	Cas          uint64
21	CollectionID uint32
22	StreamID     uint16
23}
24
25type SnapshotMarker struct {
26	lastSnapStart uint64
27	lastSnapEnd   uint64
28}
29
30type DCPEventCounter struct {
31	mutations          map[string]Mutation
32	deletions          map[string]Deletion
33	expirations        map[string]Deletion
34	scopes             map[string]int
35	collections        map[string]int
36	scopesDeleted      map[string]int
37	collectionsDeleted map[string]int
38}
39
40type TestStreamObserver struct {
41	lock      sync.Mutex
42	lastSeqno map[uint16]uint64
43	snapshots map[uint16]SnapshotMarker
44	counter   *DCPEventCounter
45	endWg     sync.WaitGroup
46}
47
48func (so *TestStreamObserver) newCounter() {
49	so.counter = &DCPEventCounter{
50		mutations:          make(map[string]Mutation),
51		deletions:          make(map[string]Deletion),
52		expirations:        make(map[string]Deletion),
53		scopes:             make(map[string]int),
54		collections:        make(map[string]int),
55		scopesDeleted:      make(map[string]int),
56		collectionsDeleted: make(map[string]int),
57	}
58}
59
60func (so *TestStreamObserver) SnapshotMarker(startSeqNo, endSeqNo uint64, vbId uint16, streamId uint16,
61	snapshotType SnapshotState) {
62	so.lock.Lock()
63	so.snapshots[vbId] = SnapshotMarker{startSeqNo, endSeqNo}
64	if so.lastSeqno[vbId] < startSeqNo || so.lastSeqno[vbId] > endSeqNo {
65		so.lastSeqno[vbId] = startSeqNo
66	}
67	so.lock.Unlock()
68}
69
70func (so *TestStreamObserver) Mutation(seqNo, revNo uint64, flags, expiry, lockTime uint32, cas uint64, datatype uint8, vbId uint16,
71	collectionId uint32, streamId uint16, key, value []byte) {
72	mutation := Mutation{
73		Expiry:       expiry,
74		Locktime:     lockTime,
75		Cas:          cas,
76		Value:        value,
77		CollectionID: collectionId,
78		StreamID:     streamId,
79	}
80
81	so.lock.Lock()
82	so.counter.mutations[string(key)] = mutation
83	so.lock.Unlock()
84}
85
86func (so *TestStreamObserver) Deletion(seqNo, revNo uint64, deleteTime uint32, cas uint64, datatype uint8, vbId uint16, collectionId uint32, streamId uint16,
87	key, value []byte) {
88	mutation := Deletion{
89		IsExpiration: false,
90		DeleteTime:   deleteTime,
91		Cas:          cas,
92		CollectionID: collectionId,
93		StreamID:     streamId,
94	}
95
96	so.lock.Lock()
97	so.counter.deletions[string(key)] = mutation
98	so.lock.Unlock()
99}
100
101func (so *TestStreamObserver) Expiration(seqNo, revNo uint64, deleteTime uint32, cas uint64, vbId uint16, collectionId uint32, streamId uint16, key []byte) {
102	mutation := Deletion{
103		IsExpiration: true,
104		DeleteTime:   deleteTime,
105		Cas:          cas,
106		CollectionID: collectionId,
107		StreamID:     streamId,
108	}
109
110	so.lock.Lock()
111	so.counter.deletions[string(key)] = mutation
112	so.lock.Unlock()
113}
114
115func (so *TestStreamObserver) End(vbId uint16, streamId uint16, err error) {
116	so.endWg.Done()
117}
118
119func (so *TestStreamObserver) CreateCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32,
120	collectionId uint32, ttl uint32, streamId uint16, key []byte) {
121	so.lock.Lock()
122	so.counter.collections[strconv.Itoa(int(scopeId))+"."+string(key)]++
123	so.lock.Unlock()
124}
125
126func (so *TestStreamObserver) DeleteCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32,
127	collectionId uint32, streamId uint16) {
128	so.lock.Lock()
129	so.counter.collectionsDeleted[strconv.Itoa(int(scopeId))+"."+strconv.Itoa(int(collectionId))]++
130	so.lock.Unlock()
131}
132
133func (so *TestStreamObserver) FlushCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64,
134	collectionId uint32) {
135}
136
137func (so *TestStreamObserver) CreateScope(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32,
138	streamId uint16, key []byte) {
139	so.lock.Lock()
140	so.counter.scopes[string(key)]++
141	so.lock.Unlock()
142}
143
144func (so *TestStreamObserver) DeleteScope(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32,
145	streamId uint16) {
146	so.lock.Lock()
147	so.counter.scopesDeleted[strconv.Itoa(int(scopeId))]++
148	so.lock.Unlock()
149}
150
151func (so *TestStreamObserver) ModifyCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64,
152	collectionId uint32, ttl uint32, streamId uint16) {
153}
154
155func (so *TestStreamObserver) OSOSnapshot(vbId uint16, snapshotType uint32, streamID uint16) {
156}
157
158func (so *TestStreamObserver) SeqNoAdvanced(vbId uint16, bySeqNo uint64, streamID uint16) {
159}
160