network.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package simulations
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math/rand"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/event"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  32. )
// DialBanTimeout is the minimum time that must pass after a connection
// between two nodes was initiated before InitConn accepts another attempt
// for the same pair. DidDisconnect back-dates the connection's initiation
// time by this amount so that a dropped connection can be redialled at once.
var DialBanTimeout = 200 * time.Millisecond
// NetworkConfig defines configuration options for starting a Network.
type NetworkConfig struct {
	// ID is the identifier of the network.
	ID string `json:"id"`
	// DefaultService is the service assigned to a new node whose config
	// does not list any services (see NewNodeWithConfig).
	DefaultService string `json:"default_service,omitempty"`
}
// Network models a p2p simulation network which consists of a collection of
// simulated nodes and the connections which exist between them.
//
// The Network has a single NodeAdapter which is responsible for actually
// starting nodes and connecting them together.
//
// The Network emits events when nodes are started and stopped, when they are
// connected and disconnected, and also when messages are sent between nodes.
type Network struct {
	NetworkConfig

	// Nodes holds every node added to the network; nodeMap maps a node ID
	// to the node's index in Nodes.
	Nodes   []*Node `json:"nodes"`
	nodeMap map[enode.ID]int

	// Conns holds every connection created so far; connMap maps a
	// connection label (see ConnLabel) to the connection's index in Conns.
	Conns   []*Conn `json:"conns"`
	connMap map[string]int

	// nodeAdapter creates the underlying adapter nodes.
	nodeAdapter adapters.NodeAdapter
	// events is the feed on which all network events are published.
	events event.Feed
	// lock guards Nodes, nodeMap, Conns and connMap.
	lock sync.RWMutex
	// quitc is closed by Shutdown and terminates Subscribe loops.
	quitc chan struct{}
}
  58. // NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
  59. func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
  60. return &Network{
  61. NetworkConfig: *conf,
  62. nodeAdapter: nodeAdapter,
  63. nodeMap: make(map[enode.ID]int),
  64. connMap: make(map[string]int),
  65. quitc: make(chan struct{}),
  66. }
  67. }
  68. // Events returns the output event feed of the Network.
  69. func (net *Network) Events() *event.Feed {
  70. return &net.events
  71. }
  72. // NewNodeWithConfig adds a new node to the network with the given config,
  73. // returning an error if a node with the same ID or name already exists
  74. func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
  75. net.lock.Lock()
  76. defer net.lock.Unlock()
  77. if conf.Reachable == nil {
  78. conf.Reachable = func(otherID enode.ID) bool {
  79. _, err := net.InitConn(conf.ID, otherID)
  80. if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
  81. return false
  82. }
  83. return true
  84. }
  85. }
  86. // check the node doesn't already exist
  87. if node := net.getNode(conf.ID); node != nil {
  88. return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
  89. }
  90. if node := net.getNodeByName(conf.Name); node != nil {
  91. return nil, fmt.Errorf("node with name %q already exists", conf.Name)
  92. }
  93. // if no services are configured, use the default service
  94. if len(conf.Services) == 0 {
  95. conf.Services = []string{net.DefaultService}
  96. }
  97. // use the NodeAdapter to create the node
  98. adapterNode, err := net.nodeAdapter.NewNode(conf)
  99. if err != nil {
  100. return nil, err
  101. }
  102. node := &Node{
  103. Node: adapterNode,
  104. Config: conf,
  105. }
  106. log.Trace("Node created", "id", conf.ID)
  107. net.nodeMap[conf.ID] = len(net.Nodes)
  108. net.Nodes = append(net.Nodes, node)
  109. // emit a "control" event
  110. net.events.Send(ControlEvent(node))
  111. return node, nil
  112. }
  113. // Config returns the network configuration
  114. func (net *Network) Config() *NetworkConfig {
  115. return &net.NetworkConfig
  116. }
  117. // StartAll starts all nodes in the network
  118. func (net *Network) StartAll() error {
  119. for _, node := range net.Nodes {
  120. if node.Up {
  121. continue
  122. }
  123. if err := net.Start(node.ID()); err != nil {
  124. return err
  125. }
  126. }
  127. return nil
  128. }
  129. // StopAll stops all nodes in the network
  130. func (net *Network) StopAll() error {
  131. for _, node := range net.Nodes {
  132. if !node.Up {
  133. continue
  134. }
  135. if err := net.Stop(node.ID()); err != nil {
  136. return err
  137. }
  138. }
  139. return nil
  140. }
  141. // Start starts the node with the given ID
  142. func (net *Network) Start(id enode.ID) error {
  143. return net.startWithSnapshots(id, nil)
  144. }
  145. // startWithSnapshots starts the node with the given ID using the give
  146. // snapshots
  147. func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
  148. net.lock.Lock()
  149. node := net.getNode(id)
  150. if node == nil {
  151. net.lock.Unlock()
  152. return fmt.Errorf("node %v does not exist", id)
  153. }
  154. if node.Up {
  155. net.lock.Unlock()
  156. return fmt.Errorf("node %v already up", id)
  157. }
  158. log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
  159. if err := node.Start(snapshots); err != nil {
  160. net.lock.Unlock()
  161. log.Warn("Node startup failed", "id", id, "err", err)
  162. return err
  163. }
  164. node.Up = true
  165. log.Info("Started node", "id", id)
  166. ev := NewEvent(node)
  167. net.lock.Unlock()
  168. net.events.Send(ev)
  169. // subscribe to peer events
  170. client, err := node.Client()
  171. if err != nil {
  172. return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
  173. }
  174. events := make(chan *p2p.PeerEvent)
  175. sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
  176. if err != nil {
  177. return fmt.Errorf("error getting peer events for node %v: %s", id, err)
  178. }
  179. go net.watchPeerEvents(id, events, sub)
  180. return nil
  181. }
  182. // watchPeerEvents reads peer events from the given channel and emits
  183. // corresponding network events
  184. func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub event.Subscription) {
  185. defer func() {
  186. sub.Unsubscribe()
  187. // assume the node is now down
  188. net.lock.Lock()
  189. defer net.lock.Unlock()
  190. node := net.getNode(id)
  191. if node == nil {
  192. return
  193. }
  194. node.Up = false
  195. ev := NewEvent(node)
  196. net.events.Send(ev)
  197. }()
  198. for {
  199. select {
  200. case event, ok := <-events:
  201. if !ok {
  202. return
  203. }
  204. peer := event.Peer
  205. switch event.Type {
  206. case p2p.PeerEventTypeAdd:
  207. net.DidConnect(id, peer)
  208. case p2p.PeerEventTypeDrop:
  209. net.DidDisconnect(id, peer)
  210. case p2p.PeerEventTypeMsgSend:
  211. net.DidSend(id, peer, event.Protocol, *event.MsgCode)
  212. case p2p.PeerEventTypeMsgRecv:
  213. net.DidReceive(peer, id, event.Protocol, *event.MsgCode)
  214. }
  215. case err := <-sub.Err():
  216. if err != nil {
  217. log.Error("Error in peer event subscription", "id", id, "err", err)
  218. }
  219. return
  220. }
  221. }
  222. }
  223. // Stop stops the node with the given ID
  224. func (net *Network) Stop(id enode.ID) error {
  225. net.lock.Lock()
  226. node := net.getNode(id)
  227. if node == nil {
  228. net.lock.Unlock()
  229. return fmt.Errorf("node %v does not exist", id)
  230. }
  231. if !node.Up {
  232. net.lock.Unlock()
  233. return fmt.Errorf("node %v already down", id)
  234. }
  235. node.Up = false
  236. net.lock.Unlock()
  237. err := node.Stop()
  238. if err != nil {
  239. net.lock.Lock()
  240. node.Up = true
  241. net.lock.Unlock()
  242. return err
  243. }
  244. log.Info("Stopped node", "id", id, "err", err)
  245. net.lock.Lock()
  246. ev := ControlEvent(node)
  247. net.lock.Unlock()
  248. net.events.Send(ev)
  249. return nil
  250. }
  251. // Connect connects two nodes together by calling the "admin_addPeer" RPC
  252. // method on the "one" node so that it connects to the "other" node
  253. func (net *Network) Connect(oneID, otherID enode.ID) error {
  254. log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
  255. conn, err := net.InitConn(oneID, otherID)
  256. if err != nil {
  257. return err
  258. }
  259. client, err := conn.one.Client()
  260. if err != nil {
  261. return err
  262. }
  263. net.events.Send(ControlEvent(conn))
  264. return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
  265. }
  266. // Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
  267. // method on the "one" node so that it disconnects from the "other" node
  268. func (net *Network) Disconnect(oneID, otherID enode.ID) error {
  269. conn := net.GetConn(oneID, otherID)
  270. if conn == nil {
  271. return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
  272. }
  273. if !conn.Up {
  274. return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
  275. }
  276. client, err := conn.one.Client()
  277. if err != nil {
  278. return err
  279. }
  280. net.events.Send(ControlEvent(conn))
  281. return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
  282. }
  283. // DidConnect tracks the fact that the "one" node connected to the "other" node
  284. func (net *Network) DidConnect(one, other enode.ID) error {
  285. net.lock.Lock()
  286. defer net.lock.Unlock()
  287. conn, err := net.getOrCreateConn(one, other)
  288. if err != nil {
  289. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  290. }
  291. if conn.Up {
  292. return fmt.Errorf("%v and %v already connected", one, other)
  293. }
  294. conn.Up = true
  295. net.events.Send(NewEvent(conn))
  296. return nil
  297. }
  298. // DidDisconnect tracks the fact that the "one" node disconnected from the
  299. // "other" node
  300. func (net *Network) DidDisconnect(one, other enode.ID) error {
  301. net.lock.Lock()
  302. defer net.lock.Unlock()
  303. conn := net.getConn(one, other)
  304. if conn == nil {
  305. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  306. }
  307. if !conn.Up {
  308. return fmt.Errorf("%v and %v already disconnected", one, other)
  309. }
  310. conn.Up = false
  311. conn.initiated = time.Now().Add(-DialBanTimeout)
  312. net.events.Send(NewEvent(conn))
  313. return nil
  314. }
  315. // DidSend tracks the fact that "sender" sent a message to "receiver"
  316. func (net *Network) DidSend(sender, receiver enode.ID, proto string, code uint64) error {
  317. msg := &Msg{
  318. One: sender,
  319. Other: receiver,
  320. Protocol: proto,
  321. Code: code,
  322. Received: false,
  323. }
  324. net.events.Send(NewEvent(msg))
  325. return nil
  326. }
  327. // DidReceive tracks the fact that "receiver" received a message from "sender"
  328. func (net *Network) DidReceive(sender, receiver enode.ID, proto string, code uint64) error {
  329. msg := &Msg{
  330. One: sender,
  331. Other: receiver,
  332. Protocol: proto,
  333. Code: code,
  334. Received: true,
  335. }
  336. net.events.Send(NewEvent(msg))
  337. return nil
  338. }
  339. // GetNode gets the node with the given ID, returning nil if the node does not
  340. // exist
  341. func (net *Network) GetNode(id enode.ID) *Node {
  342. net.lock.RLock()
  343. defer net.lock.RUnlock()
  344. return net.getNode(id)
  345. }
  346. // GetNode gets the node with the given name, returning nil if the node does
  347. // not exist
  348. func (net *Network) GetNodeByName(name string) *Node {
  349. net.lock.RLock()
  350. defer net.lock.RUnlock()
  351. return net.getNodeByName(name)
  352. }
  353. func (net *Network) getNodeByName(name string) *Node {
  354. for _, node := range net.Nodes {
  355. if node.Config.Name == name {
  356. return node
  357. }
  358. }
  359. return nil
  360. }
  361. // GetNodes returns the existing nodes
  362. func (net *Network) GetNodes() (nodes []*Node) {
  363. net.lock.RLock()
  364. defer net.lock.RUnlock()
  365. nodes = append(nodes, net.Nodes...)
  366. return nodes
  367. }
  368. func (net *Network) getNode(id enode.ID) *Node {
  369. i, found := net.nodeMap[id]
  370. if !found {
  371. return nil
  372. }
  373. return net.Nodes[i]
  374. }
  375. // GetRandomUpNode returns a random node on the network, which is running.
  376. func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
  377. net.lock.RLock()
  378. defer net.lock.RUnlock()
  379. return net.getRandomNode(net.getUpNodeIDs(), excludeIDs)
  380. }
  381. func (net *Network) getUpNodeIDs() (ids []enode.ID) {
  382. for _, node := range net.Nodes {
  383. if node.Up {
  384. ids = append(ids, node.ID())
  385. }
  386. }
  387. return ids
  388. }
  389. // GetRandomDownNode returns a random node on the network, which is stopped.
  390. func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
  391. net.lock.RLock()
  392. defer net.lock.RUnlock()
  393. return net.getRandomNode(net.getDownNodeIDs(), excludeIDs)
  394. }
  395. func (net *Network) getDownNodeIDs() (ids []enode.ID) {
  396. for _, node := range net.GetNodes() {
  397. if !node.Up {
  398. ids = append(ids, node.ID())
  399. }
  400. }
  401. return ids
  402. }
  403. func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
  404. filtered := filterIDs(ids, excludeIDs)
  405. l := len(filtered)
  406. if l == 0 {
  407. return nil
  408. }
  409. return net.GetNode(filtered[rand.Intn(l)])
  410. }
  411. func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
  412. exclude := make(map[enode.ID]bool)
  413. for _, id := range excludeIDs {
  414. exclude[id] = true
  415. }
  416. var filtered []enode.ID
  417. for _, id := range ids {
  418. if _, found := exclude[id]; !found {
  419. filtered = append(filtered, id)
  420. }
  421. }
  422. return filtered
  423. }
  424. // GetConn returns the connection which exists between "one" and "other"
  425. // regardless of which node initiated the connection
  426. func (net *Network) GetConn(oneID, otherID enode.ID) *Conn {
  427. net.lock.RLock()
  428. defer net.lock.RUnlock()
  429. return net.getConn(oneID, otherID)
  430. }
  431. // GetOrCreateConn is like GetConn but creates the connection if it doesn't
  432. // already exist
  433. func (net *Network) GetOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
  434. net.lock.Lock()
  435. defer net.lock.Unlock()
  436. return net.getOrCreateConn(oneID, otherID)
  437. }
  438. func (net *Network) getOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
  439. if conn := net.getConn(oneID, otherID); conn != nil {
  440. return conn, nil
  441. }
  442. one := net.getNode(oneID)
  443. if one == nil {
  444. return nil, fmt.Errorf("node %v does not exist", oneID)
  445. }
  446. other := net.getNode(otherID)
  447. if other == nil {
  448. return nil, fmt.Errorf("node %v does not exist", otherID)
  449. }
  450. conn := &Conn{
  451. One: oneID,
  452. Other: otherID,
  453. one: one,
  454. other: other,
  455. }
  456. label := ConnLabel(oneID, otherID)
  457. net.connMap[label] = len(net.Conns)
  458. net.Conns = append(net.Conns, conn)
  459. return conn, nil
  460. }
  461. func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
  462. label := ConnLabel(oneID, otherID)
  463. i, found := net.connMap[label]
  464. if !found {
  465. return nil
  466. }
  467. return net.Conns[i]
  468. }
  469. // InitConn(one, other) retrieves the connection model for the connection between
  470. // peers one and other, or creates a new one if it does not exist
  471. // the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
  472. // it checks if the connection is already up, and if the nodes are running
  473. // NOTE:
  474. // it also checks whether there has been recent attempt to connect the peers
  475. // this is cheating as the simulation is used as an oracle and know about
  476. // remote peers attempt to connect to a node which will then not initiate the connection
  477. func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
  478. net.lock.Lock()
  479. defer net.lock.Unlock()
  480. if oneID == otherID {
  481. return nil, fmt.Errorf("refusing to connect to self %v", oneID)
  482. }
  483. conn, err := net.getOrCreateConn(oneID, otherID)
  484. if err != nil {
  485. return nil, err
  486. }
  487. if conn.Up {
  488. return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
  489. }
  490. if time.Since(conn.initiated) < DialBanTimeout {
  491. return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
  492. }
  493. err = conn.nodesUp()
  494. if err != nil {
  495. log.Trace("Nodes not up", "err", err)
  496. return nil, fmt.Errorf("nodes not up: %v", err)
  497. }
  498. log.Debug("Connection initiated", "id", oneID, "other", otherID)
  499. conn.initiated = time.Now()
  500. return conn, nil
  501. }
  502. // Shutdown stops all nodes in the network and closes the quit channel
  503. func (net *Network) Shutdown() {
  504. for _, node := range net.Nodes {
  505. log.Debug("Stopping node", "id", node.ID())
  506. if err := node.Stop(); err != nil {
  507. log.Warn("Can't stop node", "id", node.ID(), "err", err)
  508. }
  509. // If the node has the close method, call it.
  510. if closer, ok := node.Node.(io.Closer); ok {
  511. if err := closer.Close(); err != nil {
  512. log.Warn("Can't close node", "id", node.ID(), "err", err)
  513. }
  514. }
  515. }
  516. close(net.quitc)
  517. }
  518. // Reset resets all network properties:
  519. // empties the nodes and the connection list
  520. func (net *Network) Reset() {
  521. net.lock.Lock()
  522. defer net.lock.Unlock()
  523. //re-initialize the maps
  524. net.connMap = make(map[string]int)
  525. net.nodeMap = make(map[enode.ID]int)
  526. net.Nodes = nil
  527. net.Conns = nil
  528. }
