1package slack 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io" 7 "net/http" 8 stdurl "net/url" 9 "reflect" 10 "time" 11 12 "github.com/gorilla/websocket" 13 "github.com/nlopes/slack/internal/errorsx" 14 "github.com/nlopes/slack/internal/timex" 15) 16 17// ManageConnection can be called on a Slack RTM instance returned by the 18// NewRTM method. It will connect to the slack RTM API and handle all incoming 19// and outgoing events. If a connection fails then it will attempt to reconnect 20// and will notify any listeners through an error event on the IncomingEvents 21// channel. 22// 23// If the connection ends and the disconnect was unintentional then this will 24// attempt to reconnect. 25// 26// This should only be called once per slack API! Otherwise expect undefined 27// behavior. 28// 29// The defined error events are located in websocket_internals.go. 30func (rtm *RTM) ManageConnection() { 31 var ( 32 err error 33 info *Info 34 conn *websocket.Conn 35 ) 36 37 for connectionCount := 0; ; connectionCount++ { 38 // start trying to connect 39 // the returned err is already passed onto the IncomingEvents channel 40 if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil { 41 // when the connection is unsuccessful its fatal, and we need to bail out. 42 rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err) 43 rtm.disconnect() 44 return 45 } 46 47 // lock to prevent data races with Disconnect particularly around isConnected 48 // and conn. 
49 rtm.mu.Lock() 50 rtm.conn = conn 51 rtm.info = info 52 rtm.mu.Unlock() 53 54 rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{ 55 ConnectionCount: connectionCount, 56 Info: info, 57 }} 58 59 rtm.Debugf("RTM connection succeeded on try %d", connectionCount) 60 61 // we're now connected so we can set up listeners 62 go rtm.handleIncomingEvents() 63 64 // this should be a blocking call until the connection has ended 65 rtm.handleEvents() 66 67 select { 68 case <-rtm.disconnected: 69 // after handle events returns we need to check if we're disconnected 70 return 71 default: 72 // otherwise continue and run the loop again to reconnect 73 } 74 } 75} 76 77// connect attempts to connect to the slack websocket API. It handles any 78// errors that occur while connecting and will return once a connection 79// has been successfully opened. 80// If useRTMStart is false then it uses rtm.connect to create the connection, 81// otherwise it uses rtm.start. 82func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) { 83 const ( 84 errInvalidAuth = "invalid_auth" 85 errInactiveAccount = "account_inactive" 86 errMissingAuthToken = "not_authed" 87 ) 88 89 // used to provide exponential backoff wait time with jitter before trying 90 // to connect to slack again 91 boff := &backoff{ 92 Max: 5 * time.Minute, 93 } 94 95 for { 96 var ( 97 backoff time.Duration 98 ) 99 100 // send connecting event 101 rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{ 102 Attempt: boff.attempts + 1, 103 ConnectionCount: connectionCount, 104 }} 105 106 // attempt to start the connection 107 info, conn, err := rtm.startRTMAndDial(useRTMStart) 108 if err == nil { 109 return info, conn, nil 110 } 111 112 // check for fatal errors 113 switch err.Error() { 114 case errInvalidAuth, errInactiveAccount, errMissingAuthToken: 115 rtm.Debugf("invalid auth when connecting with RTM: %s", err) 116 rtm.IncomingEvents <- RTMEvent{"invalid_auth", 
&InvalidAuthEvent{}} 117 return nil, nil, err 118 default: 119 } 120 121 switch actual := err.(type) { 122 case statusCodeError: 123 if actual.Code == http.StatusNotFound { 124 rtm.Debugf("invalid auth when connecting with RTM: %s", err) 125 rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} 126 return nil, nil, err 127 } 128 case *RateLimitedError: 129 backoff = actual.RetryAfter 130 default: 131 } 132 133 backoff = timex.Max(backoff, boff.Duration()) 134 // any other errors are treated as recoverable and we try again after 135 // sending the event along the IncomingEvents channel 136 rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{ 137 Attempt: boff.attempts, 138 Backoff: backoff, 139 ErrorObj: err, 140 }} 141 142 // get time we should wait before attempting to connect again 143 rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff) 144 145 // wait for one of the following to occur, 146 // backoff duration has elapsed, killChannel is signalled, or 147 // the rtm finishes disconnecting. 148 select { 149 case <-time.After(backoff): // retry after the backoff. 150 case intentional := <-rtm.killChannel: 151 if intentional { 152 rtm.killConnection(intentional, ErrRTMDisconnected) 153 return nil, nil, ErrRTMDisconnected 154 } 155 case <-rtm.disconnected: 156 return nil, nil, ErrRTMDisconnected 157 } 158 } 159} 160 161// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true, 162// then it returns the full information returned by the "rtm.start" method on the 163// slack API. 
Else it uses the "rtm.connect" method to connect 164func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) { 165 var ( 166 url string 167 ) 168 169 if useRTMStart { 170 rtm.Debugf("Starting RTM") 171 info, url, err = rtm.StartRTM() 172 } else { 173 rtm.Debugf("Connecting to RTM") 174 info, url, err = rtm.ConnectRTM() 175 } 176 if err != nil { 177 rtm.Debugf("Failed to start or connect to RTM: %s", err) 178 return nil, nil, err 179 } 180 181 // install connection parameters 182 u, err := stdurl.Parse(url) 183 if err != nil { 184 return nil, nil, err 185 } 186 u.RawQuery = rtm.connParams.Encode() 187 url = u.String() 188 189 rtm.Debugf("Dialing to websocket on url %s", url) 190 // Only use HTTPS for connections to prevent MITM attacks on the connection. 191 upgradeHeader := http.Header{} 192 upgradeHeader.Add("Origin", "https://api.slack.com") 193 dialer := websocket.DefaultDialer 194 if rtm.dialer != nil { 195 dialer = rtm.dialer 196 } 197 conn, _, err := dialer.Dial(url, upgradeHeader) 198 if err != nil { 199 rtm.Debugf("Failed to dial to the websocket: %s", err) 200 return nil, nil, err 201 } 202 return info, conn, err 203} 204 205// killConnection stops the websocket connection and signals to all goroutines 206// that they should cease listening to the connection for events. 207// 208// This should not be called directly! Instead a boolean value (true for 209// intentional, false otherwise) should be sent to the killChannel on the RTM. 210func (rtm *RTM) killConnection(intentional bool, cause error) (err error) { 211 rtm.Debugln("killing connection") 212 213 if rtm.conn != nil { 214 err = rtm.conn.Close() 215 } 216 217 rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}} 218 219 if intentional { 220 rtm.disconnect() 221 } 222 223 return err 224} 225 226// handleEvents is a blocking function that handles all events. 
This sends 227// pings when asked to (on rtm.forcePing) and upon every given elapsed 228// interval. This also sends outgoing messages that are received from the RTM's 229// outgoingMessages channel. This also handles incoming raw events from the RTM 230// rawEvents channel. 231func (rtm *RTM) handleEvents() { 232 ticker := time.NewTicker(rtm.pingInterval) 233 defer ticker.Stop() 234 for { 235 select { 236 // catch "stop" signal on channel close 237 case intentional := <-rtm.killChannel: 238 _ = rtm.killConnection(intentional, errorsx.String("signaled")) 239 return 240 // detect when the connection is dead. 241 case <-rtm.pingDeadman.C: 242 _ = rtm.killConnection(false, errorsx.String("deadman switch triggered")) 243 return 244 // send pings on ticker interval 245 case <-ticker.C: 246 if err := rtm.ping(); err != nil { 247 _ = rtm.killConnection(false, err) 248 return 249 } 250 case <-rtm.forcePing: 251 if err := rtm.ping(); err != nil { 252 _ = rtm.killConnection(false, err) 253 return 254 } 255 // listen for messages that need to be sent 256 case msg := <-rtm.outgoingMessages: 257 rtm.sendOutgoingMessage(msg) 258 // listen for incoming messages that need to be parsed 259 case rawEvent := <-rtm.rawEvents: 260 switch rtm.handleRawEvent(rawEvent) { 261 case rtmEventTypeGoodbye: 262 _ = rtm.killConnection(false, errorsx.String("goodbye detected")) 263 return 264 default: 265 } 266 } 267 } 268} 269 270// handleIncomingEvents monitors the RTM's opened websocket for any incoming 271// events. It pushes the raw events onto the RTM channel rawEvents. 272// 273// This will stop executing once the RTM's keepRunning channel has been closed 274// or has anything sent to it. 
275func (rtm *RTM) handleIncomingEvents() { 276 for { 277 if err := rtm.receiveIncomingEvent(); err != nil { 278 return 279 } 280 } 281} 282 283func (rtm *RTM) sendWithDeadline(msg interface{}) error { 284 // set a write deadline on the connection 285 if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { 286 return err 287 } 288 if err := rtm.conn.WriteJSON(msg); err != nil { 289 return err 290 } 291 // remove write deadline 292 return rtm.conn.SetWriteDeadline(time.Time{}) 293} 294 295// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket. 296// 297// It does not currently detect if a outgoing message fails due to a disconnect 298// and instead lets a future failed 'PING' detect the failed connection. 299func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) { 300 rtm.Debugln("Sending message:", msg) 301 if len([]rune(msg.Text)) > MaxMessageTextLength { 302 rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{ 303 Message: msg, 304 MaxLength: MaxMessageTextLength, 305 }} 306 return 307 } 308 309 if err := rtm.sendWithDeadline(msg); err != nil { 310 rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{ 311 Message: msg, 312 ErrorObj: err, 313 }} 314 } 315} 316 317// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message 318// fails to send then this returns an error signifying that the connection 319// should be considered disconnected. 320// 321// This does not handle incoming 'PONG' responses but does store the time of 322// each successful 'PING' send so latency can be detected upon a 'PONG' 323// response. 
324func (rtm *RTM) ping() error { 325 id := rtm.idGen.Next() 326 rtm.Debugln("Sending PING ", id) 327 msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()} 328 329 if err := rtm.sendWithDeadline(msg); err != nil { 330 rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error()) 331 return err 332 } 333 return nil 334} 335 336// receiveIncomingEvent attempts to receive an event from the RTM's websocket. 337// This will block until a frame is available from the websocket. 338// If the read from the websocket results in a fatal error, this function will return non-nil. 339func (rtm *RTM) receiveIncomingEvent() error { 340 event := json.RawMessage{} 341 err := rtm.conn.ReadJSON(&event) 342 switch { 343 case err == io.ErrUnexpectedEOF: 344 // EOF's don't seem to signify a failed connection so instead we ignore 345 // them here and detect a failed connection upon attempting to send a 346 // 'PING' message 347 348 // trigger a 'PING' to detect potential websocket disconnect 349 select { 350 case rtm.forcePing <- true: 351 case <-rtm.disconnected: 352 } 353 case err != nil: 354 // All other errors from ReadJSON come from NextReader, and should 355 // kill the read loop and force a reconnect. 356 rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{ 357 ErrorObj: err, 358 }} 359 360 select { 361 case rtm.killChannel <- false: 362 case <-rtm.disconnected: 363 } 364 365 return err 366 case len(event) == 0: 367 rtm.Debugln("Received empty event") 368 default: 369 rtm.Debugln("Incoming Event:", string(event)) 370 select { 371 case rtm.rawEvents <- event: 372 case <-rtm.disconnected: 373 rtm.Debugln("disonnected while attempting to send raw event") 374 } 375 } 376 return nil 377} 378 379// handleRawEvent takes a raw JSON message received from the slack websocket 380// and handles the encoded event. 381// returns the event type of the message. 
382func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string { 383 event := &Event{} 384 err := json.Unmarshal(rawEvent, event) 385 if err != nil { 386 rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} 387 return "" 388 } 389 390 switch event.Type { 391 case rtmEventTypeAck: 392 rtm.handleAck(rawEvent) 393 case rtmEventTypeHello: 394 rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}} 395 case rtmEventTypePong: 396 rtm.handlePong(rawEvent) 397 case rtmEventTypeGoodbye: 398 // just return the event type up for goodbye, will be handled by caller. 399 case rtmEventTypeDesktopNotification: 400 rtm.Debugln("Received desktop notification, ignoring") 401 default: 402 rtm.handleEvent(event.Type, rawEvent) 403 } 404 405 return event.Type 406} 407 408// handleAck handles an incoming 'ACK' message. 409func (rtm *RTM) handleAck(event json.RawMessage) { 410 ack := &AckMessage{} 411 if err := json.Unmarshal(event, ack); err != nil { 412 rtm.Debugln("RTM Error unmarshalling 'ack' event:", err) 413 rtm.Debugln(" -> Erroneous 'ack' event:", string(event)) 414 return 415 } 416 417 if ack.Ok { 418 rtm.IncomingEvents <- RTMEvent{"ack", ack} 419 } else if ack.RTMResponse.Error != nil { 420 // As there is no documentation for RTM error-codes, this 421 // identification of a rate-limit warning is very brittle. 422 if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." { 423 rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}} 424 } else { 425 rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}} 426 } 427 } else { 428 rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}} 429 } 430} 431 432// handlePong handles an incoming 'PONG' message which should be in response to 433// a previously sent 'PING' message. This is then used to compute the 434// connection's latency. 
435func (rtm *RTM) handlePong(event json.RawMessage) { 436 var ( 437 p Pong 438 ) 439 440 rtm.resetDeadman() 441 442 if err := json.Unmarshal(event, &p); err != nil { 443 rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err) 444 return 445 } 446 447 latency := time.Since(time.Unix(p.Timestamp, 0)) 448 rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}} 449} 450 451// handleEvent is the "default" response to an event that does not have a 452// special case. It matches the command's name to a mapping of defined events 453// and then sends the corresponding event struct to the IncomingEvents channel. 454// If the event type is not found or the event cannot be unmarshalled into the 455// correct struct then this sends an UnmarshallingErrorEvent to the 456// IncomingEvents channel. 457func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) { 458 v, exists := EventMapping[typeStr] 459 if !exists { 460 rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event)) 461 err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event)) 462 rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} 463 return 464 } 465 t := reflect.TypeOf(v) 466 recvEvent := reflect.New(t).Interface() 467 err := json.Unmarshal(event, recvEvent) 468 if err != nil { 469 rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event)) 470 err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event)) 471 rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} 472 return 473 } 474 rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent} 475} 476 477// EventMapping holds a mapping of event names to their corresponding struct 478// implementations. The structs should be instances of the unmarshalling 479// target for the matching event type. 
var EventMapping = map[string]interface{}{
	// Messages, presence and typing.
	"message":         MessageEvent{},
	"presence_change": PresenceChangeEvent{},
	"user_typing":     UserTypingEvent{},

	// Channel events.
	"channel_marked":          ChannelMarkedEvent{},
	"channel_created":         ChannelCreatedEvent{},
	"channel_joined":          ChannelJoinedEvent{},
	"channel_left":            ChannelLeftEvent{},
	"channel_deleted":         ChannelDeletedEvent{},
	"channel_rename":          ChannelRenameEvent{},
	"channel_archive":         ChannelArchiveEvent{},
	"channel_unarchive":       ChannelUnarchiveEvent{},
	"channel_history_changed": ChannelHistoryChangedEvent{},

	// Do-not-disturb events; both names decode into DNDUpdatedEvent.
	"dnd_updated":      DNDUpdatedEvent{},
	"dnd_updated_user": DNDUpdatedEvent{},

	// Direct message (IM) events.
	"im_created":         IMCreatedEvent{},
	"im_open":            IMOpenEvent{},
	"im_close":           IMCloseEvent{},
	"im_marked":          IMMarkedEvent{},
	"im_history_changed": IMHistoryChangedEvent{},

	// Group events.
	"group_marked":          GroupMarkedEvent{},
	"group_open":            GroupOpenEvent{},
	"group_joined":          GroupJoinedEvent{},
	"group_left":            GroupLeftEvent{},
	"group_close":           GroupCloseEvent{},
	"group_rename":          GroupRenameEvent{},
	"group_archive":         GroupArchiveEvent{},
	"group_unarchive":       GroupUnarchiveEvent{},
	"group_history_changed": GroupHistoryChangedEvent{},

	// File and file comment events.
	"file_created":         FileCreatedEvent{},
	"file_shared":          FileSharedEvent{},
	"file_unshared":        FileUnsharedEvent{},
	"file_public":          FilePublicEvent{},
	"file_private":         FilePrivateEvent{},
	"file_change":          FileChangeEvent{},
	"file_deleted":         FileDeletedEvent{},
	"file_comment_added":   FileCommentAddedEvent{},
	"file_comment_edited":  FileCommentEditedEvent{},
	"file_comment_deleted": FileCommentDeletedEvent{},

	// Pin events.
	"pin_added":   PinAddedEvent{},
	"pin_removed": PinRemovedEvent{},

	// Star events.
	"star_added":   StarAddedEvent{},
	"star_removed": StarRemovedEvent{},

	// Reaction events.
	"reaction_added":   ReactionAddedEvent{},
	"reaction_removed": ReactionRemovedEvent{},

	// Preference events.
	"pref_change": PrefChangeEvent{},

	// Team events.
	"team_join":              TeamJoinEvent{},
	"team_rename":            TeamRenameEvent{},
	"team_pref_change":       TeamPrefChangeEvent{},
	"team_domain_change":     TeamDomainChangeEvent{},
	"team_migration_started": TeamMigrationStartedEvent{},

	"manual_presence_change": ManualPresenceChangeEvent{},

	"user_change": UserChangeEvent{},

	"emoji_changed": EmojiChangedEvent{},

	"commands_changed": CommandsChangedEvent{},

	"email_domain_changed": EmailDomainChangedEvent{},

	// Bot events.
	"bot_added":   BotAddedEvent{},
	"bot_changed": BotChangedEvent{},

	"accounts_changed": AccountsChangedEvent{},

	"reconnect_url": ReconnectUrlEvent{},

	// Channel membership events.
	"member_joined_channel": MemberJoinedChannelEvent{},
	"member_left_channel":   MemberLeftChannelEvent{},

	// Subteam events.
	"subteam_created":      SubteamCreatedEvent{},
	"subteam_self_added":   SubteamSelfAddedEvent{},
	"subteam_self_removed": SubteamSelfRemovedEvent{},
	"subteam_updated":      SubteamUpdatedEvent{},
}