1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package agent
16
17import (
18	"errors"
19	"fmt"
20	"io/ioutil"
21	"net/url"
22	"os"
23	"os/exec"
24	"path/filepath"
25	"syscall"
26	"time"
27
28	"go.etcd.io/etcd/functional/rpcpb"
29	"go.etcd.io/etcd/pkg/fileutil"
30	"go.etcd.io/etcd/pkg/proxy"
31
32	"go.uber.org/zap"
33)
34
35// return error for system errors (e.g. fail to create files)
36// return status error in response for wrong configuration/operation (e.g. start etcd twice)
37func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response, err error) {
38	defer func() {
39		if err == nil && req != nil {
40			srv.last = req.Operation
41			srv.lg.Info("handler success", zap.String("operation", req.Operation.String()))
42		}
43	}()
44	if req != nil {
45		srv.Member = req.Member
46		srv.Tester = req.Tester
47	}
48
49	switch req.Operation {
50	case rpcpb.Operation_INITIAL_START_ETCD:
51		return srv.handle_INITIAL_START_ETCD(req)
52	case rpcpb.Operation_RESTART_ETCD:
53		return srv.handle_RESTART_ETCD()
54
55	case rpcpb.Operation_SIGTERM_ETCD:
56		return srv.handle_SIGTERM_ETCD()
57	case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA:
58		return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA()
59
60	case rpcpb.Operation_SAVE_SNAPSHOT:
61		return srv.handle_SAVE_SNAPSHOT()
62	case rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT:
63		return srv.handle_RESTORE_RESTART_FROM_SNAPSHOT()
64	case rpcpb.Operation_RESTART_FROM_SNAPSHOT:
65		return srv.handle_RESTART_FROM_SNAPSHOT()
66
67	case rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA:
68		return srv.handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA()
69	case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT:
70		return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
71
72	case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX:
73		return srv.handle_BLACKHOLE_PEER_PORT_TX_RX(), nil
74	case rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX:
75		return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX(), nil
76	case rpcpb.Operation_DELAY_PEER_PORT_TX_RX:
77		return srv.handle_DELAY_PEER_PORT_TX_RX(), nil
78	case rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX:
79		return srv.handle_UNDELAY_PEER_PORT_TX_RX(), nil
80
81	default:
82		msg := fmt.Sprintf("operation not found (%v)", req.Operation)
83		return &rpcpb.Response{Success: false, Status: msg}, errors.New(msg)
84	}
85}
86
87// just archive the first file
88func (srv *Server) createEtcdLogFile() error {
89	var err error
90	srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutputs[0])
91	if err != nil {
92		return err
93	}
94	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutputs[0]))
95	return nil
96}
97
98func (srv *Server) creatEtcd(fromSnapshot bool) error {
99	if !fileutil.Exist(srv.Member.EtcdExec) {
100		return fmt.Errorf("unknown etcd exec path %q does not exist", srv.Member.EtcdExec)
101	}
102
103	etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
104	if fromSnapshot {
105		etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
106	}
107	u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
108	srv.lg.Info(
109		"creating etcd command",
110		zap.String("etcd-exec", etcdPath),
111		zap.Strings("etcd-flags", etcdFlags),
112		zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
113		zap.String("failpoint-addr", u.Host),
114	)
115	srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
116	srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
117	srv.etcdCmd.Stdout = srv.etcdLogFile
118	srv.etcdCmd.Stderr = srv.etcdLogFile
119	return nil
120}
121
122// start but do not wait for it to complete
123func (srv *Server) runEtcd() error {
124	errc := make(chan error)
125	go func() {
126		time.Sleep(5 * time.Second)
127		// server advertise client/peer listener had to start first
128		// before setting up proxy listener
129		errc <- srv.startProxy()
130	}()
131
132	if srv.etcdCmd != nil {
133		srv.lg.Info(
134			"starting etcd command",
135			zap.String("command-path", srv.etcdCmd.Path),
136		)
137		err := srv.etcdCmd.Start()
138		perr := <-errc
139		srv.lg.Info(
140			"started etcd command",
141			zap.String("command-path", srv.etcdCmd.Path),
142			zap.Errors("errors", []error{err, perr}),
143		)
144		if err != nil {
145			return err
146		}
147		return perr
148	}
149
150	select {
151	case <-srv.etcdServer.Server.ReadyNotify():
152		srv.lg.Info("embedded etcd is ready")
153	case <-time.After(time.Minute):
154		srv.etcdServer.Close()
155		return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
156	}
157	return <-errc
158}
159
160// SIGQUIT to exit with stackstrace
161func (srv *Server) stopEtcd(sig os.Signal) error {
162	srv.stopProxy()
163
164	if srv.etcdCmd != nil {
165		srv.lg.Info(
166			"stopping etcd command",
167			zap.String("command-path", srv.etcdCmd.Path),
168			zap.String("signal", sig.String()),
169		)
170
171		err := srv.etcdCmd.Process.Signal(sig)
172		if err != nil {
173			return err
174		}
175
176		errc := make(chan error)
177		go func() {
178			_, ew := srv.etcdCmd.Process.Wait()
179			errc <- ew
180			close(errc)
181		}()
182
183		select {
184		case <-time.After(5 * time.Second):
185			srv.etcdCmd.Process.Kill()
186		case e := <-errc:
187			return e
188		}
189
190		err = <-errc
191
192		srv.lg.Info(
193			"stopped etcd command",
194			zap.String("command-path", srv.etcdCmd.Path),
195			zap.String("signal", sig.String()),
196			zap.Error(err),
197		)
198		return err
199	}
200
201	srv.lg.Info("stopping embedded etcd")
202	srv.etcdServer.Server.HardStop()
203	srv.etcdServer.Close()
204	srv.lg.Info("stopped embedded etcd")
205	return nil
206}
207
208func (srv *Server) startProxy() error {
209	if srv.Member.EtcdClientProxy {
210		advertiseClientURL, advertiseClientURLPort, err := getURLAndPort(srv.Member.Etcd.AdvertiseClientURLs[0])
211		if err != nil {
212			return err
213		}
214		listenClientURL, _, err := getURLAndPort(srv.Member.Etcd.ListenClientURLs[0])
215		if err != nil {
216			return err
217		}
218
219		srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
220		srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
221			Logger: srv.lg,
222			From:   *advertiseClientURL,
223			To:     *listenClientURL,
224		})
225		select {
226		case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
227			return err
228		case <-time.After(2 * time.Second):
229			srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
230		}
231	}
232
233	if srv.Member.EtcdPeerProxy {
234		advertisePeerURL, advertisePeerURLPort, err := getURLAndPort(srv.Member.Etcd.AdvertisePeerURLs[0])
235		if err != nil {
236			return err
237		}
238		listenPeerURL, _, err := getURLAndPort(srv.Member.Etcd.ListenPeerURLs[0])
239		if err != nil {
240			return err
241		}
242
243		srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
244		srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
245			Logger: srv.lg,
246			From:   *advertisePeerURL,
247			To:     *listenPeerURL,
248		})
249		select {
250		case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
251			return err
252		case <-time.After(2 * time.Second):
253			srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
254		}
255	}
256	return nil
257}
258
259func (srv *Server) stopProxy() {
260	if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 {
261		for port, px := range srv.advertiseClientPortToProxy {
262			if err := px.Close(); err != nil {
263				srv.lg.Warn("failed to close proxy", zap.Int("port", port))
264				continue
265			}
266			select {
267			case <-px.Done():
268				// enough time to release port
269				time.Sleep(time.Second)
270			case <-time.After(time.Second):
271			}
272			srv.lg.Info("closed proxy",
273				zap.Int("port", port),
274				zap.String("from", px.From()),
275				zap.String("to", px.To()),
276			)
277		}
278		srv.advertiseClientPortToProxy = make(map[int]proxy.Server)
279	}
280	if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 {
281		for port, px := range srv.advertisePeerPortToProxy {
282			if err := px.Close(); err != nil {
283				srv.lg.Warn("failed to close proxy", zap.Int("port", port))
284				continue
285			}
286			select {
287			case <-px.Done():
288				// enough time to release port
289				time.Sleep(time.Second)
290			case <-time.After(time.Second):
291			}
292			srv.lg.Info("closed proxy",
293				zap.Int("port", port),
294				zap.String("from", px.From()),
295				zap.String("to", px.To()),
296			)
297		}
298		srv.advertisePeerPortToProxy = make(map[int]proxy.Server)
299	}
300}
301
302// if started with manual TLS, stores TLS assets
303// from tester/client to disk before starting etcd process
304func (srv *Server) saveTLSAssets() error {
305	if srv.Member.PeerCertPath != "" {
306		if srv.Member.PeerCertData == "" {
307			return fmt.Errorf("got empty data for %q", srv.Member.PeerCertPath)
308		}
309		if err := ioutil.WriteFile(srv.Member.PeerCertPath, []byte(srv.Member.PeerCertData), 0644); err != nil {
310			return err
311		}
312	}
313	if srv.Member.PeerKeyPath != "" {
314		if srv.Member.PeerKeyData == "" {
315			return fmt.Errorf("got empty data for %q", srv.Member.PeerKeyPath)
316		}
317		if err := ioutil.WriteFile(srv.Member.PeerKeyPath, []byte(srv.Member.PeerKeyData), 0644); err != nil {
318			return err
319		}
320	}
321	if srv.Member.PeerTrustedCAPath != "" {
322		if srv.Member.PeerTrustedCAData == "" {
323			return fmt.Errorf("got empty data for %q", srv.Member.PeerTrustedCAPath)
324		}
325		if err := ioutil.WriteFile(srv.Member.PeerTrustedCAPath, []byte(srv.Member.PeerTrustedCAData), 0644); err != nil {
326			return err
327		}
328	}
329	if srv.Member.PeerCertPath != "" &&
330		srv.Member.PeerKeyPath != "" &&
331		srv.Member.PeerTrustedCAPath != "" {
332		srv.lg.Info(
333			"wrote",
334			zap.String("peer-cert", srv.Member.PeerCertPath),
335			zap.String("peer-key", srv.Member.PeerKeyPath),
336			zap.String("peer-trusted-ca", srv.Member.PeerTrustedCAPath),
337		)
338	}
339
340	if srv.Member.ClientCertPath != "" {
341		if srv.Member.ClientCertData == "" {
342			return fmt.Errorf("got empty data for %q", srv.Member.ClientCertPath)
343		}
344		if err := ioutil.WriteFile(srv.Member.ClientCertPath, []byte(srv.Member.ClientCertData), 0644); err != nil {
345			return err
346		}
347	}
348	if srv.Member.ClientKeyPath != "" {
349		if srv.Member.ClientKeyData == "" {
350			return fmt.Errorf("got empty data for %q", srv.Member.ClientKeyPath)
351		}
352		if err := ioutil.WriteFile(srv.Member.ClientKeyPath, []byte(srv.Member.ClientKeyData), 0644); err != nil {
353			return err
354		}
355	}
356	if srv.Member.ClientTrustedCAPath != "" {
357		if srv.Member.ClientTrustedCAData == "" {
358			return fmt.Errorf("got empty data for %q", srv.Member.ClientTrustedCAPath)
359		}
360		if err := ioutil.WriteFile(srv.Member.ClientTrustedCAPath, []byte(srv.Member.ClientTrustedCAData), 0644); err != nil {
361			return err
362		}
363	}
364	if srv.Member.ClientCertPath != "" &&
365		srv.Member.ClientKeyPath != "" &&
366		srv.Member.ClientTrustedCAPath != "" {
367		srv.lg.Info(
368			"wrote",
369			zap.String("client-cert", srv.Member.ClientCertPath),
370			zap.String("client-key", srv.Member.ClientKeyPath),
371			zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath),
372		)
373	}
374	return nil
375}
376
377func (srv *Server) loadAutoTLSAssets() error {
378	if srv.Member.Etcd.PeerAutoTLS {
379		// in case of slow disk
380		time.Sleep(time.Second)
381
382		fdir := filepath.Join(srv.Member.Etcd.DataDir, "fixtures", "peer")
383
384		srv.lg.Info(
385			"loading peer auto TLS assets",
386			zap.String("dir", fdir),
387			zap.String("endpoint", srv.EtcdClientEndpoint),
388		)
389
390		certPath := filepath.Join(fdir, "cert.pem")
391		if !fileutil.Exist(certPath) {
392			return fmt.Errorf("cannot find %q", certPath)
393		}
394		certData, err := ioutil.ReadFile(certPath)
395		if err != nil {
396			return fmt.Errorf("cannot read %q (%v)", certPath, err)
397		}
398		srv.Member.PeerCertData = string(certData)
399
400		keyPath := filepath.Join(fdir, "key.pem")
401		if !fileutil.Exist(keyPath) {
402			return fmt.Errorf("cannot find %q", keyPath)
403		}
404		keyData, err := ioutil.ReadFile(keyPath)
405		if err != nil {
406			return fmt.Errorf("cannot read %q (%v)", keyPath, err)
407		}
408		srv.Member.PeerKeyData = string(keyData)
409
410		srv.lg.Info(
411			"loaded peer auto TLS assets",
412			zap.String("peer-cert-path", certPath),
413			zap.Int("peer-cert-length", len(certData)),
414			zap.String("peer-key-path", keyPath),
415			zap.Int("peer-key-length", len(keyData)),
416		)
417	}
418
419	if srv.Member.Etcd.ClientAutoTLS {
420		// in case of slow disk
421		time.Sleep(time.Second)
422
423		fdir := filepath.Join(srv.Member.Etcd.DataDir, "fixtures", "client")
424
425		srv.lg.Info(
426			"loading client TLS assets",
427			zap.String("dir", fdir),
428			zap.String("endpoint", srv.EtcdClientEndpoint),
429		)
430
431		certPath := filepath.Join(fdir, "cert.pem")
432		if !fileutil.Exist(certPath) {
433			return fmt.Errorf("cannot find %q", certPath)
434		}
435		certData, err := ioutil.ReadFile(certPath)
436		if err != nil {
437			return fmt.Errorf("cannot read %q (%v)", certPath, err)
438		}
439		srv.Member.ClientCertData = string(certData)
440
441		keyPath := filepath.Join(fdir, "key.pem")
442		if !fileutil.Exist(keyPath) {
443			return fmt.Errorf("cannot find %q", keyPath)
444		}
445		keyData, err := ioutil.ReadFile(keyPath)
446		if err != nil {
447			return fmt.Errorf("cannot read %q (%v)", keyPath, err)
448		}
449		srv.Member.ClientKeyData = string(keyData)
450
451		srv.lg.Info(
452			"loaded client TLS assets",
453			zap.String("client-cert-path", certPath),
454			zap.Int("client-cert-length", len(certData)),
455			zap.String("client-key-path", keyPath),
456			zap.Int("client-key-length", len(keyData)),
457		)
458	}
459
460	return nil
461}
462
463func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
464	if srv.last != rpcpb.Operation_NOT_STARTED {
465		return &rpcpb.Response{
466			Success: false,
467			Status:  fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
468			Member:  req.Member,
469		}, nil
470	}
471
472	err := fileutil.TouchDirAll(srv.Member.BaseDir)
473	if err != nil {
474		return nil, err
475	}
476	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
477
478	if srv.etcdServer == nil {
479		if err = srv.createEtcdLogFile(); err != nil {
480			return nil, err
481		}
482	}
483
484	if err = srv.saveTLSAssets(); err != nil {
485		return nil, err
486	}
487	if err = srv.creatEtcd(false); err != nil {
488		return nil, err
489	}
490	if err = srv.runEtcd(); err != nil {
491		return nil, err
492	}
493	if err = srv.loadAutoTLSAssets(); err != nil {
494		return nil, err
495	}
496
497	return &rpcpb.Response{
498		Success: true,
499		Status:  "start etcd PASS",
500		Member:  srv.Member,
501	}, nil
502}
503
504func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
505	var err error
506	if !fileutil.Exist(srv.Member.BaseDir) {
507		err = fileutil.TouchDirAll(srv.Member.BaseDir)
508		if err != nil {
509			return nil, err
510		}
511	}
512
513	if err = srv.saveTLSAssets(); err != nil {
514		return nil, err
515	}
516	if err = srv.creatEtcd(false); err != nil {
517		return nil, err
518	}
519	if err = srv.runEtcd(); err != nil {
520		return nil, err
521	}
522	if err = srv.loadAutoTLSAssets(); err != nil {
523		return nil, err
524	}
525
526	return &rpcpb.Response{
527		Success: true,
528		Status:  "restart etcd PASS",
529		Member:  srv.Member,
530	}, nil
531}
532
533func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
534	if err := srv.stopEtcd(syscall.SIGTERM); err != nil {
535		return nil, err
536	}
537
538	if srv.etcdServer != nil {
539		srv.etcdServer.GetLogger().Sync()
540	} else {
541		srv.etcdLogFile.Sync()
542	}
543
544	return &rpcpb.Response{
545		Success: true,
546		Status:  "killed etcd",
547	}, nil
548}
549
550func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
551	err := srv.stopEtcd(syscall.SIGQUIT)
552	if err != nil {
553		return nil, err
554	}
555
556	if srv.etcdServer != nil {
557		srv.etcdServer.GetLogger().Sync()
558	} else {
559		srv.etcdLogFile.Sync()
560		srv.etcdLogFile.Close()
561	}
562
563	// for debugging purposes, rename instead of removing
564	if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil {
565		return nil, err
566	}
567	if err = os.Rename(srv.Member.BaseDir, srv.Member.BaseDir+".backup"); err != nil {
568		return nil, err
569	}
570	srv.lg.Info(
571		"renamed",
572		zap.String("base-dir", srv.Member.BaseDir),
573		zap.String("new-dir", srv.Member.BaseDir+".backup"),
574	)
575
576	// create a new log file for next new member restart
577	if !fileutil.Exist(srv.Member.BaseDir) {
578		err = fileutil.TouchDirAll(srv.Member.BaseDir)
579		if err != nil {
580			return nil, err
581		}
582	}
583
584	return &rpcpb.Response{
585		Success: true,
586		Status:  "killed etcd and removed base directory",
587	}, nil
588}
589
590func (srv *Server) handle_SAVE_SNAPSHOT() (*rpcpb.Response, error) {
591	err := srv.Member.SaveSnapshot(srv.lg)
592	if err != nil {
593		return nil, err
594	}
595	return &rpcpb.Response{
596		Success:      true,
597		Status:       "saved snapshot",
598		SnapshotInfo: srv.Member.SnapshotInfo,
599	}, nil
600}
601
602func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
603	err = srv.Member.RestoreSnapshot(srv.lg)
604	if err != nil {
605		return nil, err
606	}
607	resp, err = srv.handle_RESTART_FROM_SNAPSHOT()
608	if resp != nil && err == nil {
609		resp.Status = "restored snapshot and " + resp.Status
610	}
611	return resp, err
612}
613
614func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
615	if err = srv.saveTLSAssets(); err != nil {
616		return nil, err
617	}
618	if err = srv.creatEtcd(true); err != nil {
619		return nil, err
620	}
621	if err = srv.runEtcd(); err != nil {
622		return nil, err
623	}
624	if err = srv.loadAutoTLSAssets(); err != nil {
625		return nil, err
626	}
627
628	return &rpcpb.Response{
629		Success:      true,
630		Status:       "restarted etcd from snapshot",
631		SnapshotInfo: srv.Member.SnapshotInfo,
632	}, nil
633}
634
635func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
636	err := srv.stopEtcd(syscall.SIGQUIT)
637	if err != nil {
638		return nil, err
639	}
640
641	if srv.etcdServer != nil {
642		srv.etcdServer.GetLogger().Sync()
643	} else {
644		srv.etcdLogFile.Sync()
645		srv.etcdLogFile.Close()
646	}
647
648	// TODO: support separate WAL directory
649	if err = archive(
650		srv.Member.BaseDir,
651		srv.Member.Etcd.LogOutputs[0],
652		srv.Member.Etcd.DataDir,
653	); err != nil {
654		return nil, err
655	}
656	srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
657
658	if srv.etcdServer == nil {
659		if err = srv.createEtcdLogFile(); err != nil {
660			return nil, err
661		}
662	}
663
664	srv.lg.Info("cleaning up page cache")
665	if err := cleanPageCache(); err != nil {
666		srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error()))
667	}
668	srv.lg.Info("cleaned up page cache")
669
670	return &rpcpb.Response{
671		Success: true,
672		Status:  "cleaned up etcd",
673	}, nil
674}
675
676// stop proxy, etcd, delete data directory
677func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
678	err := srv.stopEtcd(syscall.SIGQUIT)
679	if err != nil {
680		return nil, err
681	}
682
683	if srv.etcdServer != nil {
684		srv.etcdServer.GetLogger().Sync()
685	} else {
686		srv.etcdLogFile.Sync()
687		srv.etcdLogFile.Close()
688	}
689
690	err = os.RemoveAll(srv.Member.BaseDir)
691	if err != nil {
692		return nil, err
693	}
694	srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
695
696	// stop agent server
697	srv.Stop()
698
699	return &rpcpb.Response{
700		Success: true,
701		Status:  "destroyed etcd and agent",
702	}, nil
703}
704
705func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
706	for port, px := range srv.advertisePeerPortToProxy {
707		srv.lg.Info("blackholing", zap.Int("peer-port", port))
708		px.BlackholeTx()
709		px.BlackholeRx()
710		srv.lg.Info("blackholed", zap.Int("peer-port", port))
711	}
712	return &rpcpb.Response{
713		Success: true,
714		Status:  "blackholed peer port tx/rx",
715	}
716}
717
718func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
719	for port, px := range srv.advertisePeerPortToProxy {
720		srv.lg.Info("unblackholing", zap.Int("peer-port", port))
721		px.UnblackholeTx()
722		px.UnblackholeRx()
723		srv.lg.Info("unblackholed", zap.Int("peer-port", port))
724	}
725	return &rpcpb.Response{
726		Success: true,
727		Status:  "unblackholed peer port tx/rx",
728	}
729}
730
731func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() *rpcpb.Response {
732	lat := time.Duration(srv.Tester.UpdatedDelayLatencyMs) * time.Millisecond
733	rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
734
735	for port, px := range srv.advertisePeerPortToProxy {
736		srv.lg.Info("delaying",
737			zap.Int("peer-port", port),
738			zap.Duration("latency", lat),
739			zap.Duration("random-variable", rv),
740		)
741		px.DelayTx(lat, rv)
742		px.DelayRx(lat, rv)
743		srv.lg.Info("delayed",
744			zap.Int("peer-port", port),
745			zap.Duration("latency", lat),
746			zap.Duration("random-variable", rv),
747		)
748	}
749
750	return &rpcpb.Response{
751		Success: true,
752		Status:  "delayed peer port tx/rx",
753	}
754}
755
756func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() *rpcpb.Response {
757	for port, px := range srv.advertisePeerPortToProxy {
758		srv.lg.Info("undelaying", zap.Int("peer-port", port))
759		px.UndelayTx()
760		px.UndelayRx()
761		srv.lg.Info("undelayed", zap.Int("peer-port", port))
762	}
763	return &rpcpb.Response{
764		Success: true,
765		Status:  "undelayed peer port tx/rx",
766	}
767}
768