freezer_table.go

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/golang/snappy"
)

var (
	// errClosed is returned if an operation attempts to read from or write to the
	// freezer table after it has already been closed.
	errClosed = errors.New("closed")

	// errOutOfBounds is returned if the item requested is not contained within the
	// freezer table.
	errOutOfBounds = errors.New("out of bounds")

	// errNotSupported is returned if the database doesn't support the required operation.
	errNotSupported = errors.New("this operation is not supported")
)

// indexEntry contains the number/id of the file that the data resides in, as well as the
// offset within the file to the end of the data.
// In serialized form, the filenum is stored as uint16.
type indexEntry struct {
	filenum uint32 // stored as uint16 ( 2 bytes )
	offset  uint32 // stored as uint32 ( 4 bytes )
}

const indexEntrySize = 6
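
// As an example of the serialized form defined by append/unmarshalBinary below:
// an entry with filenum=3 and offset=4096 is encoded as the 6 big-endian bytes
//
//	0x00 0x03 | 0x00 0x00 0x10 0x00
//	 filenum  |       offset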

// unmarshalBinary deserializes binary b into the rawIndex entry.
func (i *indexEntry) unmarshalBinary(b []byte) {
	i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
	i.offset = binary.BigEndian.Uint32(b[2:6])
}

// append adds the encoded entry to the end of b.
func (i *indexEntry) append(b []byte) []byte {
	offset := len(b)
	out := append(b, make([]byte, indexEntrySize)...)
	binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum))
	binary.BigEndian.PutUint32(out[offset+2:], i.offset)
	return out
}

// bounds returns the start- and end- offsets, and the file number of where to
// read the data item marked by the two index entries. The two entries are
// assumed to be sequential.
func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
	if i.filenum != end.filenum {
		// If a piece of data 'crosses' a data-file,
		// it's actually in one piece on the second data-file.
		// We return a zero-indexEntry for the second file as start.
		return 0, end.offset, end.filenum
	}
	return i.offset, end.offset, end.filenum
}
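
// For example, if the previous entry points at the end of file 2 and the next
// entry at offset N in file 3, the item is stored entirely in file 3, so
// bounds reports the range [0, N) in file 3.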

// freezerTable represents a single chained data table within the freezer (e.g. blocks).
// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
// file (uncompressed 64 bit indices into the data file).
type freezerTable struct {
	// WARNING: The `items` field is accessed atomically. On 32 bit platforms, only
	// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
	// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
	items      uint64 // Number of items stored in the table (including items removed from tail)
	itemOffset uint64 // Number of items removed from the table

	// itemHidden is the number of items marked as deleted. Tail deletion is
	// only supported at file level which means the actual deletion will be
	// delayed until the entire data file is marked as deleted. Before that
	// these items will be hidden to prevent being visited again. The value
	// should never be lower than itemOffset.
	itemHidden uint64

	noCompression bool // if true, disables snappy compression. Note: does not work retroactively
	readonly      bool
	maxFileSize   uint32 // Max file size for data-files
	name          string
	path          string

	head   *os.File            // File descriptor for the data head of the table
	index  *os.File            // File descriptor for the indexEntry file of the table
	meta   *os.File            // File descriptor for metadata of the table
	files  map[uint32]*os.File // open files
	headId uint32              // number of the currently active head file
	tailId uint32              // number of the earliest file

	headBytes  int64         // Number of bytes written to the head file
	readMeter  metrics.Meter // Meter for measuring the effective amount of data read
	writeMeter metrics.Meter // Meter for measuring the effective amount of data written
	sizeGauge  metrics.Gauge // Gauge for tracking the combined size of all freezer tables

	logger log.Logger   // Logger with database path and table name embedded
	lock   sync.RWMutex // Mutex protecting the data file descriptors
}
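
// On disk, a table named "headers", say, consists of (see newTable and openFile
// below): a single index file "headers.cidx" ("headers.ridx" when compression
// is disabled), a metadata file "headers.meta", and a sequence of data files
// "headers.0000.cdat", "headers.0001.cdat", ... (".rdat" when uncompressed),
// numbered from tailId to headId.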

// newFreezerTable opens the given path as a freezer table.
func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) {
	return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly)
}

// newTable opens a freezer table, creating the data and index files if they are
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
	// Ensure the containing directory exists and open the indexEntry file
	if err := os.MkdirAll(path, 0755); err != nil {
		return nil, err
	}
	var idxName string
	if noCompression {
		idxName = fmt.Sprintf("%s.ridx", name) // raw index file
	} else {
		idxName = fmt.Sprintf("%s.cidx", name) // compressed index file
	}
	var (
		err   error
		index *os.File
		meta  *os.File
	)
	if readonly {
		// Will fail if table doesn't exist
		index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName))
		if err != nil {
			return nil, err
		}
		// TODO(rjl493456442) change it to read-only mode. Open the metadata file
		// in rw mode. It's a temporary solution for now and should be changed
		// whenever the tail deletion is actually used. The reason for this hack is
		// the additional meta file for each freezer table is added in order to support
		// tail deletion, but for most legacy nodes this file is missing. This check
		// will suddenly break lots of database relevant commands. So the metadata file
		// is always opened for mutation and nothing else will be written except
		// the initialization.
		meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name)))
		if err != nil {
			return nil, err
		}
	} else {
		index, err = openFreezerFileForAppend(filepath.Join(path, idxName))
		if err != nil {
			return nil, err
		}
		meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name)))
		if err != nil {
			return nil, err
		}
	}
	// Create the table and repair any past inconsistency
	tab := &freezerTable{
		index:         index,
		meta:          meta,
		files:         make(map[uint32]*os.File),
		readMeter:     readMeter,
		writeMeter:    writeMeter,
		sizeGauge:     sizeGauge,
		name:          name,
		path:          path,
		logger:        log.New("database", path, "table", name),
		noCompression: noCompression,
		readonly:      readonly,
		maxFileSize:   maxFilesize,
	}
	if err := tab.repair(); err != nil {
		tab.Close()
		return nil, err
	}
	// Initialize the starting size counter
	size, err := tab.sizeNolock()
	if err != nil {
		tab.Close()
		return nil, err
	}
	tab.sizeGauge.Inc(int64(size))

	return tab, nil
}
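
// A minimal usage sketch (the path, table name and item number below are
// illustrative only):
//
//	t, err := newFreezerTable("/data/geth/chaindata/ancient", "hashes", true, false)
//	if err != nil {
//		// handle error
//	}
//	defer t.Close()
//	blob, err := t.Retrieve(0) // raw bytes of the first stored item, if present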

// repair cross-checks the head and the index file and truncates them to
// be in sync with each other after a potential crash / data loss.
func (t *freezerTable) repair() error {
	// Create a temporary offset buffer to init files with and read indexEntry into
	buffer := make([]byte, indexEntrySize)

	// If we've just created the files, initialize the index with the 0 indexEntry
	stat, err := t.index.Stat()
	if err != nil {
		return err
	}
	if stat.Size() == 0 {
		if _, err := t.index.Write(buffer); err != nil {
			return err
		}
	}
	// Ensure the index is a multiple of indexEntrySize bytes
	if overflow := stat.Size() % indexEntrySize; overflow != 0 {
		truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path
	}
	// Retrieve the file sizes and prepare for truncation
	if stat, err = t.index.Stat(); err != nil {
		return err
	}
	offsetsSize := stat.Size()

	// Open the head file
	var (
		firstIndex  indexEntry
		lastIndex   indexEntry
		contentSize int64
		contentExp  int64
	)
	// Read index zero, determine what file is the earliest
	// and what item offset to use
	t.index.ReadAt(buffer, 0)
	firstIndex.unmarshalBinary(buffer)

	// Assign the tail fields with the first stored index.
	// The total removed items is represented with an uint32,
	// which is not enough in theory but enough in practice.
	// TODO: use uint64 to represent total removed items.
	t.tailId = firstIndex.filenum
	t.itemOffset = uint64(firstIndex.offset)

	// Load metadata from the file
	meta, err := loadMetadata(t.meta, t.itemOffset)
	if err != nil {
		return err
	}
	t.itemHidden = meta.VirtualTail

	// Read the last index, use the default value in case the freezer is empty
	if offsetsSize == indexEntrySize {
		lastIndex = indexEntry{filenum: t.tailId, offset: 0}
	} else {
		t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
		lastIndex.unmarshalBinary(buffer)
	}
	if t.readonly {
		t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
	} else {
		t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
	}
	if err != nil {
		return err
	}
	if stat, err = t.head.Stat(); err != nil {
		return err
	}
	contentSize = stat.Size()

	// Keep truncating both files until they come in sync
	contentExp = int64(lastIndex.offset)
	for contentExp != contentSize {
		// Truncate the head file to the last offset pointer
		if contentExp < contentSize {
			t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
			if err := truncateFreezerFile(t.head, contentExp); err != nil {
				return err
			}
			contentSize = contentExp
		}
		// Truncate the index to point within the head file
		if contentExp > contentSize {
			t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
			if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
				return err
			}
			offsetsSize -= indexEntrySize

			// Read the new head index, use the default value in case
			// the freezer is already empty.
			var newLastIndex indexEntry
			if offsetsSize == indexEntrySize {
				newLastIndex = indexEntry{filenum: t.tailId, offset: 0}
			} else {
				t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
				newLastIndex.unmarshalBinary(buffer)
			}
			// We might have slipped back into an earlier head-file here
			if newLastIndex.filenum != lastIndex.filenum {
				// Release earlier opened file
				t.releaseFile(lastIndex.filenum)
				if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil {
					return err
				}
				if stat, err = t.head.Stat(); err != nil {
					// TODO, anything more we can do here?
					// A data file has gone missing...
					return err
				}
				contentSize = stat.Size()
			}
			lastIndex = newLastIndex
			contentExp = int64(lastIndex.offset)
		}
	}
	// Sync() fails for read-only files on windows.
	if !t.readonly {
		// Ensure all reparation changes have been written to disk
		if err := t.index.Sync(); err != nil {
			return err
		}
		if err := t.head.Sync(); err != nil {
			return err
		}
		if err := t.meta.Sync(); err != nil {
			return err
		}
	}
	// Update the item and byte counters and return
	t.items = t.itemOffset + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
	t.headBytes = contentSize
	t.headId = lastIndex.filenum

	// Delete the leftover files because of head deletion
	t.releaseFilesAfter(t.headId, true)

	// Delete the leftover files because of tail deletion
	t.releaseFilesBefore(t.tailId, true)

	// Close opened files and preopen all files
	if err := t.preopen(); err != nil {
		return err
	}
	t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.headBytes))
	return nil
}

// preopen opens all files that the freezer will need. This method should be called from an init-context,
// since it assumes that it doesn't have to bother with locking.
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
// obtain a write-lock within Retrieve.
func (t *freezerTable) preopen() (err error) {
	// The repair might have already opened (some) files
	t.releaseFilesAfter(0, false)

	// Open all except head in RDONLY
	for i := t.tailId; i < t.headId; i++ {
		if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil {
			return err
		}
	}
	if t.readonly {
		t.head, err = t.openFile(t.headId, openFreezerFileForReadOnly)
	} else {
		// Open head in read/write
		t.head, err = t.openFile(t.headId, openFreezerFileForAppend)
	}
	return err
}

// truncateHead discards any recent data above the provided threshold number.
func (t *freezerTable) truncateHead(items uint64) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Ensure the given truncate target falls in the correct range
	existing := atomic.LoadUint64(&t.items)
	if existing <= items {
		return nil
	}
	if items < atomic.LoadUint64(&t.itemHidden) {
		return errors.New("truncation below tail")
	}
	// We need to truncate, save the old size for metrics tracking
	oldSize, err := t.sizeNolock()
	if err != nil {
		return err
	}
	// Something's out of sync, truncate the table's offset index
	log := t.logger.Debug
	if existing > items+1 {
		log = t.logger.Warn // Only loud warn if we delete multiple items
	}
	log("Truncating freezer table", "items", existing, "limit", items)

	// Truncate the index file first, the tail position is also considered
	// when calculating the new freezer table length.
	length := items - atomic.LoadUint64(&t.itemOffset)
	if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
		return err
	}
	// Calculate the new expected size of the data file and truncate it
	var expected indexEntry
	if length == 0 {
		expected = indexEntry{filenum: t.tailId, offset: 0}
	} else {
		buffer := make([]byte, indexEntrySize)
		if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil {
			return err
		}
		expected.unmarshalBinary(buffer)
	}
	// We might need to truncate back to older files
	if expected.filenum != t.headId {
		// If already open for reading, force-reopen for writing
		t.releaseFile(expected.filenum)
		newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend)
		if err != nil {
			return err
		}
		// Release any files _after the current head -- both the previous head
		// and any files which may have been opened for reading
		t.releaseFilesAfter(expected.filenum, true)

		// Set back the historic head
		t.head = newHead
		t.headId = expected.filenum
	}
	if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
		return err
	}
	// All data files truncated, set internal counters and return
	t.headBytes = int64(expected.offset)
	atomic.StoreUint64(&t.items, items)

	// Retrieve the new size and update the total size counter
	newSize, err := t.sizeNolock()
	if err != nil {
		return err
	}
	t.sizeGauge.Dec(int64(oldSize - newSize))
	return nil
}
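
// For example, if the table currently holds items 0..99 (items == 100),
// truncateHead(90) discards items 90..99, leaving 0..89 readable (minus any
// hidden tail items); a target at or above the current item count is a no-op.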

// truncateTail discards any data below the provided threshold number.
func (t *freezerTable) truncateTail(items uint64) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Ensure the given truncate target falls in the correct range
	if atomic.LoadUint64(&t.itemHidden) >= items {
		return nil
	}
	if atomic.LoadUint64(&t.items) < items {
		return errors.New("truncation above head")
	}
	// Load the new tail index by the given new tail position
	var (
		newTailId uint32
		buffer    = make([]byte, indexEntrySize)
	)
	if atomic.LoadUint64(&t.items) == items {
		newTailId = t.headId
	} else {
		offset := items - atomic.LoadUint64(&t.itemOffset)
		if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil {
			return err
		}
		var newTail indexEntry
		newTail.unmarshalBinary(buffer)
		newTailId = newTail.filenum
	}
	// Update the virtual tail marker and hide these entries in the table.
	atomic.StoreUint64(&t.itemHidden, items)
	if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
		return err
	}
	// Hidden items still fall in the current tail file, no data file
	// can be dropped.
	if t.tailId == newTailId {
		return nil
	}
	// Hidden items fall in an invalid range, return an error.
	if t.tailId > newTailId {
		return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId)
	}
	// Hidden items exceed the current tail file, drop the relevant data files.
	// We need to truncate, save the old size for metrics tracking.
	oldSize, err := t.sizeNolock()
	if err != nil {
		return err
	}
	// Count how many items can be deleted from the file.
	var (
		newDeleted = items
		deleted    = atomic.LoadUint64(&t.itemOffset)
	)
	for current := items - 1; current >= deleted; current -= 1 {
		if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil {
			return err
		}
		var pre indexEntry
		pre.unmarshalBinary(buffer)
		if pre.filenum != newTailId {
			break
		}
		newDeleted = current
	}
	// Commit the changes of metadata file first before manipulating
	// the indexes file.
	if err := t.meta.Sync(); err != nil {
		return err
	}
	// Truncate the deleted index entries from the index file.
	err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error {
		tailIndex := indexEntry{
			filenum: newTailId,
			offset:  uint32(newDeleted),
		}
		_, err := f.Write(tailIndex.append(nil))
		return err
	})
	if err != nil {
		return err
	}
	// Reopen the modified index file to load the changes
	if err := t.index.Close(); err != nil {
		return err
	}
	t.index, err = openFreezerFileForAppend(t.index.Name())
	if err != nil {
		return err
	}
	// Release any files before the current tail
	t.tailId = newTailId
	atomic.StoreUint64(&t.itemOffset, newDeleted)
	t.releaseFilesBefore(t.tailId, true)

	// Retrieve the new size and update the total size counter
	newSize, err := t.sizeNolock()
	if err != nil {
		return err
	}
	t.sizeGauge.Dec(int64(oldSize - newSize))
	return nil
}
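
// Because tail deletion works at file granularity, truncateTail(n) at first
// only hides the items below n (itemHidden = n). Data files are physically
// dropped, and the index rewritten, only once the new tail has moved past an
// entire data file; until then the hidden items simply become unreadable via
// has/Retrieve.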

// Close closes all opened files.
func (t *freezerTable) Close() error {
	t.lock.Lock()
	defer t.lock.Unlock()

	var errs []error
	if err := t.index.Close(); err != nil {
		errs = append(errs, err)
	}
	t.index = nil

	if err := t.meta.Close(); err != nil {
		errs = append(errs, err)
	}
	t.meta = nil

	for _, f := range t.files {
		if err := f.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	t.head = nil

	if errs != nil {
		return fmt.Errorf("%v", errs)
	}
	return nil
}

// openFile assumes that the write-lock is held by the caller
func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) {
	var exist bool
	if f, exist = t.files[num]; !exist {
		var name string
		if t.noCompression {
			name = fmt.Sprintf("%s.%04d.rdat", t.name, num)
		} else {
			name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
		}
		f, err = opener(filepath.Join(t.path, name))
		if err != nil {
			return nil, err
		}
		t.files[num] = f
	}
	return f, err
}

// releaseFile closes a file, and removes it from the open file cache.
// Assumes that the caller holds the write lock
func (t *freezerTable) releaseFile(num uint32) {
	if f, exist := t.files[num]; exist {
		delete(t.files, num)
		f.Close()
	}
}

// releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files
func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
	for fnum, f := range t.files {
		if fnum > num {
			delete(t.files, fnum)
			f.Close()
			if remove {
				os.Remove(f.Name())
			}
		}
	}
}

// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files
func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) {
	for fnum, f := range t.files {
		if fnum < num {
			delete(t.files, fnum)
			f.Close()
			if remove {
				os.Remove(f.Name())
			}
		}
	}
}

// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
// OBS: This method assumes that the caller has already verified (and/or trimmed) the range
// so that the items are within bounds. If this method is used to read out of bounds,
// it will return an error.
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
	// Apply the table-offset
	from = from - t.itemOffset

	// For reading N items, we need N+1 indices.
	buffer := make([]byte, (count+1)*indexEntrySize)
	if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
		return nil, err
	}
	var (
		indices []*indexEntry
		offset  int
	)
	for i := from; i <= from+count; i++ {
		index := new(indexEntry)
		index.unmarshalBinary(buffer[offset:])
		offset += indexEntrySize
		indices = append(indices, index)
	}
	if from == 0 {
		// Special case if we're reading the first item in the freezer. We assume that
		// the first item always starts at zero (regarding deletion, we only support
		// deletion by files, so the assumption holds). This means we can use the
		// first item's metadata to carry information about the 'global' offset,
		// for the deletion-case.
		indices[0].offset = 0
		indices[0].filenum = indices[1].filenum
	}
	return indices, nil
}
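
// For example (assuming nothing has been deleted from the tail), getIndices(5, 2)
// returns three entries: the end markers of items 4, 5 and 6. Entry i and entry
// i+1 together bound item i's byte range (see indexEntry.bounds).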

// Retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file.
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
	items, err := t.RetrieveItems(item, 1, 0)
	if err != nil {
		return nil, err
	}
	return items[0], nil
}

// RetrieveItems returns multiple items in sequence, starting from the index 'start'.
// It will return at most 'count' items, but will abort earlier to respect the
// 'maxBytes' argument. However, if 'maxBytes' is smaller than the size of one
// item, it _will_ return one element and possibly overflow the maxBytes.
func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
	// First we read the 'raw' data, which might be compressed.
	diskData, sizes, err := t.retrieveItems(start, count, maxBytes)
	if err != nil {
		return nil, err
	}
	var (
		output     = make([][]byte, 0, count)
		offset     int // offset for reading
		outputSize int // size of uncompressed data
	)
	// Now slice up the data and decompress.
	for i, diskSize := range sizes {
		item := diskData[offset : offset+diskSize]
		offset += diskSize
		decompressedSize := diskSize
		if !t.noCompression {
			decompressedSize, _ = snappy.DecodedLen(item)
		}
		if i > 0 && uint64(outputSize+decompressedSize) > maxBytes {
			break
		}
		if !t.noCompression {
			data, err := snappy.Decode(nil, item)
			if err != nil {
				return nil, err
			}
			output = append(output, data)
		} else {
			output = append(output, item)
		}
		outputSize += decompressedSize
	}
	return output, nil
}
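
// A retrieval sketch (item numbers and limits are illustrative): fetch up to
// 50 items starting at number 100, stopping early once roughly 1 MiB of data
// has been decoded:
//
//	blobs, err := t.RetrieveItems(100, 50, 1024*1024)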

// retrieveItems reads up to 'count' items from the table. It reads at least
// one item, but otherwise avoids reading more than maxBytes bytes.
// It returns the (potentially compressed) data, and the sizes.
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	// Ensure the table and the item are accessible
	if t.index == nil || t.head == nil {
		return nil, nil, errClosed
	}
	var (
		items  = atomic.LoadUint64(&t.items)      // the total items (head + 1)
		hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items
	)
	// Ensure the start is written, not deleted from the tail, and that the
	// caller actually wants something
	if items <= start || hidden > start || count == 0 {
		return nil, nil, errOutOfBounds
	}
	if start+count > items {
		count = items - start
	}
	var (
		output     = make([]byte, maxBytes) // Buffer to read data into
		outputSize int                      // Used size of that buffer
	)
	// readData is a helper method to read a single data item from disk.
	readData := func(fileId, start uint32, length int) error {
		// In case a small limit is used, and the elements are large, we may need to
		// realloc the read-buffer when reading the first (and only) item.
		if len(output) < length {
			output = make([]byte, length)
		}
		dataFile, exist := t.files[fileId]
		if !exist {
			return fmt.Errorf("missing data file %d", fileId)
		}
		if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil {
			return err
		}
		outputSize += length
		return nil
	}
	// Read all the indexes in one go
	indices, err := t.getIndices(start, count)
	if err != nil {
		return nil, nil, err
	}
	var (
		sizes      []int               // The sizes for each element
		totalSize  = 0                 // The total size of all data read so far
		readStart  = indices[0].offset // Where, in the file, to start reading
		unreadSize = 0                 // The size of the as-yet-unread data
	)
	for i, firstIndex := range indices[:len(indices)-1] {
		secondIndex := indices[i+1]
		// Determine the size of the item.
		offset1, offset2, _ := firstIndex.bounds(secondIndex)
		size := int(offset2 - offset1)
		// Crossing a file boundary?
		if secondIndex.filenum != firstIndex.filenum {
			// If we have unread data in the first file, we need to do that read now.
			if unreadSize > 0 {
				if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
					return nil, nil, err
				}
				unreadSize = 0
			}
			readStart = 0
		}
		if i > 0 && uint64(totalSize+size) > maxBytes {
			// About to break out due to byte limit being exceeded. We don't
			// read this last item, but we need to do the deferred reads now.
			if unreadSize > 0 {
				if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
					return nil, nil, err
				}
			}
			break
		}
		// Defer the read for later
		unreadSize += size
		totalSize += size
		sizes = append(sizes, size)
		if i == len(indices)-2 || uint64(totalSize) > maxBytes {
			// Last item, need to do the read now
			if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
				return nil, nil, err
			}
			break
		}
	}
	return output[:outputSize], sizes, nil
}

// has returns whether the item with the given number is still accessible in
// the freezer table.
func (t *freezerTable) has(number uint64) bool {
	return atomic.LoadUint64(&t.items) > number && atomic.LoadUint64(&t.itemHidden) <= number
}

// size returns the total data size in the freezer table.
func (t *freezerTable) size() (uint64, error) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	return t.sizeNolock()
}

// sizeNolock returns the total data size in the freezer table without obtaining
// the mutex first.
func (t *freezerTable) sizeNolock() (uint64, error) {
	stat, err := t.index.Stat()
	if err != nil {
		return 0, err
	}
	total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size())
	return total, nil
}
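
// Note that sizeNolock is an estimate: every non-head data file is counted as
// exactly maxFileSize bytes (files are rotated once they would grow past that
// limit, see advanceHead), while the head file and the index file are measured
// exactly.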

// advanceHead should be called when the current head file would outgrow the file limits,
// and a new file must be opened. It acquires the write-lock itself, so the caller must
// not already hold it.
func (t *freezerTable) advanceHead() error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// We open the next file in truncated mode -- if this file already
	// exists, we need to start over from scratch on it.
	nextID := t.headId + 1
	newHead, err := t.openFile(nextID, openFreezerFileTruncated)
	if err != nil {
		return err
	}
	// Close old file, and reopen in RDONLY mode.
	t.releaseFile(t.headId)
	t.openFile(t.headId, openFreezerFileForReadOnly)

	// Swap out the current head.
	t.head = newHead
	t.headBytes = 0
	t.headId = nextID
	return nil
}

// Sync pushes any pending data from memory out to disk. This is an expensive
// operation, so use it with care.
func (t *freezerTable) Sync() error {
	if err := t.index.Sync(); err != nil {
		return err
	}
	if err := t.meta.Sync(); err != nil {
		return err
	}
	return t.head.Sync()
}

// dumpIndexStdout prints the index entries in the given range to stdout.
func (t *freezerTable) dumpIndexStdout(start, stop int64) {
	t.dumpIndex(os.Stdout, start, stop)
}

// dumpIndexString renders the index entries in the given range as a string.
func (t *freezerTable) dumpIndexString(start, stop int64) string {
	var out bytes.Buffer
	out.WriteString("\n")
	t.dumpIndex(&out, start, stop)
	return out.String()
}

// dumpIndex writes a human-readable dump of the index entries in the given
// range to w.
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
	meta, err := readMetadata(t.meta)
	if err != nil {
		fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
		return
	}
	fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.Version, atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden))

	buf := make([]byte, indexEntrySize)

	fmt.Fprintf(w, "| number | fileno | offset |\n")
	fmt.Fprintf(w, "|--------|--------|--------|\n")
	for i := uint64(start); ; i++ {
		if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil {
			break
		}
		var entry indexEntry
		entry.unmarshalBinary(buf)
		fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset)
		if stop > 0 && i >= uint64(stop) {
			break
		}
	}
	fmt.Fprintf(w, "|--------------------------|\n")
}