1package agent
2
3import (
4	"log"
5	"time"
6
7	"github.com/hashicorp/serf/serf"
8)
9
10// queryResponseStream is used to stream the query results back to a client
11type queryResponseStream struct {
12	client streamClient
13	logger *log.Logger
14	seq    uint64
15}
16
17func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream {
18	qs := &queryResponseStream{
19		client: client,
20		logger: logger,
21		seq:    seq,
22	}
23	return qs
24}
25
26// Stream is a long running routine used to stream the results of a query back to a client
27func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) {
28	// Setup a timer for the query ending
29	remaining := resp.Deadline().Sub(time.Now())
30	done := time.After(remaining)
31
32	ackCh := resp.AckCh()
33	respCh := resp.ResponseCh()
34	for {
35		select {
36		case a := <-ackCh:
37			if err := qs.sendAck(a); err != nil {
38				qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err)
39				return
40			}
41		case r := <-respCh:
42			if err := qs.sendResponse(r.From, r.Payload); err != nil {
43				qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err)
44				return
45			}
46		case <-done:
47			if err := qs.sendDone(); err != nil {
48				qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err)
49			}
50			return
51		}
52	}
53}
54
55// sendAck is used to send a single ack
56func (qs *queryResponseStream) sendAck(from string) error {
57	header := responseHeader{
58		Seq:   qs.seq,
59		Error: "",
60	}
61	rec := queryRecord{
62		Type: queryRecordAck,
63		From: from,
64	}
65	return qs.client.Send(&header, &rec)
66}
67
68// sendResponse is used to send a single response
69func (qs *queryResponseStream) sendResponse(from string, payload []byte) error {
70	header := responseHeader{
71		Seq:   qs.seq,
72		Error: "",
73	}
74	rec := queryRecord{
75		Type:    queryRecordResponse,
76		From:    from,
77		Payload: payload,
78	}
79	return qs.client.Send(&header, &rec)
80}
81
82// sendDone is used to signal the end
83func (qs *queryResponseStream) sendDone() error {
84	header := responseHeader{
85		Seq:   qs.seq,
86		Error: "",
87	}
88	rec := queryRecord{
89		Type: queryRecordDone,
90	}
91	return qs.client.Send(&header, &rec)
92}
93