1package kivik 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/flimzy/kivik/driver" 9 "github.com/flimzy/kivik/errors" 10) 11 12// ReplicationState represents a replication's state 13type ReplicationState string 14 15// The possible values for the _replication_state field in _replicator documents 16// plus a blank value for unstarted replications. 17const ( 18 ReplicationNotStarted ReplicationState = "" 19 ReplicationStarted ReplicationState = "triggered" 20 ReplicationError ReplicationState = "error" 21 ReplicationComplete ReplicationState = "completed" 22) 23 24// Replication represents a CouchDB replication process. 25type Replication struct { 26 Source string 27 Target string 28 29 infoMU sync.RWMutex 30 info *driver.ReplicationInfo 31 statusErr error 32 irep driver.Replication 33} 34 35// DocsWritten returns the number of documents written, if known. 36func (r *Replication) DocsWritten() int64 { 37 if r != nil && r.info != nil { 38 r.infoMU.RLock() 39 defer r.infoMU.RUnlock() 40 return r.info.DocsWritten 41 } 42 return 0 43} 44 45// DocsRead returns the number of documents read, if known. 46func (r *Replication) DocsRead() int64 { 47 if r != nil && r.info != nil { 48 r.infoMU.RLock() 49 defer r.infoMU.RUnlock() 50 return r.info.DocsRead 51 } 52 return 0 53} 54 55// DocWriteFailures returns the number of doc write failures, if known. 56func (r *Replication) DocWriteFailures() int64 { 57 if r != nil && r.info != nil { 58 r.infoMU.RLock() 59 defer r.infoMU.RUnlock() 60 return r.info.DocWriteFailures 61 } 62 return 0 63} 64 65// Progress returns the current replication progress, if known. 66func (r *Replication) Progress() float64 { 67 if r != nil && r.info != nil { 68 r.infoMU.RLock() 69 defer r.infoMU.RUnlock() 70 return r.info.Progress 71 } 72 return 0 73} 74 75func newReplication(rep driver.Replication) *Replication { 76 return &Replication{ 77 Source: rep.Source(), 78 Target: rep.Target(), 79 irep: rep, 80 } 81} 82 83// ReplicationID returns the _replication_id field of the replicator document. 84func (r *Replication) ReplicationID() string { 85 return r.irep.ReplicationID() 86} 87 88// StartTime returns the replication start time, once the replication has been 89// triggered. 90func (r *Replication) StartTime() time.Time { 91 return r.irep.StartTime() 92} 93 94// EndTime returns the replication end time, once the replication has terminated. 95func (r *Replication) EndTime() time.Time { 96 return r.irep.EndTime() 97} 98 99// State returns the current replication state 100func (r *Replication) State() ReplicationState { 101 return ReplicationState(r.irep.State()) 102} 103 104// Err returns the error, if any, that caused the replication to abort. 105func (r *Replication) Err() error { 106 if r == nil { 107 return nil 108 } 109 return r.irep.Err() 110} 111 112// IsActive returns true if the replication has not yet completed or 113// errored. 114func (r *Replication) IsActive() bool { 115 if r == nil { 116 return false 117 } 118 return r.State() != ReplicationError && r.State() != ReplicationComplete 119} 120 121// Delete deletes a replication. If it is currently running, it will be 122// cancelled. 123func (r *Replication) Delete(ctx context.Context) error { 124 return r.irep.Delete(ctx) 125} 126 127// Update requests a replication state update from the server. If there is an 128// error retrieving the update, it is returned and the replication state is 129// unaltered. 130func (r *Replication) Update(ctx context.Context) error { 131 var info driver.ReplicationInfo 132 r.statusErr = r.irep.Update(ctx, &info) 133 if r.statusErr != nil { 134 return r.statusErr 135 } 136 r.infoMU.Lock() 137 r.info = &info 138 r.infoMU.Unlock() 139 return nil 140} 141 142// GetReplications returns a list of defined replications in the _replicator 143// database. Options are in the same format as to AllDocs(), except that 144// "conflicts" and "update_seq" are ignored. 145func (c *Client) GetReplications(ctx context.Context, options ...Options) ([]*Replication, error) { 146 if replicator, ok := c.driverClient.(driver.ClientReplicator); ok { 147 opts, err := mergeOptions(options...) 148 if err != nil { 149 return nil, err 150 } 151 reps, err := replicator.GetReplications(ctx, opts) 152 if err != nil { 153 return nil, err 154 } 155 replications := make([]*Replication, len(reps)) 156 for i, rep := range reps { 157 replications[i] = newReplication(rep) 158 } 159 return replications, nil 160 } 161 return nil, errors.Status(StatusNotImplemented, "kivik: driver does not support replication") 162} 163 164// Replicate initiates a replication from source to target. 165func (c *Client) Replicate(ctx context.Context, targetDSN, sourceDSN string, options ...Options) (*Replication, error) { 166 if replicator, ok := c.driverClient.(driver.ClientReplicator); ok { 167 opts, err := mergeOptions(options...) 168 if err != nil { 169 return nil, err 170 } 171 rep, err := replicator.Replicate(ctx, targetDSN, sourceDSN, opts) 172 if err != nil { 173 return nil, err 174 } 175 return newReplication(rep), nil 176 } 177 return nil, errors.Status(StatusNotImplemented, "kivik: driver does not support replication") 178} 179 180// ReplicationInfo represents a snapshot of the status of a replication. 181type ReplicationInfo struct { 182 DocWriteFailures int64 183 DocsRead int64 184 DocsWritten int64 185 Progress float64 186} 187