// Node is a wrapper around adapters.Node which is used to track the status
// of a node in the network.
type Node struct {
	// The embedded adapter node is excluded from the JSON encoding.
	adapters.Node `json:"-"`

	// Config is the config used to create the node.
	Config *adapters.NodeConfig `json:"config"`

	// Up tracks whether or not the node is running.
	Up bool `json:"up"`
}
  538. // ID returns the ID of the node
  539. func (n *Node) ID() enode.ID {
  540. return n.Config.ID
  541. }
  542. // String returns a log-friendly string
  543. func (n *Node) String() string {
  544. return fmt.Sprintf("Node %v", n.ID().TerminalString())
  545. }
  546. // NodeInfo returns information about the node
  547. func (n *Node) NodeInfo() *p2p.NodeInfo {
  548. // avoid a panic if the node is not started yet
  549. if n.Node == nil {
  550. return nil
  551. }
  552. info := n.Node.NodeInfo()
  553. info.Name = n.Config.Name
  554. return info
  555. }
  556. // MarshalJSON implements the json.Marshaler interface so that the encoded
  557. // JSON includes the NodeInfo
  558. func (n *Node) MarshalJSON() ([]byte, error) {
  559. return json.Marshal(struct {
  560. Info *p2p.NodeInfo `json:"info,omitempty"`
  561. Config *adapters.NodeConfig `json:"config,omitempty"`
  562. Up bool `json:"up"`
  563. }{
  564. Info: n.NodeInfo(),
  565. Config: n.Config,
  566. Up: n.Up,
  567. })
  568. }
