network.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package simulations
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math/rand"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/event"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  32. )
// DialBanTimeout is the period after a connection between two nodes was
// initiated during which initConn refuses another attempt to initiate the
// same connection. DidDisconnect back-dates the connection's initiation time
// by this amount so that a dropped connection can be re-initiated right away.
var DialBanTimeout = 200 * time.Millisecond
// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
	// ID is encoded as "id" in JSON
	ID string `json:"id"`
	// DefaultService is the service NewNodeWithConfig assigns to nodes whose
	// config does not list any services
	DefaultService string `json:"default_service,omitempty"`
}
// Network models a p2p simulation network which consists of a collection of
// simulated nodes and the connections which exist between them.
//
// The Network has a single NodeAdapter which is responsible for actually
// starting nodes and connecting them together.
//
// The Network emits events when nodes are started and stopped, when they are
// connected and disconnected, and also when messages are sent between nodes.
type Network struct {
	NetworkConfig

	// Nodes holds the nodes in the order they were added
	Nodes []*Node `json:"nodes"`
	// nodeMap maps a node ID to the index of that node in Nodes
	nodeMap map[enode.ID]int

	// Conns holds the connections in the order they were created
	Conns []*Conn `json:"conns"`
	// connMap maps a ConnLabel to the index of that connection in Conns
	connMap map[string]int

	// nodeAdapter creates the underlying nodes (see NewNodeWithConfig)
	nodeAdapter adapters.NodeAdapter
	// events is the output event feed returned by Events
	events event.Feed
	// lock is taken by the exported accessors around Nodes, Conns and their
	// index maps
	lock sync.RWMutex
	// quitc is closed by Shutdown and terminates Subscribe
	quitc chan struct{}
}
  58. // NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
  59. func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
  60. return &Network{
  61. NetworkConfig: *conf,
  62. nodeAdapter: nodeAdapter,
  63. nodeMap: make(map[enode.ID]int),
  64. connMap: make(map[string]int),
  65. quitc: make(chan struct{}),
  66. }
  67. }
  68. // Events returns the output event feed of the Network.
  69. func (net *Network) Events() *event.Feed {
  70. return &net.events
  71. }
  72. // NewNodeWithConfig adds a new node to the network with the given config,
  73. // returning an error if a node with the same ID or name already exists
  74. func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
  75. net.lock.Lock()
  76. defer net.lock.Unlock()
  77. if conf.Reachable == nil {
  78. conf.Reachable = func(otherID enode.ID) bool {
  79. _, err := net.InitConn(conf.ID, otherID)
  80. if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
  81. return false
  82. }
  83. return true
  84. }
  85. }
  86. // check the node doesn't already exist
  87. if node := net.getNode(conf.ID); node != nil {
  88. return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
  89. }
  90. if node := net.getNodeByName(conf.Name); node != nil {
  91. return nil, fmt.Errorf("node with name %q already exists", conf.Name)
  92. }
  93. // if no services are configured, use the default service
  94. if len(conf.Services) == 0 {
  95. conf.Services = []string{net.DefaultService}
  96. }
  97. // use the NodeAdapter to create the node
  98. adapterNode, err := net.nodeAdapter.NewNode(conf)
  99. if err != nil {
  100. return nil, err
  101. }
  102. node := &Node{
  103. Node: adapterNode,
  104. Config: conf,
  105. }
  106. log.Trace("Node created", "id", conf.ID)
  107. net.nodeMap[conf.ID] = len(net.Nodes)
  108. net.Nodes = append(net.Nodes, node)
  109. // emit a "control" event
  110. net.events.Send(ControlEvent(node))
  111. return node, nil
  112. }
  113. // Config returns the network configuration
  114. func (net *Network) Config() *NetworkConfig {
  115. return &net.NetworkConfig
  116. }
  117. // StartAll starts all nodes in the network
  118. func (net *Network) StartAll() error {
  119. for _, node := range net.Nodes {
  120. if node.Up() {
  121. continue
  122. }
  123. if err := net.Start(node.ID()); err != nil {
  124. return err
  125. }
  126. }
  127. return nil
  128. }
  129. // StopAll stops all nodes in the network
  130. func (net *Network) StopAll() error {
  131. for _, node := range net.Nodes {
  132. if !node.Up() {
  133. continue
  134. }
  135. if err := net.Stop(node.ID()); err != nil {
  136. return err
  137. }
  138. }
  139. return nil
  140. }
  141. // Start starts the node with the given ID
  142. func (net *Network) Start(id enode.ID) error {
  143. return net.startWithSnapshots(id, nil)
  144. }
  145. // startWithSnapshots starts the node with the given ID using the give
  146. // snapshots
  147. func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
  148. net.lock.Lock()
  149. defer net.lock.Unlock()
  150. node := net.getNode(id)
  151. if node == nil {
  152. return fmt.Errorf("node %v does not exist", id)
  153. }
  154. if node.Up() {
  155. return fmt.Errorf("node %v already up", id)
  156. }
  157. log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
  158. if err := node.Start(snapshots); err != nil {
  159. log.Warn("Node startup failed", "id", id, "err", err)
  160. return err
  161. }
  162. node.SetUp(true)
  163. log.Info("Started node", "id", id)
  164. ev := NewEvent(node)
  165. net.events.Send(ev)
  166. // subscribe to peer events
  167. client, err := node.Client()
  168. if err != nil {
  169. return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
  170. }
  171. events := make(chan *p2p.PeerEvent)
  172. sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
  173. if err != nil {
  174. return fmt.Errorf("error getting peer events for node %v: %s", id, err)
  175. }
  176. go net.watchPeerEvents(id, events, sub)
  177. return nil
  178. }
  179. // watchPeerEvents reads peer events from the given channel and emits
  180. // corresponding network events
  181. func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub event.Subscription) {
  182. defer func() {
  183. sub.Unsubscribe()
  184. // assume the node is now down
  185. net.lock.Lock()
  186. defer net.lock.Unlock()
  187. node := net.getNode(id)
  188. if node == nil {
  189. return
  190. }
  191. node.SetUp(false)
  192. ev := NewEvent(node)
  193. net.events.Send(ev)
  194. }()
  195. for {
  196. select {
  197. case event, ok := <-events:
  198. if !ok {
  199. return
  200. }
  201. peer := event.Peer
  202. switch event.Type {
  203. case p2p.PeerEventTypeAdd:
  204. net.DidConnect(id, peer)
  205. case p2p.PeerEventTypeDrop:
  206. net.DidDisconnect(id, peer)
  207. case p2p.PeerEventTypeMsgSend:
  208. net.DidSend(id, peer, event.Protocol, *event.MsgCode)
  209. case p2p.PeerEventTypeMsgRecv:
  210. net.DidReceive(peer, id, event.Protocol, *event.MsgCode)
  211. }
  212. case err := <-sub.Err():
  213. if err != nil {
  214. log.Error("Error in peer event subscription", "id", id, "err", err)
  215. }
  216. return
  217. }
  218. }
  219. }
  220. // Stop stops the node with the given ID
  221. func (net *Network) Stop(id enode.ID) error {
  222. // IMPORTANT: node.Stop() must NOT be called under net.lock as
  223. // node.Reachable() closure has a reference to the network and
  224. // calls net.InitConn() what also locks the network. => DEADLOCK
  225. // That holds until the following ticket is not resolved:
  226. var err error
  227. node, err := func() (*Node, error) {
  228. net.lock.Lock()
  229. defer net.lock.Unlock()
  230. node := net.getNode(id)
  231. if node == nil {
  232. return nil, fmt.Errorf("node %v does not exist", id)
  233. }
  234. if !node.Up() {
  235. return nil, fmt.Errorf("node %v already down", id)
  236. }
  237. node.SetUp(false)
  238. return node, nil
  239. }()
  240. if err != nil {
  241. return err
  242. }
  243. err = node.Stop() // must be called without net.lock
  244. net.lock.Lock()
  245. defer net.lock.Unlock()
  246. if err != nil {
  247. node.SetUp(true)
  248. return err
  249. }
  250. log.Info("Stopped node", "id", id, "err", err)
  251. ev := ControlEvent(node)
  252. net.events.Send(ev)
  253. return nil
  254. }
  255. // Connect connects two nodes together by calling the "admin_addPeer" RPC
  256. // method on the "one" node so that it connects to the "other" node
  257. func (net *Network) Connect(oneID, otherID enode.ID) error {
  258. net.lock.Lock()
  259. defer net.lock.Unlock()
  260. return net.connect(oneID, otherID)
  261. }
  262. func (net *Network) connect(oneID, otherID enode.ID) error {
  263. log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
  264. conn, err := net.initConn(oneID, otherID)
  265. if err != nil {
  266. return err
  267. }
  268. client, err := conn.one.Client()
  269. if err != nil {
  270. return err
  271. }
  272. net.events.Send(ControlEvent(conn))
  273. return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
  274. }
  275. // Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
  276. // method on the "one" node so that it disconnects from the "other" node
  277. func (net *Network) Disconnect(oneID, otherID enode.ID) error {
  278. conn := net.GetConn(oneID, otherID)
  279. if conn == nil {
  280. return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
  281. }
  282. if !conn.Up {
  283. return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
  284. }
  285. client, err := conn.one.Client()
  286. if err != nil {
  287. return err
  288. }
  289. net.events.Send(ControlEvent(conn))
  290. return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
  291. }
  292. // DidConnect tracks the fact that the "one" node connected to the "other" node
  293. func (net *Network) DidConnect(one, other enode.ID) error {
  294. net.lock.Lock()
  295. defer net.lock.Unlock()
  296. conn, err := net.getOrCreateConn(one, other)
  297. if err != nil {
  298. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  299. }
  300. if conn.Up {
  301. return fmt.Errorf("%v and %v already connected", one, other)
  302. }
  303. conn.Up = true
  304. net.events.Send(NewEvent(conn))
  305. return nil
  306. }
  307. // DidDisconnect tracks the fact that the "one" node disconnected from the
  308. // "other" node
  309. func (net *Network) DidDisconnect(one, other enode.ID) error {
  310. net.lock.Lock()
  311. defer net.lock.Unlock()
  312. conn := net.getConn(one, other)
  313. if conn == nil {
  314. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  315. }
  316. if !conn.Up {
  317. return fmt.Errorf("%v and %v already disconnected", one, other)
  318. }
  319. conn.Up = false
  320. conn.initiated = time.Now().Add(-DialBanTimeout)
  321. net.events.Send(NewEvent(conn))
  322. return nil
  323. }
  324. // DidSend tracks the fact that "sender" sent a message to "receiver"
  325. func (net *Network) DidSend(sender, receiver enode.ID, proto string, code uint64) error {
  326. msg := &Msg{
  327. One: sender,
  328. Other: receiver,
  329. Protocol: proto,
  330. Code: code,
  331. Received: false,
  332. }
  333. net.events.Send(NewEvent(msg))
  334. return nil
  335. }
  336. // DidReceive tracks the fact that "receiver" received a message from "sender"
  337. func (net *Network) DidReceive(sender, receiver enode.ID, proto string, code uint64) error {
  338. msg := &Msg{
  339. One: sender,
  340. Other: receiver,
  341. Protocol: proto,
  342. Code: code,
  343. Received: true,
  344. }
  345. net.events.Send(NewEvent(msg))
  346. return nil
  347. }
  348. // GetNode gets the node with the given ID, returning nil if the node does not
  349. // exist
  350. func (net *Network) GetNode(id enode.ID) *Node {
  351. net.lock.RLock()
  352. defer net.lock.RUnlock()
  353. return net.getNode(id)
  354. }
  355. func (net *Network) getNode(id enode.ID) *Node {
  356. i, found := net.nodeMap[id]
  357. if !found {
  358. return nil
  359. }
  360. return net.Nodes[i]
  361. }
  362. // GetNode gets the node with the given name, returning nil if the node does
  363. // not exist
  364. func (net *Network) GetNodeByName(name string) *Node {
  365. net.lock.RLock()
  366. defer net.lock.RUnlock()
  367. return net.getNodeByName(name)
  368. }
  369. func (net *Network) getNodeByName(name string) *Node {
  370. for _, node := range net.Nodes {
  371. if node.Config.Name == name {
  372. return node
  373. }
  374. }
  375. return nil
  376. }
  377. // GetNodes returns the existing nodes
  378. func (net *Network) GetNodes() (nodes []*Node) {
  379. net.lock.RLock()
  380. defer net.lock.RUnlock()
  381. return net.getNodes()
  382. }
  383. func (net *Network) getNodes() (nodes []*Node) {
  384. nodes = append(nodes, net.Nodes...)
  385. return nodes
  386. }
  387. // GetRandomUpNode returns a random node on the network, which is running.
  388. func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
  389. net.lock.RLock()
  390. defer net.lock.RUnlock()
  391. return net.getRandomUpNode(excludeIDs...)
  392. }
  393. // GetRandomUpNode returns a random node on the network, which is running.
  394. func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node {
  395. return net.getRandomNode(net.getUpNodeIDs(), excludeIDs)
  396. }
  397. func (net *Network) getUpNodeIDs() (ids []enode.ID) {
  398. for _, node := range net.Nodes {
  399. if node.Up() {
  400. ids = append(ids, node.ID())
  401. }
  402. }
  403. return ids
  404. }
  405. // GetRandomDownNode returns a random node on the network, which is stopped.
  406. func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
  407. net.lock.RLock()
  408. defer net.lock.RUnlock()
  409. return net.getRandomNode(net.getDownNodeIDs(), excludeIDs)
  410. }
  411. func (net *Network) getDownNodeIDs() (ids []enode.ID) {
  412. for _, node := range net.getNodes() {
  413. if !node.Up() {
  414. ids = append(ids, node.ID())
  415. }
  416. }
  417. return ids
  418. }
  419. func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
  420. filtered := filterIDs(ids, excludeIDs)
  421. l := len(filtered)
  422. if l == 0 {
  423. return nil
  424. }
  425. return net.getNode(filtered[rand.Intn(l)])
  426. }
  427. func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
  428. exclude := make(map[enode.ID]bool)
  429. for _, id := range excludeIDs {
  430. exclude[id] = true
  431. }
  432. var filtered []enode.ID
  433. for _, id := range ids {
  434. if _, found := exclude[id]; !found {
  435. filtered = append(filtered, id)
  436. }
  437. }
  438. return filtered
  439. }
  440. // GetConn returns the connection which exists between "one" and "other"
  441. // regardless of which node initiated the connection
  442. func (net *Network) GetConn(oneID, otherID enode.ID) *Conn {
  443. net.lock.RLock()
  444. defer net.lock.RUnlock()
  445. return net.getConn(oneID, otherID)
  446. }
  447. // GetOrCreateConn is like GetConn but creates the connection if it doesn't
  448. // already exist
  449. func (net *Network) GetOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
  450. net.lock.Lock()
  451. defer net.lock.Unlock()
  452. return net.getOrCreateConn(oneID, otherID)
  453. }
  454. func (net *Network) getOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
  455. if conn := net.getConn(oneID, otherID); conn != nil {
  456. return conn, nil
  457. }
  458. one := net.getNode(oneID)
  459. if one == nil {
  460. return nil, fmt.Errorf("node %v does not exist", oneID)
  461. }
  462. other := net.getNode(otherID)
  463. if other == nil {
  464. return nil, fmt.Errorf("node %v does not exist", otherID)
  465. }
  466. conn := &Conn{
  467. One: oneID,
  468. Other: otherID,
  469. one: one,
  470. other: other,
  471. }
  472. label := ConnLabel(oneID, otherID)
  473. net.connMap[label] = len(net.Conns)
  474. net.Conns = append(net.Conns, conn)
  475. return conn, nil
  476. }
  477. func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
  478. label := ConnLabel(oneID, otherID)
  479. i, found := net.connMap[label]
  480. if !found {
  481. return nil
  482. }
  483. return net.Conns[i]
  484. }
  485. // InitConn(one, other) retrieves the connection model for the connection between
  486. // peers one and other, or creates a new one if it does not exist
  487. // the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
  488. // it checks if the connection is already up, and if the nodes are running
  489. // NOTE:
  490. // it also checks whether there has been recent attempt to connect the peers
  491. // this is cheating as the simulation is used as an oracle and know about
  492. // remote peers attempt to connect to a node which will then not initiate the connection
  493. func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
  494. net.lock.Lock()
  495. defer net.lock.Unlock()
  496. return net.initConn(oneID, otherID)
  497. }
  498. func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) {
  499. if oneID == otherID {
  500. return nil, fmt.Errorf("refusing to connect to self %v", oneID)
  501. }
  502. conn, err := net.getOrCreateConn(oneID, otherID)
  503. if err != nil {
  504. return nil, err
  505. }
  506. if conn.Up {
  507. return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
  508. }
  509. if time.Since(conn.initiated) < DialBanTimeout {
  510. return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
  511. }
  512. err = conn.nodesUp()
  513. if err != nil {
  514. log.Trace("Nodes not up", "err", err)
  515. return nil, fmt.Errorf("nodes not up: %v", err)
  516. }
  517. log.Debug("Connection initiated", "id", oneID, "other", otherID)
  518. conn.initiated = time.Now()
  519. return conn, nil
  520. }
  521. // Shutdown stops all nodes in the network and closes the quit channel
  522. func (net *Network) Shutdown() {
  523. for _, node := range net.Nodes {
  524. log.Debug("Stopping node", "id", node.ID())
  525. if err := node.Stop(); err != nil {
  526. log.Warn("Can't stop node", "id", node.ID(), "err", err)
  527. }
  528. // If the node has the close method, call it.
  529. if closer, ok := node.Node.(io.Closer); ok {
  530. if err := closer.Close(); err != nil {
  531. log.Warn("Can't close node", "id", node.ID(), "err", err)
  532. }
  533. }
  534. }
  535. close(net.quitc)
  536. }
  537. // Reset resets all network properties:
  538. // empties the nodes and the connection list
  539. func (net *Network) Reset() {
  540. net.lock.Lock()
  541. defer net.lock.Unlock()
  542. //re-initialize the maps
  543. net.connMap = make(map[string]int)
  544. net.nodeMap = make(map[enode.ID]int)
  545. net.Nodes = nil
  546. net.Conns = nil
  547. }
