1package pouchdb 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/gopherjs/gopherjs/js" 9 10 "github.com/go-kivik/kivik" 11 "github.com/go-kivik/kivik/driver" 12 "github.com/go-kivik/kivik/errors" 13 "github.com/go-kivik/pouchdb/bindings" 14) 15 16type replication struct { 17 source string 18 target string 19 startTime time.Time 20 endTime time.Time 21 state kivik.ReplicationState 22 err error 23 24 // mu protects the above values 25 mu sync.RWMutex 26 27 client *client 28 rh *replicationHandler 29} 30 31var _ driver.Replication = &replication{} 32 33func (c *client) newReplication(target, source string, rep *js.Object) *replication { 34 r := &replication{ 35 target: target, 36 source: source, 37 rh: newReplicationHandler(rep), 38 client: c, 39 } 40 c.replicationsMU.Lock() 41 defer c.replicationsMU.Unlock() 42 c.replications = append(c.replications, r) 43 return r 44} 45 46func (r *replication) readLock() func() { 47 r.mu.RLock() 48 return r.mu.RUnlock 49} 50 51func (r *replication) ReplicationID() string { return "" } 52func (r *replication) Source() string { defer r.readLock()(); return r.source } 53func (r *replication) Target() string { defer r.readLock()(); return r.target } 54func (r *replication) StartTime() time.Time { defer r.readLock()(); return r.startTime } 55func (r *replication) EndTime() time.Time { defer r.readLock()(); return r.endTime } 56func (r *replication) State() string { defer r.readLock()(); return string(r.state) } 57func (r *replication) Err() error { defer r.readLock()(); return r.err } 58 59func (r *replication) Update(ctx context.Context, state *driver.ReplicationInfo) (err error) { 60 defer bindings.RecoverError(&err) 61 r.mu.Lock() 62 defer r.mu.Unlock() 63 event, info, err := r.rh.Status() 64 if err != nil { 65 return err 66 } 67 switch event { 68 case bindings.ReplicationEventDenied, bindings.ReplicationEventError: 69 r.state = kivik.ReplicationError 70 r.err = bindings.NewPouchError(info.Object) 71 case bindings.ReplicationEventComplete: 72 r.state = kivik.ReplicationComplete 73 case bindings.ReplicationEventPaused, bindings.ReplicationEventChange, bindings.ReplicationEventActive: 74 r.state = kivik.ReplicationStarted 75 } 76 if info != nil { 77 startTime, endTime := info.StartTime(), info.EndTime() 78 if r.startTime.IsZero() && !startTime.IsZero() { 79 r.startTime = startTime 80 } 81 if r.endTime.IsZero() && !endTime.IsZero() { 82 r.endTime = endTime 83 } 84 if r.rh.state != nil { 85 state.DocWriteFailures = r.rh.state.DocWriteFailures 86 state.DocsRead = r.rh.state.DocsRead 87 state.DocsWritten = r.rh.state.DocsWritten 88 } 89 } 90 return nil 91} 92 93func (r *replication) Delete(ctx context.Context) (err error) { 94 defer bindings.RecoverError(&err) 95 r.rh.Cancel() 96 r.client.replicationsMU.Lock() 97 defer r.client.replicationsMU.Unlock() 98 for i, rep := range r.client.replications { 99 if rep == r { 100 last := len(r.client.replications) - 1 101 r.client.replications[i] = r.client.replications[last] 102 r.client.replications[last] = nil 103 r.client.replications = r.client.replications[:last] 104 return nil 105 } 106 } 107 return errors.Status(kivik.StatusNotFound, "replication not found") 108} 109 110func replicationEndpoint(dsn string, object interface{}) (name string, obj interface{}, err error) { 111 defer bindings.RecoverError(&err) 112 if object == nil { 113 return dsn, dsn, nil 114 } 115 switch t := object.(type) { 116 case *js.Object: 117 tx := object.(*js.Object) // https://github.com/gopherjs/gopherjs/issues/682 118 // Assume it's a raw PouchDB object 119 return tx.Get("name").String(), tx, nil 120 case *bindings.DB: 121 // Unwrap the bare object 122 return t.Object.Get("name").String(), t.Object, nil 123 } 124 // Just let it pass through 125 return "<unknown>", obj, nil 126} 127 128func (c *client) Replicate(_ context.Context, targetDSN, sourceDSN string, options map[string]interface{}) (driver.Replication, error) { 129 opts, err := c.options(options) 130 if err != nil { 131 return nil, err 132 } 133 // Allow overriding source and target with options, i.e. for PouchDB objects 134 sourceName, sourceObj, err := replicationEndpoint(sourceDSN, opts["source"]) 135 if err != nil { 136 return nil, err 137 } 138 targetName, targetObj, err := replicationEndpoint(targetDSN, opts["target"]) 139 if err != nil { 140 return nil, err 141 } 142 delete(opts, "source") 143 delete(opts, "target") 144 rep, err := c.pouch.Replicate(sourceObj, targetObj, opts) 145 if err != nil { 146 return nil, err 147 } 148 return c.newReplication(targetName, sourceName, rep), nil 149} 150 151func (c *client) GetReplications(_ context.Context, options map[string]interface{}) ([]driver.Replication, error) { 152 for range options { 153 return nil, errors.Status(kivik.StatusBadAPICall, "options not yet supported") 154 } 155 c.replicationsMU.RLock() 156 defer c.replicationsMU.RUnlock() 157 reps := make([]driver.Replication, len(c.replications)) 158 for i, rep := range c.replications { 159 reps[i] = rep 160 } 161 return reps, nil 162} 163