|
|
@@ -18,6 +18,7 @@ package stream
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "fmt"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
@@ -58,7 +59,7 @@ func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- return NewSwarmSyncerServer(po, netStore, p.ID().String()+"|"+string(po))
|
|
|
+ return NewSwarmSyncerServer(po, netStore, fmt.Sprintf("%s|%d", p.ID(), po))
|
|
|
})
|
|
|
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
|
|
|
// return NewOutgoingProvableSwarmSyncer(po, db)
|
|
|
@@ -146,16 +147,16 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
|
|
|
if batchSize >= BatchSize {
|
|
|
iterate = false
|
|
|
metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1)
|
|
|
- log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
|
|
|
+ log.Trace("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
|
|
|
}
|
|
|
if timer == nil {
|
|
|
timer = time.NewTimer(batchTimeout)
|
|
|
} else {
|
|
|
- log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId)
|
|
|
+ log.Trace("syncer pull subscription - stopping timer", "correlateId", s.correlateId)
|
|
|
if !timer.Stop() {
|
|
|
<-timer.C
|
|
|
}
|
|
|
- log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId)
|
|
|
+ log.Trace("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId)
|
|
|
timer.Reset(batchTimeout)
|
|
|
}
|
|
|
timerC = timer.C
|
|
|
@@ -164,10 +165,10 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
|
|
|
// received after some time
|
|
|
iterate = false
|
|
|
metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1)
|
|
|
- log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
|
|
|
+ log.Trace("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
|
|
|
case <-s.quit:
|
|
|
iterate = false
|
|
|
- log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
|
|
|
+ log.Trace("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
|
|
|
}
|
|
|
}
|
|
|
if batchStartID == nil {
|