1// Package etcd3locker provides a locking mechanism using an etcd3 cluster
2//
3// To initialize a locker, a pre-existing connected etcd3 client must be present
4//
5//	client, err := clientv3.New(clientv3.Config{
6//		Endpoints:   []string{harness.Endpoint},
7//		DialTimeout: 5 * time.Second,
8//	})
9//
10// For the most basic locker (e.g. non-shared etcd3 cluster / use default TTLs),
11// a locker can be instantiated like the following:
12//
13//	locker, err := etcd3locker.New(client)
14//	if err != nil {
15//		return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
16//	}
17//
18// The locker will need to be included in composer that is used by tusd:
19//
20//	composer := tusd.NewStoreComposer()
21//	locker.UseIn(composer)
22//
23// For a shared etcd3 cluster, you may want to modify the prefix that etcd3locker uses:
24//
25//	locker, err := etcd3locker.NewWithPrefix(client, "my-prefix")
26//	if err != nil {
27//		return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
28//	}
29//
30//
31// For full control over all options, an etcd3.LockerOptions may be passed into
32// etcd3.NewWithLockerOptions like the following example:
33//
34//	ttl := 15 // seconds
35//	options := etcd3locker.NewLockerOptions(ttl, "my-prefix")
36//	locker, err := etcd3locker.NewWithLockerOptions(client, options)
37//	if err != nil {
38//		return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
39//	}
40//
41// Tested on etcd 3.1/3.2/3.3
42//
43package etcd3locker
44
45import (
46	"errors"
47	"sync"
48	"time"
49
50	"github.com/tus/tusd"
51	etcd3 "go.etcd.io/etcd/clientv3"
52	"go.etcd.io/etcd/clientv3/concurrency"
53)
54
55var (
56	ErrLockNotHeld = errors.New("Lock not held")
57	GrantTimeout   = 1500 * time.Millisecond
58)
59
60type Etcd3Locker struct {
61	// etcd3 client session
62	Client *etcd3.Client
63
64	// locks is used for storing Etcd3Locks before they are
65	// unlocked. If you want to release a lock, you need the same locker
66	// instance and therefore we need to save them temporarily.
67	locks      map[string]*etcd3Lock
68	mutex      sync.Mutex
69	prefix     string
70	sessionTtl int
71}
72
73// New constructs a new locker using the provided client.
74func New(client *etcd3.Client) (*Etcd3Locker, error) {
75	return NewWithLockerOptions(client, DefaultLockerOptions())
76}
77
78// This method may be used if a different prefix is required for multi-tenant etcd clusters
79func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) {
80	lockerOptions := DefaultLockerOptions()
81	lockerOptions.SetPrefix(prefix)
82	return NewWithLockerOptions(client, lockerOptions)
83}
84
85// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular.
86func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) {
87	locksMap := map[string]*etcd3Lock{}
88	return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTtl: opts.Ttl(), locks: locksMap, mutex: sync.Mutex{}}, nil
89}
90
91// UseIn adds this locker to the passed composer.
92func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) {
93	composer.UseLocker(locker)
94}
95
96// LockUpload tries to obtain the exclusive lock.
97func (locker *Etcd3Locker) LockUpload(id string) error {
98	session, err := locker.createSession()
99	if err != nil {
100		return err
101	}
102
103	lock := newEtcd3Lock(session, locker.getId(id))
104
105	err = lock.Acquire()
106	if err != nil {
107		return err
108	}
109
110	locker.mutex.Lock()
111	defer locker.mutex.Unlock()
112	// Only add the lock to our list if the acquire was successful and no error appeared.
113	locker.locks[locker.getId(id)] = lock
114
115	return nil
116}
117
118// UnlockUpload releases a lock.
119func (locker *Etcd3Locker) UnlockUpload(id string) error {
120	locker.mutex.Lock()
121	defer locker.mutex.Unlock()
122
123	// Complain if no lock has been found. This can only happen if LockUpload
124	// has not been invoked before or UnlockUpload multiple times.
125	lock, ok := locker.locks[locker.getId(id)]
126	if !ok {
127		return ErrLockNotHeld
128	}
129
130	err := lock.Release()
131	if err != nil {
132		return err
133	}
134
135	defer delete(locker.locks, locker.getId(id))
136	return lock.CloseSession()
137}
138
139func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) {
140	return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTtl))
141}
142
143func (locker *Etcd3Locker) getId(id string) string {
144	return locker.prefix + id
145}
146