
vendor: update leveldb upstream (#19284)

gary rong 6 years ago
parent
commit
def1b0d7e1

+ 40 - 11
vendor/github.com/syndtr/goleveldb/leveldb/session.go

@@ -47,15 +47,24 @@ type session struct {
 	o        *cachedOptions
 	icmp     *iComparer
 	tops     *tOps
-	fileRef  map[int64]int
 
 	manifest       *journal.Writer
 	manifestWriter storage.Writer
 	manifestFd     storage.FileDesc
 
-	stCompPtrs []internalKey // compaction pointers; need external synchronization
-	stVersion  *version      // current version
-	vmu        sync.Mutex
+	stCompPtrs  []internalKey // compaction pointers; need external synchronization
+	stVersion   *version      // current version
+	ntVersionId int64         // next version id to assign
+	refCh       chan *vTask
+	relCh       chan *vTask
+	deltaCh     chan *vDelta
+	abandon     chan int64
+	closeC      chan struct{}
+	closeW      sync.WaitGroup
+	vmu         sync.Mutex
+
+	// Testing fields
+	fileRefCh chan chan map[int64]int // channel used to pass current reference stat
 }
 
 // Creates new initialized session instance.
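The struct change above swaps the mutex-guarded fileRef map for a set of channels (refCh, relCh, deltaCh, abandon) served by a dedicated goroutine, plus a testing-only fileRefCh for snapshotting the counters. A minimal, hypothetical sketch of the channel-of-channels query pattern behind fileRefCh follows (names and values are illustrative, not the upstream code): the owning goroutine copies its private map and replies on the caller-supplied channel, so no lock is needed.

package main

import "fmt"

// refQuery is a reply channel carrying a snapshot of the counters.
type refQuery chan map[int64]int

// ownerLoop is the only goroutine that touches fileRef, so no mutex is needed.
func ownerLoop(queries chan refQuery, closeC chan struct{}) {
	fileRef := map[int64]int{3: 2, 7: 1} // privately owned state
	for {
		select {
		case q := <-queries:
			snap := make(map[int64]int, len(fileRef))
			for num, cnt := range fileRef {
				snap[num] = cnt
			}
			q <- snap // reply with a copy; callers never see the live map
		case <-closeC:
			return
		}
	}
}

func main() {
	queries := make(chan refQuery)
	closeC := make(chan struct{})
	go ownerLoop(queries, closeC)

	reply := make(refQuery)
	queries <- reply
	fmt.Println(<-reply) // map[3:2 7:1]
	close(closeC)
}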
@@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 		return
 	}
 	s = &session{
-		stor:     newIStorage(stor),
-		storLock: storLock,
-		fileRef:  make(map[int64]int),
+		stor:      newIStorage(stor),
+		storLock:  storLock,
+		refCh:     make(chan *vTask),
+		relCh:     make(chan *vTask),
+		deltaCh:   make(chan *vDelta),
+		abandon:   make(chan int64),
+		fileRefCh: make(chan chan map[int64]int),
+		closeC:    make(chan struct{}),
 	}
 	s.setOptions(o)
 	s.tops = newTableOps(s)
-	s.setVersion(newVersion(s))
+
+	s.closeW.Add(1)
+	go s.refLoop()
+	s.setVersion(nil, newVersion(s))
 	s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
 	return
 }
@@ -90,7 +107,11 @@ func (s *session) close() {
 	}
 	s.manifest = nil
 	s.manifestWriter = nil
-	s.setVersion(&version{s: s, closing: true})
+	s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
+
+	// Close all background goroutines
+	close(s.closeC)
+	s.closeW.Wait()
 }
 
 // Release session lock.
@@ -180,7 +201,7 @@ func (s *session) recover() (err error) {
 	}
 
 	s.manifestFd = fd
-	s.setVersion(staging.finish(false))
+	s.setVersion(rec, staging.finish(false))
 	s.setNextFileNum(rec.nextFileNum)
 	s.recordCommited(rec)
 	return nil
@@ -194,6 +215,14 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
 	// spawn new version based on current version
 	nv := v.spawn(r, trivial)
 
+	// abandon useless version id to prevent blocking version processing loop.
+	defer func() {
+		if err != nil {
+			s.abandon <- nv.id
+			s.logf("commit@abandon useless vid D%d", nv.id)
+		}
+	}()
+
 	if s.manifest == nil {
 		// manifest journal writer not yet created, create one
 		err = s.newManifest(r, nv)
@@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
 
 	// finally, apply new version if no error rise
 	if err == nil {
-		s.setVersion(nv)
+		s.setVersion(r, nv)
 	}
 
 	return
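The new defer in commit above pushes an unused version id onto the abandon channel whenever a later step fails, so the ref loop's strictly ordered queue never stalls waiting for a number that will not be referenced. A hedged sketch of that defer-on-named-error idiom (notifyAbandoned is a stand-in for the send on s.abandon):

package main

import (
	"errors"
	"fmt"
)

// notifyAbandoned stands in for sending the unused id on the abandon channel.
func notifyAbandoned(vid int64) { fmt.Println("commit@abandon useless vid D", vid) }

func commit(vid int64, fail bool) (err error) {
	// The deferred closure reads the named return value, so the abandon
	// notification fires only on the failure path.
	defer func() {
		if err != nil {
			notifyAbandoned(vid)
		}
	}()
	if fail {
		return errors.New("manifest write failed")
	}
	return nil
}

func main() {
	fmt.Println(commit(1, false)) // <nil>; nothing abandoned
	fmt.Println(commit(2, true))  // abandons vid 2, then prints the error
}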

+ 8 - 4
vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go

@@ -181,10 +181,14 @@ func (c *compaction) expand() {
 
 	t0, t1 := c.levels[0], c.levels[1]
 	imin, imax := t0.getRange(c.s.icmp)
-	// We expand t0 here just incase ukey hop across tables.
-	t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
-	if len(t0) != len(c.levels[0]) {
-		imin, imax = t0.getRange(c.s.icmp)
+
+	// For non-zero levels, the ukey can't hop across tables at all.
+	if c.sourceLevel == 0 {
+		// We expand t0 here just in case the ukey hops across tables.
+		t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
+		if len(t0) != len(c.levels[0]) {
+			imin, imax = t0.getRange(c.s.icmp)
+		}
 	}
 	t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
 	// Get entire range covered by compaction.
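The guard added above restricts the re-expansion of t0 to level 0: only there can the same user key span more than one table, so only there can picking one table force its neighbours into the compaction. For levels >= 1 the table ranges are strictly sorted and disjoint, making the extra pass pointless. A purely illustrative example with a toy table type (not the goleveldb types):

package main

import "fmt"

// table holds only a user-key range, for illustration.
type table struct{ min, max string }

func overlaps(a, b table) bool { return a.min <= b.max && b.min <= a.max }

func main() {
	// Level 0: the user key "b" hops across both tables, so compacting the
	// first table must pull in the second as well.
	level0 := []table{{"a", "b"}, {"b", "d"}}
	// Level 1 and up: ranges are disjoint, so this cannot happen.
	level1 := []table{{"a", "b"}, {"c", "d"}}

	fmt.Println(overlaps(level0[0], level0[1])) // true  -> expansion needed
	fmt.Println(overlaps(level1[0], level1[1])) // false -> no expansion
}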

+ 224 - 12
vendor/github.com/syndtr/goleveldb/leveldb/session_util.go

@@ -9,6 +9,7 @@ package leveldb
 import (
 	"fmt"
 	"sync/atomic"
+	"time"
 
 	"github.com/syndtr/goleveldb/leveldb/journal"
 	"github.com/syndtr/goleveldb/leveldb/storage"
@@ -39,19 +40,213 @@ func (s *session) newTemp() storage.FileDesc {
 	return storage.FileDesc{Type: storage.TypeTemp, Num: num}
 }
 
-func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
-	ref += s.fileRef[fd.Num]
-	if ref > 0 {
-		s.fileRef[fd.Num] = ref
-	} else if ref == 0 {
-		delete(s.fileRef, fd.Num)
-	} else {
-		panic(fmt.Sprintf("negative ref: %v", fd))
-	}
-	return ref
+// Session state.
+
+const (
+	// maxCachedNumber represents the maximum number of version tasks
+	// that can be cached in the ref loop.
+	maxCachedNumber = 256
+
+	// maxCachedTime represents the maximum time for ref loop to cache
+	// a version task.
+	maxCachedTime = 5 * time.Minute
+)
+
+// vDelta indicates the change information between the next version
+// and the currently specified version
+type vDelta struct {
+	vid     int64
+	added   []int64
+	deleted []int64
 }
 
-// Session state.
+// vTask defines a version task for either reference or release.
+type vTask struct {
+	vid     int64
+	files   []tFiles
+	created time.Time
+}
+
+func (s *session) refLoop() {
+	var (
+		fileRef    = make(map[int64]int)    // Table file reference counter
+		ref        = make(map[int64]*vTask) // Current referencing version store
+		deltas     = make(map[int64]*vDelta)
+		referenced = make(map[int64]struct{})
+		released   = make(map[int64]*vDelta)  // Released versions waiting for processing
+		abandoned  = make(map[int64]struct{}) // Abandoned version id
+		next, last int64
+	)
+	// addFileRef adds file reference counter with specified file number and
+	// reference value
+	addFileRef := func(fnum int64, ref int) int {
+		ref += fileRef[fnum]
+		if ref > 0 {
+			fileRef[fnum] = ref
+		} else if ref == 0 {
+			delete(fileRef, fnum)
+		} else {
+			panic(fmt.Sprintf("negative ref: %v", fnum))
+		}
+		return ref
+	}
+	// skipAbandoned skips useless abandoned version id.
+	skipAbandoned := func() bool {
+		if _, exist := abandoned[next]; exist {
+			delete(abandoned, next)
+			return true
+		}
+		return false
+	}
+	// applyDelta applies version change to current file reference.
+	applyDelta := func(d *vDelta) {
+		for _, t := range d.added {
+			addFileRef(t, 1)
+		}
+		for _, t := range d.deleted {
+			if addFileRef(t, -1) == 0 {
+				s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
+			}
+		}
+	}
+
+	timer := time.NewTimer(0)
+	<-timer.C // discard the initial tick
+	defer timer.Stop()
+
+	// processTasks processes version tasks in strict order.
+	//
+	// If we want to use delta to reduce the cost of file references and dereferences,
+	// we must strictly follow the id of the version, otherwise some files that are
+	// being referenced will be deleted.
+	//
+	// In addition, some db operations (such as iterators) may cause a version to be
+	// referenced for a long time. In order to prevent such operations from blocking
+	// the entire processing queue, we will properly convert some of the version tasks
+	// into full file references and releases.
+	processTasks := func() {
+		timer.Reset(maxCachedTime)
+		// Make sure we don't cache too many version tasks.
+		for {
+			// Skip any abandoned version number to prevent blocking processing.
+			if skipAbandoned() {
+				next += 1
+				continue
+			}
+			// Don't bother the version that has been released.
+			if _, exist := released[next]; exist {
+				break
+			}
+			// Ensure the specified version has been referenced.
+			if _, exist := ref[next]; !exist {
+				break
+			}
+			if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
+				break
+			}
+			// Convert version task into full file references and releases mode.
+			// Reference version(i+1) first and wait version(i) to release.
+			// FileRef(i+1) = FileRef(i) + Delta(i)
+			for _, tt := range ref[next].files {
+				for _, t := range tt {
+					addFileRef(t.fd.Num, 1)
+				}
+			}
+			// Note, if some compactions take a long time, even more than 5 minutes,
+			// we may miss the corresponding delta information here.
+			// Fortunately it will not affect the correctness of the file reference,
+			// and we can apply the delta once we receive it.
+			if d := deltas[next]; d != nil {
+				applyDelta(d)
+			}
+			referenced[next] = struct{}{}
+			delete(ref, next)
+			delete(deltas, next)
+			next += 1
+		}
+
+		// Use delta information to process all released versions.
+		for {
+			if skipAbandoned() {
+				next += 1
+				continue
+			}
+			if d, exist := released[next]; exist {
+				if d != nil {
+					applyDelta(d)
+				}
+				delete(released, next)
+				next += 1
+				continue
+			}
+			return
+		}
+	}
+
+	for {
+		processTasks()
+
+		select {
+		case t := <-s.refCh:
+			if _, exist := ref[t.vid]; exist {
+				panic("duplicate reference request")
+			}
+			ref[t.vid] = t
+			if t.vid > last {
+				last = t.vid
+			}
+
+		case d := <-s.deltaCh:
+			if _, exist := ref[d.vid]; !exist {
+				if _, exist2 := referenced[d.vid]; !exist2 {
+					panic("invalid release request")
+				}
+				// The reference opt is already expired, apply
+				// delta here.
+				applyDelta(d)
+				continue
+			}
+			deltas[d.vid] = d
+
+		case t := <-s.relCh:
+			if _, exist := referenced[t.vid]; exist {
+				for _, tt := range t.files {
+					for _, t := range tt {
+						if addFileRef(t.fd.Num, -1) == 0 {
+							s.tops.remove(t.fd)
+						}
+					}
+				}
+				delete(referenced, t.vid)
+				continue
+			}
+			if _, exist := ref[t.vid]; !exist {
+				panic("invalid release request")
+			}
+			released[t.vid] = deltas[t.vid]
+			delete(deltas, t.vid)
+			delete(ref, t.vid)
+
+		case id := <-s.abandon:
+			if id >= next {
+				abandoned[id] = struct{}{}
+			}
+
+		case <-timer.C:
+
+		case r := <-s.fileRefCh:
+			ref := make(map[int64]int)
+			for f, c := range fileRef {
+				ref[f] = c
+			}
+			r <- ref
+
+		case <-s.closeC:
+			s.closeW.Done()
+			return
+		}
+	}
+}
 
 // Get current version. This will incr version ref, must call
 // version.release (exactly once) after use.
@@ -69,13 +264,30 @@ func (s *session) tLen(level int) int {
 }
 
 // Set current version to v.
-func (s *session) setVersion(v *version) {
+func (s *session) setVersion(r *sessionRecord, v *version) {
 	s.vmu.Lock()
 	defer s.vmu.Unlock()
 	// Hold by session. It is important to call this first before releasing
 	// current version, otherwise the still used files might get released.
 	v.incref()
 	if s.stVersion != nil {
+		if r != nil {
+			var (
+				added   = make([]int64, 0, len(r.addedTables))
+				deleted = make([]int64, 0, len(r.deletedTables))
+			)
+			for _, t := range r.addedTables {
+				added = append(added, t.num)
+			}
+			for _, t := range r.deletedTables {
+				deleted = append(deleted, t.num)
+			}
+			select {
+			case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
+			case <-v.s.closeC:
+				s.log("reference loop already exist")
+			}
+		}
 		// Release current version.
 		s.stVersion.releaseNB()
 	}
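The refLoop above keeps all table reference counters in a single goroutine. Versions released in id order are settled with only the delta recorded by setVersion, i.e. FileRef(i+1) = FileRef(i) + added(i) - deleted(i); only versions held for too long (maxCachedNumber, maxCachedTime) fall back to full per-file references. A minimal sketch of the delta bookkeeping, assuming the same semantics as applyDelta above (names are illustrative):

package main

import "fmt"

type vDelta struct {
	added, deleted []int64
}

// applyDelta folds one version transition into the counters and calls
// remove for any table whose last reference just went away.
func applyDelta(fileRef map[int64]int, d vDelta, remove func(num int64)) {
	for _, n := range d.added {
		fileRef[n]++
	}
	for _, n := range d.deleted {
		fileRef[n]--
		if fileRef[n] == 0 {
			delete(fileRef, n)
			remove(n)
		}
	}
}

func main() {
	fileRef := map[int64]int{10: 1, 11: 1}                // counters for version i
	d := vDelta{added: []int64{12}, deleted: []int64{10}} // one compaction step
	applyDelta(fileRef, d, func(n int64) { fmt.Println("remove table", n) })
	fmt.Println(fileRef) // map[11:1 12:1] -- counters for version i+1
}

Applying deltas only works if versions are settled strictly in id order; that is why commit must announce abandoned ids and why the loop converts long-lived versions into full file references instead of letting them block the queue.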

+ 70 - 11
vendor/github.com/syndtr/goleveldb/leveldb/table.go

@@ -7,6 +7,7 @@
 package leveldb
 
 import (
+	"bytes"
 	"fmt"
 	"sort"
 	"sync/atomic"
@@ -158,6 +159,22 @@ func (tf tFiles) searchNumLess(num int64) int {
 	})
 }
 
+// Searches smallest index of tables whose its smallest
+// key is after the given key.
+func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int {
+	return sort.Search(len(tf), func(i int) bool {
+		return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0
+	})
+}
+
+// Searches smallest index of tables whose its largest
+// key is after the given key.
+func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int {
+	return sort.Search(len(tf), func(i int) bool {
+		return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0
+	})
+}
+
 // Returns true if given key range overlaps with one or more
 // tables key range. If unsorted is true then binary search will not be used.
 func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
@@ -189,6 +206,50 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo
 // expanded.
 // The dst content will be overwritten.
 func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
+	// Short circuit if tf is empty
+	if len(tf) == 0 {
+		return nil
+	}
+	// For non-zero levels, there is no ukey hop across at all.
+	// And what's more, the files in these levels are strictly sorted,
+	// so use binary search instead of heavy traverse.
+	if !overlapped {
+		var begin, end int
+		// Determine the begin index of the overlapped file
+		if umin != nil {
+			index := tf.searchMinUkey(icmp, umin)
+			if index == 0 {
+				begin = 0
+			} else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 {
+				// The min ukey overlaps with the index-1 file, expand it.
+				begin = index - 1
+			} else {
+				begin = index
+			}
+		}
+		// Determine the end index of the overlapped file
+		if umax != nil {
+			index := tf.searchMaxUkey(icmp, umax)
+			if index == len(tf) {
+				end = len(tf)
+			} else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 {
+				// The max ukey overlaps with the index file, expand it.
+				end = index + 1
+			} else {
+				end = index
+			}
+		} else {
+			end = len(tf)
+		}
+		// Ensure the overlapped file indexes are valid.
+		if begin >= end {
+			return nil
+		}
+		dst = make([]*tFile, end-begin)
+		copy(dst, tf[begin:end])
+		return dst
+	}
+
 	dst = dst[:0]
 	for i := 0; i < len(tf); {
 		t := tf[i]
@@ -201,11 +262,9 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove
 			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
 				umax = t.imax.ukey()
 				// Restart search if it is overlapped.
-				if overlapped {
-					dst = dst[:0]
-					i = 0
-					continue
-				}
+				dst = dst[:0]
+				i = 0
+				continue
 			}
 
 			dst = append(dst, t)
@@ -424,15 +483,15 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
 
 // Removes table from persistent storage. It waits until
 // no one use the the table.
-func (t *tOps) remove(f *tFile) {
-	t.cache.Delete(0, uint64(f.fd.Num), func() {
-		if err := t.s.stor.Remove(f.fd); err != nil {
-			t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
+func (t *tOps) remove(fd storage.FileDesc) {
+	t.cache.Delete(0, uint64(fd.Num), func() {
+		if err := t.s.stor.Remove(fd); err != nil {
+			t.s.logf("table@remove removing @%d %q", fd.Num, err)
 		} else {
-			t.s.logf("table@remove removed @%d", f.fd.Num)
+			t.s.logf("table@remove removed @%d", fd.Num)
 		}
 		if t.evictRemoved && t.bcache != nil {
-			t.bcache.EvictNS(uint64(f.fd.Num))
+			t.bcache.EvictNS(uint64(fd.Num))
 		}
 	})
 }
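getOverlaps above gains a binary-search fast path for levels >= 1, where tables are sorted and their user-key ranges never overlap, and keeps the old restart-on-expansion scan only for level 0. A hedged sketch of that fast path over a toy sorted slice (types and helpers are illustrative, not the goleveldb API):

package main

import (
	"bytes"
	"fmt"
	"sort"
)

type table struct{ min, max []byte } // user-key range of one sorted table

func getOverlaps(tf []table, umin, umax []byte) []table {
	begin := sort.Search(len(tf), func(i int) bool {
		return bytes.Compare(tf[i].min, umin) > 0
	})
	// The preceding table may still overlap umin through its max key.
	if begin > 0 && bytes.Compare(tf[begin-1].max, umin) >= 0 {
		begin--
	}
	end := sort.Search(len(tf), func(i int) bool {
		return bytes.Compare(tf[i].max, umax) > 0
	})
	// The found table may still overlap umax through its min key.
	if end < len(tf) && bytes.Compare(tf[end].min, umax) <= 0 {
		end++
	}
	if begin >= end {
		return nil
	}
	return tf[begin:end] // the real code copies into dst; a subslice is enough here
}

func main() {
	tf := []table{
		{[]byte("a"), []byte("c")},
		{[]byte("d"), []byte("f")},
		{[]byte("g"), []byte("k")},
	}
	fmt.Println(len(getOverlaps(tf, []byte("e"), []byte("h")))) // 2
}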

+ 17 - 14
vendor/github.com/syndtr/goleveldb/leveldb/version.go

@@ -9,6 +9,7 @@ package leveldb
 import (
 	"fmt"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -22,7 +23,8 @@ type tSet struct {
 }
 
 type version struct {
-	s *session
+	id int64 // unique monotonically increasing version id
+	s  *session
 
 	levels []tFiles
 
@@ -39,8 +41,11 @@ type version struct {
 	released bool
 }
 
+// newVersion creates a new version with a unique monotonically increasing id.
 func newVersion(s *session) *version {
-	return &version{s: s}
+	id := atomic.AddInt64(&s.ntVersionId, 1)
+	nv := &version{s: s, id: id - 1}
+	return nv
 }
 
 func (v *version) incref() {
@@ -50,11 +55,11 @@ func (v *version) incref() {
 
 	v.ref++
 	if v.ref == 1 {
-		// Incr file ref.
-		for _, tt := range v.levels {
-			for _, t := range tt {
-				v.s.addFileRef(t.fd, 1)
-			}
+		select {
+		case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+			// We can use v.levels directly here since it is immutable.
+		case <-v.s.closeC:
+			v.s.log("reference loop already exist")
 		}
 	}
 }
@@ -66,13 +71,11 @@ func (v *version) releaseNB() {
 	} else if v.ref < 0 {
 		panic("negative version ref")
 	}
-
-	for _, tt := range v.levels {
-		for _, t := range tt {
-			if v.s.addFileRef(t.fd, -1) == 0 {
-				v.s.tops.remove(t)
-			}
-		}
+	select {
+	case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+		// We can use v.levels directly here since it is immutable.
+	case <-v.s.closeC:
+		v.s.log("reference loop already exist")
 	}
 
 	v.released = true
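incref and releaseNB above no longer touch shared counters; each hands a vTask to the ref loop, and every send is paired with a case on the session's close channel so a version created or released after shutdown cannot block forever. A minimal sketch of that send-or-closed pattern with stand-in types:

package main

import "fmt"

// sendOrClosed delivers task unless the loop is already shut down, mirroring
// the select on refCh/relCh versus closeC in the hunks above.
func sendOrClosed(tasks chan int, closeC chan struct{}, task int) bool {
	select {
	case tasks <- task:
		return true
	case <-closeC:
		return false // loop is gone; drop the task instead of blocking
	}
}

func main() {
	tasks := make(chan int, 1) // buffered so the demo needs no receiver
	closeC := make(chan struct{})

	fmt.Println(sendOrClosed(tasks, closeC, 1)) // true: the send succeeds
	close(closeC)
	// The buffer is now full and closeC is closed, so only the second case
	// can proceed and the call returns false.
	fmt.Println(sendOrClosed(tasks, closeC, 2))
}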

+ 3 - 3
vendor/vendor.json

@@ -478,10 +478,10 @@
 			"revisionTime": "2017-07-05T02:17:15Z"
 		},
 		{
-			"checksumSHA1": "4vxle8JfbPDO0ndiBUjMmRXGBQM=",
+			"checksumSHA1": "4NTmfUj7H5J59M2wCnp3/8FWt1I=",
 			"path": "github.com/syndtr/goleveldb/leveldb",
-			"revision": "3a907f965fc16db5f7787e18d4434bbe46d47f6e",
-			"revisionTime": "2019-03-04T06:08:05Z"
+			"revision": "c3a204f8e96543bb0cc090385c001078f184fc46",
+			"revisionTime": "2019-03-18T03:00:20Z"
 		},
 		{
 			"checksumSHA1": "mPNraL2edpk/2FYq26rSXfMHbJg=",