1// Package etcd3locker provides a locking mechanism using an etcd3 cluster 2// 3// To initialize a locker, a pre-existing connected etcd3 client must be present 4// 5// client, err := clientv3.New(clientv3.Config{ 6// Endpoints: []string{harness.Endpoint}, 7// DialTimeout: 5 * time.Second, 8// }) 9// 10// For the most basic locker (e.g. non-shared etcd3 cluster / use default TTLs), 11// a locker can be instantiated like the following: 12// 13// locker, err := etcd3locker.New(client) 14// if err != nil { 15// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) 16// } 17// 18// The locker will need to be included in composer that is used by tusd: 19// 20// composer := tusd.NewStoreComposer() 21// locker.UseIn(composer) 22// 23// For a shared etcd3 cluster, you may want to modify the prefix that etcd3locker uses: 24// 25// locker, err := etcd3locker.NewWithPrefix(client, "my-prefix") 26// if err != nil { 27// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) 28// } 29// 30// 31// For full control over all options, an etcd3.LockerOptions may be passed into 32// etcd3.NewWithLockerOptions like the following example: 33// 34// ttl := 15 // seconds 35// options := etcd3locker.NewLockerOptions(ttl, "my-prefix") 36// locker, err := etcd3locker.NewWithLockerOptions(client, options) 37// if err != nil { 38// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) 39// } 40// 41// Tested on etcd 3.1/3.2/3.3 42// 43package etcd3locker 44 45import ( 46 "errors" 47 "sync" 48 "time" 49 50 "github.com/tus/tusd" 51 etcd3 "go.etcd.io/etcd/clientv3" 52 "go.etcd.io/etcd/clientv3/concurrency" 53) 54 55var ( 56 ErrLockNotHeld = errors.New("Lock not held") 57 GrantTimeout = 1500 * time.Millisecond 58) 59 60type Etcd3Locker struct { 61 // etcd3 client session 62 Client *etcd3.Client 63 64 // locks is used for storing Etcd3Locks before they are 65 // unlocked. If you want to release a lock, you need the same locker 66 // instance and therefore we need to save them temporarily. 67 locks map[string]*etcd3Lock 68 mutex sync.Mutex 69 prefix string 70 sessionTtl int 71} 72 73// New constructs a new locker using the provided client. 74func New(client *etcd3.Client) (*Etcd3Locker, error) { 75 return NewWithLockerOptions(client, DefaultLockerOptions()) 76} 77 78// This method may be used if a different prefix is required for multi-tenant etcd clusters 79func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) { 80 lockerOptions := DefaultLockerOptions() 81 lockerOptions.SetPrefix(prefix) 82 return NewWithLockerOptions(client, lockerOptions) 83} 84 85// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular. 86func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) { 87 locksMap := map[string]*etcd3Lock{} 88 return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTtl: opts.Ttl(), locks: locksMap, mutex: sync.Mutex{}}, nil 89} 90 91// UseIn adds this locker to the passed composer. 92func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) { 93 composer.UseLocker(locker) 94} 95 96// LockUpload tries to obtain the exclusive lock. 97func (locker *Etcd3Locker) LockUpload(id string) error { 98 session, err := locker.createSession() 99 if err != nil { 100 return err 101 } 102 103 lock := newEtcd3Lock(session, locker.getId(id)) 104 105 err = lock.Acquire() 106 if err != nil { 107 return err 108 } 109 110 locker.mutex.Lock() 111 defer locker.mutex.Unlock() 112 // Only add the lock to our list if the acquire was successful and no error appeared. 113 locker.locks[locker.getId(id)] = lock 114 115 return nil 116} 117 118// UnlockUpload releases a lock. 119func (locker *Etcd3Locker) UnlockUpload(id string) error { 120 locker.mutex.Lock() 121 defer locker.mutex.Unlock() 122 123 // Complain if no lock has been found. This can only happen if LockUpload 124 // has not been invoked before or UnlockUpload multiple times. 125 lock, ok := locker.locks[locker.getId(id)] 126 if !ok { 127 return ErrLockNotHeld 128 } 129 130 err := lock.Release() 131 if err != nil { 132 return err 133 } 134 135 defer delete(locker.locks, locker.getId(id)) 136 return lock.CloseSession() 137} 138 139func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { 140 return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTtl)) 141} 142 143func (locker *Etcd3Locker) getId(id string) string { 144 return locker.prefix + id 145} 146