1package pouchdb
2
3import (
4	"fmt"
5	"io"
6	"sync"
7	"time"
8
9	"github.com/gopherjs/gopherjs/js"
10	"github.com/gopherjs/jsbuiltin"
11
12	"github.com/go-kivik/pouchdb/bindings"
13)
14
15type replicationState struct {
16	*js.Object
17	startTime        time.Time `js:"start_time"`
18	endTime          time.Time `js:"end_time"`
19	DocsRead         int64     `js:"docs_read"`
20	DocsWritten      int64     `js:"docs_written"`
21	DocWriteFailures int64     `js:"doc_write_failures"`
22	LastSeq          string    `js:"last_seq"`
23}
24
25func (rs *replicationState) StartTime() time.Time {
26	value := rs.Get("start_time")
27	if jsbuiltin.InstanceOf(value, js.Global.Get("Date")) {
28		return rs.startTime
29	}
30	t, err := convertTime(value)
31	if err != nil {
32		panic("start time: " + err.Error())
33	}
34	return t
35}
36
37func (rs *replicationState) EndTime() time.Time {
38	value := rs.Get("end_time")
39	if jsbuiltin.InstanceOf(value, js.Global.Get("Date")) {
40		return rs.endTime
41	}
42	t, err := convertTime(value)
43	if err != nil {
44		panic("end time: " + err.Error())
45	}
46	return t
47}
48
49func convertTime(value *js.Object) (time.Time, error) {
50	if value == js.Undefined {
51		return time.Time{}, nil
52	}
53	switch jsbuiltin.TypeOf(value) {
54	case jsbuiltin.TypeString:
55		return time.Parse(time.RFC3339, value.String())
56	}
57	return time.Time{}, fmt.Errorf("unsupported type")
58}
59
60type replicationHandler struct {
61	event *string
62	state *replicationState
63
64	mu       sync.Mutex
65	wg       sync.WaitGroup
66	complete bool
67	obj      *js.Object
68}
69
70func (r *replicationHandler) Cancel() {
71	r.obj.Call("cancel")
72}
73
74// Status returns the last-read status. If the last-read status was already read,
75// this blocks until the next event.  If the replication is complete, it will
76// return io.EOF immediately.
77func (r *replicationHandler) Status() (string, *replicationState, error) {
78	if r.complete && r.event == nil {
79		return "", nil, io.EOF
80	}
81	r.mu.Lock()
82	if r.event == nil {
83		r.mu.Unlock()
84		// Wait for an event to be ready to read
85		r.wg.Wait()
86		r.mu.Lock()
87	}
88	event, state := r.event, r.state
89	r.event = nil
90	r.mu.Unlock()
91	r.wg.Add(1)
92	return *event, state, nil
93}
94
95func (r *replicationHandler) handleEvent(event string, info *js.Object) {
96	if r.complete {
97		panic(fmt.Sprintf("Unexpected replication event after complete. %v %v", event, info))
98	}
99	r.mu.Lock()
100	defer r.mu.Unlock()
101	r.event = &event
102	switch event {
103	case bindings.ReplicationEventDenied, bindings.ReplicationEventError, bindings.ReplicationEventComplete:
104		r.complete = true
105	}
106	if info != nil && info != js.Undefined {
107		r.state = &replicationState{Object: info}
108	}
109	r.wg.Done()
110}
111
112func newReplicationHandler(rep *js.Object) *replicationHandler {
113	r := &replicationHandler{obj: rep}
114	for _, event := range []string{
115		bindings.ReplicationEventChange,
116		bindings.ReplicationEventComplete,
117		bindings.ReplicationEventPaused,
118		bindings.ReplicationEventActive,
119		bindings.ReplicationEventDenied,
120		bindings.ReplicationEventError,
121	} {
122		func(e string) {
123			rep.Call("on", e, func(info *js.Object) {
124				r.handleEvent(e, info)
125			})
126		}(event)
127	}
128	r.wg.Add(1)
129	return r
130}
131