pyramid.go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package storage

import (
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"strings"
	"sync"

	"github.com/ethereum/go-ethereum/common"
)

const (
	// processors is the number of concurrent workers hashing chunks.
	processors = 8
)

// Tree is the in-memory index of the chunk tree being assembled, one
// map of nodes per tree level.
type Tree struct {
	Chunks int64
	Levels []map[int64]*Node
	Lock   sync.RWMutex
}

// Node is a single node of the chunk tree, tracking how many of its
// children are still pending.
type Node struct {
	Pending  int64
	Size     uint64
	Children []common.Hash
	Last     bool
}

func (self *Node) String() string {
	var children []string
	for _, node := range self.Children {
		children = append(children, node.Hex())
	}
	return fmt.Sprintf("pending: %v, size: %v, last: %v, children: %v", self.Pending, self.Size, self.Last, strings.Join(children, ", "))
}

// Task is a unit of work handed to the processor pool.
type Task struct {
	Index int64  // Index of the chunk being processed
	Size  uint64 // Number of data bytes in the chunk
	Data  []byte // Binary blob of the chunk
	Last  bool   // Whether this is the final chunk of the input
}

// PyramidChunker splits data into chunks and assembles the chunk tree
// bottom-up using a pool of concurrent hashers.
type PyramidChunker struct {
	hashFunc    Hasher
	chunkSize   int64
	hashSize    int64
	branches    int64
	workerCount int
}

func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
	self = &PyramidChunker{}
	self.hashFunc = MakeHashFunc(params.Hash)
	self.branches = params.Branches
	self.hashSize = int64(self.hashFunc().Size())
	self.chunkSize = self.hashSize * self.branches
	self.workerCount = 1
	return
}
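
// To make the size arithmetic above concrete, a sketch assuming a 32-byte
// hash and 128 branches (illustrative defaults, not fixed by this file):
//
//	hashSize  = 32              // bytes per child reference
//	chunkSize = 32 * 128 = 4096 // payload bytes per chunk
//
//	// and for a 1 MiB input to Split:
//	chunks = ceil(1048576 / 4096) = 256 // leaf chunks
//	depth  = ceil(log128(256)) + 1 = 3  // tree levels, including the root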

func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
	chunks := (size + self.chunkSize - 1) / self.chunkSize
	depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1

	results := Tree{
		Chunks: chunks,
		Levels: make([]map[int64]*Node, depth),
	}
	for i := 0; i < depth; i++ {
		results.Levels[i] = make(map[int64]*Node)
	}
	// Create a pool of workers to crunch through the file
	tasks := make(chan *Task, 2*processors)
	pend := new(sync.WaitGroup)
	abortC := make(chan bool)
	for i := 0; i < processors; i++ {
		pend.Add(1)
		go self.processor(pend, swg, tasks, chunkC, &results)
	}
	// Feed the chunks into the task pool
	read := 0
	var readErr error
	for index := 0; ; index++ {
		buffer := make([]byte, self.chunkSize+8)
		n, err := data.Read(buffer[8:])
		read += n
		last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF
		if err != nil && !last {
			// Read failure: remember the error, signal abort and stop feeding
			readErr = err
			close(abortC)
			break
		}
		binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
		pend.Add(1)
		select {
		case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
		case <-abortC:
			return nil, err
		}
		if last {
			break
		}
	}
	// Wait for the workers and return
	close(tasks)
	pend.Wait()
	if readErr != nil {
		return nil, readErr
	}
	key := results.Levels[0][0].Children[0][:]
	return key, nil
}
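
// For reference, every task buffer handed to the workers is laid out as an
// 8-byte little-endian payload length followed by the payload itself:
//
//	buffer[0:8]   = uint64(n), little-endian
//	buffer[8:8+n] = raw chunk data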

func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
	defer pend.Done()

	// Start processing leaf chunks ad infinitum
	hasher := self.hashFunc()
	for task := range tasks {
		depth, pow := len(results.Levels)-1, self.branches
		size := task.Size
		data := task.Data
		var node *Node
		for depth >= 0 {
			// New chunk received, reset the hasher and start processing
			hasher.Reset()
			if node == nil { // Leaf node, hash the data chunk
				hasher.Write(task.Data)
			} else { // Internal node, hash the children
				size = node.Size
				data = make([]byte, hasher.Size()*len(node.Children)+8)
				binary.LittleEndian.PutUint64(data[:8], size)
				hasher.Write(data[:8])
				for i, hash := range node.Children {
					copy(data[i*hasher.Size()+8:], hash[:])
					hasher.Write(hash[:])
				}
			}
			hash := hasher.Sum(nil)
			last := task.Last || (node != nil && node.Last)
			// Insert the subresult into the memoization tree
			results.Lock.Lock()
			if node = results.Levels[depth][task.Index/pow]; node == nil {
				// Figure out the pending tasks
				pending := self.branches
				if task.Index/pow == results.Chunks/pow {
					pending = (results.Chunks + pow/self.branches - 1) / (pow / self.branches) % self.branches
				}
				node = &Node{pending, 0, make([]common.Hash, pending), last}
				results.Levels[depth][task.Index/pow] = node
			}
			node.Pending--
			i := task.Index / (pow / self.branches) % self.branches
			if last {
				node.Last = true
			}
			copy(node.Children[i][:], hash)
			node.Size += size
			left := node.Pending
			if chunkC != nil {
				if swg != nil {
					swg.Add(1)
				}
				select {
				case chunkC <- &Chunk{Key: hash, SData: data, wg: swg}:
					// case <- self.quitC
				}
			}
			if depth+1 < len(results.Levels) {
				delete(results.Levels[depth+1], task.Index/(pow/self.branches))
			}
			results.Lock.Unlock()
			// If there's more work to be done, leave for others
			if left > 0 {
				break
			}
			// We're the last ones in this batch, merge the children together
			depth--
			pow *= self.branches
		}
		pend.Done()
	}
}
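
// A minimal usage sketch (illustrative only; the ChunkerParams values and the
// chunk-draining goroutine are assumptions, not part of this file):
//
//	chunker := NewPyramidChunker(&ChunkerParams{Branches: 128, Hash: "SHA3"})
//	chunkC := make(chan *Chunk)
//	go func() {
//		for chunk := range chunkC {
//			_ = chunk // e.g. persist chunk.SData under chunk.Key
//		}
//	}()
//	key, err := chunker.Split(reader, size, chunkC, nil, nil)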