exec.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package adapters
  17. import (
  18. "bufio"
  19. "context"
  20. "crypto/ecdsa"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "net"
  26. "os"
  27. "os/exec"
  28. "os/signal"
  29. "path/filepath"
  30. "regexp"
  31. "strings"
  32. "sync"
  33. "syscall"
  34. "time"
  35. "github.com/docker/docker/pkg/reexec"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/node"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. "github.com/ethereum/go-ethereum/p2p/discover"
  40. "github.com/ethereum/go-ethereum/rpc"
  41. "golang.org/x/net/websocket"
  42. )
  43. // ExecAdapter is a NodeAdapter which runs simulation nodes by executing the
  44. // current binary as a child process.
  45. //
  46. // An init hook is used so that the child process executes the node services
  47. // (rather than whataver the main() function would normally do), see the
  48. // execP2PNode function for more information.
  49. type ExecAdapter struct {
  50. // BaseDir is the directory under which the data directories for each
  51. // simulation node are created.
  52. BaseDir string
  53. nodes map[discover.NodeID]*ExecNode
  54. }
  55. // NewExecAdapter returns an ExecAdapter which stores node data in
  56. // subdirectories of the given base directory
  57. func NewExecAdapter(baseDir string) *ExecAdapter {
  58. return &ExecAdapter{
  59. BaseDir: baseDir,
  60. nodes: make(map[discover.NodeID]*ExecNode),
  61. }
  62. }
  63. // Name returns the name of the adapter for logging purposes
  64. func (e *ExecAdapter) Name() string {
  65. return "exec-adapter"
  66. }
  67. // NewNode returns a new ExecNode using the given config
  68. func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
  69. if len(config.Services) == 0 {
  70. return nil, errors.New("node must have at least one service")
  71. }
  72. for _, service := range config.Services {
  73. if _, exists := serviceFuncs[service]; !exists {
  74. return nil, fmt.Errorf("unknown node service %q", service)
  75. }
  76. }
  77. // create the node directory using the first 12 characters of the ID
  78. // as Unix socket paths cannot be longer than 256 characters
  79. dir := filepath.Join(e.BaseDir, config.ID.String()[:12])
  80. if err := os.Mkdir(dir, 0755); err != nil {
  81. return nil, fmt.Errorf("error creating node directory: %s", err)
  82. }
  83. // generate the config
  84. conf := &execNodeConfig{
  85. Stack: node.DefaultConfig,
  86. Node: config,
  87. }
  88. conf.Stack.DataDir = filepath.Join(dir, "data")
  89. conf.Stack.WSHost = "127.0.0.1"
  90. conf.Stack.WSPort = 0
  91. conf.Stack.WSOrigins = []string{"*"}
  92. conf.Stack.WSExposeAll = true
  93. conf.Stack.P2P.EnableMsgEvents = false
  94. conf.Stack.P2P.NoDiscovery = true
  95. conf.Stack.P2P.NAT = nil
  96. conf.Stack.NoUSB = true
  97. // listen on a random localhost port (we'll get the actual port after
  98. // starting the node through the RPC admin.nodeInfo method)
  99. conf.Stack.P2P.ListenAddr = "127.0.0.1:0"
  100. node := &ExecNode{
  101. ID: config.ID,
  102. Dir: dir,
  103. Config: conf,
  104. adapter: e,
  105. }
  106. node.newCmd = node.execCommand
  107. e.nodes[node.ID] = node
  108. return node, nil
  109. }
  110. // ExecNode starts a simulation node by exec'ing the current binary and
  111. // running the configured services
  112. type ExecNode struct {
  113. ID discover.NodeID
  114. Dir string
  115. Config *execNodeConfig
  116. Cmd *exec.Cmd
  117. Info *p2p.NodeInfo
  118. adapter *ExecAdapter
  119. client *rpc.Client
  120. wsAddr string
  121. newCmd func() *exec.Cmd
  122. key *ecdsa.PrivateKey
  123. }
  124. // Addr returns the node's enode URL
  125. func (n *ExecNode) Addr() []byte {
  126. if n.Info == nil {
  127. return nil
  128. }
  129. return []byte(n.Info.Enode)
  130. }
  131. // Client returns an rpc.Client which can be used to communicate with the
  132. // underlying services (it is set once the node has started)
  133. func (n *ExecNode) Client() (*rpc.Client, error) {
  134. return n.client, nil
  135. }
  // wsAddrPattern matches a WebSocket URL made of digits, dots and colons
  // (for example ws://127.0.0.1:8546). Start uses it to pull the node's
  // WebSocket address out of a log line.
  var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`)
  139. // Start exec's the node passing the ID and service as command line arguments
  140. // and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
  141. // variable
  142. func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
  143. if n.Cmd != nil {
  144. return errors.New("already started")
  145. }
  146. defer func() {
  147. if err != nil {
  148. log.Error("node failed to start", "err", err)
  149. n.Stop()
  150. }
  151. }()
  152. // encode a copy of the config containing the snapshot
  153. confCopy := *n.Config
  154. confCopy.Snapshots = snapshots
  155. confCopy.PeerAddrs = make(map[string]string)
  156. for id, node := range n.adapter.nodes {
  157. confCopy.PeerAddrs[id.String()] = node.wsAddr
  158. }
  159. confData, err := json.Marshal(confCopy)
  160. if err != nil {
  161. return fmt.Errorf("error generating node config: %s", err)
  162. }
  163. // use a pipe for stderr so we can both copy the node's stderr to
  164. // os.Stderr and read the WebSocket address from the logs
  165. stderrR, stderrW := io.Pipe()
  166. stderr := io.MultiWriter(os.Stderr, stderrW)
  167. // start the node
  168. cmd := n.newCmd()
  169. cmd.Stdout = os.Stdout
  170. cmd.Stderr = stderr
  171. cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
  172. if err := cmd.Start(); err != nil {
  173. return fmt.Errorf("error starting node: %s", err)
  174. }
  175. n.Cmd = cmd
  176. // read the WebSocket address from the stderr logs
  177. var wsAddr string
  178. wsAddrC := make(chan string)
  179. go func() {
  180. s := bufio.NewScanner(stderrR)
  181. for s.Scan() {
  182. if strings.Contains(s.Text(), "WebSocket endpoint opened:") {
  183. wsAddrC <- wsAddrPattern.FindString(s.Text())
  184. }
  185. }
  186. }()
  187. select {
  188. case wsAddr = <-wsAddrC:
  189. if wsAddr == "" {
  190. return errors.New("failed to read WebSocket address from stderr")
  191. }
  192. case <-time.After(10 * time.Second):
  193. return errors.New("timed out waiting for WebSocket address on stderr")
  194. }
  195. // create the RPC client and load the node info
  196. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  197. defer cancel()
  198. client, err := rpc.DialWebsocket(ctx, wsAddr, "")
  199. if err != nil {
  200. return fmt.Errorf("error dialing rpc websocket: %s", err)
  201. }
  202. var info p2p.NodeInfo
  203. if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
  204. return fmt.Errorf("error getting node info: %s", err)
  205. }
  206. n.client = client
  207. n.wsAddr = wsAddr
  208. n.Info = &info
  209. return nil
  210. }
  211. // execCommand returns a command which runs the node locally by exec'ing
  212. // the current binary but setting argv[0] to "p2p-node" so that the child
  213. // runs execP2PNode
  214. func (n *ExecNode) execCommand() *exec.Cmd {
  215. return &exec.Cmd{
  216. Path: reexec.Self(),
  217. Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
  218. }
  219. }
  220. // Stop stops the node by first sending SIGTERM and then SIGKILL if the node
  221. // doesn't stop within 5s
  222. func (n *ExecNode) Stop() error {
  223. if n.Cmd == nil {
  224. return nil
  225. }
  226. defer func() {
  227. n.Cmd = nil
  228. }()
  229. if n.client != nil {
  230. n.client.Close()
  231. n.client = nil
  232. n.wsAddr = ""
  233. n.Info = nil
  234. }
  235. if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
  236. return n.Cmd.Process.Kill()
  237. }
  238. waitErr := make(chan error)
  239. go func() {
  240. waitErr <- n.Cmd.Wait()
  241. }()
  242. select {
  243. case err := <-waitErr:
  244. return err
  245. case <-time.After(5 * time.Second):
  246. return n.Cmd.Process.Kill()
  247. }
  248. }
  249. // NodeInfo returns information about the node
  250. func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
  251. info := &p2p.NodeInfo{
  252. ID: n.ID.String(),
  253. }
  254. if n.client != nil {
  255. n.client.Call(&info, "admin_nodeInfo")
  256. }
  257. return info
  258. }
  259. // ServeRPC serves RPC requests over the given connection by dialling the
  260. // node's WebSocket address and joining the two connections
  261. func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
  262. conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
  263. if err != nil {
  264. return err
  265. }
  266. var wg sync.WaitGroup
  267. wg.Add(2)
  268. join := func(src, dst net.Conn) {
  269. defer wg.Done()
  270. io.Copy(dst, src)
  271. // close the write end of the destination connection
  272. if cw, ok := dst.(interface {
  273. CloseWrite() error
  274. }); ok {
  275. cw.CloseWrite()
  276. } else {
  277. dst.Close()
  278. }
  279. }
  280. go join(conn, clientConn)
  281. go join(clientConn, conn)
  282. wg.Wait()
  283. return nil
  284. }
  285. // Snapshots creates snapshots of the services by calling the
  286. // simulation_snapshot RPC method
  287. func (n *ExecNode) Snapshots() (map[string][]byte, error) {
  288. if n.client == nil {
  289. return nil, errors.New("RPC not started")
  290. }
  291. var snapshots map[string][]byte
  292. return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
  293. }
  294. func init() {
  295. // register a reexec function to start a devp2p node when the current
  296. // binary is executed as "p2p-node"
  297. reexec.Register("p2p-node", execP2PNode)
  298. }
  299. // execNodeConfig is used to serialize the node configuration so it can be
  300. // passed to the child process as a JSON encoded environment variable
  301. type execNodeConfig struct {
  302. Stack node.Config `json:"stack"`
  303. Node *NodeConfig `json:"node"`
  304. Snapshots map[string][]byte `json:"snapshots,omitempty"`
  305. PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
  306. }
  307. // execP2PNode starts a devp2p node when the current binary is executed with
  308. // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
  309. // and the node config from the _P2P_NODE_CONFIG environment variable
  310. func execP2PNode() {
  311. glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
  312. glogger.Verbosity(log.LvlInfo)
  313. log.Root().SetHandler(glogger)
  314. // read the services from argv
  315. serviceNames := strings.Split(os.Args[1], ",")
  316. // decode the config
  317. confEnv := os.Getenv("_P2P_NODE_CONFIG")
  318. if confEnv == "" {
  319. log.Crit("missing _P2P_NODE_CONFIG")
  320. }
  321. var conf execNodeConfig
  322. if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
  323. log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
  324. }
  325. conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
  326. conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
  327. // use explicit IP address in ListenAddr so that Enode URL is usable
  328. externalIP := func() string {
  329. addrs, err := net.InterfaceAddrs()
  330. if err != nil {
  331. log.Crit("error getting IP address", "err", err)
  332. }
  333. for _, addr := range addrs {
  334. if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
  335. return ip.IP.String()
  336. }
  337. }
  338. log.Crit("unable to determine explicit IP address")
  339. return ""
  340. }
  341. if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
  342. conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr
  343. }
  344. if conf.Stack.WSHost == "0.0.0.0" {
  345. conf.Stack.WSHost = externalIP()
  346. }
  347. // initialize the devp2p stack
  348. stack, err := node.New(&conf.Stack)
  349. if err != nil {
  350. log.Crit("error creating node stack", "err", err)
  351. }
  352. // register the services, collecting them into a map so we can wrap
  353. // them in a snapshot service
  354. services := make(map[string]node.Service, len(serviceNames))
  355. for _, name := range serviceNames {
  356. serviceFunc, exists := serviceFuncs[name]
  357. if !exists {
  358. log.Crit("unknown node service", "name", name)
  359. }
  360. constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
  361. ctx := &ServiceContext{
  362. RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
  363. NodeContext: nodeCtx,
  364. Config: conf.Node,
  365. }
  366. if conf.Snapshots != nil {
  367. ctx.Snapshot = conf.Snapshots[name]
  368. }
  369. service, err := serviceFunc(ctx)
  370. if err != nil {
  371. return nil, err
  372. }
  373. services[name] = service
  374. return service, nil
  375. }
  376. if err := stack.Register(constructor); err != nil {
  377. log.Crit("error starting service", "name", name, "err", err)
  378. }
  379. }
  380. // register the snapshot service
  381. if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
  382. return &snapshotService{services}, nil
  383. }); err != nil {
  384. log.Crit("error starting snapshot service", "err", err)
  385. }
  386. // start the stack
  387. if err := stack.Start(); err != nil {
  388. log.Crit("error stating node stack", "err", err)
  389. }
  390. // stop the stack if we get a SIGTERM signal
  391. go func() {
  392. sigc := make(chan os.Signal, 1)
  393. signal.Notify(sigc, syscall.SIGTERM)
  394. defer signal.Stop(sigc)
  395. <-sigc
  396. log.Info("Received SIGTERM, shutting down...")
  397. stack.Stop()
  398. }()
  399. // wait for the stack to exit
  400. stack.Wait()
  401. }
  402. // snapshotService is a node.Service which wraps a list of services and
  403. // exposes an API to generate a snapshot of those services
  404. type snapshotService struct {
  405. services map[string]node.Service
  406. }
  407. func (s *snapshotService) APIs() []rpc.API {
  408. return []rpc.API{{
  409. Namespace: "simulation",
  410. Version: "1.0",
  411. Service: SnapshotAPI{s.services},
  412. }}
  413. }
  414. func (s *snapshotService) Protocols() []p2p.Protocol {
  415. return nil
  416. }
  417. func (s *snapshotService) Start(*p2p.Server) error {
  418. return nil
  419. }
  420. func (s *snapshotService) Stop() error {
  421. return nil
  422. }
  423. // SnapshotAPI provides an RPC method to create snapshots of services
  424. type SnapshotAPI struct {
  425. services map[string]node.Service
  426. }
  427. func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
  428. snapshots := make(map[string][]byte)
  429. for name, service := range api.services {
  430. if s, ok := service.(interface {
  431. Snapshot() ([]byte, error)
  432. }); ok {
  433. snap, err := s.Snapshot()
  434. if err != nil {
  435. return nil, err
  436. }
  437. snapshots[name] = snap
  438. }
  439. }
  440. return snapshots, nil
  441. }
  // wsRPCDialer dials nodes over WebSocket RPC using a table of known
  // addresses.
  type wsRPCDialer struct {
  	// addrs maps a node ID string to the node's WebSocket address.
  	addrs map[string]string
  }
  445. // DialRPC implements the RPCDialer interface by creating a WebSocket RPC
  446. // client of the given node
  447. func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) {
  448. addr, ok := w.addrs[id.String()]
  449. if !ok {
  450. return nil, fmt.Errorf("unknown node: %s", id)
  451. }
  452. return rpc.DialWebsocket(context.Background(), addr, "http://localhost")
  453. }