// Node is a wrapper around adapters.Node which is used to track the status
// of a node in the network
type Node struct {
	adapters.Node `json:"-"`

	// Config is the config used to create the node
	Config *adapters.NodeConfig `json:"config"`

	// up tracks whether or not the node is running; it is read and written
	// through Up and SetUp, which guard it with upMu
	up   bool
	upMu sync.RWMutex
}
  558. func (n *Node) Up() bool {
  559. n.upMu.RLock()
  560. defer n.upMu.RUnlock()
  561. return n.up
  562. }
  563. func (n *Node) SetUp(up bool) {
  564. n.upMu.Lock()
  565. defer n.upMu.Unlock()
  566. n.up = up
  567. }
  568. // ID returns the ID of the node
  569. func (n *Node) ID() enode.ID {
  570. return n.Config.ID
  571. }
  572. // String returns a log-friendly string
  573. func (n *Node) String() string {
  574. return fmt.Sprintf("Node %v", n.ID().TerminalString())
  575. }
  576. // NodeInfo returns information about the node
  577. func (n *Node) NodeInfo() *p2p.NodeInfo {
  578. // avoid a panic if the node is not started yet
  579. if n.Node == nil {
  580. return nil
  581. }
  582. info := n.Node.NodeInfo()
  583. info.Name = n.Config.Name
  584. return info
  585. }
  586. // MarshalJSON implements the json.Marshaler interface so that the encoded
  587. // JSON includes the NodeInfo
  588. func (n *Node) MarshalJSON() ([]byte, error) {
  589. return json.Marshal(struct {
  590. Info *p2p.NodeInfo `json:"info,omitempty"`
  591. Config *adapters.NodeConfig `json:"config,omitempty"`
  592. Up bool `json:"up"`
  593. }{
  594. Info: n.NodeInfo(),
  595. Config: n.Config,
  596. Up: n.Up(),
  597. })
  598. }
  599. // UnmarshalJSON implements json.Unmarshaler interface so that we don't lose
  600. // Node.up status. IMPORTANT: The implementation is incomplete; we lose
  601. // p2p.NodeInfo.
  602. func (n *Node) UnmarshalJSON(raw []byte) error {
  603. // TODO: How should we turn back NodeInfo into n.Node?
  604. // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
  605. node := struct {
  606. Config *adapters.NodeConfig `json:"config,omitempty"`
  607. Up bool `json:"up"`
  608. }{}
  609. if err := json.Unmarshal(raw, &node); err != nil {
  610. return err
  611. }
  612. n.SetUp(node.Up)
  613. n.Config = node.Config
  614. return nil
  615. }