// Conn represents a connection between two nodes in the network.
type Conn struct {
	// One is the node which initiated the connection.
	One enode.ID `json:"one"`

	// Other is the node which the connection was made to.
	Other enode.ID `json:"other"`

	// Up tracks whether or not the connection is active.
	Up bool `json:"up"`

	// initiated registers when the connection was grabbed to dial; InitConn
	// compares it against DialBanTimeout.
	initiated time.Time

	// one and other are the nodes identified by One and Other.
	one   *Node
	other *Node
}
  582. // nodesUp returns whether both nodes are currently up
  583. func (c *Conn) nodesUp() error {
  584. if !c.one.Up {
  585. return fmt.Errorf("one %v is not up", c.One)
  586. }
  587. if !c.other.Up {
  588. return fmt.Errorf("other %v is not up", c.Other)
  589. }
  590. return nil
  591. }
  592. // String returns a log-friendly string
  593. func (c *Conn) String() string {
  594. return fmt.Sprintf("Conn %v->%v", c.One.TerminalString(), c.Other.TerminalString())
  595. }
// Msg represents a p2p message sent between two nodes in the network.
type Msg struct {
	// One is the sender and Other the receiver of the message (see DidSend
	// and DidReceive).
	One   enode.ID `json:"one"`
	Other enode.ID `json:"other"`
	// Protocol and Code identify the message.
	Protocol string `json:"protocol"`
	Code     uint64 `json:"code"`
	// Received is false for events created by DidSend and true for events
	// created by DidReceive.
	Received bool `json:"received"`
}
  604. // String returns a log-friendly string
  605. func (m *Msg) String() string {
  606. return fmt.Sprintf("Msg(%d) %v->%v", m.Code, m.One.TerminalString(), m.Other.TerminalString())
  607. }
  608. // ConnLabel generates a deterministic string which represents a connection
  609. // between two nodes, used to compare if two connections are between the same
  610. // nodes
  611. func ConnLabel(source, target enode.ID) string {
  612. var first, second enode.ID
  613. if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
  614. first = target
  615. second = source
  616. } else {
  617. first = source
  618. second = target
  619. }
  620. return fmt.Sprintf("%v-%v", first, second)
  621. }
