1package gocbcore 2 3import ( 4 "strconv" 5 "sync" 6) 7 8type Mutation struct { 9 Expiry uint32 10 Locktime uint32 11 Cas uint64 12 Value []byte 13 CollectionID uint32 14 StreamID uint16 15} 16 17type Deletion struct { 18 IsExpiration bool 19 DeleteTime uint32 20 Cas uint64 21 CollectionID uint32 22 StreamID uint16 23} 24 25type SnapshotMarker struct { 26 lastSnapStart uint64 27 lastSnapEnd uint64 28} 29 30type DCPEventCounter struct { 31 mutations map[string]Mutation 32 deletions map[string]Deletion 33 expirations map[string]Deletion 34 scopes map[string]int 35 collections map[string]int 36 scopesDeleted map[string]int 37 collectionsDeleted map[string]int 38} 39 40type TestStreamObserver struct { 41 lock sync.Mutex 42 lastSeqno map[uint16]uint64 43 snapshots map[uint16]SnapshotMarker 44 counter *DCPEventCounter 45 endWg sync.WaitGroup 46} 47 48func (so *TestStreamObserver) newCounter() { 49 so.counter = &DCPEventCounter{ 50 mutations: make(map[string]Mutation), 51 deletions: make(map[string]Deletion), 52 expirations: make(map[string]Deletion), 53 scopes: make(map[string]int), 54 collections: make(map[string]int), 55 scopesDeleted: make(map[string]int), 56 collectionsDeleted: make(map[string]int), 57 } 58} 59 60func (so *TestStreamObserver) SnapshotMarker(startSeqNo, endSeqNo uint64, vbId uint16, streamId uint16, 61 snapshotType SnapshotState) { 62 so.lock.Lock() 63 so.snapshots[vbId] = SnapshotMarker{startSeqNo, endSeqNo} 64 if so.lastSeqno[vbId] < startSeqNo || so.lastSeqno[vbId] > endSeqNo { 65 so.lastSeqno[vbId] = startSeqNo 66 } 67 so.lock.Unlock() 68} 69 70func (so *TestStreamObserver) Mutation(seqNo, revNo uint64, flags, expiry, lockTime uint32, cas uint64, datatype uint8, vbId uint16, 71 collectionId uint32, streamId uint16, key, value []byte) { 72 mutation := Mutation{ 73 Expiry: expiry, 74 Locktime: lockTime, 75 Cas: cas, 76 Value: value, 77 CollectionID: collectionId, 78 StreamID: streamId, 79 } 80 81 so.lock.Lock() 82 so.counter.mutations[string(key)] = mutation 83 so.lock.Unlock() 84} 85 86func (so *TestStreamObserver) Deletion(seqNo, revNo uint64, deleteTime uint32, cas uint64, datatype uint8, vbId uint16, collectionId uint32, streamId uint16, 87 key, value []byte) { 88 mutation := Deletion{ 89 IsExpiration: false, 90 DeleteTime: deleteTime, 91 Cas: cas, 92 CollectionID: collectionId, 93 StreamID: streamId, 94 } 95 96 so.lock.Lock() 97 so.counter.deletions[string(key)] = mutation 98 so.lock.Unlock() 99} 100 101func (so *TestStreamObserver) Expiration(seqNo, revNo uint64, deleteTime uint32, cas uint64, vbId uint16, collectionId uint32, streamId uint16, key []byte) { 102 mutation := Deletion{ 103 IsExpiration: true, 104 DeleteTime: deleteTime, 105 Cas: cas, 106 CollectionID: collectionId, 107 StreamID: streamId, 108 } 109 110 so.lock.Lock() 111 so.counter.deletions[string(key)] = mutation 112 so.lock.Unlock() 113} 114 115func (so *TestStreamObserver) End(vbId uint16, streamId uint16, err error) { 116 so.endWg.Done() 117} 118 119func (so *TestStreamObserver) CreateCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32, 120 collectionId uint32, ttl uint32, streamId uint16, key []byte) { 121 so.lock.Lock() 122 so.counter.collections[strconv.Itoa(int(scopeId))+"."+string(key)]++ 123 so.lock.Unlock() 124} 125 126func (so *TestStreamObserver) DeleteCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32, 127 collectionId uint32, streamId uint16) { 128 so.lock.Lock() 129 so.counter.collectionsDeleted[strconv.Itoa(int(scopeId))+"."+strconv.Itoa(int(collectionId))]++ 130 so.lock.Unlock() 131} 132 133func (so *TestStreamObserver) FlushCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, 134 collectionId uint32) { 135} 136 137func (so *TestStreamObserver) CreateScope(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32, 138 streamId uint16, key []byte) { 139 so.lock.Lock() 140 so.counter.scopes[string(key)]++ 141 so.lock.Unlock() 142} 143 144func (so *TestStreamObserver) DeleteScope(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, scopeId uint32, 145 streamId uint16) { 146 so.lock.Lock() 147 so.counter.scopesDeleted[strconv.Itoa(int(scopeId))]++ 148 so.lock.Unlock() 149} 150 151func (so *TestStreamObserver) ModifyCollection(seqNo uint64, version uint8, vbId uint16, manifestUid uint64, 152 collectionId uint32, ttl uint32, streamId uint16) { 153} 154 155func (so *TestStreamObserver) OSOSnapshot(vbId uint16, snapshotType uint32, streamID uint16) { 156} 157 158func (so *TestStreamObserver) SeqNoAdvanced(vbId uint16, bySeqNo uint64, streamID uint16) { 159} 160