1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package agent 16 17import ( 18 "errors" 19 "fmt" 20 "io/ioutil" 21 "net/url" 22 "os" 23 "os/exec" 24 "path/filepath" 25 "syscall" 26 "time" 27 28 "go.etcd.io/etcd/client/pkg/v3/fileutil" 29 "go.etcd.io/etcd/pkg/v3/proxy" 30 "go.etcd.io/etcd/tests/v3/functional/rpcpb" 31 32 "go.uber.org/zap" 33) 34 35// return error for system errors (e.g. fail to create files) 36// return status error in response for wrong configuration/operation (e.g. start etcd twice) 37func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response, err error) { 38 defer func() { 39 if err == nil && req != nil { 40 srv.last = req.Operation 41 srv.lg.Info("handler success", zap.String("operation", req.Operation.String())) 42 } 43 }() 44 if req != nil { 45 srv.Member = req.Member 46 srv.Tester = req.Tester 47 } 48 49 switch req.Operation { 50 case rpcpb.Operation_INITIAL_START_ETCD: 51 return srv.handle_INITIAL_START_ETCD(req) 52 case rpcpb.Operation_RESTART_ETCD: 53 return srv.handle_RESTART_ETCD(req) 54 55 case rpcpb.Operation_SIGTERM_ETCD: 56 return srv.handle_SIGTERM_ETCD() 57 case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA: 58 return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA() 59 60 case rpcpb.Operation_SAVE_SNAPSHOT: 61 return srv.handle_SAVE_SNAPSHOT() 62 case rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT: 63 return srv.handle_RESTORE_RESTART_FROM_SNAPSHOT(req) 64 case rpcpb.Operation_RESTART_FROM_SNAPSHOT: 65 return srv.handle_RESTART_FROM_SNAPSHOT(req) 66 67 case rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA: 68 return srv.handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() 69 case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT: 70 return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() 71 72 case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX: 73 return srv.handle_BLACKHOLE_PEER_PORT_TX_RX(), nil 74 case rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX: 75 return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX(), nil 76 case rpcpb.Operation_DELAY_PEER_PORT_TX_RX: 77 return srv.handle_DELAY_PEER_PORT_TX_RX(), nil 78 case rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX: 79 return srv.handle_UNDELAY_PEER_PORT_TX_RX(), nil 80 81 default: 82 msg := fmt.Sprintf("operation not found (%v)", req.Operation) 83 return &rpcpb.Response{Success: false, Status: msg}, errors.New(msg) 84 } 85} 86 87// just archive the first file 88func (srv *Server) createEtcdLogFile() error { 89 var err error 90 srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutputs[0]) 91 if err != nil { 92 return err 93 } 94 srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutputs[0])) 95 return nil 96} 97 98func (srv *Server) creatEtcd(fromSnapshot bool, failpoints string) error { 99 if !fileutil.Exist(srv.Member.EtcdExec) { 100 return fmt.Errorf("unknown etcd exec path %q does not exist", srv.Member.EtcdExec) 101 } 102 103 etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags() 104 if fromSnapshot { 105 etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags() 106 } 107 u, _ := url.Parse(srv.Member.FailpointHTTPAddr) 108 srv.lg.Info( 109 "creating etcd command", 110 zap.String("etcd-exec", etcdPath), 111 zap.Strings("etcd-flags", etcdFlags), 112 zap.String("GOFAIL_FAILPOINTS", failpoints), 113 zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr), 114 zap.String("failpoint-addr", u.Host), 115 ) 116 srv.etcdCmd = exec.Command(etcdPath, etcdFlags...) 117 srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host} 118 if failpoints != "" { 119 srv.etcdCmd.Env = append(srv.etcdCmd.Env, "GOFAIL_FAILPOINTS="+failpoints) 120 } 121 srv.etcdCmd.Stdout = srv.etcdLogFile 122 srv.etcdCmd.Stderr = srv.etcdLogFile 123 return nil 124} 125 126// start but do not wait for it to complete 127func (srv *Server) runEtcd() error { 128 errc := make(chan error) 129 go func() { 130 time.Sleep(5 * time.Second) 131 // server advertise client/peer listener had to start first 132 // before setting up proxy listener 133 errc <- srv.startProxy() 134 }() 135 136 if srv.etcdCmd != nil { 137 srv.lg.Info( 138 "starting etcd command", 139 zap.String("command-path", srv.etcdCmd.Path), 140 ) 141 err := srv.etcdCmd.Start() 142 perr := <-errc 143 srv.lg.Info( 144 "started etcd command", 145 zap.String("command-path", srv.etcdCmd.Path), 146 zap.Errors("errors", []error{err, perr}), 147 ) 148 if err != nil { 149 return err 150 } 151 return perr 152 } 153 154 select { 155 case <-srv.etcdServer.Server.ReadyNotify(): 156 srv.lg.Info("embedded etcd is ready") 157 case <-time.After(time.Minute): 158 srv.etcdServer.Close() 159 return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err()) 160 } 161 return <-errc 162} 163 164// SIGQUIT to exit with stackstrace 165func (srv *Server) stopEtcd(sig os.Signal) error { 166 srv.stopProxy() 167 168 if srv.etcdCmd != nil { 169 srv.lg.Info( 170 "stopping etcd command", 171 zap.String("command-path", srv.etcdCmd.Path), 172 zap.String("signal", sig.String()), 173 ) 174 175 err := srv.etcdCmd.Process.Signal(sig) 176 if err != nil { 177 return err 178 } 179 180 errc := make(chan error) 181 go func() { 182 _, ew := srv.etcdCmd.Process.Wait() 183 errc <- ew 184 close(errc) 185 }() 186 187 select { 188 case <-time.After(5 * time.Second): 189 srv.etcdCmd.Process.Kill() 190 case e := <-errc: 191 return e 192 } 193 194 err = <-errc 195 196 srv.lg.Info( 197 "stopped etcd command", 198 zap.String("command-path", srv.etcdCmd.Path), 199 zap.String("signal", sig.String()), 200 zap.Error(err), 201 ) 202 return err 203 } 204 205 srv.lg.Info("stopping embedded etcd") 206 srv.etcdServer.Server.HardStop() 207 srv.etcdServer.Close() 208 srv.lg.Info("stopped embedded etcd") 209 return nil 210} 211 212func (srv *Server) startProxy() error { 213 if srv.Member.EtcdClientProxy { 214 advertiseClientURL, advertiseClientURLPort, err := getURLAndPort(srv.Member.Etcd.AdvertiseClientURLs[0]) 215 if err != nil { 216 return err 217 } 218 listenClientURL, _, err := getURLAndPort(srv.Member.Etcd.ListenClientURLs[0]) 219 if err != nil { 220 return err 221 } 222 223 srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String())) 224 srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{ 225 Logger: srv.lg, 226 From: *advertiseClientURL, 227 To: *listenClientURL, 228 }) 229 select { 230 case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error(): 231 return err 232 case <-time.After(2 * time.Second): 233 srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String())) 234 } 235 } 236 237 if srv.Member.EtcdPeerProxy { 238 advertisePeerURL, advertisePeerURLPort, err := getURLAndPort(srv.Member.Etcd.AdvertisePeerURLs[0]) 239 if err != nil { 240 return err 241 } 242 listenPeerURL, _, err := getURLAndPort(srv.Member.Etcd.ListenPeerURLs[0]) 243 if err != nil { 244 return err 245 } 246 247 srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String())) 248 srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{ 249 Logger: srv.lg, 250 From: *advertisePeerURL, 251 To: *listenPeerURL, 252 }) 253 select { 254 case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error(): 255 return err 256 case <-time.After(2 * time.Second): 257 srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String())) 258 } 259 } 260 return nil 261} 262 263func (srv *Server) stopProxy() { 264 if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 { 265 for port, px := range srv.advertiseClientPortToProxy { 266 if err := px.Close(); err != nil { 267 srv.lg.Warn("failed to close proxy", zap.Int("port", port)) 268 continue 269 } 270 select { 271 case <-px.Done(): 272 // enough time to release port 273 time.Sleep(time.Second) 274 case <-time.After(time.Second): 275 } 276 srv.lg.Info("closed proxy", 277 zap.Int("port", port), 278 zap.String("from", px.From()), 279 zap.String("to", px.To()), 280 ) 281 } 282 srv.advertiseClientPortToProxy = make(map[int]proxy.Server) 283 } 284 if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 { 285 for port, px := range srv.advertisePeerPortToProxy { 286 if err := px.Close(); err != nil { 287 srv.lg.Warn("failed to close proxy", zap.Int("port", port)) 288 continue 289 } 290 select { 291 case <-px.Done(): 292 // enough time to release port 293 time.Sleep(time.Second) 294 case <-time.After(time.Second): 295 } 296 srv.lg.Info("closed proxy", 297 zap.Int("port", port), 298 zap.String("from", px.From()), 299 zap.String("to", px.To()), 300 ) 301 } 302 srv.advertisePeerPortToProxy = make(map[int]proxy.Server) 303 } 304} 305 306// if started with manual TLS, stores TLS assets 307// from tester/client to disk before starting etcd process 308func (srv *Server) saveTLSAssets() error { 309 if srv.Member.PeerCertPath != "" { 310 if srv.Member.PeerCertData == "" { 311 return fmt.Errorf("got empty data for %q", srv.Member.PeerCertPath) 312 } 313 if err := ioutil.WriteFile(srv.Member.PeerCertPath, []byte(srv.Member.PeerCertData), 0644); err != nil { 314 return err 315 } 316 } 317 if srv.Member.PeerKeyPath != "" { 318 if srv.Member.PeerKeyData == "" { 319 return fmt.Errorf("got empty data for %q", srv.Member.PeerKeyPath) 320 } 321 if err := ioutil.WriteFile(srv.Member.PeerKeyPath, []byte(srv.Member.PeerKeyData), 0644); err != nil { 322 return err 323 } 324 } 325 if srv.Member.PeerTrustedCAPath != "" { 326 if srv.Member.PeerTrustedCAData == "" { 327 return fmt.Errorf("got empty data for %q", srv.Member.PeerTrustedCAPath) 328 } 329 if err := ioutil.WriteFile(srv.Member.PeerTrustedCAPath, []byte(srv.Member.PeerTrustedCAData), 0644); err != nil { 330 return err 331 } 332 } 333 if srv.Member.PeerCertPath != "" && 334 srv.Member.PeerKeyPath != "" && 335 srv.Member.PeerTrustedCAPath != "" { 336 srv.lg.Info( 337 "wrote", 338 zap.String("peer-cert", srv.Member.PeerCertPath), 339 zap.String("peer-key", srv.Member.PeerKeyPath), 340 zap.String("peer-trusted-ca", srv.Member.PeerTrustedCAPath), 341 ) 342 } 343 344 if srv.Member.ClientCertPath != "" { 345 if srv.Member.ClientCertData == "" { 346 return fmt.Errorf("got empty data for %q", srv.Member.ClientCertPath) 347 } 348 if err := ioutil.WriteFile(srv.Member.ClientCertPath, []byte(srv.Member.ClientCertData), 0644); err != nil { 349 return err 350 } 351 } 352 if srv.Member.ClientKeyPath != "" { 353 if srv.Member.ClientKeyData == "" { 354 return fmt.Errorf("got empty data for %q", srv.Member.ClientKeyPath) 355 } 356 if err := ioutil.WriteFile(srv.Member.ClientKeyPath, []byte(srv.Member.ClientKeyData), 0644); err != nil { 357 return err 358 } 359 } 360 if srv.Member.ClientTrustedCAPath != "" { 361 if srv.Member.ClientTrustedCAData == "" { 362 return fmt.Errorf("got empty data for %q", srv.Member.ClientTrustedCAPath) 363 } 364 if err := ioutil.WriteFile(srv.Member.ClientTrustedCAPath, []byte(srv.Member.ClientTrustedCAData), 0644); err != nil { 365 return err 366 } 367 } 368 if srv.Member.ClientCertPath != "" && 369 srv.Member.ClientKeyPath != "" && 370 srv.Member.ClientTrustedCAPath != "" { 371 srv.lg.Info( 372 "wrote", 373 zap.String("client-cert", srv.Member.ClientCertPath), 374 zap.String("client-key", srv.Member.ClientKeyPath), 375 zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath), 376 ) 377 } 378 return nil 379} 380 381func (srv *Server) loadAutoTLSAssets() error { 382 if srv.Member.Etcd.PeerAutoTLS { 383 // in case of slow disk 384 time.Sleep(time.Second) 385 386 fdir := filepath.Join(srv.Member.Etcd.DataDir, "fixtures", "peer") 387 388 srv.lg.Info( 389 "loading peer auto TLS assets", 390 zap.String("dir", fdir), 391 zap.String("endpoint", srv.EtcdClientEndpoint), 392 ) 393 394 certPath := filepath.Join(fdir, "cert.pem") 395 if !fileutil.Exist(certPath) { 396 return fmt.Errorf("cannot find %q", certPath) 397 } 398 certData, err := ioutil.ReadFile(certPath) 399 if err != nil { 400 return fmt.Errorf("cannot read %q (%v)", certPath, err) 401 } 402 srv.Member.PeerCertData = string(certData) 403 404 keyPath := filepath.Join(fdir, "key.pem") 405 if !fileutil.Exist(keyPath) { 406 return fmt.Errorf("cannot find %q", keyPath) 407 } 408 keyData, err := ioutil.ReadFile(keyPath) 409 if err != nil { 410 return fmt.Errorf("cannot read %q (%v)", keyPath, err) 411 } 412 srv.Member.PeerKeyData = string(keyData) 413 414 srv.lg.Info( 415 "loaded peer auto TLS assets", 416 zap.String("peer-cert-path", certPath), 417 zap.Int("peer-cert-length", len(certData)), 418 zap.String("peer-key-path", keyPath), 419 zap.Int("peer-key-length", len(keyData)), 420 ) 421 } 422 423 if srv.Member.Etcd.ClientAutoTLS { 424 // in case of slow disk 425 time.Sleep(time.Second) 426 427 fdir := filepath.Join(srv.Member.Etcd.DataDir, "fixtures", "client") 428 429 srv.lg.Info( 430 "loading client TLS assets", 431 zap.String("dir", fdir), 432 zap.String("endpoint", srv.EtcdClientEndpoint), 433 ) 434 435 certPath := filepath.Join(fdir, "cert.pem") 436 if !fileutil.Exist(certPath) { 437 return fmt.Errorf("cannot find %q", certPath) 438 } 439 certData, err := ioutil.ReadFile(certPath) 440 if err != nil { 441 return fmt.Errorf("cannot read %q (%v)", certPath, err) 442 } 443 srv.Member.ClientCertData = string(certData) 444 445 keyPath := filepath.Join(fdir, "key.pem") 446 if !fileutil.Exist(keyPath) { 447 return fmt.Errorf("cannot find %q", keyPath) 448 } 449 keyData, err := ioutil.ReadFile(keyPath) 450 if err != nil { 451 return fmt.Errorf("cannot read %q (%v)", keyPath, err) 452 } 453 srv.Member.ClientKeyData = string(keyData) 454 455 srv.lg.Info( 456 "loaded client TLS assets", 457 zap.String("client-cert-path", certPath), 458 zap.Int("client-cert-length", len(certData)), 459 zap.String("client-key-path", keyPath), 460 zap.Int("client-key-length", len(keyData)), 461 ) 462 } 463 464 return nil 465} 466 467func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) { 468 if srv.last != rpcpb.Operation_NOT_STARTED { 469 return &rpcpb.Response{ 470 Success: false, 471 Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()), 472 Member: req.Member, 473 }, nil 474 } 475 476 err := fileutil.TouchDirAll(srv.Member.BaseDir) 477 if err != nil { 478 return nil, err 479 } 480 srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir)) 481 482 if srv.etcdServer == nil { 483 if err = srv.createEtcdLogFile(); err != nil { 484 return nil, err 485 } 486 } 487 488 if err = srv.saveTLSAssets(); err != nil { 489 return nil, err 490 } 491 if err = srv.creatEtcd(false, req.Member.Failpoints); err != nil { 492 return nil, err 493 } 494 if err = srv.runEtcd(); err != nil { 495 return nil, err 496 } 497 if err = srv.loadAutoTLSAssets(); err != nil { 498 return nil, err 499 } 500 501 return &rpcpb.Response{ 502 Success: true, 503 Status: "start etcd PASS", 504 Member: srv.Member, 505 }, nil 506} 507 508func (srv *Server) handle_RESTART_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) { 509 var err error 510 if !fileutil.Exist(srv.Member.BaseDir) { 511 err = fileutil.TouchDirAll(srv.Member.BaseDir) 512 if err != nil { 513 return nil, err 514 } 515 } 516 517 if err = srv.saveTLSAssets(); err != nil { 518 return nil, err 519 } 520 if err = srv.creatEtcd(false, req.Member.Failpoints); err != nil { 521 return nil, err 522 } 523 if err = srv.runEtcd(); err != nil { 524 return nil, err 525 } 526 if err = srv.loadAutoTLSAssets(); err != nil { 527 return nil, err 528 } 529 530 return &rpcpb.Response{ 531 Success: true, 532 Status: "restart etcd PASS", 533 Member: srv.Member, 534 }, nil 535} 536 537func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) { 538 if err := srv.stopEtcd(syscall.SIGTERM); err != nil { 539 return nil, err 540 } 541 542 if srv.etcdServer != nil { 543 srv.etcdServer.GetLogger().Sync() 544 } else { 545 srv.etcdLogFile.Sync() 546 } 547 548 return &rpcpb.Response{ 549 Success: true, 550 Status: "killed etcd", 551 }, nil 552} 553 554func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) { 555 err := srv.stopEtcd(syscall.SIGQUIT) 556 if err != nil { 557 return nil, err 558 } 559 560 if srv.etcdServer != nil { 561 srv.etcdServer.GetLogger().Sync() 562 } else { 563 srv.etcdLogFile.Sync() 564 srv.etcdLogFile.Close() 565 } 566 567 // for debugging purposes, rename instead of removing 568 if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil { 569 return nil, err 570 } 571 if err = os.Rename(srv.Member.BaseDir, srv.Member.BaseDir+".backup"); err != nil { 572 return nil, err 573 } 574 srv.lg.Info( 575 "renamed", 576 zap.String("base-dir", srv.Member.BaseDir), 577 zap.String("new-dir", srv.Member.BaseDir+".backup"), 578 ) 579 580 // create a new log file for next new member restart 581 if !fileutil.Exist(srv.Member.BaseDir) { 582 err = fileutil.TouchDirAll(srv.Member.BaseDir) 583 if err != nil { 584 return nil, err 585 } 586 } 587 588 return &rpcpb.Response{ 589 Success: true, 590 Status: "killed etcd and removed base directory", 591 }, nil 592} 593 594func (srv *Server) handle_SAVE_SNAPSHOT() (*rpcpb.Response, error) { 595 err := srv.Member.SaveSnapshot(srv.lg) 596 if err != nil { 597 return nil, err 598 } 599 return &rpcpb.Response{ 600 Success: true, 601 Status: "saved snapshot", 602 SnapshotInfo: srv.Member.SnapshotInfo, 603 }, nil 604} 605 606func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT(req *rpcpb.Request) (resp *rpcpb.Response, err error) { 607 err = srv.Member.RestoreSnapshot(srv.lg) 608 if err != nil { 609 return nil, err 610 } 611 resp, err = srv.handle_RESTART_FROM_SNAPSHOT(req) 612 if resp != nil && err == nil { 613 resp.Status = "restored snapshot and " + resp.Status 614 } 615 return resp, err 616} 617 618func (srv *Server) handle_RESTART_FROM_SNAPSHOT(req *rpcpb.Request) (resp *rpcpb.Response, err error) { 619 if err = srv.saveTLSAssets(); err != nil { 620 return nil, err 621 } 622 if err = srv.creatEtcd(true, req.Member.Failpoints); err != nil { 623 return nil, err 624 } 625 if err = srv.runEtcd(); err != nil { 626 return nil, err 627 } 628 if err = srv.loadAutoTLSAssets(); err != nil { 629 return nil, err 630 } 631 632 return &rpcpb.Response{ 633 Success: true, 634 Status: "restarted etcd from snapshot", 635 SnapshotInfo: srv.Member.SnapshotInfo, 636 }, nil 637} 638 639func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) { 640 err := srv.stopEtcd(syscall.SIGQUIT) 641 if err != nil { 642 return nil, err 643 } 644 645 if srv.etcdServer != nil { 646 srv.etcdServer.GetLogger().Sync() 647 } else { 648 srv.etcdLogFile.Sync() 649 srv.etcdLogFile.Close() 650 } 651 652 // TODO: support separate WAL directory 653 if err = archive( 654 srv.Member.BaseDir, 655 srv.Member.Etcd.LogOutputs[0], 656 srv.Member.Etcd.DataDir, 657 ); err != nil { 658 return nil, err 659 } 660 srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir)) 661 662 if srv.etcdServer == nil { 663 if err = srv.createEtcdLogFile(); err != nil { 664 return nil, err 665 } 666 } 667 668 // TODO: Verify whether this cleaning of 'cache pages' is needed. 669 srv.lg.Info("cleaning up page cache") 670 if err := cleanPageCache(); err != nil { 671 srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error())) 672 } 673 srv.lg.Info("cleaned up page cache") 674 675 return &rpcpb.Response{ 676 Success: true, 677 Status: "cleaned up etcd", 678 }, nil 679} 680 681// stop proxy, etcd, delete data directory 682func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) { 683 err := srv.stopEtcd(syscall.SIGQUIT) 684 if err != nil { 685 return nil, err 686 } 687 688 if srv.etcdServer != nil { 689 srv.etcdServer.GetLogger().Sync() 690 } else { 691 srv.etcdLogFile.Sync() 692 srv.etcdLogFile.Close() 693 } 694 695 err = os.RemoveAll(srv.Member.BaseDir) 696 if err != nil { 697 return nil, err 698 } 699 srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir)) 700 701 // stop agent server 702 srv.Stop() 703 704 return &rpcpb.Response{ 705 Success: true, 706 Status: "destroyed etcd and agent", 707 }, nil 708} 709 710func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response { 711 for port, px := range srv.advertisePeerPortToProxy { 712 srv.lg.Info("blackholing", zap.Int("peer-port", port)) 713 px.BlackholeTx() 714 px.BlackholeRx() 715 srv.lg.Info("blackholed", zap.Int("peer-port", port)) 716 } 717 return &rpcpb.Response{ 718 Success: true, 719 Status: "blackholed peer port tx/rx", 720 } 721} 722 723func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response { 724 for port, px := range srv.advertisePeerPortToProxy { 725 srv.lg.Info("unblackholing", zap.Int("peer-port", port)) 726 px.UnblackholeTx() 727 px.UnblackholeRx() 728 srv.lg.Info("unblackholed", zap.Int("peer-port", port)) 729 } 730 return &rpcpb.Response{ 731 Success: true, 732 Status: "unblackholed peer port tx/rx", 733 } 734} 735 736func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() *rpcpb.Response { 737 lat := time.Duration(srv.Tester.UpdatedDelayLatencyMs) * time.Millisecond 738 rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond 739 740 for port, px := range srv.advertisePeerPortToProxy { 741 srv.lg.Info("delaying", 742 zap.Int("peer-port", port), 743 zap.Duration("latency", lat), 744 zap.Duration("random-variable", rv), 745 ) 746 px.DelayTx(lat, rv) 747 px.DelayRx(lat, rv) 748 srv.lg.Info("delayed", 749 zap.Int("peer-port", port), 750 zap.Duration("latency", lat), 751 zap.Duration("random-variable", rv), 752 ) 753 } 754 755 return &rpcpb.Response{ 756 Success: true, 757 Status: "delayed peer port tx/rx", 758 } 759} 760 761func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() *rpcpb.Response { 762 for port, px := range srv.advertisePeerPortToProxy { 763 srv.lg.Info("undelaying", zap.Int("peer-port", port)) 764 px.UndelayTx() 765 px.UndelayRx() 766 srv.lg.Info("undelayed", zap.Int("peer-port", port)) 767 } 768 return &rpcpb.Response{ 769 Success: true, 770 Status: "undelayed peer port tx/rx", 771 } 772} 773