backend.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package eth
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "github.com/ethereum/go-ethereum/core"
  7. "github.com/ethereum/go-ethereum/crypto"
  8. "github.com/ethereum/go-ethereum/ethdb"
  9. "github.com/ethereum/go-ethereum/ethutil"
  10. "github.com/ethereum/go-ethereum/event"
  11. ethlogger "github.com/ethereum/go-ethereum/logger"
  12. "github.com/ethereum/go-ethereum/p2p"
  13. "github.com/ethereum/go-ethereum/pow/ezp"
  14. "github.com/ethereum/go-ethereum/rpc"
  15. "github.com/ethereum/go-ethereum/whisper"
  16. )
  // seedNodeAddress is the host:port that Start suggests to the p2p server
  // as a first peer when it is called with seed set to true.
  const seedNodeAddress = "poc-8.ethdev.com:30303"
  20. type Config struct {
  21. Name string
  22. Version string
  23. Identifier string
  24. KeyStore string
  25. DataDir string
  26. LogFile string
  27. LogLevel int
  28. KeyRing string
  29. MaxPeers int
  30. Port string
  31. NATType string
  32. PMPGateway string
  33. Shh bool
  34. Dial bool
  35. KeyManager *crypto.KeyManager
  36. }
  37. var logger = ethlogger.NewLogger("SERV")
  38. type Ethereum struct {
  39. // Channel for shutting down the ethereum
  40. shutdownChan chan bool
  41. quit chan bool
  42. // DB interface
  43. db ethutil.Database
  44. blacklist p2p.Blacklist
  45. //*** SERVICES ***
  46. // State manager for processing new blocks and managing the over all states
  47. blockProcessor *core.BlockProcessor
  48. txPool *core.TxPool
  49. chainManager *core.ChainManager
  50. blockPool *BlockPool
  51. whisper *whisper.Whisper
  52. net *p2p.Server
  53. eventMux *event.TypeMux
  54. txSub event.Subscription
  55. blockSub event.Subscription
  56. RpcServer rpc.RpcServer
  57. WsServer rpc.RpcServer
  58. keyManager *crypto.KeyManager
  59. clientIdentity p2p.ClientIdentity
  60. logger ethlogger.LogSystem
  61. synclock sync.Mutex
  62. syncGroup sync.WaitGroup
  63. Mining bool
  64. }
  65. func New(config *Config) (*Ethereum, error) {
  66. // Boostrap database
  67. logger := ethlogger.New(config.DataDir, config.LogFile, config.LogLevel)
  68. db, err := ethdb.NewLDBDatabase("blockchain")
  69. if err != nil {
  70. return nil, err
  71. }
  72. // Perform database sanity checks
  73. d, _ := db.Get([]byte("ProtocolVersion"))
  74. protov := ethutil.NewValue(d).Uint()
  75. if protov != ProtocolVersion && protov != 0 {
  76. return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, ethutil.Config.ExecPath+"/database")
  77. }
  78. // Create new keymanager
  79. var keyManager *crypto.KeyManager
  80. switch config.KeyStore {
  81. case "db":
  82. keyManager = crypto.NewDBKeyManager(db)
  83. case "file":
  84. keyManager = crypto.NewFileKeyManager(config.DataDir)
  85. default:
  86. return nil, fmt.Errorf("unknown keystore type: %s", config.KeyStore)
  87. }
  88. // Initialise the keyring
  89. keyManager.Init(config.KeyRing, 0, false)
  90. // Create a new client id for this instance. This will help identifying the node on the network
  91. clientId := p2p.NewSimpleClientIdentity(config.Name, config.Version, config.Identifier, keyManager.PublicKey())
  92. saveProtocolVersion(db)
  93. //ethutil.Config.Db = db
  94. eth := &Ethereum{
  95. shutdownChan: make(chan bool),
  96. quit: make(chan bool),
  97. db: db,
  98. keyManager: keyManager,
  99. clientIdentity: clientId,
  100. blacklist: p2p.NewBlacklist(),
  101. eventMux: &event.TypeMux{},
  102. logger: logger,
  103. }
  104. eth.chainManager = core.NewChainManager(db, eth.EventMux())
  105. eth.txPool = core.NewTxPool(eth.EventMux())
  106. eth.blockProcessor = core.NewBlockProcessor(db, eth.txPool, eth.chainManager, eth.EventMux())
  107. eth.chainManager.SetProcessor(eth.blockProcessor)
  108. eth.whisper = whisper.New()
  109. hasBlock := eth.chainManager.HasBlock
  110. insertChain := eth.chainManager.InsertChain
  111. eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify)
  112. ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
  113. protocols := []p2p.Protocol{ethProto, eth.whisper.Protocol()}
  114. nat, err := p2p.ParseNAT(config.NATType, config.PMPGateway)
  115. if err != nil {
  116. return nil, err
  117. }
  118. eth.net = &p2p.Server{
  119. Identity: clientId,
  120. MaxPeers: config.MaxPeers,
  121. Protocols: protocols,
  122. Blacklist: eth.blacklist,
  123. NAT: nat,
  124. NoDial: !config.Dial,
  125. }
  126. if len(config.Port) > 0 {
  127. eth.net.ListenAddr = ":" + config.Port
  128. }
  129. return eth, nil
  130. }
  131. func (s *Ethereum) KeyManager() *crypto.KeyManager {
  132. return s.keyManager
  133. }
  134. func (s *Ethereum) Logger() ethlogger.LogSystem {
  135. return s.logger
  136. }
  137. func (s *Ethereum) ClientIdentity() p2p.ClientIdentity {
  138. return s.clientIdentity
  139. }
  140. func (s *Ethereum) ChainManager() *core.ChainManager {
  141. return s.chainManager
  142. }
  143. func (s *Ethereum) BlockProcessor() *core.BlockProcessor {
  144. return s.blockProcessor
  145. }
  146. func (s *Ethereum) TxPool() *core.TxPool {
  147. return s.txPool
  148. }
  149. func (s *Ethereum) BlockPool() *BlockPool {
  150. return s.blockPool
  151. }
  152. func (s *Ethereum) Whisper() *whisper.Whisper {
  153. return s.whisper
  154. }
  155. func (s *Ethereum) EventMux() *event.TypeMux {
  156. return s.eventMux
  157. }
  158. func (self *Ethereum) Db() ethutil.Database {
  159. return self.db
  160. }
  161. func (s *Ethereum) IsMining() bool {
  162. return s.Mining
  163. }
  164. func (s *Ethereum) IsListening() bool {
  165. // XXX TODO
  166. return false
  167. }
  168. func (s *Ethereum) PeerCount() int {
  169. return s.net.PeerCount()
  170. }
  171. func (s *Ethereum) Peers() []*p2p.Peer {
  172. return s.net.Peers()
  173. }
  174. func (s *Ethereum) MaxPeers() int {
  175. return s.net.MaxPeers
  176. }
  177. func (s *Ethereum) Coinbase() []byte {
  178. return nil // TODO
  179. }
  180. // Start the ethereum
  181. func (s *Ethereum) Start(seed bool) error {
  182. err := s.net.Start()
  183. if err != nil {
  184. return err
  185. }
  186. // Start services
  187. s.txPool.Start()
  188. s.blockPool.Start()
  189. if s.whisper != nil {
  190. s.whisper.Start()
  191. }
  192. // broadcast transactions
  193. s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
  194. go s.txBroadcastLoop()
  195. // broadcast mined blocks
  196. s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
  197. go s.blockBroadcastLoop()
  198. // TODO: read peers here
  199. if seed {
  200. logger.Infof("Connect to seed node %v", seedNodeAddress)
  201. if err := s.SuggestPeer(seedNodeAddress); err != nil {
  202. logger.Infoln(err)
  203. }
  204. }
  205. logger.Infoln("Server started")
  206. return nil
  207. }
  208. func (self *Ethereum) SuggestPeer(addr string) error {
  209. netaddr, err := net.ResolveTCPAddr("tcp", addr)
  210. if err != nil {
  211. logger.Errorf("couldn't resolve %s:", addr, err)
  212. return err
  213. }
  214. self.net.SuggestPeer(netaddr.IP, netaddr.Port, nil)
  215. return nil
  216. }
  217. func (s *Ethereum) Stop() {
  218. // Close the database
  219. defer s.db.Close()
  220. close(s.quit)
  221. s.txSub.Unsubscribe() // quits txBroadcastLoop
  222. s.blockSub.Unsubscribe() // quits blockBroadcastLoop
  223. if s.RpcServer != nil {
  224. s.RpcServer.Stop()
  225. }
  226. if s.WsServer != nil {
  227. s.WsServer.Stop()
  228. }
  229. s.txPool.Stop()
  230. s.eventMux.Stop()
  231. s.blockPool.Stop()
  232. if s.whisper != nil {
  233. s.whisper.Stop()
  234. }
  235. logger.Infoln("Server stopped")
  236. close(s.shutdownChan)
  237. }
  238. // This function will wait for a shutdown and resumes main thread execution
  239. func (s *Ethereum) WaitForShutdown() {
  240. <-s.shutdownChan
  241. }
  242. // now tx broadcasting is taken out of txPool
  243. // handled here via subscription, efficiency?
  244. func (self *Ethereum) txBroadcastLoop() {
  245. // automatically stops if unsubscribe
  246. for obj := range self.txSub.Chan() {
  247. event := obj.(core.TxPreEvent)
  248. self.net.Broadcast("eth", TxMsg, event.Tx.RlpData())
  249. }
  250. }
  251. func (self *Ethereum) blockBroadcastLoop() {
  252. // automatically stops if unsubscribe
  253. for obj := range self.blockSub.Chan() {
  254. switch ev := obj.(type) {
  255. case core.NewMinedBlockEvent:
  256. self.net.Broadcast("eth", NewBlockMsg, ev.Block.RlpData(), ev.Block.Td)
  257. }
  258. }
  259. }
  260. func saveProtocolVersion(db ethutil.Database) {
  261. d, _ := db.Get([]byte("ProtocolVersion"))
  262. protocolVersion := ethutil.NewValue(d).Uint()
  263. if protocolVersion == 0 {
  264. db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
  265. }
  266. }