1package pouchdb
2
3import (
4	"context"
5	"sync"
6	"time"
7
8	"github.com/gopherjs/gopherjs/js"
9
10	"github.com/go-kivik/kivik"
11	"github.com/go-kivik/kivik/driver"
12	"github.com/go-kivik/kivik/errors"
13	"github.com/go-kivik/pouchdb/bindings"
14)
15
// replication is this package's implementation of driver.Replication.
//
// source and target are the endpoint names returned by Source and Target.
// startTime, endTime, state and err are written by Update and exposed through
// the read-only accessor methods. client is the client that created the
// replication and whose replications slice lists it; rh is the handler
// created by newReplicationHandler for the underlying replication object.
type replication struct {
	source    string
	target    string
	startTime time.Time
	endTime   time.Time
	state     kivik.ReplicationState
	err       error

	// mu protects the above values
	mu sync.RWMutex

	client *client
	rh     *replicationHandler
}
30
31var _ driver.Replication = &replication{}
32
33func (c *client) newReplication(target, source string, rep *js.Object) *replication {
34	r := &replication{
35		target: target,
36		source: source,
37		rh:     newReplicationHandler(rep),
38		client: c,
39	}
40	c.replicationsMU.Lock()
41	defer c.replicationsMU.Unlock()
42	c.replications = append(c.replications, r)
43	return r
44}
45
46func (r *replication) readLock() func() {
47	r.mu.RLock()
48	return r.mu.RUnlock
49}
50
51func (r *replication) ReplicationID() string { return "" }
52func (r *replication) Source() string        { defer r.readLock()(); return r.source }
53func (r *replication) Target() string        { defer r.readLock()(); return r.target }
54func (r *replication) StartTime() time.Time  { defer r.readLock()(); return r.startTime }
55func (r *replication) EndTime() time.Time    { defer r.readLock()(); return r.endTime }
56func (r *replication) State() string         { defer r.readLock()(); return string(r.state) }
57func (r *replication) Err() error            { defer r.readLock()(); return r.err }
58
59func (r *replication) Update(ctx context.Context, state *driver.ReplicationInfo) (err error) {
60	defer bindings.RecoverError(&err)
61	r.mu.Lock()
62	defer r.mu.Unlock()
63	event, info, err := r.rh.Status()
64	if err != nil {
65		return err
66	}
67	switch event {
68	case bindings.ReplicationEventDenied, bindings.ReplicationEventError:
69		r.state = kivik.ReplicationError
70		r.err = bindings.NewPouchError(info.Object)
71	case bindings.ReplicationEventComplete:
72		r.state = kivik.ReplicationComplete
73	case bindings.ReplicationEventPaused, bindings.ReplicationEventChange, bindings.ReplicationEventActive:
74		r.state = kivik.ReplicationStarted
75	}
76	if info != nil {
77		startTime, endTime := info.StartTime(), info.EndTime()
78		if r.startTime.IsZero() && !startTime.IsZero() {
79			r.startTime = startTime
80		}
81		if r.endTime.IsZero() && !endTime.IsZero() {
82			r.endTime = endTime
83		}
84		if r.rh.state != nil {
85			state.DocWriteFailures = r.rh.state.DocWriteFailures
86			state.DocsRead = r.rh.state.DocsRead
87			state.DocsWritten = r.rh.state.DocsWritten
88		}
89	}
90	return nil
91}
92
93func (r *replication) Delete(ctx context.Context) (err error) {
94	defer bindings.RecoverError(&err)
95	r.rh.Cancel()
96	r.client.replicationsMU.Lock()
97	defer r.client.replicationsMU.Unlock()
98	for i, rep := range r.client.replications {
99		if rep == r {
100			last := len(r.client.replications) - 1
101			r.client.replications[i] = r.client.replications[last]
102			r.client.replications[last] = nil
103			r.client.replications = r.client.replications[:last]
104			return nil
105		}
106	}
107	return errors.Status(kivik.StatusNotFound, "replication not found")
108}
109
110func replicationEndpoint(dsn string, object interface{}) (name string, obj interface{}, err error) {
111	defer bindings.RecoverError(&err)
112	if object == nil {
113		return dsn, dsn, nil
114	}
115	switch t := object.(type) {
116	case *js.Object:
117		tx := object.(*js.Object) // https://github.com/gopherjs/gopherjs/issues/682
118		// Assume it's a raw PouchDB object
119		return tx.Get("name").String(), tx, nil
120	case *bindings.DB:
121		// Unwrap the bare object
122		return t.Object.Get("name").String(), t.Object, nil
123	}
124	// Just let it pass through
125	return "<unknown>", obj, nil
126}
127
128func (c *client) Replicate(_ context.Context, targetDSN, sourceDSN string, options map[string]interface{}) (driver.Replication, error) {
129	opts, err := c.options(options)
130	if err != nil {
131		return nil, err
132	}
133	// Allow overriding source and target with options, i.e. for PouchDB objects
134	sourceName, sourceObj, err := replicationEndpoint(sourceDSN, opts["source"])
135	if err != nil {
136		return nil, err
137	}
138	targetName, targetObj, err := replicationEndpoint(targetDSN, opts["target"])
139	if err != nil {
140		return nil, err
141	}
142	delete(opts, "source")
143	delete(opts, "target")
144	rep, err := c.pouch.Replicate(sourceObj, targetObj, opts)
145	if err != nil {
146		return nil, err
147	}
148	return c.newReplication(targetName, sourceName, rep), nil
149}
150
151func (c *client) GetReplications(_ context.Context, options map[string]interface{}) ([]driver.Replication, error) {
152	for range options {
153		return nil, errors.Status(kivik.StatusBadAPICall, "options not yet supported")
154	}
155	c.replicationsMU.RLock()
156	defer c.replicationsMU.RUnlock()
157	reps := make([]driver.Replication, len(c.replications))
158	for i, rep := range c.replications {
159		reps[i] = rep
160	}
161	return reps, nil
162}
163