http_test.go 20 KB


  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package simulations
  17. import (
  18. "context"
  19. "fmt"
  20. "math/rand"
  21. "net/http/httptest"
  22. "reflect"
  23. "sync"
  24. "sync/atomic"
  25. "testing"
  26. "time"
  27. "github.com/ethereum/go-ethereum/event"
  28. "github.com/ethereum/go-ethereum/node"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/p2p/discover"
  31. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  32. "github.com/ethereum/go-ethereum/rpc"
  33. )
  34. // testService implements the node.Service interface and provides protocols
  35. // and APIs which are useful for testing nodes in a simulation network
  36. type testService struct {
  37. id discover.NodeID
  38. // peerCount is incremented once a peer handshake has been performed
  39. peerCount int64
  40. peers map[discover.NodeID]*testPeer
  41. peersMtx sync.Mutex
  42. // state stores []byte which is used to test creating and loading
  43. // snapshots
  44. state atomic.Value
  45. }
  46. func newTestService(ctx *adapters.ServiceContext) (node.Service, error) {
  47. svc := &testService{
  48. id: ctx.Config.ID,
  49. peers: make(map[discover.NodeID]*testPeer),
  50. }
  51. svc.state.Store(ctx.Snapshot)
  52. return svc, nil
  53. }
  // testPeer carries the channels used to order protocol start-up for a
  // single remote peer.
  type testPeer struct {
  	// testReady is closed once the "test" protocol handshakes are done
  	testReady chan struct{}
  	// dumReady is closed once the "dum" protocol handshake is done
  	dumReady chan struct{}
  }
  58. func (t *testService) peer(id discover.NodeID) *testPeer {
  59. t.peersMtx.Lock()
  60. defer t.peersMtx.Unlock()
  61. if peer, ok := t.peers[id]; ok {
  62. return peer
  63. }
  64. peer := &testPeer{
  65. testReady: make(chan struct{}),
  66. dumReady: make(chan struct{}),
  67. }
  68. t.peers[id] = peer
  69. return peer
  70. }
  71. func (t *testService) Protocols() []p2p.Protocol {
  72. return []p2p.Protocol{
  73. {
  74. Name: "test",
  75. Version: 1,
  76. Length: 3,
  77. Run: t.RunTest,
  78. },
  79. {
  80. Name: "dum",
  81. Version: 1,
  82. Length: 1,
  83. Run: t.RunDum,
  84. },
  85. {
  86. Name: "prb",
  87. Version: 1,
  88. Length: 1,
  89. Run: t.RunPrb,
  90. },
  91. }
  92. }
  93. func (t *testService) APIs() []rpc.API {
  94. return []rpc.API{{
  95. Namespace: "test",
  96. Version: "1.0",
  97. Service: &TestAPI{
  98. state: &t.state,
  99. peerCount: &t.peerCount,
  100. },
  101. }}
  102. }
  103. func (t *testService) Start(server *p2p.Server) error {
  104. return nil
  105. }
  106. func (t *testService) Stop() error {
  107. return nil
  108. }
  109. // handshake performs a peer handshake by sending and expecting an empty
  110. // message with the given code
  111. func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error {
  112. errc := make(chan error, 2)
  113. go func() { errc <- p2p.Send(rw, code, struct{}{}) }()
  114. go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }()
  115. for i := 0; i < 2; i++ {
  116. if err := <-errc; err != nil {
  117. return err
  118. }
  119. }
  120. return nil
  121. }
  122. func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  123. peer := t.peer(p.ID())
  124. // perform three handshakes with three different message codes,
  125. // used to test message sending and filtering
  126. if err := t.handshake(rw, 2); err != nil {
  127. return err
  128. }
  129. if err := t.handshake(rw, 1); err != nil {
  130. return err
  131. }
  132. if err := t.handshake(rw, 0); err != nil {
  133. return err
  134. }
  135. // close the testReady channel so that other protocols can run
  136. close(peer.testReady)
  137. // track the peer
  138. atomic.AddInt64(&t.peerCount, 1)
  139. defer atomic.AddInt64(&t.peerCount, -1)
  140. // block until the peer is dropped
  141. for {
  142. _, err := rw.ReadMsg()
  143. if err != nil {
  144. return err
  145. }
  146. }
  147. }
  148. func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  149. peer := t.peer(p.ID())
  150. // wait for the test protocol to perform its handshake
  151. <-peer.testReady
  152. // perform a handshake
  153. if err := t.handshake(rw, 0); err != nil {
  154. return err
  155. }
  156. // close the dumReady channel so that other protocols can run
  157. close(peer.dumReady)
  158. // block until the peer is dropped
  159. for {
  160. _, err := rw.ReadMsg()
  161. if err != nil {
  162. return err
  163. }
  164. }
  165. }
  166. func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  167. peer := t.peer(p.ID())
  168. // wait for the dum protocol to perform its handshake
  169. <-peer.dumReady
  170. // perform a handshake
  171. if err := t.handshake(rw, 0); err != nil {
  172. return err
  173. }
  174. // block until the peer is dropped
  175. for {
  176. _, err := rw.ReadMsg()
  177. if err != nil {
  178. return err
  179. }
  180. }
  181. }
  182. func (t *testService) Snapshot() ([]byte, error) {
  183. return t.state.Load().([]byte), nil
  184. }
  185. // TestAPI provides a test API to:
  186. // * get the peer count
  187. // * get and set an arbitrary state byte slice
  188. // * get and increment a counter
  189. // * subscribe to counter increment events
  190. type TestAPI struct {
  191. state *atomic.Value
  192. peerCount *int64
  193. counter int64
  194. feed event.Feed
  195. }
  196. func (t *TestAPI) PeerCount() int64 {
  197. return atomic.LoadInt64(t.peerCount)
  198. }
  199. func (t *TestAPI) Get() int64 {
  200. return atomic.LoadInt64(&t.counter)
  201. }
  202. func (t *TestAPI) Add(delta int64) {
  203. atomic.AddInt64(&t.counter, delta)
  204. t.feed.Send(delta)
  205. }
  206. func (t *TestAPI) GetState() []byte {
  207. return t.state.Load().([]byte)
  208. }
  209. func (t *TestAPI) SetState(state []byte) {
  210. t.state.Store(state)
  211. }
  212. func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
  213. notifier, supported := rpc.NotifierFromContext(ctx)
  214. if !supported {
  215. return nil, rpc.ErrNotificationsUnsupported
  216. }
  217. rpcSub := notifier.CreateSubscription()
  218. go func() {
  219. events := make(chan int64)
  220. sub := t.feed.Subscribe(events)
  221. defer sub.Unsubscribe()
  222. for {
  223. select {
  224. case event := <-events:
  225. notifier.Notify(rpcSub.ID, event)
  226. case <-sub.Err():
  227. return
  228. case <-rpcSub.Err():
  229. return
  230. case <-notifier.Closed():
  231. return
  232. }
  233. }
  234. }()
  235. return rpcSub, nil
  236. }
  237. var testServices = adapters.Services{
  238. "test": newTestService,
  239. }
  240. func testHTTPServer(t *testing.T) (*Network, *httptest.Server) {
  241. adapter := adapters.NewSimAdapter(testServices)
  242. network := NewNetwork(adapter, &NetworkConfig{
  243. DefaultService: "test",
  244. })
  245. return network, httptest.NewServer(NewServer(network))
  246. }
  247. // TestHTTPNetwork tests interacting with a simulation network using the HTTP
  248. // API
  249. func TestHTTPNetwork(t *testing.T) {
  250. // start the server
  251. network, s := testHTTPServer(t)
  252. defer s.Close()
  253. // subscribe to events so we can check them later
  254. client := NewClient(s.URL)
  255. events := make(chan *Event, 100)
  256. var opts SubscribeOpts
  257. sub, err := client.SubscribeNetwork(events, opts)
  258. if err != nil {
  259. t.Fatalf("error subscribing to network events: %s", err)
  260. }
  261. defer sub.Unsubscribe()
  262. // check we can retrieve details about the network
  263. gotNetwork, err := client.GetNetwork()
  264. if err != nil {
  265. t.Fatalf("error getting network: %s", err)
  266. }
  267. if gotNetwork.ID != network.ID {
  268. t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID)
  269. }
  270. // start a simulation network
  271. nodeIDs := startTestNetwork(t, client)
  272. // check we got all the events
  273. x := &expectEvents{t, events, sub}
  274. x.expect(
  275. x.nodeEvent(nodeIDs[0], false),
  276. x.nodeEvent(nodeIDs[1], false),
  277. x.nodeEvent(nodeIDs[0], true),
  278. x.nodeEvent(nodeIDs[1], true),
  279. x.connEvent(nodeIDs[0], nodeIDs[1], false),
  280. x.connEvent(nodeIDs[0], nodeIDs[1], true),
  281. )
  282. // reconnect the stream and check we get the current nodes and conns
  283. events = make(chan *Event, 100)
  284. opts.Current = true
  285. sub, err = client.SubscribeNetwork(events, opts)
  286. if err != nil {
  287. t.Fatalf("error subscribing to network events: %s", err)
  288. }
  289. defer sub.Unsubscribe()
  290. x = &expectEvents{t, events, sub}
  291. x.expect(
  292. x.nodeEvent(nodeIDs[0], true),
  293. x.nodeEvent(nodeIDs[1], true),
  294. x.connEvent(nodeIDs[0], nodeIDs[1], true),
  295. )
  296. }
  297. func startTestNetwork(t *testing.T, client *Client) []string {
  298. // create two nodes
  299. nodeCount := 2
  300. nodeIDs := make([]string, nodeCount)
  301. for i := 0; i < nodeCount; i++ {
  302. config := adapters.RandomNodeConfig()
  303. node, err := client.CreateNode(config)
  304. if err != nil {
  305. t.Fatalf("error creating node: %s", err)
  306. }
  307. nodeIDs[i] = node.ID
  308. }
  309. // check both nodes exist
  310. nodes, err := client.GetNodes()
  311. if err != nil {
  312. t.Fatalf("error getting nodes: %s", err)
  313. }
  314. if len(nodes) != nodeCount {
  315. t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes))
  316. }
  317. for i, nodeID := range nodeIDs {
  318. if nodes[i].ID != nodeID {
  319. t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID)
  320. }
  321. node, err := client.GetNode(nodeID)
  322. if err != nil {
  323. t.Fatalf("error getting node %d: %s", i, err)
  324. }
  325. if node.ID != nodeID {
  326. t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID)
  327. }
  328. }
  329. // start both nodes
  330. for _, nodeID := range nodeIDs {
  331. if err := client.StartNode(nodeID); err != nil {
  332. t.Fatalf("error starting node %q: %s", nodeID, err)
  333. }
  334. }
  335. // connect the nodes
  336. for i := 0; i < nodeCount-1; i++ {
  337. peerId := i + 1
  338. if i == nodeCount-1 {
  339. peerId = 0
  340. }
  341. if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil {
  342. t.Fatalf("error connecting nodes: %s", err)
  343. }
  344. }
  345. return nodeIDs
  346. }
  347. type expectEvents struct {
  348. *testing.T
  349. events chan *Event
  350. sub event.Subscription
  351. }
  352. func (t *expectEvents) nodeEvent(id string, up bool) *Event {
  353. return &Event{
  354. Type: EventTypeNode,
  355. Node: &Node{
  356. Config: &adapters.NodeConfig{
  357. ID: discover.MustHexID(id),
  358. },
  359. Up: up,
  360. },
  361. }
  362. }
  363. func (t *expectEvents) connEvent(one, other string, up bool) *Event {
  364. return &Event{
  365. Type: EventTypeConn,
  366. Conn: &Conn{
  367. One: discover.MustHexID(one),
  368. Other: discover.MustHexID(other),
  369. Up: up,
  370. },
  371. }
  372. }
  373. func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) {
  374. actual := make(map[MsgFilter]int)
  375. timeout := time.After(10 * time.Second)
  376. loop:
  377. for {
  378. select {
  379. case event := <-t.events:
  380. t.Logf("received %s event: %s", event.Type, event)
  381. if event.Type != EventTypeMsg || event.Msg.Received {
  382. continue loop
  383. }
  384. if event.Msg == nil {
  385. t.Fatal("expected event.Msg to be set")
  386. }
  387. filter := MsgFilter{
  388. Proto: event.Msg.Protocol,
  389. Code: int64(event.Msg.Code),
  390. }
  391. actual[filter]++
  392. if actual[filter] > expected[filter] {
  393. t.Fatalf("received too many msgs for filter: %v", filter)
  394. }
  395. if reflect.DeepEqual(actual, expected) {
  396. return
  397. }
  398. case err := <-t.sub.Err():
  399. t.Fatalf("network stream closed unexpectedly: %s", err)
  400. case <-timeout:
  401. t.Fatal("timed out waiting for expected events")
  402. }
  403. }
  404. }
  405. func (t *expectEvents) expect(events ...*Event) {
  406. timeout := time.After(10 * time.Second)
  407. i := 0
  408. for {
  409. select {
  410. case event := <-t.events:
  411. t.Logf("received %s event: %s", event.Type, event)
  412. expected := events[i]
  413. if event.Type != expected.Type {
  414. t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type)
  415. }
  416. switch expected.Type {
  417. case EventTypeNode:
  418. if event.Node == nil {
  419. t.Fatal("expected event.Node to be set")
  420. }
  421. if event.Node.ID() != expected.Node.ID() {
  422. t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
  423. }
  424. if event.Node.Up != expected.Node.Up {
  425. t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
  426. }
  427. case EventTypeConn:
  428. if event.Conn == nil {
  429. t.Fatal("expected event.Conn to be set")
  430. }
  431. if event.Conn.One != expected.Conn.One {
  432. t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString())
  433. }
  434. if event.Conn.Other != expected.Conn.Other {
  435. t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString())
  436. }
  437. if event.Conn.Up != expected.Conn.Up {
  438. t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up)
  439. }
  440. }
  441. i++
  442. if i == len(events) {
  443. return
  444. }
  445. case err := <-t.sub.Err():
  446. t.Fatalf("network stream closed unexpectedly: %s", err)
  447. case <-timeout:
  448. t.Fatal("timed out waiting for expected events")
  449. }
  450. }
  451. }
  452. // TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API
  453. func TestHTTPNodeRPC(t *testing.T) {
  454. // start the server
  455. _, s := testHTTPServer(t)
  456. defer s.Close()
  457. // start a node in the network
  458. client := NewClient(s.URL)
  459. config := adapters.RandomNodeConfig()
  460. node, err := client.CreateNode(config)
  461. if err != nil {
  462. t.Fatalf("error creating node: %s", err)
  463. }
  464. if err := client.StartNode(node.ID); err != nil {
  465. t.Fatalf("error starting node: %s", err)
  466. }
  467. // create two RPC clients
  468. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  469. defer cancel()
  470. rpcClient1, err := client.RPCClient(ctx, node.ID)
  471. if err != nil {
  472. t.Fatalf("error getting node RPC client: %s", err)
  473. }
  474. rpcClient2, err := client.RPCClient(ctx, node.ID)
  475. if err != nil {
  476. t.Fatalf("error getting node RPC client: %s", err)
  477. }
  478. // subscribe to events using client 1
  479. events := make(chan int64, 1)
  480. sub, err := rpcClient1.Subscribe(ctx, "test", events, "events")
  481. if err != nil {
  482. t.Fatalf("error subscribing to events: %s", err)
  483. }
  484. defer sub.Unsubscribe()
  485. // call some RPC methods using client 2
  486. if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil {
  487. t.Fatalf("error calling RPC method: %s", err)
  488. }
  489. var result int64
  490. if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil {
  491. t.Fatalf("error calling RPC method: %s", err)
  492. }
  493. if result != 10 {
  494. t.Fatalf("expected result to be 10, got %d", result)
  495. }
  496. // check we got an event from client 1
  497. select {
  498. case event := <-events:
  499. if event != 10 {
  500. t.Fatalf("expected event to be 10, got %d", event)
  501. }
  502. case <-ctx.Done():
  503. t.Fatal(ctx.Err())
  504. }
  505. }
  506. // TestHTTPSnapshot tests creating and loading network snapshots
  507. func TestHTTPSnapshot(t *testing.T) {
  508. // start the server
  509. _, s := testHTTPServer(t)
  510. defer s.Close()
  511. // create a two-node network
  512. client := NewClient(s.URL)
  513. nodeCount := 2
  514. nodes := make([]*p2p.NodeInfo, nodeCount)
  515. for i := 0; i < nodeCount; i++ {
  516. config := adapters.RandomNodeConfig()
  517. node, err := client.CreateNode(config)
  518. if err != nil {
  519. t.Fatalf("error creating node: %s", err)
  520. }
  521. if err := client.StartNode(node.ID); err != nil {
  522. t.Fatalf("error starting node: %s", err)
  523. }
  524. nodes[i] = node
  525. }
  526. if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil {
  527. t.Fatalf("error connecting nodes: %s", err)
  528. }
  529. // store some state in the test services
  530. states := make([]string, nodeCount)
  531. for i, node := range nodes {
  532. rpc, err := client.RPCClient(context.Background(), node.ID)
  533. if err != nil {
  534. t.Fatalf("error getting RPC client: %s", err)
  535. }
  536. defer rpc.Close()
  537. state := fmt.Sprintf("%x", rand.Int())
  538. if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil {
  539. t.Fatalf("error setting service state: %s", err)
  540. }
  541. states[i] = state
  542. }
  543. // create a snapshot
  544. snap, err := client.CreateSnapshot()
  545. if err != nil {
  546. t.Fatalf("error creating snapshot: %s", err)
  547. }
  548. for i, state := range states {
  549. gotState := snap.Nodes[i].Snapshots["test"]
  550. if string(gotState) != state {
  551. t.Fatalf("expected snapshot state %q, got %q", state, gotState)
  552. }
  553. }
  554. // create another network
  555. _, s = testHTTPServer(t)
  556. defer s.Close()
  557. client = NewClient(s.URL)
  558. // subscribe to events so we can check them later
  559. events := make(chan *Event, 100)
  560. var opts SubscribeOpts
  561. sub, err := client.SubscribeNetwork(events, opts)
  562. if err != nil {
  563. t.Fatalf("error subscribing to network events: %s", err)
  564. }
  565. defer sub.Unsubscribe()
  566. // load the snapshot
  567. if err := client.LoadSnapshot(snap); err != nil {
  568. t.Fatalf("error loading snapshot: %s", err)
  569. }
  570. // check the nodes and connection exists
  571. net, err := client.GetNetwork()
  572. if err != nil {
  573. t.Fatalf("error getting network: %s", err)
  574. }
  575. if len(net.Nodes) != nodeCount {
  576. t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes))
  577. }
  578. for i, node := range nodes {
  579. id := net.Nodes[i].ID().String()
  580. if id != node.ID {
  581. t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id)
  582. }
  583. }
  584. if len(net.Conns) != 1 {
  585. t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns))
  586. }
  587. conn := net.Conns[0]
  588. if conn.One.String() != nodes[0].ID {
  589. t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One)
  590. }
  591. if conn.Other.String() != nodes[1].ID {
  592. t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other)
  593. }
  594. // check the node states were restored
  595. for i, node := range nodes {
  596. rpc, err := client.RPCClient(context.Background(), node.ID)
  597. if err != nil {
  598. t.Fatalf("error getting RPC client: %s", err)
  599. }
  600. defer rpc.Close()
  601. var state []byte
  602. if err := rpc.Call(&state, "test_getState"); err != nil {
  603. t.Fatalf("error getting service state: %s", err)
  604. }
  605. if string(state) != states[i] {
  606. t.Fatalf("expected snapshot state %q, got %q", states[i], state)
  607. }
  608. }
  609. // check we got all the events
  610. x := &expectEvents{t, events, sub}
  611. x.expect(
  612. x.nodeEvent(nodes[0].ID, false),
  613. x.nodeEvent(nodes[0].ID, true),
  614. x.nodeEvent(nodes[1].ID, false),
  615. x.nodeEvent(nodes[1].ID, true),
  616. x.connEvent(nodes[0].ID, nodes[1].ID, false),
  617. x.connEvent(nodes[0].ID, nodes[1].ID, true),
  618. )
  619. }
  620. // TestMsgFilterPassMultiple tests streaming message events using a filter
  621. // with multiple protocols
  622. func TestMsgFilterPassMultiple(t *testing.T) {
  623. // start the server
  624. _, s := testHTTPServer(t)
  625. defer s.Close()
  626. // subscribe to events with a message filter
  627. client := NewClient(s.URL)
  628. events := make(chan *Event, 10)
  629. opts := SubscribeOpts{
  630. Filter: "prb:0-test:0",
  631. }
  632. sub, err := client.SubscribeNetwork(events, opts)
  633. if err != nil {
  634. t.Fatalf("error subscribing to network events: %s", err)
  635. }
  636. defer sub.Unsubscribe()
  637. // start a simulation network
  638. startTestNetwork(t, client)
  639. // check we got the expected events
  640. x := &expectEvents{t, events, sub}
  641. x.expectMsgs(map[MsgFilter]int{
  642. {"test", 0}: 2,
  643. {"prb", 0}: 2,
  644. })
  645. }
  646. // TestMsgFilterPassWildcard tests streaming message events using a filter
  647. // with a code wildcard
  648. func TestMsgFilterPassWildcard(t *testing.T) {
  649. // start the server
  650. _, s := testHTTPServer(t)
  651. defer s.Close()
  652. // subscribe to events with a message filter
  653. client := NewClient(s.URL)
  654. events := make(chan *Event, 10)
  655. opts := SubscribeOpts{
  656. Filter: "prb:0,2-test:*",
  657. }
  658. sub, err := client.SubscribeNetwork(events, opts)
  659. if err != nil {
  660. t.Fatalf("error subscribing to network events: %s", err)
  661. }
  662. defer sub.Unsubscribe()
  663. // start a simulation network
  664. startTestNetwork(t, client)
  665. // check we got the expected events
  666. x := &expectEvents{t, events, sub}
  667. x.expectMsgs(map[MsgFilter]int{
  668. {"test", 2}: 2,
  669. {"test", 1}: 2,
  670. {"test", 0}: 2,
  671. {"prb", 0}: 2,
  672. })
  673. }
  674. // TestMsgFilterPassSingle tests streaming message events using a filter
  675. // with a single protocol and code
  676. func TestMsgFilterPassSingle(t *testing.T) {
  677. // start the server
  678. _, s := testHTTPServer(t)
  679. defer s.Close()
  680. // subscribe to events with a message filter
  681. client := NewClient(s.URL)
  682. events := make(chan *Event, 10)
  683. opts := SubscribeOpts{
  684. Filter: "dum:0",
  685. }
  686. sub, err := client.SubscribeNetwork(events, opts)
  687. if err != nil {
  688. t.Fatalf("error subscribing to network events: %s", err)
  689. }
  690. defer sub.Unsubscribe()
  691. // start a simulation network
  692. startTestNetwork(t, client)
  693. // check we got the expected events
  694. x := &expectEvents{t, events, sub}
  695. x.expectMsgs(map[MsgFilter]int{
  696. {"dum", 0}: 2,
  697. })
  698. }
  699. // TestMsgFilterPassSingle tests streaming message events using an invalid
  700. // filter
  701. func TestMsgFilterFailBadParams(t *testing.T) {
  702. // start the server
  703. _, s := testHTTPServer(t)
  704. defer s.Close()
  705. client := NewClient(s.URL)
  706. events := make(chan *Event, 10)
  707. opts := SubscribeOpts{
  708. Filter: "foo:",
  709. }
  710. _, err := client.SubscribeNetwork(events, opts)
  711. if err == nil {
  712. t.Fatalf("expected event subscription to fail but succeeded!")
  713. }
  714. opts.Filter = "bzz:aa"
  715. _, err = client.SubscribeNetwork(events, opts)
  716. if err == nil {
  717. t.Fatalf("expected event subscription to fail but succeeded!")
  718. }
  719. opts.Filter = "invalid"
  720. _, err = client.SubscribeNetwork(events, opts)
  721. if err == nil {
  722. t.Fatalf("expected event subscription to fail but succeeded!")
  723. }
  724. }