odr.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/rand"
  19. "encoding/binary"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/ethdb"
  24. "github.com/ethereum/go-ethereum/light"
  25. "github.com/ethereum/go-ethereum/logger"
  26. "github.com/ethereum/go-ethereum/logger/glog"
  27. "golang.org/x/net/context"
  28. )
  29. var (
  30. softRequestTimeout = time.Millisecond * 500
  31. hardRequestTimeout = time.Second * 10
  32. retryPeers = time.Second * 1
  33. )
  34. // peerDropFn is a callback type for dropping a peer detected as malicious.
  35. type peerDropFn func(id string)
  36. type odrPeerSelector interface {
  37. selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer
  38. adjustResponseTime(*poolEntry, time.Duration, bool)
  39. }
  40. type LesOdr struct {
  41. light.OdrBackend
  42. db ethdb.Database
  43. stop chan struct{}
  44. removePeer peerDropFn
  45. mlock, clock sync.Mutex
  46. sentReqs map[uint64]*sentReq
  47. serverPool odrPeerSelector
  48. }
  49. func NewLesOdr(db ethdb.Database) *LesOdr {
  50. return &LesOdr{
  51. db: db,
  52. stop: make(chan struct{}),
  53. sentReqs: make(map[uint64]*sentReq),
  54. }
  55. }
  56. func (odr *LesOdr) Stop() {
  57. close(odr.stop)
  58. }
  59. func (odr *LesOdr) Database() ethdb.Database {
  60. return odr.db
  61. }
  62. // validatorFunc is a function that processes a message and returns true if
  63. // it was a meaningful answer to a given request
  64. type validatorFunc func(ethdb.Database, *Msg) bool
  65. // sentReq is a request waiting for an answer that satisfies its valFunc
  66. type sentReq struct {
  67. valFunc validatorFunc
  68. sentTo map[*peer]chan struct{}
  69. lock sync.RWMutex // protects acces to sentTo
  70. answered chan struct{} // closed and set to nil when any peer answers it
  71. }
  72. const (
  73. MsgBlockBodies = iota
  74. MsgCode
  75. MsgReceipts
  76. MsgProofs
  77. MsgHeaderProofs
  78. )
  79. // Msg encodes a LES message that delivers reply data for a request
  80. type Msg struct {
  81. MsgType int
  82. ReqID uint64
  83. Obj interface{}
  84. }
  85. // Deliver is called by the LES protocol manager to deliver ODR reply messages to waiting requests
  86. func (self *LesOdr) Deliver(peer *peer, msg *Msg) error {
  87. var delivered chan struct{}
  88. self.mlock.Lock()
  89. req, ok := self.sentReqs[msg.ReqID]
  90. self.mlock.Unlock()
  91. if ok {
  92. req.lock.Lock()
  93. delivered, ok = req.sentTo[peer]
  94. req.lock.Unlock()
  95. }
  96. if !ok {
  97. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  98. }
  99. if req.valFunc(self.db, msg) {
  100. close(delivered)
  101. req.lock.Lock()
  102. delete(req.sentTo, peer)
  103. if req.answered != nil {
  104. close(req.answered)
  105. req.answered = nil
  106. }
  107. req.lock.Unlock()
  108. return nil
  109. }
  110. return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
  111. }
  112. func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout chan struct{}, reqWg *sync.WaitGroup) {
  113. stime := mclock.Now()
  114. defer func() {
  115. req.lock.Lock()
  116. delete(req.sentTo, peer)
  117. req.lock.Unlock()
  118. reqWg.Done()
  119. }()
  120. select {
  121. case <-delivered:
  122. if self.serverPool != nil {
  123. self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
  124. }
  125. return
  126. case <-time.After(softRequestTimeout):
  127. close(timeout)
  128. case <-self.stop:
  129. return
  130. }
  131. select {
  132. case <-delivered:
  133. case <-time.After(hardRequestTimeout):
  134. glog.V(logger.Debug).Infof("ODR hard request timeout from peer %v", peer.id)
  135. go self.removePeer(peer.id)
  136. case <-self.stop:
  137. return
  138. }
  139. if self.serverPool != nil {
  140. self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
  141. }
  142. }
  143. // networkRequest sends a request to known peers until an answer is received
  144. // or the context is cancelled
  145. func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
  146. answered := make(chan struct{})
  147. req := &sentReq{
  148. valFunc: lreq.Valid,
  149. sentTo: make(map[*peer]chan struct{}),
  150. answered: answered, // reply delivered by any peer
  151. }
  152. reqID := getNextReqID()
  153. self.mlock.Lock()
  154. self.sentReqs[reqID] = req
  155. self.mlock.Unlock()
  156. reqWg := new(sync.WaitGroup)
  157. reqWg.Add(1)
  158. defer reqWg.Done()
  159. go func() {
  160. reqWg.Wait()
  161. self.mlock.Lock()
  162. delete(self.sentReqs, reqID)
  163. self.mlock.Unlock()
  164. }()
  165. exclude := make(map[*peer]struct{})
  166. for {
  167. var p *peer
  168. if self.serverPool != nil {
  169. p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) {
  170. if _, ok := exclude[p]; ok || !lreq.CanSend(p) {
  171. return false, 0
  172. }
  173. return true, p.fcServer.CanSend(lreq.GetCost(p))
  174. }, ctx.Done())
  175. }
  176. if p == nil {
  177. select {
  178. case <-ctx.Done():
  179. return ctx.Err()
  180. case <-req.answered:
  181. return nil
  182. case <-time.After(retryPeers):
  183. }
  184. } else {
  185. exclude[p] = struct{}{}
  186. delivered := make(chan struct{})
  187. timeout := make(chan struct{})
  188. req.lock.Lock()
  189. req.sentTo[p] = delivered
  190. req.lock.Unlock()
  191. reqWg.Add(1)
  192. cost := lreq.GetCost(p)
  193. p.fcServer.SendRequest(reqID, cost)
  194. go self.requestPeer(req, p, delivered, timeout, reqWg)
  195. lreq.Request(reqID, p)
  196. select {
  197. case <-ctx.Done():
  198. return ctx.Err()
  199. case <-answered:
  200. return nil
  201. case <-timeout:
  202. }
  203. }
  204. }
  205. }
  206. // Retrieve tries to fetch an object from the local db, then from the LES network.
  207. // If the network retrieval was successful, it stores the object in local db.
  208. func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
  209. lreq := LesRequest(req)
  210. err = self.networkRequest(ctx, lreq)
  211. if err == nil {
  212. // retrieved from network, store in db
  213. req.StoreResult(self.db)
  214. } else {
  215. glog.V(logger.Debug).Infof("networkRequest err = %v", err)
  216. }
  217. return
  218. }
  219. func getNextReqID() uint64 {
  220. var rnd [8]byte
  221. rand.Read(rnd[:])
  222. return binary.BigEndian.Uint64(rnd[:])
  223. }