// notify_test.go
  1. package notify
  2. import (
  3. "bytes"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "os"
  8. "testing"
  9. "time"
  10. "github.com/ethereum/go-ethereum/common/hexutil"
  11. "github.com/ethereum/go-ethereum/crypto"
  12. "github.com/ethereum/go-ethereum/log"
  13. "github.com/ethereum/go-ethereum/node"
  14. "github.com/ethereum/go-ethereum/p2p/enode"
  15. "github.com/ethereum/go-ethereum/p2p/simulations"
  16. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  17. "github.com/ethereum/go-ethereum/swarm/network"
  18. "github.com/ethereum/go-ethereum/swarm/pss"
  19. "github.com/ethereum/go-ethereum/swarm/state"
  20. whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
  21. )
  22. var (
  23. loglevel = flag.Int("l", 3, "loglevel")
  24. psses map[string]*pss.Pss
  25. w *whisper.Whisper
  26. wapi *whisper.PublicWhisperAPI
  27. )
  28. func init() {
  29. flag.Parse()
  30. hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
  31. hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs)
  32. h := log.CallerFileHandler(hf)
  33. log.Root().SetHandler(h)
  34. w = whisper.New(&whisper.DefaultConfig)
  35. wapi = whisper.NewPublicWhisperAPI(w)
  36. psses = make(map[string]*pss.Pss)
  37. }
  38. // Creates a client node and notifier node
  39. // Client sends pss notifications requests
  40. // notifier sends initial notification with symmetric key, and
  41. // second notification symmetrically encrypted
  42. func TestStart(t *testing.T) {
  43. adapter := adapters.NewSimAdapter(newServices(false))
  44. net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
  45. ID: "0",
  46. DefaultService: "bzz",
  47. })
  48. defer net.Shutdown()
  49. leftNodeConf := adapters.RandomNodeConfig()
  50. leftNodeConf.Services = []string{"bzz", "pss"}
  51. leftNode, err := net.NewNodeWithConfig(leftNodeConf)
  52. if err != nil {
  53. t.Fatal(err)
  54. }
  55. err = net.Start(leftNode.ID())
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. rightNodeConf := adapters.RandomNodeConfig()
  60. rightNodeConf.Services = []string{"bzz", "pss"}
  61. rightNode, err := net.NewNodeWithConfig(rightNodeConf)
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. err = net.Start(rightNode.ID())
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. err = net.Connect(rightNode.ID(), leftNode.ID())
  70. if err != nil {
  71. t.Fatal(err)
  72. }
  73. leftRpc, err := leftNode.Client()
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. rightRpc, err := rightNode.Client()
  78. if err != nil {
  79. t.Fatal(err)
  80. }
  81. var leftAddr string
  82. err = leftRpc.Call(&leftAddr, "pss_baseAddr")
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. var rightAddr string
  87. err = rightRpc.Call(&rightAddr, "pss_baseAddr")
  88. if err != nil {
  89. t.Fatal(err)
  90. }
  91. var leftPub string
  92. err = leftRpc.Call(&leftPub, "pss_getPublicKey")
  93. if err != nil {
  94. t.Fatal(err)
  95. }
  96. var rightPub string
  97. err = rightRpc.Call(&rightPub, "pss_getPublicKey")
  98. if err != nil {
  99. t.Fatal(err)
  100. }
  101. rsrcName := "foo.eth"
  102. rsrcTopic := pss.BytesToTopic([]byte(rsrcName))
  103. // wait for kademlia table to populate
  104. time.Sleep(time.Second)
  105. ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
  106. defer cancel()
  107. rmsgC := make(chan *pss.APIMsg)
  108. rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false, false)
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. defer rightSub.Unsubscribe()
  113. updateC := make(chan []byte)
  114. var updateMsg []byte
  115. ctrlClient := NewController(psses[rightPub])
  116. ctrlNotifier := NewController(psses[leftPub])
  117. ctrlNotifier.NewNotifier("foo.eth", 2, updateC)
  118. pubkeybytes, err := hexutil.Decode(leftPub)
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. pubkey, err := crypto.UnmarshalPubkey(pubkeybytes)
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. addrbytes, err := hexutil.Decode(leftAddr)
  127. if err != nil {
  128. t.Fatal(err)
  129. }
  130. copyOfUpdateMsg := make([]byte, len(updateMsg))
  131. copy(copyOfUpdateMsg, updateMsg)
  132. ctrlClientError := make(chan error, 1)
  133. ctrlClient.Subscribe(rsrcName, pubkey, addrbytes, func(s string, b []byte) error {
  134. if s != "foo.eth" || !bytes.Equal(copyOfUpdateMsg, b) {
  135. ctrlClientError <- fmt.Errorf("unexpected result in client handler: '%s':'%x'", s, b)
  136. } else {
  137. log.Info("client handler receive", "s", s, "b", b)
  138. }
  139. return nil
  140. })
  141. var inMsg *pss.APIMsg
  142. select {
  143. case inMsg = <-rmsgC:
  144. case err := <-ctrlClientError:
  145. t.Fatal(err)
  146. case <-ctx.Done():
  147. t.Fatal(ctx.Err())
  148. }
  149. dMsg, err := NewMsgFromPayload(inMsg.Msg)
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. if dMsg.namestring != rsrcName {
  154. t.Fatalf("expected name '%s', got '%s'", rsrcName, dMsg.namestring)
  155. }
  156. if !bytes.Equal(dMsg.Payload[:len(updateMsg)], updateMsg) {
  157. t.Fatalf("expected payload first %d bytes '%x', got '%x'", len(updateMsg), updateMsg, dMsg.Payload[:len(updateMsg)])
  158. }
  159. if len(updateMsg)+symKeyLength != len(dMsg.Payload) {
  160. t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload))
  161. }
  162. rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false, false)
  163. if err != nil {
  164. t.Fatal(err)
  165. }
  166. defer rightSubUpdate.Unsubscribe()
  167. updateMsg = []byte("plugh")
  168. updateC <- updateMsg
  169. select {
  170. case inMsg = <-rmsgC:
  171. case <-ctx.Done():
  172. log.Error("timed out waiting for msg", "topic", fmt.Sprintf("%x", rsrcTopic))
  173. t.Fatal(ctx.Err())
  174. }
  175. dMsg, err = NewMsgFromPayload(inMsg.Msg)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. if dMsg.namestring != rsrcName {
  180. t.Fatalf("expected name %s, got %s", rsrcName, dMsg.namestring)
  181. }
  182. if !bytes.Equal(dMsg.Payload, updateMsg) {
  183. t.Fatalf("expected payload '%x', got '%x'", updateMsg, dMsg.Payload)
  184. }
  185. }
  186. func newServices(allowRaw bool) adapters.Services {
  187. stateStore := state.NewInmemoryStore()
  188. kademlias := make(map[enode.ID]*network.Kademlia)
  189. kademlia := func(id enode.ID) *network.Kademlia {
  190. if k, ok := kademlias[id]; ok {
  191. return k
  192. }
  193. params := network.NewKadParams()
  194. params.NeighbourhoodSize = 2
  195. params.MaxBinSize = 3
  196. params.MinBinSize = 1
  197. params.MaxRetries = 1000
  198. params.RetryExponent = 2
  199. params.RetryInterval = 1000000
  200. kademlias[id] = network.NewKademlia(id[:], params)
  201. return kademlias[id]
  202. }
  203. return adapters.Services{
  204. "pss": func(ctx *adapters.ServiceContext) (node.Service, error) {
  205. ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
  206. defer cancel()
  207. keys, err := wapi.NewKeyPair(ctxlocal)
  208. if err != nil {
  209. return nil, err
  210. }
  211. privkey, err := w.GetPrivateKey(keys)
  212. if err != nil {
  213. return nil, err
  214. }
  215. pssp := pss.NewPssParams().WithPrivateKey(privkey)
  216. pssp.MsgTTL = time.Second * 30
  217. pssp.AllowRaw = allowRaw
  218. pskad := kademlia(ctx.Config.ID)
  219. ps, err := pss.NewPss(pskad, pssp)
  220. if err != nil {
  221. return nil, err
  222. }
  223. //psses[common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey))] = ps
  224. psses[hexutil.Encode(crypto.FromECDSAPub(&privkey.PublicKey))] = ps
  225. return ps, nil
  226. },
  227. "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
  228. addr := network.NewAddr(ctx.Config.Node())
  229. hp := network.NewHiveParams()
  230. hp.Discovery = false
  231. config := &network.BzzConfig{
  232. OverlayAddr: addr.Over(),
  233. UnderlayAddr: addr.Under(),
  234. HiveParams: hp,
  235. }
  236. return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
  237. },
  238. }
  239. }