1package slack
2
3import (
4	"encoding/json"
5	"fmt"
6	"io"
7	"net/http"
8	stdurl "net/url"
9	"reflect"
10	"time"
11
12	"github.com/slack-go/slack/internal/backoff"
13	"github.com/slack-go/slack/internal/misc"
14
15	"github.com/gorilla/websocket"
16	"github.com/slack-go/slack/internal/errorsx"
17	"github.com/slack-go/slack/internal/timex"
18)
19
20// ManageConnection can be called on a Slack RTM instance returned by the
21// NewRTM method. It will connect to the slack RTM API and handle all incoming
22// and outgoing events. If a connection fails then it will attempt to reconnect
23// and will notify any listeners through an error event on the IncomingEvents
24// channel.
25//
26// If the connection ends and the disconnect was unintentional then this will
27// attempt to reconnect.
28//
29// This should only be called once per slack API! Otherwise expect undefined
30// behavior.
31//
32// The defined error events are located in websocket_internals.go.
33func (rtm *RTM) ManageConnection() {
34	var (
35		err  error
36		info *Info
37		conn *websocket.Conn
38	)
39
40	for connectionCount := 0; ; connectionCount++ {
41		// start trying to connect
42		// the returned err is already passed onto the IncomingEvents channel
43		if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
44			// when the connection is unsuccessful its fatal, and we need to bail out.
45			rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
46			rtm.disconnect()
47			return
48		}
49
50		// lock to prevent data races with Disconnect particularly around isConnected
51		// and conn.
52		rtm.mu.Lock()
53		rtm.conn = conn
54		rtm.info = info
55		rtm.mu.Unlock()
56
57		rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{
58			ConnectionCount: connectionCount,
59			Info:            info,
60		}}
61
62		rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
63
64		rawEvents := make(chan json.RawMessage)
65		// we're now connected so we can set up listeners
66		go rtm.handleIncomingEvents(rawEvents)
67		// this should be a blocking call until the connection has ended
68		rtm.handleEvents(rawEvents)
69
70		select {
71		case <-rtm.disconnected:
72			// after handle events returns we need to check if we're disconnected
73			// when this happens we need to cleanup the newly created connection.
74			if err = conn.Close(); err != nil {
75				rtm.Debugln("failed to close conn on disconnected RTM", err)
76			}
77			return
78		default:
79			// otherwise continue and run the loop again to reconnect
80		}
81	}
82}
83
84// connect attempts to connect to the slack websocket API. It handles any
85// errors that occur while connecting and will return once a connection
86// has been successfully opened.
87// If useRTMStart is false then it uses rtm.connect to create the connection,
88// otherwise it uses rtm.start.
89func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) {
90	const (
91		errInvalidAuth      = "invalid_auth"
92		errInactiveAccount  = "account_inactive"
93		errMissingAuthToken = "not_authed"
94		errTokenRevoked     = "token_revoked"
95	)
96
97	// used to provide exponential backoff wait time with jitter before trying
98	// to connect to slack again
99	boff := &backoff.Backoff{
100		Max: 5 * time.Minute,
101	}
102
103	for {
104		var (
105			backoff time.Duration
106		)
107
108		// send connecting event
109		rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
110			Attempt:         boff.Attempts() + 1,
111			ConnectionCount: connectionCount,
112		}}
113
114		// attempt to start the connection
115		info, conn, err := rtm.startRTMAndDial(useRTMStart)
116		if err == nil {
117			return info, conn, nil
118		}
119
120		// check for fatal errors
121		switch err.Error() {
122		case errInvalidAuth, errInactiveAccount, errMissingAuthToken, errTokenRevoked:
123			rtm.Debugf("invalid auth when connecting with RTM: %s", err)
124			rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
125			return nil, nil, err
126		default:
127		}
128
129		switch actual := err.(type) {
130		case misc.StatusCodeError:
131			if actual.Code == http.StatusNotFound {
132				rtm.Debugf("invalid auth when connecting with RTM: %s", err)
133				rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
134				return nil, nil, err
135			}
136		case *RateLimitedError:
137			backoff = actual.RetryAfter
138		default:
139		}
140
141		backoff = timex.Max(backoff, boff.Duration())
142		// any other errors are treated as recoverable and we try again after
143		// sending the event along the IncomingEvents channel
144		rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
145			Attempt:  boff.Attempts(),
146			Backoff:  backoff,
147			ErrorObj: err,
148		}}
149
150		// get time we should wait before attempting to connect again
151		rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.Attempts(), err, backoff)
152
153		// wait for one of the following to occur,
154		// backoff duration has elapsed, killChannel is signalled, or
155		// the rtm finishes disconnecting.
156		select {
157		case <-time.After(backoff): // retry after the backoff.
158		case intentional := <-rtm.killChannel:
159			if intentional {
160				rtm.killConnection(intentional, ErrRTMDisconnected)
161				return nil, nil, ErrRTMDisconnected
162			}
163		case <-rtm.disconnected:
164			return nil, nil, ErrRTMDisconnected
165		}
166	}
167}
168
169// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true,
170// then it returns the  full information returned by the "rtm.start" method on the
171// slack API. Else it uses the "rtm.connect" method to connect
172func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) {
173	var (
174		url string
175	)
176
177	if useRTMStart {
178		rtm.Debugf("Starting RTM")
179		info, url, err = rtm.StartRTM()
180	} else {
181		rtm.Debugf("Connecting to RTM")
182		info, url, err = rtm.ConnectRTM()
183	}
184	if err != nil {
185		rtm.Debugf("Failed to start or connect to RTM: %s", err)
186		return nil, nil, err
187	}
188
189	// install connection parameters
190	u, err := stdurl.Parse(url)
191	if err != nil {
192		return nil, nil, err
193	}
194	u.RawQuery = rtm.connParams.Encode()
195	url = u.String()
196
197	rtm.Debugf("Dialing to websocket on url %s", url)
198	// Only use HTTPS for connections to prevent MITM attacks on the connection.
199	upgradeHeader := http.Header{}
200	upgradeHeader.Add("Origin", "https://api.slack.com")
201	dialer := websocket.DefaultDialer
202	if rtm.dialer != nil {
203		dialer = rtm.dialer
204	}
205	conn, _, err := dialer.Dial(url, upgradeHeader)
206	if err != nil {
207		rtm.Debugf("Failed to dial to the websocket: %s", err)
208		return nil, nil, err
209	}
210	return info, conn, err
211}
212
213// killConnection stops the websocket connection and signals to all goroutines
214// that they should cease listening to the connection for events.
215//
216// This should not be called directly! Instead a boolean value (true for
217// intentional, false otherwise) should be sent to the killChannel on the RTM.
218func (rtm *RTM) killConnection(intentional bool, cause error) (err error) {
219	rtm.Debugln("killing connection", cause)
220
221	if rtm.conn != nil {
222		err = rtm.conn.Close()
223	}
224
225	rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}}
226
227	if intentional {
228		rtm.disconnect()
229	}
230
231	return err
232}
233
234// handleEvents is a blocking function that handles all events. This sends
235// pings when asked to (on rtm.forcePing) and upon every given elapsed
236// interval. This also sends outgoing messages that are received from the RTM's
237// outgoingMessages channel. This also handles incoming raw events from the RTM
238// rawEvents channel.
239func (rtm *RTM) handleEvents(events chan json.RawMessage) {
240	ticker := time.NewTicker(rtm.pingInterval)
241	defer ticker.Stop()
242	for {
243		select {
244		// catch "stop" signal on channel close
245		case intentional := <-rtm.killChannel:
246			_ = rtm.killConnection(intentional, errorsx.String("signaled"))
247			return
248		// detect when the connection is dead.
249		case <-rtm.pingDeadman.C:
250			_ = rtm.killConnection(false, ErrRTMDeadman)
251			return
252		// send pings on ticker interval
253		case <-ticker.C:
254			if err := rtm.ping(); err != nil {
255				_ = rtm.killConnection(false, err)
256				return
257			}
258		case <-rtm.forcePing:
259			if err := rtm.ping(); err != nil {
260				_ = rtm.killConnection(false, err)
261				return
262			}
263		// listen for messages that need to be sent
264		case msg := <-rtm.outgoingMessages:
265			rtm.sendOutgoingMessage(msg)
266			// listen for incoming messages that need to be parsed
267		case rawEvent := <-events:
268			switch rtm.handleRawEvent(rawEvent) {
269			case rtmEventTypeGoodbye:
270				// kill the connection, but DO NOT RETURN, a follow up kill signal will
271				// be sent that still needs to be processed. this duplication is because
272				// the event reader restarts once it emits the goodbye event.
273				// unlike the other cases in this function a final read will be triggered
274				// against the connection which will emit a kill signal. if we return early
275				// this kill signal will be processed by the next connection.
276				_ = rtm.killConnection(false, ErrRTMGoodbye)
277			default:
278			}
279		}
280	}
281}
282
283// handleIncomingEvents monitors the RTM's opened websocket for any incoming
284// events. It pushes the raw events into the channel.
285//
286// This will stop executing once the RTM's when a fatal error is detected, or
287// a disconnect occurs.
288func (rtm *RTM) handleIncomingEvents(events chan json.RawMessage) {
289	for {
290		if err := rtm.receiveIncomingEvent(events); err != nil {
291			select {
292			case rtm.killChannel <- false:
293			case <-rtm.disconnected:
294			}
295			return
296		}
297	}
298}
299
300func (rtm *RTM) sendWithDeadline(msg interface{}) error {
301	// set a write deadline on the connection
302	if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
303		return err
304	}
305	if err := rtm.conn.WriteJSON(msg); err != nil {
306		return err
307	}
308	// remove write deadline
309	return rtm.conn.SetWriteDeadline(time.Time{})
310}
311
312// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket.
313//
314// It does not currently detect if a outgoing message fails due to a disconnect
315// and instead lets a future failed 'PING' detect the failed connection.
316func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
317	rtm.Debugln("Sending message:", msg)
318	if len([]rune(msg.Text)) > MaxMessageTextLength {
319		rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{
320			Message:   msg,
321			MaxLength: MaxMessageTextLength,
322		}}
323		return
324	}
325
326	if err := rtm.sendWithDeadline(msg); err != nil {
327		rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{
328			Message:  msg,
329			ErrorObj: err,
330		}}
331	}
332}
333
334// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message
335// fails to send then this returns an error signifying that the connection
336// should be considered disconnected.
337//
338// This does not handle incoming 'PONG' responses but does store the time of
339// each successful 'PING' send so latency can be detected upon a 'PONG'
340// response.
341func (rtm *RTM) ping() error {
342	id := rtm.idGen.Next()
343	rtm.Debugln("Sending PING ", id)
344	msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()}
345
346	if err := rtm.sendWithDeadline(msg); err != nil {
347		rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error())
348		return err
349	}
350	return nil
351}
352
353// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
354// This will block until a frame is available from the websocket.
355// If the read from the websocket results in a fatal error, this function will return non-nil.
356func (rtm *RTM) receiveIncomingEvent(events chan json.RawMessage) error {
357	event := json.RawMessage{}
358	err := rtm.conn.ReadJSON(&event)
359
360	// check if the connection was closed.
361	if websocket.IsUnexpectedCloseError(err) {
362		return err
363	}
364
365	switch {
366	case err == io.ErrUnexpectedEOF:
367		// EOF's don't seem to signify a failed connection so instead we ignore
368		// them here and detect a failed connection upon attempting to send a
369		// 'PING' message
370
371		// trigger a 'PING' to detect potential websocket disconnect
372		select {
373		case rtm.forcePing <- true:
374		case <-rtm.disconnected:
375		}
376	case err != nil:
377		// All other errors from ReadJSON come from NextReader, and should
378		// kill the read loop and force a reconnect.
379		rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
380			ErrorObj: err,
381		}}
382
383		return err
384	case len(event) == 0:
385		rtm.Debugln("Received empty event")
386	default:
387		rtm.Debugln("Incoming Event:", string(event))
388		select {
389		case events <- event:
390		case <-rtm.disconnected:
391			rtm.Debugln("disonnected while attempting to send raw event")
392		}
393	}
394
395	return nil
396}
397
398// handleRawEvent takes a raw JSON message received from the slack websocket
399// and handles the encoded event.
400// returns the event type of the message.
401func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string {
402	event := &Event{}
403	err := json.Unmarshal(rawEvent, event)
404	if err != nil {
405		rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
406		return ""
407	}
408
409	switch event.Type {
410	case rtmEventTypeAck:
411		rtm.handleAck(rawEvent)
412	case rtmEventTypeHello:
413		rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}}
414	case rtmEventTypePong:
415		rtm.handlePong(rawEvent)
416	case rtmEventTypeGoodbye:
417		// just return the event type up for goodbye, will be handled by caller.
418	default:
419		rtm.handleEvent(event.Type, rawEvent)
420	}
421
422	return event.Type
423}
424
425// handleAck handles an incoming 'ACK' message.
426func (rtm *RTM) handleAck(event json.RawMessage) {
427	ack := &AckMessage{}
428	if err := json.Unmarshal(event, ack); err != nil {
429		rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
430		rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
431		return
432	}
433
434	if ack.Ok {
435		rtm.IncomingEvents <- RTMEvent{"ack", ack}
436	} else if ack.RTMResponse.Error != nil {
437		// As there is no documentation for RTM error-codes, this
438		// identification of a rate-limit warning is very brittle.
439		if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." {
440			rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}}
441		} else {
442			rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error, ack.ReplyTo}}
443		}
444	} else {
445		rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ErrorObj: fmt.Errorf("ack decode failure")}}
446	}
447}
448
449// handlePong handles an incoming 'PONG' message which should be in response to
450// a previously sent 'PING' message. This is then used to compute the
451// connection's latency.
452func (rtm *RTM) handlePong(event json.RawMessage) {
453	var (
454		p Pong
455	)
456
457	rtm.resetDeadman()
458
459	if err := json.Unmarshal(event, &p); err != nil {
460		rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err)
461		return
462	}
463
464	latency := time.Since(time.Unix(p.Timestamp, 0))
465	rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
466}
467
468// handleEvent is the "default" response to an event that does not have a
469// special case. It matches the command's name to a mapping of defined events
470// and then sends the corresponding event struct to the IncomingEvents channel.
471// If the event type is not found or the event cannot be unmarshalled into the
472// correct struct then this sends an UnmarshallingErrorEvent to the
473// IncomingEvents channel.
474func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
475	v, exists := EventMapping[typeStr]
476	if !exists {
477		rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event))
478		err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event))
479		rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
480		return
481	}
482	t := reflect.TypeOf(v)
483	recvEvent := reflect.New(t).Interface()
484	err := json.Unmarshal(event, recvEvent)
485	if err != nil {
486		rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event))
487		err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event))
488		rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
489		return
490	}
491	rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent}
492}
493
494// EventMapping holds a mapping of event names to their corresponding struct
495// implementations. The structs should be instances of the unmarshalling
496// target for the matching event type.
497var EventMapping = map[string]interface{}{
498	"message":         MessageEvent{},
499	"presence_change": PresenceChangeEvent{},
500	"user_typing":     UserTypingEvent{},
501
502	"channel_marked":          ChannelMarkedEvent{},
503	"channel_created":         ChannelCreatedEvent{},
504	"channel_joined":          ChannelJoinedEvent{},
505	"channel_left":            ChannelLeftEvent{},
506	"channel_deleted":         ChannelDeletedEvent{},
507	"channel_rename":          ChannelRenameEvent{},
508	"channel_archive":         ChannelArchiveEvent{},
509	"channel_unarchive":       ChannelUnarchiveEvent{},
510	"channel_history_changed": ChannelHistoryChangedEvent{},
511
512	"dnd_updated":      DNDUpdatedEvent{},
513	"dnd_updated_user": DNDUpdatedEvent{},
514
515	"im_created":         IMCreatedEvent{},
516	"im_open":            IMOpenEvent{},
517	"im_close":           IMCloseEvent{},
518	"im_marked":          IMMarkedEvent{},
519	"im_history_changed": IMHistoryChangedEvent{},
520
521	"group_marked":          GroupMarkedEvent{},
522	"group_open":            GroupOpenEvent{},
523	"group_joined":          GroupJoinedEvent{},
524	"group_left":            GroupLeftEvent{},
525	"group_close":           GroupCloseEvent{},
526	"group_rename":          GroupRenameEvent{},
527	"group_archive":         GroupArchiveEvent{},
528	"group_unarchive":       GroupUnarchiveEvent{},
529	"group_history_changed": GroupHistoryChangedEvent{},
530
531	"file_created":         FileCreatedEvent{},
532	"file_shared":          FileSharedEvent{},
533	"file_unshared":        FileUnsharedEvent{},
534	"file_public":          FilePublicEvent{},
535	"file_private":         FilePrivateEvent{},
536	"file_change":          FileChangeEvent{},
537	"file_deleted":         FileDeletedEvent{},
538	"file_comment_added":   FileCommentAddedEvent{},
539	"file_comment_edited":  FileCommentEditedEvent{},
540	"file_comment_deleted": FileCommentDeletedEvent{},
541
542	"pin_added":   PinAddedEvent{},
543	"pin_removed": PinRemovedEvent{},
544
545	"star_added":   StarAddedEvent{},
546	"star_removed": StarRemovedEvent{},
547
548	"reaction_added":   ReactionAddedEvent{},
549	"reaction_removed": ReactionRemovedEvent{},
550
551	"pref_change": PrefChangeEvent{},
552
553	"team_join":              TeamJoinEvent{},
554	"team_rename":            TeamRenameEvent{},
555	"team_pref_change":       TeamPrefChangeEvent{},
556	"team_domain_change":     TeamDomainChangeEvent{},
557	"team_migration_started": TeamMigrationStartedEvent{},
558
559	"manual_presence_change": ManualPresenceChangeEvent{},
560
561	"user_change": UserChangeEvent{},
562
563	"emoji_changed": EmojiChangedEvent{},
564
565	"commands_changed": CommandsChangedEvent{},
566
567	"email_domain_changed": EmailDomainChangedEvent{},
568
569	"bot_added":   BotAddedEvent{},
570	"bot_changed": BotChangedEvent{},
571
572	"accounts_changed": AccountsChangedEvent{},
573
574	"reconnect_url": ReconnectUrlEvent{},
575
576	"member_joined_channel": MemberJoinedChannelEvent{},
577	"member_left_channel":   MemberLeftChannelEvent{},
578
579	"subteam_created":         SubteamCreatedEvent{},
580	"subteam_members_changed": SubteamMembersChangedEvent{},
581	"subteam_self_added":      SubteamSelfAddedEvent{},
582	"subteam_self_removed":    SubteamSelfRemovedEvent{},
583	"subteam_updated":         SubteamUpdatedEvent{},
584
585	"desktop_notification":       DesktopNotificationEvent{},
586	"mobile_in_app_notification": MobileInAppNotificationEvent{},
587}
588