exec.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package adapters
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/ecdsa"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "net"
  26. "net/http"
  27. "os"
  28. "os/exec"
  29. "os/signal"
  30. "path/filepath"
  31. "strings"
  32. "sync"
  33. "syscall"
  34. "time"
  35. "github.com/docker/docker/pkg/reexec"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/node"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. "github.com/ethereum/go-ethereum/p2p/enode"
  40. "github.com/ethereum/go-ethereum/rpc"
  41. "golang.org/x/net/websocket"
  42. )
  43. func init() {
  44. // Register a reexec function to start a simulation node when the current binary is
  45. // executed as "p2p-node" (rather than whatever the main() function would normally do).
  46. reexec.Register("p2p-node", execP2PNode)
  47. }
  48. // ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary
  49. // as a child process.
  50. type ExecAdapter struct {
  51. // BaseDir is the directory under which the data directories for each
  52. // simulation node are created.
  53. BaseDir string
  54. nodes map[enode.ID]*ExecNode
  55. }
  56. // NewExecAdapter returns an ExecAdapter which stores node data in
  57. // subdirectories of the given base directory
  58. func NewExecAdapter(baseDir string) *ExecAdapter {
  59. return &ExecAdapter{
  60. BaseDir: baseDir,
  61. nodes: make(map[enode.ID]*ExecNode),
  62. }
  63. }
  64. // Name returns the name of the adapter for logging purposes
  65. func (e *ExecAdapter) Name() string {
  66. return "exec-adapter"
  67. }
  68. // NewNode returns a new ExecNode using the given config
  69. func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
  70. if len(config.Services) == 0 {
  71. return nil, errors.New("node must have at least one service")
  72. }
  73. for _, service := range config.Services {
  74. if _, exists := serviceFuncs[service]; !exists {
  75. return nil, fmt.Errorf("unknown node service %q", service)
  76. }
  77. }
  78. // create the node directory using the first 12 characters of the ID
  79. // as Unix socket paths cannot be longer than 256 characters
  80. dir := filepath.Join(e.BaseDir, config.ID.String()[:12])
  81. if err := os.Mkdir(dir, 0755); err != nil {
  82. return nil, fmt.Errorf("error creating node directory: %s", err)
  83. }
  84. // generate the config
  85. conf := &execNodeConfig{
  86. Stack: node.DefaultConfig,
  87. Node: config,
  88. }
  89. if config.DataDir != "" {
  90. conf.Stack.DataDir = config.DataDir
  91. } else {
  92. conf.Stack.DataDir = filepath.Join(dir, "data")
  93. }
  94. conf.Stack.WSHost = "127.0.0.1"
  95. conf.Stack.WSPort = 0
  96. conf.Stack.WSOrigins = []string{"*"}
  97. conf.Stack.WSExposeAll = true
  98. conf.Stack.P2P.EnableMsgEvents = false
  99. conf.Stack.P2P.NoDiscovery = true
  100. conf.Stack.P2P.NAT = nil
  101. conf.Stack.NoUSB = true
  102. // listen on a localhost port, which we set when we
  103. // initialise NodeConfig (usually a random port)
  104. conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port)
  105. node := &ExecNode{
  106. ID: config.ID,
  107. Dir: dir,
  108. Config: conf,
  109. adapter: e,
  110. }
  111. node.newCmd = node.execCommand
  112. e.nodes[node.ID] = node
  113. return node, nil
  114. }
  115. // ExecNode starts a simulation node by exec'ing the current binary and
  116. // running the configured services
  117. type ExecNode struct {
  118. ID enode.ID
  119. Dir string
  120. Config *execNodeConfig
  121. Cmd *exec.Cmd
  122. Info *p2p.NodeInfo
  123. adapter *ExecAdapter
  124. client *rpc.Client
  125. wsAddr string
  126. newCmd func() *exec.Cmd
  127. key *ecdsa.PrivateKey
  128. }
  129. // Addr returns the node's enode URL
  130. func (n *ExecNode) Addr() []byte {
  131. if n.Info == nil {
  132. return nil
  133. }
  134. return []byte(n.Info.Enode)
  135. }
  136. // Client returns an rpc.Client which can be used to communicate with the
  137. // underlying services (it is set once the node has started)
  138. func (n *ExecNode) Client() (*rpc.Client, error) {
  139. return n.client, nil
  140. }
  141. // Start exec's the node passing the ID and service as command line arguments
  142. // and the node config encoded as JSON in an environment variable.
  143. func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
  144. if n.Cmd != nil {
  145. return errors.New("already started")
  146. }
  147. defer func() {
  148. if err != nil {
  149. n.Stop()
  150. }
  151. }()
  152. // encode a copy of the config containing the snapshot
  153. confCopy := *n.Config
  154. confCopy.Snapshots = snapshots
  155. confCopy.PeerAddrs = make(map[string]string)
  156. for id, node := range n.adapter.nodes {
  157. confCopy.PeerAddrs[id.String()] = node.wsAddr
  158. }
  159. confData, err := json.Marshal(confCopy)
  160. if err != nil {
  161. return fmt.Errorf("error generating node config: %s", err)
  162. }
  163. // start the one-shot server that waits for startup information
  164. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  165. defer cancel()
  166. statusURL, statusC := n.waitForStartupJSON(ctx)
  167. // start the node
  168. cmd := n.newCmd()
  169. cmd.Stdout = os.Stdout
  170. cmd.Stderr = os.Stderr
  171. cmd.Env = append(os.Environ(),
  172. envStatusURL+"="+statusURL,
  173. envNodeConfig+"="+string(confData),
  174. )
  175. if err := cmd.Start(); err != nil {
  176. return fmt.Errorf("error starting node: %s", err)
  177. }
  178. n.Cmd = cmd
  179. // read the WebSocket address from the stderr logs
  180. status := <-statusC
  181. if status.Err != "" {
  182. return errors.New(status.Err)
  183. }
  184. client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost")
  185. if err != nil {
  186. return fmt.Errorf("can't connect to RPC server: %v", err)
  187. }
  188. // node ready :)
  189. n.client = client
  190. n.wsAddr = status.WSEndpoint
  191. n.Info = status.NodeInfo
  192. return nil
  193. }
  194. // waitForStartupJSON runs a one-shot HTTP server to receive a startup report.
  195. func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) {
  196. var (
  197. ch = make(chan nodeStartupJSON, 1)
  198. quitOnce sync.Once
  199. srv http.Server
  200. )
  201. l, err := net.Listen("tcp", "127.0.0.1:0")
  202. if err != nil {
  203. ch <- nodeStartupJSON{Err: err.Error()}
  204. return "", ch
  205. }
  206. quit := func(status nodeStartupJSON) {
  207. quitOnce.Do(func() {
  208. l.Close()
  209. ch <- status
  210. })
  211. }
  212. srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  213. var status nodeStartupJSON
  214. if err := json.NewDecoder(r.Body).Decode(&status); err != nil {
  215. status.Err = fmt.Sprintf("can't decode startup report: %v", err)
  216. }
  217. quit(status)
  218. })
  219. // Run the HTTP server, but don't wait forever and shut it down
  220. // if the context is canceled.
  221. go srv.Serve(l)
  222. go func() {
  223. <-ctx.Done()
  224. quit(nodeStartupJSON{Err: "didn't get startup report"})
  225. }()
  226. url := "http://" + l.Addr().String()
  227. return url, ch
  228. }
  229. // execCommand returns a command which runs the node locally by exec'ing
  230. // the current binary but setting argv[0] to "p2p-node" so that the child
  231. // runs execP2PNode
  232. func (n *ExecNode) execCommand() *exec.Cmd {
  233. return &exec.Cmd{
  234. Path: reexec.Self(),
  235. Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
  236. }
  237. }
  238. // Stop stops the node by first sending SIGTERM and then SIGKILL if the node
  239. // doesn't stop within 5s
  240. func (n *ExecNode) Stop() error {
  241. if n.Cmd == nil {
  242. return nil
  243. }
  244. defer func() {
  245. n.Cmd = nil
  246. }()
  247. if n.client != nil {
  248. n.client.Close()
  249. n.client = nil
  250. n.wsAddr = ""
  251. n.Info = nil
  252. }
  253. if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
  254. return n.Cmd.Process.Kill()
  255. }
  256. waitErr := make(chan error)
  257. go func() {
  258. waitErr <- n.Cmd.Wait()
  259. }()
  260. select {
  261. case err := <-waitErr:
  262. return err
  263. case <-time.After(5 * time.Second):
  264. return n.Cmd.Process.Kill()
  265. }
  266. }
  267. // NodeInfo returns information about the node
  268. func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
  269. info := &p2p.NodeInfo{
  270. ID: n.ID.String(),
  271. }
  272. if n.client != nil {
  273. n.client.Call(&info, "admin_nodeInfo")
  274. }
  275. return info
  276. }
  277. // ServeRPC serves RPC requests over the given connection by dialling the
  278. // node's WebSocket address and joining the two connections
  279. func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
  280. conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
  281. if err != nil {
  282. return err
  283. }
  284. var wg sync.WaitGroup
  285. wg.Add(2)
  286. join := func(src, dst net.Conn) {
  287. defer wg.Done()
  288. io.Copy(dst, src)
  289. // close the write end of the destination connection
  290. if cw, ok := dst.(interface {
  291. CloseWrite() error
  292. }); ok {
  293. cw.CloseWrite()
  294. } else {
  295. dst.Close()
  296. }
  297. }
  298. go join(conn, clientConn)
  299. go join(clientConn, conn)
  300. wg.Wait()
  301. return nil
  302. }
  303. // Snapshots creates snapshots of the services by calling the
  304. // simulation_snapshot RPC method
  305. func (n *ExecNode) Snapshots() (map[string][]byte, error) {
  306. if n.client == nil {
  307. return nil, errors.New("RPC not started")
  308. }
  309. var snapshots map[string][]byte
  310. return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
  311. }
  312. // execNodeConfig is used to serialize the node configuration so it can be
  313. // passed to the child process as a JSON encoded environment variable
  314. type execNodeConfig struct {
  315. Stack node.Config `json:"stack"`
  316. Node *NodeConfig `json:"node"`
  317. Snapshots map[string][]byte `json:"snapshots,omitempty"`
  318. PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
  319. }
  320. // execP2PNode starts a simulation node when the current binary is executed with
  321. // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
  322. // and the node config from an environment variable.
  323. func execP2PNode() {
  324. glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
  325. glogger.Verbosity(log.LvlInfo)
  326. log.Root().SetHandler(glogger)
  327. statusURL := os.Getenv(envStatusURL)
  328. if statusURL == "" {
  329. log.Crit("missing " + envStatusURL)
  330. }
  331. // Start the node and gather startup report.
  332. var status nodeStartupJSON
  333. stack, stackErr := startExecNodeStack()
  334. if stackErr != nil {
  335. status.Err = stackErr.Error()
  336. } else {
  337. status.WSEndpoint = "ws://" + stack.WSEndpoint()
  338. status.NodeInfo = stack.Server().NodeInfo()
  339. }
  340. // Send status to the host.
  341. statusJSON, _ := json.Marshal(status)
  342. if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil {
  343. log.Crit("Can't post startup info", "url", statusURL, "err", err)
  344. }
  345. if stackErr != nil {
  346. os.Exit(1)
  347. }
  348. // Stop the stack if we get a SIGTERM signal.
  349. go func() {
  350. sigc := make(chan os.Signal, 1)
  351. signal.Notify(sigc, syscall.SIGTERM)
  352. defer signal.Stop(sigc)
  353. <-sigc
  354. log.Info("Received SIGTERM, shutting down...")
  355. stack.Stop()
  356. }()
  357. stack.Wait() // Wait for the stack to exit.
  358. }
  359. func startExecNodeStack() (*node.Node, error) {
  360. // read the services from argv
  361. serviceNames := strings.Split(os.Args[1], ",")
  362. // decode the config
  363. confEnv := os.Getenv(envNodeConfig)
  364. if confEnv == "" {
  365. return nil, fmt.Errorf("missing " + envNodeConfig)
  366. }
  367. var conf execNodeConfig
  368. if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
  369. return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err)
  370. }
  371. conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
  372. conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
  373. // initialize the devp2p stack
  374. stack, err := node.New(&conf.Stack)
  375. if err != nil {
  376. return nil, fmt.Errorf("error creating node stack: %v", err)
  377. }
  378. // register the services, collecting them into a map so we can wrap
  379. // them in a snapshot service
  380. services := make(map[string]node.Service, len(serviceNames))
  381. for _, name := range serviceNames {
  382. serviceFunc, exists := serviceFuncs[name]
  383. if !exists {
  384. return nil, fmt.Errorf("unknown node service %q", err)
  385. }
  386. constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
  387. ctx := &ServiceContext{
  388. RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
  389. NodeContext: nodeCtx,
  390. Config: conf.Node,
  391. }
  392. if conf.Snapshots != nil {
  393. ctx.Snapshot = conf.Snapshots[name]
  394. }
  395. service, err := serviceFunc(ctx)
  396. if err != nil {
  397. return nil, err
  398. }
  399. services[name] = service
  400. return service, nil
  401. }
  402. if err := stack.Register(constructor); err != nil {
  403. return stack, fmt.Errorf("error registering service %q: %v", name, err)
  404. }
  405. }
  406. // register the snapshot service
  407. err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
  408. return &snapshotService{services}, nil
  409. })
  410. if err != nil {
  411. return stack, fmt.Errorf("error starting snapshot service: %v", err)
  412. }
  413. // start the stack
  414. if err = stack.Start(); err != nil {
  415. err = fmt.Errorf("error starting stack: %v", err)
  416. }
  417. return stack, err
  418. }
  // Names of the environment variables through which the host passes
  // information to the child process.
  const (
  	// envStatusURL names the variable holding the URL to which the child
  	// posts its startup report.
  	envStatusURL = "_P2P_STATUS_URL"
  	// envNodeConfig names the variable holding the JSON encoded node config.
  	envNodeConfig = "_P2P_NODE_CONFIG"
  )
  423. // nodeStartupJSON is sent to the simulation host after startup.
  424. type nodeStartupJSON struct {
  425. Err string
  426. WSEndpoint string
  427. NodeInfo *p2p.NodeInfo
  428. }
  429. // snapshotService is a node.Service which wraps a list of services and
  430. // exposes an API to generate a snapshot of those services
  431. type snapshotService struct {
  432. services map[string]node.Service
  433. }
  434. func (s *snapshotService) APIs() []rpc.API {
  435. return []rpc.API{{
  436. Namespace: "simulation",
  437. Version: "1.0",
  438. Service: SnapshotAPI{s.services},
  439. }}
  440. }
  441. func (s *snapshotService) Protocols() []p2p.Protocol {
  442. return nil
  443. }
  444. func (s *snapshotService) Start(*p2p.Server) error {
  445. return nil
  446. }
  447. func (s *snapshotService) Stop() error {
  448. return nil
  449. }
  450. // SnapshotAPI provides an RPC method to create snapshots of services
  451. type SnapshotAPI struct {
  452. services map[string]node.Service
  453. }
  454. func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
  455. snapshots := make(map[string][]byte)
  456. for name, service := range api.services {
  457. if s, ok := service.(interface {
  458. Snapshot() ([]byte, error)
  459. }); ok {
  460. snap, err := s.Snapshot()
  461. if err != nil {
  462. return nil, err
  463. }
  464. snapshots[name] = snap
  465. }
  466. }
  467. return snapshots, nil
  468. }
  // wsRPCDialer creates RPC clients for nodes whose WebSocket addresses are
  // known in advance.
  type wsRPCDialer struct {
  	// addrs maps the string form of a node ID to that node's address.
  	addrs map[string]string
  }
  472. // DialRPC implements the RPCDialer interface by creating a WebSocket RPC
  473. // client of the given node
  474. func (w *wsRPCDialer) DialRPC(id enode.ID) (*rpc.Client, error) {
  475. addr, ok := w.addrs[id.String()]
  476. if !ok {
  477. return nil, fmt.Errorf("unknown node: %s", id)
  478. }
  479. return rpc.DialWebsocket(context.Background(), addr, "http://localhost")
  480. }