1package couchdb
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"time"
8
9	"github.com/go-kivik/couchdb/chttp"
10	"github.com/go-kivik/kivik"
11	"github.com/go-kivik/kivik/driver"
12	"github.com/go-kivik/kivik/errors"
13)
14
15type schedulerDoc struct {
16	Database      string    `json:"database"`
17	DocID         string    `json:"doc_id"`
18	ReplicationID string    `json:"id"`
19	Source        string    `json:"source"`
20	Target        string    `json:"target"`
21	StartTime     time.Time `json:"start_time"`
22	LastUpdated   time.Time `json:"last_updated"`
23	State         string    `json:"state"`
24	Info          repInfo   `json:"info"`
25}
26
27type repInfo struct {
28	Error            error
29	DocsRead         int64 `json:"docs_read"`
30	DocsWritten      int64 `json:"docs_written"`
31	DocWriteFailures int64 `json:"doc_write_failures"`
32	Pending          int64 `json:"changes_pending"`
33}
34
35func (i *repInfo) UnmarshalJSON(data []byte) error {
36	switch {
37	case string(data) == "null":
38		return nil
39	case data[0] == '{':
40		type repInfoClone repInfo
41		var x repInfoClone
42		if err := json.Unmarshal(data, &x); err != nil {
43			return err
44		}
45		*i = repInfo(x)
46	default:
47		var e replicationError
48		if err := json.Unmarshal(data, &e); err != nil {
49			return err
50		}
51		i.Error = &e
52	}
53	return nil
54}
55
56type schedulerReplication struct {
57	docID         string
58	database      string
59	replicationID string
60	source        string
61	target        string
62	startTime     time.Time
63	lastUpdated   time.Time
64	state         string
65	info          repInfo
66
67	*db
68}
69
70var _ driver.Replication = &schedulerReplication{}
71
72func (c *client) schedulerSupported(ctx context.Context) (bool, error) {
73	c.sdMU.Lock()
74	defer c.sdMU.Unlock()
75	if c.schedulerDetected != nil {
76		return *c.schedulerDetected, nil
77	}
78	resp, err := c.DoReq(ctx, kivik.MethodHead, "_scheduler/jobs", nil)
79	if err != nil {
80		return false, err
81	}
82	var supported bool
83	switch resp.StatusCode {
84	case kivik.StatusBadRequest:
85		// 1.6.x, 1.7.x
86		supported = false
87	case kivik.StatusNotFound:
88		// 2.0.x
89		supported = false
90	case kivik.StatusOK, kivik.StatusUnauthorized:
91		// 2.1.x +
92		supported = true
93	default:
94		return false, errors.Statusf(kivik.StatusBadResponse, "Unknown response code %d", resp.StatusCode)
95	}
96	c.schedulerDetected = &supported
97	return supported, nil
98}
99
100func (c *client) newSchedulerReplication(doc *schedulerDoc) *schedulerReplication {
101	rep := &schedulerReplication{
102		db: &db{
103			client: c,
104			dbName: doc.Database,
105		},
106	}
107	rep.setFromDoc(doc)
108	return rep
109}
110
111func (r *schedulerReplication) setFromDoc(doc *schedulerDoc) {
112	if r.source == "" {
113		r.docID = doc.DocID
114		r.database = doc.Database
115		r.replicationID = doc.ReplicationID
116		r.source = doc.Source
117		r.target = doc.Target
118		r.startTime = doc.StartTime
119	}
120	r.lastUpdated = doc.LastUpdated
121	r.state = doc.State
122	r.info = doc.Info
123}
124
125func (c *client) fetchSchedulerReplication(ctx context.Context, docID string) (*schedulerReplication, error) {
126	rep := &schedulerReplication{
127		docID:    docID,
128		database: "_replicator",
129		db: &db{
130			client: c,
131			dbName: "_replicator",
132		},
133	}
134	for rep.source == "" {
135		if err := rep.update(ctx); err != nil {
136			return rep, err
137		}
138		time.Sleep(100 * time.Millisecond)
139	}
140	return rep, nil
141}
142
143func (r *schedulerReplication) StartTime() time.Time { return r.startTime }
144func (r *schedulerReplication) EndTime() time.Time {
145	if r.state == "failed" || r.state == "completed" {
146		return r.lastUpdated
147	}
148	return time.Time{}
149}
150func (r *schedulerReplication) Err() error            { return r.info.Error }
151func (r *schedulerReplication) ReplicationID() string { return r.replicationID }
152func (r *schedulerReplication) Source() string        { return r.source }
153func (r *schedulerReplication) Target() string        { return r.target }
154func (r *schedulerReplication) State() string         { return r.state }
155
156func (r *schedulerReplication) Update(ctx context.Context, rep *driver.ReplicationInfo) error {
157	if err := r.update(ctx); err != nil {
158		return err
159	}
160	rep.DocWriteFailures = r.info.DocWriteFailures
161	rep.DocsRead = r.info.DocsRead
162	rep.DocsWritten = r.info.DocsWritten
163	return nil
164}
165
166func (r *schedulerReplication) Delete(ctx context.Context) error {
167	_, rev, err := r.GetMeta(ctx, r.docID, nil)
168	if err != nil {
169		return err
170	}
171	_, err = r.db.Delete(ctx, r.docID, rev, nil)
172	return err
173}
174
175func (r *schedulerReplication) update(ctx context.Context) error {
176	path := fmt.Sprintf("/_scheduler/docs/%s/%s", r.database, chttp.EncodeDocID(r.docID))
177	var doc schedulerDoc
178	if _, err := r.db.Client.DoJSON(ctx, kivik.MethodGet, path, nil, &doc); err != nil {
179		if cerr, ok := err.(*chttp.HTTPError); ok {
180			if cerr.Code == 500 && cerr.Reason == "function_clause" {
181				// This is a race condition bug in CouchDB 2.1.x. So try again.
182				// See https://github.com/apache/couchdb/issues/1000
183				return r.update(ctx)
184			}
185		}
186		return err
187	}
188	r.setFromDoc(&doc)
189	return nil
190}
191
192func (c *client) getReplicationsFromScheduler(ctx context.Context, options map[string]interface{}) ([]driver.Replication, error) {
193	params, err := optionsToParams(options)
194	if err != nil {
195		return nil, err
196	}
197	var result struct {
198		Docs []schedulerDoc `json:"docs"`
199	}
200	path := "/_scheduler/docs"
201	if params != nil {
202		path = path + "?" + params.Encode()
203	}
204	if _, err = c.DoJSON(ctx, kivik.MethodGet, path, nil, &result); err != nil {
205		return nil, err
206	}
207	reps := make([]driver.Replication, 0, len(result.Docs))
208	for _, row := range result.Docs {
209		rep := c.newSchedulerReplication(&row)
210		reps = append(reps, rep)
211	}
212	return reps, nil
213}
214