ethereum.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package eth
  2. import (
  3. "container/list"
  4. "github.com/ethereum/ethchain-go"
  5. "github.com/ethereum/ethdb-go"
  6. "github.com/ethereum/ethutil-go"
  7. "github.com/ethereum/ethwire-go"
  8. "io/ioutil"
  9. "log"
  10. "net"
  11. "net/http"
  12. "strconv"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
  18. // Loop thru the peers and close them (if we had them)
  19. for e := peers.Front(); e != nil; e = e.Next() {
  20. if peer, ok := e.Value.(*Peer); ok {
  21. callback(peer, e)
  22. }
  23. }
  24. }
  25. const (
  26. processReapingTimeout = 60 // TODO increase
  27. )
  28. type Ethereum struct {
  29. // Channel for shutting down the ethereum
  30. shutdownChan chan bool
  31. quit chan bool
  32. // DB interface
  33. //db *ethdb.LDBDatabase
  34. db ethutil.Database
  35. // Block manager for processing new blocks and managing the block chain
  36. BlockManager *ethchain.BlockManager
  37. // The transaction pool. Transaction can be pushed on this pool
  38. // for later including in the blocks
  39. TxPool *ethchain.TxPool
  40. // Peers (NYI)
  41. peers *list.List
  42. // Nonce
  43. Nonce uint64
  44. Addr net.Addr
  45. peerMut sync.Mutex
  46. // Capabilities for outgoing peers
  47. serverCaps Caps
  48. nat NAT
  49. // Specifies the desired amount of maximum peers
  50. MaxPeers int
  51. }
  52. func New(caps Caps, usePnp bool) (*Ethereum, error) {
  53. db, err := ethdb.NewLDBDatabase()
  54. //db, err := ethdb.NewMemDatabase()
  55. if err != nil {
  56. return nil, err
  57. }
  58. var nat NAT
  59. if usePnp {
  60. nat, err = Discover()
  61. if err != nil {
  62. log.Println("UPnP failed", err)
  63. }
  64. }
  65. ethutil.Config.Db = db
  66. nonce, _ := ethutil.RandomUint64()
  67. ethereum := &Ethereum{
  68. shutdownChan: make(chan bool),
  69. quit: make(chan bool),
  70. db: db,
  71. peers: list.New(),
  72. Nonce: nonce,
  73. serverCaps: caps,
  74. nat: nat,
  75. MaxPeers: 5,
  76. }
  77. ethereum.TxPool = ethchain.NewTxPool()
  78. ethereum.TxPool.Speaker = ethereum
  79. ethereum.BlockManager = ethchain.NewBlockManager(ethereum)
  80. ethereum.TxPool.BlockManager = ethereum.BlockManager
  81. ethereum.BlockManager.TransactionPool = ethereum.TxPool
  82. return ethereum, nil
  83. }
  84. func (s *Ethereum) AddPeer(conn net.Conn) {
  85. peer := NewPeer(conn, s, true)
  86. if peer != nil && s.peers.Len() < s.MaxPeers {
  87. s.peers.PushBack(peer)
  88. peer.Start()
  89. }
  90. }
  91. func (s *Ethereum) ProcessPeerList(addrs []string) {
  92. for _, addr := range addrs {
  93. // TODO Probably requires some sanity checks
  94. s.ConnectToPeer(addr)
  95. }
  96. }
  97. func (s *Ethereum) ConnectToPeer(addr string) error {
  98. var alreadyConnected bool
  99. eachPeer(s.peers, func(p *Peer, v *list.Element) {
  100. if p.conn == nil {
  101. return
  102. }
  103. phost, _, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
  104. ahost, _, _ := net.SplitHostPort(addr)
  105. if phost == ahost {
  106. alreadyConnected = true
  107. return
  108. }
  109. })
  110. if alreadyConnected {
  111. return nil
  112. }
  113. peer := NewOutboundPeer(addr, s, s.serverCaps)
  114. s.peers.PushBack(peer)
  115. return nil
  116. }
  117. func (s *Ethereum) OutboundPeers() []*Peer {
  118. // Create a new peer slice with at least the length of the total peers
  119. outboundPeers := make([]*Peer, s.peers.Len())
  120. length := 0
  121. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  122. if !p.inbound && p.conn != nil {
  123. outboundPeers[length] = p
  124. length++
  125. }
  126. })
  127. return outboundPeers[:length]
  128. }
  129. func (s *Ethereum) InboundPeers() []*Peer {
  130. // Create a new peer slice with at least the length of the total peers
  131. inboundPeers := make([]*Peer, s.peers.Len())
  132. length := 0
  133. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  134. if p.inbound {
  135. inboundPeers[length] = p
  136. length++
  137. }
  138. })
  139. return inboundPeers[:length]
  140. }
  141. func (s *Ethereum) InOutPeers() []*Peer {
  142. // Reap the dead peers first
  143. s.reapPeers()
  144. // Create a new peer slice with at least the length of the total peers
  145. inboundPeers := make([]*Peer, s.peers.Len())
  146. length := 0
  147. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  148. // Only return peers with an actual ip
  149. if len(p.host) > 0 {
  150. inboundPeers[length] = p
  151. length++
  152. }
  153. })
  154. return inboundPeers[:length]
  155. }
  156. func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) {
  157. msg := ethwire.NewMessage(msgType, data)
  158. s.BroadcastMsg(msg)
  159. }
  160. func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) {
  161. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  162. p.QueueMessage(msg)
  163. })
  164. }
  165. func (s *Ethereum) Peers() *list.List {
  166. return s.peers
  167. }
  168. func (s *Ethereum) reapPeers() {
  169. s.peerMut.Lock()
  170. defer s.peerMut.Unlock()
  171. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  172. if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
  173. s.peers.Remove(e)
  174. }
  175. })
  176. }
  177. func (s *Ethereum) ReapDeadPeerHandler() {
  178. reapTimer := time.NewTicker(processReapingTimeout * time.Second)
  179. for {
  180. select {
  181. case <-reapTimer.C:
  182. s.reapPeers()
  183. }
  184. }
  185. }
  186. // Start the ethereum
  187. func (s *Ethereum) Start() {
  188. // Bind to addr and port
  189. ln, err := net.Listen("tcp", ":30303")
  190. if err != nil {
  191. log.Println("Connection listening disabled. Acting as client")
  192. } else {
  193. // Starting accepting connections
  194. log.Println("Ready and accepting connections")
  195. // Start the peer handler
  196. go s.peerHandler(ln)
  197. }
  198. if s.nat != nil {
  199. go s.upnpUpdateThread()
  200. }
  201. // Start the reaping processes
  202. go s.ReapDeadPeerHandler()
  203. // Start the tx pool
  204. s.TxPool.Start()
  205. if ethutil.Config.Seed {
  206. log.Println("Seeding")
  207. // Testnet seed bootstrapping
  208. resp, err := http.Get("http://www.ethereum.org/servers.poc2.txt")
  209. if err != nil {
  210. log.Println("Fetching seed failed:", err)
  211. return
  212. }
  213. defer resp.Body.Close()
  214. body, err := ioutil.ReadAll(resp.Body)
  215. if err != nil {
  216. log.Println("Reading seed failed:", err)
  217. return
  218. }
  219. s.ConnectToPeer(string(body))
  220. }
  221. }
  222. func (s *Ethereum) peerHandler(listener net.Listener) {
  223. for {
  224. conn, err := listener.Accept()
  225. if err != nil {
  226. log.Println(err)
  227. continue
  228. }
  229. go s.AddPeer(conn)
  230. }
  231. }
  232. func (s *Ethereum) Stop() {
  233. // Close the database
  234. defer s.db.Close()
  235. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  236. p.Stop()
  237. })
  238. close(s.quit)
  239. s.TxPool.Stop()
  240. s.BlockManager.Stop()
  241. s.shutdownChan <- true
  242. }
  243. // This function will wait for a shutdown and resumes main thread execution
  244. func (s *Ethereum) WaitForShutdown() {
  245. <-s.shutdownChan
  246. }
  247. func (s *Ethereum) upnpUpdateThread() {
  248. // Go off immediately to prevent code duplication, thereafter we renew
  249. // lease every 15 minutes.
  250. timer := time.NewTimer(0 * time.Second)
  251. lport, _ := strconv.ParseInt("30303", 10, 16)
  252. first := true
  253. out:
  254. for {
  255. select {
  256. case <-timer.C:
  257. var err error
  258. _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
  259. if err != nil {
  260. log.Println("can't add UPnP port mapping:", err)
  261. break out
  262. }
  263. if first && err == nil {
  264. _, err = s.nat.GetExternalAddress()
  265. if err != nil {
  266. log.Println("UPnP can't get external address:", err)
  267. continue out
  268. }
  269. first = false
  270. }
  271. timer.Reset(time.Minute * 15)
  272. case <-s.quit:
  273. break out
  274. }
  275. }
  276. timer.Stop()
  277. if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
  278. log.Println("unable to remove UPnP port mapping:", err)
  279. } else {
  280. log.Println("succesfully disestablished UPnP port mapping")
  281. }
  282. }