// Conn represents a connection between two nodes in the network
type Conn struct {
	// One is the node which initiated the connection
	One enode.ID `json:"one"`

	// Other is the node which the connection was made to
	Other enode.ID `json:"other"`

	// Up tracks whether or not the connection is active
	Up bool `json:"up"`

	// initiated registers when the connection was grabbed to dial; initConn
	// compares it against DialBanTimeout
	initiated time.Time

	// one and other are the network nodes behind One and Other, set when the
	// connection is created by getOrCreateConn
	one   *Node
	other *Node
}
  629. // nodesUp returns whether both nodes are currently up
  630. func (c *Conn) nodesUp() error {
  631. if !c.one.Up() {
  632. return fmt.Errorf("one %v is not up", c.One)
  633. }
  634. if !c.other.Up() {
  635. return fmt.Errorf("other %v is not up", c.Other)
  636. }
  637. return nil
  638. }
  639. // String returns a log-friendly string
  640. func (c *Conn) String() string {
  641. return fmt.Sprintf("Conn %v->%v", c.One.TerminalString(), c.Other.TerminalString())
  642. }
// Msg represents a p2p message sent between two nodes in the network
type Msg struct {
	// One is the sender and Other the receiver of the message (see DidSend
	// and DidReceive)
	One   enode.ID `json:"one"`
	Other enode.ID `json:"other"`
	// Protocol and Code are the protocol name and message code reported by
	// the peer event
	Protocol string `json:"protocol"`
	Code     uint64 `json:"code"`
	// Received is false for messages recorded by DidSend and true for those
	// recorded by DidReceive
	Received bool `json:"received"`
}
  651. // String returns a log-friendly string
  652. func (m *Msg) String() string {
  653. return fmt.Sprintf("Msg(%d) %v->%v", m.Code, m.One.TerminalString(), m.Other.TerminalString())
  654. }
  655. // ConnLabel generates a deterministic string which represents a connection
  656. // between two nodes, used to compare if two connections are between the same
  657. // nodes
  658. func ConnLabel(source, target enode.ID) string {
  659. var first, second enode.ID
  660. if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
  661. first = target
  662. second = source
  663. } else {
  664. first = source
  665. second = target
  666. }
  667. return fmt.Sprintf("%v-%v", first, second)
  668. }
