handler.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Handler is the API for feeds
  17. // It enables creating, updating, syncing and retrieving feed updates and their data
  18. package feed
  19. import (
  20. "bytes"
  21. "context"
  22. "fmt"
  23. "sync"
  24. "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
  25. "github.com/ethereum/go-ethereum/swarm/log"
  26. "github.com/ethereum/go-ethereum/swarm/storage"
  27. )
  28. type Handler struct {
  29. chunkStore *storage.NetStore
  30. HashSize int
  31. cache map[uint64]*cacheEntry
  32. cacheLock sync.RWMutex
  33. }
  34. // HandlerParams pass parameters to the Handler constructor NewHandler
  35. // Signer and TimestampProvider are mandatory parameters
  36. type HandlerParams struct {
  37. }
  38. // hashPool contains a pool of ready hashers
  39. var hashPool sync.Pool
  40. // init initializes the package and hashPool
  41. func init() {
  42. hashPool = sync.Pool{
  43. New: func() interface{} {
  44. return storage.MakeHashFunc(feedsHashAlgorithm)()
  45. },
  46. }
  47. }
  48. // NewHandler creates a new Swarm feeds API
  49. func NewHandler(params *HandlerParams) *Handler {
  50. fh := &Handler{
  51. cache: make(map[uint64]*cacheEntry),
  52. }
  53. for i := 0; i < hasherCount; i++ {
  54. hashfunc := storage.MakeHashFunc(feedsHashAlgorithm)()
  55. if fh.HashSize == 0 {
  56. fh.HashSize = hashfunc.Size()
  57. }
  58. hashPool.Put(hashfunc)
  59. }
  60. return fh
  61. }
  62. // SetStore sets the store backend for the Swarm feeds API
  63. func (h *Handler) SetStore(store *storage.NetStore) {
  64. h.chunkStore = store
  65. }
  66. // Validate is a chunk validation method
  67. // If it looks like a feed update, the chunk address is checked against the userAddr of the update's signature
  68. // It implements the storage.ChunkValidator interface
  69. func (h *Handler) Validate(chunk storage.Chunk) bool {
  70. if len(chunk.Data()) < minimumSignedUpdateLength {
  71. return false
  72. }
  73. // check if it is a properly formatted update chunk with
  74. // valid signature and proof of ownership of the feed it is trying
  75. // to update
  76. // First, deserialize the chunk
  77. var r Request
  78. if err := r.fromChunk(chunk); err != nil {
  79. log.Debug("Invalid feed update chunk", "addr", chunk.Address(), "err", err)
  80. return false
  81. }
  82. // Verify signatures and that the signer actually owns the feed
  83. // If it fails, it means either the signature is not valid, data is corrupted
  84. // or someone is trying to update someone else's feed.
  85. if err := r.Verify(); err != nil {
  86. log.Debug("Invalid feed update signature", "err", err)
  87. return false
  88. }
  89. return true
  90. }
  91. // GetContent retrieves the data payload of the last synced update of the feed
  92. func (h *Handler) GetContent(feed *Feed) (storage.Address, []byte, error) {
  93. if feed == nil {
  94. return nil, nil, NewError(ErrInvalidValue, "feed is nil")
  95. }
  96. feedUpdate := h.get(feed)
  97. if feedUpdate == nil {
  98. return nil, nil, NewError(ErrNotFound, "feed update not cached")
  99. }
  100. return feedUpdate.lastKey, feedUpdate.data, nil
  101. }
  102. // NewRequest prepares a Request structure with all the necessary information to
  103. // just add the desired data and sign it.
  104. // The resulting structure can then be signed and passed to Handler.Update to be verified and sent
  105. func (h *Handler) NewRequest(ctx context.Context, feed *Feed) (request *Request, err error) {
  106. if feed == nil {
  107. return nil, NewError(ErrInvalidValue, "feed cannot be nil")
  108. }
  109. now := TimestampProvider.Now().Time
  110. request = new(Request)
  111. request.Header.Version = ProtocolVersion
  112. query := NewQueryLatest(feed, lookup.NoClue)
  113. feedUpdate, err := h.Lookup(ctx, query)
  114. if err != nil {
  115. if err.(*Error).code != ErrNotFound {
  116. return nil, err
  117. }
  118. // not finding updates means that there is a network error
  119. // or that the feed really does not have updates
  120. }
  121. request.Feed = *feed
  122. // if we already have an update, then find next epoch
  123. if feedUpdate != nil {
  124. request.Epoch = lookup.GetNextEpoch(feedUpdate.Epoch, now)
  125. } else {
  126. request.Epoch = lookup.GetFirstEpoch(now)
  127. }
  128. return request, nil
  129. }
  130. // Lookup retrieves a specific or latest feed update
  131. // Lookup works differently depending on the configuration of `query`
  132. // See the `query` documentation and helper functions:
  133. // `NewQueryLatest` and `NewQuery`
  134. func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) {
  135. timeLimit := query.TimeLimit
  136. if timeLimit == 0 { // if time limit is set to zero, the user wants to get the latest update
  137. timeLimit = TimestampProvider.Now().Time
  138. }
  139. if query.Hint == lookup.NoClue { // try to use our cache
  140. entry := h.get(&query.Feed)
  141. if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints
  142. query.Hint = entry.Epoch
  143. }
  144. }
  145. // we can't look for anything without a store
  146. if h.chunkStore == nil {
  147. return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
  148. }
  149. var id ID
  150. id.Feed = query.Feed
  151. var readCount int
  152. // Invoke the lookup engine.
  153. // The callback will be called every time the lookup algorithm needs to guess
  154. requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) {
  155. readCount++
  156. id.Epoch = epoch
  157. ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
  158. defer cancel()
  159. chunk, err := h.chunkStore.Get(ctx, id.Addr())
  160. if err != nil { // TODO: check for catastrophic errors other than chunk not found
  161. return nil, nil
  162. }
  163. var request Request
  164. if err := request.fromChunk(chunk); err != nil {
  165. return nil, nil
  166. }
  167. if request.Time <= timeLimit {
  168. return &request, nil
  169. }
  170. return nil, nil
  171. })
  172. if err != nil {
  173. return nil, err
  174. }
  175. log.Info(fmt.Sprintf("Feed lookup finished in %d lookups", readCount))
  176. request, _ := requestPtr.(*Request)
  177. if request == nil {
  178. return nil, NewError(ErrNotFound, "no feed updates found")
  179. }
  180. return h.updateCache(request)
  181. }
  182. // update feed updates cache with specified content
  183. func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
  184. updateAddr := request.Addr()
  185. log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
  186. feedUpdate := h.get(&request.Feed)
  187. if feedUpdate == nil {
  188. feedUpdate = &cacheEntry{}
  189. h.set(&request.Feed, feedUpdate)
  190. }
  191. // update our rsrcs entry map
  192. feedUpdate.lastKey = updateAddr
  193. feedUpdate.Update = request.Update
  194. feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
  195. return feedUpdate, nil
  196. }
  197. // Update publishes a feed update
  198. // Note that a feed update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature.
  199. // This results in a max payload of `maxUpdateDataLength` (check update.go for more details)
  200. // An error will be returned if the total length of the chunk payload will exceed this limit.
  201. // Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update
  202. // on the network.
  203. func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Address, err error) {
  204. // we can't update anything without a store
  205. if h.chunkStore == nil {
  206. return nil, NewError(ErrInit, "Call Handler.SetStore() before updating")
  207. }
  208. feedUpdate := h.get(&r.Feed)
  209. if feedUpdate != nil && feedUpdate.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure
  210. return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
  211. }
  212. chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
  213. if err != nil {
  214. return nil, err
  215. }
  216. // send the chunk
  217. h.chunkStore.Put(ctx, chunk)
  218. log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
  219. // update our feed updates map cache entry if the new update is older than the one we have, if we have it.
  220. if feedUpdate != nil && r.Epoch.After(feedUpdate.Epoch) {
  221. feedUpdate.Epoch = r.Epoch
  222. feedUpdate.data = make([]byte, len(r.data))
  223. feedUpdate.lastKey = r.idAddr
  224. copy(feedUpdate.data, r.data)
  225. feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
  226. }
  227. return r.idAddr, nil
  228. }
  229. // Retrieves the feed update cache value for the given nameHash
  230. func (h *Handler) get(feed *Feed) *cacheEntry {
  231. mapKey := feed.mapKey()
  232. h.cacheLock.RLock()
  233. defer h.cacheLock.RUnlock()
  234. feedUpdate := h.cache[mapKey]
  235. return feedUpdate
  236. }
  237. // Sets the feed update cache value for the given feed
  238. func (h *Handler) set(feed *Feed, feedUpdate *cacheEntry) {
  239. mapKey := feed.mapKey()
  240. h.cacheLock.Lock()
  241. defer h.cacheLock.Unlock()
  242. h.cache[mapKey] = feedUpdate
  243. }