// Snapshot represents the state of a network at a single point in time and can
// be used to restore the state of a network.
type Snapshot struct {
	// Nodes holds one entry per node of the network.
	Nodes []NodeSnapshot `json:"nodes,omitempty"`
	// Conns holds the connections that were up when the snapshot was taken.
	Conns []Conn `json:"conns,omitempty"`
}
// NodeSnapshot represents the state of a node in the network.
type NodeSnapshot struct {
	// Node is a copy of the node's state.
	Node Node `json:"node,omitempty"`

	// Snapshots is arbitrary data gathered from calling node.Snapshots().
	Snapshots map[string][]byte `json:"snapshots,omitempty"`
}
  634. // Snapshot creates a network snapshot
  635. func (net *Network) Snapshot() (*Snapshot, error) {
  636. return net.snapshot(nil, nil)
  637. }
  638. func (net *Network) SnapshotWithServices(addServices []string, removeServices []string) (*Snapshot, error) {
  639. return net.snapshot(addServices, removeServices)
  640. }
  641. func (net *Network) snapshot(addServices []string, removeServices []string) (*Snapshot, error) {
  642. net.lock.Lock()
  643. defer net.lock.Unlock()
  644. snap := &Snapshot{
  645. Nodes: make([]NodeSnapshot, len(net.Nodes)),
  646. }
  647. for i, node := range net.Nodes {
  648. snap.Nodes[i] = NodeSnapshot{Node: *node}
  649. if !node.Up {
  650. continue
  651. }
  652. snapshots, err := node.Snapshots()
  653. if err != nil {
  654. return nil, err
  655. }
  656. snap.Nodes[i].Snapshots = snapshots
  657. for _, addSvc := range addServices {
  658. haveSvc := false
  659. for _, svc := range snap.Nodes[i].Node.Config.Services {
  660. if svc == addSvc {
  661. haveSvc = true
  662. break
  663. }
  664. }
  665. if !haveSvc {
  666. snap.Nodes[i].Node.Config.Services = append(snap.Nodes[i].Node.Config.Services, addSvc)
  667. }
  668. }
  669. if len(removeServices) > 0 {
  670. var cleanedServices []string
  671. for _, svc := range snap.Nodes[i].Node.Config.Services {
  672. haveSvc := false
  673. for _, rmSvc := range removeServices {
  674. if rmSvc == svc {
  675. haveSvc = true
  676. break
  677. }
  678. }
  679. if !haveSvc {
  680. cleanedServices = append(cleanedServices, svc)
  681. }
  682. }
  683. snap.Nodes[i].Node.Config.Services = cleanedServices
  684. }
  685. }
  686. for _, conn := range net.Conns {
  687. if conn.Up {
  688. snap.Conns = append(snap.Conns, *conn)
  689. }
  690. }
  691. return snap, nil
  692. }
