intervals_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "io"
  23. "os"
  24. "sync"
  25. "testing"
  26. "time"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/node"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  32. "github.com/ethereum/go-ethereum/swarm/network"
  33. "github.com/ethereum/go-ethereum/swarm/network/simulation"
  34. "github.com/ethereum/go-ethereum/swarm/state"
  35. "github.com/ethereum/go-ethereum/swarm/storage"
  36. )
  37. func TestIntervalsLive(t *testing.T) {
  38. testIntervals(t, true, nil, false)
  39. testIntervals(t, true, nil, true)
  40. }
  41. func TestIntervalsHistory(t *testing.T) {
  42. testIntervals(t, false, NewRange(9, 26), false)
  43. testIntervals(t, false, NewRange(9, 26), true)
  44. }
  45. func TestIntervalsLiveAndHistory(t *testing.T) {
  46. testIntervals(t, true, NewRange(9, 26), false)
  47. testIntervals(t, true, NewRange(9, 26), true)
  48. }
  49. func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
  50. nodes := 2
  51. chunkCount := dataChunkCount
  52. externalStreamName := "externalStream"
  53. externalStreamSessionAt := uint64(50)
  54. externalStreamMaxKeys := uint64(100)
  55. sim := simulation.New(map[string]simulation.ServiceFunc{
  56. "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
  57. n := ctx.Config.Node()
  58. addr := network.NewAddr(n)
  59. store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
  60. if err != nil {
  61. return nil, nil, err
  62. }
  63. bucket.Store(bucketKeyStore, store)
  64. cleanup = func() {
  65. store.Close()
  66. os.RemoveAll(datadir)
  67. }
  68. localStore := store.(*storage.LocalStore)
  69. netStore, err := storage.NewNetStore(localStore, nil)
  70. if err != nil {
  71. return nil, nil, err
  72. }
  73. kad := network.NewKademlia(addr.Over(), network.NewKadParams())
  74. delivery := NewDelivery(kad, netStore)
  75. netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
  76. r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
  77. SkipCheck: skipCheck,
  78. })
  79. bucket.Store(bucketKeyRegistry, r)
  80. r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
  81. return newTestExternalClient(netStore), nil
  82. })
  83. r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
  84. return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
  85. })
  86. fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
  87. bucket.Store(bucketKeyFileStore, fileStore)
  88. return r, cleanup, nil
  89. },
  90. })
  91. defer sim.Close()
  92. log.Info("Adding nodes to simulation")
  93. _, err := sim.AddNodesAndConnectChain(nodes)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
  98. defer cancel()
  99. if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
  100. t.Fatal(err)
  101. }
  102. result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
  103. nodeIDs := sim.UpNodeIDs()
  104. storer := nodeIDs[0]
  105. checker := nodeIDs[1]
  106. item, ok := sim.NodeItem(storer, bucketKeyFileStore)
  107. if !ok {
  108. return fmt.Errorf("No filestore")
  109. }
  110. fileStore := item.(*storage.FileStore)
  111. size := chunkCount * chunkSize
  112. _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
  113. if err != nil {
  114. log.Error("Store error: %v", "err", err)
  115. t.Fatal(err)
  116. }
  117. err = wait(ctx)
  118. if err != nil {
  119. log.Error("Wait error: %v", "err", err)
  120. t.Fatal(err)
  121. }
  122. item, ok = sim.NodeItem(checker, bucketKeyRegistry)
  123. if !ok {
  124. return fmt.Errorf("No registry")
  125. }
  126. registry := item.(*Registry)
  127. liveErrC := make(chan error)
  128. historyErrC := make(chan error)
  129. log.Debug("Watching for disconnections")
  130. disconnections := sim.PeerEvents(
  131. context.Background(),
  132. sim.NodeIDs(),
  133. simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
  134. )
  135. err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
  136. if err != nil {
  137. return err
  138. }
  139. go func() {
  140. for d := range disconnections {
  141. if d.Error != nil {
  142. log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
  143. t.Fatal(d.Error)
  144. }
  145. }
  146. }()
  147. go func() {
  148. if !live {
  149. close(liveErrC)
  150. return
  151. }
  152. var err error
  153. defer func() {
  154. liveErrC <- err
  155. }()
  156. // live stream
  157. var liveHashesChan chan []byte
  158. liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
  159. if err != nil {
  160. log.Error("get hashes", "err", err)
  161. return
  162. }
  163. i := externalStreamSessionAt
  164. // we have subscribed, enable notifications
  165. err = enableNotifications(registry, storer, NewStream(externalStreamName, "", true))
  166. if err != nil {
  167. return
  168. }
  169. for {
  170. select {
  171. case hash := <-liveHashesChan:
  172. h := binary.BigEndian.Uint64(hash)
  173. if h != i {
  174. err = fmt.Errorf("expected live hash %d, got %d", i, h)
  175. return
  176. }
  177. i++
  178. if i > externalStreamMaxKeys {
  179. return
  180. }
  181. case <-ctx.Done():
  182. return
  183. }
  184. }
  185. }()
  186. go func() {
  187. if live && history == nil {
  188. close(historyErrC)
  189. return
  190. }
  191. var err error
  192. defer func() {
  193. historyErrC <- err
  194. }()
  195. // history stream
  196. var historyHashesChan chan []byte
  197. historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
  198. if err != nil {
  199. log.Error("get hashes", "err", err)
  200. return
  201. }
  202. var i uint64
  203. historyTo := externalStreamMaxKeys
  204. if history != nil {
  205. i = history.From
  206. if history.To != 0 {
  207. historyTo = history.To
  208. }
  209. }
  210. // we have subscribed, enable notifications
  211. err = enableNotifications(registry, storer, NewStream(externalStreamName, "", false))
  212. if err != nil {
  213. return
  214. }
  215. for {
  216. select {
  217. case hash := <-historyHashesChan:
  218. h := binary.BigEndian.Uint64(hash)
  219. if h != i {
  220. err = fmt.Errorf("expected history hash %d, got %d", i, h)
  221. return
  222. }
  223. i++
  224. if i > historyTo {
  225. return
  226. }
  227. case <-ctx.Done():
  228. return
  229. }
  230. }
  231. }()
  232. if err := <-liveErrC; err != nil {
  233. return err
  234. }
  235. if err := <-historyErrC; err != nil {
  236. return err
  237. }
  238. return nil
  239. })
  240. if result.Error != nil {
  241. t.Fatal(result.Error)
  242. }
  243. }
  244. func getHashes(ctx context.Context, r *Registry, peerID enode.ID, s Stream) (chan []byte, error) {
  245. peer := r.getPeer(peerID)
  246. client, err := peer.getClient(ctx, s)
  247. if err != nil {
  248. return nil, err
  249. }
  250. c := client.Client.(*testExternalClient)
  251. return c.hashes, nil
  252. }
  253. func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {
  254. peer := r.getPeer(peerID)
  255. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  256. defer cancel()
  257. client, err := peer.getClient(ctx, s)
  258. if err != nil {
  259. return err
  260. }
  261. close(client.Client.(*testExternalClient).enableNotificationsC)
  262. return nil
  263. }
  264. type testExternalClient struct {
  265. hashes chan []byte
  266. store storage.SyncChunkStore
  267. enableNotificationsC chan struct{}
  268. }
  269. func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
  270. return &testExternalClient{
  271. hashes: make(chan []byte),
  272. store: store,
  273. enableNotificationsC: make(chan struct{}),
  274. }
  275. }
  276. func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
  277. wait := c.store.FetchFunc(ctx, storage.Address(hash))
  278. if wait == nil {
  279. return nil
  280. }
  281. select {
  282. case c.hashes <- hash:
  283. case <-ctx.Done():
  284. log.Warn("testExternalClient NeedData context", "err", ctx.Err())
  285. return func(_ context.Context) error {
  286. return ctx.Err()
  287. }
  288. }
  289. return wait
  290. }
  291. func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
  292. return nil
  293. }
  294. func (c *testExternalClient) Close() {}
  295. const testExternalServerBatchSize = 10
  296. type testExternalServer struct {
  297. t string
  298. keyFunc func(key []byte, index uint64)
  299. sessionAt uint64
  300. maxKeys uint64
  301. }
  302. func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
  303. if keyFunc == nil {
  304. keyFunc = binary.BigEndian.PutUint64
  305. }
  306. return &testExternalServer{
  307. t: t,
  308. keyFunc: keyFunc,
  309. sessionAt: sessionAt,
  310. maxKeys: maxKeys,
  311. }
  312. }
  313. func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
  314. if from == 0 && to == 0 {
  315. from = s.sessionAt
  316. to = s.sessionAt + testExternalServerBatchSize
  317. }
  318. if to-from > testExternalServerBatchSize {
  319. to = from + testExternalServerBatchSize - 1
  320. }
  321. if from >= s.maxKeys && to > s.maxKeys {
  322. return nil, 0, 0, nil, io.EOF
  323. }
  324. if to > s.maxKeys {
  325. to = s.maxKeys
  326. }
  327. b := make([]byte, HashSize*(to-from+1))
  328. for i := from; i <= to; i++ {
  329. s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
  330. }
  331. return b, from, to, nil, nil
  332. }
  333. func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
  334. return make([]byte, 4096), nil
  335. }
  336. func (s *testExternalServer) Close() {}