network.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package simulations
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/event"
  25. "github.com/ethereum/go-ethereum/log"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. "github.com/ethereum/go-ethereum/p2p/discover"
  28. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  29. )
  30. var dialBanTimeout = 200 * time.Millisecond
  31. // NetworkConfig defines configuration options for starting a Network
  32. type NetworkConfig struct {
  33. ID string `json:"id"`
  34. DefaultService string `json:"default_service,omitempty"`
  35. }
  36. // Network models a p2p simulation network which consists of a collection of
  37. // simulated nodes and the connections which exist between them.
  38. //
  39. // The Network has a single NodeAdapter which is responsible for actually
  40. // starting nodes and connecting them together.
  41. //
  42. // The Network emits events when nodes are started and stopped, when they are
  43. // connected and disconnected, and also when messages are sent between nodes.
  44. type Network struct {
  45. NetworkConfig
  46. Nodes []*Node `json:"nodes"`
  47. nodeMap map[discover.NodeID]int
  48. Conns []*Conn `json:"conns"`
  49. connMap map[string]int
  50. nodeAdapter adapters.NodeAdapter
  51. events event.Feed
  52. lock sync.RWMutex
  53. quitc chan struct{}
  54. }
  55. // NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
  56. func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
  57. return &Network{
  58. NetworkConfig: *conf,
  59. nodeAdapter: nodeAdapter,
  60. nodeMap: make(map[discover.NodeID]int),
  61. connMap: make(map[string]int),
  62. quitc: make(chan struct{}),
  63. }
  64. }
  65. // Events returns the output event feed of the Network.
  66. func (net *Network) Events() *event.Feed {
  67. return &net.events
  68. }
  69. // NewNode adds a new node to the network with a random ID
  70. func (net *Network) NewNode() (*Node, error) {
  71. conf := adapters.RandomNodeConfig()
  72. conf.Services = []string{net.DefaultService}
  73. return net.NewNodeWithConfig(conf)
  74. }
  75. // NewNodeWithConfig adds a new node to the network with the given config,
  76. // returning an error if a node with the same ID or name already exists
  77. func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
  78. net.lock.Lock()
  79. defer net.lock.Unlock()
  80. // create a random ID and PrivateKey if not set
  81. if conf.ID == (discover.NodeID{}) {
  82. c := adapters.RandomNodeConfig()
  83. conf.ID = c.ID
  84. conf.PrivateKey = c.PrivateKey
  85. }
  86. id := conf.ID
  87. if conf.Reachable == nil {
  88. conf.Reachable = func(otherID discover.NodeID) bool {
  89. _, err := net.InitConn(conf.ID, otherID)
  90. return err == nil
  91. }
  92. }
  93. // assign a name to the node if not set
  94. if conf.Name == "" {
  95. conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1)
  96. }
  97. // check the node doesn't already exist
  98. if node := net.getNode(id); node != nil {
  99. return nil, fmt.Errorf("node with ID %q already exists", id)
  100. }
  101. if node := net.getNodeByName(conf.Name); node != nil {
  102. return nil, fmt.Errorf("node with name %q already exists", conf.Name)
  103. }
  104. // if no services are configured, use the default service
  105. if len(conf.Services) == 0 {
  106. conf.Services = []string{net.DefaultService}
  107. }
  108. // use the NodeAdapter to create the node
  109. adapterNode, err := net.nodeAdapter.NewNode(conf)
  110. if err != nil {
  111. return nil, err
  112. }
  113. node := &Node{
  114. Node: adapterNode,
  115. Config: conf,
  116. }
  117. log.Trace(fmt.Sprintf("node %v created", id))
  118. net.nodeMap[id] = len(net.Nodes)
  119. net.Nodes = append(net.Nodes, node)
  120. // emit a "control" event
  121. net.events.Send(ControlEvent(node))
  122. return node, nil
  123. }
  124. // Config returns the network configuration
  125. func (net *Network) Config() *NetworkConfig {
  126. return &net.NetworkConfig
  127. }
  128. // StartAll starts all nodes in the network
  129. func (net *Network) StartAll() error {
  130. for _, node := range net.Nodes {
  131. if node.Up {
  132. continue
  133. }
  134. if err := net.Start(node.ID()); err != nil {
  135. return err
  136. }
  137. }
  138. return nil
  139. }
  140. // StopAll stops all nodes in the network
  141. func (net *Network) StopAll() error {
  142. for _, node := range net.Nodes {
  143. if !node.Up {
  144. continue
  145. }
  146. if err := net.Stop(node.ID()); err != nil {
  147. return err
  148. }
  149. }
  150. return nil
  151. }
  152. // Start starts the node with the given ID
  153. func (net *Network) Start(id discover.NodeID) error {
  154. return net.startWithSnapshots(id, nil)
  155. }
  156. // startWithSnapshots starts the node with the given ID using the give
  157. // snapshots
  158. func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error {
  159. node := net.GetNode(id)
  160. if node == nil {
  161. return fmt.Errorf("node %v does not exist", id)
  162. }
  163. if node.Up {
  164. return fmt.Errorf("node %v already up", id)
  165. }
  166. log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, net.nodeAdapter.Name()))
  167. if err := node.Start(snapshots); err != nil {
  168. log.Warn(fmt.Sprintf("start up failed: %v", err))
  169. return err
  170. }
  171. node.Up = true
  172. log.Info(fmt.Sprintf("started node %v: %v", id, node.Up))
  173. net.events.Send(NewEvent(node))
  174. // subscribe to peer events
  175. client, err := node.Client()
  176. if err != nil {
  177. return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
  178. }
  179. events := make(chan *p2p.PeerEvent)
  180. sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
  181. if err != nil {
  182. return fmt.Errorf("error getting peer events for node %v: %s", id, err)
  183. }
  184. go net.watchPeerEvents(id, events, sub)
  185. return nil
  186. }
  187. // watchPeerEvents reads peer events from the given channel and emits
  188. // corresponding network events
  189. func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) {
  190. defer func() {
  191. sub.Unsubscribe()
  192. // assume the node is now down
  193. net.lock.Lock()
  194. node := net.getNode(id)
  195. node.Up = false
  196. net.lock.Unlock()
  197. net.events.Send(NewEvent(node))
  198. }()
  199. for {
  200. select {
  201. case event, ok := <-events:
  202. if !ok {
  203. return
  204. }
  205. peer := event.Peer
  206. switch event.Type {
  207. case p2p.PeerEventTypeAdd:
  208. net.DidConnect(id, peer)
  209. case p2p.PeerEventTypeDrop:
  210. net.DidDisconnect(id, peer)
  211. case p2p.PeerEventTypeMsgSend:
  212. net.DidSend(id, peer, event.Protocol, *event.MsgCode)
  213. case p2p.PeerEventTypeMsgRecv:
  214. net.DidReceive(peer, id, event.Protocol, *event.MsgCode)
  215. }
  216. case err := <-sub.Err():
  217. if err != nil {
  218. log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err)
  219. }
  220. return
  221. }
  222. }
  223. }
  224. // Stop stops the node with the given ID
  225. func (net *Network) Stop(id discover.NodeID) error {
  226. node := net.GetNode(id)
  227. if node == nil {
  228. return fmt.Errorf("node %v does not exist", id)
  229. }
  230. if !node.Up {
  231. return fmt.Errorf("node %v already down", id)
  232. }
  233. if err := node.Stop(); err != nil {
  234. return err
  235. }
  236. node.Up = false
  237. log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up))
  238. net.events.Send(ControlEvent(node))
  239. return nil
  240. }
  241. // Connect connects two nodes together by calling the "admin_addPeer" RPC
  242. // method on the "one" node so that it connects to the "other" node
  243. func (net *Network) Connect(oneID, otherID discover.NodeID) error {
  244. log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
  245. conn, err := net.InitConn(oneID, otherID)
  246. if err != nil {
  247. return err
  248. }
  249. client, err := conn.one.Client()
  250. if err != nil {
  251. return err
  252. }
  253. net.events.Send(ControlEvent(conn))
  254. return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
  255. }
  256. // Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
  257. // method on the "one" node so that it disconnects from the "other" node
  258. func (net *Network) Disconnect(oneID, otherID discover.NodeID) error {
  259. conn := net.GetConn(oneID, otherID)
  260. if conn == nil {
  261. return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
  262. }
  263. if !conn.Up {
  264. return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
  265. }
  266. client, err := conn.one.Client()
  267. if err != nil {
  268. return err
  269. }
  270. net.events.Send(ControlEvent(conn))
  271. return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
  272. }
  273. // DidConnect tracks the fact that the "one" node connected to the "other" node
  274. func (net *Network) DidConnect(one, other discover.NodeID) error {
  275. conn, err := net.GetOrCreateConn(one, other)
  276. if err != nil {
  277. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  278. }
  279. if conn.Up {
  280. return fmt.Errorf("%v and %v already connected", one, other)
  281. }
  282. conn.Up = true
  283. net.events.Send(NewEvent(conn))
  284. return nil
  285. }
  286. // DidDisconnect tracks the fact that the "one" node disconnected from the
  287. // "other" node
  288. func (net *Network) DidDisconnect(one, other discover.NodeID) error {
  289. conn := net.GetConn(one, other)
  290. if conn == nil {
  291. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  292. }
  293. if !conn.Up {
  294. return fmt.Errorf("%v and %v already disconnected", one, other)
  295. }
  296. conn.Up = false
  297. conn.initiated = time.Now().Add(-dialBanTimeout)
  298. net.events.Send(NewEvent(conn))
  299. return nil
  300. }
  301. // DidSend tracks the fact that "sender" sent a message to "receiver"
  302. func (net *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error {
  303. msg := &Msg{
  304. One: sender,
  305. Other: receiver,
  306. Protocol: proto,
  307. Code: code,
  308. Received: false,
  309. }
  310. net.events.Send(NewEvent(msg))
  311. return nil
  312. }
  313. // DidReceive tracks the fact that "receiver" received a message from "sender"
  314. func (net *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error {
  315. msg := &Msg{
  316. One: sender,
  317. Other: receiver,
  318. Protocol: proto,
  319. Code: code,
  320. Received: true,
  321. }
  322. net.events.Send(NewEvent(msg))
  323. return nil
  324. }
  325. // GetNode gets the node with the given ID, returning nil if the node does not
  326. // exist
  327. func (net *Network) GetNode(id discover.NodeID) *Node {
  328. net.lock.Lock()
  329. defer net.lock.Unlock()
  330. return net.getNode(id)
  331. }
  332. // GetNode gets the node with the given name, returning nil if the node does
  333. // not exist
  334. func (net *Network) GetNodeByName(name string) *Node {
  335. net.lock.Lock()
  336. defer net.lock.Unlock()
  337. return net.getNodeByName(name)
  338. }
  339. // GetNodes returns the existing nodes
  340. func (net *Network) GetNodes() (nodes []*Node) {
  341. net.lock.Lock()
  342. defer net.lock.Unlock()
  343. nodes = append(nodes, net.Nodes...)
  344. return nodes
  345. }
  346. func (net *Network) getNode(id discover.NodeID) *Node {
  347. i, found := net.nodeMap[id]
  348. if !found {
  349. return nil
  350. }
  351. return net.Nodes[i]
  352. }
  353. func (net *Network) getNodeByName(name string) *Node {
  354. for _, node := range net.Nodes {
  355. if node.Config.Name == name {
  356. return node
  357. }
  358. }
  359. return nil
  360. }
  361. // GetConn returns the connection which exists between "one" and "other"
  362. // regardless of which node initiated the connection
  363. func (net *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
  364. net.lock.Lock()
  365. defer net.lock.Unlock()
  366. return net.getConn(oneID, otherID)
  367. }
  368. // GetOrCreateConn is like GetConn but creates the connection if it doesn't
  369. // already exist
  370. func (net *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
  371. net.lock.Lock()
  372. defer net.lock.Unlock()
  373. return net.getOrCreateConn(oneID, otherID)
  374. }
  375. func (net *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
  376. if conn := net.getConn(oneID, otherID); conn != nil {
  377. return conn, nil
  378. }
  379. one := net.getNode(oneID)
  380. if one == nil {
  381. return nil, fmt.Errorf("node %v does not exist", oneID)
  382. }
  383. other := net.getNode(otherID)
  384. if other == nil {
  385. return nil, fmt.Errorf("node %v does not exist", otherID)
  386. }
  387. conn := &Conn{
  388. One: oneID,
  389. Other: otherID,
  390. one: one,
  391. other: other,
  392. }
  393. label := ConnLabel(oneID, otherID)
  394. net.connMap[label] = len(net.Conns)
  395. net.Conns = append(net.Conns, conn)
  396. return conn, nil
  397. }
  398. func (net *Network) getConn(oneID, otherID discover.NodeID) *Conn {
  399. label := ConnLabel(oneID, otherID)
  400. i, found := net.connMap[label]
  401. if !found {
  402. return nil
  403. }
  404. return net.Conns[i]
  405. }
  406. // InitConn(one, other) retrieves the connectiton model for the connection between
  407. // peers one and other, or creates a new one if it does not exist
  408. // the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
  409. // it checks if the connection is already up, and if the nodes are running
  410. // NOTE:
  411. // it also checks whether there has been recent attempt to connect the peers
  412. // this is cheating as the simulation is used as an oracle and know about
  413. // remote peers attempt to connect to a node which will then not initiate the connection
  414. func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
  415. net.lock.Lock()
  416. defer net.lock.Unlock()
  417. if oneID == otherID {
  418. return nil, fmt.Errorf("refusing to connect to self %v", oneID)
  419. }
  420. conn, err := net.getOrCreateConn(oneID, otherID)
  421. if err != nil {
  422. return nil, err
  423. }
  424. if time.Since(conn.initiated) < dialBanTimeout {
  425. return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
  426. }
  427. if conn.Up {
  428. return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
  429. }
  430. err = conn.nodesUp()
  431. if err != nil {
  432. return nil, fmt.Errorf("nodes not up: %v", err)
  433. }
  434. conn.initiated = time.Now()
  435. return conn, nil
  436. }
  437. // Shutdown stops all nodes in the network and closes the quit channel
  438. func (net *Network) Shutdown() {
  439. for _, node := range net.Nodes {
  440. log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString()))
  441. if err := node.Stop(); err != nil {
  442. log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err)
  443. }
  444. }
  445. close(net.quitc)
  446. }
  447. //Reset resets all network properties:
  448. //emtpies the nodes and the connection list
  449. func (net *Network) Reset() {
  450. net.lock.Lock()
  451. defer net.lock.Unlock()
  452. //re-initialize the maps
  453. net.connMap = make(map[string]int)
  454. net.nodeMap = make(map[discover.NodeID]int)
  455. net.Nodes = nil
  456. net.Conns = nil
  457. }
  458. // Node is a wrapper around adapters.Node which is used to track the status
  459. // of a node in the network
  460. type Node struct {
  461. adapters.Node `json:"-"`
  462. // Config if the config used to created the node
  463. Config *adapters.NodeConfig `json:"config"`
  464. // Up tracks whether or not the node is running
  465. Up bool `json:"up"`
  466. }
  467. // ID returns the ID of the node
  468. func (n *Node) ID() discover.NodeID {
  469. return n.Config.ID
  470. }
  471. // String returns a log-friendly string
  472. func (n *Node) String() string {
  473. return fmt.Sprintf("Node %v", n.ID().TerminalString())
  474. }
  475. // NodeInfo returns information about the node
  476. func (n *Node) NodeInfo() *p2p.NodeInfo {
  477. // avoid a panic if the node is not started yet
  478. if n.Node == nil {
  479. return nil
  480. }
  481. info := n.Node.NodeInfo()
  482. info.Name = n.Config.Name
  483. return info
  484. }
  485. // MarshalJSON implements the json.Marshaler interface so that the encoded
  486. // JSON includes the NodeInfo
  487. func (n *Node) MarshalJSON() ([]byte, error) {
  488. return json.Marshal(struct {
  489. Info *p2p.NodeInfo `json:"info,omitempty"`
  490. Config *adapters.NodeConfig `json:"config,omitempty"`
  491. Up bool `json:"up"`
  492. }{
  493. Info: n.NodeInfo(),
  494. Config: n.Config,
  495. Up: n.Up,
  496. })
  497. }
  498. // Conn represents a connection between two nodes in the network
  499. type Conn struct {
  500. // One is the node which initiated the connection
  501. One discover.NodeID `json:"one"`
  502. // Other is the node which the connection was made to
  503. Other discover.NodeID `json:"other"`
  504. // Up tracks whether or not the connection is active
  505. Up bool `json:"up"`
  506. // Registers when the connection was grabbed to dial
  507. initiated time.Time
  508. one *Node
  509. other *Node
  510. }
  511. // nodesUp returns whether both nodes are currently up
  512. func (c *Conn) nodesUp() error {
  513. if !c.one.Up {
  514. return fmt.Errorf("one %v is not up", c.One)
  515. }
  516. if !c.other.Up {
  517. return fmt.Errorf("other %v is not up", c.Other)
  518. }
  519. return nil
  520. }
  521. // String returns a log-friendly string
  522. func (c *Conn) String() string {
  523. return fmt.Sprintf("Conn %v->%v", c.One.TerminalString(), c.Other.TerminalString())
  524. }
  525. // Msg represents a p2p message sent between two nodes in the network
  526. type Msg struct {
  527. One discover.NodeID `json:"one"`
  528. Other discover.NodeID `json:"other"`
  529. Protocol string `json:"protocol"`
  530. Code uint64 `json:"code"`
  531. Received bool `json:"received"`
  532. }
  533. // String returns a log-friendly string
  534. func (m *Msg) String() string {
  535. return fmt.Sprintf("Msg(%d) %v->%v", m.Code, m.One.TerminalString(), m.Other.TerminalString())
  536. }
  537. // ConnLabel generates a deterministic string which represents a connection
  538. // between two nodes, used to compare if two connections are between the same
  539. // nodes
  540. func ConnLabel(source, target discover.NodeID) string {
  541. var first, second discover.NodeID
  542. if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
  543. first = target
  544. second = source
  545. } else {
  546. first = source
  547. second = target
  548. }
  549. return fmt.Sprintf("%v-%v", first, second)
  550. }
  551. // Snapshot represents the state of a network at a single point in time and can
  552. // be used to restore the state of a network
  553. type Snapshot struct {
  554. Nodes []NodeSnapshot `json:"nodes,omitempty"`
  555. Conns []Conn `json:"conns,omitempty"`
  556. }
  557. // NodeSnapshot represents the state of a node in the network
  558. type NodeSnapshot struct {
  559. Node Node `json:"node,omitempty"`
  560. // Snapshots is arbitrary data gathered from calling node.Snapshots()
  561. Snapshots map[string][]byte `json:"snapshots,omitempty"`
  562. }
  563. // Snapshot creates a network snapshot
  564. func (net *Network) Snapshot() (*Snapshot, error) {
  565. net.lock.Lock()
  566. defer net.lock.Unlock()
  567. snap := &Snapshot{
  568. Nodes: make([]NodeSnapshot, len(net.Nodes)),
  569. Conns: make([]Conn, len(net.Conns)),
  570. }
  571. for i, node := range net.Nodes {
  572. snap.Nodes[i] = NodeSnapshot{Node: *node}
  573. if !node.Up {
  574. continue
  575. }
  576. snapshots, err := node.Snapshots()
  577. if err != nil {
  578. return nil, err
  579. }
  580. snap.Nodes[i].Snapshots = snapshots
  581. }
  582. for i, conn := range net.Conns {
  583. snap.Conns[i] = *conn
  584. }
  585. return snap, nil
  586. }
  587. // Load loads a network snapshot
  588. func (net *Network) Load(snap *Snapshot) error {
  589. for _, n := range snap.Nodes {
  590. if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
  591. return err
  592. }
  593. if !n.Node.Up {
  594. continue
  595. }
  596. if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
  597. return err
  598. }
  599. }
  600. for _, conn := range snap.Conns {
  601. if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
  602. //in this case, at least one of the nodes of a connection is not up,
  603. //so it would result in the snapshot `Load` to fail
  604. continue
  605. }
  606. if err := net.Connect(conn.One, conn.Other); err != nil {
  607. return err
  608. }
  609. }
  610. return nil
  611. }
  612. // Subscribe reads control events from a channel and executes them
  613. func (net *Network) Subscribe(events chan *Event) {
  614. for {
  615. select {
  616. case event, ok := <-events:
  617. if !ok {
  618. return
  619. }
  620. if event.Control {
  621. net.executeControlEvent(event)
  622. }
  623. case <-net.quitc:
  624. return
  625. }
  626. }
  627. }
  628. func (net *Network) executeControlEvent(event *Event) {
  629. log.Trace("execute control event", "type", event.Type, "event", event)
  630. switch event.Type {
  631. case EventTypeNode:
  632. if err := net.executeNodeEvent(event); err != nil {
  633. log.Error("error executing node event", "event", event, "err", err)
  634. }
  635. case EventTypeConn:
  636. if err := net.executeConnEvent(event); err != nil {
  637. log.Error("error executing conn event", "event", event, "err", err)
  638. }
  639. case EventTypeMsg:
  640. log.Warn("ignoring control msg event")
  641. }
  642. }
  643. func (net *Network) executeNodeEvent(e *Event) error {
  644. if !e.Node.Up {
  645. return net.Stop(e.Node.ID())
  646. }
  647. if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil {
  648. return err
  649. }
  650. return net.Start(e.Node.ID())
  651. }
  652. func (net *Network) executeConnEvent(e *Event) error {
  653. if e.Conn.Up {
  654. return net.Connect(e.Conn.One, e.Conn.Other)
  655. } else {
  656. return net.Disconnect(e.Conn.One, e.Conn.Other)
  657. }
  658. }