1package slack
2
3import (
4	"encoding/json"
5	"fmt"
6	"io"
7	"net/http"
8	stdurl "net/url"
9	"reflect"
10	"time"
11
12	"github.com/gorilla/websocket"
13	"github.com/nlopes/slack/internal/errorsx"
14	"github.com/nlopes/slack/internal/timex"
15)
16
17// ManageConnection can be called on a Slack RTM instance returned by the
18// NewRTM method. It will connect to the slack RTM API and handle all incoming
19// and outgoing events. If a connection fails then it will attempt to reconnect
20// and will notify any listeners through an error event on the IncomingEvents
21// channel.
22//
23// If the connection ends and the disconnect was unintentional then this will
24// attempt to reconnect.
25//
26// This should only be called once per slack API! Otherwise expect undefined
27// behavior.
28//
29// The defined error events are located in websocket_internals.go.
30func (rtm *RTM) ManageConnection() {
31	var (
32		err  error
33		info *Info
34		conn *websocket.Conn
35	)
36
37	for connectionCount := 0; ; connectionCount++ {
38		// start trying to connect
39		// the returned err is already passed onto the IncomingEvents channel
40		if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
41			// when the connection is unsuccessful its fatal, and we need to bail out.
42			rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
43			rtm.disconnect()
44			return
45		}
46
47		// lock to prevent data races with Disconnect particularly around isConnected
48		// and conn.
49		rtm.mu.Lock()
50		rtm.conn = conn
51		rtm.info = info
52		rtm.mu.Unlock()
53
54		rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{
55			ConnectionCount: connectionCount,
56			Info:            info,
57		}}
58
59		rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
60
61		// we're now connected so we can set up listeners
62		go rtm.handleIncomingEvents()
63
64		// this should be a blocking call until the connection has ended
65		rtm.handleEvents()
66
67		select {
68		case <-rtm.disconnected:
69			// after handle events returns we need to check if we're disconnected
70			return
71		default:
72			// otherwise continue and run the loop again to reconnect
73		}
74	}
75}
76
77// connect attempts to connect to the slack websocket API. It handles any
78// errors that occur while connecting and will return once a connection
79// has been successfully opened.
80// If useRTMStart is false then it uses rtm.connect to create the connection,
81// otherwise it uses rtm.start.
82func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) {
83	const (
84		errInvalidAuth      = "invalid_auth"
85		errInactiveAccount  = "account_inactive"
86		errMissingAuthToken = "not_authed"
87	)
88
89	// used to provide exponential backoff wait time with jitter before trying
90	// to connect to slack again
91	boff := &backoff{
92		Max: 5 * time.Minute,
93	}
94
95	for {
96		var (
97			backoff time.Duration
98		)
99
100		// send connecting event
101		rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
102			Attempt:         boff.attempts + 1,
103			ConnectionCount: connectionCount,
104		}}
105
106		// attempt to start the connection
107		info, conn, err := rtm.startRTMAndDial(useRTMStart)
108		if err == nil {
109			return info, conn, nil
110		}
111
112		// check for fatal errors
113		switch err.Error() {
114		case errInvalidAuth, errInactiveAccount, errMissingAuthToken:
115			rtm.Debugf("invalid auth when connecting with RTM: %s", err)
116			rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
117			return nil, nil, err
118		default:
119		}
120
121		switch actual := err.(type) {
122		case statusCodeError:
123			if actual.Code == http.StatusNotFound {
124				rtm.Debugf("invalid auth when connecting with RTM: %s", err)
125				rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
126				return nil, nil, err
127			}
128		case *RateLimitedError:
129			backoff = actual.RetryAfter
130		default:
131		}
132
133		backoff = timex.Max(backoff, boff.Duration())
134		// any other errors are treated as recoverable and we try again after
135		// sending the event along the IncomingEvents channel
136		rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
137			Attempt:  boff.attempts,
138			Backoff:  backoff,
139			ErrorObj: err,
140		}}
141
142		// get time we should wait before attempting to connect again
143		rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff)
144
145		// wait for one of the following to occur,
146		// backoff duration has elapsed, killChannel is signalled, or
147		// the rtm finishes disconnecting.
148		select {
149		case <-time.After(backoff): // retry after the backoff.
150		case intentional := <-rtm.killChannel:
151			if intentional {
152				rtm.killConnection(intentional, ErrRTMDisconnected)
153				return nil, nil, ErrRTMDisconnected
154			}
155		case <-rtm.disconnected:
156			return nil, nil, ErrRTMDisconnected
157		}
158	}
159}
160
161// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true,
162// then it returns the  full information returned by the "rtm.start" method on the
163// slack API. Else it uses the "rtm.connect" method to connect
164func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) {
165	var (
166		url string
167	)
168
169	if useRTMStart {
170		rtm.Debugf("Starting RTM")
171		info, url, err = rtm.StartRTM()
172	} else {
173		rtm.Debugf("Connecting to RTM")
174		info, url, err = rtm.ConnectRTM()
175	}
176	if err != nil {
177		rtm.Debugf("Failed to start or connect to RTM: %s", err)
178		return nil, nil, err
179	}
180
181	// install connection parameters
182	u, err := stdurl.Parse(url)
183	if err != nil {
184		return nil, nil, err
185	}
186	u.RawQuery = rtm.connParams.Encode()
187	url = u.String()
188
189	rtm.Debugf("Dialing to websocket on url %s", url)
190	// Only use HTTPS for connections to prevent MITM attacks on the connection.
191	upgradeHeader := http.Header{}
192	upgradeHeader.Add("Origin", "https://api.slack.com")
193	dialer := websocket.DefaultDialer
194	if rtm.dialer != nil {
195		dialer = rtm.dialer
196	}
197	conn, _, err := dialer.Dial(url, upgradeHeader)
198	if err != nil {
199		rtm.Debugf("Failed to dial to the websocket: %s", err)
200		return nil, nil, err
201	}
202	return info, conn, err
203}
204
205// killConnection stops the websocket connection and signals to all goroutines
206// that they should cease listening to the connection for events.
207//
208// This should not be called directly! Instead a boolean value (true for
209// intentional, false otherwise) should be sent to the killChannel on the RTM.
210func (rtm *RTM) killConnection(intentional bool, cause error) (err error) {
211	rtm.Debugln("killing connection")
212
213	if rtm.conn != nil {
214		err = rtm.conn.Close()
215	}
216
217	rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}}
218
219	if intentional {
220		rtm.disconnect()
221	}
222
223	return err
224}
225
226// handleEvents is a blocking function that handles all events. This sends
227// pings when asked to (on rtm.forcePing) and upon every given elapsed
228// interval. This also sends outgoing messages that are received from the RTM's
229// outgoingMessages channel. This also handles incoming raw events from the RTM
230// rawEvents channel.
231func (rtm *RTM) handleEvents() {
232	ticker := time.NewTicker(rtm.pingInterval)
233	defer ticker.Stop()
234	for {
235		select {
236		// catch "stop" signal on channel close
237		case intentional := <-rtm.killChannel:
238			_ = rtm.killConnection(intentional, errorsx.String("signaled"))
239			return
240		// detect when the connection is dead.
241		case <-rtm.pingDeadman.C:
242			_ = rtm.killConnection(false, errorsx.String("deadman switch triggered"))
243			return
244		// send pings on ticker interval
245		case <-ticker.C:
246			if err := rtm.ping(); err != nil {
247				_ = rtm.killConnection(false, err)
248				return
249			}
250		case <-rtm.forcePing:
251			if err := rtm.ping(); err != nil {
252				_ = rtm.killConnection(false, err)
253				return
254			}
255		// listen for messages that need to be sent
256		case msg := <-rtm.outgoingMessages:
257			rtm.sendOutgoingMessage(msg)
258		// listen for incoming messages that need to be parsed
259		case rawEvent := <-rtm.rawEvents:
260			switch rtm.handleRawEvent(rawEvent) {
261			case rtmEventTypeGoodbye:
262				_ = rtm.killConnection(false, errorsx.String("goodbye detected"))
263				return
264			default:
265			}
266		}
267	}
268}
269
270// handleIncomingEvents monitors the RTM's opened websocket for any incoming
271// events. It pushes the raw events onto the RTM channel rawEvents.
272//
273// This will stop executing once the RTM's keepRunning channel has been closed
274// or has anything sent to it.
275func (rtm *RTM) handleIncomingEvents() {
276	for {
277		if err := rtm.receiveIncomingEvent(); err != nil {
278			return
279		}
280	}
281}
282
283func (rtm *RTM) sendWithDeadline(msg interface{}) error {
284	// set a write deadline on the connection
285	if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
286		return err
287	}
288	if err := rtm.conn.WriteJSON(msg); err != nil {
289		return err
290	}
291	// remove write deadline
292	return rtm.conn.SetWriteDeadline(time.Time{})
293}
294
295// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket.
296//
297// It does not currently detect if a outgoing message fails due to a disconnect
298// and instead lets a future failed 'PING' detect the failed connection.
299func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
300	rtm.Debugln("Sending message:", msg)
301	if len([]rune(msg.Text)) > MaxMessageTextLength {
302		rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{
303			Message:   msg,
304			MaxLength: MaxMessageTextLength,
305		}}
306		return
307	}
308
309	if err := rtm.sendWithDeadline(msg); err != nil {
310		rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{
311			Message:  msg,
312			ErrorObj: err,
313		}}
314	}
315}
316
317// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message
318// fails to send then this returns an error signifying that the connection
319// should be considered disconnected.
320//
321// This does not handle incoming 'PONG' responses but does store the time of
322// each successful 'PING' send so latency can be detected upon a 'PONG'
323// response.
324func (rtm *RTM) ping() error {
325	id := rtm.idGen.Next()
326	rtm.Debugln("Sending PING ", id)
327	msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()}
328
329	if err := rtm.sendWithDeadline(msg); err != nil {
330		rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error())
331		return err
332	}
333	return nil
334}
335
336// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
337// This will block until a frame is available from the websocket.
338// If the read from the websocket results in a fatal error, this function will return non-nil.
339func (rtm *RTM) receiveIncomingEvent() error {
340	event := json.RawMessage{}
341	err := rtm.conn.ReadJSON(&event)
342	switch {
343	case err == io.ErrUnexpectedEOF:
344		// EOF's don't seem to signify a failed connection so instead we ignore
345		// them here and detect a failed connection upon attempting to send a
346		// 'PING' message
347
348		// trigger a 'PING' to detect potential websocket disconnect
349		select {
350		case rtm.forcePing <- true:
351		case <-rtm.disconnected:
352		}
353	case err != nil:
354		// All other errors from ReadJSON come from NextReader, and should
355		// kill the read loop and force a reconnect.
356		rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
357			ErrorObj: err,
358		}}
359
360		select {
361		case rtm.killChannel <- false:
362		case <-rtm.disconnected:
363		}
364
365		return err
366	case len(event) == 0:
367		rtm.Debugln("Received empty event")
368	default:
369		rtm.Debugln("Incoming Event:", string(event))
370		select {
371		case rtm.rawEvents <- event:
372		case <-rtm.disconnected:
373			rtm.Debugln("disonnected while attempting to send raw event")
374		}
375	}
376	return nil
377}
378
379// handleRawEvent takes a raw JSON message received from the slack websocket
380// and handles the encoded event.
381// returns the event type of the message.
382func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string {
383	event := &Event{}
384	err := json.Unmarshal(rawEvent, event)
385	if err != nil {
386		rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
387		return ""
388	}
389
390	switch event.Type {
391	case rtmEventTypeAck:
392		rtm.handleAck(rawEvent)
393	case rtmEventTypeHello:
394		rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}}
395	case rtmEventTypePong:
396		rtm.handlePong(rawEvent)
397	case rtmEventTypeGoodbye:
398		// just return the event type up for goodbye, will be handled by caller.
399	case rtmEventTypeDesktopNotification:
400		rtm.Debugln("Received desktop notification, ignoring")
401	default:
402		rtm.handleEvent(event.Type, rawEvent)
403	}
404
405	return event.Type
406}
407
408// handleAck handles an incoming 'ACK' message.
409func (rtm *RTM) handleAck(event json.RawMessage) {
410	ack := &AckMessage{}
411	if err := json.Unmarshal(event, ack); err != nil {
412		rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
413		rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
414		return
415	}
416
417	if ack.Ok {
418		rtm.IncomingEvents <- RTMEvent{"ack", ack}
419	} else if ack.RTMResponse.Error != nil {
420		// As there is no documentation for RTM error-codes, this
421		// identification of a rate-limit warning is very brittle.
422		if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." {
423			rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}}
424		} else {
425			rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}}
426		}
427	} else {
428		rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}}
429	}
430}
431
432// handlePong handles an incoming 'PONG' message which should be in response to
433// a previously sent 'PING' message. This is then used to compute the
434// connection's latency.
435func (rtm *RTM) handlePong(event json.RawMessage) {
436	var (
437		p Pong
438	)
439
440	rtm.resetDeadman()
441
442	if err := json.Unmarshal(event, &p); err != nil {
443		rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err)
444		return
445	}
446
447	latency := time.Since(time.Unix(p.Timestamp, 0))
448	rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
449}
450
451// handleEvent is the "default" response to an event that does not have a
452// special case. It matches the command's name to a mapping of defined events
453// and then sends the corresponding event struct to the IncomingEvents channel.
454// If the event type is not found or the event cannot be unmarshalled into the
455// correct struct then this sends an UnmarshallingErrorEvent to the
456// IncomingEvents channel.
457func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
458	v, exists := EventMapping[typeStr]
459	if !exists {
460		rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event))
461		err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event))
462		rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
463		return
464	}
465	t := reflect.TypeOf(v)
466	recvEvent := reflect.New(t).Interface()
467	err := json.Unmarshal(event, recvEvent)
468	if err != nil {
469		rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event))
470		err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event))
471		rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
472		return
473	}
474	rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent}
475}
476
// EventMapping holds a mapping of event names to their corresponding struct
// implementations. The structs should be instances of the unmarshalling
// target for the matching event type.
//
// handleEvent looks up an incoming event's type in this map, allocates a new
// value of the mapped struct's type via reflection, and unmarshals the raw
// JSON into it. Event types without an entry here are reported on
// IncomingEvents as an "unmarshalling_error" event.
var EventMapping = map[string]interface{}{
	"message":         MessageEvent{},
	"presence_change": PresenceChangeEvent{},
	"user_typing":     UserTypingEvent{},

	"channel_marked":          ChannelMarkedEvent{},
	"channel_created":         ChannelCreatedEvent{},
	"channel_joined":          ChannelJoinedEvent{},
	"channel_left":            ChannelLeftEvent{},
	"channel_deleted":         ChannelDeletedEvent{},
	"channel_rename":          ChannelRenameEvent{},
	"channel_archive":         ChannelArchiveEvent{},
	"channel_unarchive":       ChannelUnarchiveEvent{},
	"channel_history_changed": ChannelHistoryChangedEvent{},

	// both DND event names decode into the same struct
	"dnd_updated":      DNDUpdatedEvent{},
	"dnd_updated_user": DNDUpdatedEvent{},

	"im_created":         IMCreatedEvent{},
	"im_open":            IMOpenEvent{},
	"im_close":           IMCloseEvent{},
	"im_marked":          IMMarkedEvent{},
	"im_history_changed": IMHistoryChangedEvent{},

	"group_marked":          GroupMarkedEvent{},
	"group_open":            GroupOpenEvent{},
	"group_joined":          GroupJoinedEvent{},
	"group_left":            GroupLeftEvent{},
	"group_close":           GroupCloseEvent{},
	"group_rename":          GroupRenameEvent{},
	"group_archive":         GroupArchiveEvent{},
	"group_unarchive":       GroupUnarchiveEvent{},
	"group_history_changed": GroupHistoryChangedEvent{},

	"file_created":         FileCreatedEvent{},
	"file_shared":          FileSharedEvent{},
	"file_unshared":        FileUnsharedEvent{},
	"file_public":          FilePublicEvent{},
	"file_private":         FilePrivateEvent{},
	"file_change":          FileChangeEvent{},
	"file_deleted":         FileDeletedEvent{},
	"file_comment_added":   FileCommentAddedEvent{},
	"file_comment_edited":  FileCommentEditedEvent{},
	"file_comment_deleted": FileCommentDeletedEvent{},

	"pin_added":   PinAddedEvent{},
	"pin_removed": PinRemovedEvent{},

	"star_added":   StarAddedEvent{},
	"star_removed": StarRemovedEvent{},

	"reaction_added":   ReactionAddedEvent{},
	"reaction_removed": ReactionRemovedEvent{},

	"pref_change": PrefChangeEvent{},

	"team_join":              TeamJoinEvent{},
	"team_rename":            TeamRenameEvent{},
	"team_pref_change":       TeamPrefChangeEvent{},
	"team_domain_change":     TeamDomainChangeEvent{},
	"team_migration_started": TeamMigrationStartedEvent{},

	"manual_presence_change": ManualPresenceChangeEvent{},

	"user_change": UserChangeEvent{},

	"emoji_changed": EmojiChangedEvent{},

	"commands_changed": CommandsChangedEvent{},

	"email_domain_changed": EmailDomainChangedEvent{},

	"bot_added":   BotAddedEvent{},
	"bot_changed": BotChangedEvent{},

	"accounts_changed": AccountsChangedEvent{},

	"reconnect_url": ReconnectUrlEvent{},

	"member_joined_channel": MemberJoinedChannelEvent{},
	"member_left_channel":   MemberLeftChannelEvent{},

	"subteam_created":      SubteamCreatedEvent{},
	"subteam_self_added":   SubteamSelfAddedEvent{},
	"subteam_self_removed": SubteamSelfRemovedEvent{},
	"subteam_updated":      SubteamUpdatedEvent{},
}
567