1package kivik
2
3import (
4	"context"
5	"sync"
6	"time"
7
8	"github.com/go-kivik/kivik/driver"
9	"github.com/go-kivik/kivik/errors"
10)
11
// ReplicationState represents a replication's state. It is a string type
// whose defined values mirror the _replication_state field of a _replicator
// document, plus the empty string for a replication that has not started.
type ReplicationState string
14
// The possible values for the _replication_state field in _replicator documents
// plus a blank value for unstarted replications.
const (
	// ReplicationNotStarted is the blank state of a replication that has not
	// been started.
	ReplicationNotStarted ReplicationState = ""
	// ReplicationStarted is the "triggered" state.
	ReplicationStarted    ReplicationState = "triggered"
	// ReplicationError is the "error" state; IsActive reports false for it.
	ReplicationError      ReplicationState = "error"
	// ReplicationComplete is the "completed" state; IsActive reports false
	// for it.
	ReplicationComplete   ReplicationState = "completed"
)
23
// Replication represents a CouchDB replication process.
//
// It wraps a driver.Replication and caches the most recent status snapshot
// fetched by Update, which the DocsWritten, DocsRead, DocWriteFailures and
// Progress accessors report.
type Replication struct {
	// Source is populated from the driver replication's Source() by
	// newReplication.
	Source string
	// Target is populated from the driver replication's Target() by
	// newReplication.
	Target string

	// infoMU is held for writing when Update stores a new snapshot in info and
	// for reading by the accessor methods.
	infoMU    sync.RWMutex
	// info is the snapshot stored by the most recent successful Update. It is
	// nil until Update has succeeded at least once.
	info      *driver.ReplicationInfo
	// statusErr holds the result of the most recent Update call (nil on
	// success).
	statusErr error
	// irep is the underlying driver replication to which calls are delegated.
	irep      driver.Replication
}
34
35// DocsWritten returns the number of documents written, if known.
36func (r *Replication) DocsWritten() int64 {
37	if r != nil && r.info != nil {
38		r.infoMU.RLock()
39		defer r.infoMU.RUnlock()
40		return r.info.DocsWritten
41	}
42	return 0
43}
44
45// DocsRead returns the number of documents read, if known.
46func (r *Replication) DocsRead() int64 {
47	if r != nil && r.info != nil {
48		r.infoMU.RLock()
49		defer r.infoMU.RUnlock()
50		return r.info.DocsRead
51	}
52	return 0
53}
54
55// DocWriteFailures returns the number of doc write failures, if known.
56func (r *Replication) DocWriteFailures() int64 {
57	if r != nil && r.info != nil {
58		r.infoMU.RLock()
59		defer r.infoMU.RUnlock()
60		return r.info.DocWriteFailures
61	}
62	return 0
63}
64
65// Progress returns the current replication progress, if known.
66func (r *Replication) Progress() float64 {
67	if r != nil && r.info != nil {
68		r.infoMU.RLock()
69		defer r.infoMU.RUnlock()
70		return r.info.Progress
71	}
72	return 0
73}
74
75func newReplication(rep driver.Replication) *Replication {
76	return &Replication{
77		Source: rep.Source(),
78		Target: rep.Target(),
79		irep:   rep,
80	}
81}
82
83// ReplicationID returns the _replication_id field of the replicator document.
84func (r *Replication) ReplicationID() string {
85	return r.irep.ReplicationID()
86}
87
88// StartTime returns the replication start time, once the replication has been
89// triggered.
90func (r *Replication) StartTime() time.Time {
91	return r.irep.StartTime()
92}
93
94// EndTime returns the replication end time, once the replication has terminated.
95func (r *Replication) EndTime() time.Time {
96	return r.irep.EndTime()
97}
98
99// State returns the current replication state
100func (r *Replication) State() ReplicationState {
101	return ReplicationState(r.irep.State())
102}
103
104// Err returns the error, if any, that caused the replication to abort.
105func (r *Replication) Err() error {
106	if r == nil {
107		return nil
108	}
109	return r.irep.Err()
110}
111
112// IsActive returns true if the replication has not yet completed or
113// errored.
114func (r *Replication) IsActive() bool {
115	if r == nil {
116		return false
117	}
118	return r.State() != ReplicationError && r.State() != ReplicationComplete
119}
120
121// Delete deletes a replication. If it is currently running, it will be
122// cancelled.
123func (r *Replication) Delete(ctx context.Context) error {
124	return r.irep.Delete(ctx)
125}
126
127// Update requests a replication state update from the server. If there is an
128// error retrieving the update, it is returned and the replication state is
129// unaltered.
130func (r *Replication) Update(ctx context.Context) error {
131	var info driver.ReplicationInfo
132	r.statusErr = r.irep.Update(ctx, &info)
133	if r.statusErr != nil {
134		return r.statusErr
135	}
136	r.infoMU.Lock()
137	r.info = &info
138	r.infoMU.Unlock()
139	return nil
140}
141
142// GetReplications returns a list of defined replications in the _replicator
143// database. Options are in the same format as to AllDocs(), except that
144// "conflicts" and "update_seq" are ignored.
145func (c *Client) GetReplications(ctx context.Context, options ...Options) ([]*Replication, error) {
146	if replicator, ok := c.driverClient.(driver.ClientReplicator); ok {
147		opts, err := mergeOptions(options...)
148		if err != nil {
149			return nil, err
150		}
151		reps, err := replicator.GetReplications(ctx, opts)
152		if err != nil {
153			return nil, err
154		}
155		replications := make([]*Replication, len(reps))
156		for i, rep := range reps {
157			replications[i] = newReplication(rep)
158		}
159		return replications, nil
160	}
161	return nil, errors.Status(StatusNotImplemented, "kivik: driver does not support replication")
162}
163
164// Replicate initiates a replication from source to target.
165func (c *Client) Replicate(ctx context.Context, targetDSN, sourceDSN string, options ...Options) (*Replication, error) {
166	if replicator, ok := c.driverClient.(driver.ClientReplicator); ok {
167		opts, err := mergeOptions(options...)
168		if err != nil {
169			return nil, err
170		}
171		rep, err := replicator.Replicate(ctx, targetDSN, sourceDSN, opts)
172		if err != nil {
173			return nil, err
174		}
175		return newReplication(rep), nil
176	}
177	return nil, errors.Status(StatusNotImplemented, "kivik: driver does not support replication")
178}
179
// ReplicationInfo represents a snapshot of the status of a replication.
type ReplicationInfo struct {
	// DocWriteFailures is the number of document write failures.
	DocWriteFailures int64
	// DocsRead is the number of documents read.
	DocsRead         int64
	// DocsWritten is the number of documents written.
	DocsWritten      int64
	// Progress is the replication progress.
	// NOTE(review): the scale (fraction vs. percentage) is not visible in
	// this file; confirm against the driver.
	Progress         float64
}
187