handler.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Handler is the API for Mutable Resources
  17. // It enables creating, updating, syncing and retrieving resources and their update data
  18. package mru
  19. import (
  20. "bytes"
  21. "context"
  22. "fmt"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/swarm/storage/mru/lookup"
  26. "github.com/ethereum/go-ethereum/swarm/log"
  27. "github.com/ethereum/go-ethereum/swarm/storage"
  28. )
  29. type Handler struct {
  30. chunkStore *storage.NetStore
  31. HashSize int
  32. resources map[uint64]*cacheEntry
  33. resourceLock sync.RWMutex
  34. storeTimeout time.Duration
  35. queryMaxPeriods uint32
  36. }
  // HandlerParams carries the parameters accepted by the Handler constructor
  // NewHandler. It declares no fields at present.
  type HandlerParams struct{}
  41. // hashPool contains a pool of ready hashers
  42. var hashPool sync.Pool
  43. // init initializes the package and hashPool
  44. func init() {
  45. hashPool = sync.Pool{
  46. New: func() interface{} {
  47. return storage.MakeHashFunc(resourceHashAlgorithm)()
  48. },
  49. }
  50. }
  51. // NewHandler creates a new Mutable Resource API
  52. func NewHandler(params *HandlerParams) *Handler {
  53. rh := &Handler{
  54. resources: make(map[uint64]*cacheEntry),
  55. }
  56. for i := 0; i < hasherCount; i++ {
  57. hashfunc := storage.MakeHashFunc(resourceHashAlgorithm)()
  58. if rh.HashSize == 0 {
  59. rh.HashSize = hashfunc.Size()
  60. }
  61. hashPool.Put(hashfunc)
  62. }
  63. return rh
  64. }
  65. // SetStore sets the store backend for the Mutable Resource API
  66. func (h *Handler) SetStore(store *storage.NetStore) {
  67. h.chunkStore = store
  68. }
  69. // Validate is a chunk validation method
  70. // If it looks like a resource update, the chunk address is checked against the userAddr of the update's signature
  71. // It implements the storage.ChunkValidator interface
  72. func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
  73. dataLength := len(data)
  74. if dataLength < minimumSignedUpdateLength {
  75. return false
  76. }
  77. // check if it is a properly formatted update chunk with
  78. // valid signature and proof of ownership of the resource it is trying
  79. // to update
  80. // First, deserialize the chunk
  81. var r Request
  82. if err := r.fromChunk(chunkAddr, data); err != nil {
  83. log.Debug("Invalid resource chunk", "addr", chunkAddr.Hex(), "err", err.Error())
  84. return false
  85. }
  86. // Verify signatures and that the signer actually owns the resource
  87. // If it fails, it means either the signature is not valid, data is corrupted
  88. // or someone is trying to update someone else's resource.
  89. if err := r.Verify(); err != nil {
  90. log.Debug("Invalid signature", "err", err)
  91. return false
  92. }
  93. return true
  94. }
  95. // GetContent retrieves the data payload of the last synced update of the Mutable Resource
  96. func (h *Handler) GetContent(view *View) (storage.Address, []byte, error) {
  97. if view == nil {
  98. return nil, nil, NewError(ErrInvalidValue, "view is nil")
  99. }
  100. rsrc := h.get(view)
  101. if rsrc == nil {
  102. return nil, nil, NewError(ErrNotFound, "resource does not exist")
  103. }
  104. return rsrc.lastKey, rsrc.data, nil
  105. }
  106. // NewRequest prepares a Request structure with all the necessary information to
  107. // just add the desired data and sign it.
  108. // The resulting structure can then be signed and passed to Handler.Update to be verified and sent
  109. func (h *Handler) NewRequest(ctx context.Context, view *View) (request *Request, err error) {
  110. if view == nil {
  111. return nil, NewError(ErrInvalidValue, "view cannot be nil")
  112. }
  113. now := TimestampProvider.Now().Time
  114. request = new(Request)
  115. request.Header.Version = ProtocolVersion
  116. query := NewQueryLatest(view, lookup.NoClue)
  117. rsrc, err := h.Lookup(ctx, query)
  118. if err != nil {
  119. if err.(*Error).code != ErrNotFound {
  120. return nil, err
  121. }
  122. // not finding updates means that there is a network error
  123. // or that the resource really does not have updates
  124. }
  125. request.View = *view
  126. // if we already have an update, then find next epoch
  127. if rsrc != nil {
  128. request.Epoch = lookup.GetNextEpoch(rsrc.Epoch, now)
  129. } else {
  130. request.Epoch = lookup.GetFirstEpoch(now)
  131. }
  132. return request, nil
  133. }
  134. // Lookup retrieves a specific or latest version of the resource
  135. // Lookup works differently depending on the configuration of `ID`
  136. // See the `ID` documentation and helper functions:
  137. // `LookupLatest` and `LookupBefore`
  138. // When looking for the latest update, it starts at the next period after the current time.
  139. // upon failure tries the corresponding keys of each previous period until one is found
  140. // (or startTime is reached, in which case there are no updates).
  141. func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) {
  142. timeLimit := query.TimeLimit
  143. if timeLimit == 0 { // if time limit is set to zero, the user wants to get the latest update
  144. timeLimit = TimestampProvider.Now().Time
  145. }
  146. if query.Hint == lookup.NoClue { // try to use our cache
  147. entry := h.get(&query.View)
  148. if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints
  149. query.Hint = entry.Epoch
  150. }
  151. }
  152. // we can't look for anything without a store
  153. if h.chunkStore == nil {
  154. return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
  155. }
  156. var ul ID
  157. ul.View = query.View
  158. var readCount int
  159. // Invoke the lookup engine.
  160. // The callback will be called every time the lookup algorithm needs to guess
  161. requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) {
  162. readCount++
  163. ul.Epoch = epoch
  164. ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
  165. defer cancel()
  166. chunk, err := h.chunkStore.Get(ctx, ul.Addr())
  167. if err != nil { // TODO: check for catastrophic errors other than chunk not found
  168. return nil, nil
  169. }
  170. var request Request
  171. if err := request.fromChunk(chunk.Address(), chunk.Data()); err != nil {
  172. return nil, nil
  173. }
  174. if request.Time <= timeLimit {
  175. return &request, nil
  176. }
  177. return nil, nil
  178. })
  179. if err != nil {
  180. return nil, err
  181. }
  182. log.Info(fmt.Sprintf("Resource lookup finished in %d lookups", readCount))
  183. request, _ := requestPtr.(*Request)
  184. if request == nil {
  185. return nil, NewError(ErrNotFound, "no updates found")
  186. }
  187. return h.updateCache(request)
  188. }
  189. // update mutable resource cache map with specified content
  190. func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
  191. updateAddr := request.Addr()
  192. log.Trace("resource cache update", "topic", request.Topic.Hex(), "updatekey", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
  193. rsrc := h.get(&request.View)
  194. if rsrc == nil {
  195. rsrc = &cacheEntry{}
  196. h.set(&request.View, rsrc)
  197. }
  198. // update our rsrcs entry map
  199. rsrc.lastKey = updateAddr
  200. rsrc.ResourceUpdate = request.ResourceUpdate
  201. rsrc.Reader = bytes.NewReader(rsrc.data)
  202. return rsrc, nil
  203. }
  204. // Update adds an actual data update
  205. // Uses the Mutable Resource metadata currently loaded in the resources map entry.
  206. // It is the caller's responsibility to make sure that this data is not stale.
  207. // Note that a Mutable Resource update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature. An error will be returned if the total length of the chunk payload will exceed this limit.
  208. // Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update
  209. // on the network.
  210. func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Address, err error) {
  211. // we can't update anything without a store
  212. if h.chunkStore == nil {
  213. return nil, NewError(ErrInit, "Call Handler.SetStore() before updating")
  214. }
  215. rsrc := h.get(&r.View)
  216. if rsrc != nil && rsrc.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure
  217. return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
  218. }
  219. chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
  220. if err != nil {
  221. return nil, err
  222. }
  223. // send the chunk
  224. h.chunkStore.Put(ctx, chunk)
  225. log.Trace("resource update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
  226. // update our resources map cache entry if the new update is older than the one we have, if we have it.
  227. if rsrc != nil && r.Epoch.After(rsrc.Epoch) {
  228. rsrc.Epoch = r.Epoch
  229. rsrc.data = make([]byte, len(r.data))
  230. rsrc.lastKey = r.idAddr
  231. copy(rsrc.data, r.data)
  232. rsrc.Reader = bytes.NewReader(rsrc.data)
  233. }
  234. return r.idAddr, nil
  235. }
  236. // Retrieves the resource cache value for the given nameHash
  237. func (h *Handler) get(view *View) *cacheEntry {
  238. mapKey := view.mapKey()
  239. h.resourceLock.RLock()
  240. defer h.resourceLock.RUnlock()
  241. rsrc := h.resources[mapKey]
  242. return rsrc
  243. }
  244. // Sets the resource cache value for the given View
  245. func (h *Handler) set(view *View, rsrc *cacheEntry) {
  246. mapKey := view.mapKey()
  247. h.resourceLock.Lock()
  248. defer h.resourceLock.Unlock()
  249. h.resources[mapKey] = rsrc
  250. }