pss_test.go 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package pss
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/ecdsa"
  21. "encoding/binary"
  22. "encoding/hex"
  23. "encoding/json"
  24. "flag"
  25. "fmt"
  26. "io/ioutil"
  27. "math/rand"
  28. "os"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "testing"
  33. "time"
  34. "github.com/ethereum/go-ethereum/common"
  35. "github.com/ethereum/go-ethereum/common/hexutil"
  36. "github.com/ethereum/go-ethereum/crypto"
  37. "github.com/ethereum/go-ethereum/log"
  38. "github.com/ethereum/go-ethereum/metrics"
  39. "github.com/ethereum/go-ethereum/metrics/influxdb"
  40. "github.com/ethereum/go-ethereum/node"
  41. "github.com/ethereum/go-ethereum/p2p"
  42. "github.com/ethereum/go-ethereum/p2p/enode"
  43. "github.com/ethereum/go-ethereum/p2p/protocols"
  44. "github.com/ethereum/go-ethereum/p2p/simulations"
  45. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  46. "github.com/ethereum/go-ethereum/rpc"
  47. "github.com/ethereum/go-ethereum/swarm/network"
  48. "github.com/ethereum/go-ethereum/swarm/pot"
  49. "github.com/ethereum/go-ethereum/swarm/state"
  50. whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
  51. )
  52. var (
  53. initOnce = sync.Once{}
  54. loglevel = flag.Int("loglevel", 2, "logging verbosity")
  55. longrunning = flag.Bool("longrunning", false, "do run long-running tests")
  56. w *whisper.Whisper
  57. wapi *whisper.PublicWhisperAPI
  58. psslogmain log.Logger
  59. pssprotocols map[string]*protoCtrl
  60. useHandshake bool
  61. noopHandlerFunc = func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
  62. return nil
  63. }
  64. )
  65. func init() {
  66. flag.Parse()
  67. rand.Seed(time.Now().Unix())
  68. adapters.RegisterServices(newServices(false))
  69. initTest()
  70. }
  71. func initTest() {
  72. initOnce.Do(
  73. func() {
  74. psslogmain = log.New("psslog", "*")
  75. hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
  76. hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs)
  77. h := log.CallerFileHandler(hf)
  78. log.Root().SetHandler(h)
  79. w = whisper.New(&whisper.DefaultConfig)
  80. wapi = whisper.NewPublicWhisperAPI(w)
  81. pssprotocols = make(map[string]*protoCtrl)
  82. },
  83. )
  84. }
  85. // test that topic conversion functions give predictable results
  86. func TestTopic(t *testing.T) {
  87. api := &API{}
  88. topicstr := strings.Join([]string{PingProtocol.Name, strconv.Itoa(int(PingProtocol.Version))}, ":")
  89. // bytestotopic is the authoritative topic conversion source
  90. topicobj := BytesToTopic([]byte(topicstr))
  91. // string to topic and bytes to topic must match
  92. topicapiobj, _ := api.StringToTopic(topicstr)
  93. if topicobj != topicapiobj {
  94. t.Fatalf("bytes and string topic conversion mismatch; %s != %s", topicobj, topicapiobj)
  95. }
  96. // string representation of topichex
  97. topichex := topicobj.String()
  98. // protocoltopic wrapper on pingtopic should be same as topicstring
  99. // check that it matches
  100. pingtopichex := PingTopic.String()
  101. if topichex != pingtopichex {
  102. t.Fatalf("protocol topic conversion mismatch; %s != %s", topichex, pingtopichex)
  103. }
  104. // json marshal of topic
  105. topicjsonout, err := topicobj.MarshalJSON()
  106. if err != nil {
  107. t.Fatal(err)
  108. }
  109. if string(topicjsonout)[1:len(topicjsonout)-1] != topichex {
  110. t.Fatalf("topic json marshal mismatch; %s != \"%s\"", topicjsonout, topichex)
  111. }
  112. // json unmarshal of topic
  113. var topicjsonin Topic
  114. topicjsonin.UnmarshalJSON(topicjsonout)
  115. if topicjsonin != topicobj {
  116. t.Fatalf("topic json unmarshal mismatch: %x != %x", topicjsonin, topicobj)
  117. }
  118. }
  119. // test bit packing of message control flags
  120. func TestMsgParams(t *testing.T) {
  121. var ctrl byte
  122. ctrl |= pssControlRaw
  123. p := newMsgParamsFromBytes([]byte{ctrl})
  124. m := newPssMsg(p)
  125. if !m.isRaw() || m.isSym() {
  126. t.Fatal("expected raw=true and sym=false")
  127. }
  128. ctrl |= pssControlSym
  129. p = newMsgParamsFromBytes([]byte{ctrl})
  130. m = newPssMsg(p)
  131. if !m.isRaw() || !m.isSym() {
  132. t.Fatal("expected raw=true and sym=true")
  133. }
  134. ctrl &= 0xff &^ pssControlRaw
  135. p = newMsgParamsFromBytes([]byte{ctrl})
  136. m = newPssMsg(p)
  137. if m.isRaw() || !m.isSym() {
  138. t.Fatal("expected raw=false and sym=true")
  139. }
  140. }
  141. // test if we can insert into cache, match items with cache and cache expiry
  142. func TestCache(t *testing.T) {
  143. var err error
  144. to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f")
  145. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  146. defer cancel()
  147. keys, err := wapi.NewKeyPair(ctx)
  148. privkey, err := w.GetPrivateKey(keys)
  149. if err != nil {
  150. t.Fatal(err)
  151. }
  152. ps := newTestPss(privkey, nil, nil)
  153. defer ps.Stop()
  154. pp := NewPssParams().WithPrivateKey(privkey)
  155. data := []byte("foo")
  156. datatwo := []byte("bar")
  157. datathree := []byte("baz")
  158. wparams := &whisper.MessageParams{
  159. TTL: defaultWhisperTTL,
  160. Src: privkey,
  161. Dst: &privkey.PublicKey,
  162. Topic: whisper.TopicType(PingTopic),
  163. WorkTime: defaultWhisperWorkTime,
  164. PoW: defaultWhisperPoW,
  165. Payload: data,
  166. }
  167. woutmsg, err := whisper.NewSentMessage(wparams)
  168. env, err := woutmsg.Wrap(wparams)
  169. msg := &PssMsg{
  170. Payload: env,
  171. To: to,
  172. }
  173. wparams.Payload = datatwo
  174. woutmsg, err = whisper.NewSentMessage(wparams)
  175. envtwo, err := woutmsg.Wrap(wparams)
  176. msgtwo := &PssMsg{
  177. Payload: envtwo,
  178. To: to,
  179. }
  180. wparams.Payload = datathree
  181. woutmsg, err = whisper.NewSentMessage(wparams)
  182. envthree, err := woutmsg.Wrap(wparams)
  183. msgthree := &PssMsg{
  184. Payload: envthree,
  185. To: to,
  186. }
  187. digest := ps.digest(msg)
  188. if err != nil {
  189. t.Fatalf("could not store cache msgone: %v", err)
  190. }
  191. digesttwo := ps.digest(msgtwo)
  192. if err != nil {
  193. t.Fatalf("could not store cache msgtwo: %v", err)
  194. }
  195. digestthree := ps.digest(msgthree)
  196. if err != nil {
  197. t.Fatalf("could not store cache msgthree: %v", err)
  198. }
  199. if digest == digesttwo {
  200. t.Fatalf("different msgs return same hash: %d", digesttwo)
  201. }
  202. // check the cache
  203. err = ps.addFwdCache(msg)
  204. if err != nil {
  205. t.Fatalf("write to pss expire cache failed: %v", err)
  206. }
  207. if !ps.checkFwdCache(msg) {
  208. t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg)
  209. }
  210. if ps.checkFwdCache(msgtwo) {
  211. t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo)
  212. }
  213. time.Sleep(pp.CacheTTL + 1*time.Second)
  214. err = ps.addFwdCache(msgthree)
  215. if err != nil {
  216. t.Fatalf("write to pss expire cache failed: %v", err)
  217. }
  218. if ps.checkFwdCache(msg) {
  219. t.Fatalf("message %v should have expired from cache but checkCache returned true", msg)
  220. }
  221. if _, ok := ps.fwdCache[digestthree]; !ok {
  222. t.Fatalf("unexpired message should be in the cache: %v", digestthree)
  223. }
  224. if _, ok := ps.fwdCache[digesttwo]; ok {
  225. t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo)
  226. }
  227. }
  228. // matching of address hints; whether a message could be or is for the node
  229. func TestAddressMatch(t *testing.T) {
  230. localaddr := network.RandomAddr().Over()
  231. copy(localaddr[:8], []byte("deadbeef"))
  232. remoteaddr := []byte("feedbeef")
  233. kadparams := network.NewKadParams()
  234. kad := network.NewKademlia(localaddr, kadparams)
  235. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  236. defer cancel()
  237. keys, err := wapi.NewKeyPair(ctx)
  238. if err != nil {
  239. t.Fatalf("Could not generate private key: %v", err)
  240. }
  241. privkey, err := w.GetPrivateKey(keys)
  242. pssp := NewPssParams().WithPrivateKey(privkey)
  243. ps, err := NewPss(kad, pssp)
  244. if err != nil {
  245. t.Fatal(err.Error())
  246. }
  247. pssmsg := &PssMsg{
  248. To: remoteaddr,
  249. }
  250. // differ from first byte
  251. if ps.isSelfRecipient(pssmsg) {
  252. t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
  253. }
  254. if ps.isSelfPossibleRecipient(pssmsg, false) {
  255. t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8])
  256. }
  257. // 8 first bytes same
  258. copy(remoteaddr[:4], localaddr[:4])
  259. if ps.isSelfRecipient(pssmsg) {
  260. t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
  261. }
  262. if !ps.isSelfPossibleRecipient(pssmsg, false) {
  263. t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
  264. }
  265. // all bytes same
  266. pssmsg.To = localaddr
  267. if !ps.isSelfRecipient(pssmsg) {
  268. t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr)
  269. }
  270. if !ps.isSelfPossibleRecipient(pssmsg, false) {
  271. t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
  272. }
  273. }
  274. // test that message is handled by sender if a prox handler exists and sender is in prox of message
  275. func TestProxShortCircuit(t *testing.T) {
  276. // sender node address
  277. localAddr := network.RandomAddr().Over()
  278. localPotAddr := pot.NewAddressFromBytes(localAddr)
  279. // set up kademlia
  280. kadParams := network.NewKadParams()
  281. kad := network.NewKademlia(localAddr, kadParams)
  282. peerCount := kad.MinBinSize + 1
  283. // set up pss
  284. privKey, err := crypto.GenerateKey()
  285. pssp := NewPssParams().WithPrivateKey(privKey)
  286. ps, err := NewPss(kad, pssp)
  287. if err != nil {
  288. t.Fatal(err.Error())
  289. }
  290. // create kademlia peers, so we have peers both inside and outside minproxlimit
  291. var peers []*network.Peer
  292. proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes()
  293. distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes()
  294. for i := 0; i < peerCount; i++ {
  295. rw := &p2p.MsgPipeRW{}
  296. ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{})
  297. protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
  298. peerAddr := pot.RandomAddressAt(localPotAddr, i)
  299. bzzPeer := &network.BzzPeer{
  300. Peer: protoPeer,
  301. BzzAddr: &network.BzzAddr{
  302. OAddr: peerAddr.Bytes(),
  303. UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
  304. },
  305. }
  306. peer := network.NewPeer(bzzPeer, kad)
  307. kad.On(peer)
  308. peers = append(peers, peer)
  309. }
  310. // register it marking prox capability
  311. delivered := make(chan struct{})
  312. rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
  313. log.Trace("in allowraw handler")
  314. delivered <- struct{}{}
  315. return nil
  316. }
  317. topic := BytesToTopic([]byte{0x2a})
  318. hndlrProxDereg := ps.Register(&topic, &handler{
  319. f: rawHandlerFunc,
  320. caps: &handlerCaps{
  321. raw: true,
  322. prox: true,
  323. },
  324. })
  325. defer hndlrProxDereg()
  326. // send message too far away for sender to be in prox
  327. // reception of this message should time out
  328. errC := make(chan error)
  329. go func() {
  330. err := ps.SendRaw(distantMessageAddress, topic, []byte("foo"))
  331. if err != nil {
  332. errC <- err
  333. }
  334. }()
  335. ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
  336. defer cancel()
  337. select {
  338. case <-delivered:
  339. t.Fatal("raw distant message delivered")
  340. case err := <-errC:
  341. t.Fatal(err)
  342. case <-ctx.Done():
  343. }
  344. // send message that should be within sender prox
  345. // this message should be delivered
  346. go func() {
  347. err := ps.SendRaw(proxMessageAddress, topic, []byte("bar"))
  348. if err != nil {
  349. errC <- err
  350. }
  351. }()
  352. ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
  353. defer cancel()
  354. select {
  355. case <-delivered:
  356. case err := <-errC:
  357. t.Fatal(err)
  358. case <-ctx.Done():
  359. t.Fatal("raw timeout")
  360. }
  361. // try the same prox message with sym and asym send
  362. proxAddrPss := PssAddress(proxMessageAddress)
  363. symKeyId, err := ps.GenerateSymmetricKey(topic, proxAddrPss, true)
  364. go func() {
  365. err := ps.SendSym(symKeyId, topic, []byte("baz"))
  366. if err != nil {
  367. errC <- err
  368. }
  369. }()
  370. ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
  371. defer cancel()
  372. select {
  373. case <-delivered:
  374. case err := <-errC:
  375. t.Fatal(err)
  376. case <-ctx.Done():
  377. t.Fatal("sym timeout")
  378. }
  379. err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, proxAddrPss)
  380. if err != nil {
  381. t.Fatal(err)
  382. }
  383. pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey))
  384. go func() {
  385. err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy"))
  386. if err != nil {
  387. errC <- err
  388. }
  389. }()
  390. ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
  391. defer cancel()
  392. select {
  393. case <-delivered:
  394. case err := <-errC:
  395. t.Fatal(err)
  396. case <-ctx.Done():
  397. t.Fatal("asym timeout")
  398. }
  399. }
  400. // verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it
  401. // note that in these tests we use the raw capability on handlers for convenience
  402. func TestAddressMatchProx(t *testing.T) {
  403. // recipient node address
  404. localAddr := network.RandomAddr().Over()
  405. localPotAddr := pot.NewAddressFromBytes(localAddr)
  406. // set up kademlia
  407. kadparams := network.NewKadParams()
  408. kad := network.NewKademlia(localAddr, kadparams)
  409. nnPeerCount := kad.MinBinSize
  410. peerCount := nnPeerCount + 2
  411. // set up pss
  412. privKey, err := crypto.GenerateKey()
  413. pssp := NewPssParams().WithPrivateKey(privKey)
  414. ps, err := NewPss(kad, pssp)
  415. if err != nil {
  416. t.Fatal(err.Error())
  417. }
  418. // create kademlia peers, so we have peers both inside and outside minproxlimit
  419. var peers []*network.Peer
  420. for i := 0; i < peerCount; i++ {
  421. rw := &p2p.MsgPipeRW{}
  422. ptpPeer := p2p.NewPeer(enode.ID{}, "362436 call me anytime", []p2p.Cap{})
  423. protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
  424. peerAddr := pot.RandomAddressAt(localPotAddr, i)
  425. bzzPeer := &network.BzzPeer{
  426. Peer: protoPeer,
  427. BzzAddr: &network.BzzAddr{
  428. OAddr: peerAddr.Bytes(),
  429. UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
  430. },
  431. }
  432. peer := network.NewPeer(bzzPeer, kad)
  433. kad.On(peer)
  434. peers = append(peers, peer)
  435. }
  436. // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers
  437. // meanwhile test regression for kademlia since we are compiling the test parameters from different packages
  438. var proxes int
  439. var conns int
  440. depth := kad.NeighbourhoodDepth()
  441. kad.EachConn(nil, peerCount, func(p *network.Peer, po int) bool {
  442. conns++
  443. if po >= depth {
  444. proxes++
  445. }
  446. return true
  447. })
  448. if proxes != nnPeerCount {
  449. t.Fatalf("expected %d proxpeers, have %d", nnPeerCount, proxes)
  450. } else if conns != peerCount {
  451. t.Fatalf("expected %d peers total, have %d", peerCount, proxes)
  452. }
  453. // remote address distances from localAddr to try and the expected outcomes if we use prox handler
  454. remoteDistances := []int{
  455. 255,
  456. nnPeerCount + 1,
  457. nnPeerCount,
  458. nnPeerCount - 1,
  459. 0,
  460. }
  461. expects := []bool{
  462. true,
  463. true,
  464. true,
  465. false,
  466. false,
  467. }
  468. // first the unit test on the method that calculates possible receipient using prox
  469. for i, distance := range remoteDistances {
  470. pssMsg := newPssMsg(&msgParams{})
  471. pssMsg.To = make([]byte, len(localAddr))
  472. copy(pssMsg.To, localAddr)
  473. var byteIdx = distance / 8
  474. pssMsg.To[byteIdx] ^= 1 << uint(7-(distance%8))
  475. log.Trace(fmt.Sprintf("addrmatch %v", bytes.Equal(pssMsg.To, localAddr)))
  476. if ps.isSelfPossibleRecipient(pssMsg, true) != expects[i] {
  477. t.Fatalf("expected distance %d to be %v", distance, expects[i])
  478. }
  479. }
  480. // we move up to higher level and test the actual message handler
  481. // for each distance check if we are possible recipient when prox variant is used is set
  482. // this handler will increment a counter for every message that gets passed to the handler
  483. var receives int
  484. rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
  485. log.Trace("in allowraw handler")
  486. receives++
  487. return nil
  488. }
  489. // register it marking prox capability
  490. topic := BytesToTopic([]byte{0x2a})
  491. hndlrProxDereg := ps.Register(&topic, &handler{
  492. f: rawHandlerFunc,
  493. caps: &handlerCaps{
  494. raw: true,
  495. prox: true,
  496. },
  497. })
  498. // test the distances
  499. var prevReceive int
  500. for i, distance := range remoteDistances {
  501. remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
  502. remoteAddr := remotePotAddr.Bytes()
  503. var data [32]byte
  504. rand.Read(data[:])
  505. pssMsg := newPssMsg(&msgParams{raw: true})
  506. pssMsg.To = remoteAddr
  507. pssMsg.Expire = uint32(time.Now().Unix() + 4200)
  508. pssMsg.Payload = &whisper.Envelope{
  509. Topic: whisper.TopicType(topic),
  510. Data: data[:],
  511. }
  512. log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr)
  513. ps.handlePssMsg(context.TODO(), pssMsg)
  514. if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) {
  515. t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i])
  516. }
  517. prevReceive = receives
  518. }
  519. // now add a non prox-capable handler and test
  520. ps.Register(&topic, &handler{
  521. f: rawHandlerFunc,
  522. caps: &handlerCaps{
  523. raw: true,
  524. },
  525. })
  526. receives = 0
  527. prevReceive = 0
  528. for i, distance := range remoteDistances {
  529. remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
  530. remoteAddr := remotePotAddr.Bytes()
  531. var data [32]byte
  532. rand.Read(data[:])
  533. pssMsg := newPssMsg(&msgParams{raw: true})
  534. pssMsg.To = remoteAddr
  535. pssMsg.Expire = uint32(time.Now().Unix() + 4200)
  536. pssMsg.Payload = &whisper.Envelope{
  537. Topic: whisper.TopicType(topic),
  538. Data: data[:],
  539. }
  540. log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr)
  541. ps.handlePssMsg(context.TODO(), pssMsg)
  542. if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) {
  543. t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i])
  544. }
  545. prevReceive = receives
  546. }
  547. // now deregister the prox capable handler, now none of the messages will be handled
  548. hndlrProxDereg()
  549. receives = 0
  550. for _, distance := range remoteDistances {
  551. remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
  552. remoteAddr := remotePotAddr.Bytes()
  553. pssMsg := newPssMsg(&msgParams{raw: true})
  554. pssMsg.To = remoteAddr
  555. pssMsg.Expire = uint32(time.Now().Unix() + 4200)
  556. pssMsg.Payload = &whisper.Envelope{
  557. Topic: whisper.TopicType(topic),
  558. Data: []byte(remotePotAddr.String()),
  559. }
  560. log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr)
  561. ps.handlePssMsg(context.TODO(), pssMsg)
  562. if receives != 0 {
  563. t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance)
  564. }
  565. }
  566. }
  567. // verify that message queueing happens when it should, and that expired and corrupt messages are dropped
  568. func TestMessageProcessing(t *testing.T) {
  569. t.Skip("Disabled due to probable faulty logic for outbox expectations")
  570. // setup
  571. privkey, err := crypto.GenerateKey()
  572. if err != nil {
  573. t.Fatal(err.Error())
  574. }
  575. addr := make([]byte, 32)
  576. addr[0] = 0x01
  577. ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams())
  578. defer ps.Stop()
  579. // message should pass
  580. msg := newPssMsg(&msgParams{})
  581. msg.To = addr
  582. msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
  583. msg.Payload = &whisper.Envelope{
  584. Topic: [4]byte{},
  585. Data: []byte{0x66, 0x6f, 0x6f},
  586. }
  587. if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
  588. t.Fatal(err.Error())
  589. }
  590. tmr := time.NewTimer(time.Millisecond * 100)
  591. var outmsg *PssMsg
  592. select {
  593. case outmsg = <-ps.outbox:
  594. case <-tmr.C:
  595. default:
  596. }
  597. if outmsg != nil {
  598. t.Fatalf("expected outbox empty after full address on msg, but had message %s", msg)
  599. }
  600. // message should pass and queue due to partial length
  601. msg.To = addr[0:1]
  602. msg.Payload.Data = []byte{0x78, 0x79, 0x80, 0x80, 0x79}
  603. if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
  604. t.Fatal(err.Error())
  605. }
  606. tmr.Reset(time.Millisecond * 100)
  607. outmsg = nil
  608. select {
  609. case outmsg = <-ps.outbox:
  610. case <-tmr.C:
  611. }
  612. if outmsg == nil {
  613. t.Fatal("expected message in outbox on encrypt fail, but empty")
  614. }
  615. outmsg = nil
  616. select {
  617. case outmsg = <-ps.outbox:
  618. default:
  619. }
  620. if outmsg != nil {
  621. t.Fatalf("expected only one queued message but also had message %v", msg)
  622. }
  623. // full address mismatch should put message in queue
  624. msg.To[0] = 0xff
  625. if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
  626. t.Fatal(err.Error())
  627. }
  628. tmr.Reset(time.Millisecond * 10)
  629. outmsg = nil
  630. select {
  631. case outmsg = <-ps.outbox:
  632. case <-tmr.C:
  633. }
  634. if outmsg == nil {
  635. t.Fatal("expected message in outbox on address mismatch, but empty")
  636. }
  637. outmsg = nil
  638. select {
  639. case outmsg = <-ps.outbox:
  640. default:
  641. }
  642. if outmsg != nil {
  643. t.Fatalf("expected only one queued message but also had message %v", msg)
  644. }
  645. // expired message should be dropped
  646. msg.Expire = uint32(time.Now().Add(-time.Second).Unix())
  647. if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
  648. t.Fatal(err.Error())
  649. }
  650. tmr.Reset(time.Millisecond * 10)
  651. outmsg = nil
  652. select {
  653. case outmsg = <-ps.outbox:
  654. case <-tmr.C:
  655. default:
  656. }
  657. if outmsg != nil {
  658. t.Fatalf("expected empty queue but have message %v", msg)
  659. }
  660. // invalid message should return error
  661. fckedupmsg := &struct {
  662. pssMsg *PssMsg
  663. }{
  664. pssMsg: &PssMsg{},
  665. }
  666. if err := ps.handlePssMsg(context.TODO(), fckedupmsg); err == nil {
  667. t.Fatalf("expected error from processMsg but error nil")
  668. }
  669. // outbox full should return error
  670. msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
  671. for i := 0; i < defaultOutboxCapacity; i++ {
  672. ps.outbox <- msg
  673. }
  674. msg.Payload.Data = []byte{0x62, 0x61, 0x72}
  675. err = ps.handlePssMsg(context.TODO(), msg)
  676. if err == nil {
  677. t.Fatal("expected error when mailbox full, but was nil")
  678. }
  679. }
  680. // set and generate pubkeys and symkeys
  681. func TestKeys(t *testing.T) {
  682. // make our key and init pss with it
  683. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  684. defer cancel()
  685. ourkeys, err := wapi.NewKeyPair(ctx)
  686. if err != nil {
  687. t.Fatalf("create 'our' key fail")
  688. }
  689. ctx, cancel2 := context.WithTimeout(context.Background(), time.Second)
  690. defer cancel2()
  691. theirkeys, err := wapi.NewKeyPair(ctx)
  692. if err != nil {
  693. t.Fatalf("create 'their' key fail")
  694. }
  695. ourprivkey, err := w.GetPrivateKey(ourkeys)
  696. if err != nil {
  697. t.Fatalf("failed to retrieve 'our' private key")
  698. }
  699. theirprivkey, err := w.GetPrivateKey(theirkeys)
  700. if err != nil {
  701. t.Fatalf("failed to retrieve 'their' private key")
  702. }
  703. ps := newTestPss(ourprivkey, nil, nil)
  704. defer ps.Stop()
  705. // set up peer with mock address, mapped to mocked publicaddress and with mocked symkey
  706. addr := make(PssAddress, 32)
  707. copy(addr, network.RandomAddr().Over())
  708. outkey := network.RandomAddr().Over()
  709. topicobj := BytesToTopic([]byte("foo:42"))
  710. ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, addr)
  711. outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, addr, false)
  712. if err != nil {
  713. t.Fatalf("failed to set 'our' outgoing symmetric key")
  714. }
  715. // make a symmetric key that we will send to peer for encrypting messages to us
  716. inkeyid, err := ps.GenerateSymmetricKey(topicobj, addr, true)
  717. if err != nil {
  718. t.Fatalf("failed to set 'our' incoming symmetric key")
  719. }
  720. // get the key back from whisper, check that it's still the same
  721. outkeyback, err := ps.w.GetSymKey(outkeyid)
  722. if err != nil {
  723. t.Fatalf(err.Error())
  724. }
  725. inkey, err := ps.w.GetSymKey(inkeyid)
  726. if err != nil {
  727. t.Fatalf(err.Error())
  728. }
  729. if !bytes.Equal(outkeyback, outkey) {
  730. t.Fatalf("passed outgoing symkey doesnt equal stored: %x / %x", outkey, outkeyback)
  731. }
  732. t.Logf("symout: %v", outkeyback)
  733. t.Logf("symin: %v", inkey)
  734. // check that the key is stored in the peerpool
  735. psp := ps.symKeyPool[inkeyid][topicobj]
  736. if !bytes.Equal(psp.address, addr) {
  737. t.Fatalf("inkey address does not match; %p != %p", psp.address, addr)
  738. }
  739. }
  740. // check that we can retrieve previously added public key entires per topic and peer
  741. func TestGetPublickeyEntries(t *testing.T) {
  742. privkey, err := crypto.GenerateKey()
  743. if err != nil {
  744. t.Fatal(err)
  745. }
  746. ps := newTestPss(privkey, nil, nil)
  747. defer ps.Stop()
  748. peeraddr := network.RandomAddr().Over()
  749. topicaddr := make(map[Topic]PssAddress)
  750. topicaddr[Topic{0x13}] = peeraddr
  751. topicaddr[Topic{0x2a}] = peeraddr[:16]
  752. topicaddr[Topic{0x02, 0x9a}] = []byte{}
  753. remoteprivkey, err := crypto.GenerateKey()
  754. if err != nil {
  755. t.Fatal(err)
  756. }
  757. remotepubkeybytes := crypto.FromECDSAPub(&remoteprivkey.PublicKey)
  758. remotepubkeyhex := common.ToHex(remotepubkeybytes)
  759. pssapi := NewAPI(ps)
  760. for to, a := range topicaddr {
  761. err = pssapi.SetPeerPublicKey(remotepubkeybytes, to, a)
  762. if err != nil {
  763. t.Fatal(err)
  764. }
  765. }
  766. intopic, err := pssapi.GetPeerTopics(remotepubkeyhex)
  767. if err != nil {
  768. t.Fatal(err)
  769. }
  770. OUTER:
  771. for _, tnew := range intopic {
  772. for torig, addr := range topicaddr {
  773. if bytes.Equal(torig[:], tnew[:]) {
  774. inaddr, err := pssapi.GetPeerAddress(remotepubkeyhex, torig)
  775. if err != nil {
  776. t.Fatal(err)
  777. }
  778. if !bytes.Equal(addr, inaddr) {
  779. t.Fatalf("Address mismatch for topic %x; got %x, expected %x", torig, inaddr, addr)
  780. }
  781. delete(topicaddr, torig)
  782. continue OUTER
  783. }
  784. }
  785. t.Fatalf("received topic %x did not match any existing topics", tnew)
  786. }
  787. if len(topicaddr) != 0 {
  788. t.Fatalf("%d topics were not matched", len(topicaddr))
  789. }
  790. }
  791. // forwarding should skip peers that do not have matching pss capabilities
  792. func TestPeerCapabilityMismatch(t *testing.T) {
  793. // create privkey for forwarder node
  794. privkey, err := crypto.GenerateKey()
  795. if err != nil {
  796. t.Fatal(err)
  797. }
  798. // initialize kad
  799. baseaddr := network.RandomAddr()
  800. kad := network.NewKademlia((baseaddr).Over(), network.NewKadParams())
  801. rw := &p2p.MsgPipeRW{}
  802. // one peer has a mismatching version of pss
  803. wrongpssaddr := network.RandomAddr()
  804. wrongpsscap := p2p.Cap{
  805. Name: pssProtocolName,
  806. Version: 0,
  807. }
  808. nid := enode.ID{0x01}
  809. wrongpsspeer := network.NewPeer(&network.BzzPeer{
  810. Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(wrongpssaddr.Over()), []p2p.Cap{wrongpsscap}), rw, nil),
  811. BzzAddr: &network.BzzAddr{OAddr: wrongpssaddr.Over(), UAddr: nil},
  812. }, kad)
  813. // one peer doesn't even have pss (boo!)
  814. nopssaddr := network.RandomAddr()
  815. nopsscap := p2p.Cap{
  816. Name: "nopss",
  817. Version: 1,
  818. }
  819. nid = enode.ID{0x02}
  820. nopsspeer := network.NewPeer(&network.BzzPeer{
  821. Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(nopssaddr.Over()), []p2p.Cap{nopsscap}), rw, nil),
  822. BzzAddr: &network.BzzAddr{OAddr: nopssaddr.Over(), UAddr: nil},
  823. }, kad)
  824. // add peers to kademlia and activate them
  825. // it's safe so don't check errors
  826. kad.Register(wrongpsspeer.BzzAddr)
  827. kad.On(wrongpsspeer)
  828. kad.Register(nopsspeer.BzzAddr)
  829. kad.On(nopsspeer)
  830. // create pss
  831. pssmsg := &PssMsg{
  832. To: []byte{},
  833. Expire: uint32(time.Now().Add(time.Second).Unix()),
  834. Payload: &whisper.Envelope{},
  835. }
  836. ps := newTestPss(privkey, kad, nil)
  837. defer ps.Stop()
  838. // run the forward
  839. // it is enough that it completes; trying to send to incapable peers would create segfault
  840. ps.forward(pssmsg)
  841. }
  842. // verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed
  843. func TestRawAllow(t *testing.T) {
  844. // set up pss like so many times before
  845. privKey, err := crypto.GenerateKey()
  846. if err != nil {
  847. t.Fatal(err)
  848. }
  849. baseAddr := network.RandomAddr()
  850. kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams())
  851. ps := newTestPss(privKey, kad, nil)
  852. defer ps.Stop()
  853. topic := BytesToTopic([]byte{0x2a})
  854. // create handler innards that increments every time a message hits it
  855. var receives int
  856. rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
  857. log.Trace("in allowraw handler")
  858. receives++
  859. return nil
  860. }
  861. // wrap this handler function with a handler without raw capability and register it
  862. hndlrNoRaw := &handler{
  863. f: rawHandlerFunc,
  864. }
  865. ps.Register(&topic, hndlrNoRaw)
  866. // test it with a raw message, should be poo-poo
  867. pssMsg := newPssMsg(&msgParams{
  868. raw: true,
  869. })
  870. pssMsg.To = baseAddr.OAddr
  871. pssMsg.Expire = uint32(time.Now().Unix() + 4200)
  872. pssMsg.Payload = &whisper.Envelope{
  873. Topic: whisper.TopicType(topic),
  874. }
  875. ps.handlePssMsg(context.TODO(), pssMsg)
  876. if receives > 0 {
  877. t.Fatalf("Expected handler not to be executed with raw cap off")
  878. }
  879. // now wrap the same handler function with raw capabilities and register it
  880. hndlrRaw := &handler{
  881. f: rawHandlerFunc,
  882. caps: &handlerCaps{
  883. raw: true,
  884. },
  885. }
  886. deregRawHandler := ps.Register(&topic, hndlrRaw)
  887. // should work now
  888. pssMsg.Payload.Data = []byte("Raw Deal")
  889. ps.handlePssMsg(context.TODO(), pssMsg)
  890. if receives == 0 {
  891. t.Fatalf("Expected handler to be executed with raw cap on")
  892. }
  893. // now deregister the raw capable handler
  894. prevReceives := receives
  895. deregRawHandler()
  896. // check that raw messages fail again
  897. pssMsg.Payload.Data = []byte("Raw Trump")
  898. ps.handlePssMsg(context.TODO(), pssMsg)
  899. if receives != prevReceives {
  900. t.Fatalf("Expected handler not to be executed when raw handler is retracted")
  901. }
  902. }
  903. // BELOW HERE ARE TESTS USING THE SIMULATION FRAMEWORK
  904. // tests that the API layer can handle edge case values
  905. func TestApi(t *testing.T) {
  906. clients, err := setupNetwork(2, true)
  907. if err != nil {
  908. t.Fatal(err)
  909. }
  910. topic := "0xdeadbeef"
  911. err = clients[0].Call(nil, "pss_sendRaw", "0x", topic, "0x666f6f")
  912. if err != nil {
  913. t.Fatal(err)
  914. }
  915. err = clients[0].Call(nil, "pss_sendRaw", "0xabcdef", topic, "0x")
  916. if err == nil {
  917. t.Fatal("expected error on empty msg")
  918. }
  919. overflowAddr := [33]byte{}
  920. err = clients[0].Call(nil, "pss_sendRaw", hexutil.Encode(overflowAddr[:]), topic, "0x666f6f")
  921. if err == nil {
  922. t.Fatal("expected error on send too big address")
  923. }
  924. }
  925. // verifies that nodes can send and receive raw (verbatim) messages
  926. func TestSendRaw(t *testing.T) {
  927. t.Run("32", testSendRaw)
  928. t.Run("8", testSendRaw)
  929. t.Run("0", testSendRaw)
  930. }
  931. func testSendRaw(t *testing.T) {
  932. var addrsize int64
  933. var err error
  934. paramstring := strings.Split(t.Name(), "/")
  935. addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
  936. log.Info("raw send test", "addrsize", addrsize)
  937. clients, err := setupNetwork(2, true)
  938. if err != nil {
  939. t.Fatal(err)
  940. }
  941. topic := "0xdeadbeef"
  942. var loaddrhex string
  943. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  944. if err != nil {
  945. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  946. }
  947. loaddrhex = loaddrhex[:2+(addrsize*2)]
  948. var roaddrhex string
  949. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  950. if err != nil {
  951. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  952. }
  953. roaddrhex = roaddrhex[:2+(addrsize*2)]
  954. time.Sleep(time.Millisecond * 500)
  955. // at this point we've verified that symkeys are saved and match on each peer
  956. // now try sending symmetrically encrypted message, both directions
  957. lmsgC := make(chan APIMsg)
  958. lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
  959. defer lcancel()
  960. lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true, false)
  961. log.Trace("lsub", "id", lsub)
  962. defer lsub.Unsubscribe()
  963. rmsgC := make(chan APIMsg)
  964. rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
  965. defer rcancel()
  966. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true, false)
  967. log.Trace("rsub", "id", rsub)
  968. defer rsub.Unsubscribe()
  969. // send and verify delivery
  970. lmsg := []byte("plugh")
  971. err = clients[1].Call(nil, "pss_sendRaw", loaddrhex, topic, hexutil.Encode(lmsg))
  972. if err != nil {
  973. t.Fatal(err)
  974. }
  975. select {
  976. case recvmsg := <-lmsgC:
  977. if !bytes.Equal(recvmsg.Msg, lmsg) {
  978. t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg)
  979. }
  980. case cerr := <-lctx.Done():
  981. t.Fatalf("test message (left) timed out: %v", cerr)
  982. }
  983. rmsg := []byte("xyzzy")
  984. err = clients[0].Call(nil, "pss_sendRaw", roaddrhex, topic, hexutil.Encode(rmsg))
  985. if err != nil {
  986. t.Fatal(err)
  987. }
  988. select {
  989. case recvmsg := <-rmsgC:
  990. if !bytes.Equal(recvmsg.Msg, rmsg) {
  991. t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
  992. }
  993. case cerr := <-rctx.Done():
  994. t.Fatalf("test message (right) timed out: %v", cerr)
  995. }
  996. }
  997. // send symmetrically encrypted message between two directly connected peers
  998. func TestSendSym(t *testing.T) {
  999. t.Run("32", testSendSym)
  1000. t.Run("8", testSendSym)
  1001. t.Run("0", testSendSym)
  1002. }
  1003. func testSendSym(t *testing.T) {
  1004. // address hint size
  1005. var addrsize int64
  1006. var err error
  1007. paramstring := strings.Split(t.Name(), "/")
  1008. addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
  1009. log.Info("sym send test", "addrsize", addrsize)
  1010. clients, err := setupNetwork(2, false)
  1011. if err != nil {
  1012. t.Fatal(err)
  1013. }
  1014. var topic string
  1015. err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
  1016. if err != nil {
  1017. t.Fatal(err)
  1018. }
  1019. var loaddrhex string
  1020. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  1021. if err != nil {
  1022. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  1023. }
  1024. loaddrhex = loaddrhex[:2+(addrsize*2)]
  1025. var roaddrhex string
  1026. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  1027. if err != nil {
  1028. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  1029. }
  1030. roaddrhex = roaddrhex[:2+(addrsize*2)]
  1031. // retrieve public key from pss instance
  1032. // set this public key reciprocally
  1033. var lpubkeyhex string
  1034. err = clients[0].Call(&lpubkeyhex, "pss_getPublicKey")
  1035. if err != nil {
  1036. t.Fatalf("rpc get node 1 pubkey fail: %v", err)
  1037. }
  1038. var rpubkeyhex string
  1039. err = clients[1].Call(&rpubkeyhex, "pss_getPublicKey")
  1040. if err != nil {
  1041. t.Fatalf("rpc get node 2 pubkey fail: %v", err)
  1042. }
  1043. time.Sleep(time.Millisecond * 500)
  1044. // at this point we've verified that symkeys are saved and match on each peer
  1045. // now try sending symmetrically encrypted message, both directions
  1046. lmsgC := make(chan APIMsg)
  1047. lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
  1048. defer lcancel()
  1049. lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
  1050. log.Trace("lsub", "id", lsub)
  1051. defer lsub.Unsubscribe()
  1052. rmsgC := make(chan APIMsg)
  1053. rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
  1054. defer rcancel()
  1055. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
  1056. log.Trace("rsub", "id", rsub)
  1057. defer rsub.Unsubscribe()
  1058. lrecvkey := network.RandomAddr().Over()
  1059. rrecvkey := network.RandomAddr().Over()
  1060. var lkeyids [2]string
  1061. var rkeyids [2]string
  1062. // manually set reciprocal symkeys
  1063. err = clients[0].Call(&lkeyids, "psstest_setSymKeys", rpubkeyhex, lrecvkey, rrecvkey, defaultSymKeySendLimit, topic, roaddrhex)
  1064. if err != nil {
  1065. t.Fatal(err)
  1066. }
  1067. err = clients[1].Call(&rkeyids, "psstest_setSymKeys", lpubkeyhex, rrecvkey, lrecvkey, defaultSymKeySendLimit, topic, loaddrhex)
  1068. if err != nil {
  1069. t.Fatal(err)
  1070. }
  1071. // send and verify delivery
  1072. lmsg := []byte("plugh")
  1073. err = clients[1].Call(nil, "pss_sendSym", rkeyids[1], topic, hexutil.Encode(lmsg))
  1074. if err != nil {
  1075. t.Fatal(err)
  1076. }
  1077. select {
  1078. case recvmsg := <-lmsgC:
  1079. if !bytes.Equal(recvmsg.Msg, lmsg) {
  1080. t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg)
  1081. }
  1082. case cerr := <-lctx.Done():
  1083. t.Fatalf("test message timed out: %v", cerr)
  1084. }
  1085. rmsg := []byte("xyzzy")
  1086. err = clients[0].Call(nil, "pss_sendSym", lkeyids[1], topic, hexutil.Encode(rmsg))
  1087. if err != nil {
  1088. t.Fatal(err)
  1089. }
  1090. select {
  1091. case recvmsg := <-rmsgC:
  1092. if !bytes.Equal(recvmsg.Msg, rmsg) {
  1093. t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
  1094. }
  1095. case cerr := <-rctx.Done():
  1096. t.Fatalf("test message timed out: %v", cerr)
  1097. }
  1098. }
  1099. // send asymmetrically encrypted message between two directly connected peers
  1100. func TestSendAsym(t *testing.T) {
  1101. t.Run("32", testSendAsym)
  1102. t.Run("8", testSendAsym)
  1103. t.Run("0", testSendAsym)
  1104. }
  1105. func testSendAsym(t *testing.T) {
  1106. // address hint size
  1107. var addrsize int64
  1108. var err error
  1109. paramstring := strings.Split(t.Name(), "/")
  1110. addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
  1111. log.Info("asym send test", "addrsize", addrsize)
  1112. clients, err := setupNetwork(2, false)
  1113. if err != nil {
  1114. t.Fatal(err)
  1115. }
  1116. var topic string
  1117. err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
  1118. if err != nil {
  1119. t.Fatal(err)
  1120. }
  1121. time.Sleep(time.Millisecond * 250)
  1122. var loaddrhex string
  1123. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  1124. if err != nil {
  1125. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  1126. }
  1127. loaddrhex = loaddrhex[:2+(addrsize*2)]
  1128. var roaddrhex string
  1129. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  1130. if err != nil {
  1131. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  1132. }
  1133. roaddrhex = roaddrhex[:2+(addrsize*2)]
  1134. // retrieve public key from pss instance
  1135. // set this public key reciprocally
  1136. var lpubkey string
  1137. err = clients[0].Call(&lpubkey, "pss_getPublicKey")
  1138. if err != nil {
  1139. t.Fatalf("rpc get node 1 pubkey fail: %v", err)
  1140. }
  1141. var rpubkey string
  1142. err = clients[1].Call(&rpubkey, "pss_getPublicKey")
  1143. if err != nil {
  1144. t.Fatalf("rpc get node 2 pubkey fail: %v", err)
  1145. }
  1146. time.Sleep(time.Millisecond * 500) // replace with hive healthy code
  1147. lmsgC := make(chan APIMsg)
  1148. lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
  1149. defer lcancel()
  1150. lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
  1151. log.Trace("lsub", "id", lsub)
  1152. defer lsub.Unsubscribe()
  1153. rmsgC := make(chan APIMsg)
  1154. rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
  1155. defer rcancel()
  1156. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
  1157. log.Trace("rsub", "id", rsub)
  1158. defer rsub.Unsubscribe()
  1159. // store reciprocal public keys
  1160. err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, roaddrhex)
  1161. if err != nil {
  1162. t.Fatal(err)
  1163. }
  1164. err = clients[1].Call(nil, "pss_setPeerPublicKey", lpubkey, topic, loaddrhex)
  1165. if err != nil {
  1166. t.Fatal(err)
  1167. }
  1168. // send and verify delivery
  1169. rmsg := []byte("xyzzy")
  1170. err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
  1171. if err != nil {
  1172. t.Fatal(err)
  1173. }
  1174. select {
  1175. case recvmsg := <-rmsgC:
  1176. if !bytes.Equal(recvmsg.Msg, rmsg) {
  1177. t.Fatalf("node 2 received payload mismatch: expected %v, got %v", rmsg, recvmsg.Msg)
  1178. }
  1179. case cerr := <-rctx.Done():
  1180. t.Fatalf("test message timed out: %v", cerr)
  1181. }
  1182. lmsg := []byte("plugh")
  1183. err = clients[1].Call(nil, "pss_sendAsym", lpubkey, topic, hexutil.Encode(lmsg))
  1184. if err != nil {
  1185. t.Fatal(err)
  1186. }
  1187. select {
  1188. case recvmsg := <-lmsgC:
  1189. if !bytes.Equal(recvmsg.Msg, lmsg) {
  1190. t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg.Msg)
  1191. }
  1192. case cerr := <-lctx.Done():
  1193. t.Fatalf("test message timed out: %v", cerr)
  1194. }
  1195. }
  1196. type Job struct {
  1197. Msg []byte
  1198. SendNode enode.ID
  1199. RecvNode enode.ID
  1200. }
  1201. func worker(id int, jobs <-chan Job, rpcs map[enode.ID]*rpc.Client, pubkeys map[enode.ID]string, topic string) {
  1202. for j := range jobs {
  1203. rpcs[j.SendNode].Call(nil, "pss_sendAsym", pubkeys[j.RecvNode], topic, hexutil.Encode(j.Msg))
  1204. }
  1205. }
  1206. func TestNetwork(t *testing.T) {
  1207. t.Run("16/1000/4/sim", testNetwork)
  1208. }
  1209. // params in run name:
  1210. // nodes/msgs/addrbytes/adaptertype
  1211. // if adaptertype is exec uses execadapter, simadapter otherwise
  1212. func TestNetwork2000(t *testing.T) {
  1213. //enableMetrics()
  1214. if !*longrunning {
  1215. t.Skip("run with --longrunning flag to run extensive network tests")
  1216. }
  1217. t.Run("3/2000/4/sim", testNetwork)
  1218. t.Run("4/2000/4/sim", testNetwork)
  1219. t.Run("8/2000/4/sim", testNetwork)
  1220. t.Run("16/2000/4/sim", testNetwork)
  1221. }
  1222. func TestNetwork5000(t *testing.T) {
  1223. //enableMetrics()
  1224. if !*longrunning {
  1225. t.Skip("run with --longrunning flag to run extensive network tests")
  1226. }
  1227. t.Run("3/5000/4/sim", testNetwork)
  1228. t.Run("4/5000/4/sim", testNetwork)
  1229. t.Run("8/5000/4/sim", testNetwork)
  1230. t.Run("16/5000/4/sim", testNetwork)
  1231. }
  1232. func TestNetwork10000(t *testing.T) {
  1233. //enableMetrics()
  1234. if !*longrunning {
  1235. t.Skip("run with --longrunning flag to run extensive network tests")
  1236. }
  1237. t.Run("3/10000/4/sim", testNetwork)
  1238. t.Run("4/10000/4/sim", testNetwork)
  1239. t.Run("8/10000/4/sim", testNetwork)
  1240. }
  1241. func testNetwork(t *testing.T) {
  1242. paramstring := strings.Split(t.Name(), "/")
  1243. nodecount, _ := strconv.ParseInt(paramstring[1], 10, 0)
  1244. msgcount, _ := strconv.ParseInt(paramstring[2], 10, 0)
  1245. addrsize, _ := strconv.ParseInt(paramstring[3], 10, 0)
  1246. adapter := paramstring[4]
  1247. log.Info("network test", "nodecount", nodecount, "msgcount", msgcount, "addrhintsize", addrsize)
  1248. nodes := make([]enode.ID, nodecount)
  1249. bzzaddrs := make(map[enode.ID]string, nodecount)
  1250. rpcs := make(map[enode.ID]*rpc.Client, nodecount)
  1251. pubkeys := make(map[enode.ID]string, nodecount)
  1252. sentmsgs := make([][]byte, msgcount)
  1253. recvmsgs := make([]bool, msgcount)
  1254. nodemsgcount := make(map[enode.ID]int, nodecount)
  1255. trigger := make(chan enode.ID)
  1256. var a adapters.NodeAdapter
  1257. if adapter == "exec" {
  1258. dirname, err := ioutil.TempDir(".", "")
  1259. if err != nil {
  1260. t.Fatal(err)
  1261. }
  1262. a = adapters.NewExecAdapter(dirname)
  1263. } else if adapter == "tcp" {
  1264. a = adapters.NewTCPAdapter(newServices(false))
  1265. } else if adapter == "sim" {
  1266. a = adapters.NewSimAdapter(newServices(false))
  1267. }
  1268. net := simulations.NewNetwork(a, &simulations.NetworkConfig{
  1269. ID: "0",
  1270. })
  1271. defer net.Shutdown()
  1272. f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodecount))
  1273. if err != nil {
  1274. t.Fatal(err)
  1275. }
  1276. jsonbyte, err := ioutil.ReadAll(f)
  1277. if err != nil {
  1278. t.Fatal(err)
  1279. }
  1280. var snap simulations.Snapshot
  1281. err = json.Unmarshal(jsonbyte, &snap)
  1282. if err != nil {
  1283. t.Fatal(err)
  1284. }
  1285. err = net.Load(&snap)
  1286. if err != nil {
  1287. //TODO: Fix p2p simulation framework to not crash when loading 32-nodes
  1288. //t.Fatal(err)
  1289. }
  1290. time.Sleep(1 * time.Second)
  1291. triggerChecks := func(trigger chan enode.ID, id enode.ID, rpcclient *rpc.Client, topic string) error {
  1292. msgC := make(chan APIMsg)
  1293. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1294. defer cancel()
  1295. sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false, false)
  1296. if err != nil {
  1297. t.Fatal(err)
  1298. }
  1299. go func() {
  1300. defer sub.Unsubscribe()
  1301. for {
  1302. select {
  1303. case recvmsg := <-msgC:
  1304. idx, _ := binary.Uvarint(recvmsg.Msg)
  1305. if !recvmsgs[idx] {
  1306. log.Debug("msg recv", "idx", idx, "id", id)
  1307. recvmsgs[idx] = true
  1308. trigger <- id
  1309. }
  1310. case <-sub.Err():
  1311. return
  1312. }
  1313. }
  1314. }()
  1315. return nil
  1316. }
  1317. var topic string
  1318. for i, nod := range net.GetNodes() {
  1319. nodes[i] = nod.ID()
  1320. rpcs[nodes[i]], err = nod.Client()
  1321. if err != nil {
  1322. t.Fatal(err)
  1323. }
  1324. if topic == "" {
  1325. err = rpcs[nodes[i]].Call(&topic, "pss_stringToTopic", "foo:42")
  1326. if err != nil {
  1327. t.Fatal(err)
  1328. }
  1329. }
  1330. var pubkey string
  1331. err = rpcs[nodes[i]].Call(&pubkey, "pss_getPublicKey")
  1332. if err != nil {
  1333. t.Fatal(err)
  1334. }
  1335. pubkeys[nod.ID()] = pubkey
  1336. var addrhex string
  1337. err = rpcs[nodes[i]].Call(&addrhex, "pss_baseAddr")
  1338. if err != nil {
  1339. t.Fatal(err)
  1340. }
  1341. bzzaddrs[nodes[i]] = addrhex
  1342. err = triggerChecks(trigger, nodes[i], rpcs[nodes[i]], topic)
  1343. if err != nil {
  1344. t.Fatal(err)
  1345. }
  1346. }
  1347. time.Sleep(1 * time.Second)
  1348. // setup workers
  1349. jobs := make(chan Job, 10)
  1350. for w := 1; w <= 10; w++ {
  1351. go worker(w, jobs, rpcs, pubkeys, topic)
  1352. }
  1353. time.Sleep(1 * time.Second)
  1354. for i := 0; i < int(msgcount); i++ {
  1355. sendnodeidx := rand.Intn(int(nodecount))
  1356. recvnodeidx := rand.Intn(int(nodecount - 1))
  1357. if recvnodeidx >= sendnodeidx {
  1358. recvnodeidx++
  1359. }
  1360. nodemsgcount[nodes[recvnodeidx]]++
  1361. sentmsgs[i] = make([]byte, 8)
  1362. c := binary.PutUvarint(sentmsgs[i], uint64(i))
  1363. if c == 0 {
  1364. t.Fatal("0 byte message")
  1365. }
  1366. if err != nil {
  1367. t.Fatal(err)
  1368. }
  1369. err = rpcs[nodes[sendnodeidx]].Call(nil, "pss_setPeerPublicKey", pubkeys[nodes[recvnodeidx]], topic, bzzaddrs[nodes[recvnodeidx]])
  1370. if err != nil {
  1371. t.Fatal(err)
  1372. }
  1373. jobs <- Job{
  1374. Msg: sentmsgs[i],
  1375. SendNode: nodes[sendnodeidx],
  1376. RecvNode: nodes[recvnodeidx],
  1377. }
  1378. }
  1379. finalmsgcount := 0
  1380. ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1381. defer cancel()
  1382. outer:
  1383. for i := 0; i < int(msgcount); i++ {
  1384. select {
  1385. case id := <-trigger:
  1386. nodemsgcount[id]--
  1387. finalmsgcount++
  1388. case <-ctx.Done():
  1389. log.Warn("timeout")
  1390. break outer
  1391. }
  1392. }
  1393. for i, msg := range recvmsgs {
  1394. if !msg {
  1395. log.Debug("missing message", "idx", i)
  1396. }
  1397. }
  1398. t.Logf("%d of %d messages received", finalmsgcount, msgcount)
  1399. if finalmsgcount != int(msgcount) {
  1400. t.Fatalf("%d messages were not received", int(msgcount)-finalmsgcount)
  1401. }
  1402. }
  1403. // check that in a network of a -> b -> c -> a
  1404. // a doesn't receive a sent message twice
  1405. func TestDeduplication(t *testing.T) {
  1406. var err error
  1407. clients, err := setupNetwork(3, false)
  1408. if err != nil {
  1409. t.Fatal(err)
  1410. }
  1411. var addrsize = 32
  1412. var loaddrhex string
  1413. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  1414. if err != nil {
  1415. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  1416. }
  1417. loaddrhex = loaddrhex[:2+(addrsize*2)]
  1418. var roaddrhex string
  1419. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  1420. if err != nil {
  1421. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  1422. }
  1423. roaddrhex = roaddrhex[:2+(addrsize*2)]
  1424. var xoaddrhex string
  1425. err = clients[2].Call(&xoaddrhex, "pss_baseAddr")
  1426. if err != nil {
  1427. t.Fatalf("rpc get node 3 baseaddr fail: %v", err)
  1428. }
  1429. xoaddrhex = xoaddrhex[:2+(addrsize*2)]
  1430. log.Info("peer", "l", loaddrhex, "r", roaddrhex, "x", xoaddrhex)
  1431. var topic string
  1432. err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
  1433. if err != nil {
  1434. t.Fatal(err)
  1435. }
  1436. time.Sleep(time.Millisecond * 250)
  1437. // retrieve public key from pss instance
  1438. // set this public key reciprocally
  1439. var rpubkey string
  1440. err = clients[1].Call(&rpubkey, "pss_getPublicKey")
  1441. if err != nil {
  1442. t.Fatalf("rpc get receivenode pubkey fail: %v", err)
  1443. }
  1444. time.Sleep(time.Millisecond * 500) // replace with hive healthy code
  1445. rmsgC := make(chan APIMsg)
  1446. rctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
  1447. defer cancel()
  1448. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
  1449. log.Trace("rsub", "id", rsub)
  1450. defer rsub.Unsubscribe()
  1451. // store public key for recipient
  1452. // zero-length address means forward to all
  1453. // we have just two peers, they will be in proxbin, and will both receive
  1454. err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, "0x")
  1455. if err != nil {
  1456. t.Fatal(err)
  1457. }
  1458. // send and verify delivery
  1459. rmsg := []byte("xyzzy")
  1460. err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
  1461. if err != nil {
  1462. t.Fatal(err)
  1463. }
  1464. var receivedok bool
  1465. OUTER:
  1466. for {
  1467. select {
  1468. case <-rmsgC:
  1469. if receivedok {
  1470. t.Fatalf("duplicate message received")
  1471. }
  1472. receivedok = true
  1473. case <-rctx.Done():
  1474. break OUTER
  1475. }
  1476. }
  1477. if !receivedok {
  1478. t.Fatalf("message did not arrive")
  1479. }
  1480. }
  1481. // symmetric send performance with varying message sizes
  1482. func BenchmarkSymkeySend(b *testing.B) {
  1483. b.Run(fmt.Sprintf("%d", 256), benchmarkSymKeySend)
  1484. b.Run(fmt.Sprintf("%d", 1024), benchmarkSymKeySend)
  1485. b.Run(fmt.Sprintf("%d", 1024*1024), benchmarkSymKeySend)
  1486. b.Run(fmt.Sprintf("%d", 1024*1024*10), benchmarkSymKeySend)
  1487. b.Run(fmt.Sprintf("%d", 1024*1024*100), benchmarkSymKeySend)
  1488. }
  1489. func benchmarkSymKeySend(b *testing.B) {
  1490. msgsizestring := strings.Split(b.Name(), "/")
  1491. if len(msgsizestring) != 2 {
  1492. b.Fatalf("benchmark called without msgsize param")
  1493. }
  1494. msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
  1495. if err != nil {
  1496. b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
  1497. }
  1498. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1499. defer cancel()
  1500. keys, err := wapi.NewKeyPair(ctx)
  1501. privkey, err := w.GetPrivateKey(keys)
  1502. ps := newTestPss(privkey, nil, nil)
  1503. defer ps.Stop()
  1504. msg := make([]byte, msgsize)
  1505. rand.Read(msg)
  1506. topic := BytesToTopic([]byte("foo"))
  1507. to := make(PssAddress, 32)
  1508. copy(to[:], network.RandomAddr().Over())
  1509. symkeyid, err := ps.GenerateSymmetricKey(topic, to, true)
  1510. if err != nil {
  1511. b.Fatalf("could not generate symkey: %v", err)
  1512. }
  1513. symkey, err := ps.w.GetSymKey(symkeyid)
  1514. if err != nil {
  1515. b.Fatalf("could not retrieve symkey: %v", err)
  1516. }
  1517. ps.SetSymmetricKey(symkey, topic, to, false)
  1518. b.ResetTimer()
  1519. for i := 0; i < b.N; i++ {
  1520. ps.SendSym(symkeyid, topic, msg)
  1521. }
  1522. }
  1523. // asymmetric send performance with varying message sizes
  1524. func BenchmarkAsymkeySend(b *testing.B) {
  1525. b.Run(fmt.Sprintf("%d", 256), benchmarkAsymKeySend)
  1526. b.Run(fmt.Sprintf("%d", 1024), benchmarkAsymKeySend)
  1527. b.Run(fmt.Sprintf("%d", 1024*1024), benchmarkAsymKeySend)
  1528. b.Run(fmt.Sprintf("%d", 1024*1024*10), benchmarkAsymKeySend)
  1529. b.Run(fmt.Sprintf("%d", 1024*1024*100), benchmarkAsymKeySend)
  1530. }
  1531. func benchmarkAsymKeySend(b *testing.B) {
  1532. msgsizestring := strings.Split(b.Name(), "/")
  1533. if len(msgsizestring) != 2 {
  1534. b.Fatalf("benchmark called without msgsize param")
  1535. }
  1536. msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
  1537. if err != nil {
  1538. b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
  1539. }
  1540. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1541. defer cancel()
  1542. keys, err := wapi.NewKeyPair(ctx)
  1543. privkey, err := w.GetPrivateKey(keys)
  1544. ps := newTestPss(privkey, nil, nil)
  1545. defer ps.Stop()
  1546. msg := make([]byte, msgsize)
  1547. rand.Read(msg)
  1548. topic := BytesToTopic([]byte("foo"))
  1549. to := make(PssAddress, 32)
  1550. copy(to[:], network.RandomAddr().Over())
  1551. ps.SetPeerPublicKey(&privkey.PublicKey, topic, to)
  1552. b.ResetTimer()
  1553. for i := 0; i < b.N; i++ {
  1554. ps.SendAsym(common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey)), topic, msg)
  1555. }
  1556. }
  1557. func BenchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
  1558. for i := 100; i < 100000; i = i * 10 {
  1559. for j := 32; j < 10000; j = j * 8 {
  1560. b.Run(fmt.Sprintf("%d/%d", i, j), benchmarkSymkeyBruteforceChangeaddr)
  1561. }
  1562. //b.Run(fmt.Sprintf("%d", i), benchmarkSymkeyBruteforceChangeaddr)
  1563. }
  1564. }
  1565. // decrypt performance using symkey cache, worst case
  1566. // (decrypt key always last in cache)
  1567. func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
  1568. keycountstring := strings.Split(b.Name(), "/")
  1569. cachesize := int64(0)
  1570. var ps *Pss
  1571. if len(keycountstring) < 2 {
  1572. b.Fatalf("benchmark called without count param")
  1573. }
  1574. keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
  1575. if err != nil {
  1576. b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
  1577. }
  1578. if len(keycountstring) == 3 {
  1579. cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
  1580. if err != nil {
  1581. b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
  1582. }
  1583. }
  1584. pssmsgs := make([]*PssMsg, 0, keycount)
  1585. var keyid string
  1586. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1587. defer cancel()
  1588. keys, err := wapi.NewKeyPair(ctx)
  1589. privkey, err := w.GetPrivateKey(keys)
  1590. if cachesize > 0 {
  1591. ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
  1592. } else {
  1593. ps = newTestPss(privkey, nil, nil)
  1594. }
  1595. defer ps.Stop()
  1596. topic := BytesToTopic([]byte("foo"))
  1597. for i := 0; i < int(keycount); i++ {
  1598. to := make(PssAddress, 32)
  1599. copy(to[:], network.RandomAddr().Over())
  1600. keyid, err = ps.GenerateSymmetricKey(topic, to, true)
  1601. if err != nil {
  1602. b.Fatalf("cant generate symkey #%d: %v", i, err)
  1603. }
  1604. symkey, err := ps.w.GetSymKey(keyid)
  1605. if err != nil {
  1606. b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
  1607. }
  1608. wparams := &whisper.MessageParams{
  1609. TTL: defaultWhisperTTL,
  1610. KeySym: symkey,
  1611. Topic: whisper.TopicType(topic),
  1612. WorkTime: defaultWhisperWorkTime,
  1613. PoW: defaultWhisperPoW,
  1614. Payload: []byte("xyzzy"),
  1615. Padding: []byte("1234567890abcdef"),
  1616. }
  1617. woutmsg, err := whisper.NewSentMessage(wparams)
  1618. if err != nil {
  1619. b.Fatalf("could not create whisper message: %v", err)
  1620. }
  1621. env, err := woutmsg.Wrap(wparams)
  1622. if err != nil {
  1623. b.Fatalf("could not generate whisper envelope: %v", err)
  1624. }
  1625. ps.Register(&topic, &handler{
  1626. f: noopHandlerFunc,
  1627. })
  1628. pssmsgs = append(pssmsgs, &PssMsg{
  1629. To: to,
  1630. Payload: env,
  1631. })
  1632. }
  1633. b.ResetTimer()
  1634. for i := 0; i < b.N; i++ {
  1635. if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], false, false); err != nil {
  1636. b.Fatalf("pss processing failed: %v", err)
  1637. }
  1638. }
  1639. }
  1640. func BenchmarkSymkeyBruteforceSameaddr(b *testing.B) {
  1641. for i := 100; i < 100000; i = i * 10 {
  1642. for j := 32; j < 10000; j = j * 8 {
  1643. b.Run(fmt.Sprintf("%d/%d", i, j), benchmarkSymkeyBruteforceSameaddr)
  1644. }
  1645. }
  1646. }
  1647. // decrypt performance using symkey cache, best case
  1648. // (decrypt key always first in cache)
  1649. func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
  1650. var keyid string
  1651. var ps *Pss
  1652. cachesize := int64(0)
  1653. keycountstring := strings.Split(b.Name(), "/")
  1654. if len(keycountstring) < 2 {
  1655. b.Fatalf("benchmark called without count param")
  1656. }
  1657. keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
  1658. if err != nil {
  1659. b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
  1660. }
  1661. if len(keycountstring) == 3 {
  1662. cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
  1663. if err != nil {
  1664. b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
  1665. }
  1666. }
  1667. addr := make([]PssAddress, keycount)
  1668. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1669. defer cancel()
  1670. keys, err := wapi.NewKeyPair(ctx)
  1671. privkey, err := w.GetPrivateKey(keys)
  1672. if cachesize > 0 {
  1673. ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
  1674. } else {
  1675. ps = newTestPss(privkey, nil, nil)
  1676. }
  1677. defer ps.Stop()
  1678. topic := BytesToTopic([]byte("foo"))
  1679. for i := 0; i < int(keycount); i++ {
  1680. copy(addr[i], network.RandomAddr().Over())
  1681. keyid, err = ps.GenerateSymmetricKey(topic, addr[i], true)
  1682. if err != nil {
  1683. b.Fatalf("cant generate symkey #%d: %v", i, err)
  1684. }
  1685. }
  1686. symkey, err := ps.w.GetSymKey(keyid)
  1687. if err != nil {
  1688. b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
  1689. }
  1690. wparams := &whisper.MessageParams{
  1691. TTL: defaultWhisperTTL,
  1692. KeySym: symkey,
  1693. Topic: whisper.TopicType(topic),
  1694. WorkTime: defaultWhisperWorkTime,
  1695. PoW: defaultWhisperPoW,
  1696. Payload: []byte("xyzzy"),
  1697. Padding: []byte("1234567890abcdef"),
  1698. }
  1699. woutmsg, err := whisper.NewSentMessage(wparams)
  1700. if err != nil {
  1701. b.Fatalf("could not create whisper message: %v", err)
  1702. }
  1703. env, err := woutmsg.Wrap(wparams)
  1704. if err != nil {
  1705. b.Fatalf("could not generate whisper envelope: %v", err)
  1706. }
  1707. ps.Register(&topic, &handler{
  1708. f: noopHandlerFunc,
  1709. })
  1710. pssmsg := &PssMsg{
  1711. To: addr[len(addr)-1][:],
  1712. Payload: env,
  1713. }
  1714. for i := 0; i < b.N; i++ {
  1715. if err := ps.process(pssmsg, false, false); err != nil {
  1716. b.Fatalf("pss processing failed: %v", err)
  1717. }
  1718. }
  1719. }
  1720. // setup simulated network with bzz/discovery and pss services.
  1721. // connects nodes in a circle
  1722. // if allowRaw is set, omission of builtin pss encryption is enabled (see PssParams)
  1723. func setupNetwork(numnodes int, allowRaw bool) (clients []*rpc.Client, err error) {
  1724. nodes := make([]*simulations.Node, numnodes)
  1725. clients = make([]*rpc.Client, numnodes)
  1726. if numnodes < 2 {
  1727. return nil, fmt.Errorf("Minimum two nodes in network")
  1728. }
  1729. adapter := adapters.NewSimAdapter(newServices(allowRaw))
  1730. net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
  1731. ID: "0",
  1732. DefaultService: "bzz",
  1733. })
  1734. for i := 0; i < numnodes; i++ {
  1735. nodeconf := adapters.RandomNodeConfig()
  1736. nodeconf.Services = []string{"bzz", pssProtocolName}
  1737. nodes[i], err = net.NewNodeWithConfig(nodeconf)
  1738. if err != nil {
  1739. return nil, fmt.Errorf("error creating node 1: %v", err)
  1740. }
  1741. err = net.Start(nodes[i].ID())
  1742. if err != nil {
  1743. return nil, fmt.Errorf("error starting node 1: %v", err)
  1744. }
  1745. if i > 0 {
  1746. err = net.Connect(nodes[i].ID(), nodes[i-1].ID())
  1747. if err != nil {
  1748. return nil, fmt.Errorf("error connecting nodes: %v", err)
  1749. }
  1750. }
  1751. clients[i], err = nodes[i].Client()
  1752. if err != nil {
  1753. return nil, fmt.Errorf("create node 1 rpc client fail: %v", err)
  1754. }
  1755. }
  1756. if numnodes > 2 {
  1757. err = net.Connect(nodes[0].ID(), nodes[len(nodes)-1].ID())
  1758. if err != nil {
  1759. return nil, fmt.Errorf("error connecting first and last nodes")
  1760. }
  1761. }
  1762. return clients, nil
  1763. }
  1764. func newServices(allowRaw bool) adapters.Services {
  1765. stateStore := state.NewInmemoryStore()
  1766. kademlias := make(map[enode.ID]*network.Kademlia)
  1767. kademlia := func(id enode.ID) *network.Kademlia {
  1768. if k, ok := kademlias[id]; ok {
  1769. return k
  1770. }
  1771. params := network.NewKadParams()
  1772. params.NeighbourhoodSize = 2
  1773. params.MaxBinSize = 3
  1774. params.MinBinSize = 1
  1775. params.MaxRetries = 1000
  1776. params.RetryExponent = 2
  1777. params.RetryInterval = 1000000
  1778. kademlias[id] = network.NewKademlia(id[:], params)
  1779. return kademlias[id]
  1780. }
  1781. return adapters.Services{
  1782. pssProtocolName: func(ctx *adapters.ServiceContext) (node.Service, error) {
  1783. // execadapter does not exec init()
  1784. initTest()
  1785. ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
  1786. defer cancel()
  1787. keys, err := wapi.NewKeyPair(ctxlocal)
  1788. privkey, err := w.GetPrivateKey(keys)
  1789. pssp := NewPssParams().WithPrivateKey(privkey)
  1790. pssp.AllowRaw = allowRaw
  1791. pskad := kademlia(ctx.Config.ID)
  1792. ps, err := NewPss(pskad, pssp)
  1793. if err != nil {
  1794. return nil, err
  1795. }
  1796. ping := &Ping{
  1797. OutC: make(chan bool),
  1798. Pong: true,
  1799. }
  1800. p2pp := NewPingProtocol(ping)
  1801. pp, err := RegisterProtocol(ps, &PingTopic, PingProtocol, p2pp, &ProtocolParams{Asymmetric: true})
  1802. if err != nil {
  1803. return nil, err
  1804. }
  1805. if useHandshake {
  1806. SetHandshakeController(ps, NewHandshakeParams())
  1807. }
  1808. ps.Register(&PingTopic, &handler{
  1809. f: pp.Handle,
  1810. caps: &handlerCaps{
  1811. raw: true,
  1812. },
  1813. })
  1814. ps.addAPI(rpc.API{
  1815. Namespace: "psstest",
  1816. Version: "0.3",
  1817. Service: NewAPITest(ps),
  1818. Public: false,
  1819. })
  1820. if err != nil {
  1821. log.Error("Couldnt register pss protocol", "err", err)
  1822. os.Exit(1)
  1823. }
  1824. pssprotocols[ctx.Config.ID.String()] = &protoCtrl{
  1825. C: ping.OutC,
  1826. protocol: pp,
  1827. run: p2pp.Run,
  1828. }
  1829. return ps, nil
  1830. },
  1831. "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
  1832. addr := network.NewAddr(ctx.Config.Node())
  1833. hp := network.NewHiveParams()
  1834. hp.Discovery = false
  1835. config := &network.BzzConfig{
  1836. OverlayAddr: addr.Over(),
  1837. UnderlayAddr: addr.Under(),
  1838. HiveParams: hp,
  1839. }
  1840. return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
  1841. },
  1842. }
  1843. }
  1844. func newTestPss(privkey *ecdsa.PrivateKey, kad *network.Kademlia, ppextra *PssParams) *Pss {
  1845. nid := enode.PubkeyToIDV4(&privkey.PublicKey)
  1846. // set up routing if kademlia is not passed to us
  1847. if kad == nil {
  1848. kp := network.NewKadParams()
  1849. kp.NeighbourhoodSize = 3
  1850. kad = network.NewKademlia(nid[:], kp)
  1851. }
  1852. // create pss
  1853. pp := NewPssParams().WithPrivateKey(privkey)
  1854. if ppextra != nil {
  1855. pp.SymKeyCacheCapacity = ppextra.SymKeyCacheCapacity
  1856. }
  1857. ps, err := NewPss(kad, pp)
  1858. if err != nil {
  1859. return nil
  1860. }
  1861. ps.Start(nil)
  1862. return ps
  1863. }
