server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sort"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/ethereum/eth-go/ethlog"
  11. )
  12. const (
  13. outboundAddressPoolSize = 10
  14. disconnectGracePeriod = 2
  15. )
  16. type Blacklist interface {
  17. Get([]byte) (bool, error)
  18. Put([]byte) error
  19. Delete([]byte) error
  20. Exists(pubkey []byte) (ok bool)
  21. }
  22. type BlacklistMap struct {
  23. blacklist map[string]bool
  24. lock sync.RWMutex
  25. }
  26. func NewBlacklist() *BlacklistMap {
  27. return &BlacklistMap{
  28. blacklist: make(map[string]bool),
  29. }
  30. }
  31. func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
  32. self.lock.RLock()
  33. defer self.lock.RUnlock()
  34. v, ok := self.blacklist[string(pubkey)]
  35. var err error
  36. if !ok {
  37. err = fmt.Errorf("not found")
  38. }
  39. return v, err
  40. }
  41. func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
  42. self.lock.RLock()
  43. defer self.lock.RUnlock()
  44. _, ok = self.blacklist[string(pubkey)]
  45. return
  46. }
  47. func (self *BlacklistMap) Put(pubkey []byte) error {
  48. self.lock.RLock()
  49. defer self.lock.RUnlock()
  50. self.blacklist[string(pubkey)] = true
  51. return nil
  52. }
  53. func (self *BlacklistMap) Delete(pubkey []byte) error {
  54. self.lock.RLock()
  55. defer self.lock.RUnlock()
  56. delete(self.blacklist, string(pubkey))
  57. return nil
  58. }
  59. type Server struct {
  60. network Network
  61. listening bool //needed?
  62. dialing bool //needed?
  63. closed bool
  64. identity ClientIdentity
  65. addr net.Addr
  66. port uint16
  67. protocols []string
  68. quit chan chan bool
  69. peersLock sync.RWMutex
  70. maxPeers int
  71. peers []*Peer
  72. peerSlots chan int
  73. peersTable map[string]int
  74. peersMsg *Msg
  75. peerCount int
  76. peerConnect chan net.Addr
  77. peerDisconnect chan DisconnectRequest
  78. blacklist Blacklist
  79. handlers Handlers
  80. }
  81. var logger = ethlog.NewLogger("P2P")
  82. func New(network Network, addr net.Addr, identity ClientIdentity, handlers Handlers, maxPeers int, blacklist Blacklist) *Server {
  83. // get alphabetical list of protocol names from handlers map
  84. protocols := []string{}
  85. for protocol := range handlers {
  86. protocols = append(protocols, protocol)
  87. }
  88. sort.Strings(protocols)
  89. _, port, _ := net.SplitHostPort(addr.String())
  90. intport, _ := strconv.Atoi(port)
  91. self := &Server{
  92. // NewSimpleClientIdentity(clientIdentifier, version, customIdentifier)
  93. network: network,
  94. identity: identity,
  95. addr: addr,
  96. port: uint16(intport),
  97. protocols: protocols,
  98. quit: make(chan chan bool),
  99. maxPeers: maxPeers,
  100. peers: make([]*Peer, maxPeers),
  101. peerSlots: make(chan int, maxPeers),
  102. peersTable: make(map[string]int),
  103. peerConnect: make(chan net.Addr, outboundAddressPoolSize),
  104. peerDisconnect: make(chan DisconnectRequest),
  105. blacklist: blacklist,
  106. handlers: handlers,
  107. }
  108. for i := 0; i < maxPeers; i++ {
  109. self.peerSlots <- i // fill up with indexes
  110. }
  111. return self
  112. }
  113. func (self *Server) NewAddr(host string, port int) (addr net.Addr, err error) {
  114. addr, err = self.network.NewAddr(host, port)
  115. return
  116. }
  117. func (self *Server) ParseAddr(address string) (addr net.Addr, err error) {
  118. addr, err = self.network.ParseAddr(address)
  119. return
  120. }
  121. func (self *Server) ClientIdentity() ClientIdentity {
  122. return self.identity
  123. }
  124. func (self *Server) PeersMessage() (msg *Msg, err error) {
  125. // TODO: memoize and reset when peers change
  126. self.peersLock.RLock()
  127. defer self.peersLock.RUnlock()
  128. msg = self.peersMsg
  129. if msg == nil {
  130. var peerData []interface{}
  131. for _, i := range self.peersTable {
  132. peer := self.peers[i]
  133. peerData = append(peerData, peer.Encode())
  134. }
  135. if len(peerData) == 0 {
  136. err = fmt.Errorf("no peers")
  137. } else {
  138. msg, err = NewMsg(PeersMsg, peerData...)
  139. self.peersMsg = msg //memoize
  140. }
  141. }
  142. return
  143. }
  144. func (self *Server) Peers() (peers []*Peer) {
  145. self.peersLock.RLock()
  146. defer self.peersLock.RUnlock()
  147. for _, peer := range self.peers {
  148. if peer != nil {
  149. peers = append(peers, peer)
  150. }
  151. }
  152. return
  153. }
  154. func (self *Server) PeerCount() int {
  155. self.peersLock.RLock()
  156. defer self.peersLock.RUnlock()
  157. return self.peerCount
  158. }
  159. var getPeersMsg, _ = NewMsg(GetPeersMsg)
  160. func (self *Server) PeerConnect(addr net.Addr) {
  161. // TODO: should buffer, filter and uniq
  162. // send GetPeersMsg if not blocking
  163. select {
  164. case self.peerConnect <- addr: // not enough peers
  165. self.Broadcast("", getPeersMsg)
  166. default: // we dont care
  167. }
  168. }
  169. func (self *Server) PeerDisconnect() chan DisconnectRequest {
  170. return self.peerDisconnect
  171. }
  172. func (self *Server) Blacklist() Blacklist {
  173. return self.blacklist
  174. }
  175. func (self *Server) Handlers() Handlers {
  176. return self.handlers
  177. }
  178. func (self *Server) Broadcast(protocol string, msg *Msg) {
  179. self.peersLock.RLock()
  180. defer self.peersLock.RUnlock()
  181. for _, peer := range self.peers {
  182. if peer != nil {
  183. peer.Write(protocol, msg)
  184. }
  185. }
  186. }
  187. // Start the server
  188. func (self *Server) Start(listen bool, dial bool) {
  189. self.network.Start()
  190. if listen {
  191. listener, err := self.network.Listener(self.addr)
  192. if err != nil {
  193. logger.Warnf("Error initializing listener: %v", err)
  194. logger.Warnf("Connection listening disabled")
  195. self.listening = false
  196. } else {
  197. self.listening = true
  198. logger.Infoln("Listen on %v: ready and accepting connections", listener.Addr())
  199. go self.inboundPeerHandler(listener)
  200. }
  201. }
  202. if dial {
  203. dialer, err := self.network.Dialer(self.addr)
  204. if err != nil {
  205. logger.Warnf("Error initializing dialer: %v", err)
  206. logger.Warnf("Connection dialout disabled")
  207. self.dialing = false
  208. } else {
  209. self.dialing = true
  210. logger.Infoln("Dial peers watching outbound address pool")
  211. go self.outboundPeerHandler(dialer)
  212. }
  213. }
  214. logger.Infoln("server started")
  215. }
  216. func (self *Server) Stop() {
  217. logger.Infoln("server stopping...")
  218. // // quit one loop if dialing
  219. if self.dialing {
  220. logger.Infoln("stop dialout...")
  221. dialq := make(chan bool)
  222. self.quit <- dialq
  223. <-dialq
  224. fmt.Println("quit another")
  225. }
  226. // quit the other loop if listening
  227. if self.listening {
  228. logger.Infoln("stop listening...")
  229. listenq := make(chan bool)
  230. self.quit <- listenq
  231. <-listenq
  232. fmt.Println("quit one")
  233. }
  234. fmt.Println("quit waited")
  235. logger.Infoln("stopping peers...")
  236. peers := []net.Addr{}
  237. self.peersLock.RLock()
  238. self.closed = true
  239. for _, peer := range self.peers {
  240. if peer != nil {
  241. peers = append(peers, peer.Address)
  242. }
  243. }
  244. self.peersLock.RUnlock()
  245. for _, address := range peers {
  246. go self.removePeer(DisconnectRequest{
  247. addr: address,
  248. reason: DiscQuitting,
  249. })
  250. }
  251. // wait till they actually disconnect
  252. // this is checked by draining the peerSlots (slots are released back if a peer is removed)
  253. i := 0
  254. fmt.Println("draining peers")
  255. FOR:
  256. for {
  257. select {
  258. case slot := <-self.peerSlots:
  259. i++
  260. fmt.Printf("%v: found slot %v", i, slot)
  261. if i == self.maxPeers {
  262. break FOR
  263. }
  264. }
  265. }
  266. logger.Infoln("server stopped")
  267. }
  268. // main loop for adding connections via listening
  269. func (self *Server) inboundPeerHandler(listener net.Listener) {
  270. for {
  271. select {
  272. case slot := <-self.peerSlots:
  273. go self.connectInboundPeer(listener, slot)
  274. case errc := <-self.quit:
  275. listener.Close()
  276. fmt.Println("quit listenloop")
  277. errc <- true
  278. return
  279. }
  280. }
  281. }
  282. // main loop for adding outbound peers based on peerConnect address pool
  283. // this same loop handles peer disconnect requests as well
  284. func (self *Server) outboundPeerHandler(dialer Dialer) {
  285. // addressChan initially set to nil (only watches peerConnect if we need more peers)
  286. var addressChan chan net.Addr
  287. slots := self.peerSlots
  288. var slot *int
  289. for {
  290. select {
  291. case i := <-slots:
  292. // we need a peer in slot i, slot reserved
  293. slot = &i
  294. // now we can watch for candidate peers in the next loop
  295. addressChan = self.peerConnect
  296. // do not consume more until candidate peer is found
  297. slots = nil
  298. case address := <-addressChan:
  299. // candidate peer found, will dial out asyncronously
  300. // if connection fails slot will be released
  301. go self.connectOutboundPeer(dialer, address, *slot)
  302. // we can watch if more peers needed in the next loop
  303. slots = self.peerSlots
  304. // until then we dont care about candidate peers
  305. addressChan = nil
  306. case request := <-self.peerDisconnect:
  307. go self.removePeer(request)
  308. case errc := <-self.quit:
  309. if addressChan != nil && slot != nil {
  310. self.peerSlots <- *slot
  311. }
  312. fmt.Println("quit dialloop")
  313. errc <- true
  314. return
  315. }
  316. }
  317. }
  318. // check if peer address already connected
  319. func (self *Server) connected(address net.Addr) (err error) {
  320. self.peersLock.RLock()
  321. defer self.peersLock.RUnlock()
  322. // fmt.Printf("address: %v\n", address)
  323. slot, found := self.peersTable[address.String()]
  324. if found {
  325. err = fmt.Errorf("already connected as peer %v (%v)", slot, address)
  326. }
  327. return
  328. }
  329. // connect to peer via listener.Accept()
  330. func (self *Server) connectInboundPeer(listener net.Listener, slot int) {
  331. var address net.Addr
  332. conn, err := listener.Accept()
  333. if err == nil {
  334. address = conn.RemoteAddr()
  335. err = self.connected(address)
  336. if err != nil {
  337. conn.Close()
  338. }
  339. }
  340. if err != nil {
  341. logger.Debugln(err)
  342. self.peerSlots <- slot
  343. } else {
  344. fmt.Printf("adding %v\n", address)
  345. go self.addPeer(conn, address, true, slot)
  346. }
  347. }
  348. // connect to peer via dial out
  349. func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) {
  350. var conn net.Conn
  351. err := self.connected(address)
  352. if err == nil {
  353. conn, err = dialer.Dial(address.Network(), address.String())
  354. }
  355. if err != nil {
  356. logger.Debugln(err)
  357. self.peerSlots <- slot
  358. } else {
  359. go self.addPeer(conn, address, false, slot)
  360. }
  361. }
  362. // creates the new peer object and inserts it into its slot
  363. func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) {
  364. self.peersLock.Lock()
  365. defer self.peersLock.Unlock()
  366. if self.closed {
  367. fmt.Println("oopsy, not no longer need peer")
  368. conn.Close() //oopsy our bad
  369. self.peerSlots <- slot // release slot
  370. } else {
  371. peer := NewPeer(conn, address, inbound, self)
  372. self.peers[slot] = peer
  373. self.peersTable[address.String()] = slot
  374. self.peerCount++
  375. // reset peersmsg
  376. self.peersMsg = nil
  377. fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot)
  378. peer.Start()
  379. }
  380. }
  381. // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
  382. func (self *Server) removePeer(request DisconnectRequest) {
  383. self.peersLock.Lock()
  384. address := request.addr
  385. slot := self.peersTable[address.String()]
  386. peer := self.peers[slot]
  387. fmt.Printf("removing peer %v %v (slot %v)\n", address, peer, slot)
  388. if peer == nil {
  389. logger.Debugf("already removed peer on %v", address)
  390. self.peersLock.Unlock()
  391. return
  392. }
  393. // remove from list and index
  394. self.peerCount--
  395. self.peers[slot] = nil
  396. delete(self.peersTable, address.String())
  397. // reset peersmsg
  398. self.peersMsg = nil
  399. fmt.Printf("removed peer %v (slot %v)\n", peer, slot)
  400. self.peersLock.Unlock()
  401. // sending disconnect message
  402. disconnectMsg, _ := NewMsg(DiscMsg, request.reason)
  403. peer.Write("", disconnectMsg)
  404. // be nice and wait
  405. time.Sleep(disconnectGracePeriod * time.Second)
  406. // switch off peer and close connections etc.
  407. fmt.Println("stopping peer")
  408. peer.Stop()
  409. fmt.Println("stopped peer")
  410. // release slot to signal need for a new peer, last!
  411. self.peerSlots <- slot
  412. }
  413. // fix handshake message to push to peers
  414. func (self *Server) Handshake() *Msg {
  415. fmt.Println(self.identity.Pubkey()[1:])
  416. msg, _ := NewMsg(HandshakeMsg, P2PVersion, []byte(self.identity.String()), []interface{}{self.protocols}, self.port, self.identity.Pubkey()[1:])
  417. return msg
  418. }
  419. func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error {
  420. // Check for blacklisting
  421. if self.blacklist.Exists(pubkey) {
  422. return fmt.Errorf("blacklisted")
  423. }
  424. self.peersLock.RLock()
  425. defer self.peersLock.RUnlock()
  426. for _, peer := range self.peers {
  427. if peer != nil && peer != candidate && bytes.Compare(peer.Pubkey, pubkey) == 0 {
  428. return fmt.Errorf("already connected")
  429. }
  430. }
  431. candidate.Pubkey = pubkey
  432. return nil
  433. }