1package kbchat 2 3import ( 4 "bufio" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "io/ioutil" 10 "os" 11 "os/exec" 12 "sync" 13 "time" 14 15 "github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1" 16 "github.com/keybase/go-keybase-chat-bot/kbchat/types/keybase1" 17 "github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1" 18) 19 20// SubscriptionMessage contains a message and conversation object 21type SubscriptionMessage struct { 22 Message chat1.MsgSummary 23 Conversation chat1.ConvSummary 24} 25 26type SubscriptionConversation struct { 27 Conversation chat1.ConvSummary 28} 29 30type SubscriptionWalletEvent struct { 31 Payment stellar1.PaymentDetailsLocal 32} 33 34// Subscription has methods to control the background message fetcher loop 35type Subscription struct { 36 *DebugOutput 37 sync.Mutex 38 39 newMsgsCh chan SubscriptionMessage 40 newConvsCh chan SubscriptionConversation 41 newWalletCh chan SubscriptionWalletEvent 42 errorCh chan error 43 running bool 44 shutdownCh chan struct{} 45} 46 47func NewSubscription() *Subscription { 48 newMsgsCh := make(chan SubscriptionMessage, 100) 49 newConvsCh := make(chan SubscriptionConversation, 100) 50 newWalletCh := make(chan SubscriptionWalletEvent, 100) 51 errorCh := make(chan error, 100) 52 shutdownCh := make(chan struct{}) 53 return &Subscription{ 54 DebugOutput: NewDebugOutput("Subscription"), 55 newMsgsCh: newMsgsCh, 56 newConvsCh: newConvsCh, 57 newWalletCh: newWalletCh, 58 shutdownCh: shutdownCh, 59 errorCh: errorCh, 60 running: true, 61 } 62} 63 64// Read blocks until a new message arrives 65func (m *Subscription) Read() (msg SubscriptionMessage, err error) { 66 defer m.Trace(&err, "Read")() 67 select { 68 case msg = <-m.newMsgsCh: 69 return msg, nil 70 case err = <-m.errorCh: 71 return SubscriptionMessage{}, err 72 case <-m.shutdownCh: 73 return SubscriptionMessage{}, errors.New("Subscription shutdown") 74 } 75} 76 77func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) { 78 defer m.Trace(&err, "ReadNewConvs")() 79 select { 80 case conv = <-m.newConvsCh: 81 return conv, nil 82 case err = <-m.errorCh: 83 return SubscriptionConversation{}, err 84 case <-m.shutdownCh: 85 return SubscriptionConversation{}, errors.New("Subscription shutdown") 86 } 87} 88 89// Read blocks until a new message arrives 90func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) { 91 defer m.Trace(&err, "ReadWallet")() 92 select { 93 case msg = <-m.newWalletCh: 94 return msg, nil 95 case err = <-m.errorCh: 96 return SubscriptionWalletEvent{}, err 97 case <-m.shutdownCh: 98 return SubscriptionWalletEvent{}, errors.New("Subscription shutdown") 99 } 100} 101 102// Shutdown terminates the background process 103func (m *Subscription) Shutdown() { 104 defer m.Trace(nil, "Shutdown")() 105 m.Lock() 106 defer m.Unlock() 107 if m.running { 108 close(m.shutdownCh) 109 m.running = false 110 } 111} 112 113type ListenOptions struct { 114 Wallet bool 115 Convs bool 116} 117 118type PaymentHolder struct { 119 Payment stellar1.PaymentDetailsLocal `json:"notification"` 120} 121 122type TypeHolder struct { 123 Type string `json:"type"` 124} 125 126type OneshotOptions struct { 127 Username string 128 PaperKey string 129} 130 131type RunOptions struct { 132 KeybaseLocation string 133 HomeDir string 134 Oneshot *OneshotOptions 135 StartService bool 136 // Have the bot send/receive typing notifications 137 EnableTyping bool 138 // Disable bot lite mode 139 DisableBotLiteMode bool 140} 141 142func (r RunOptions) Location() string { 143 if r.KeybaseLocation == "" { 144 return "keybase" 145 } 146 return r.KeybaseLocation 147} 148 149func (r RunOptions) Command(args ...string) *exec.Cmd { 150 var cmd []string 151 if r.HomeDir != "" { 152 cmd = append(cmd, "--home", r.HomeDir) 153 } 154 cmd = append(cmd, args...) 155 return exec.Command(r.Location(), cmd...) 156} 157 158// Start fires up the Keybase JSON API in stdin/stdout mode 159func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) { 160 api := NewAPI(runOpts, opts...) 161 if err := api.startPipes(); err != nil { 162 return nil, err 163 } 164 return api, nil 165} 166 167// API is the main object used for communicating with the Keybase JSON API 168type API struct { 169 sync.Mutex 170 *DebugOutput 171 apiInput io.Writer 172 apiOutput *bufio.Reader 173 apiCmd *exec.Cmd 174 username string 175 runOpts RunOptions 176 subscriptions []*Subscription 177 Timeout time.Duration 178 LogSendBytes int 179} 180 181func CustomTimeout(timeout time.Duration) func(*API) { 182 return func(a *API) { 183 a.Timeout = timeout 184 } 185} 186 187func NewAPI(runOpts RunOptions, opts ...func(*API)) *API { 188 api := &API{ 189 DebugOutput: NewDebugOutput("API"), 190 runOpts: runOpts, 191 Timeout: 5 * time.Second, 192 LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed 193 } 194 for _, opt := range opts { 195 opt(api) 196 } 197 return api 198} 199 200func (a *API) Command(args ...string) *exec.Cmd { 201 return a.runOpts.Command(args...) 202} 203 204func (a *API) getUsername(runOpts RunOptions) (username string, err error) { 205 p := runOpts.Command("whoami", "-json") 206 output, err := p.StdoutPipe() 207 if err != nil { 208 return "", err 209 } 210 p.ExtraFiles = []*os.File{output.(*os.File)} 211 if err = p.Start(); err != nil { 212 return "", err 213 } 214 215 doneCh := make(chan error) 216 go func() { 217 defer func() { close(doneCh) }() 218 statusJSON, err := ioutil.ReadAll(output) 219 if err != nil { 220 doneCh <- fmt.Errorf("error reading whoami output: %v", err) 221 return 222 } 223 var status keybase1.CurrentStatus 224 if err := json.Unmarshal(statusJSON, &status); err != nil { 225 doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err) 226 return 227 } 228 if status.LoggedIn && status.User != nil { 229 username = status.User.Username 230 doneCh <- nil 231 } else { 232 doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User) 233 } 234 // Cleanup the command 235 if err := p.Wait(); err != nil { 236 a.Debug("unable to wait for cmd: %v", err) 237 } 238 }() 239 240 select { 241 case err = <-doneCh: 242 if err != nil { 243 return "", err 244 } 245 case <-time.After(a.Timeout): 246 return "", errors.New("unable to run Keybase command") 247 } 248 249 return username, nil 250} 251 252func (a *API) auth() (string, error) { 253 username, err := a.getUsername(a.runOpts) 254 if err == nil { 255 return username, nil 256 } 257 if a.runOpts.Oneshot == nil { 258 return "", err 259 } 260 username = "" 261 // If a paper key is specified, then login with oneshot mode (logout first) 262 if a.runOpts.Oneshot != nil { 263 if username == a.runOpts.Oneshot.Username { 264 // just get out if we are on the desired user already 265 return username, nil 266 } 267 if err := a.runOpts.Command("logout", "-f").Run(); err != nil { 268 return "", err 269 } 270 if err := a.runOpts.Command("oneshot", "--username", a.runOpts.Oneshot.Username, "--paperkey", 271 a.runOpts.Oneshot.PaperKey).Run(); err != nil { 272 return "", err 273 } 274 username = a.runOpts.Oneshot.Username 275 return username, nil 276 } 277 return "", errors.New("unable to auth") 278} 279 280func (a *API) startPipes() (err error) { 281 a.Lock() 282 defer a.Unlock() 283 if a.apiCmd != nil { 284 if err := a.apiCmd.Process.Kill(); err != nil { 285 return err 286 } 287 } 288 a.apiCmd = nil 289 290 if a.runOpts.StartService { 291 args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"} 292 if err := a.runOpts.Command(args...).Start(); err != nil { 293 return err 294 } 295 } 296 297 if a.username, err = a.auth(); err != nil { 298 return err 299 } 300 301 cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping)) 302 if err = cmd.Run(); err != nil { 303 return err 304 } 305 306 a.apiCmd = a.runOpts.Command("chat", "api") 307 if a.apiInput, err = a.apiCmd.StdinPipe(); err != nil { 308 return err 309 } 310 output, err := a.apiCmd.StdoutPipe() 311 if err != nil { 312 return err 313 } 314 a.apiCmd.ExtraFiles = []*os.File{output.(*os.File)} 315 if err := a.apiCmd.Start(); err != nil { 316 return err 317 } 318 a.apiOutput = bufio.NewReader(output) 319 return nil 320} 321 322func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) { 323 // this should only be called inside a lock 324 if a.apiCmd == nil { 325 return nil, nil, errAPIDisconnected 326 } 327 return a.apiInput, a.apiOutput, nil 328} 329 330func (a *API) GetUsername() string { 331 return a.username 332} 333 334func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { 335 a.Lock() 336 defer a.Unlock() 337 338 bArg, err := json.Marshal(arg) 339 if err != nil { 340 return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err) 341 } 342 input, output, err := a.getAPIPipesLocked() 343 if err != nil { 344 return SendResponse{}, err 345 } 346 if _, err := io.WriteString(input, string(bArg)); err != nil { 347 return SendResponse{}, err 348 } 349 responseRaw, err := output.ReadBytes('\n') 350 if err != nil { 351 return SendResponse{}, err 352 } 353 if err := json.Unmarshal(responseRaw, &resp); err != nil { 354 return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err) 355 } else if resp.Error != nil { 356 return resp, errors.New(resp.Error.Message) 357 } 358 return resp, nil 359} 360 361func (a *API) doFetch(apiInput string) ([]byte, error) { 362 a.Lock() 363 defer a.Unlock() 364 365 input, output, err := a.getAPIPipesLocked() 366 if err != nil { 367 return nil, err 368 } 369 if _, err := io.WriteString(input, apiInput); err != nil { 370 return nil, err 371 } 372 byteOutput, err := output.ReadBytes('\n') 373 if err != nil { 374 return nil, err 375 } 376 377 return byteOutput, nil 378} 379 380// ListenForNewTextMessages proxies to Listen without wallet events 381func (a *API) ListenForNewTextMessages() (*Subscription, error) { 382 opts := ListenOptions{Wallet: false} 383 return a.Listen(opts) 384} 385 386func (a *API) registerSubscription(sub *Subscription) { 387 a.Lock() 388 defer a.Unlock() 389 a.subscriptions = append(a.subscriptions, sub) 390} 391 392// Listen fires of a background loop and puts chat messages and wallet 393// events into channels 394func (a *API) Listen(opts ListenOptions) (*Subscription, error) { 395 done := make(chan struct{}) 396 sub := NewSubscription() 397 a.registerSubscription(sub) 398 pause := 2 * time.Second 399 readScanner := func(boutput *bufio.Scanner) { 400 defer func() { done <- struct{}{} }() 401 for { 402 select { 403 case <-sub.shutdownCh: 404 a.Debug("readScanner: received shutdown") 405 return 406 default: 407 } 408 boutput.Scan() 409 t := boutput.Text() 410 var typeHolder TypeHolder 411 if err := json.Unmarshal([]byte(t), &typeHolder); err != nil { 412 sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) 413 break 414 } 415 switch typeHolder.Type { 416 case "chat": 417 var notification chat1.MsgNotification 418 if err := json.Unmarshal([]byte(t), ¬ification); err != nil { 419 sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) 420 break 421 } 422 if notification.Error != nil { 423 a.Debug("error message received: %s", *notification.Error) 424 } else if notification.Msg != nil { 425 subscriptionMessage := SubscriptionMessage{ 426 Message: *notification.Msg, 427 Conversation: chat1.ConvSummary{ 428 Id: notification.Msg.ConvID, 429 Channel: notification.Msg.Channel, 430 }, 431 } 432 sub.newMsgsCh <- subscriptionMessage 433 } 434 case "chat_conv": 435 var notification chat1.ConvNotification 436 if err := json.Unmarshal([]byte(t), ¬ification); err != nil { 437 sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) 438 break 439 } 440 if notification.Error != nil { 441 a.Debug("error message received: %s", *notification.Error) 442 } else if notification.Conv != nil { 443 subscriptionConv := SubscriptionConversation{ 444 Conversation: *notification.Conv, 445 } 446 sub.newConvsCh <- subscriptionConv 447 } 448 case "wallet": 449 var holder PaymentHolder 450 if err := json.Unmarshal([]byte(t), &holder); err != nil { 451 sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) 452 break 453 } 454 subscriptionPayment := SubscriptionWalletEvent(holder) 455 sub.newWalletCh <- subscriptionPayment 456 default: 457 continue 458 } 459 } 460 } 461 462 attempts := 0 463 maxAttempts := 30 464 go func() { 465 defer func() { 466 close(sub.newMsgsCh) 467 close(sub.newConvsCh) 468 close(sub.newWalletCh) 469 close(sub.errorCh) 470 }() 471 for { 472 select { 473 case <-sub.shutdownCh: 474 a.Debug("Listen: received shutdown") 475 return 476 default: 477 } 478 479 if attempts >= maxAttempts { 480 if err := a.LogSend("Listen: failed to auth, giving up"); err != nil { 481 a.Debug("Listen: logsend failed to send: %v", err) 482 } 483 panic("Listen: failed to auth, giving up") 484 } 485 attempts++ 486 if _, err := a.auth(); err != nil { 487 a.Debug("Listen: failed to auth: %s", err) 488 time.Sleep(pause) 489 continue 490 } 491 cmdElements := []string{"chat", "api-listen"} 492 if opts.Wallet { 493 cmdElements = append(cmdElements, "--wallet") 494 } 495 if opts.Convs { 496 cmdElements = append(cmdElements, "--convs") 497 } 498 p := a.runOpts.Command(cmdElements...) 499 output, err := p.StdoutPipe() 500 if err != nil { 501 a.Debug("Listen: failed to listen: %s", err) 502 time.Sleep(pause) 503 continue 504 } 505 stderr, err := p.StderrPipe() 506 if err != nil { 507 a.Debug("Listen: failed to listen to stderr: %s", err) 508 time.Sleep(pause) 509 continue 510 } 511 p.ExtraFiles = []*os.File{stderr.(*os.File), output.(*os.File)} 512 boutput := bufio.NewScanner(output) 513 if err := p.Start(); err != nil { 514 515 a.Debug("Listen: failed to make listen scanner: %s", err) 516 time.Sleep(pause) 517 continue 518 } 519 attempts = 0 520 go readScanner(boutput) 521 select { 522 case <-sub.shutdownCh: 523 a.Debug("Listen: received shutdown") 524 return 525 case <-done: 526 } 527 if err := p.Wait(); err != nil { 528 stderrBytes, rerr := ioutil.ReadAll(stderr) 529 if rerr != nil { 530 stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr)) 531 } 532 a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes) 533 if err := a.startPipes(); err != nil { 534 a.Debug("Listen: failed to restart pipes: %v", err) 535 } 536 } 537 time.Sleep(pause) 538 } 539 }() 540 return sub, nil 541} 542 543func (a *API) LogSend(feedback string) error { 544 feedback = "go-keybase-chat-bot log send\n" + 545 "username: " + a.GetUsername() + "\n" + 546 feedback 547 548 args := []string{ 549 "log", "send", 550 "--no-confirm", 551 "--feedback", feedback, 552 "-n", fmt.Sprintf("%d", a.LogSendBytes), 553 } 554 return a.runOpts.Command(args...).Run() 555} 556 557func (a *API) Shutdown() (err error) { 558 defer a.Trace(&err, "Shutdown")() 559 a.Lock() 560 defer a.Unlock() 561 for _, sub := range a.subscriptions { 562 sub.Shutdown() 563 } 564 if a.apiCmd != nil { 565 a.Debug("waiting for API command") 566 if err := a.apiCmd.Wait(); err != nil { 567 return err 568 } 569 } 570 571 if a.runOpts.Oneshot != nil { 572 a.Debug("logging out") 573 err := a.runOpts.Command("logout", "--force").Run() 574 if err != nil { 575 return err 576 } 577 } 578 579 if a.runOpts.StartService { 580 a.Debug("stopping service") 581 err := a.runOpts.Command("ctl", "stop", "--shutdown").Run() 582 if err != nil { 583 return err 584 } 585 } 586 587 return nil 588} 589