ethereum.go 15 KB


  1. package eth
  2. import (
  3. "container/list"
  4. "encoding/json"
  5. "fmt"
  6. "math/big"
  7. "math/rand"
  8. "net"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/ethereum/go-ethereum/core"
  16. "github.com/ethereum/go-ethereum/crypto"
  17. "github.com/ethereum/go-ethereum/ethutil"
  18. "github.com/ethereum/go-ethereum/event"
  19. "github.com/ethereum/go-ethereum/logger"
  20. "github.com/ethereum/go-ethereum/rpc"
  21. "github.com/ethereum/go-ethereum/state"
  22. "github.com/ethereum/go-ethereum/wire"
  23. )
  24. const (
  25. seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt"
  26. seedNodeAddress = "poc-7.ethdev.com:30303"
  27. )
  28. var loggerger = logger.NewLogger("SERV")
  29. func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
  30. // Loop thru the peers and close them (if we had them)
  31. for e := peers.Front(); e != nil; e = e.Next() {
  32. callback(e.Value.(*Peer), e)
  33. }
  34. }
  35. const (
  36. processReapingTimeout = 60 // TODO increase
  37. )
  38. type Ethereum struct {
  39. // Channel for shutting down the ethereum
  40. shutdownChan chan bool
  41. quit chan bool
  42. // DB interface
  43. db ethutil.Database
  44. // State manager for processing new blocks and managing the over all states
  45. blockManager *core.BlockManager
  46. // The transaction pool. Transaction can be pushed on this pool
  47. // for later including in the blocks
  48. txPool *core.TxPool
  49. // The canonical chain
  50. blockChain *core.ChainManager
  51. // The block pool
  52. blockPool *BlockPool
  53. // Eventer
  54. eventMux event.TypeMux
  55. // Peers
  56. peers *list.List
  57. // Nonce
  58. Nonce uint64
  59. Addr net.Addr
  60. Port string
  61. blacklist [][]byte
  62. peerMut sync.Mutex
  63. // Capabilities for outgoing peers
  64. serverCaps Caps
  65. nat NAT
  66. // Specifies the desired amount of maximum peers
  67. MaxPeers int
  68. Mining bool
  69. listening bool
  70. RpcServer *rpc.JsonRpcServer
  71. keyManager *crypto.KeyManager
  72. clientIdentity wire.ClientIdentity
  73. isUpToDate bool
  74. filterMu sync.RWMutex
  75. filterId int
  76. filters map[int]*core.Filter
  77. }
  78. func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
  79. var err error
  80. var nat NAT
  81. if usePnp {
  82. nat, err = Discover()
  83. if err != nil {
  84. loggerger.Debugln("UPnP failed", err)
  85. }
  86. }
  87. bootstrapDb(db)
  88. ethutil.Config.Db = db
  89. nonce, _ := ethutil.RandomUint64()
  90. ethereum := &Ethereum{
  91. shutdownChan: make(chan bool),
  92. quit: make(chan bool),
  93. db: db,
  94. peers: list.New(),
  95. Nonce: nonce,
  96. serverCaps: caps,
  97. nat: nat,
  98. keyManager: keyManager,
  99. clientIdentity: clientIdentity,
  100. isUpToDate: true,
  101. filters: make(map[int]*core.Filter),
  102. }
  103. ethereum.blockPool = NewBlockPool(ethereum)
  104. ethereum.txPool = core.NewTxPool(ethereum)
  105. ethereum.blockChain = core.NewChainManager(ethereum.EventMux())
  106. ethereum.blockManager = core.NewBlockManager(ethereum)
  107. ethereum.blockChain.SetProcessor(ethereum.blockManager)
  108. // Start the tx pool
  109. ethereum.txPool.Start()
  110. return ethereum, nil
  111. }
  112. func (s *Ethereum) KeyManager() *crypto.KeyManager {
  113. return s.keyManager
  114. }
  115. func (s *Ethereum) ClientIdentity() wire.ClientIdentity {
  116. return s.clientIdentity
  117. }
  118. func (s *Ethereum) ChainManager() *core.ChainManager {
  119. return s.blockChain
  120. }
  121. func (s *Ethereum) BlockManager() *core.BlockManager {
  122. return s.blockManager
  123. }
  124. func (s *Ethereum) TxPool() *core.TxPool {
  125. return s.txPool
  126. }
  127. func (s *Ethereum) BlockPool() *BlockPool {
  128. return s.blockPool
  129. }
  130. func (s *Ethereum) EventMux() *event.TypeMux {
  131. return &s.eventMux
  132. }
  133. func (self *Ethereum) Db() ethutil.Database {
  134. return self.db
  135. }
  136. func (s *Ethereum) ServerCaps() Caps {
  137. return s.serverCaps
  138. }
  139. func (s *Ethereum) IsMining() bool {
  140. return s.Mining
  141. }
  142. func (s *Ethereum) PeerCount() int {
  143. return s.peers.Len()
  144. }
  145. func (s *Ethereum) IsUpToDate() bool {
  146. upToDate := true
  147. eachPeer(s.peers, func(peer *Peer, e *list.Element) {
  148. if atomic.LoadInt32(&peer.connected) == 1 {
  149. if peer.catchingUp == true && peer.versionKnown {
  150. upToDate = false
  151. }
  152. }
  153. })
  154. return upToDate
  155. }
  156. func (s *Ethereum) PushPeer(peer *Peer) {
  157. s.peers.PushBack(peer)
  158. }
  159. func (s *Ethereum) IsListening() bool {
  160. return s.listening
  161. }
  162. func (s *Ethereum) HighestTDPeer() (td *big.Int) {
  163. td = big.NewInt(0)
  164. eachPeer(s.peers, func(p *Peer, v *list.Element) {
  165. if p.td.Cmp(td) > 0 {
  166. td = p.td
  167. }
  168. })
  169. return
  170. }
  171. func (self *Ethereum) BlacklistPeer(peer *Peer) {
  172. self.blacklist = append(self.blacklist, peer.pubkey)
  173. }
  174. func (s *Ethereum) AddPeer(conn net.Conn) {
  175. peer := NewPeer(conn, s, true)
  176. if peer != nil {
  177. if s.peers.Len() < s.MaxPeers {
  178. peer.Start()
  179. } else {
  180. loggerger.Debugf("Max connected peers reached. Not adding incoming peer.")
  181. }
  182. }
  183. }
  184. func (s *Ethereum) ProcessPeerList(addrs []string) {
  185. for _, addr := range addrs {
  186. // TODO Probably requires some sanity checks
  187. s.ConnectToPeer(addr)
  188. }
  189. }
  190. func (s *Ethereum) ConnectToPeer(addr string) error {
  191. if s.peers.Len() < s.MaxPeers {
  192. var alreadyConnected bool
  193. ahost, aport, _ := net.SplitHostPort(addr)
  194. var chost string
  195. ips, err := net.LookupIP(ahost)
  196. if err != nil {
  197. return err
  198. } else {
  199. // If more then one ip is available try stripping away the ipv6 ones
  200. if len(ips) > 1 {
  201. var ipsv4 []net.IP
  202. // For now remove the ipv6 addresses
  203. for _, ip := range ips {
  204. if strings.Contains(ip.String(), "::") {
  205. continue
  206. } else {
  207. ipsv4 = append(ipsv4, ip)
  208. }
  209. }
  210. if len(ipsv4) == 0 {
  211. return fmt.Errorf("[SERV] No IPV4 addresses available for hostname")
  212. }
  213. // Pick a random ipv4 address, simulating round-robin DNS.
  214. rand.Seed(time.Now().UTC().UnixNano())
  215. i := rand.Intn(len(ipsv4))
  216. chost = ipsv4[i].String()
  217. } else {
  218. if len(ips) == 0 {
  219. return fmt.Errorf("[SERV] No IPs resolved for the given hostname")
  220. return nil
  221. }
  222. chost = ips[0].String()
  223. }
  224. }
  225. eachPeer(s.peers, func(p *Peer, v *list.Element) {
  226. if p.conn == nil {
  227. return
  228. }
  229. phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
  230. if phost == chost && pport == aport {
  231. alreadyConnected = true
  232. //loggerger.Debugf("Peer %s already added.\n", chost)
  233. return
  234. }
  235. })
  236. if alreadyConnected {
  237. return nil
  238. }
  239. NewOutboundPeer(addr, s, s.serverCaps)
  240. }
  241. return nil
  242. }
  243. func (s *Ethereum) OutboundPeers() []*Peer {
  244. // Create a new peer slice with at least the length of the total peers
  245. outboundPeers := make([]*Peer, s.peers.Len())
  246. length := 0
  247. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  248. if !p.inbound && p.conn != nil {
  249. outboundPeers[length] = p
  250. length++
  251. }
  252. })
  253. return outboundPeers[:length]
  254. }
  255. func (s *Ethereum) InboundPeers() []*Peer {
  256. // Create a new peer slice with at least the length of the total peers
  257. inboundPeers := make([]*Peer, s.peers.Len())
  258. length := 0
  259. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  260. if p.inbound {
  261. inboundPeers[length] = p
  262. length++
  263. }
  264. })
  265. return inboundPeers[:length]
  266. }
  267. func (s *Ethereum) InOutPeers() []*Peer {
  268. // Reap the dead peers first
  269. s.reapPeers()
  270. // Create a new peer slice with at least the length of the total peers
  271. inboundPeers := make([]*Peer, s.peers.Len())
  272. length := 0
  273. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  274. // Only return peers with an actual ip
  275. if len(p.host) > 0 {
  276. inboundPeers[length] = p
  277. length++
  278. }
  279. })
  280. return inboundPeers[:length]
  281. }
  282. func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) {
  283. msg := wire.NewMessage(msgType, data)
  284. s.BroadcastMsg(msg)
  285. }
  286. func (s *Ethereum) BroadcastMsg(msg *wire.Msg) {
  287. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  288. p.QueueMessage(msg)
  289. })
  290. }
  291. func (s *Ethereum) Peers() *list.List {
  292. return s.peers
  293. }
  294. func (s *Ethereum) reapPeers() {
  295. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  296. if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
  297. s.removePeerElement(e)
  298. }
  299. })
  300. }
  301. func (s *Ethereum) removePeerElement(e *list.Element) {
  302. s.peerMut.Lock()
  303. defer s.peerMut.Unlock()
  304. s.peers.Remove(e)
  305. s.eventMux.Post(PeerListEvent{s.peers})
  306. }
  307. func (s *Ethereum) RemovePeer(p *Peer) {
  308. eachPeer(s.peers, func(peer *Peer, e *list.Element) {
  309. if peer == p {
  310. s.removePeerElement(e)
  311. }
  312. })
  313. }
  314. func (s *Ethereum) reapDeadPeerHandler() {
  315. reapTimer := time.NewTicker(processReapingTimeout * time.Second)
  316. for {
  317. select {
  318. case <-reapTimer.C:
  319. s.reapPeers()
  320. }
  321. }
  322. }
  323. // Start the ethereum
  324. func (s *Ethereum) Start(seed bool) {
  325. s.blockPool.Start()
  326. s.blockManager.Start()
  327. // Bind to addr and port
  328. ln, err := net.Listen("tcp", ":"+s.Port)
  329. if err != nil {
  330. loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port)
  331. s.listening = false
  332. } else {
  333. s.listening = true
  334. // Starting accepting connections
  335. loggerger.Infoln("Ready and accepting connections")
  336. // Start the peer handler
  337. go s.peerHandler(ln)
  338. }
  339. if s.nat != nil {
  340. go s.upnpUpdateThread()
  341. }
  342. // Start the reaping processes
  343. go s.reapDeadPeerHandler()
  344. go s.update()
  345. go s.filterLoop()
  346. if seed {
  347. s.Seed()
  348. }
  349. s.ConnectToPeer("localhost:40404")
  350. loggerger.Infoln("Server started")
  351. }
  352. func (s *Ethereum) Seed() {
  353. // Sorry Py person. I must blacklist. you perform badly
  354. s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031"))
  355. ips := PastPeers()
  356. if len(ips) > 0 {
  357. for _, ip := range ips {
  358. loggerger.Infoln("Connecting to previous peer ", ip)
  359. s.ConnectToPeer(ip)
  360. }
  361. } else {
  362. loggerger.Debugln("Retrieving seed nodes")
  363. // Eth-Go Bootstrapping
  364. ips, er := net.LookupIP("seed.bysh.me")
  365. if er == nil {
  366. peers := []string{}
  367. for _, ip := range ips {
  368. node := fmt.Sprintf("%s:%d", ip.String(), 30303)
  369. loggerger.Debugln("Found DNS Go Peer:", node)
  370. peers = append(peers, node)
  371. }
  372. s.ProcessPeerList(peers)
  373. }
  374. // Official DNS Bootstrapping
  375. _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org")
  376. if err == nil {
  377. peers := []string{}
  378. // Iterate SRV nodes
  379. for _, n := range nodes {
  380. target := n.Target
  381. port := strconv.Itoa(int(n.Port))
  382. // Resolve target to ip (Go returns list, so may resolve to multiple ips?)
  383. addr, err := net.LookupHost(target)
  384. if err == nil {
  385. for _, a := range addr {
  386. // Build string out of SRV port and Resolved IP
  387. peer := net.JoinHostPort(a, port)
  388. loggerger.Debugln("Found DNS Bootstrap Peer:", peer)
  389. peers = append(peers, peer)
  390. }
  391. } else {
  392. loggerger.Debugln("Couldn't resolve :", target)
  393. }
  394. }
  395. // Connect to Peer list
  396. s.ProcessPeerList(peers)
  397. }
  398. s.ConnectToPeer(seedNodeAddress)
  399. }
  400. }
  401. func (s *Ethereum) peerHandler(listener net.Listener) {
  402. for {
  403. conn, err := listener.Accept()
  404. if err != nil {
  405. loggerger.Debugln(err)
  406. continue
  407. }
  408. go s.AddPeer(conn)
  409. }
  410. }
  411. func (s *Ethereum) Stop() {
  412. // Stop eventMux first, it will close all subscriptions.
  413. s.eventMux.Stop()
  414. // Close the database
  415. defer s.db.Close()
  416. var ips []string
  417. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  418. ips = append(ips, p.conn.RemoteAddr().String())
  419. })
  420. if len(ips) > 0 {
  421. d, _ := json.MarshalIndent(ips, "", " ")
  422. ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d)
  423. }
  424. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  425. p.Stop()
  426. })
  427. close(s.quit)
  428. if s.RpcServer != nil {
  429. s.RpcServer.Stop()
  430. }
  431. s.txPool.Stop()
  432. s.blockManager.Stop()
  433. s.blockPool.Stop()
  434. loggerger.Infoln("Server stopped")
  435. close(s.shutdownChan)
  436. }
  437. // This function will wait for a shutdown and resumes main thread execution
  438. func (s *Ethereum) WaitForShutdown() {
  439. <-s.shutdownChan
  440. }
  441. func (s *Ethereum) upnpUpdateThread() {
  442. // Go off immediately to prevent code duplication, thereafter we renew
  443. // lease every 15 minutes.
  444. timer := time.NewTimer(5 * time.Minute)
  445. lport, _ := strconv.ParseInt(s.Port, 10, 16)
  446. first := true
  447. out:
  448. for {
  449. select {
  450. case <-timer.C:
  451. var err error
  452. _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
  453. if err != nil {
  454. loggerger.Debugln("can't add UPnP port mapping:", err)
  455. break out
  456. }
  457. if first && err == nil {
  458. _, err = s.nat.GetExternalAddress()
  459. if err != nil {
  460. loggerger.Debugln("UPnP can't get external address:", err)
  461. continue out
  462. }
  463. first = false
  464. }
  465. timer.Reset(time.Minute * 15)
  466. case <-s.quit:
  467. break out
  468. }
  469. }
  470. timer.Stop()
  471. if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
  472. loggerger.Debugln("unable to remove UPnP port mapping:", err)
  473. } else {
  474. loggerger.Debugln("succesfully disestablished UPnP port mapping")
  475. }
  476. }
  477. func (self *Ethereum) update() {
  478. upToDateTimer := time.NewTicker(1 * time.Second)
  479. out:
  480. for {
  481. select {
  482. case <-upToDateTimer.C:
  483. if self.IsUpToDate() && !self.isUpToDate {
  484. self.eventMux.Post(ChainSyncEvent{false})
  485. self.isUpToDate = true
  486. } else if !self.IsUpToDate() && self.isUpToDate {
  487. self.eventMux.Post(ChainSyncEvent{true})
  488. self.isUpToDate = false
  489. }
  490. case <-self.quit:
  491. break out
  492. }
  493. }
  494. }
  495. // InstallFilter adds filter for blockchain events.
  496. // The filter's callbacks will run for matching blocks and messages.
  497. // The filter should not be modified after it has been installed.
  498. func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
  499. self.filterMu.Lock()
  500. id = self.filterId
  501. self.filters[id] = filter
  502. self.filterId++
  503. self.filterMu.Unlock()
  504. return id
  505. }
  506. func (self *Ethereum) UninstallFilter(id int) {
  507. self.filterMu.Lock()
  508. delete(self.filters, id)
  509. self.filterMu.Unlock()
  510. }
  511. // GetFilter retrieves a filter installed using InstallFilter.
  512. // The filter may not be modified.
  513. func (self *Ethereum) GetFilter(id int) *core.Filter {
  514. self.filterMu.RLock()
  515. defer self.filterMu.RUnlock()
  516. return self.filters[id]
  517. }
  518. func (self *Ethereum) filterLoop() {
  519. // Subscribe to events
  520. events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
  521. for event := range events.Chan() {
  522. switch event := event.(type) {
  523. case core.NewBlockEvent:
  524. self.filterMu.RLock()
  525. for _, filter := range self.filters {
  526. if filter.BlockCallback != nil {
  527. filter.BlockCallback(event.Block)
  528. }
  529. }
  530. self.filterMu.RUnlock()
  531. case state.Messages:
  532. self.filterMu.RLock()
  533. for _, filter := range self.filters {
  534. if filter.MessageCallback != nil {
  535. msgs := filter.FilterMessages(event)
  536. if len(msgs) > 0 {
  537. filter.MessageCallback(msgs)
  538. }
  539. }
  540. }
  541. self.filterMu.RUnlock()
  542. }
  543. }
  544. }
  545. func bootstrapDb(db ethutil.Database) {
  546. d, _ := db.Get([]byte("ProtocolVersion"))
  547. protov := ethutil.NewValue(d).Uint()
  548. if protov == 0 {
  549. db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
  550. }
  551. }
  552. func PastPeers() []string {
  553. var ips []string
  554. data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"))
  555. json.Unmarshal([]byte(data), &ips)
  556. return ips
  557. }