chunker_test.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package storage
  17. import (
  18. "bytes"
  19. "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "io"
  23. "runtime"
  24. "sync"
  25. "testing"
  26. "time"
  27. )
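/*
Tests TreeChunker and PyramidChunker by splitting random byte slices into
chunks, joining the chunks back together with a TreeChunker, and checking
that the output matches the input.
*/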
  31. type test interface {
  32. Fatalf(string, ...interface{})
  33. Logf(string, ...interface{})
  34. }
  35. type chunkerTester struct {
  36. inputs map[uint64][]byte
  37. chunks map[string]*Chunk
  38. t test
  39. }
  40. func (self *chunkerTester) checkChunks(t *testing.T, want int) {
  41. l := len(self.chunks)
  42. if l != want {
  43. t.Errorf("expected %v chunks, got %v", want, l)
  44. }
  45. }
  46. func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key) {
  47. // reset
  48. self.chunks = make(map[string]*Chunk)
  49. if self.inputs == nil {
  50. self.inputs = make(map[uint64][]byte)
  51. }
  52. quitC := make(chan bool)
  53. timeout := time.After(600 * time.Second)
  54. if chunkC != nil {
  55. go func() {
  56. for {
  57. select {
  58. case <-timeout:
  59. self.t.Fatalf("Join timeout error")
  60. case <-quitC:
  61. return
  62. case chunk := <-chunkC:
  63. // self.chunks = append(self.chunks, chunk)
  64. self.chunks[chunk.Key.String()] = chunk
  65. if chunk.wg != nil {
  66. chunk.wg.Done()
  67. }
  68. }
  69. }
  70. }()
  71. }
  72. key, err := chunker.Split(data, size, chunkC, swg, nil)
  73. if err != nil && expectedError == nil {
  74. self.t.Fatalf("Split error: %v", err)
  75. } else if expectedError != nil && (err == nil || err.Error() != expectedError.Error()) {
  76. self.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
  77. }
  78. if chunkC != nil {
  79. if swg != nil {
  80. swg.Wait()
  81. }
  82. close(quitC)
  83. }
  84. return
  85. }
  86. func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader {
  87. // reset but not the chunks
  88. reader := chunker.Join(key, chunkC)
  89. timeout := time.After(600 * time.Second)
  90. i := 0
  91. go func() {
  92. for {
  93. select {
  94. case <-timeout:
  95. self.t.Fatalf("Join timeout error")
  96. case chunk, ok := <-chunkC:
  97. if !ok {
  98. close(quitC)
  99. return
  100. }
  101. // this just mocks the behaviour of a chunk store retrieval
  102. stored, success := self.chunks[chunk.Key.String()]
  103. if !success {
  104. self.t.Fatalf("not found")
  105. return
  106. }
  107. chunk.SData = stored.SData
  108. chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
  109. close(chunk.C)
  110. i++
  111. }
  112. }
  113. }()
  114. return reader
  115. }
  116. func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) {
  117. data := io.LimitReader(rand.Reader, int64(n))
  118. brokendata := brokenLimitReader(data, n, n/2)
  119. buf := make([]byte, n)
  120. _, err := brokendata.Read(buf)
  121. if err == nil || err.Error() != "Broken reader" {
  122. tester.t.Fatalf("Broken reader is not broken, hence broken. Returns: %v", err)
  123. }
  124. data = io.LimitReader(rand.Reader, int64(n))
  125. brokendata = brokenLimitReader(data, n, n/2)
  126. chunkC := make(chan *Chunk, 1000)
  127. swg := &sync.WaitGroup{}
  128. key := tester.Split(splitter, brokendata, int64(n), chunkC, swg, fmt.Errorf("Broken reader"))
  129. tester.t.Logf(" Key = %v\n", key)
  130. }
  131. func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
  132. if tester.inputs == nil {
  133. tester.inputs = make(map[uint64][]byte)
  134. }
  135. input, found := tester.inputs[uint64(n)]
  136. var data io.Reader
  137. if !found {
  138. data, input = testDataReaderAndSlice(n)
  139. tester.inputs[uint64(n)] = input
  140. } else {
  141. data = io.LimitReader(bytes.NewReader(input), int64(n))
  142. }
  143. chunkC := make(chan *Chunk, 1000)
  144. swg := &sync.WaitGroup{}
  145. key := tester.Split(splitter, data, int64(n), chunkC, swg, nil)
  146. tester.t.Logf(" Key = %v\n", key)
  147. chunkC = make(chan *Chunk, 1000)
  148. quitC := make(chan bool)
  149. chunker := NewTreeChunker(NewChunkerParams())
  150. reader := tester.Join(chunker, key, 0, chunkC, quitC)
  151. output := make([]byte, n)
  152. r, err := reader.Read(output)
  153. if r != n || err != io.EOF {
  154. tester.t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err)
  155. }
  156. if input != nil {
  157. if !bytes.Equal(output, input) {
  158. tester.t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", input, output)
  159. }
  160. }
  161. close(chunkC)
  162. <-quitC
  163. }
  164. func TestRandomData(t *testing.T) {
  165. // sizes := []int{123456}
  166. sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678}
  167. tester := &chunkerTester{t: t}
  168. chunker := NewTreeChunker(NewChunkerParams())
  169. for _, s := range sizes {
  170. testRandomData(chunker, s, tester)
  171. }
  172. pyramid := NewPyramidChunker(NewChunkerParams())
  173. for _, s := range sizes {
  174. testRandomData(pyramid, s, tester)
  175. }
  176. }
  177. func TestRandomBrokenData(t *testing.T) {
  178. sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678}
  179. tester := &chunkerTester{t: t}
  180. chunker := NewTreeChunker(NewChunkerParams())
  181. for _, s := range sizes {
  182. testRandomBrokenData(chunker, s, tester)
  183. t.Logf("done size: %v", s)
  184. }
  185. }
  186. func readAll(reader LazySectionReader, result []byte) {
  187. size := int64(len(result))
  188. var end int64
  189. for pos := int64(0); pos < size; pos += 1000 {
  190. if pos+1000 > size {
  191. end = size
  192. } else {
  193. end = pos + 1000
  194. }
  195. reader.ReadAt(result[pos:end], pos)
  196. }
  197. }
  198. func benchReadAll(reader LazySectionReader) {
  199. size, _ := reader.Size(nil)
  200. output := make([]byte, 1000)
  201. for pos := int64(0); pos < size; pos += 1000 {
  202. reader.ReadAt(output, pos)
  203. }
  204. }
  205. func benchmarkJoin(n int, t *testing.B) {
  206. t.ReportAllocs()
  207. for i := 0; i < t.N; i++ {
  208. chunker := NewTreeChunker(NewChunkerParams())
  209. tester := &chunkerTester{t: t}
  210. data := testDataReader(n)
  211. chunkC := make(chan *Chunk, 1000)
  212. swg := &sync.WaitGroup{}
  213. key := tester.Split(chunker, data, int64(n), chunkC, swg, nil)
  214. // t.StartTimer()
  215. chunkC = make(chan *Chunk, 1000)
  216. quitC := make(chan bool)
  217. reader := tester.Join(chunker, key, i, chunkC, quitC)
  218. benchReadAll(reader)
  219. close(chunkC)
  220. <-quitC
  221. // t.StopTimer()
  222. }
  223. stats := new(runtime.MemStats)
  224. runtime.ReadMemStats(stats)
  225. fmt.Println(stats.Sys)
  226. }
  227. func benchmarkSplitTree(n int, t *testing.B) {
  228. t.ReportAllocs()
  229. for i := 0; i < t.N; i++ {
  230. chunker := NewTreeChunker(NewChunkerParams())
  231. tester := &chunkerTester{t: t}
  232. data := testDataReader(n)
  233. tester.Split(chunker, data, int64(n), nil, nil, nil)
  234. }
  235. stats := new(runtime.MemStats)
  236. runtime.ReadMemStats(stats)
  237. fmt.Println(stats.Sys)
  238. }
  239. func benchmarkSplitPyramid(n int, t *testing.B) {
  240. t.ReportAllocs()
  241. for i := 0; i < t.N; i++ {
  242. splitter := NewPyramidChunker(NewChunkerParams())
  243. tester := &chunkerTester{t: t}
  244. data := testDataReader(n)
  245. tester.Split(splitter, data, int64(n), nil, nil, nil)
  246. }
  247. stats := new(runtime.MemStats)
  248. runtime.ReadMemStats(stats)
  249. fmt.Println(stats.Sys)
  250. }
  251. func BenchmarkJoin_2(t *testing.B) { benchmarkJoin(100, t) }
  252. func BenchmarkJoin_3(t *testing.B) { benchmarkJoin(1000, t) }
  253. func BenchmarkJoin_4(t *testing.B) { benchmarkJoin(10000, t) }
  254. func BenchmarkJoin_5(t *testing.B) { benchmarkJoin(100000, t) }
  255. func BenchmarkJoin_6(t *testing.B) { benchmarkJoin(1000000, t) }
  256. func BenchmarkJoin_7(t *testing.B) { benchmarkJoin(10000000, t) }
  257. func BenchmarkJoin_8(t *testing.B) { benchmarkJoin(100000000, t) }
  258. func BenchmarkSplitTree_2(t *testing.B) { benchmarkSplitTree(100, t) }
  259. func BenchmarkSplitTree_2h(t *testing.B) { benchmarkSplitTree(500, t) }
  260. func BenchmarkSplitTree_3(t *testing.B) { benchmarkSplitTree(1000, t) }
  261. func BenchmarkSplitTree_3h(t *testing.B) { benchmarkSplitTree(5000, t) }
  262. func BenchmarkSplitTree_4(t *testing.B) { benchmarkSplitTree(10000, t) }
  263. func BenchmarkSplitTree_4h(t *testing.B) { benchmarkSplitTree(50000, t) }
  264. func BenchmarkSplitTree_5(t *testing.B) { benchmarkSplitTree(100000, t) }
  265. func BenchmarkSplitTree_6(t *testing.B) { benchmarkSplitTree(1000000, t) }
  266. func BenchmarkSplitTree_7(t *testing.B) { benchmarkSplitTree(10000000, t) }
  267. func BenchmarkSplitTree_8(t *testing.B) { benchmarkSplitTree(100000000, t) }
  268. func BenchmarkSplitPyramid_2(t *testing.B) { benchmarkSplitPyramid(100, t) }
  269. func BenchmarkSplitPyramid_2h(t *testing.B) { benchmarkSplitPyramid(500, t) }
  270. func BenchmarkSplitPyramid_3(t *testing.B) { benchmarkSplitPyramid(1000, t) }
  271. func BenchmarkSplitPyramid_3h(t *testing.B) { benchmarkSplitPyramid(5000, t) }
  272. func BenchmarkSplitPyramid_4(t *testing.B) { benchmarkSplitPyramid(10000, t) }
  273. func BenchmarkSplitPyramid_4h(t *testing.B) { benchmarkSplitPyramid(50000, t) }
  274. func BenchmarkSplitPyramid_5(t *testing.B) { benchmarkSplitPyramid(100000, t) }
  275. func BenchmarkSplitPyramid_6(t *testing.B) { benchmarkSplitPyramid(1000000, t) }
  276. func BenchmarkSplitPyramid_7(t *testing.B) { benchmarkSplitPyramid(10000000, t) }
  277. func BenchmarkSplitPyramid_8(t *testing.B) { benchmarkSplitPyramid(100000000, t) }
// godep go test ./swarm/storage -bench . -cpuprofile cpu.out -memprofile mem.out