1package goose
2
3import (
4	"bufio"
5	"errors"
6	"fmt"
7	"net"
8	"net/http"
9	"strings"
10	"sync"
11)
12
// ErrUnableToHijackRequest is the sentinel error AddReceiver returns when the
// http.ResponseWriter it was handed cannot be type-asserted to http.Hijacker.
var ErrUnableToHijackRequest = errors.New("Unable to hijack request")
18
// EventStream represents a collection of receivers.
//
// Receivers are added by AddReceiver and removed by EventReceiver.write when
// writing to their connection fails.
type EventStream struct {
	mutex     *sync.Mutex                 // held around insertions into and deletions from receivers
	receivers map[net.Conn]*EventReceiver // attached receivers, keyed by their hijacked connection
}
24
25// NewEventStream creates a new event stream
26func NewEventStream() *EventStream {
27	return &EventStream{
28		mutex:     new(sync.Mutex),
29		receivers: make(map[net.Conn]*EventReceiver),
30	}
31}
32
// EventReceiver represents a hijacked HTTP connection that is attached to an
// EventStream.
type EventReceiver struct {
	stream *EventStream      // stream this receiver is registered with
	conn   net.Conn          // hijacked connection; also the receiver's key in stream.receivers
	bufrw  *bufio.ReadWriter // buffered reader/writer returned by Hijack alongside conn
}
39
40// Notify sends the event to all event stream receivers
41func (es *EventStream) Notify(event string, bytes []byte) {
42	// TODO reader?
43
44	lines := strings.Split(string(bytes), "\n")
45
46	data := ""
47	for _, l := range lines {
48		data += event + ": " + l + "\n"
49	}
50
51	sz := len(data) + 1
52	size := fmt.Sprintf("%X", sz)
53
54	for _, er := range es.receivers {
55		go er.send(size, data)
56	}
57}
58
59func (er *EventReceiver) send(size, data string) {
60	_, err := er.write([]byte(size + "\r\n"))
61	if err != nil {
62		return
63	}
64
65	lines := strings.Split(data, "\n")
66	for _, ln := range lines {
67		_, err = er.write([]byte(ln + "\n"))
68		if err != nil {
69			return
70		}
71	}
72	er.write([]byte("\r\n"))
73}
74
75func (er *EventReceiver) write(bytes []byte) (int, error) {
76	n, err := er.bufrw.Write(bytes)
77
78	if err != nil {
79		er.stream.mutex.Lock()
80		delete(er.stream.receivers, er.conn)
81		er.stream.mutex.Unlock()
82		er.conn.Close()
83		return n, err
84	}
85
86	err = er.bufrw.Flush()
87	if err != nil {
88		er.stream.mutex.Lock()
89		delete(er.stream.receivers, er.conn)
90		er.stream.mutex.Unlock()
91		er.conn.Close()
92	}
93
94	return n, err
95}
96
97// AddReceiver hijacks a http.ResponseWriter and attaches it to the event stream
98func (es *EventStream) AddReceiver(w http.ResponseWriter) (*EventReceiver, error) {
99	w.Header().Set("Content-Type", "text/event-stream")
100	w.Header().Set("Cache-Control", "no-cache")
101	w.Header().Set("Connection", "keep-alive")
102
103	w.WriteHeader(200)
104
105	hj, ok := w.(http.Hijacker)
106	if !ok {
107		return nil, ErrUnableToHijackRequest
108	}
109
110	hjConn, hjBufrw, err := hj.Hijack()
111	if err != nil {
112		return nil, err
113	}
114
115	rec := &EventReceiver{es, hjConn, hjBufrw}
116
117	es.mutex.Lock()
118	es.receivers[hjConn] = rec
119	es.mutex.Unlock()
120
121	return rec, nil
122}
123