common_test.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package storage
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/rand"
  21. "flag"
  22. "fmt"
  23. "io"
  24. "sync"
  25. "testing"
  26. "time"
  27. "github.com/ethereum/go-ethereum/log"
  28. colorable "github.com/mattn/go-colorable"
  29. )
  // loglevel is the verbosity of logs used by this package's tests. It is bound
  // to the -loglevel command-line flag and defaults to 3.
  var loglevel = flag.Int("loglevel", 3, "verbosity of logs")
  33. func init() {
  34. flag.Parse()
  35. log.PrintOrigins(true)
  36. log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
  37. }
  // brokenLimitedReader wraps an io.Reader together with the bookkeeping needed
  // to make reads fail once a configured offset would be crossed.
  type brokenLimitedReader struct {
  	// lr is the wrapped reader that supplies the data.
  	lr io.Reader
  	// errAt is the offset threshold compared against off in Read.
  	errAt int
  	// off is the running offset maintained by Read.
  	off int
  	// size is the size value given to the constructor.
  	size int
  }
  44. func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader {
  45. return &brokenLimitedReader{
  46. lr: data,
  47. errAt: errAt,
  48. size: size,
  49. }
  50. }
  51. func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) {
  52. return mput(store, processors, n, GenerateRandomChunk)
  53. }
  54. func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) {
  55. wg := sync.WaitGroup{}
  56. wg.Add(processors)
  57. c := make(chan *Chunk)
  58. for i := 0; i < processors; i++ {
  59. go func() {
  60. defer wg.Done()
  61. for chunk := range c {
  62. wg.Add(1)
  63. chunk := chunk
  64. store.Put(context.TODO(), chunk)
  65. go func() {
  66. defer wg.Done()
  67. <-chunk.dbStoredC
  68. }()
  69. }
  70. }()
  71. }
  72. fa := f
  73. if _, ok := store.(*MemStore); ok {
  74. fa = func(i int64) *Chunk {
  75. chunk := f(i)
  76. chunk.markAsStored()
  77. return chunk
  78. }
  79. }
  80. for i := 0; i < n; i++ {
  81. chunk := fa(int64(i))
  82. hs = append(hs, chunk.Addr)
  83. c <- chunk
  84. }
  85. close(c)
  86. wg.Wait()
  87. return hs
  88. }
  89. func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error {
  90. wg := sync.WaitGroup{}
  91. wg.Add(len(hs))
  92. errc := make(chan error)
  93. for _, k := range hs {
  94. go func(h Address) {
  95. defer wg.Done()
  96. chunk, err := store.Get(context.TODO(), h)
  97. if err != nil {
  98. errc <- err
  99. return
  100. }
  101. if f != nil {
  102. err = f(h, chunk)
  103. if err != nil {
  104. errc <- err
  105. return
  106. }
  107. }
  108. }(k)
  109. }
  110. go func() {
  111. wg.Wait()
  112. close(errc)
  113. }()
  114. var err error
  115. select {
  116. case err = <-errc:
  117. case <-time.NewTimer(5 * time.Second).C:
  118. err = fmt.Errorf("timed out after 5 seconds")
  119. }
  120. return err
  121. }
  // testDataReader returns a reader that yields at most l bytes taken from
  // crypto/rand.Reader.
  func testDataReader(l int) (r io.Reader) {
  	r = &io.LimitedReader{R: rand.Reader, N: int64(l)}
  	return r
  }
  125. func (r *brokenLimitedReader) Read(buf []byte) (int, error) {
  126. if r.off+len(buf) > r.errAt {
  127. return 0, fmt.Errorf("Broken reader")
  128. }
  129. r.off += len(buf)
  130. return r.lr.Read(buf)
  131. }
  // generateRandomData fills a fresh l-byte slice from crypto/rand and returns
  // it together with a reader, limited to l bytes, over that same slice.
  // It panics with "rand error" if the random source fails.
  func generateRandomData(l int) (r io.Reader, slice []byte) {
  	data := make([]byte, l)
  	_, err := rand.Read(data)
  	if err != nil {
  		panic("rand error")
  	}
  	return io.LimitReader(bytes.NewReader(data), int64(l)), data
  }
  140. func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
  141. hs := mputRandomChunks(m, processors, n, chunksize)
  142. err := mget(m, hs, nil)
  143. if err != nil {
  144. t.Fatalf("testStore failed: %v", err)
  145. }
  146. }
  147. func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
  148. hs := mputRandomChunks(m, processors, n, chunksize)
  149. f := func(h Address, chunk *Chunk) error {
  150. if !bytes.Equal(h, chunk.Addr) {
  151. return fmt.Errorf("key does not match retrieved chunk Key")
  152. }
  153. hasher := MakeHashFunc(DefaultHash)()
  154. hasher.ResetWithLength(chunk.SData[:8])
  155. hasher.Write(chunk.SData[8:])
  156. exp := hasher.Sum(nil)
  157. if !bytes.Equal(h, exp) {
  158. return fmt.Errorf("key is not hash of chunk data")
  159. }
  160. return nil
  161. }
  162. err := mget(m, hs, f)
  163. if err != nil {
  164. t.Fatalf("testStore failed: %v", err)
  165. }
  166. }
  167. func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
  168. chunks := make([]*Chunk, n)
  169. i := 0
  170. f := func(dataSize int64) *Chunk {
  171. chunk := GenerateRandomChunk(dataSize)
  172. chunks[i] = chunk
  173. i++
  174. return chunk
  175. }
  176. mput(store, processors, n, f)
  177. f = func(dataSize int64) *Chunk {
  178. chunk := chunks[i]
  179. i++
  180. return chunk
  181. }
  182. b.ReportAllocs()
  183. b.ResetTimer()
  184. for j := 0; j < b.N; j++ {
  185. i = 0
  186. mput(store, processors, n, f)
  187. }
  188. }
  189. func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
  190. hs := mputRandomChunks(store, processors, n, chunksize)
  191. b.ReportAllocs()
  192. b.ResetTimer()
  193. for i := 0; i < b.N; i++ {
  194. err := mget(store, hs, nil)
  195. if err != nil {
  196. b.Fatalf("mget failed: %v", err)
  197. }
  198. }
  199. }