1package slack 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io" 7 "net/http" 8 stdurl "net/url" 9 "reflect" 10 "time" 11 12 "github.com/slack-go/slack/internal/backoff" 13 "github.com/slack-go/slack/internal/misc" 14 15 "github.com/gorilla/websocket" 16 "github.com/slack-go/slack/internal/errorsx" 17 "github.com/slack-go/slack/internal/timex" 18) 19 20// ManageConnection can be called on a Slack RTM instance returned by the 21// NewRTM method. It will connect to the slack RTM API and handle all incoming 22// and outgoing events. If a connection fails then it will attempt to reconnect 23// and will notify any listeners through an error event on the IncomingEvents 24// channel. 25// 26// If the connection ends and the disconnect was unintentional then this will 27// attempt to reconnect. 28// 29// This should only be called once per slack API! Otherwise expect undefined 30// behavior. 31// 32// The defined error events are located in websocket_internals.go. 33func (rtm *RTM) ManageConnection() { 34 var ( 35 err error 36 info *Info 37 conn *websocket.Conn 38 ) 39 40 for connectionCount := 0; ; connectionCount++ { 41 // start trying to connect 42 // the returned err is already passed onto the IncomingEvents channel 43 if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil { 44 // when the connection is unsuccessful its fatal, and we need to bail out. 45 rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err) 46 rtm.disconnect() 47 return 48 } 49 50 // lock to prevent data races with Disconnect particularly around isConnected 51 // and conn. 
52 rtm.mu.Lock() 53 rtm.conn = conn 54 rtm.info = info 55 rtm.mu.Unlock() 56 57 rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{ 58 ConnectionCount: connectionCount, 59 Info: info, 60 }} 61 62 rtm.Debugf("RTM connection succeeded on try %d", connectionCount) 63 64 rawEvents := make(chan json.RawMessage) 65 // we're now connected so we can set up listeners 66 go rtm.handleIncomingEvents(rawEvents) 67 // this should be a blocking call until the connection has ended 68 rtm.handleEvents(rawEvents) 69 70 select { 71 case <-rtm.disconnected: 72 // after handle events returns we need to check if we're disconnected 73 // when this happens we need to cleanup the newly created connection. 74 if err = conn.Close(); err != nil { 75 rtm.Debugln("failed to close conn on disconnected RTM", err) 76 } 77 return 78 default: 79 // otherwise continue and run the loop again to reconnect 80 } 81 } 82} 83 84// connect attempts to connect to the slack websocket API. It handles any 85// errors that occur while connecting and will return once a connection 86// has been successfully opened. 87// If useRTMStart is false then it uses rtm.connect to create the connection, 88// otherwise it uses rtm.start. 
89func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) { 90 const ( 91 errInvalidAuth = "invalid_auth" 92 errInactiveAccount = "account_inactive" 93 errMissingAuthToken = "not_authed" 94 errTokenRevoked = "token_revoked" 95 ) 96 97 // used to provide exponential backoff wait time with jitter before trying 98 // to connect to slack again 99 boff := &backoff.Backoff{ 100 Max: 5 * time.Minute, 101 } 102 103 for { 104 var ( 105 backoff time.Duration 106 ) 107 108 // send connecting event 109 rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{ 110 Attempt: boff.Attempts() + 1, 111 ConnectionCount: connectionCount, 112 }} 113 114 // attempt to start the connection 115 info, conn, err := rtm.startRTMAndDial(useRTMStart) 116 if err == nil { 117 return info, conn, nil 118 } 119 120 // check for fatal errors 121 switch err.Error() { 122 case errInvalidAuth, errInactiveAccount, errMissingAuthToken, errTokenRevoked: 123 rtm.Debugf("invalid auth when connecting with RTM: %s", err) 124 rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} 125 return nil, nil, err 126 default: 127 } 128 129 switch actual := err.(type) { 130 case misc.StatusCodeError: 131 if actual.Code == http.StatusNotFound { 132 rtm.Debugf("invalid auth when connecting with RTM: %s", err) 133 rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} 134 return nil, nil, err 135 } 136 case *RateLimitedError: 137 backoff = actual.RetryAfter 138 default: 139 } 140 141 backoff = timex.Max(backoff, boff.Duration()) 142 // any other errors are treated as recoverable and we try again after 143 // sending the event along the IncomingEvents channel 144 rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{ 145 Attempt: boff.Attempts(), 146 Backoff: backoff, 147 ErrorObj: err, 148 }} 149 150 // get time we should wait before attempting to connect again 151 rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", 
boff.Attempts(), err, backoff) 152 153 // wait for one of the following to occur, 154 // backoff duration has elapsed, killChannel is signalled, or 155 // the rtm finishes disconnecting. 156 select { 157 case <-time.After(backoff): // retry after the backoff. 158 case intentional := <-rtm.killChannel: 159 if intentional { 160 rtm.killConnection(intentional, ErrRTMDisconnected) 161 return nil, nil, ErrRTMDisconnected 162 } 163 case <-rtm.disconnected: 164 return nil, nil, ErrRTMDisconnected 165 } 166 } 167} 168 169// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true, 170// then it returns the full information returned by the "rtm.start" method on the 171// slack API. Else it uses the "rtm.connect" method to connect 172func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) { 173 var ( 174 url string 175 ) 176 177 if useRTMStart { 178 rtm.Debugf("Starting RTM") 179 info, url, err = rtm.StartRTM() 180 } else { 181 rtm.Debugf("Connecting to RTM") 182 info, url, err = rtm.ConnectRTM() 183 } 184 if err != nil { 185 rtm.Debugf("Failed to start or connect to RTM: %s", err) 186 return nil, nil, err 187 } 188 189 // install connection parameters 190 u, err := stdurl.Parse(url) 191 if err != nil { 192 return nil, nil, err 193 } 194 u.RawQuery = rtm.connParams.Encode() 195 url = u.String() 196 197 rtm.Debugf("Dialing to websocket on url %s", url) 198 // Only use HTTPS for connections to prevent MITM attacks on the connection. 
199 upgradeHeader := http.Header{} 200 upgradeHeader.Add("Origin", "https://api.slack.com") 201 dialer := websocket.DefaultDialer 202 if rtm.dialer != nil { 203 dialer = rtm.dialer 204 } 205 conn, _, err := dialer.Dial(url, upgradeHeader) 206 if err != nil { 207 rtm.Debugf("Failed to dial to the websocket: %s", err) 208 return nil, nil, err 209 } 210 return info, conn, err 211} 212 213// killConnection stops the websocket connection and signals to all goroutines 214// that they should cease listening to the connection for events. 215// 216// This should not be called directly! Instead a boolean value (true for 217// intentional, false otherwise) should be sent to the killChannel on the RTM. 218func (rtm *RTM) killConnection(intentional bool, cause error) (err error) { 219 rtm.Debugln("killing connection", cause) 220 221 if rtm.conn != nil { 222 err = rtm.conn.Close() 223 } 224 225 rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}} 226 227 if intentional { 228 rtm.disconnect() 229 } 230 231 return err 232} 233 234// handleEvents is a blocking function that handles all events. This sends 235// pings when asked to (on rtm.forcePing) and upon every given elapsed 236// interval. This also sends outgoing messages that are received from the RTM's 237// outgoingMessages channel. This also handles incoming raw events from the RTM 238// rawEvents channel. 239func (rtm *RTM) handleEvents(events chan json.RawMessage) { 240 ticker := time.NewTicker(rtm.pingInterval) 241 defer ticker.Stop() 242 for { 243 select { 244 // catch "stop" signal on channel close 245 case intentional := <-rtm.killChannel: 246 _ = rtm.killConnection(intentional, errorsx.String("signaled")) 247 return 248 // detect when the connection is dead. 
249 case <-rtm.pingDeadman.C: 250 _ = rtm.killConnection(false, ErrRTMDeadman) 251 return 252 // send pings on ticker interval 253 case <-ticker.C: 254 if err := rtm.ping(); err != nil { 255 _ = rtm.killConnection(false, err) 256 return 257 } 258 case <-rtm.forcePing: 259 if err := rtm.ping(); err != nil { 260 _ = rtm.killConnection(false, err) 261 return 262 } 263 // listen for messages that need to be sent 264 case msg := <-rtm.outgoingMessages: 265 rtm.sendOutgoingMessage(msg) 266 // listen for incoming messages that need to be parsed 267 case rawEvent := <-events: 268 switch rtm.handleRawEvent(rawEvent) { 269 case rtmEventTypeGoodbye: 270 // kill the connection, but DO NOT RETURN, a follow up kill signal will 271 // be sent that still needs to be processed. this duplication is because 272 // the event reader restarts once it emits the goodbye event. 273 // unlike the other cases in this function a final read will be triggered 274 // against the connection which will emit a kill signal. if we return early 275 // this kill signal will be processed by the next connection. 276 _ = rtm.killConnection(false, ErrRTMGoodbye) 277 default: 278 } 279 } 280 } 281} 282 283// handleIncomingEvents monitors the RTM's opened websocket for any incoming 284// events. It pushes the raw events into the channel. 285// 286// This will stop executing once the RTM's when a fatal error is detected, or 287// a disconnect occurs. 
288func (rtm *RTM) handleIncomingEvents(events chan json.RawMessage) { 289 for { 290 if err := rtm.receiveIncomingEvent(events); err != nil { 291 select { 292 case rtm.killChannel <- false: 293 case <-rtm.disconnected: 294 } 295 return 296 } 297 } 298} 299 300func (rtm *RTM) sendWithDeadline(msg interface{}) error { 301 // set a write deadline on the connection 302 if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { 303 return err 304 } 305 if err := rtm.conn.WriteJSON(msg); err != nil { 306 return err 307 } 308 // remove write deadline 309 return rtm.conn.SetWriteDeadline(time.Time{}) 310} 311 312// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket. 313// 314// It does not currently detect if a outgoing message fails due to a disconnect 315// and instead lets a future failed 'PING' detect the failed connection. 316func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) { 317 rtm.Debugln("Sending message:", msg) 318 if len([]rune(msg.Text)) > MaxMessageTextLength { 319 rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{ 320 Message: msg, 321 MaxLength: MaxMessageTextLength, 322 }} 323 return 324 } 325 326 if err := rtm.sendWithDeadline(msg); err != nil { 327 rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{ 328 Message: msg, 329 ErrorObj: err, 330 }} 331 } 332} 333 334// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message 335// fails to send then this returns an error signifying that the connection 336// should be considered disconnected. 337// 338// This does not handle incoming 'PONG' responses but does store the time of 339// each successful 'PING' send so latency can be detected upon a 'PONG' 340// response. 
341func (rtm *RTM) ping() error { 342 id := rtm.idGen.Next() 343 rtm.Debugln("Sending PING ", id) 344 msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()} 345 346 if err := rtm.sendWithDeadline(msg); err != nil { 347 rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error()) 348 return err 349 } 350 return nil 351} 352 353// receiveIncomingEvent attempts to receive an event from the RTM's websocket. 354// This will block until a frame is available from the websocket. 355// If the read from the websocket results in a fatal error, this function will return non-nil. 356func (rtm *RTM) receiveIncomingEvent(events chan json.RawMessage) error { 357 event := json.RawMessage{} 358 err := rtm.conn.ReadJSON(&event) 359 360 // check if the connection was closed. 361 if websocket.IsUnexpectedCloseError(err) { 362 return err 363 } 364 365 switch { 366 case err == io.ErrUnexpectedEOF: 367 // EOF's don't seem to signify a failed connection so instead we ignore 368 // them here and detect a failed connection upon attempting to send a 369 // 'PING' message 370 371 // trigger a 'PING' to detect potential websocket disconnect 372 select { 373 case rtm.forcePing <- true: 374 case <-rtm.disconnected: 375 } 376 case err != nil: 377 // All other errors from ReadJSON come from NextReader, and should 378 // kill the read loop and force a reconnect. 379 rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{ 380 ErrorObj: err, 381 }} 382 383 return err 384 case len(event) == 0: 385 rtm.Debugln("Received empty event") 386 default: 387 rtm.Debugln("Incoming Event:", string(event)) 388 select { 389 case events <- event: 390 case <-rtm.disconnected: 391 rtm.Debugln("disonnected while attempting to send raw event") 392 } 393 } 394 395 return nil 396} 397 398// handleRawEvent takes a raw JSON message received from the slack websocket 399// and handles the encoded event. 400// returns the event type of the message. 
401func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string { 402 event := &Event{} 403 err := json.Unmarshal(rawEvent, event) 404 if err != nil { 405 rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} 406 return "" 407 } 408 409 switch event.Type { 410 case rtmEventTypeAck: 411 rtm.handleAck(rawEvent) 412 case rtmEventTypeHello: 413 rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}} 414 case rtmEventTypePong: 415 rtm.handlePong(rawEvent) 416 case rtmEventTypeGoodbye: 417 // just return the event type up for goodbye, will be handled by caller. 418 default: 419 rtm.handleEvent(event.Type, rawEvent) 420 } 421 422 return event.Type 423} 424 425// handleAck handles an incoming 'ACK' message. 426func (rtm *RTM) handleAck(event json.RawMessage) { 427 ack := &AckMessage{} 428 if err := json.Unmarshal(event, ack); err != nil { 429 rtm.Debugln("RTM Error unmarshalling 'ack' event:", err) 430 rtm.Debugln(" -> Erroneous 'ack' event:", string(event)) 431 return 432 } 433 434 if ack.Ok { 435 rtm.IncomingEvents <- RTMEvent{"ack", ack} 436 } else if ack.RTMResponse.Error != nil { 437 // As there is no documentation for RTM error-codes, this 438 // identification of a rate-limit warning is very brittle. 439 if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." { 440 rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}} 441 } else { 442 rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error, ack.ReplyTo}} 443 } 444 } else { 445 rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ErrorObj: fmt.Errorf("ack decode failure")}} 446 } 447} 448 449// handlePong handles an incoming 'PONG' message which should be in response to 450// a previously sent 'PING' message. This is then used to compute the 451// connection's latency. 
452func (rtm *RTM) handlePong(event json.RawMessage) { 453 var ( 454 p Pong 455 ) 456 457 rtm.resetDeadman() 458 459 if err := json.Unmarshal(event, &p); err != nil { 460 rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err) 461 return 462 } 463 464 latency := time.Since(time.Unix(p.Timestamp, 0)) 465 rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}} 466} 467 468// handleEvent is the "default" response to an event that does not have a 469// special case. It matches the command's name to a mapping of defined events 470// and then sends the corresponding event struct to the IncomingEvents channel. 471// If the event type is not found or the event cannot be unmarshalled into the 472// correct struct then this sends an UnmarshallingErrorEvent to the 473// IncomingEvents channel. 474func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) { 475 v, exists := EventMapping[typeStr] 476 if !exists { 477 rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event)) 478 err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event)) 479 rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} 480 return 481 } 482 t := reflect.TypeOf(v) 483 recvEvent := reflect.New(t).Interface() 484 err := json.Unmarshal(event, recvEvent) 485 if err != nil { 486 rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event)) 487 err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event)) 488 rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} 489 return 490 } 491 rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent} 492} 493 494// EventMapping holds a mapping of event names to their corresponding struct 495// implementations. The structs should be instances of the unmarshalling 496// target for the matching event type. 
var EventMapping = map[string]interface{}{
	// Each value is a zero-valued struct. handleEvent uses only its type (via
	// reflect.TypeOf) to allocate a fresh decode target for the event.

	// message, presence and typing events
	"message":         MessageEvent{},
	"presence_change": PresenceChangeEvent{},
	"user_typing":     UserTypingEvent{},

	// channel_* events
	"channel_marked":          ChannelMarkedEvent{},
	"channel_created":         ChannelCreatedEvent{},
	"channel_joined":          ChannelJoinedEvent{},
	"channel_left":            ChannelLeftEvent{},
	"channel_deleted":         ChannelDeletedEvent{},
	"channel_rename":          ChannelRenameEvent{},
	"channel_archive":         ChannelArchiveEvent{},
	"channel_unarchive":       ChannelUnarchiveEvent{},
	"channel_history_changed": ChannelHistoryChangedEvent{},

	// dnd_* events; both names decode into the same struct
	"dnd_updated":      DNDUpdatedEvent{},
	"dnd_updated_user": DNDUpdatedEvent{},

	// im_* events
	"im_created":         IMCreatedEvent{},
	"im_open":            IMOpenEvent{},
	"im_close":           IMCloseEvent{},
	"im_marked":          IMMarkedEvent{},
	"im_history_changed": IMHistoryChangedEvent{},

	// group_* events
	"group_marked":          GroupMarkedEvent{},
	"group_open":            GroupOpenEvent{},
	"group_joined":          GroupJoinedEvent{},
	"group_left":            GroupLeftEvent{},
	"group_close":           GroupCloseEvent{},
	"group_rename":          GroupRenameEvent{},
	"group_archive":         GroupArchiveEvent{},
	"group_unarchive":       GroupUnarchiveEvent{},
	"group_history_changed": GroupHistoryChangedEvent{},

	// file_* events
	"file_created":         FileCreatedEvent{},
	"file_shared":          FileSharedEvent{},
	"file_unshared":        FileUnsharedEvent{},
	"file_public":          FilePublicEvent{},
	"file_private":         FilePrivateEvent{},
	"file_change":          FileChangeEvent{},
	"file_deleted":         FileDeletedEvent{},
	"file_comment_added":   FileCommentAddedEvent{},
	"file_comment_edited":  FileCommentEditedEvent{},
	"file_comment_deleted": FileCommentDeletedEvent{},

	// pin_* events
	"pin_added":   PinAddedEvent{},
	"pin_removed": PinRemovedEvent{},

	// star_* events
	"star_added":   StarAddedEvent{},
	"star_removed": StarRemovedEvent{},

	// reaction_* events
	"reaction_added":   ReactionAddedEvent{},
	"reaction_removed": ReactionRemovedEvent{},

	// preference events
	"pref_change": PrefChangeEvent{},

	// team_* events
	"team_join":              TeamJoinEvent{},
	"team_rename":            TeamRenameEvent{},
	"team_pref_change":       TeamPrefChangeEvent{},
	"team_domain_change":     TeamDomainChangeEvent{},
	"team_migration_started": TeamMigrationStartedEvent{},

	"manual_presence_change": ManualPresenceChangeEvent{},

	"user_change": UserChangeEvent{},

	"emoji_changed": EmojiChangedEvent{},

	"commands_changed": CommandsChangedEvent{},

	"email_domain_changed": EmailDomainChangedEvent{},

	// bot_* events
	"bot_added":   BotAddedEvent{},
	"bot_changed": BotChangedEvent{},

	"accounts_changed": AccountsChangedEvent{},

	"reconnect_url": ReconnectUrlEvent{},

	// channel membership events
	"member_joined_channel": MemberJoinedChannelEvent{},
	"member_left_channel":   MemberLeftChannelEvent{},

	// subteam_* events
	"subteam_created":         SubteamCreatedEvent{},
	"subteam_members_changed": SubteamMembersChangedEvent{},
	"subteam_self_added":      SubteamSelfAddedEvent{},
	"subteam_self_removed":    SubteamSelfRemovedEvent{},
	"subteam_updated":         SubteamUpdatedEvent{},

	// notification events
	"desktop_notification":       DesktopNotificationEvent{},
	"mobile_in_app_notification": MobileInAppNotificationEvent{},
}