1// Copyright 2015 The go-ethereum Authors 2// This file is part of the go-ethereum library. 3// 4// The go-ethereum library is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Lesser General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// The go-ethereum library is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Lesser General Public License for more details. 13// 14// You should have received a copy of the GNU Lesser General Public License 15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 16 17package node 18 19import ( 20 "errors" 21 "fmt" 22 "net/http" 23 "os" 24 "path/filepath" 25 "reflect" 26 "strings" 27 "sync" 28 29 "github.com/ethereum/go-ethereum/accounts" 30 "github.com/ethereum/go-ethereum/core/rawdb" 31 "github.com/ethereum/go-ethereum/ethdb" 32 "github.com/ethereum/go-ethereum/event" 33 "github.com/ethereum/go-ethereum/log" 34 "github.com/ethereum/go-ethereum/p2p" 35 "github.com/ethereum/go-ethereum/rpc" 36 "github.com/prometheus/tsdb/fileutil" 37) 38 39// Node is a container on which services can be registered. 40type Node struct { 41 eventmux *event.TypeMux 42 config *Config 43 accman *accounts.Manager 44 log log.Logger 45 keyDir string // key store directory 46 keyDirTemp bool // If true, key directory will be removed by Stop 47 dirLock fileutil.Releaser // prevents concurrent use of instance directory 48 stop chan struct{} // Channel to wait for termination notifications 49 server *p2p.Server // Currently running P2P networking layer 50 startStopLock sync.Mutex // Start/Stop are protected by an additional lock 51 state int // Tracks state of node lifecycle 52 53 lock sync.Mutex 54 lifecycles []Lifecycle // All registered backends, services, and auxiliary services that have a lifecycle 55 rpcAPIs []rpc.API // List of APIs currently provided by the node 56 http *httpServer // 57 ws *httpServer // 58 ipc *ipcServer // Stores information about the ipc http server 59 inprocHandler *rpc.Server // In-process RPC request handler to process the API requests 60 61 databases map[*closeTrackingDB]struct{} // All open databases 62} 63 64const ( 65 initializingState = iota 66 runningState 67 closedState 68) 69 70// New creates a new P2P node, ready for protocol registration. 71func New(conf *Config) (*Node, error) { 72 // Copy config and resolve the datadir so future changes to the current 73 // working directory don't affect the node. 74 confCopy := *conf 75 conf = &confCopy 76 if conf.DataDir != "" { 77 absdatadir, err := filepath.Abs(conf.DataDir) 78 if err != nil { 79 return nil, err 80 } 81 conf.DataDir = absdatadir 82 } 83 if conf.Logger == nil { 84 conf.Logger = log.New() 85 } 86 87 // Ensure that the instance name doesn't cause weird conflicts with 88 // other files in the data directory. 89 if strings.ContainsAny(conf.Name, `/\`) { 90 return nil, errors.New(`Config.Name must not contain '/' or '\'`) 91 } 92 if conf.Name == datadirDefaultKeyStore { 93 return nil, errors.New(`Config.Name cannot be "` + datadirDefaultKeyStore + `"`) 94 } 95 if strings.HasSuffix(conf.Name, ".ipc") { 96 return nil, errors.New(`Config.Name cannot end in ".ipc"`) 97 } 98 99 node := &Node{ 100 config: conf, 101 inprocHandler: rpc.NewServer(), 102 eventmux: new(event.TypeMux), 103 log: conf.Logger, 104 stop: make(chan struct{}), 105 server: &p2p.Server{Config: conf.P2P}, 106 databases: make(map[*closeTrackingDB]struct{}), 107 } 108 109 // Register built-in APIs. 110 node.rpcAPIs = append(node.rpcAPIs, node.apis()...) 111 112 // Acquire the instance directory lock. 113 if err := node.openDataDir(); err != nil { 114 return nil, err 115 } 116 keyDir, isEphem, err := getKeyStoreDir(conf) 117 if err != nil { 118 return nil, err 119 } 120 node.keyDir = keyDir 121 node.keyDirTemp = isEphem 122 // Creates an empty AccountManager with no backends. Callers (e.g. cmd/geth) 123 // are required to add the backends later on. 124 node.accman = accounts.NewManager(&accounts.Config{InsecureUnlockAllowed: conf.InsecureUnlockAllowed}) 125 126 // Initialize the p2p server. This creates the node key and discovery databases. 127 node.server.Config.PrivateKey = node.config.NodeKey() 128 node.server.Config.Name = node.config.NodeName() 129 node.server.Config.Logger = node.log 130 if node.server.Config.StaticNodes == nil { 131 node.server.Config.StaticNodes = node.config.StaticNodes() 132 } 133 if node.server.Config.TrustedNodes == nil { 134 node.server.Config.TrustedNodes = node.config.TrustedNodes() 135 } 136 if node.server.Config.NodeDatabase == "" { 137 node.server.Config.NodeDatabase = node.config.NodeDB() 138 } 139 140 // Check HTTP/WS prefixes are valid. 141 if err := validatePrefix("HTTP", conf.HTTPPathPrefix); err != nil { 142 return nil, err 143 } 144 if err := validatePrefix("WebSocket", conf.WSPathPrefix); err != nil { 145 return nil, err 146 } 147 148 // Configure RPC servers. 149 node.http = newHTTPServer(node.log, conf.HTTPTimeouts) 150 node.ws = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts) 151 node.ipc = newIPCServer(node.log, conf.IPCEndpoint()) 152 153 return node, nil 154} 155 156// Start starts all registered lifecycles, RPC services and p2p networking. 157// Node can only be started once. 158func (n *Node) Start() error { 159 n.startStopLock.Lock() 160 defer n.startStopLock.Unlock() 161 162 n.lock.Lock() 163 switch n.state { 164 case runningState: 165 n.lock.Unlock() 166 return ErrNodeRunning 167 case closedState: 168 n.lock.Unlock() 169 return ErrNodeStopped 170 } 171 n.state = runningState 172 // open networking and RPC endpoints 173 err := n.openEndpoints() 174 lifecycles := make([]Lifecycle, len(n.lifecycles)) 175 copy(lifecycles, n.lifecycles) 176 n.lock.Unlock() 177 178 // Check if endpoint startup failed. 179 if err != nil { 180 n.doClose(nil) 181 return err 182 } 183 // Start all registered lifecycles. 184 var started []Lifecycle 185 for _, lifecycle := range lifecycles { 186 if err = lifecycle.Start(); err != nil { 187 break 188 } 189 started = append(started, lifecycle) 190 } 191 // Check if any lifecycle failed to start. 192 if err != nil { 193 n.stopServices(started) 194 n.doClose(nil) 195 } 196 return err 197} 198 199// Close stops the Node and releases resources acquired in 200// Node constructor New. 201func (n *Node) Close() error { 202 n.startStopLock.Lock() 203 defer n.startStopLock.Unlock() 204 205 n.lock.Lock() 206 state := n.state 207 n.lock.Unlock() 208 switch state { 209 case initializingState: 210 // The node was never started. 211 return n.doClose(nil) 212 case runningState: 213 // The node was started, release resources acquired by Start(). 214 var errs []error 215 if err := n.stopServices(n.lifecycles); err != nil { 216 errs = append(errs, err) 217 } 218 return n.doClose(errs) 219 case closedState: 220 return ErrNodeStopped 221 default: 222 panic(fmt.Sprintf("node is in unknown state %d", state)) 223 } 224} 225 226// doClose releases resources acquired by New(), collecting errors. 227func (n *Node) doClose(errs []error) error { 228 // Close databases. This needs the lock because it needs to 229 // synchronize with OpenDatabase*. 230 n.lock.Lock() 231 n.state = closedState 232 errs = append(errs, n.closeDatabases()...) 233 n.lock.Unlock() 234 235 if err := n.accman.Close(); err != nil { 236 errs = append(errs, err) 237 } 238 if n.keyDirTemp { 239 if err := os.RemoveAll(n.keyDir); err != nil { 240 errs = append(errs, err) 241 } 242 } 243 244 // Release instance directory lock. 245 n.closeDataDir() 246 247 // Unblock n.Wait. 248 close(n.stop) 249 250 // Report any errors that might have occurred. 251 switch len(errs) { 252 case 0: 253 return nil 254 case 1: 255 return errs[0] 256 default: 257 return fmt.Errorf("%v", errs) 258 } 259} 260 261// openEndpoints starts all network and RPC endpoints. 262func (n *Node) openEndpoints() error { 263 // start networking endpoints 264 n.log.Info("Starting peer-to-peer node", "instance", n.server.Name) 265 if err := n.server.Start(); err != nil { 266 return convertFileLockError(err) 267 } 268 // start RPC endpoints 269 err := n.startRPC() 270 if err != nil { 271 n.stopRPC() 272 n.server.Stop() 273 } 274 return err 275} 276 277// containsLifecycle checks if 'lfs' contains 'l'. 278func containsLifecycle(lfs []Lifecycle, l Lifecycle) bool { 279 for _, obj := range lfs { 280 if obj == l { 281 return true 282 } 283 } 284 return false 285} 286 287// stopServices terminates running services, RPC and p2p networking. 288// It is the inverse of Start. 289func (n *Node) stopServices(running []Lifecycle) error { 290 n.stopRPC() 291 292 // Stop running lifecycles in reverse order. 293 failure := &StopError{Services: make(map[reflect.Type]error)} 294 for i := len(running) - 1; i >= 0; i-- { 295 if err := running[i].Stop(); err != nil { 296 failure.Services[reflect.TypeOf(running[i])] = err 297 } 298 } 299 300 // Stop p2p networking. 301 n.server.Stop() 302 303 if len(failure.Services) > 0 { 304 return failure 305 } 306 return nil 307} 308 309func (n *Node) openDataDir() error { 310 if n.config.DataDir == "" { 311 return nil // ephemeral 312 } 313 314 instdir := filepath.Join(n.config.DataDir, n.config.name()) 315 if err := os.MkdirAll(instdir, 0700); err != nil { 316 return err 317 } 318 // Lock the instance directory to prevent concurrent use by another instance as well as 319 // accidental use of the instance directory as a database. 320 release, _, err := fileutil.Flock(filepath.Join(instdir, "LOCK")) 321 if err != nil { 322 return convertFileLockError(err) 323 } 324 n.dirLock = release 325 return nil 326} 327 328func (n *Node) closeDataDir() { 329 // Release instance directory lock. 330 if n.dirLock != nil { 331 if err := n.dirLock.Release(); err != nil { 332 n.log.Error("Can't release datadir lock", "err", err) 333 } 334 n.dirLock = nil 335 } 336} 337 338// configureRPC is a helper method to configure all the various RPC endpoints during node 339// startup. It's not meant to be called at any time afterwards as it makes certain 340// assumptions about the state of the node. 341func (n *Node) startRPC() error { 342 if err := n.startInProc(); err != nil { 343 return err 344 } 345 346 // Configure IPC. 347 if n.ipc.endpoint != "" { 348 if err := n.ipc.start(n.rpcAPIs); err != nil { 349 return err 350 } 351 } 352 353 // Configure HTTP. 354 if n.config.HTTPHost != "" { 355 config := httpConfig{ 356 CorsAllowedOrigins: n.config.HTTPCors, 357 Vhosts: n.config.HTTPVirtualHosts, 358 Modules: n.config.HTTPModules, 359 prefix: n.config.HTTPPathPrefix, 360 } 361 if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil { 362 return err 363 } 364 if err := n.http.enableRPC(n.rpcAPIs, config); err != nil { 365 return err 366 } 367 } 368 369 // Configure WebSocket. 370 if n.config.WSHost != "" { 371 server := n.wsServerForPort(n.config.WSPort) 372 config := wsConfig{ 373 Modules: n.config.WSModules, 374 Origins: n.config.WSOrigins, 375 prefix: n.config.WSPathPrefix, 376 } 377 if err := server.setListenAddr(n.config.WSHost, n.config.WSPort); err != nil { 378 return err 379 } 380 if err := server.enableWS(n.rpcAPIs, config); err != nil { 381 return err 382 } 383 } 384 385 if err := n.http.start(); err != nil { 386 return err 387 } 388 return n.ws.start() 389} 390 391func (n *Node) wsServerForPort(port int) *httpServer { 392 if n.config.HTTPHost == "" || n.http.port == port { 393 return n.http 394 } 395 return n.ws 396} 397 398func (n *Node) stopRPC() { 399 n.http.stop() 400 n.ws.stop() 401 n.ipc.stop() 402 n.stopInProc() 403} 404 405// startInProc registers all RPC APIs on the inproc server. 406func (n *Node) startInProc() error { 407 for _, api := range n.rpcAPIs { 408 if err := n.inprocHandler.RegisterName(api.Namespace, api.Service); err != nil { 409 return err 410 } 411 } 412 return nil 413} 414 415// stopInProc terminates the in-process RPC endpoint. 416func (n *Node) stopInProc() { 417 n.inprocHandler.Stop() 418} 419 420// Wait blocks until the node is closed. 421func (n *Node) Wait() { 422 <-n.stop 423} 424 425// RegisterLifecycle registers the given Lifecycle on the node. 426func (n *Node) RegisterLifecycle(lifecycle Lifecycle) { 427 n.lock.Lock() 428 defer n.lock.Unlock() 429 430 if n.state != initializingState { 431 panic("can't register lifecycle on running/stopped node") 432 } 433 if containsLifecycle(n.lifecycles, lifecycle) { 434 panic(fmt.Sprintf("attempt to register lifecycle %T more than once", lifecycle)) 435 } 436 n.lifecycles = append(n.lifecycles, lifecycle) 437} 438 439// RegisterProtocols adds backend's protocols to the node's p2p server. 440func (n *Node) RegisterProtocols(protocols []p2p.Protocol) { 441 n.lock.Lock() 442 defer n.lock.Unlock() 443 444 if n.state != initializingState { 445 panic("can't register protocols on running/stopped node") 446 } 447 n.server.Protocols = append(n.server.Protocols, protocols...) 448} 449 450// RegisterAPIs registers the APIs a service provides on the node. 451func (n *Node) RegisterAPIs(apis []rpc.API) { 452 n.lock.Lock() 453 defer n.lock.Unlock() 454 455 if n.state != initializingState { 456 panic("can't register APIs on running/stopped node") 457 } 458 n.rpcAPIs = append(n.rpcAPIs, apis...) 459} 460 461// RegisterHandler mounts a handler on the given path on the canonical HTTP server. 462// 463// The name of the handler is shown in a log message when the HTTP server starts 464// and should be a descriptive term for the service provided by the handler. 465func (n *Node) RegisterHandler(name, path string, handler http.Handler) { 466 n.lock.Lock() 467 defer n.lock.Unlock() 468 469 if n.state != initializingState { 470 panic("can't register HTTP handler on running/stopped node") 471 } 472 473 n.http.mux.Handle(path, handler) 474 n.http.handlerNames[path] = name 475} 476 477// Attach creates an RPC client attached to an in-process API handler. 478func (n *Node) Attach() (*rpc.Client, error) { 479 return rpc.DialInProc(n.inprocHandler), nil 480} 481 482// RPCHandler returns the in-process RPC request handler. 483func (n *Node) RPCHandler() (*rpc.Server, error) { 484 n.lock.Lock() 485 defer n.lock.Unlock() 486 487 if n.state == closedState { 488 return nil, ErrNodeStopped 489 } 490 return n.inprocHandler, nil 491} 492 493// Config returns the configuration of node. 494func (n *Node) Config() *Config { 495 return n.config 496} 497 498// Server retrieves the currently running P2P network layer. This method is meant 499// only to inspect fields of the currently running server. Callers should not 500// start or stop the returned server. 501func (n *Node) Server() *p2p.Server { 502 n.lock.Lock() 503 defer n.lock.Unlock() 504 505 return n.server 506} 507 508// DataDir retrieves the current datadir used by the protocol stack. 509// Deprecated: No files should be stored in this directory, use InstanceDir instead. 510func (n *Node) DataDir() string { 511 return n.config.DataDir 512} 513 514// InstanceDir retrieves the instance directory used by the protocol stack. 515func (n *Node) InstanceDir() string { 516 return n.config.instanceDir() 517} 518 519// KeyStoreDir retrieves the key directory 520func (n *Node) KeyStoreDir() string { 521 return n.keyDir 522} 523 524// AccountManager retrieves the account manager used by the protocol stack. 525func (n *Node) AccountManager() *accounts.Manager { 526 return n.accman 527} 528 529// IPCEndpoint retrieves the current IPC endpoint used by the protocol stack. 530func (n *Node) IPCEndpoint() string { 531 return n.ipc.endpoint 532} 533 534// HTTPEndpoint returns the URL of the HTTP server. Note that this URL does not 535// contain the JSON-RPC path prefix set by HTTPPathPrefix. 536func (n *Node) HTTPEndpoint() string { 537 return "http://" + n.http.listenAddr() 538} 539 540// WSEndpoint returns the current JSON-RPC over WebSocket endpoint. 541func (n *Node) WSEndpoint() string { 542 if n.http.wsAllowed() { 543 return "ws://" + n.http.listenAddr() + n.http.wsConfig.prefix 544 } 545 return "ws://" + n.ws.listenAddr() + n.ws.wsConfig.prefix 546} 547 548// EventMux retrieves the event multiplexer used by all the network services in 549// the current protocol stack. 550func (n *Node) EventMux() *event.TypeMux { 551 return n.eventmux 552} 553 554// OpenDatabase opens an existing database with the given name (or creates one if no 555// previous can be found) from within the node's instance directory. If the node is 556// ephemeral, a memory database is returned. 557func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) { 558 n.lock.Lock() 559 defer n.lock.Unlock() 560 if n.state == closedState { 561 return nil, ErrNodeStopped 562 } 563 564 var db ethdb.Database 565 var err error 566 if n.config.DataDir == "" { 567 db = rawdb.NewMemoryDatabase() 568 } else { 569 db, err = rawdb.NewLevelDBDatabase(n.ResolvePath(name), cache, handles, namespace, readonly) 570 } 571 572 if err == nil { 573 db = n.wrapDatabase(db) 574 } 575 return db, err 576} 577 578// OpenDatabaseWithFreezer opens an existing database with the given name (or 579// creates one if no previous can be found) from within the node's data directory, 580// also attaching a chain freezer to it that moves ancient chain data from the 581// database to immutable append-only files. If the node is an ephemeral one, a 582// memory database is returned. 583func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly bool) (ethdb.Database, error) { 584 n.lock.Lock() 585 defer n.lock.Unlock() 586 if n.state == closedState { 587 return nil, ErrNodeStopped 588 } 589 590 var db ethdb.Database 591 var err error 592 if n.config.DataDir == "" { 593 db = rawdb.NewMemoryDatabase() 594 } else { 595 root := n.ResolvePath(name) 596 switch { 597 case freezer == "": 598 freezer = filepath.Join(root, "ancient") 599 case !filepath.IsAbs(freezer): 600 freezer = n.ResolvePath(freezer) 601 } 602 db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly) 603 } 604 605 if err == nil { 606 db = n.wrapDatabase(db) 607 } 608 return db, err 609} 610 611// ResolvePath returns the absolute path of a resource in the instance directory. 612func (n *Node) ResolvePath(x string) string { 613 return n.config.ResolvePath(x) 614} 615 616// closeTrackingDB wraps the Close method of a database. When the database is closed by the 617// service, the wrapper removes it from the node's database map. This ensures that Node 618// won't auto-close the database if it is closed by the service that opened it. 619type closeTrackingDB struct { 620 ethdb.Database 621 n *Node 622} 623 624func (db *closeTrackingDB) Close() error { 625 db.n.lock.Lock() 626 delete(db.n.databases, db) 627 db.n.lock.Unlock() 628 return db.Database.Close() 629} 630 631// wrapDatabase ensures the database will be auto-closed when Node is closed. 632func (n *Node) wrapDatabase(db ethdb.Database) ethdb.Database { 633 wrapper := &closeTrackingDB{db, n} 634 n.databases[wrapper] = struct{}{} 635 return wrapper 636} 637 638// closeDatabases closes all open databases. 639func (n *Node) closeDatabases() (errors []error) { 640 for db := range n.databases { 641 delete(n.databases, db) 642 if err := db.Database.Close(); err != nil { 643 errors = append(errors, err) 644 } 645 } 646 return errors 647} 648