intervals_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "io"
  23. "sync"
  24. "testing"
  25. "time"
  26. "github.com/ethereum/go-ethereum/node"
  27. "github.com/ethereum/go-ethereum/p2p/discover"
  28. "github.com/ethereum/go-ethereum/p2p/simulations"
  29. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  30. "github.com/ethereum/go-ethereum/rpc"
  31. "github.com/ethereum/go-ethereum/swarm/network"
  32. streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
  33. "github.com/ethereum/go-ethereum/swarm/state"
  34. "github.com/ethereum/go-ethereum/swarm/storage"
  35. )
  36. var (
  37. externalStreamName = "externalStream"
  38. externalStreamSessionAt uint64 = 50
  39. externalStreamMaxKeys uint64 = 100
  40. )
  41. func newIntervalsStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
  42. id := ctx.Config.ID
  43. addr := toAddr(id)
  44. kad := network.NewKademlia(addr.Over(), network.NewKadParams())
  45. store := stores[id].(*storage.LocalStore)
  46. db := storage.NewDBAPI(store)
  47. delivery := NewDelivery(kad, db)
  48. deliveries[id] = delivery
  49. r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
  50. SkipCheck: defaultSkipCheck,
  51. })
  52. r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
  53. return newTestExternalClient(db), nil
  54. })
  55. r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
  56. return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
  57. })
  58. go func() {
  59. waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
  60. }()
  61. return &TestExternalRegistry{r}, nil
  62. }
  63. func TestIntervals(t *testing.T) {
  64. testIntervals(t, true, nil, false)
  65. testIntervals(t, false, NewRange(9, 26), false)
  66. testIntervals(t, true, NewRange(9, 26), false)
  67. testIntervals(t, true, nil, true)
  68. testIntervals(t, false, NewRange(9, 26), true)
  69. testIntervals(t, true, NewRange(9, 26), true)
  70. }
  71. func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
  72. nodes := 2
  73. chunkCount := dataChunkCount
  74. defer setDefaultSkipCheck(defaultSkipCheck)
  75. defaultSkipCheck = skipCheck
  76. toAddr = network.NewAddrFromNodeID
  77. conf := &streamTesting.RunConfig{
  78. Adapter: *adapter,
  79. NodeCount: nodes,
  80. ConnLevel: 1,
  81. ToAddr: toAddr,
  82. Services: services,
  83. DefaultService: "intervalsStreamer",
  84. }
  85. sim, teardown, err := streamTesting.NewSimulation(conf)
  86. var rpcSubscriptionsWg sync.WaitGroup
  87. defer func() {
  88. rpcSubscriptionsWg.Wait()
  89. teardown()
  90. }()
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. stores = make(map[discover.NodeID]storage.ChunkStore)
  95. deliveries = make(map[discover.NodeID]*Delivery)
  96. for i, id := range sim.IDs {
  97. stores[id] = sim.Stores[i]
  98. }
  99. peerCount = func(id discover.NodeID) int {
  100. return 1
  101. }
  102. fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams())
  103. size := chunkCount * chunkSize
  104. _, wait, err := fileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
  105. wait()
  106. if err != nil {
  107. t.Fatal(err)
  108. }
  109. errc := make(chan error, 1)
  110. waitPeerErrC = make(chan error)
  111. quitC := make(chan struct{})
  112. defer close(quitC)
  113. action := func(ctx context.Context) error {
  114. i := 0
  115. for err := range waitPeerErrC {
  116. if err != nil {
  117. return fmt.Errorf("error waiting for peers: %s", err)
  118. }
  119. i++
  120. if i == nodes {
  121. break
  122. }
  123. }
  124. id := sim.IDs[1]
  125. err := sim.CallClient(id, func(client *rpc.Client) error {
  126. sid := sim.IDs[0]
  127. doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
  128. if err != nil {
  129. return err
  130. }
  131. rpcSubscriptionsWg.Add(1)
  132. go func() {
  133. <-doneC
  134. rpcSubscriptionsWg.Done()
  135. }()
  136. ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
  137. defer cancel()
  138. err = client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(externalStreamName, "", live), history, Top)
  139. if err != nil {
  140. return err
  141. }
  142. liveErrC := make(chan error)
  143. historyErrC := make(chan error)
  144. go func() {
  145. if !live {
  146. close(liveErrC)
  147. return
  148. }
  149. var err error
  150. defer func() {
  151. liveErrC <- err
  152. }()
  153. // live stream
  154. liveHashesChan := make(chan []byte)
  155. liveSubscription, err := client.Subscribe(ctx, "stream", liveHashesChan, "getHashes", sid, NewStream(externalStreamName, "", true))
  156. if err != nil {
  157. return
  158. }
  159. defer liveSubscription.Unsubscribe()
  160. i := externalStreamSessionAt
  161. // we have subscribed, enable notifications
  162. err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", true))
  163. if err != nil {
  164. return
  165. }
  166. for {
  167. select {
  168. case hash := <-liveHashesChan:
  169. h := binary.BigEndian.Uint64(hash)
  170. if h != i {
  171. err = fmt.Errorf("expected live hash %d, got %d", i, h)
  172. return
  173. }
  174. i++
  175. if i > externalStreamMaxKeys {
  176. return
  177. }
  178. case err = <-liveSubscription.Err():
  179. return
  180. case <-ctx.Done():
  181. return
  182. }
  183. }
  184. }()
  185. go func() {
  186. if live && history == nil {
  187. close(historyErrC)
  188. return
  189. }
  190. var err error
  191. defer func() {
  192. historyErrC <- err
  193. }()
  194. // history stream
  195. historyHashesChan := make(chan []byte)
  196. historySubscription, err := client.Subscribe(ctx, "stream", historyHashesChan, "getHashes", sid, NewStream(externalStreamName, "", false))
  197. if err != nil {
  198. return
  199. }
  200. defer historySubscription.Unsubscribe()
  201. var i uint64
  202. historyTo := externalStreamMaxKeys
  203. if history != nil {
  204. i = history.From
  205. if history.To != 0 {
  206. historyTo = history.To
  207. }
  208. }
  209. // we have subscribed, enable notifications
  210. err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", false))
  211. if err != nil {
  212. return
  213. }
  214. for {
  215. select {
  216. case hash := <-historyHashesChan:
  217. h := binary.BigEndian.Uint64(hash)
  218. if h != i {
  219. err = fmt.Errorf("expected history hash %d, got %d", i, h)
  220. return
  221. }
  222. i++
  223. if i > historyTo {
  224. return
  225. }
  226. case err = <-historySubscription.Err():
  227. return
  228. case <-ctx.Done():
  229. return
  230. }
  231. }
  232. }()
  233. if err := <-liveErrC; err != nil {
  234. return err
  235. }
  236. if err := <-historyErrC; err != nil {
  237. return err
  238. }
  239. return nil
  240. })
  241. return err
  242. }
  243. check := func(ctx context.Context, id discover.NodeID) (bool, error) {
  244. select {
  245. case err := <-errc:
  246. return false, err
  247. case <-ctx.Done():
  248. return false, ctx.Err()
  249. default:
  250. }
  251. return true, nil
  252. }
  253. conf.Step = &simulations.Step{
  254. Action: action,
  255. Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
  256. Expect: &simulations.Expectation{
  257. Nodes: sim.IDs[1:1],
  258. Check: check,
  259. },
  260. }
  261. startedAt := time.Now()
  262. timeout := 300 * time.Second
  263. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  264. defer cancel()
  265. result, err := sim.Run(ctx, conf)
  266. finishedAt := time.Now()
  267. if err != nil {
  268. t.Fatalf("Setting up simulation failed: %v", err)
  269. }
  270. if result.Error != nil {
  271. t.Fatalf("Simulation failed: %s", result.Error)
  272. }
  273. streamTesting.CheckResult(t, result, startedAt, finishedAt)
  274. }