1package cachememdb 2 3import ( 4 "errors" 5 "fmt" 6 "sync/atomic" 7 8 memdb "github.com/hashicorp/go-memdb" 9) 10 11const ( 12 tableNameIndexer = "indexer" 13) 14 15// CacheMemDB is the underlying cache database for storing indexes. 16type CacheMemDB struct { 17 db *atomic.Value 18} 19 20// New creates a new instance of CacheMemDB. 21func New() (*CacheMemDB, error) { 22 db, err := newDB() 23 if err != nil { 24 return nil, err 25 } 26 27 c := &CacheMemDB{ 28 db: new(atomic.Value), 29 } 30 c.db.Store(db) 31 32 return c, nil 33} 34 35func newDB() (*memdb.MemDB, error) { 36 cacheSchema := &memdb.DBSchema{ 37 Tables: map[string]*memdb.TableSchema{ 38 tableNameIndexer: &memdb.TableSchema{ 39 Name: tableNameIndexer, 40 Indexes: map[string]*memdb.IndexSchema{ 41 // This index enables fetching the cached item based on the 42 // identifier of the index. 43 IndexNameID: &memdb.IndexSchema{ 44 Name: IndexNameID, 45 Unique: true, 46 Indexer: &memdb.StringFieldIndex{ 47 Field: "ID", 48 }, 49 }, 50 // This index enables fetching all the entries in cache for 51 // a given request path, in a given namespace. 52 IndexNameRequestPath: &memdb.IndexSchema{ 53 Name: IndexNameRequestPath, 54 Unique: false, 55 Indexer: &memdb.CompoundIndex{ 56 Indexes: []memdb.Indexer{ 57 &memdb.StringFieldIndex{ 58 Field: "Namespace", 59 }, 60 &memdb.StringFieldIndex{ 61 Field: "RequestPath", 62 }, 63 }, 64 }, 65 }, 66 // This index enables fetching all the entries in cache 67 // belonging to the leases of a given token. 68 IndexNameLeaseToken: &memdb.IndexSchema{ 69 Name: IndexNameLeaseToken, 70 Unique: false, 71 AllowMissing: true, 72 Indexer: &memdb.StringFieldIndex{ 73 Field: "LeaseToken", 74 }, 75 }, 76 // This index enables fetching all the entries in cache 77 // that are tied to the given token, regardless of the 78 // entries belonging to the token or belonging to the 79 // lease. 80 IndexNameToken: &memdb.IndexSchema{ 81 Name: IndexNameToken, 82 Unique: true, 83 AllowMissing: true, 84 Indexer: &memdb.StringFieldIndex{ 85 Field: "Token", 86 }, 87 }, 88 // This index enables fetching all the entries in cache for 89 // the given parent token. 90 IndexNameTokenParent: &memdb.IndexSchema{ 91 Name: IndexNameTokenParent, 92 Unique: false, 93 AllowMissing: true, 94 Indexer: &memdb.StringFieldIndex{ 95 Field: "TokenParent", 96 }, 97 }, 98 // This index enables fetching all the entries in cache for 99 // the given accessor. 100 IndexNameTokenAccessor: &memdb.IndexSchema{ 101 Name: IndexNameTokenAccessor, 102 Unique: true, 103 AllowMissing: true, 104 Indexer: &memdb.StringFieldIndex{ 105 Field: "TokenAccessor", 106 }, 107 }, 108 // This index enables fetching all the entries in cache for 109 // the given lease identifier. 110 IndexNameLease: &memdb.IndexSchema{ 111 Name: IndexNameLease, 112 Unique: true, 113 AllowMissing: true, 114 Indexer: &memdb.StringFieldIndex{ 115 Field: "Lease", 116 }, 117 }, 118 }, 119 }, 120 }, 121 } 122 123 db, err := memdb.NewMemDB(cacheSchema) 124 if err != nil { 125 return nil, err 126 } 127 return db, nil 128} 129 130// Get returns the index based on the indexer and the index values provided. 131func (c *CacheMemDB) Get(indexName string, indexValues ...interface{}) (*Index, error) { 132 if !validIndexName(indexName) { 133 return nil, fmt.Errorf("invalid index name %q", indexName) 134 } 135 136 txn := c.db.Load().(*memdb.MemDB).Txn(false) 137 138 raw, err := txn.First(tableNameIndexer, indexName, indexValues...) 139 if err != nil { 140 return nil, err 141 } 142 143 if raw == nil { 144 return nil, nil 145 } 146 147 index, ok := raw.(*Index) 148 if !ok { 149 return nil, errors.New("unable to parse index value from the cache") 150 } 151 152 return index, nil 153} 154 155// Set stores the index into the cache. 156func (c *CacheMemDB) Set(index *Index) error { 157 if index == nil { 158 return errors.New("nil index provided") 159 } 160 161 txn := c.db.Load().(*memdb.MemDB).Txn(true) 162 defer txn.Abort() 163 164 if err := txn.Insert(tableNameIndexer, index); err != nil { 165 return fmt.Errorf("unable to insert index into cache: %v", err) 166 } 167 168 txn.Commit() 169 170 return nil 171} 172 173// GetByPrefix returns all the cached indexes based on the index name and the 174// value prefix. 175func (c *CacheMemDB) GetByPrefix(indexName string, indexValues ...interface{}) ([]*Index, error) { 176 if !validIndexName(indexName) { 177 return nil, fmt.Errorf("invalid index name %q", indexName) 178 } 179 180 indexName = indexName + "_prefix" 181 182 // Get all the objects 183 txn := c.db.Load().(*memdb.MemDB).Txn(false) 184 185 iter, err := txn.Get(tableNameIndexer, indexName, indexValues...) 186 if err != nil { 187 return nil, err 188 } 189 190 var indexes []*Index 191 for { 192 obj := iter.Next() 193 if obj == nil { 194 break 195 } 196 index, ok := obj.(*Index) 197 if !ok { 198 return nil, fmt.Errorf("failed to cast cached index") 199 } 200 201 indexes = append(indexes, index) 202 } 203 204 return indexes, nil 205} 206 207// Evict removes an index from the cache based on index name and value. 208func (c *CacheMemDB) Evict(indexName string, indexValues ...interface{}) error { 209 index, err := c.Get(indexName, indexValues...) 210 if err != nil { 211 return fmt.Errorf("unable to fetch index on cache deletion: %v", err) 212 } 213 214 if index == nil { 215 return nil 216 } 217 218 txn := c.db.Load().(*memdb.MemDB).Txn(true) 219 defer txn.Abort() 220 221 if err := txn.Delete(tableNameIndexer, index); err != nil { 222 return fmt.Errorf("unable to delete index from cache: %v", err) 223 } 224 225 txn.Commit() 226 227 return nil 228} 229 230// Flush resets the underlying cache object. 231func (c *CacheMemDB) Flush() error { 232 newDB, err := newDB() 233 if err != nil { 234 return err 235 } 236 237 c.db.Store(newDB) 238 239 return nil 240} 241