1package goose 2 3import ( 4 "bufio" 5 "errors" 6 "fmt" 7 "net" 8 "net/http" 9 "strings" 10 "sync" 11) 12 13var ( 14 // ErrUnableToHijackRequest is returned by AddReceiver if the type 15 // conversion to http.Hijacker is unsuccessful 16 ErrUnableToHijackRequest = errors.New("Unable to hijack request") 17) 18 19// EventStream represents a collection of receivers 20type EventStream struct { 21 mutex *sync.Mutex 22 receivers map[net.Conn]*EventReceiver 23} 24 25// NewEventStream creates a new event stream 26func NewEventStream() *EventStream { 27 return &EventStream{ 28 mutex: new(sync.Mutex), 29 receivers: make(map[net.Conn]*EventReceiver), 30 } 31} 32 33// EventReceiver represents a hijacked HTTP connection 34type EventReceiver struct { 35 stream *EventStream 36 conn net.Conn 37 bufrw *bufio.ReadWriter 38} 39 40// Notify sends the event to all event stream receivers 41func (es *EventStream) Notify(event string, bytes []byte) { 42 // TODO reader? 43 44 lines := strings.Split(string(bytes), "\n") 45 46 data := "" 47 for _, l := range lines { 48 data += event + ": " + l + "\n" 49 } 50 51 sz := len(data) + 1 52 size := fmt.Sprintf("%X", sz) 53 54 for _, er := range es.receivers { 55 go er.send(size, data) 56 } 57} 58 59func (er *EventReceiver) send(size, data string) { 60 _, err := er.write([]byte(size + "\r\n")) 61 if err != nil { 62 return 63 } 64 65 lines := strings.Split(data, "\n") 66 for _, ln := range lines { 67 _, err = er.write([]byte(ln + "\n")) 68 if err != nil { 69 return 70 } 71 } 72 er.write([]byte("\r\n")) 73} 74 75func (er *EventReceiver) write(bytes []byte) (int, error) { 76 n, err := er.bufrw.Write(bytes) 77 78 if err != nil { 79 er.stream.mutex.Lock() 80 delete(er.stream.receivers, er.conn) 81 er.stream.mutex.Unlock() 82 er.conn.Close() 83 return n, err 84 } 85 86 err = er.bufrw.Flush() 87 if err != nil { 88 er.stream.mutex.Lock() 89 delete(er.stream.receivers, er.conn) 90 er.stream.mutex.Unlock() 91 er.conn.Close() 92 } 93 94 return n, err 95} 96 97// AddReceiver hijacks a http.ResponseWriter and attaches it to the event stream 98func (es *EventStream) AddReceiver(w http.ResponseWriter) (*EventReceiver, error) { 99 w.Header().Set("Content-Type", "text/event-stream") 100 w.Header().Set("Cache-Control", "no-cache") 101 w.Header().Set("Connection", "keep-alive") 102 103 w.WriteHeader(200) 104 105 hj, ok := w.(http.Hijacker) 106 if !ok { 107 return nil, ErrUnableToHijackRequest 108 } 109 110 hjConn, hjBufrw, err := hj.Hijack() 111 if err != nil { 112 return nil, err 113 } 114 115 rec := &EventReceiver{es, hjConn, hjBufrw} 116 117 es.mutex.Lock() 118 es.receivers[hjConn] = rec 119 es.mutex.Unlock() 120 121 return rec, nil 122} 123