exec.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package adapters
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "net"
  25. "net/http"
  26. "os"
  27. "os/exec"
  28. "os/signal"
  29. "path/filepath"
  30. "strings"
  31. "sync"
  32. "syscall"
  33. "time"
  34. "github.com/docker/docker/pkg/reexec"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/node"
  37. "github.com/ethereum/go-ethereum/p2p"
  38. "github.com/ethereum/go-ethereum/p2p/enode"
  39. "github.com/ethereum/go-ethereum/rpc"
  40. "github.com/gorilla/websocket"
  41. )
  42. func init() {
  43. // Register a reexec function to start a simulation node when the current binary is
  44. // executed as "p2p-node" (rather than whatever the main() function would normally do).
  45. reexec.Register("p2p-node", execP2PNode)
  46. }
  47. // ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary
  48. // as a child process.
  49. type ExecAdapter struct {
  50. // BaseDir is the directory under which the data directories for each
  51. // simulation node are created.
  52. BaseDir string
  53. nodes map[enode.ID]*ExecNode
  54. }
  55. // NewExecAdapter returns an ExecAdapter which stores node data in
  56. // subdirectories of the given base directory
  57. func NewExecAdapter(baseDir string) *ExecAdapter {
  58. return &ExecAdapter{
  59. BaseDir: baseDir,
  60. nodes: make(map[enode.ID]*ExecNode),
  61. }
  62. }
  63. // Name returns the name of the adapter for logging purposes
  64. func (e *ExecAdapter) Name() string {
  65. return "exec-adapter"
  66. }
  67. // NewNode returns a new ExecNode using the given config
  68. func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
  69. if len(config.Lifecycles) == 0 {
  70. return nil, errors.New("node must have at least one service lifecycle")
  71. }
  72. for _, service := range config.Lifecycles {
  73. if _, exists := lifecycleConstructorFuncs[service]; !exists {
  74. return nil, fmt.Errorf("unknown node service %q", service)
  75. }
  76. }
  77. // create the node directory using the first 12 characters of the ID
  78. // as Unix socket paths cannot be longer than 256 characters
  79. dir := filepath.Join(e.BaseDir, config.ID.String()[:12])
  80. if err := os.Mkdir(dir, 0755); err != nil {
  81. return nil, fmt.Errorf("error creating node directory: %s", err)
  82. }
  83. err := config.initDummyEnode()
  84. if err != nil {
  85. return nil, err
  86. }
  87. // generate the config
  88. conf := &execNodeConfig{
  89. Stack: node.DefaultConfig,
  90. Node: config,
  91. }
  92. if config.DataDir != "" {
  93. conf.Stack.DataDir = config.DataDir
  94. } else {
  95. conf.Stack.DataDir = filepath.Join(dir, "data")
  96. }
  97. // these parameters are crucial for execadapter node to run correctly
  98. conf.Stack.WSHost = "127.0.0.1"
  99. conf.Stack.WSPort = 0
  100. conf.Stack.WSOrigins = []string{"*"}
  101. conf.Stack.WSExposeAll = true
  102. conf.Stack.P2P.EnableMsgEvents = config.EnableMsgEvents
  103. conf.Stack.P2P.NoDiscovery = true
  104. conf.Stack.P2P.NAT = nil
  105. // Listen on a localhost port, which we set when we
  106. // initialise NodeConfig (usually a random port)
  107. conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port)
  108. node := &ExecNode{
  109. ID: config.ID,
  110. Dir: dir,
  111. Config: conf,
  112. adapter: e,
  113. }
  114. node.newCmd = node.execCommand
  115. e.nodes[node.ID] = node
  116. return node, nil
  117. }
  118. // ExecNode starts a simulation node by exec'ing the current binary and
  119. // running the configured services
  120. type ExecNode struct {
  121. ID enode.ID
  122. Dir string
  123. Config *execNodeConfig
  124. Cmd *exec.Cmd
  125. Info *p2p.NodeInfo
  126. adapter *ExecAdapter
  127. client *rpc.Client
  128. wsAddr string
  129. newCmd func() *exec.Cmd
  130. }
  131. // Addr returns the node's enode URL
  132. func (n *ExecNode) Addr() []byte {
  133. if n.Info == nil {
  134. return nil
  135. }
  136. return []byte(n.Info.Enode)
  137. }
  138. // Client returns an rpc.Client which can be used to communicate with the
  139. // underlying services (it is set once the node has started)
  140. func (n *ExecNode) Client() (*rpc.Client, error) {
  141. return n.client, nil
  142. }
  143. // Start exec's the node passing the ID and service as command line arguments
  144. // and the node config encoded as JSON in an environment variable.
  145. func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
  146. if n.Cmd != nil {
  147. return errors.New("already started")
  148. }
  149. defer func() {
  150. if err != nil {
  151. n.Stop()
  152. }
  153. }()
  154. // encode a copy of the config containing the snapshot
  155. confCopy := *n.Config
  156. confCopy.Snapshots = snapshots
  157. confCopy.PeerAddrs = make(map[string]string)
  158. for id, node := range n.adapter.nodes {
  159. confCopy.PeerAddrs[id.String()] = node.wsAddr
  160. }
  161. confData, err := json.Marshal(confCopy)
  162. if err != nil {
  163. return fmt.Errorf("error generating node config: %s", err)
  164. }
  165. // expose the admin namespace via websocket if it's not enabled
  166. exposed := confCopy.Stack.WSExposeAll
  167. if !exposed {
  168. for _, api := range confCopy.Stack.WSModules {
  169. if api == "admin" {
  170. exposed = true
  171. break
  172. }
  173. }
  174. }
  175. if !exposed {
  176. confCopy.Stack.WSModules = append(confCopy.Stack.WSModules, "admin")
  177. }
  178. // start the one-shot server that waits for startup information
  179. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  180. defer cancel()
  181. statusURL, statusC := n.waitForStartupJSON(ctx)
  182. // start the node
  183. cmd := n.newCmd()
  184. cmd.Stdout = os.Stdout
  185. cmd.Stderr = os.Stderr
  186. cmd.Env = append(os.Environ(),
  187. envStatusURL+"="+statusURL,
  188. envNodeConfig+"="+string(confData),
  189. )
  190. if err := cmd.Start(); err != nil {
  191. return fmt.Errorf("error starting node: %s", err)
  192. }
  193. n.Cmd = cmd
  194. // Wait for the node to start.
  195. status := <-statusC
  196. if status.Err != "" {
  197. return errors.New(status.Err)
  198. }
  199. client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "")
  200. if err != nil {
  201. return fmt.Errorf("can't connect to RPC server: %v", err)
  202. }
  203. // Node ready :)
  204. n.client = client
  205. n.wsAddr = status.WSEndpoint
  206. n.Info = status.NodeInfo
  207. return nil
  208. }
  209. // waitForStartupJSON runs a one-shot HTTP server to receive a startup report.
  210. func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) {
  211. var (
  212. ch = make(chan nodeStartupJSON, 1)
  213. quitOnce sync.Once
  214. srv http.Server
  215. )
  216. l, err := net.Listen("tcp", "127.0.0.1:0")
  217. if err != nil {
  218. ch <- nodeStartupJSON{Err: err.Error()}
  219. return "", ch
  220. }
  221. quit := func(status nodeStartupJSON) {
  222. quitOnce.Do(func() {
  223. l.Close()
  224. ch <- status
  225. })
  226. }
  227. srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  228. var status nodeStartupJSON
  229. if err := json.NewDecoder(r.Body).Decode(&status); err != nil {
  230. status.Err = fmt.Sprintf("can't decode startup report: %v", err)
  231. }
  232. quit(status)
  233. })
  234. // Run the HTTP server, but don't wait forever and shut it down
  235. // if the context is canceled.
  236. go srv.Serve(l)
  237. go func() {
  238. <-ctx.Done()
  239. quit(nodeStartupJSON{Err: "didn't get startup report"})
  240. }()
  241. url := "http://" + l.Addr().String()
  242. return url, ch
  243. }
  244. // execCommand returns a command which runs the node locally by exec'ing
  245. // the current binary but setting argv[0] to "p2p-node" so that the child
  246. // runs execP2PNode
  247. func (n *ExecNode) execCommand() *exec.Cmd {
  248. return &exec.Cmd{
  249. Path: reexec.Self(),
  250. Args: []string{"p2p-node", strings.Join(n.Config.Node.Lifecycles, ","), n.ID.String()},
  251. }
  252. }
  253. // Stop stops the node by first sending SIGTERM and then SIGKILL if the node
  254. // doesn't stop within 5s
  255. func (n *ExecNode) Stop() error {
  256. if n.Cmd == nil {
  257. return nil
  258. }
  259. defer func() {
  260. n.Cmd = nil
  261. }()
  262. if n.client != nil {
  263. n.client.Close()
  264. n.client = nil
  265. n.wsAddr = ""
  266. n.Info = nil
  267. }
  268. if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
  269. return n.Cmd.Process.Kill()
  270. }
  271. waitErr := make(chan error, 1)
  272. go func() {
  273. waitErr <- n.Cmd.Wait()
  274. }()
  275. select {
  276. case err := <-waitErr:
  277. return err
  278. case <-time.After(5 * time.Second):
  279. return n.Cmd.Process.Kill()
  280. }
  281. }
  282. // NodeInfo returns information about the node
  283. func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
  284. info := &p2p.NodeInfo{
  285. ID: n.ID.String(),
  286. }
  287. if n.client != nil {
  288. n.client.Call(&info, "admin_nodeInfo")
  289. }
  290. return info
  291. }
  292. // ServeRPC serves RPC requests over the given connection by dialling the
  293. // node's WebSocket address and joining the two connections
  294. func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error {
  295. conn, _, err := websocket.DefaultDialer.Dial(n.wsAddr, nil)
  296. if err != nil {
  297. return err
  298. }
  299. var wg sync.WaitGroup
  300. wg.Add(2)
  301. go wsCopy(&wg, conn, clientConn)
  302. go wsCopy(&wg, clientConn, conn)
  303. wg.Wait()
  304. conn.Close()
  305. return nil
  306. }
  307. func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) {
  308. defer wg.Done()
  309. for {
  310. msgType, r, err := src.NextReader()
  311. if err != nil {
  312. return
  313. }
  314. w, err := dst.NextWriter(msgType)
  315. if err != nil {
  316. return
  317. }
  318. if _, err = io.Copy(w, r); err != nil {
  319. return
  320. }
  321. }
  322. }
  323. // Snapshots creates snapshots of the services by calling the
  324. // simulation_snapshot RPC method
  325. func (n *ExecNode) Snapshots() (map[string][]byte, error) {
  326. if n.client == nil {
  327. return nil, errors.New("RPC not started")
  328. }
  329. var snapshots map[string][]byte
  330. return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
  331. }
  332. // execNodeConfig is used to serialize the node configuration so it can be
  333. // passed to the child process as a JSON encoded environment variable
  334. type execNodeConfig struct {
  335. Stack node.Config `json:"stack"`
  336. Node *NodeConfig `json:"node"`
  337. Snapshots map[string][]byte `json:"snapshots,omitempty"`
  338. PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
  339. }
  340. func initLogging() {
  341. // Initialize the logging by default first.
  342. glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
  343. glogger.Verbosity(log.LvlInfo)
  344. log.Root().SetHandler(glogger)
  345. confEnv := os.Getenv(envNodeConfig)
  346. if confEnv == "" {
  347. return
  348. }
  349. var conf execNodeConfig
  350. if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
  351. return
  352. }
  353. var writer = os.Stderr
  354. if conf.Node.LogFile != "" {
  355. logWriter, err := os.Create(conf.Node.LogFile)
  356. if err != nil {
  357. return
  358. }
  359. writer = logWriter
  360. }
  361. var verbosity = log.LvlInfo
  362. if conf.Node.LogVerbosity <= log.LvlTrace && conf.Node.LogVerbosity >= log.LvlCrit {
  363. verbosity = conf.Node.LogVerbosity
  364. }
  365. // Reinitialize the logger
  366. glogger = log.NewGlogHandler(log.StreamHandler(writer, log.TerminalFormat(true)))
  367. glogger.Verbosity(verbosity)
  368. log.Root().SetHandler(glogger)
  369. }
  370. // execP2PNode starts a simulation node when the current binary is executed with
  371. // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
  372. // and the node config from an environment variable.
  373. func execP2PNode() {
  374. initLogging()
  375. statusURL := os.Getenv(envStatusURL)
  376. if statusURL == "" {
  377. log.Crit("missing " + envStatusURL)
  378. }
  379. // Start the node and gather startup report.
  380. var status nodeStartupJSON
  381. stack, stackErr := startExecNodeStack()
  382. if stackErr != nil {
  383. status.Err = stackErr.Error()
  384. } else {
  385. status.WSEndpoint = stack.WSEndpoint()
  386. status.NodeInfo = stack.Server().NodeInfo()
  387. }
  388. // Send status to the host.
  389. statusJSON, _ := json.Marshal(status)
  390. if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil {
  391. log.Crit("Can't post startup info", "url", statusURL, "err", err)
  392. }
  393. if stackErr != nil {
  394. os.Exit(1)
  395. }
  396. // Stop the stack if we get a SIGTERM signal.
  397. go func() {
  398. sigc := make(chan os.Signal, 1)
  399. signal.Notify(sigc, syscall.SIGTERM)
  400. defer signal.Stop(sigc)
  401. <-sigc
  402. log.Info("Received SIGTERM, shutting down...")
  403. stack.Close()
  404. }()
  405. stack.Wait() // Wait for the stack to exit.
  406. }
  407. func startExecNodeStack() (*node.Node, error) {
  408. // read the services from argv
  409. serviceNames := strings.Split(os.Args[1], ",")
  410. // decode the config
  411. confEnv := os.Getenv(envNodeConfig)
  412. if confEnv == "" {
  413. return nil, fmt.Errorf("missing " + envNodeConfig)
  414. }
  415. var conf execNodeConfig
  416. if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
  417. return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err)
  418. }
  419. // create enode record
  420. nodeTcpConn, _ := net.ResolveTCPAddr("tcp", conf.Stack.P2P.ListenAddr)
  421. if nodeTcpConn.IP == nil {
  422. nodeTcpConn.IP = net.IPv4(127, 0, 0, 1)
  423. }
  424. conf.Node.initEnode(nodeTcpConn.IP, nodeTcpConn.Port, nodeTcpConn.Port)
  425. conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
  426. conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
  427. // initialize the devp2p stack
  428. stack, err := node.New(&conf.Stack)
  429. if err != nil {
  430. return nil, fmt.Errorf("error creating node stack: %v", err)
  431. }
  432. // Register the services, collecting them into a map so they can
  433. // be accessed by the snapshot API.
  434. services := make(map[string]node.Lifecycle, len(serviceNames))
  435. for _, name := range serviceNames {
  436. lifecycleFunc, exists := lifecycleConstructorFuncs[name]
  437. if !exists {
  438. return nil, fmt.Errorf("unknown node service %q", err)
  439. }
  440. ctx := &ServiceContext{
  441. RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
  442. Config: conf.Node,
  443. }
  444. if conf.Snapshots != nil {
  445. ctx.Snapshot = conf.Snapshots[name]
  446. }
  447. service, err := lifecycleFunc(ctx, stack)
  448. if err != nil {
  449. return nil, err
  450. }
  451. services[name] = service
  452. }
  453. // Add the snapshot API.
  454. stack.RegisterAPIs([]rpc.API{{
  455. Namespace: "simulation",
  456. Version: "1.0",
  457. Service: SnapshotAPI{services},
  458. }})
  459. if err = stack.Start(); err != nil {
  460. err = fmt.Errorf("error starting stack: %v", err)
  461. }
  462. return stack, err
  463. }
  464. const (
  465. envStatusURL = "_P2P_STATUS_URL"
  466. envNodeConfig = "_P2P_NODE_CONFIG"
  467. )
  468. // nodeStartupJSON is sent to the simulation host after startup.
  469. type nodeStartupJSON struct {
  470. Err string
  471. WSEndpoint string
  472. NodeInfo *p2p.NodeInfo
  473. }
  474. // SnapshotAPI provides an RPC method to create snapshots of services
  475. type SnapshotAPI struct {
  476. services map[string]node.Lifecycle
  477. }
  478. func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
  479. snapshots := make(map[string][]byte)
  480. for name, service := range api.services {
  481. if s, ok := service.(interface {
  482. Snapshot() ([]byte, error)
  483. }); ok {
  484. snap, err := s.Snapshot()
  485. if err != nil {
  486. return nil, err
  487. }
  488. snapshots[name] = snap
  489. }
  490. }
  491. return snapshots, nil
  492. }
  493. type wsRPCDialer struct {
  494. addrs map[string]string
  495. }
  496. // DialRPC implements the RPCDialer interface by creating a WebSocket RPC
  497. // client of the given node
  498. func (w *wsRPCDialer) DialRPC(id enode.ID) (*rpc.Client, error) {
  499. addr, ok := w.addrs[id.String()]
  500. if !ok {
  501. return nil, fmt.Errorf("unknown node: %s", id)
  502. }
  503. return rpc.DialWebsocket(context.Background(), addr, "http://localhost")
  504. }