1package libkbfs
2
3import (
4	"errors"
5	"io"
6	"net"
7	"sync"
8
9	"github.com/keybase/client/go/libkb"
10	"github.com/keybase/client/go/logger"
11	kbgitkbfs "github.com/keybase/client/go/protocol/kbgitkbfs1"
12	keybase1 "github.com/keybase/client/go/protocol/keybase1"
13	"github.com/keybase/client/go/systemd"
14	"github.com/keybase/go-framed-msgpack-rpc/rpc"
15)
16
17// KBFSErrorUnwrapper unwraps errors from the KBFS service.
18type KBFSErrorUnwrapper struct {
19}
20
21var _ rpc.ErrorUnwrapper = KBFSErrorUnwrapper{}
22
23// MakeArg implements rpc.ErrorUnwrapper.
24func (eu KBFSErrorUnwrapper) MakeArg() interface{} {
25	return &keybase1.Status{}
26}
27
28// UnwrapError implements rpc.ErrorUnwrapper.
29func (eu KBFSErrorUnwrapper) UnwrapError(arg interface{}) (appError error,
30	dispatchError error) {
31	s, ok := arg.(*keybase1.Status)
32	if !ok {
33		return nil, errors.New("Error converting arg to keybase1.Status object in DiskCacheErrorUnwrapper.UnwrapError")
34	}
35
36	if s == nil || s.Code == 0 {
37		return nil, nil
38	}
39
40	switch s.Code {
41	case StatusCodeDiskBlockCacheError:
42		appError = DiskBlockCacheError{Msg: s.Desc}
43	default:
44		ase := libkb.AppStatusError{
45			Code:   s.Code,
46			Name:   s.Name,
47			Desc:   s.Desc,
48			Fields: make(map[string]string),
49		}
50		for _, f := range s.Fields {
51			ase.Fields[f.Key] = f.Value
52		}
53		appError = ase
54	}
55
56	return appError, nil
57}
58
59type kbfsServiceConfig interface {
60	diskBlockCacheGetter
61	logMaker
62	syncedTlfGetterSetter
63}
64
65// KBFSService represents a running KBFS service.
66type KBFSService struct {
67	config   kbfsServiceConfig
68	log      logger.Logger
69	kbCtx    Context
70	stopOnce sync.Once
71	stopCh   chan struct{}
72}
73
74// NewKBFSService creates a new KBFSService.
75func NewKBFSService(kbCtx Context, config kbfsServiceConfig) (
76	*KBFSService, error) {
77	log := config.MakeLogger("FSS")
78	// Check to see if we're receiving a socket from systemd. If not, create
79	// one and bind to it.
80	listener, err := systemd.GetListenerFromEnvironment()
81	if err != nil {
82		return nil, err
83	}
84	if listener != nil {
85		log.Debug("Found listener in the environment. Listening on fd 3.")
86	} else {
87		log.Debug("No listener found in the environment. Binding a new socket.")
88		listener, err = kbCtx.BindToKBFSSocket()
89		if err != nil {
90			return nil, err
91		}
92	}
93	k := &KBFSService{
94		config: config,
95		log:    log,
96		kbCtx:  kbCtx,
97	}
98	k.Run(listener)
99	return k, nil
100}
101
102// Run starts listening on the passed-in listener.
103func (k *KBFSService) Run(l net.Listener) {
104	go func() { _ = k.listenLoop(l) }()
105}
106
107// registerProtocols registers protocols for this KBFSService.
108func (k *KBFSService) registerProtocols(
109	srv *rpc.Server, xp rpc.Transporter) error {
110	// TODO: fill in with actual protocols.
111	protocols := []rpc.Protocol{
112		kbgitkbfs.DiskBlockCacheProtocol(NewDiskBlockCacheService(k.config)),
113	}
114	for _, proto := range protocols {
115		if err := srv.Register(proto); err != nil {
116			return err
117		}
118	}
119	return nil
120}
121
122// handle creates a server on an established connection.
123func (k *KBFSService) handle(c net.Conn) {
124	xp := rpc.NewTransport(c, k.kbCtx.NewRPCLogFactory(), k.kbCtx.NewNetworkInstrumenter(keybase1.NetworkSource_LOCAL),
125		libkb.WrapError, rpc.DefaultMaxFrameLength)
126
127	server := rpc.NewServer(xp, libkb.WrapError)
128
129	err := k.registerProtocols(server, xp)
130
131	if err != nil {
132		k.log.Warning("RegisterProtocols error: %s", err)
133		return
134	}
135
136	// Run the server, then wait for it or this KBFSService to finish.
137	serverCh := server.Run()
138	go func() {
139		select {
140		case <-k.stopCh:
141		case <-serverCh:
142		}
143		// Close is idempotent, so always close when we're done.
144		c.Close()
145	}()
146	<-serverCh
147
148	// err is always non-nil.
149	err = server.Err()
150	if err != io.EOF {
151		k.log.Warning("Run error: %s", err)
152	}
153
154	k.log.Debug("handle() complete")
155}
156
157// listenLoop listens on a passed-in listener and calls `handle` for any
158// connection that is established on the listener.
159func (k *KBFSService) listenLoop(l net.Listener) error {
160	go func() {
161		<-k.stopCh
162		l.Close()
163	}()
164	defer l.Close()
165	for {
166		c, err := l.Accept()
167		if err != nil {
168			if libkb.IsSocketClosedError(err) {
169				err = nil
170			}
171
172			k.log.Debug("listenLoop() done, error: %+v", err)
173			return err
174		}
175		go k.handle(c)
176	}
177}
178
179// Shutdown shuts down this KBFSService.
180func (k *KBFSService) Shutdown() {
181	k.stopOnce.Do(func() {
182		close(k.stopCh)
183	})
184}
185