// APITest provides API calls for test/development use.
// It embeds *Pss, so the methods of the wrapped Pss instance are available
// on it directly.
type APITest struct {
	*Pss
}
  1868. func NewAPITest(ps *Pss) *APITest {
  1869. return &APITest{Pss: ps}
  1870. }
  1871. func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to hexutil.Bytes) ([2]string, error) {
  1872. recvsymkeyid, err := apitest.SetSymmetricKey(recvsymkey, topic, PssAddress(to), true)
  1873. if err != nil {
  1874. return [2]string{}, err
  1875. }
  1876. sendsymkeyid, err := apitest.SetSymmetricKey(sendsymkey, topic, PssAddress(to), false)
  1877. if err != nil {
  1878. return [2]string{}, err
  1879. }
  1880. return [2]string{recvsymkeyid, sendsymkeyid}, nil
  1881. }
  1882. func (apitest *APITest) Clean() (int, error) {
  1883. return apitest.Pss.cleanKeys(), nil
  1884. }
  1885. // enableMetrics is starting InfluxDB reporter so that we collect stats when running tests locally
  1886. func enableMetrics() {
  1887. metrics.Enabled = true
  1888. go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 1*time.Second, "http://localhost:8086", "metrics", "admin", "admin", "swarm.", map[string]string{
  1889. "host": "test",
  1890. })
  1891. }