// Snapshot represents the state of a network at a single point in time and can
// be used to restore the state of a network
type Snapshot struct {
	// Nodes holds one entry per node of the network
	Nodes []NodeSnapshot `json:"nodes,omitempty"`
	// Conns holds the connections which were up when the snapshot was taken
	Conns []Conn `json:"conns,omitempty"`
}
// NodeSnapshot represents the state of a node in the network
type NodeSnapshot struct {
	// Node is a copy of the node taken when the snapshot was created
	Node Node `json:"node,omitempty"`

	// Snapshots is arbitrary data gathered from calling node.Snapshots()
	Snapshots map[string][]byte `json:"snapshots,omitempty"`
}
  681. // Snapshot creates a network snapshot
  682. func (net *Network) Snapshot() (*Snapshot, error) {
  683. return net.snapshot(nil, nil)
  684. }
  685. func (net *Network) SnapshotWithServices(addServices []string, removeServices []string) (*Snapshot, error) {
  686. return net.snapshot(addServices, removeServices)
  687. }
  688. func (net *Network) snapshot(addServices []string, removeServices []string) (*Snapshot, error) {
  689. net.lock.Lock()
  690. defer net.lock.Unlock()
  691. snap := &Snapshot{
  692. Nodes: make([]NodeSnapshot, len(net.Nodes)),
  693. }
  694. for i, node := range net.Nodes {
  695. snap.Nodes[i] = NodeSnapshot{Node: *node}
  696. if !node.Up() {
  697. continue
  698. }
  699. snapshots, err := node.Snapshots()
  700. if err != nil {
  701. return nil, err
  702. }
  703. snap.Nodes[i].Snapshots = snapshots
  704. for _, addSvc := range addServices {
  705. haveSvc := false
  706. for _, svc := range snap.Nodes[i].Node.Config.Services {
  707. if svc == addSvc {
  708. haveSvc = true
  709. break
  710. }
  711. }
  712. if !haveSvc {
  713. snap.Nodes[i].Node.Config.Services = append(snap.Nodes[i].Node.Config.Services, addSvc)
  714. }
  715. }
  716. if len(removeServices) > 0 {
  717. var cleanedServices []string
  718. for _, svc := range snap.Nodes[i].Node.Config.Services {
  719. haveSvc := false
  720. for _, rmSvc := range removeServices {
  721. if rmSvc == svc {
  722. haveSvc = true
  723. break
  724. }
  725. }
  726. if !haveSvc {
  727. cleanedServices = append(cleanedServices, svc)
  728. }
  729. }
  730. snap.Nodes[i].Node.Config.Services = cleanedServices
  731. }
  732. }
  733. for _, conn := range net.Conns {
  734. if conn.Up {
  735. snap.Conns = append(snap.Conns, *conn)
  736. }
  737. }
  738. return snap, nil
  739. }
