backend.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package eth
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/ethereum/go-ethereum/core"
  6. "github.com/ethereum/go-ethereum/crypto"
  7. "github.com/ethereum/go-ethereum/ethdb"
  8. "github.com/ethereum/go-ethereum/ethutil"
  9. "github.com/ethereum/go-ethereum/event"
  10. ethlogger "github.com/ethereum/go-ethereum/logger"
  11. "github.com/ethereum/go-ethereum/p2p"
  12. "github.com/ethereum/go-ethereum/p2p/discover"
  13. "github.com/ethereum/go-ethereum/pow/ezp"
  14. "github.com/ethereum/go-ethereum/rpc"
  15. "github.com/ethereum/go-ethereum/whisper"
  16. )
  17. type Config struct {
  18. Name string
  19. KeyStore string
  20. DataDir string
  21. LogFile string
  22. LogLevel int
  23. KeyRing string
  24. MaxPeers int
  25. Port string
  26. NATType string
  27. PMPGateway string
  28. Shh bool
  29. Dial bool
  30. KeyManager *crypto.KeyManager
  31. }
  32. var logger = ethlogger.NewLogger("SERV")
  33. type Ethereum struct {
  34. // Channel for shutting down the ethereum
  35. shutdownChan chan bool
  36. quit chan bool
  37. // DB interface
  38. db ethutil.Database
  39. blacklist p2p.Blacklist
  40. //*** SERVICES ***
  41. // State manager for processing new blocks and managing the over all states
  42. blockProcessor *core.BlockProcessor
  43. txPool *core.TxPool
  44. chainManager *core.ChainManager
  45. blockPool *BlockPool
  46. whisper *whisper.Whisper
  47. net *p2p.Server
  48. eventMux *event.TypeMux
  49. txSub event.Subscription
  50. blockSub event.Subscription
  51. RpcServer rpc.RpcServer
  52. WsServer rpc.RpcServer
  53. keyManager *crypto.KeyManager
  54. logger ethlogger.LogSystem
  55. synclock sync.Mutex
  56. syncGroup sync.WaitGroup
  57. Mining bool
  58. }
  59. func New(config *Config) (*Ethereum, error) {
  60. // Boostrap database
  61. logger := ethlogger.New(config.DataDir, config.LogFile, config.LogLevel)
  62. db, err := ethdb.NewLDBDatabase("blockchain")
  63. if err != nil {
  64. return nil, err
  65. }
  66. // Perform database sanity checks
  67. d, _ := db.Get([]byte("ProtocolVersion"))
  68. protov := ethutil.NewValue(d).Uint()
  69. if protov != ProtocolVersion && protov != 0 {
  70. return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, ethutil.Config.ExecPath+"/database")
  71. }
  72. // Create new keymanager
  73. var keyManager *crypto.KeyManager
  74. switch config.KeyStore {
  75. case "db":
  76. keyManager = crypto.NewDBKeyManager(db)
  77. case "file":
  78. keyManager = crypto.NewFileKeyManager(config.DataDir)
  79. default:
  80. return nil, fmt.Errorf("unknown keystore type: %s", config.KeyStore)
  81. }
  82. // Initialise the keyring
  83. keyManager.Init(config.KeyRing, 0, false)
  84. saveProtocolVersion(db)
  85. //ethutil.Config.Db = db
  86. eth := &Ethereum{
  87. shutdownChan: make(chan bool),
  88. quit: make(chan bool),
  89. db: db,
  90. keyManager: keyManager,
  91. blacklist: p2p.NewBlacklist(),
  92. eventMux: &event.TypeMux{},
  93. logger: logger,
  94. }
  95. eth.chainManager = core.NewChainManager(db, eth.EventMux())
  96. eth.txPool = core.NewTxPool(eth.EventMux())
  97. eth.blockProcessor = core.NewBlockProcessor(db, eth.txPool, eth.chainManager, eth.EventMux())
  98. eth.chainManager.SetProcessor(eth.blockProcessor)
  99. eth.whisper = whisper.New()
  100. hasBlock := eth.chainManager.HasBlock
  101. insertChain := eth.chainManager.InsertChain
  102. eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify)
  103. ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
  104. protocols := []p2p.Protocol{ethProto, eth.whisper.Protocol()}
  105. nat, err := p2p.ParseNAT(config.NATType, config.PMPGateway)
  106. if err != nil {
  107. return nil, err
  108. }
  109. netprv, err := crypto.GenerateKey()
  110. if err != nil {
  111. return nil, fmt.Errorf("could not generate server key: %v", err)
  112. }
  113. eth.net = &p2p.Server{
  114. PrivateKey: netprv,
  115. Name: config.Name,
  116. MaxPeers: config.MaxPeers,
  117. Protocols: protocols,
  118. Blacklist: eth.blacklist,
  119. NAT: nat,
  120. NoDial: !config.Dial,
  121. }
  122. if len(config.Port) > 0 {
  123. eth.net.ListenAddr = ":" + config.Port
  124. }
  125. return eth, nil
  126. }
  127. func (s *Ethereum) KeyManager() *crypto.KeyManager {
  128. return s.keyManager
  129. }
  130. func (s *Ethereum) Logger() ethlogger.LogSystem {
  131. return s.logger
  132. }
  133. func (s *Ethereum) Name() string {
  134. return s.net.Name
  135. }
  136. func (s *Ethereum) ChainManager() *core.ChainManager {
  137. return s.chainManager
  138. }
  139. func (s *Ethereum) BlockProcessor() *core.BlockProcessor {
  140. return s.blockProcessor
  141. }
  142. func (s *Ethereum) TxPool() *core.TxPool {
  143. return s.txPool
  144. }
  145. func (s *Ethereum) BlockPool() *BlockPool {
  146. return s.blockPool
  147. }
  148. func (s *Ethereum) Whisper() *whisper.Whisper {
  149. return s.whisper
  150. }
  151. func (s *Ethereum) EventMux() *event.TypeMux {
  152. return s.eventMux
  153. }
  154. func (self *Ethereum) Db() ethutil.Database {
  155. return self.db
  156. }
  157. func (s *Ethereum) IsMining() bool {
  158. return s.Mining
  159. }
  160. func (s *Ethereum) IsListening() bool {
  161. // XXX TODO
  162. return false
  163. }
  164. func (s *Ethereum) PeerCount() int {
  165. return s.net.PeerCount()
  166. }
  167. func (s *Ethereum) Peers() []*p2p.Peer {
  168. return s.net.Peers()
  169. }
  170. func (s *Ethereum) MaxPeers() int {
  171. return s.net.MaxPeers
  172. }
  173. func (s *Ethereum) Coinbase() []byte {
  174. return nil // TODO
  175. }
  176. // Start the ethereum
  177. func (s *Ethereum) Start(seedNode string) error {
  178. err := s.net.Start()
  179. if err != nil {
  180. return err
  181. }
  182. // Start services
  183. s.txPool.Start()
  184. s.blockPool.Start()
  185. if s.whisper != nil {
  186. s.whisper.Start()
  187. }
  188. // broadcast transactions
  189. s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
  190. go s.txBroadcastLoop()
  191. // broadcast mined blocks
  192. s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
  193. go s.blockBroadcastLoop()
  194. logger.Infoln("Server started")
  195. return nil
  196. }
  197. func (self *Ethereum) SuggestPeer(nodeURL string) error {
  198. n, err := discover.ParseNode(nodeURL)
  199. if err != nil {
  200. return fmt.Errorf("invalid node URL: %v", err)
  201. }
  202. self.net.SuggestPeer(n)
  203. return nil
  204. }
  205. func (s *Ethereum) Stop() {
  206. // Close the database
  207. defer s.db.Close()
  208. close(s.quit)
  209. s.txSub.Unsubscribe() // quits txBroadcastLoop
  210. s.blockSub.Unsubscribe() // quits blockBroadcastLoop
  211. if s.RpcServer != nil {
  212. s.RpcServer.Stop()
  213. }
  214. if s.WsServer != nil {
  215. s.WsServer.Stop()
  216. }
  217. s.txPool.Stop()
  218. s.eventMux.Stop()
  219. s.blockPool.Stop()
  220. if s.whisper != nil {
  221. s.whisper.Stop()
  222. }
  223. logger.Infoln("Server stopped")
  224. close(s.shutdownChan)
  225. }
  226. // This function will wait for a shutdown and resumes main thread execution
  227. func (s *Ethereum) WaitForShutdown() {
  228. <-s.shutdownChan
  229. }
  230. // now tx broadcasting is taken out of txPool
  231. // handled here via subscription, efficiency?
  232. func (self *Ethereum) txBroadcastLoop() {
  233. // automatically stops if unsubscribe
  234. for obj := range self.txSub.Chan() {
  235. event := obj.(core.TxPreEvent)
  236. self.net.Broadcast("eth", TxMsg, event.Tx.RlpData())
  237. }
  238. }
  239. func (self *Ethereum) blockBroadcastLoop() {
  240. // automatically stops if unsubscribe
  241. for obj := range self.blockSub.Chan() {
  242. switch ev := obj.(type) {
  243. case core.NewMinedBlockEvent:
  244. self.net.Broadcast("eth", NewBlockMsg, ev.Block.RlpData(), ev.Block.Td)
  245. }
  246. }
  247. }
  248. func saveProtocolVersion(db ethutil.Database) {
  249. d, _ := db.Get([]byte("ProtocolVersion"))
  250. protocolVersion := ethutil.NewValue(d).Uint()
  251. if protocolVersion == 0 {
  252. db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
  253. }
  254. }