1package agent 2 3import ( 4 "log" 5 "time" 6 7 "github.com/hashicorp/serf/serf" 8) 9 10// queryResponseStream is used to stream the query results back to a client 11type queryResponseStream struct { 12 client streamClient 13 logger *log.Logger 14 seq uint64 15} 16 17func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream { 18 qs := &queryResponseStream{ 19 client: client, 20 logger: logger, 21 seq: seq, 22 } 23 return qs 24} 25 26// Stream is a long running routine used to stream the results of a query back to a client 27func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) { 28 // Setup a timer for the query ending 29 remaining := resp.Deadline().Sub(time.Now()) 30 done := time.After(remaining) 31 32 ackCh := resp.AckCh() 33 respCh := resp.ResponseCh() 34 for { 35 select { 36 case a := <-ackCh: 37 if err := qs.sendAck(a); err != nil { 38 qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err) 39 return 40 } 41 case r := <-respCh: 42 if err := qs.sendResponse(r.From, r.Payload); err != nil { 43 qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err) 44 return 45 } 46 case <-done: 47 if err := qs.sendDone(); err != nil { 48 qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err) 49 } 50 return 51 } 52 } 53} 54 55// sendAck is used to send a single ack 56func (qs *queryResponseStream) sendAck(from string) error { 57 header := responseHeader{ 58 Seq: qs.seq, 59 Error: "", 60 } 61 rec := queryRecord{ 62 Type: queryRecordAck, 63 From: from, 64 } 65 return qs.client.Send(&header, &rec) 66} 67 68// sendResponse is used to send a single response 69func (qs *queryResponseStream) sendResponse(from string, payload []byte) error { 70 header := responseHeader{ 71 Seq: qs.seq, 72 Error: "", 73 } 74 rec := queryRecord{ 75 Type: queryRecordResponse, 76 From: from, 77 Payload: payload, 78 } 79 return qs.client.Send(&header, &rec) 80} 81 82// sendDone is used to signal the end 83func (qs *queryResponseStream) sendDone() error { 84 header := responseHeader{ 85 Seq: qs.seq, 86 Error: "", 87 } 88 rec := queryRecord{ 89 Type: queryRecordDone, 90 } 91 return qs.client.Send(&header, &rec) 92} 93