1// Copyright 2020 Andrew Thornton. All rights reserved. 2// Use of this source code is governed by a MIT-style 3// license that can be found in the LICENSE file. 4 5package levelqueue 6 7import ( 8 "sync" 9 10 "github.com/syndtr/goleveldb/leveldb" 11 "github.com/syndtr/goleveldb/leveldb/errors" 12 "github.com/syndtr/goleveldb/leveldb/util" 13) 14 15const ( 16 setPrefixStr = "set" 17) 18 19// Set defines a set struct 20type Set struct { 21 db *leveldb.DB 22 closeUnderlyingDB bool 23 lock sync.Mutex 24 prefix []byte 25} 26 27// OpenSet opens a set from the db path or creates a set if it doesn't exist. 28// The keys will be prefixed with "set-" by default 29func OpenSet(dataDir string) (*Set, error) { 30 db, err := leveldb.OpenFile(dataDir, nil) 31 if err != nil { 32 if !errors.IsCorrupted(err) { 33 return nil, err 34 } 35 db, err = leveldb.RecoverFile(dataDir, nil) 36 if err != nil { 37 return nil, err 38 } 39 } 40 return NewSet(db, []byte(setPrefixStr), true) 41} 42 43// NewSet creates a set from a db. The keys will be prefixed with prefix 44// and at close the db will be closed as per closeUnderlyingDB 45func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) { 46 set := &Set{ 47 db: db, 48 closeUnderlyingDB: closeUnderlyingDB, 49 } 50 set.prefix = make([]byte, len(prefix)) 51 copy(set.prefix, prefix) 52 53 return set, nil 54} 55 56// Add adds a member string to a key set, returns true if the member was not already present 57func (set *Set) Add(value []byte) (bool, error) { 58 set.lock.Lock() 59 defer set.lock.Unlock() 60 setKey := withPrefix(set.prefix, value) 61 has, err := set.db.Has(setKey, nil) 62 if err != nil || has { 63 return !has, err 64 } 65 return !has, set.db.Put(setKey, []byte(""), nil) 66} 67 68// Members returns the current members of the set 69func (set *Set) Members() ([][]byte, error) { 70 set.lock.Lock() 71 defer set.lock.Unlock() 72 var members [][]byte 73 prefix := withPrefix(set.prefix, []byte{}) 74 iter := set.db.NewIterator(util.BytesPrefix(prefix), nil) 75 for iter.Next() { 76 slice := iter.Key()[len(prefix):] 77 value := make([]byte, len(slice)) 78 copy(value, slice) 79 members = append(members, value) 80 } 81 iter.Release() 82 return members, iter.Error() 83} 84 85// Has returns if the member is in the set 86func (set *Set) Has(value []byte) (bool, error) { 87 set.lock.Lock() 88 defer set.lock.Unlock() 89 setKey := withPrefix(set.prefix, value) 90 91 return set.db.Has(setKey, nil) 92} 93 94// Remove removes a member from the set, returns true if the member was present 95func (set *Set) Remove(value []byte) (bool, error) { 96 set.lock.Lock() 97 defer set.lock.Unlock() 98 setKey := withPrefix(set.prefix, value) 99 100 has, err := set.db.Has(setKey, nil) 101 if err != nil || !has { 102 return has, err 103 } 104 105 return has, set.db.Delete(setKey, nil) 106} 107 108// Close closes the set (and the underlying db if set to closeUnderlyingDB) 109func (set *Set) Close() error { 110 set.lock.Lock() 111 defer set.lock.Unlock() 112 if !set.closeUnderlyingDB { 113 set.db = nil 114 return nil 115 } 116 err := set.db.Close() 117 set.db = nil 118 return err 119} 120