1package daemon 2 3import ( 4 "errors" 5 "net" 6 "sync" 7 8 "src.elv.sh/pkg/daemon/daemondefs" 9 "src.elv.sh/pkg/daemon/internal/api" 10 "src.elv.sh/pkg/rpc" 11 "src.elv.sh/pkg/store/storedefs" 12) 13 14const retriesOnShutdown = 3 15 16var ( 17 // ErrDaemonUnreachable is returned when the daemon cannot be reached after 18 // several retries. 19 ErrDaemonUnreachable = errors.New("daemon offline") 20) 21 22// Implementation of the Client interface. 23type client struct { 24 sockPath string 25 rpcClient *rpc.Client 26 waits sync.WaitGroup 27} 28 29// NewClient creates a new Client instance that talks to the socket. Connection 30// creation is deferred to the first request. 31func NewClient(sockPath string) daemondefs.Client { 32 return &client{sockPath, nil, sync.WaitGroup{}} 33} 34 35// SockPath returns the socket path that the Client talks to. If the client is 36// nil, it returns an empty string. 37func (c *client) SockPath() string { 38 return c.sockPath 39} 40 41// ResetConn resets the current connection. A new connection will be established 42// the next time a request is made. If the client is nil, it does nothing. 43func (c *client) ResetConn() error { 44 if c.rpcClient == nil { 45 return nil 46 } 47 rc := c.rpcClient 48 c.rpcClient = nil 49 return rc.Close() 50} 51 52// Close waits for all outstanding requests to finish and close the connection. 53// If the client is nil, it does nothing and returns nil. 54func (c *client) Close() error { 55 c.waits.Wait() 56 return c.ResetConn() 57} 58 59func (c *client) call(f string, req, res interface{}) error { 60 c.waits.Add(1) 61 defer c.waits.Done() 62 63 for attempt := 0; attempt < retriesOnShutdown; attempt++ { 64 if c.rpcClient == nil { 65 conn, err := net.Dial("unix", c.sockPath) 66 if err != nil { 67 return err 68 } 69 c.rpcClient = rpc.NewClient(conn) 70 } 71 72 err := c.rpcClient.Call(api.ServiceName+"."+f, req, res) 73 if err == rpc.ErrShutdown { 74 // Clear rpcClient so as to reconnect next time 75 c.rpcClient = nil 76 continue 77 } else { 78 return err 79 } 80 } 81 return ErrDaemonUnreachable 82} 83 84// Convenience methods for RPC methods. These are quite repetitive; when the 85// number of RPC calls grow above some threshold, a code generator should be 86// written to generate them. 87 88func (c *client) Version() (int, error) { 89 req := &api.VersionRequest{} 90 res := &api.VersionResponse{} 91 err := c.call("Version", req, res) 92 return res.Version, err 93} 94 95func (c *client) Pid() (int, error) { 96 req := &api.PidRequest{} 97 res := &api.PidResponse{} 98 err := c.call("Pid", req, res) 99 return res.Pid, err 100} 101 102func (c *client) NextCmdSeq() (int, error) { 103 req := &api.NextCmdRequest{} 104 res := &api.NextCmdSeqResponse{} 105 err := c.call("NextCmdSeq", req, res) 106 return res.Seq, err 107} 108 109func (c *client) AddCmd(text string) (int, error) { 110 req := &api.AddCmdRequest{Text: text} 111 res := &api.AddCmdResponse{} 112 err := c.call("AddCmd", req, res) 113 return res.Seq, err 114} 115 116func (c *client) DelCmd(seq int) error { 117 req := &api.DelCmdRequest{Seq: seq} 118 res := &api.DelCmdResponse{} 119 err := c.call("DelCmd", req, res) 120 return err 121} 122 123func (c *client) Cmd(seq int) (string, error) { 124 req := &api.CmdRequest{Seq: seq} 125 res := &api.CmdResponse{} 126 err := c.call("Cmd", req, res) 127 return res.Text, err 128} 129 130func (c *client) CmdsWithSeq(from, upto int) ([]storedefs.Cmd, error) { 131 req := &api.CmdsWithSeqRequest{From: from, Upto: upto} 132 res := &api.CmdsWithSeqResponse{} 133 err := c.call("CmdsWithSeq", req, res) 134 return res.Cmds, err 135} 136 137func (c *client) NextCmd(from int, prefix string) (storedefs.Cmd, error) { 138 req := &api.NextCmdRequest{From: from, Prefix: prefix} 139 res := &api.NextCmdResponse{} 140 err := c.call("NextCmd", req, res) 141 return storedefs.Cmd{Text: res.Text, Seq: res.Seq}, err 142} 143 144func (c *client) PrevCmd(upto int, prefix string) (storedefs.Cmd, error) { 145 req := &api.PrevCmdRequest{Upto: upto, Prefix: prefix} 146 res := &api.PrevCmdResponse{} 147 err := c.call("PrevCmd", req, res) 148 return storedefs.Cmd{Text: res.Text, Seq: res.Seq}, err 149} 150 151func (c *client) AddDir(dir string, incFactor float64) error { 152 req := &api.AddDirRequest{Dir: dir, IncFactor: incFactor} 153 res := &api.AddDirResponse{} 154 err := c.call("AddDir", req, res) 155 return err 156} 157 158func (c *client) DelDir(dir string) error { 159 req := &api.DelDirRequest{Dir: dir} 160 res := &api.DelDirResponse{} 161 err := c.call("DelDir", req, res) 162 return err 163} 164 165func (c *client) Dirs(blacklist map[string]struct{}) ([]storedefs.Dir, error) { 166 req := &api.DirsRequest{Blacklist: blacklist} 167 res := &api.DirsResponse{} 168 err := c.call("Dirs", req, res) 169 return res.Dirs, err 170} 171 172func (c *client) SharedVar(name string) (string, error) { 173 req := &api.SharedVarRequest{Name: name} 174 res := &api.SharedVarResponse{} 175 err := c.call("SharedVar", req, res) 176 return res.Value, err 177} 178 179func (c *client) SetSharedVar(name, value string) error { 180 req := &api.SetSharedVarRequest{Name: name, Value: value} 181 res := &api.SetSharedVarResponse{} 182 return c.call("SetSharedVar", req, res) 183} 184 185func (c *client) DelSharedVar(name string) error { 186 req := &api.DelSharedVarRequest{Name: name} 187 res := &api.DelSharedVarResponse{} 188 return c.call("DelSharedVar", req, res) 189} 190