1package kbchat
2
3import (
4	"bufio"
5	"encoding/json"
6	"errors"
7	"fmt"
8	"io"
9	"io/ioutil"
10	"os"
11	"os/exec"
12	"sync"
13	"time"
14
15	"github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1"
16	"github.com/keybase/go-keybase-chat-bot/kbchat/types/keybase1"
17	"github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1"
18)
19
20// SubscriptionMessage contains a message and conversation object
21type SubscriptionMessage struct {
22	Message      chat1.MsgSummary
23	Conversation chat1.ConvSummary
24}
25
26type SubscriptionConversation struct {
27	Conversation chat1.ConvSummary
28}
29
30type SubscriptionWalletEvent struct {
31	Payment stellar1.PaymentDetailsLocal
32}
33
34// Subscription has methods to control the background message fetcher loop
35type Subscription struct {
36	*DebugOutput
37	sync.Mutex
38
39	newMsgsCh   chan SubscriptionMessage
40	newConvsCh  chan SubscriptionConversation
41	newWalletCh chan SubscriptionWalletEvent
42	errorCh     chan error
43	running     bool
44	shutdownCh  chan struct{}
45}
46
47func NewSubscription() *Subscription {
48	newMsgsCh := make(chan SubscriptionMessage, 100)
49	newConvsCh := make(chan SubscriptionConversation, 100)
50	newWalletCh := make(chan SubscriptionWalletEvent, 100)
51	errorCh := make(chan error, 100)
52	shutdownCh := make(chan struct{})
53	return &Subscription{
54		DebugOutput: NewDebugOutput("Subscription"),
55		newMsgsCh:   newMsgsCh,
56		newConvsCh:  newConvsCh,
57		newWalletCh: newWalletCh,
58		shutdownCh:  shutdownCh,
59		errorCh:     errorCh,
60		running:     true,
61	}
62}
63
64// Read blocks until a new message arrives
65func (m *Subscription) Read() (msg SubscriptionMessage, err error) {
66	defer m.Trace(&err, "Read")()
67	select {
68	case msg = <-m.newMsgsCh:
69		return msg, nil
70	case err = <-m.errorCh:
71		return SubscriptionMessage{}, err
72	case <-m.shutdownCh:
73		return SubscriptionMessage{}, errors.New("Subscription shutdown")
74	}
75}
76
77func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) {
78	defer m.Trace(&err, "ReadNewConvs")()
79	select {
80	case conv = <-m.newConvsCh:
81		return conv, nil
82	case err = <-m.errorCh:
83		return SubscriptionConversation{}, err
84	case <-m.shutdownCh:
85		return SubscriptionConversation{}, errors.New("Subscription shutdown")
86	}
87}
88
89// Read blocks until a new message arrives
90func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) {
91	defer m.Trace(&err, "ReadWallet")()
92	select {
93	case msg = <-m.newWalletCh:
94		return msg, nil
95	case err = <-m.errorCh:
96		return SubscriptionWalletEvent{}, err
97	case <-m.shutdownCh:
98		return SubscriptionWalletEvent{}, errors.New("Subscription shutdown")
99	}
100}
101
102// Shutdown terminates the background process
103func (m *Subscription) Shutdown() {
104	defer m.Trace(nil, "Shutdown")()
105	m.Lock()
106	defer m.Unlock()
107	if m.running {
108		close(m.shutdownCh)
109		m.running = false
110	}
111}
112
// ListenOptions selects which optional event streams Listen asks for in
// addition to chat messages.
type ListenOptions struct {
	// Wallet adds "--wallet" to the listen command.
	Wallet bool
	// Convs adds "--convs" to the listen command.
	Convs bool
}
117
118type PaymentHolder struct {
119	Payment stellar1.PaymentDetailsLocal `json:"notification"`
120}
121
// TypeHolder decodes only the "type" key of a listen output line so the
// line can be dispatched to the matching notification decoder.
type TypeHolder struct {
	Type string `json:"type"`
}
125
// OneshotOptions holds the credentials used for a oneshot login.
type OneshotOptions struct {
	// Username is passed to the oneshot command as "--username".
	Username string
	// PaperKey is passed to the oneshot command as "--paperkey".
	PaperKey string
}
130
131type RunOptions struct {
132	KeybaseLocation string
133	HomeDir         string
134	Oneshot         *OneshotOptions
135	StartService    bool
136	// Have the bot send/receive typing notifications
137	EnableTyping bool
138	// Disable bot lite mode
139	DisableBotLiteMode bool
140}
141
142func (r RunOptions) Location() string {
143	if r.KeybaseLocation == "" {
144		return "keybase"
145	}
146	return r.KeybaseLocation
147}
148
149func (r RunOptions) Command(args ...string) *exec.Cmd {
150	var cmd []string
151	if r.HomeDir != "" {
152		cmd = append(cmd, "--home", r.HomeDir)
153	}
154	cmd = append(cmd, args...)
155	return exec.Command(r.Location(), cmd...)
156}
157
158// Start fires up the Keybase JSON API in stdin/stdout mode
159func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) {
160	api := NewAPI(runOpts, opts...)
161	if err := api.startPipes(); err != nil {
162		return nil, err
163	}
164	return api, nil
165}
166
167// API is the main object used for communicating with the Keybase JSON API
168type API struct {
169	sync.Mutex
170	*DebugOutput
171	apiInput      io.Writer
172	apiOutput     *bufio.Reader
173	apiCmd        *exec.Cmd
174	username      string
175	runOpts       RunOptions
176	subscriptions []*Subscription
177	Timeout       time.Duration
178	LogSendBytes  int
179}
180
181func CustomTimeout(timeout time.Duration) func(*API) {
182	return func(a *API) {
183		a.Timeout = timeout
184	}
185}
186
187func NewAPI(runOpts RunOptions, opts ...func(*API)) *API {
188	api := &API{
189		DebugOutput:  NewDebugOutput("API"),
190		runOpts:      runOpts,
191		Timeout:      5 * time.Second,
192		LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed
193	}
194	for _, opt := range opts {
195		opt(api)
196	}
197	return api
198}
199
200func (a *API) Command(args ...string) *exec.Cmd {
201	return a.runOpts.Command(args...)
202}
203
204func (a *API) getUsername(runOpts RunOptions) (username string, err error) {
205	p := runOpts.Command("whoami", "-json")
206	output, err := p.StdoutPipe()
207	if err != nil {
208		return "", err
209	}
210	p.ExtraFiles = []*os.File{output.(*os.File)}
211	if err = p.Start(); err != nil {
212		return "", err
213	}
214
215	doneCh := make(chan error)
216	go func() {
217		defer func() { close(doneCh) }()
218		statusJSON, err := ioutil.ReadAll(output)
219		if err != nil {
220			doneCh <- fmt.Errorf("error reading whoami output: %v", err)
221			return
222		}
223		var status keybase1.CurrentStatus
224		if err := json.Unmarshal(statusJSON, &status); err != nil {
225			doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err)
226			return
227		}
228		if status.LoggedIn && status.User != nil {
229			username = status.User.Username
230			doneCh <- nil
231		} else {
232			doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User)
233		}
234		// Cleanup the command
235		if err := p.Wait(); err != nil {
236			a.Debug("unable to wait for cmd: %v", err)
237		}
238	}()
239
240	select {
241	case err = <-doneCh:
242		if err != nil {
243			return "", err
244		}
245	case <-time.After(a.Timeout):
246		return "", errors.New("unable to run Keybase command")
247	}
248
249	return username, nil
250}
251
252func (a *API) auth() (string, error) {
253	username, err := a.getUsername(a.runOpts)
254	if err == nil {
255		return username, nil
256	}
257	if a.runOpts.Oneshot == nil {
258		return "", err
259	}
260	username = ""
261	// If a paper key is specified, then login with oneshot mode (logout first)
262	if a.runOpts.Oneshot != nil {
263		if username == a.runOpts.Oneshot.Username {
264			// just get out if we are on the desired user already
265			return username, nil
266		}
267		if err := a.runOpts.Command("logout", "-f").Run(); err != nil {
268			return "", err
269		}
270		if err := a.runOpts.Command("oneshot", "--username", a.runOpts.Oneshot.Username, "--paperkey",
271			a.runOpts.Oneshot.PaperKey).Run(); err != nil {
272			return "", err
273		}
274		username = a.runOpts.Oneshot.Username
275		return username, nil
276	}
277	return "", errors.New("unable to auth")
278}
279
280func (a *API) startPipes() (err error) {
281	a.Lock()
282	defer a.Unlock()
283	if a.apiCmd != nil {
284		if err := a.apiCmd.Process.Kill(); err != nil {
285			return err
286		}
287	}
288	a.apiCmd = nil
289
290	if a.runOpts.StartService {
291		args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"}
292		if err := a.runOpts.Command(args...).Start(); err != nil {
293			return err
294		}
295	}
296
297	if a.username, err = a.auth(); err != nil {
298		return err
299	}
300
301	cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping))
302	if err = cmd.Run(); err != nil {
303		return err
304	}
305
306	a.apiCmd = a.runOpts.Command("chat", "api")
307	if a.apiInput, err = a.apiCmd.StdinPipe(); err != nil {
308		return err
309	}
310	output, err := a.apiCmd.StdoutPipe()
311	if err != nil {
312		return err
313	}
314	a.apiCmd.ExtraFiles = []*os.File{output.(*os.File)}
315	if err := a.apiCmd.Start(); err != nil {
316		return err
317	}
318	a.apiOutput = bufio.NewReader(output)
319	return nil
320}
321
322func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) {
323	// this should only be called inside a lock
324	if a.apiCmd == nil {
325		return nil, nil, errAPIDisconnected
326	}
327	return a.apiInput, a.apiOutput, nil
328}
329
330func (a *API) GetUsername() string {
331	return a.username
332}
333
334func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
335	a.Lock()
336	defer a.Unlock()
337
338	bArg, err := json.Marshal(arg)
339	if err != nil {
340		return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err)
341	}
342	input, output, err := a.getAPIPipesLocked()
343	if err != nil {
344		return SendResponse{}, err
345	}
346	if _, err := io.WriteString(input, string(bArg)); err != nil {
347		return SendResponse{}, err
348	}
349	responseRaw, err := output.ReadBytes('\n')
350	if err != nil {
351		return SendResponse{}, err
352	}
353	if err := json.Unmarshal(responseRaw, &resp); err != nil {
354		return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err)
355	} else if resp.Error != nil {
356		return resp, errors.New(resp.Error.Message)
357	}
358	return resp, nil
359}
360
361func (a *API) doFetch(apiInput string) ([]byte, error) {
362	a.Lock()
363	defer a.Unlock()
364
365	input, output, err := a.getAPIPipesLocked()
366	if err != nil {
367		return nil, err
368	}
369	if _, err := io.WriteString(input, apiInput); err != nil {
370		return nil, err
371	}
372	byteOutput, err := output.ReadBytes('\n')
373	if err != nil {
374		return nil, err
375	}
376
377	return byteOutput, nil
378}
379
380// ListenForNewTextMessages proxies to Listen without wallet events
381func (a *API) ListenForNewTextMessages() (*Subscription, error) {
382	opts := ListenOptions{Wallet: false}
383	return a.Listen(opts)
384}
385
386func (a *API) registerSubscription(sub *Subscription) {
387	a.Lock()
388	defer a.Unlock()
389	a.subscriptions = append(a.subscriptions, sub)
390}
391
392// Listen fires of a background loop and puts chat messages and wallet
393// events into channels
394func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
395	done := make(chan struct{})
396	sub := NewSubscription()
397	a.registerSubscription(sub)
398	pause := 2 * time.Second
399	readScanner := func(boutput *bufio.Scanner) {
400		defer func() { done <- struct{}{} }()
401		for {
402			select {
403			case <-sub.shutdownCh:
404				a.Debug("readScanner: received shutdown")
405				return
406			default:
407			}
408			boutput.Scan()
409			t := boutput.Text()
410			var typeHolder TypeHolder
411			if err := json.Unmarshal([]byte(t), &typeHolder); err != nil {
412				sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
413				break
414			}
415			switch typeHolder.Type {
416			case "chat":
417				var notification chat1.MsgNotification
418				if err := json.Unmarshal([]byte(t), &notification); err != nil {
419					sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
420					break
421				}
422				if notification.Error != nil {
423					a.Debug("error message received: %s", *notification.Error)
424				} else if notification.Msg != nil {
425					subscriptionMessage := SubscriptionMessage{
426						Message: *notification.Msg,
427						Conversation: chat1.ConvSummary{
428							Id:      notification.Msg.ConvID,
429							Channel: notification.Msg.Channel,
430						},
431					}
432					sub.newMsgsCh <- subscriptionMessage
433				}
434			case "chat_conv":
435				var notification chat1.ConvNotification
436				if err := json.Unmarshal([]byte(t), &notification); err != nil {
437					sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
438					break
439				}
440				if notification.Error != nil {
441					a.Debug("error message received: %s", *notification.Error)
442				} else if notification.Conv != nil {
443					subscriptionConv := SubscriptionConversation{
444						Conversation: *notification.Conv,
445					}
446					sub.newConvsCh <- subscriptionConv
447				}
448			case "wallet":
449				var holder PaymentHolder
450				if err := json.Unmarshal([]byte(t), &holder); err != nil {
451					sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
452					break
453				}
454				subscriptionPayment := SubscriptionWalletEvent(holder)
455				sub.newWalletCh <- subscriptionPayment
456			default:
457				continue
458			}
459		}
460	}
461
462	attempts := 0
463	maxAttempts := 30
464	go func() {
465		defer func() {
466			close(sub.newMsgsCh)
467			close(sub.newConvsCh)
468			close(sub.newWalletCh)
469			close(sub.errorCh)
470		}()
471		for {
472			select {
473			case <-sub.shutdownCh:
474				a.Debug("Listen: received shutdown")
475				return
476			default:
477			}
478
479			if attempts >= maxAttempts {
480				if err := a.LogSend("Listen: failed to auth, giving up"); err != nil {
481					a.Debug("Listen: logsend failed to send: %v", err)
482				}
483				panic("Listen: failed to auth, giving up")
484			}
485			attempts++
486			if _, err := a.auth(); err != nil {
487				a.Debug("Listen: failed to auth: %s", err)
488				time.Sleep(pause)
489				continue
490			}
491			cmdElements := []string{"chat", "api-listen"}
492			if opts.Wallet {
493				cmdElements = append(cmdElements, "--wallet")
494			}
495			if opts.Convs {
496				cmdElements = append(cmdElements, "--convs")
497			}
498			p := a.runOpts.Command(cmdElements...)
499			output, err := p.StdoutPipe()
500			if err != nil {
501				a.Debug("Listen: failed to listen: %s", err)
502				time.Sleep(pause)
503				continue
504			}
505			stderr, err := p.StderrPipe()
506			if err != nil {
507				a.Debug("Listen: failed to listen to stderr: %s", err)
508				time.Sleep(pause)
509				continue
510			}
511			p.ExtraFiles = []*os.File{stderr.(*os.File), output.(*os.File)}
512			boutput := bufio.NewScanner(output)
513			if err := p.Start(); err != nil {
514
515				a.Debug("Listen: failed to make listen scanner: %s", err)
516				time.Sleep(pause)
517				continue
518			}
519			attempts = 0
520			go readScanner(boutput)
521			select {
522			case <-sub.shutdownCh:
523				a.Debug("Listen: received shutdown")
524				return
525			case <-done:
526			}
527			if err := p.Wait(); err != nil {
528				stderrBytes, rerr := ioutil.ReadAll(stderr)
529				if rerr != nil {
530					stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr))
531				}
532				a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes)
533				if err := a.startPipes(); err != nil {
534					a.Debug("Listen: failed to restart pipes: %v", err)
535				}
536			}
537			time.Sleep(pause)
538		}
539	}()
540	return sub, nil
541}
542
543func (a *API) LogSend(feedback string) error {
544	feedback = "go-keybase-chat-bot log send\n" +
545		"username: " + a.GetUsername() + "\n" +
546		feedback
547
548	args := []string{
549		"log", "send",
550		"--no-confirm",
551		"--feedback", feedback,
552		"-n", fmt.Sprintf("%d", a.LogSendBytes),
553	}
554	return a.runOpts.Command(args...).Run()
555}
556
557func (a *API) Shutdown() (err error) {
558	defer a.Trace(&err, "Shutdown")()
559	a.Lock()
560	defer a.Unlock()
561	for _, sub := range a.subscriptions {
562		sub.Shutdown()
563	}
564	if a.apiCmd != nil {
565		a.Debug("waiting for API command")
566		if err := a.apiCmd.Wait(); err != nil {
567			return err
568		}
569	}
570
571	if a.runOpts.Oneshot != nil {
572		a.Debug("logging out")
573		err := a.runOpts.Command("logout", "--force").Run()
574		if err != nil {
575			return err
576		}
577	}
578
579	if a.runOpts.StartService {
580		a.Debug("stopping service")
581		err := a.runOpts.Command("ctl", "stop", "--shutdown").Run()
582		if err != nil {
583			return err
584		}
585	}
586
587	return nil
588}
589