pyramid.go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package storage

import (
	"context"
	"encoding/binary"
	"errors"
	"io"
	"io/ioutil"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/swarm/log"
)
/*
The main idea of the pyramid chunker is to process the input data without knowing the entire size a priori.
To achieve this, the chunker tree is built from the ground up until the data is exhausted.
This opens up new avenues, such as easy appends and other modifications to the tree, thereby avoiding
duplication of data chunks.

Below is an example of a two-level chunk tree. The leaf chunks are called data chunks and all the above
chunks are called tree chunks. The tree chunk above the data chunks is level 0 and so on, until it reaches
the root tree chunk.

                                            T10                                        <- Tree chunk lvl1
                                            |
                  __________________________|_____________________________
                 /                  |                   |                  \
                /                   |                   \                   \
            __T00__             ___T01__           ___T02__           ___T03__         <- Tree chunks lvl 0
           / /     \           / /      \         / /      \         / /      \
          / /       \         / /        \       / /         \      / /         \
         D1 D2 ... D128     D1 D2 ... D128     D1 D2 ... D128     D1 D2 ... D128      <-  Data Chunks

The split function continuously reads the data, creates data chunks and sends them to storage.
When a certain number of data chunks are created (defaultBranches), a signal is sent to create a tree
entry. When the level 0 tree entries reach a certain threshold (defaultBranches), another signal
is sent to a tree entry one level up, and so on, until the data is exhausted AND only one
tree entry is present at some level. The key of that tree entry is given out as the rootKey of the file.
*/
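
// For reference, a sketch of the chunk encoding used throughout this file
// (derived from enqueueDataChunk and enqueueTreeChunk below): every chunk,
// data or tree, starts with an 8-byte little-endian length prefix holding the
// size of the subtree it covers, followed by the payload. For a tree chunk the
// payload is a sequence of child keys, so the key of child i lives at
//
//	chunk[8+i*hashSize : 8+(i+1)*hashSize]
//
// which is the slicing used below when loading and building the tree.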
var (
	errLoadingTreeRootChunk = errors.New("LoadTree Error: Could not load root chunk")
	errLoadingTreeChunk     = errors.New("LoadTree Error: Could not load chunk")
)

const (
	ChunkProcessors = 8
	splitTimeout    = time.Minute * 5
)

const (
	DataChunk = 0
	TreeChunk = 1
)
type PyramidSplitterParams struct {
	SplitterParams
	getter Getter
}

func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, getter Getter, chunkSize int64) *PyramidSplitterParams {
	hashSize := putter.RefSize()
	return &PyramidSplitterParams{
		SplitterParams: SplitterParams{
			ChunkerParams: ChunkerParams{
				chunkSize: chunkSize,
				hashSize:  hashSize,
			},
			reader: reader,
			putter: putter,
			addr:   addr,
		},
		getter: getter,
	}
}
/*
When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key); the root hash of the entire content will fill this once processing finishes.
New chunks to store are stored using the putter which the caller provides.
*/
func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
	return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split(ctx)
}

func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
	return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append(ctx)
}
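
// Illustrative usage, as a sketch only: it assumes the caller already has a
// Putter and Getter implementation wired to a chunk store (not part of this
// file). PyramidSplit returns the root key together with a wait function that
// blocks until all enqueued chunks have actually been stored:
//
//	key, wait, err := PyramidSplit(ctx, reader, putter, getter)
//	if err != nil {
//		return err
//	}
//	if err := wait(ctx); err != nil {
//		return err
//	}
//	// key is now the root key of the stored content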
// Entry to create a tree node
type TreeEntry struct {
	level         int
	branchCount   int64
	subtreeSize   uint64
	chunk         []byte
	key           []byte
	index         int  // used in append to indicate the index of existing tree entry
	updatePending bool // indicates if the entry is loaded from existing tree
}

func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry {
	return &TreeEntry{
		level:         0,
		branchCount:   0,
		subtreeSize:   0,
		chunk:         make([]byte, pyramid.chunkSize+8),
		key:           make([]byte, pyramid.hashSize),
		index:         0,
		updatePending: false,
	}
}
// Used by the hash processor to create a data/tree chunk and send to storage
type chunkJob struct {
	key      Address
	chunk    []byte
	parentWg *sync.WaitGroup
}

type PyramidChunker struct {
	chunkSize   int64
	hashSize    int64
	branches    int64
	reader      io.Reader
	putter      Putter
	getter      Getter
	key         Address
	workerCount int64
	workerLock  sync.RWMutex
	jobC        chan *chunkJob
	wg          *sync.WaitGroup
	errC        chan error
	quitC       chan bool
	rootKey     []byte
	chunkLevel  [][]*TreeEntry
}
func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) {
	pc = &PyramidChunker{}
	pc.reader = params.reader
	pc.hashSize = params.hashSize
	pc.branches = params.chunkSize / pc.hashSize
	pc.chunkSize = pc.hashSize * pc.branches
	pc.putter = params.putter
	pc.getter = params.getter
	pc.key = params.addr
	pc.workerCount = 0
	pc.jobC = make(chan *chunkJob, 2*ChunkProcessors)
	pc.wg = &sync.WaitGroup{}
	pc.errC = make(chan error)
	pc.quitC = make(chan bool)
	pc.rootKey = make([]byte, pc.hashSize)
	pc.chunkLevel = make([][]*TreeEntry, pc.branches)
	return
}
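
// Join returns a LazySectionReader that lazily reads back the content rooted
// at addr through the given getter, down to the given depth.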
func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader {
	return &LazyChunkReader{
		key:       addr,
		depth:     depth,
		chunkSize: pc.chunkSize,
		branches:  pc.branches,
		hashSize:  pc.hashSize,
		getter:    getter,
	}
}
func (pc *PyramidChunker) incrementWorkerCount() {
	pc.workerLock.Lock()
	defer pc.workerLock.Unlock()
	pc.workerCount += 1
}

func (pc *PyramidChunker) getWorkerCount() int64 {
	pc.workerLock.RLock()
	defer pc.workerLock.RUnlock()
	return pc.workerCount
}

func (pc *PyramidChunker) decrementWorkerCount() {
	pc.workerLock.Lock()
	defer pc.workerLock.Unlock()
	pc.workerCount -= 1
}
func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
	log.Debug("pyramid.chunker: Split()")

	pc.wg.Add(1)
	pc.prepareChunks(false)

	// closes internal error channel if all subprocesses in the workgroup finished
	go func() {
		// waiting for all chunks to finish
		pc.wg.Wait()

		// We close errC here because it is passed down to the parallel hash routines underneath.
		// If an error happens in one of them, that particular routine raises the error;
		// once they all complete successfully, control comes back here and we can safely close the channel.
		close(pc.errC)
	}()

	defer close(pc.quitC)
	defer pc.putter.Close()

	select {
	case err := <-pc.errC:
		if err != nil {
			return nil, nil, err
		}
	case <-time.NewTimer(splitTimeout).C:
	}
	return pc.rootKey, pc.putter.Wait, nil
}
func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
	log.Debug("pyramid.chunker: Append()")

	// Load the rightmost unfinished tree chunks in every level
	pc.loadTree()

	pc.wg.Add(1)
	pc.prepareChunks(true)

	// closes internal error channel if all subprocesses in the workgroup finished
	go func() {
		// waiting for all chunks to finish
		pc.wg.Wait()
		close(pc.errC)
	}()

	defer close(pc.quitC)
	defer pc.putter.Close()

	select {
	case err := <-pc.errC:
		if err != nil {
			return nil, nil, err
		}
	case <-time.NewTimer(splitTimeout).C:
	}
	return pc.rootKey, pc.putter.Wait, nil
}
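
// processor is the worker loop run by each hash routine: it drains jobC,
// storing one chunk per job, and exits when jobC is closed or quitC is
// signalled.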
func (pc *PyramidChunker) processor(id int64) {
	defer pc.decrementWorkerCount()
	for {
		select {
		case job, ok := <-pc.jobC:
			if !ok {
				return
			}
			pc.processChunk(id, job)
		case <-pc.quitC:
			return
		}
	}
}
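
// processChunk stores a single chunk via the putter, copies the resulting
// reference into the job's key slot and releases the parent's WaitGroup.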
func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) {
	log.Debug("pyramid.chunker: processChunk()", "id", id)

	ref, err := pc.putter.Put(context.TODO(), job.chunk)
	if err != nil {
		pc.errC <- err
	}

	// report hash of this chunk one level up (key corresponds to the proper subslice of the parent chunk)
	copy(job.key, ref)

	// signal the parent that this chunk has been processed
	job.parentWg.Done()
}
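
// loadTree reconstructs the in-memory chunk levels of an existing tree rooted
// at pc.key, keeping only the rightmost, unfinished branch of every level so
// that Append can continue where the previous Split left off.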
func (pc *PyramidChunker) loadTree() error {
	log.Debug("pyramid.chunker: loadTree()")
	// Get the root chunk to get the total size
	chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key))
	if err != nil {
		return errLoadingTreeRootChunk
	}
	chunkSize := chunkData.Size()
	log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize)

	// if data size is less than a chunk... add a parent with update as pending
	if chunkSize <= pc.chunkSize {
		newEntry := &TreeEntry{
			level:         0,
			branchCount:   1,
			subtreeSize:   uint64(chunkSize),
			chunk:         make([]byte, pc.chunkSize+8),
			key:           make([]byte, pc.hashSize),
			index:         0,
			updatePending: true,
		}
		copy(newEntry.chunk[8:], pc.key)
		pc.chunkLevel[0] = append(pc.chunkLevel[0], newEntry)
		return nil
	}

	var treeSize int64
	var depth int
	treeSize = pc.chunkSize
	for ; treeSize < chunkSize; treeSize *= pc.branches {
		depth++
	}
	log.Trace("pyramid.chunker", "depth", depth)

	// Add the root chunk entry
	branchCount := int64(len(chunkData)-8) / pc.hashSize
	newEntry := &TreeEntry{
		level:         depth - 1,
		branchCount:   branchCount,
		subtreeSize:   uint64(chunkSize),
		chunk:         chunkData,
		key:           pc.key,
		index:         0,
		updatePending: true,
	}
	pc.chunkLevel[depth-1] = append(pc.chunkLevel[depth-1], newEntry)

	// Add the rest of the tree
	for lvl := depth - 1; lvl >= 1; lvl-- {
		// TODO(jmozah): instead of loading finished branches and then trimming at the end,
		// avoid loading them in the first place
		for _, ent := range pc.chunkLevel[lvl] {
			branchCount = int64(len(ent.chunk)-8) / pc.hashSize
			for i := int64(0); i < branchCount; i++ {
				key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)]
				newChunkData, err := pc.getter.Get(context.TODO(), Reference(key))
				if err != nil {
					return errLoadingTreeChunk
				}
				newChunkSize := newChunkData.Size()
				newBranchCount := int64(len(newChunkData)-8) / pc.hashSize
				newEntry := &TreeEntry{
					level:         lvl - 1,
					branchCount:   newBranchCount,
					subtreeSize:   uint64(newChunkSize),
					chunk:         newChunkData,
					key:           key,
					index:         0,
					updatePending: true,
				}
				pc.chunkLevel[lvl-1] = append(pc.chunkLevel[lvl-1], newEntry)
			}

			// We need to keep only the rightmost unfinished branch, so trim all finished branches
			if int64(len(pc.chunkLevel[lvl-1])) >= pc.branches {
				pc.chunkLevel[lvl-1] = nil
			}
		}
	}
	return nil
}
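
// prepareChunks runs the main read loop: it fills chunkSize-sized, size-prefixed
// buffers from the reader, enqueues them as data chunks, and triggers tree
// building whenever a branch fills up or the input is exhausted. It also grows
// the hash worker pool up to ChunkProcessors when jobC backs up.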
func (pc *PyramidChunker) prepareChunks(isAppend bool) {
	log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend)
	defer pc.wg.Done()

	chunkWG := &sync.WaitGroup{}

	pc.incrementWorkerCount()

	go pc.processor(pc.workerCount)

	parent := NewTreeEntry(pc)
	var unfinishedChunkData ChunkData
	var unfinishedChunkSize int64

	if isAppend && len(pc.chunkLevel[0]) != 0 {
		lastIndex := len(pc.chunkLevel[0]) - 1
		ent := pc.chunkLevel[0][lastIndex]

		if ent.branchCount < pc.branches {
			parent = &TreeEntry{
				level:         0,
				branchCount:   ent.branchCount,
				subtreeSize:   ent.subtreeSize,
				chunk:         ent.chunk,
				key:           ent.key,
				index:         lastIndex,
				updatePending: true,
			}

			lastBranch := parent.branchCount - 1
			lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize]

			var err error
			unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey)
			if err != nil {
				pc.errC <- err
			}
			unfinishedChunkSize = unfinishedChunkData.Size()
			if unfinishedChunkSize < pc.chunkSize {
				parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize)
				parent.branchCount = parent.branchCount - 1
			} else {
				unfinishedChunkData = nil
			}
		}
	}

	for index := 0; ; index++ {
		var err error
		chunkData := make([]byte, pc.chunkSize+8)

		var readBytes int

		if unfinishedChunkData != nil {
			copy(chunkData, unfinishedChunkData)
			readBytes += int(unfinishedChunkSize)
			unfinishedChunkData = nil
			log.Trace("pyramid.chunker: found unfinished chunk", "readBytes", readBytes)
		}

		var res []byte
		res, err = ioutil.ReadAll(io.LimitReader(pc.reader, int64(len(chunkData)-(8+readBytes))))

		// hack for ioutil.ReadAll:
		// a successful call to ioutil.ReadAll returns err == nil, not err == EOF, whereas we
		// want to propagate the io.EOF error
		if len(res) == 0 && err == nil {
			err = io.EOF
		}
		copy(chunkData[8+readBytes:], res)

		readBytes += len(res)
		log.Trace("pyramid.chunker: copied all data", "readBytes", readBytes)

		if err != nil {
			if err == io.EOF || err == io.ErrUnexpectedEOF {

				pc.cleanChunkLevels()

				// Check if we are appending or the chunk is the only one.
				if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) {
					// Data is exactly one chunk.. pick the last chunk key as root
					chunkWG.Wait()
					lastChunksKey := parent.chunk[8 : 8+pc.hashSize]
					copy(pc.rootKey, lastChunksKey)
					break
				}
			} else {
				close(pc.quitC)
				break
			}
		}

		// Data ended on a chunk boundary.. just signal to start building the tree
		if readBytes == 0 {
			pc.buildTree(isAppend, parent, chunkWG, true, nil)
			break
		} else {
			pkey := pc.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG)

			// update tree related parent data structures
			parent.subtreeSize += uint64(readBytes)
			parent.branchCount++

			// Data got exhausted... signal to send any parent tree related chunks
			if int64(readBytes) < pc.chunkSize {

				pc.cleanChunkLevels()

				// only one data chunk .. so don't add any parent chunk
				if parent.branchCount <= 1 {
					chunkWG.Wait()

					if isAppend || pc.depth() == 0 {
						// No need to build the tree if the depth is 0
						// or we are appending.
						// Just use the last key.
						copy(pc.rootKey, pkey)
					} else {
						// We need to build the tree and provide the lonely
						// chunk key to replace the last tree chunk key.
						pc.buildTree(isAppend, parent, chunkWG, true, pkey)
					}
					break
				}

				pc.buildTree(isAppend, parent, chunkWG, true, nil)
				break
			}

			if parent.branchCount == pc.branches {
				pc.buildTree(isAppend, parent, chunkWG, false, nil)
				parent = NewTreeEntry(pc)
			}
		}

		workers := pc.getWorkerCount()
		if int64(len(pc.jobC)) > workers && workers < ChunkProcessors {
			pc.incrementWorkerCount()
			go pc.processor(pc.workerCount)
		}
	}
}
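
// buildTree flushes the current parent entry as a tree chunk and, when a level
// is full or this is the final flush (last == true), folds full levels into new
// parent entries one level up. During the final flush the single remaining
// entry's key becomes the root key; a non-nil lonelyChunkKey replaces the last
// tree chunk key when the data ended with a single dangling chunk.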
func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync.WaitGroup, last bool, lonelyChunkKey []byte) {
	chunkWG.Wait()
	pc.enqueueTreeChunk(ent, chunkWG, last)

	compress := false
	endLvl := pc.branches
	for lvl := int64(0); lvl < pc.branches; lvl++ {
		lvlCount := int64(len(pc.chunkLevel[lvl]))
		if lvlCount >= pc.branches {
			endLvl = lvl + 1
			compress = true
			break
		}
	}

	if !compress && !last {
		return
	}

	// Wait for all the keys to be processed before compressing the tree
	chunkWG.Wait()

	for lvl := int64(ent.level); lvl < endLvl; lvl++ {

		lvlCount := int64(len(pc.chunkLevel[lvl]))
		if lvlCount == 1 && last {
			copy(pc.rootKey, pc.chunkLevel[lvl][0].key)
			return
		}

		for startCount := int64(0); startCount < lvlCount; startCount += pc.branches {

			endCount := startCount + pc.branches
			if endCount > lvlCount {
				endCount = lvlCount
			}

			var nextLvlCount int64
			var tempEntry *TreeEntry
			if len(pc.chunkLevel[lvl+1]) > 0 {
				nextLvlCount = int64(len(pc.chunkLevel[lvl+1]) - 1)
				tempEntry = pc.chunkLevel[lvl+1][nextLvlCount]
			}
			if isAppend && tempEntry != nil && tempEntry.updatePending {
				updateEntry := &TreeEntry{
					level:         int(lvl + 1),
					branchCount:   0,
					subtreeSize:   0,
					chunk:         make([]byte, pc.chunkSize+8),
					key:           make([]byte, pc.hashSize),
					index:         int(nextLvlCount),
					updatePending: true,
				}
				for index := int64(0); index < lvlCount; index++ {
					updateEntry.branchCount++
					updateEntry.subtreeSize += pc.chunkLevel[lvl][index].subtreeSize
					copy(updateEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], pc.chunkLevel[lvl][index].key[:pc.hashSize])
				}

				pc.enqueueTreeChunk(updateEntry, chunkWG, last)

			} else {

				noOfBranches := endCount - startCount
				newEntry := &TreeEntry{
					level:         int(lvl + 1),
					branchCount:   noOfBranches,
					subtreeSize:   0,
					chunk:         make([]byte, (noOfBranches*pc.hashSize)+8),
					key:           make([]byte, pc.hashSize),
					index:         int(nextLvlCount),
					updatePending: false,
				}

				index := int64(0)
				for i := startCount; i < endCount; i++ {
					entry := pc.chunkLevel[lvl][i]
					newEntry.subtreeSize += entry.subtreeSize
					copy(newEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], entry.key[:pc.hashSize])
					index++
				}
				// The lonely chunk key is the key of the last chunk that is the only one
				// on the last branch. In this case, ignore its tree chunk key and
				// replace it with the lonely chunk key.
				if lonelyChunkKey != nil {
					// Overwrite the last tree chunk key with the lonely data chunk key.
					copy(newEntry.chunk[int64(len(newEntry.chunk))-pc.hashSize:], lonelyChunkKey[:pc.hashSize])
				}

				pc.enqueueTreeChunk(newEntry, chunkWG, last)

			}

		}

		if !isAppend {
			chunkWG.Wait()
			if compress {
				pc.chunkLevel[lvl] = nil
			}
		}
	}
}
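
// enqueueTreeChunk writes the subtree size into the 8-byte prefix of a tree
// chunk and hands it to the hash workers, then records the entry at its level
// (updating in place for entries reloaded from an existing tree).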
func (pc *PyramidChunker) enqueueTreeChunk(ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) {
	if ent != nil && ent.branchCount > 0 {

		// wait for data chunks to get over before processing the tree chunk
		if last {
			chunkWG.Wait()
		}

		binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize)
		ent.key = make([]byte, pc.hashSize)
		chunkWG.Add(1)
		select {
		case pc.jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*pc.hashSize+8], chunkWG}:
		case <-pc.quitC:
		}

		// Update or append based on whether it is a new entry or being reused
		if ent.updatePending {
			chunkWG.Wait()
			pc.chunkLevel[ent.level][ent.index] = ent
		} else {
			pc.chunkLevel[ent.level] = append(pc.chunkLevel[ent.level], ent)
		}
	}
}
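
// enqueueDataChunk writes the payload size into the 8-byte prefix of a data
// chunk and hands it to the hash workers. The returned key slice aliases the
// parent chunk, so the worker's copy of the reference lands directly in the
// parent's next free key slot.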
func (pc *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup) Address {
	binary.LittleEndian.PutUint64(chunkData[:8], size)
	pkey := parent.chunk[8+parent.branchCount*pc.hashSize : 8+(parent.branchCount+1)*pc.hashSize]

	chunkWG.Add(1)
	select {
	case pc.jobC <- &chunkJob{pkey, chunkData[:size+8], chunkWG}:
	case <-pc.quitC:
	}

	return pkey
}
// depth returns the number of chunk levels.
// It is used to detect if there is only one data chunk
// left for the last branch.
func (pc *PyramidChunker) depth() (d int) {
	for _, l := range pc.chunkLevel {
		if l == nil {
			return
		}
		d++
	}
	return
}

// cleanChunkLevels removes gaps (nil levels) between chunk levels
// that are not nil.
func (pc *PyramidChunker) cleanChunkLevels() {
	for i, l := range pc.chunkLevel {
		if l == nil {
			pc.chunkLevel = append(pc.chunkLevel[:i], append(pc.chunkLevel[i+1:], nil)...)
		}
	}
}