// snapshotLoadTimeout bounds how long Load waits for the connections of a
// snapshot to be established before giving up with an error.
var snapshotLoadTimeout = 120 * time.Second
  694. // Load loads a network snapshot
  695. func (net *Network) Load(snap *Snapshot) error {
  696. // Start nodes.
  697. for _, n := range snap.Nodes {
  698. if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
  699. return err
  700. }
  701. if !n.Node.Up {
  702. continue
  703. }
  704. if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
  705. return err
  706. }
  707. }
  708. // Prepare connection events counter.
  709. allConnected := make(chan struct{}) // closed when all connections are established
  710. done := make(chan struct{}) // ensures that the event loop goroutine is terminated
  711. defer close(done)
  712. // Subscribe to event channel.
  713. // It needs to be done outside of the event loop goroutine (created below)
  714. // to ensure that the event channel is blocking before connect calls are made.
  715. events := make(chan *Event)
  716. sub := net.Events().Subscribe(events)
  717. defer sub.Unsubscribe()
  718. go func() {
  719. // Expected number of connections.
  720. total := len(snap.Conns)
  721. // Set of all established connections from the snapshot, not other connections.
  722. // Key array element 0 is the connection One field value, and element 1 connection Other field.
  723. connections := make(map[[2]enode.ID]struct{}, total)
  724. for {
  725. select {
  726. case e := <-events:
  727. // Ignore control events as they do not represent
  728. // connect or disconnect (Up) state change.
  729. if e.Control {
  730. continue
  731. }
  732. // Detect only connection events.
  733. if e.Type != EventTypeConn {
  734. continue
  735. }
  736. connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
  737. // Nodes are still not connected or have been disconnected.
  738. if !e.Conn.Up {
  739. // Delete the connection from the set of established connections.
  740. // This will prevent false positive in case disconnections happen.
  741. delete(connections, connection)
  742. log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
  743. continue
  744. }
  745. // Check that the connection is from the snapshot.
  746. for _, conn := range snap.Conns {
  747. if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
  748. // Add the connection to the set of established connections.
  749. connections[connection] = struct{}{}
  750. if len(connections) == total {
  751. // Signal that all nodes are connected.
  752. close(allConnected)
  753. return
  754. }
  755. break
  756. }
  757. }
  758. case <-done:
  759. // Load function returned, terminate this goroutine.
  760. return
  761. }
  762. }
  763. }()
  764. // Start connecting.
  765. for _, conn := range snap.Conns {
  766. if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
  767. //in this case, at least one of the nodes of a connection is not up,
  768. //so it would result in the snapshot `Load` to fail
  769. continue
  770. }
  771. if err := net.Connect(conn.One, conn.Other); err != nil {
  772. return err
  773. }
  774. }
  775. select {
  776. // Wait until all connections from the snapshot are established.
  777. case <-allConnected:
  778. // Make sure that we do not wait forever.
  779. case <-time.After(snapshotLoadTimeout):
  780. return errors.New("snapshot connections not established")
  781. }
  782. return nil
  783. }
  784. // Subscribe reads control events from a channel and executes them
  785. func (net *Network) Subscribe(events chan *Event) {
  786. for {
  787. select {
  788. case event, ok := <-events:
  789. if !ok {
  790. return
  791. }
  792. if event.Control {
  793. net.executeControlEvent(event)
  794. }
  795. case <-net.quitc:
  796. return
  797. }
  798. }
  799. }
  800. func (net *Network) executeControlEvent(event *Event) {
  801. log.Trace("Executing control event", "type", event.Type, "event", event)
  802. switch event.Type {
  803. case EventTypeNode:
  804. if err := net.executeNodeEvent(event); err != nil {
  805. log.Error("Error executing node event", "event", event, "err", err)
  806. }
  807. case EventTypeConn:
  808. if err := net.executeConnEvent(event); err != nil {
  809. log.Error("Error executing conn event", "event", event, "err", err)
  810. }
  811. case EventTypeMsg:
  812. log.Warn("Ignoring control msg event")
  813. }
  814. }
  815. func (net *Network) executeNodeEvent(e *Event) error {
  816. if !e.Node.Up {
  817. return net.Stop(e.Node.ID())
  818. }
  819. if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil {
  820. return err
  821. }
  822. return net.Start(e.Node.ID())
  823. }
  824. func (net *Network) executeConnEvent(e *Event) error {
  825. if e.Conn.Up {
  826. return net.Connect(e.Conn.One, e.Conn.Other)
  827. } else {
  828. return net.Disconnect(e.Conn.One, e.Conn.Other)
  829. }
  830. }