depo.go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// Handler for storage/retrieval related protocol requests
// implements the StorageHandler interface used by the bzz protocol
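// localStore is the node's own chunk storage; netStore is the network-facing
// entrypoint used when chunks have to be fetched from or pushed to the swarm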
type Depo struct {
	hashfunc   storage.SwarmHasher
	localStore storage.ChunkStore
	netStore   storage.ChunkStore
}

func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
	return &Depo{
		hashfunc:   hash,
		localStore: localStore,
		netStore:   remoteStore, // entrypoint internal
	}
}

// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes up to sync state
// * the remote sync state is just stored and handled in protocol
// * filters through the new syncRequests and sends the ones missing
//   back immediately as a deliveryRequest message
// * empty message just pings back for more (is this needed?)
// * strict signed sync states may be needed.
func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error {
	unsynced := req.Unsynced
	var missing []*syncRequest
	var chunk *storage.Chunk
	var err error
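	// a key counts as missing if the local store has no entry for it, or only a
	// dataless request entry (the chunk is known about but not yet delivered)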
	for _, req := range unsynced {
		// skip keys that are found,
		chunk, err = self.localStore.Get(storage.Key(req.Key[:]))
		if err != nil || chunk.SData == nil {
			missing = append(missing, req)
		}
	}
	log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State))
	log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced))
	// send delivery request with missing keys
	err = p.deliveryRequest(missing)
	if err != nil {
		return err
	}
	// set peer's state to persist
	p.syncState = req.State
	return nil
}

// Handles deliveryRequestMsg
// * serves actual chunks asked by the remote peer
//   by pushing to the delivery queue (sync db) of the correct priority
//   (remote peer is free to reprioritise)
// * the message implies the remote peer wants more, so a new outgoing
//   unsyncedKeys message is triggered
func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
	deliver := req.Deliver
	// queue the actual delivery of the requested chunks
	log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver))
	for _, sreq := range deliver {
		// TODO: look up in cache here or in deliveries
		// priorities are taken from the message so the remote party can
		// reprioritise at their leisure
		// r = self.pullCached(sreq.Key) // pulls and deletes from cache
		Push(p, sreq.Key, sreq.Priority)
	}
	// sends it out as unsyncedKeysMsg
	p.syncer.sendUnsyncedKeys()
	return nil
}

// the entrypoint for store requests coming from the bzz wire protocol
// if the key is found locally, return. otherwise the
// remote is untrusted, so the hash is verified and the chunk passed on to the NetStore
func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
	var islocal bool
	req.from = p
	chunk, err := self.localStore.Get(req.Key)
	switch {
	case err != nil:
		log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
		// not found in memory cache, i.e., a genuine store request
		// create chunk
		chunk = storage.NewChunk(req.Key, nil)
	case chunk.SData == nil:
		// found chunk in memory store, needs the data, validate now
		log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req))
	default:
		// data is found, store request ignored
		// this should update access count?
		log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
		islocal = true
		//return
	}
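	// the chunk key is the swarm hash of its content: recompute the hash over the
	// delivered data and compare; a mismatch means a corrupt or forged chunk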
	hasher := self.hashfunc()
	hasher.Write(req.SData)
	if !bytes.Equal(hasher.Sum(nil), req.Key) {
		// data does not validate, ignore
		// TODO: peer should be penalised/dropped?
		log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req))
		return
	}
	if islocal {
		return
	}
	// update chunk with size and data
	chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
	chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
	log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p))
	chunk.Source = p
	self.netStore.Put(chunk)
}

// entrypoint for retrieve requests coming from the bzz wire protocol
// checks swap balance - returns if the peer has no credit
func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) {
	req.from = p
	// swap - record credit for 1 request
	// note that we only charge for actual request searches
	var err error
	if p.swap != nil {
		err = p.swap.Add(1)
	}
	if err != nil {
		log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err))
		return
	}
	// call storage.NetStore#Get which
	// blocks until local retrieval finished
	// launches cloud retrieval
	chunk, _ := self.netStore.Get(req.Key)
	req = self.strategyUpdateRequest(chunk.Req, req)
	// check if we can immediately deliver
	if chunk.SData != nil {
		log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()))
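		// only deliver directly if the requester accepts a chunk of this size
		// (MaxSize == 0 means no size limit was set)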
		if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
			sreq := &storeRequestMsgData{
				Id:             req.Id,
				Key:            chunk.Key,
				SData:          chunk.SData,
				requestTimeout: req.timeout,
			}
			p.syncer.addRequest(sreq, DeliverReq)
		} else {
			log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
		}
	} else {
		log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
	}
}

// adds the peer's request for the chunk and decides the timeout for the response if still searching
func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
	log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()))
	// we do not create an alternative one
	req = origReq
	if rs != nil {
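		// an open request status exists for this key: record the peer as a
		// requester and promise a response deadline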
		self.addRequester(rs, req)
		req.setTimeout(self.searchTimeout(rs, req))
	}
	return
}

// decides the timeout promise sent with the immediate peer's response to a retrieve request
// returns the requester's own timeout if it falls before the local search timeout,
// otherwise now + searchTimeout
func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) {
	reqt := req.getTimeout()
	t := time.Now().Add(searchTimeout)
	if reqt != nil && reqt.Before(t) {
		return reqt
	} else {
		return &t
	}
}

/*
adds a new peer to an existing open request
only add if less than requesterCount peers forwarded the same request id so far
note this is done irrespective of status (searching or found)
*/
func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
	log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.Id))
	list := rs.Requesters[req.Id]
	rs.Requesters[req.Id] = append(list, req)
}