pyramid.go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package storage

import (
	"context"
	"encoding/binary"
	"errors"
	"io"
	"io/ioutil"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/log"
)

/*
The main idea of the pyramid chunker is to process the input data without knowing the entire size a priori.
To achieve this, the chunker tree is built from the ground up until the data is exhausted.
This opens up new avenues, such as easy appends and other kinds of modifications to the tree, thereby
avoiding duplication of data chunks.

Below is an example of a two-level chunk tree. The leaf chunks are called data chunks and all the above
chunks are called tree chunks. The tree chunk above the data chunks is level 0 and so on, until it reaches
the root tree chunk.

                                           T10                                        <- Tree chunk lvl1
                                           |
                 __________________________|_____________________________
                /                  |                   |                  \
               /                   |                   \                   \
           __T00__             ___T01__           ___T02__           ___T03__         <- Tree chunks lvl 0
          / /     \           / /      \         / /      \         / /      \
         / /       \         / /        \       / /         \      / /        \
        D1 D2 ... D128      D1 D2 ... D128     D1 D2 ... D128     D1 D2 ... D128      <- Data chunks

The split function continuously reads the data, creates data chunks and sends them to storage.
When a certain number of data chunks are created (defaultBranches), a signal is sent to create a tree
entry. When the number of level-0 tree entries reaches a certain threshold (defaultBranches), another
signal is sent to a tree entry one level up... and so on... until the data is exhausted AND only one
tree entry is present at some level. The key of that tree entry is given out as the rootAddress of the file.
*/

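// As a concrete illustration of the geometry (a sketch, assuming the default
// 4096-byte chunk size and a 32-byte hash): branches = 4096/32 = 128, so one
// level-0 tree chunk references up to 128 data chunks (512 KiB of payload),
// a level-1 tree chunk references up to 128 level-0 chunks (64 MiB), and in
// general a level-n tree chunk covers up to 128^(n+1) data chunks.
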
var (
	errLoadingTreeRootChunk = errors.New("LoadTree Error: Could not load root chunk")
	errLoadingTreeChunk     = errors.New("LoadTree Error: Could not load chunk")
)

const (
	ChunkProcessors = 8
	splitTimeout    = time.Minute * 5
)

type PyramidSplitterParams struct {
	SplitterParams
	getter Getter
}

func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, getter Getter, chunkSize int64) *PyramidSplitterParams {
	hashSize := putter.RefSize()
	return &PyramidSplitterParams{
		SplitterParams: SplitterParams{
			ChunkerParams: ChunkerParams{
				chunkSize: chunkSize,
				hashSize:  hashSize,
			},
			reader: reader,
			putter: putter,
			addr:   addr,
		},
		getter: getter,
	}
}

/*
When splitting, data is given as a SectionReader, and the key is a hashSize-long byte slice (Address);
the root hash of the entire content will fill it once processing finishes.
New chunks to store are stored using the putter which the caller provides.
*/
func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) {
	return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize), tag).Split(ctx)
}

func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) {
	return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize), tag).Append(ctx)
}

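// A minimal usage sketch (Putter/Getter construction and tag wiring are
// elided; the variable names below are placeholders, not part of this
// package's API):
//
//	addr, wait, err := PyramidSplit(ctx, reader, putter, getter, tag)
//	if err != nil {
//		return err
//	}
//	// addr can be handed out immediately; wait blocks until all
//	// chunks have actually been stored.
//	if err := wait(ctx); err != nil {
//		return err
//	}
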
// TreeEntry is the entry used to create a tree node.
type TreeEntry struct {
	level         int
	branchCount   int64
	subtreeSize   uint64
	chunk         []byte
	key           []byte
	index         int  // used in append to indicate the index of existing tree entry
	updatePending bool // indicates if the entry is loaded from existing tree
}

func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry {
	return &TreeEntry{
		level:         0,
		branchCount:   0,
		subtreeSize:   0,
		chunk:         make([]byte, pyramid.chunkSize+8),
		key:           make([]byte, pyramid.hashSize),
		index:         0,
		updatePending: false,
	}
}

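// Every chunk buffer in this file shares the same layout: the first 8 bytes
// hold the subtree size as a little-endian uint64, followed by branchCount
// keys of hashSize bytes each. A sketch of filling one in, assuming
// hashSize = 32 (childKey0/childKey1 are hypothetical):
//
//	binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize)
//	copy(ent.chunk[8:8+32], childKey0)    // key of branch 0
//	copy(ent.chunk[8+32:8+64], childKey1) // key of branch 1
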
// chunkJob is used by the hash processor to create a data/tree chunk and send it to storage.
type chunkJob struct {
	key      Address
	chunk    []byte
	parentWg *sync.WaitGroup
}

type PyramidChunker struct {
	chunkSize   int64
	hashSize    int64
	branches    int64
	reader      io.Reader
	putter      Putter
	getter      Getter
	key         Address
	tag         *chunk.Tag
	workerCount int64
	workerLock  sync.RWMutex
	jobC        chan *chunkJob
	wg          *sync.WaitGroup
	errC        chan error
	quitC       chan bool
	rootAddress []byte
	chunkLevel  [][]*TreeEntry
}

func NewPyramidSplitter(params *PyramidSplitterParams, tag *chunk.Tag) (pc *PyramidChunker) {
	pc = &PyramidChunker{}
	pc.reader = params.reader
	pc.hashSize = params.hashSize
	pc.branches = params.chunkSize / pc.hashSize
	pc.chunkSize = pc.hashSize * pc.branches
	pc.putter = params.putter
	pc.getter = params.getter
	pc.key = params.addr
	pc.tag = tag
	pc.workerCount = 0
	pc.jobC = make(chan *chunkJob, 2*ChunkProcessors)
	pc.wg = &sync.WaitGroup{}
	pc.errC = make(chan error)
	pc.quitC = make(chan bool)
	pc.rootAddress = make([]byte, pc.hashSize)
	pc.chunkLevel = make([][]*TreeEntry, pc.branches)
	return
}

func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader {
	return &LazyChunkReader{
		addr:      addr,
		depth:     depth,
		chunkSize: pc.chunkSize,
		branches:  pc.branches,
		hashSize:  pc.hashSize,
		getter:    getter,
	}
}

func (pc *PyramidChunker) incrementWorkerCount() {
	pc.workerLock.Lock()
	defer pc.workerLock.Unlock()
	pc.workerCount++
}

func (pc *PyramidChunker) getWorkerCount() int64 {
	pc.workerLock.RLock()
	defer pc.workerLock.RUnlock()
	return pc.workerCount
}

func (pc *PyramidChunker) decrementWorkerCount() {
	pc.workerLock.Lock()
	defer pc.workerLock.Unlock()
	pc.workerCount--
}

func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
	pc.wg.Add(1)
	pc.prepareChunks(ctx, false)

	// closes internal error channel if all subprocesses in the workgroup finished
	go func() {
		// waiting for all chunks to finish
		pc.wg.Wait()

		// We close errC here because it is passed down to the parallel routines underneath.
		// If an error happens in one of them, that particular routine raises the error;
		// once they all complete successfully, control comes back and we can safely close it here.
		close(pc.errC)
	}()

	defer close(pc.quitC)
	defer pc.putter.Close()

	select {
	case err := <-pc.errC:
		if err != nil {
			return nil, nil, err
		}
	case <-ctx.Done():
		_ = pc.putter.Wait(ctx) // best-effort wait; the context error below takes precedence
		return nil, nil, ctx.Err()
	}
	return pc.rootAddress, pc.putter.Wait, nil
}

func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
	// Load the rightmost unfinished tree chunks in every level
	if err := pc.loadTree(ctx); err != nil {
		return nil, nil, err
	}

	pc.wg.Add(1)
	pc.prepareChunks(ctx, true)

	// closes internal error channel if all subprocesses in the workgroup finished
	go func() {
		// waiting for all chunks to finish
		pc.wg.Wait()
		close(pc.errC)
	}()

	defer close(pc.quitC)
	defer pc.putter.Close()

	select {
	case err := <-pc.errC:
		if err != nil {
			return nil, nil, err
		}
	case <-time.NewTimer(splitTimeout).C:
	}

	return pc.rootAddress, pc.putter.Wait, nil
}

func (pc *PyramidChunker) processor(ctx context.Context, id int64) {
	defer pc.decrementWorkerCount()
	for {
		select {
		case job, ok := <-pc.jobC:
			if !ok {
				return
			}
			pc.processChunk(ctx, id, job)
			pc.tag.Inc(chunk.StateSplit)
		case <-pc.quitC:
			return
		}
	}
}

func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) {
	ref, err := pc.putter.Put(ctx, job.chunk)
	if err != nil {
		select {
		case pc.errC <- err:
		case <-pc.quitC:
		}
	}

	// report hash of this chunk one level up (key corresponds to the proper subslice of the parent chunk)
	copy(job.key, ref)

	// send off new chunk to storage
	job.parentWg.Done()
}

func (pc *PyramidChunker) loadTree(ctx context.Context) error {
	// Get the root chunk to get the total size
	chunkData, err := pc.getter.Get(ctx, Reference(pc.key))
	if err != nil {
		return errLoadingTreeRootChunk
	}
	chunkSize := int64(chunkData.Size())
	log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize)

	// if data size is less than a chunk... add a parent with update as pending
	if chunkSize <= pc.chunkSize {
		newEntry := &TreeEntry{
			level:         0,
			branchCount:   1,
			subtreeSize:   uint64(chunkSize),
			chunk:         make([]byte, pc.chunkSize+8),
			key:           make([]byte, pc.hashSize),
			index:         0,
			updatePending: true,
		}
		copy(newEntry.chunk[8:], pc.key)
		pc.chunkLevel[0] = append(pc.chunkLevel[0], newEntry)
		return nil
	}

	var treeSize int64
	var depth int
	treeSize = pc.chunkSize
	for ; treeSize < chunkSize; treeSize *= pc.branches {
		depth++
	}
	log.Trace("pyramid.chunker", "depth", depth)

	// Add the root chunk entry
	branchCount := int64(len(chunkData)-8) / pc.hashSize
	newEntry := &TreeEntry{
		level:         depth - 1,
		branchCount:   branchCount,
		subtreeSize:   uint64(chunkSize),
		chunk:         chunkData,
		key:           pc.key,
		index:         0,
		updatePending: true,
	}
	pc.chunkLevel[depth-1] = append(pc.chunkLevel[depth-1], newEntry)

	// Add the rest of the tree
	for lvl := depth - 1; lvl >= 1; lvl-- {
		// TODO(jmozah): instead of loading finished branches and then trimming them at the end,
		// avoid loading them in the first place
		for _, ent := range pc.chunkLevel[lvl] {
			branchCount = int64(len(ent.chunk)-8) / pc.hashSize
			for i := int64(0); i < branchCount; i++ {
				key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)]
				newChunkData, err := pc.getter.Get(ctx, Reference(key))
				if err != nil {
					return errLoadingTreeChunk
				}
				newChunkSize := newChunkData.Size()
				newBranchCount := int64(len(newChunkData)-8) / pc.hashSize
				newEntry := &TreeEntry{
					level:         lvl - 1,
					branchCount:   newBranchCount,
					subtreeSize:   newChunkSize,
					chunk:         newChunkData,
					key:           key,
					index:         0,
					updatePending: true,
				}
				pc.chunkLevel[lvl-1] = append(pc.chunkLevel[lvl-1], newEntry)
			}

			// We need only the rightmost unfinished branch, so trim all finished branches
			if int64(len(pc.chunkLevel[lvl-1])) >= pc.branches {
				pc.chunkLevel[lvl-1] = nil
			}
		}
	}
	return nil
}

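// Depth computation in loadTree, worked through (a sketch, assuming
// chunkSize = 4096 and branches = 128): for a stored size of 1,000,000
// bytes, treeSize goes 4096 -> 524288 -> 67108864, incrementing depth
// twice, so depth = 2 and the root entry is placed at level 1.
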
func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) {
	defer pc.wg.Done()

	chunkWG := &sync.WaitGroup{}

	pc.incrementWorkerCount()

	go pc.processor(ctx, pc.workerCount)

	parent := NewTreeEntry(pc)
	var unfinishedChunkData ChunkData
	var unfinishedChunkSize uint64

	if isAppend && len(pc.chunkLevel[0]) != 0 {
		lastIndex := len(pc.chunkLevel[0]) - 1
		ent := pc.chunkLevel[0][lastIndex]

		if ent.branchCount < pc.branches {
			parent = &TreeEntry{
				level:         0,
				branchCount:   ent.branchCount,
				subtreeSize:   ent.subtreeSize,
				chunk:         ent.chunk,
				key:           ent.key,
				index:         lastIndex,
				updatePending: true,
			}

			lastBranch := parent.branchCount - 1
			lastAddress := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize]

			var err error
			unfinishedChunkData, err = pc.getter.Get(ctx, lastAddress)
			if err != nil {
				pc.errC <- err
			}
			unfinishedChunkSize = unfinishedChunkData.Size()
			if unfinishedChunkSize < uint64(pc.chunkSize) {
				parent.subtreeSize = parent.subtreeSize - unfinishedChunkSize
				parent.branchCount = parent.branchCount - 1
			} else {
				unfinishedChunkData = nil
			}
		}
	}

	for index := 0; ; index++ {
		var err error
		chunkData := make([]byte, pc.chunkSize+8)

		var readBytes int

		if unfinishedChunkData != nil {
			copy(chunkData, unfinishedChunkData)
			readBytes += int(unfinishedChunkSize)
			unfinishedChunkData = nil
			log.Trace("pyramid.chunker: found unfinished chunk", "readBytes", readBytes)
		}

		var res []byte
		res, err = ioutil.ReadAll(io.LimitReader(pc.reader, int64(len(chunkData)-(8+readBytes))))

		// hack for ioutil.ReadAll:
		// a successful call to ioutil.ReadAll returns err == nil, not err == EOF, whereas we
		// want to propagate the io.EOF error
		if len(res) == 0 && err == nil {
			err = io.EOF
		}
		copy(chunkData[8+readBytes:], res)

		readBytes += len(res)
		log.Trace("pyramid.chunker: copied all data", "readBytes", readBytes)

		if err != nil {
			if err == io.EOF || err == io.ErrUnexpectedEOF {

				pc.cleanChunkLevels()

				// Check if we are appending or the chunk is the only one.
				if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) {
					// Data is exactly one chunk: pick the last chunk key as root
					chunkWG.Wait()
					lastChunksAddress := parent.chunk[8 : 8+pc.hashSize]
					copy(pc.rootAddress, lastChunksAddress)
					break
				}
			} else {
				close(pc.quitC)
				break
			}
		}

		// Data ended on a chunk boundary: just signal to start building the tree
		if readBytes == 0 {
			pc.buildTree(isAppend, parent, chunkWG, true, nil)
			break
		} else {
			pkey := pc.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG)

			// update tree related parent data structures
			parent.subtreeSize += uint64(readBytes)
			parent.branchCount++

			// Data got exhausted... signal to send any parent tree related chunks
			if int64(readBytes) < pc.chunkSize {

				pc.cleanChunkLevels()

				// only one data chunk, so don't add any parent chunk
				if parent.branchCount <= 1 {
					chunkWG.Wait()

					if isAppend || pc.depth() == 0 {
						// No need to build the tree if the depth is 0
						// or we are appending.
						// Just use the last key.
						copy(pc.rootAddress, pkey)
					} else {
						// We need to build the tree and provide the lonely
						// chunk key to replace the last tree chunk key.
						pc.buildTree(isAppend, parent, chunkWG, true, pkey)
					}
					break
				}

				pc.buildTree(isAppend, parent, chunkWG, true, nil)
				break
			}

			if parent.branchCount == pc.branches {
				pc.buildTree(isAppend, parent, chunkWG, false, nil)
				parent = NewTreeEntry(pc)
			}
		}

		workers := pc.getWorkerCount()
		if int64(len(pc.jobC)) > workers && workers < ChunkProcessors {
			pc.incrementWorkerCount()
			go pc.processor(ctx, pc.workerCount)
		}
	}
}

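// Buffer accounting in the read loop above (a sketch, assuming
// chunkSize = 4096): chunkData is 4104 bytes, the payload lands in
// chunkData[8 : 8+readBytes], and the 8-byte size prefix is filled in
// later by enqueueDataChunk via binary.LittleEndian.PutUint64.
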
func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync.WaitGroup, last bool, lonelyChunkKey []byte) {
	chunkWG.Wait()
	pc.enqueueTreeChunk(ent, chunkWG, last)

	compress := false
	endLvl := pc.branches
	for lvl := int64(0); lvl < pc.branches; lvl++ {
		lvlCount := int64(len(pc.chunkLevel[lvl]))
		if lvlCount >= pc.branches {
			endLvl = lvl + 1
			compress = true
			break
		}
	}

	if !compress && !last {
		return
	}

	// Wait for all the keys to be processed before compressing the tree
	chunkWG.Wait()

	for lvl := int64(ent.level); lvl < endLvl; lvl++ {

		lvlCount := int64(len(pc.chunkLevel[lvl]))
		if lvlCount == 1 && last {
			copy(pc.rootAddress, pc.chunkLevel[lvl][0].key)
			return
		}

		for startCount := int64(0); startCount < lvlCount; startCount += pc.branches {

			endCount := startCount + pc.branches
			if endCount > lvlCount {
				endCount = lvlCount
			}

			var nextLvlCount int64
			var tempEntry *TreeEntry
			if len(pc.chunkLevel[lvl+1]) > 0 {
				nextLvlCount = int64(len(pc.chunkLevel[lvl+1]) - 1)
				tempEntry = pc.chunkLevel[lvl+1][nextLvlCount]
			}
			if isAppend && tempEntry != nil && tempEntry.updatePending {
				updateEntry := &TreeEntry{
					level:         int(lvl + 1),
					branchCount:   0,
					subtreeSize:   0,
					chunk:         make([]byte, pc.chunkSize+8),
					key:           make([]byte, pc.hashSize),
					index:         int(nextLvlCount),
					updatePending: true,
				}
				for index := int64(0); index < lvlCount; index++ {
					updateEntry.branchCount++
					updateEntry.subtreeSize += pc.chunkLevel[lvl][index].subtreeSize
					copy(updateEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], pc.chunkLevel[lvl][index].key[:pc.hashSize])
				}

				pc.enqueueTreeChunk(updateEntry, chunkWG, last)

			} else {

				noOfBranches := endCount - startCount
				newEntry := &TreeEntry{
					level:         int(lvl + 1),
					branchCount:   noOfBranches,
					subtreeSize:   0,
					chunk:         make([]byte, (noOfBranches*pc.hashSize)+8),
					key:           make([]byte, pc.hashSize),
					index:         int(nextLvlCount),
					updatePending: false,
				}

				index := int64(0)
				for i := startCount; i < endCount; i++ {
					entry := pc.chunkLevel[lvl][i]
					newEntry.subtreeSize += entry.subtreeSize
					copy(newEntry.chunk[8+(index*pc.hashSize):8+((index+1)*pc.hashSize)], entry.key[:pc.hashSize])
					index++
				}
				// The lonely chunk key is the key of the last data chunk that is the only one on its branch.
				// In that case, ignore its tree chunk key and replace it with the lonely chunk key.
				if lonelyChunkKey != nil {
					// Overwrite the last tree chunk key with the lonely data chunk key.
					copy(newEntry.chunk[int64(len(newEntry.chunk))-pc.hashSize:], lonelyChunkKey[:pc.hashSize])
				}

				pc.enqueueTreeChunk(newEntry, chunkWG, last)

			}

		}

		if !isAppend {
			chunkWG.Wait()
			if compress {
				pc.chunkLevel[lvl] = nil
			}
		}
	}
}

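// Compression, worked through (a sketch, assuming branches = 128): once
// chunkLevel[0] accumulates 128 entries, the loop above packs their keys
// into a single level-1 TreeEntry whose chunk is 8+128*hashSize bytes,
// enqueues it, and clears level 0 so the cycle can start over.
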
func (pc *PyramidChunker) enqueueTreeChunk(ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) {
	if ent != nil && ent.branchCount > 0 {

		// wait for data chunks to get over before processing the tree chunk
		if last {
			chunkWG.Wait()
		}

		binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize)
		ent.key = make([]byte, pc.hashSize)

		chunkWG.Add(1)
		select {
		case pc.jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*pc.hashSize+8], chunkWG}:
		case <-pc.quitC:
		}

		// Update or append based on whether it is a new entry or is being reused
		if ent.updatePending {
			chunkWG.Wait()
			pc.chunkLevel[ent.level][ent.index] = ent
		} else {
			pc.chunkLevel[ent.level] = append(pc.chunkLevel[ent.level], ent)
		}
	}
}

func (pc *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup) Address {
	binary.LittleEndian.PutUint64(chunkData[:8], size)
	pkey := parent.chunk[8+parent.branchCount*pc.hashSize : 8+(parent.branchCount+1)*pc.hashSize]

	chunkWG.Add(1)
	select {
	case pc.jobC <- &chunkJob{pkey, chunkData[:size+8], chunkWG}:
	case <-pc.quitC:
	}

	return pkey
}

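// Note that the returned pkey aliases the parent's chunk buffer: when
// processChunk later does copy(job.key, ref), the new chunk's reference is
// written directly into the parent's branch slot. A sketch of the slice
// arithmetic, assuming hashSize = 32 and branchCount = 2 (i.e. the third
// branch is about to be filled):
//
//	start := 8 + 2*32 // byte offset 72 inside parent.chunk
//	end := 8 + 3*32   // byte offset 104
//	pkey := parent.chunk[start:end]
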
// depth returns the number of chunk levels.
// It is used to detect if there is only one data chunk
// left for the last branch.
func (pc *PyramidChunker) depth() (d int) {
	for _, l := range pc.chunkLevel {
		if l == nil {
			return
		}
		d++
	}
	return
}

// cleanChunkLevels removes gaps (nil levels) between chunk levels
// that are not nil.
func (pc *PyramidChunker) cleanChunkLevels() {
	for i, l := range pc.chunkLevel {
		if l == nil {
			pc.chunkLevel = append(pc.chunkLevel[:i], append(pc.chunkLevel[i+1:], nil)...)
		}
	}
}

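// For example (a sketch): with levels [L0, nil, L2, nil] the loop shifts the
// non-nil levels down, yielding [L0, L2, nil, nil], so that depth() then
// reports 2 contiguous levels.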