1package couchdb 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "github.com/go-kivik/couchdb/chttp" 10 "github.com/go-kivik/kivik" 11 "github.com/go-kivik/kivik/driver" 12 "github.com/go-kivik/kivik/errors" 13) 14 15type schedulerDoc struct { 16 Database string `json:"database"` 17 DocID string `json:"doc_id"` 18 ReplicationID string `json:"id"` 19 Source string `json:"source"` 20 Target string `json:"target"` 21 StartTime time.Time `json:"start_time"` 22 LastUpdated time.Time `json:"last_updated"` 23 State string `json:"state"` 24 Info repInfo `json:"info"` 25} 26 27type repInfo struct { 28 Error error 29 DocsRead int64 `json:"docs_read"` 30 DocsWritten int64 `json:"docs_written"` 31 DocWriteFailures int64 `json:"doc_write_failures"` 32 Pending int64 `json:"changes_pending"` 33} 34 35func (i *repInfo) UnmarshalJSON(data []byte) error { 36 switch { 37 case string(data) == "null": 38 return nil 39 case data[0] == '{': 40 type repInfoClone repInfo 41 var x repInfoClone 42 if err := json.Unmarshal(data, &x); err != nil { 43 return err 44 } 45 *i = repInfo(x) 46 default: 47 var e replicationError 48 if err := json.Unmarshal(data, &e); err != nil { 49 return err 50 } 51 i.Error = &e 52 } 53 return nil 54} 55 56type schedulerReplication struct { 57 docID string 58 database string 59 replicationID string 60 source string 61 target string 62 startTime time.Time 63 lastUpdated time.Time 64 state string 65 info repInfo 66 67 *db 68} 69 70var _ driver.Replication = &schedulerReplication{} 71 72func (c *client) schedulerSupported(ctx context.Context) (bool, error) { 73 c.sdMU.Lock() 74 defer c.sdMU.Unlock() 75 if c.schedulerDetected != nil { 76 return *c.schedulerDetected, nil 77 } 78 resp, err := c.DoReq(ctx, kivik.MethodHead, "_scheduler/jobs", nil) 79 if err != nil { 80 return false, err 81 } 82 var supported bool 83 switch resp.StatusCode { 84 case kivik.StatusBadRequest: 85 // 1.6.x, 1.7.x 86 supported = false 87 case kivik.StatusNotFound: 88 // 2.0.x 89 supported = false 90 case kivik.StatusOK, kivik.StatusUnauthorized: 91 // 2.1.x + 92 supported = true 93 default: 94 return false, errors.Statusf(kivik.StatusBadResponse, "Unknown response code %d", resp.StatusCode) 95 } 96 c.schedulerDetected = &supported 97 return supported, nil 98} 99 100func (c *client) newSchedulerReplication(doc *schedulerDoc) *schedulerReplication { 101 rep := &schedulerReplication{ 102 db: &db{ 103 client: c, 104 dbName: doc.Database, 105 }, 106 } 107 rep.setFromDoc(doc) 108 return rep 109} 110 111func (r *schedulerReplication) setFromDoc(doc *schedulerDoc) { 112 if r.source == "" { 113 r.docID = doc.DocID 114 r.database = doc.Database 115 r.replicationID = doc.ReplicationID 116 r.source = doc.Source 117 r.target = doc.Target 118 r.startTime = doc.StartTime 119 } 120 r.lastUpdated = doc.LastUpdated 121 r.state = doc.State 122 r.info = doc.Info 123} 124 125func (c *client) fetchSchedulerReplication(ctx context.Context, docID string) (*schedulerReplication, error) { 126 rep := &schedulerReplication{ 127 docID: docID, 128 database: "_replicator", 129 db: &db{ 130 client: c, 131 dbName: "_replicator", 132 }, 133 } 134 for rep.source == "" { 135 if err := rep.update(ctx); err != nil { 136 return rep, err 137 } 138 time.Sleep(100 * time.Millisecond) 139 } 140 return rep, nil 141} 142 143func (r *schedulerReplication) StartTime() time.Time { return r.startTime } 144func (r *schedulerReplication) EndTime() time.Time { 145 if r.state == "failed" || r.state == "completed" { 146 return r.lastUpdated 147 } 148 return time.Time{} 149} 150func (r *schedulerReplication) Err() error { return r.info.Error } 151func (r *schedulerReplication) ReplicationID() string { return r.replicationID } 152func (r *schedulerReplication) Source() string { return r.source } 153func (r *schedulerReplication) Target() string { return r.target } 154func (r *schedulerReplication) State() string { return r.state } 155 156func (r *schedulerReplication) Update(ctx context.Context, rep *driver.ReplicationInfo) error { 157 if err := r.update(ctx); err != nil { 158 return err 159 } 160 rep.DocWriteFailures = r.info.DocWriteFailures 161 rep.DocsRead = r.info.DocsRead 162 rep.DocsWritten = r.info.DocsWritten 163 return nil 164} 165 166func (r *schedulerReplication) Delete(ctx context.Context) error { 167 _, rev, err := r.GetMeta(ctx, r.docID, nil) 168 if err != nil { 169 return err 170 } 171 _, err = r.db.Delete(ctx, r.docID, rev, nil) 172 return err 173} 174 175func (r *schedulerReplication) update(ctx context.Context) error { 176 path := fmt.Sprintf("/_scheduler/docs/%s/%s", r.database, chttp.EncodeDocID(r.docID)) 177 var doc schedulerDoc 178 if _, err := r.db.Client.DoJSON(ctx, kivik.MethodGet, path, nil, &doc); err != nil { 179 if cerr, ok := err.(*chttp.HTTPError); ok { 180 if cerr.Code == 500 && cerr.Reason == "function_clause" { 181 // This is a race condition bug in CouchDB 2.1.x. So try again. 182 // See https://github.com/apache/couchdb/issues/1000 183 return r.update(ctx) 184 } 185 } 186 return err 187 } 188 r.setFromDoc(&doc) 189 return nil 190} 191 192func (c *client) getReplicationsFromScheduler(ctx context.Context, options map[string]interface{}) ([]driver.Replication, error) { 193 params, err := optionsToParams(options) 194 if err != nil { 195 return nil, err 196 } 197 var result struct { 198 Docs []schedulerDoc `json:"docs"` 199 } 200 path := "/_scheduler/docs" 201 if params != nil { 202 path = path + "?" + params.Encode() 203 } 204 if _, err = c.DoJSON(ctx, kivik.MethodGet, path, nil, &result); err != nil { 205 return nil, err 206 } 207 reps := make([]driver.Replication, 0, len(result.Docs)) 208 for _, row := range result.Docs { 209 rep := c.newSchedulerReplication(&row) 210 reps = append(reps, rep) 211 } 212 return reps, nil 213} 214