// snapshotLoadTimeout is how long Load waits for the connections of a
// snapshot to be established before giving up.
// longrunning tests may need a longer timeout
var snapshotLoadTimeout = 900 * time.Second
  742. // Load loads a network snapshot
  743. func (net *Network) Load(snap *Snapshot) error {
  744. // Start nodes.
  745. for _, n := range snap.Nodes {
  746. if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
  747. return err
  748. }
  749. if !n.Node.Up() {
  750. continue
  751. }
  752. if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
  753. return err
  754. }
  755. }
  756. // Prepare connection events counter.
  757. allConnected := make(chan struct{}) // closed when all connections are established
  758. done := make(chan struct{}) // ensures that the event loop goroutine is terminated
  759. defer close(done)
  760. // Subscribe to event channel.
  761. // It needs to be done outside of the event loop goroutine (created below)
  762. // to ensure that the event channel is blocking before connect calls are made.
  763. events := make(chan *Event)
  764. sub := net.Events().Subscribe(events)
  765. defer sub.Unsubscribe()
  766. go func() {
  767. // Expected number of connections.
  768. total := len(snap.Conns)
  769. // Set of all established connections from the snapshot, not other connections.
  770. // Key array element 0 is the connection One field value, and element 1 connection Other field.
  771. connections := make(map[[2]enode.ID]struct{}, total)
  772. for {
  773. select {
  774. case e := <-events:
  775. // Ignore control events as they do not represent
  776. // connect or disconnect (Up) state change.
  777. if e.Control {
  778. continue
  779. }
  780. // Detect only connection events.
  781. if e.Type != EventTypeConn {
  782. continue
  783. }
  784. connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
  785. // Nodes are still not connected or have been disconnected.
  786. if !e.Conn.Up {
  787. // Delete the connection from the set of established connections.
  788. // This will prevent false positive in case disconnections happen.
  789. delete(connections, connection)
  790. log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
  791. continue
  792. }
  793. // Check that the connection is from the snapshot.
  794. for _, conn := range snap.Conns {
  795. if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
  796. // Add the connection to the set of established connections.
  797. connections[connection] = struct{}{}
  798. if len(connections) == total {
  799. // Signal that all nodes are connected.
  800. close(allConnected)
  801. return
  802. }
  803. break
  804. }
  805. }
  806. case <-done:
  807. // Load function returned, terminate this goroutine.
  808. return
  809. }
  810. }
  811. }()
  812. // Start connecting.
  813. for _, conn := range snap.Conns {
  814. if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
  815. //in this case, at least one of the nodes of a connection is not up,
  816. //so it would result in the snapshot `Load` to fail
  817. continue
  818. }
  819. if err := net.Connect(conn.One, conn.Other); err != nil {
  820. return err
  821. }
  822. }
  823. select {
  824. // Wait until all connections from the snapshot are established.
  825. case <-allConnected:
  826. // Make sure that we do not wait forever.
  827. case <-time.After(snapshotLoadTimeout):
  828. return errors.New("snapshot connections not established")
  829. }
  830. return nil
  831. }
  832. // Subscribe reads control events from a channel and executes them
  833. func (net *Network) Subscribe(events chan *Event) {
  834. for {
  835. select {
  836. case event, ok := <-events:
  837. if !ok {
  838. return
  839. }
  840. if event.Control {
  841. net.executeControlEvent(event)
  842. }
  843. case <-net.quitc:
  844. return
  845. }
  846. }
  847. }
  848. func (net *Network) executeControlEvent(event *Event) {
  849. log.Trace("Executing control event", "type", event.Type, "event", event)
  850. switch event.Type {
  851. case EventTypeNode:
  852. if err := net.executeNodeEvent(event); err != nil {
  853. log.Error("Error executing node event", "event", event, "err", err)
  854. }
  855. case EventTypeConn:
  856. if err := net.executeConnEvent(event); err != nil {
  857. log.Error("Error executing conn event", "event", event, "err", err)
  858. }
  859. case EventTypeMsg:
  860. log.Warn("Ignoring control msg event")
  861. }
  862. }
  863. func (net *Network) executeNodeEvent(e *Event) error {
  864. if !e.Node.Up() {
  865. return net.Stop(e.Node.ID())
  866. }
  867. if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil {
  868. return err
  869. }
  870. return net.Start(e.Node.ID())
  871. }
  872. func (net *Network) executeConnEvent(e *Event) error {
  873. if e.Conn.Up {
  874. return net.Connect(e.Conn.One, e.Conn.Other)
  875. } else {
  876. return net.Disconnect(e.Conn.One, e.Conn.Other)
  877. }
  878. }