http.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package comms
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "strings"
  23. "sync"
  24. "time"
  25. "bytes"
  26. "io"
  27. "io/ioutil"
  28. "github.com/ethereum/go-ethereum/fdtrack"
  29. "github.com/ethereum/go-ethereum/logger"
  30. "github.com/ethereum/go-ethereum/logger/glog"
  31. "github.com/ethereum/go-ethereum/rpc/codec"
  32. "github.com/ethereum/go-ethereum/rpc/shared"
  33. "github.com/rs/cors"
  34. )
  35. const (
  36. serverIdleTimeout = 10 * time.Second // idle keep-alive connections
  37. serverReadTimeout = 15 * time.Second // per-request read timeout
  38. serverWriteTimeout = 15 * time.Second // per-request read timeout
  39. )
  40. var (
  41. httpServerMu sync.Mutex
  42. httpServer *stopServer
  43. )
  44. type HttpConfig struct {
  45. ListenAddress string
  46. ListenPort uint
  47. CorsDomain string
  48. }
  49. // stopServer augments http.Server with idle connection tracking.
  50. // Idle keep-alive connections are shut down when Close is called.
  51. type stopServer struct {
  52. *http.Server
  53. l net.Listener
  54. // connection tracking state
  55. mu sync.Mutex
  56. shutdown bool // true when Stop has returned
  57. idle map[net.Conn]struct{}
  58. }
  59. type handler struct {
  60. codec codec.Codec
  61. api shared.EthereumApi
  62. }
  63. // StartHTTP starts listening for RPC requests sent via HTTP.
  64. func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error {
  65. httpServerMu.Lock()
  66. defer httpServerMu.Unlock()
  67. addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)
  68. if httpServer != nil {
  69. if addr != httpServer.Addr {
  70. return fmt.Errorf("RPC service already running on %s ", httpServer.Addr)
  71. }
  72. return nil // RPC service already running on given host/port
  73. }
  74. // Set up the request handler, wrapping it with CORS headers if configured.
  75. handler := http.Handler(&handler{codec, api})
  76. if len(cfg.CorsDomain) > 0 {
  77. opts := cors.Options{
  78. AllowedMethods: []string{"POST"},
  79. AllowedOrigins: strings.Split(cfg.CorsDomain, " "),
  80. }
  81. handler = cors.New(opts).Handler(handler)
  82. }
  83. // Start the server.
  84. s, err := listenHTTP(addr, handler)
  85. if err != nil {
  86. glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err)
  87. return err
  88. }
  89. httpServer = s
  90. return nil
  91. }
  92. func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  93. w.Header().Set("Content-Type", "application/json")
  94. // Limit request size to resist DoS
  95. if req.ContentLength > maxHttpSizeReqLength {
  96. err := fmt.Errorf("Request too large")
  97. response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
  98. sendJSON(w, &response)
  99. return
  100. }
  101. defer req.Body.Close()
  102. payload, err := ioutil.ReadAll(req.Body)
  103. if err != nil {
  104. err := fmt.Errorf("Could not read request body")
  105. response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
  106. sendJSON(w, &response)
  107. return
  108. }
  109. c := h.codec.New(nil)
  110. var rpcReq shared.Request
  111. if err = c.Decode(payload, &rpcReq); err == nil {
  112. reply, err := h.api.Execute(&rpcReq)
  113. res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
  114. sendJSON(w, &res)
  115. return
  116. }
  117. var reqBatch []shared.Request
  118. if err = c.Decode(payload, &reqBatch); err == nil {
  119. resBatch := make([]*interface{}, len(reqBatch))
  120. resCount := 0
  121. for i, rpcReq := range reqBatch {
  122. reply, err := h.api.Execute(&rpcReq)
  123. if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
  124. resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
  125. resCount += 1
  126. }
  127. }
  128. // make response omitting nil entries
  129. sendJSON(w, resBatch[:resCount])
  130. return
  131. }
  132. // invalid request
  133. err = fmt.Errorf("Could not decode request")
  134. res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
  135. sendJSON(w, res)
  136. }
  137. func sendJSON(w io.Writer, v interface{}) {
  138. if glog.V(logger.Detail) {
  139. if payload, err := json.MarshalIndent(v, "", "\t"); err == nil {
  140. glog.Infof("Sending payload: %s", payload)
  141. }
  142. }
  143. if err := json.NewEncoder(w).Encode(v); err != nil {
  144. glog.V(logger.Error).Infoln("Error sending JSON:", err)
  145. }
  146. }
  147. // Stop closes all active HTTP connections and shuts down the server.
  148. func StopHttp() {
  149. httpServerMu.Lock()
  150. defer httpServerMu.Unlock()
  151. if httpServer != nil {
  152. httpServer.Close()
  153. httpServer = nil
  154. }
  155. }
  156. func listenHTTP(addr string, h http.Handler) (*stopServer, error) {
  157. l, err := net.Listen("tcp", addr)
  158. if err != nil {
  159. return nil, err
  160. }
  161. l = fdtrack.WrapListener("rpc", l)
  162. s := &stopServer{l: l, idle: make(map[net.Conn]struct{})}
  163. s.Server = &http.Server{
  164. Addr: addr,
  165. Handler: h,
  166. ReadTimeout: serverReadTimeout,
  167. WriteTimeout: serverWriteTimeout,
  168. ConnState: s.connState,
  169. }
  170. go s.Serve(l)
  171. return s, nil
  172. }
  173. func (s *stopServer) connState(c net.Conn, state http.ConnState) {
  174. s.mu.Lock()
  175. defer s.mu.Unlock()
  176. // Close c immediately if we're past shutdown.
  177. if s.shutdown {
  178. if state != http.StateClosed {
  179. c.Close()
  180. }
  181. return
  182. }
  183. if state == http.StateIdle {
  184. s.idle[c] = struct{}{}
  185. } else {
  186. delete(s.idle, c)
  187. }
  188. }
  189. func (s *stopServer) Close() {
  190. s.mu.Lock()
  191. defer s.mu.Unlock()
  192. // Shut down the acceptor. No new connections can be created.
  193. s.l.Close()
  194. // Drop all idle connections. Non-idle connections will be
  195. // closed by connState as soon as they become idle.
  196. s.shutdown = true
  197. for c := range s.idle {
  198. glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr())
  199. c.Close()
  200. delete(s.idle, c)
  201. }
  202. }
  203. type httpClient struct {
  204. address string
  205. port uint
  206. codec codec.ApiCoder
  207. lastRes interface{}
  208. lastErr error
  209. }
  210. // Create a new in process client
  211. func NewHttpClient(cfg HttpConfig, c codec.Codec) *httpClient {
  212. return &httpClient{
  213. address: cfg.ListenAddress,
  214. port: cfg.ListenPort,
  215. codec: c.New(nil),
  216. }
  217. }
  218. func (self *httpClient) Close() {
  219. // do nothing
  220. }
  221. func (self *httpClient) Send(req interface{}) error {
  222. var body []byte
  223. var err error
  224. self.lastRes = nil
  225. self.lastErr = nil
  226. if body, err = self.codec.Encode(req); err != nil {
  227. return err
  228. }
  229. httpReq, err := http.NewRequest("POST", fmt.Sprintf("%s:%d", self.address, self.port), bytes.NewBuffer(body))
  230. if err != nil {
  231. return err
  232. }
  233. httpReq.Header.Set("Content-Type", "application/json")
  234. client := http.Client{}
  235. resp, err := client.Do(httpReq)
  236. if err != nil {
  237. return err
  238. }
  239. defer resp.Body.Close()
  240. if resp.Status == "200 OK" {
  241. reply, _ := ioutil.ReadAll(resp.Body)
  242. var rpcSuccessResponse shared.SuccessResponse
  243. if err = self.codec.Decode(reply, &rpcSuccessResponse); err == nil {
  244. self.lastRes = rpcSuccessResponse.Result
  245. self.lastErr = err
  246. return nil
  247. } else {
  248. var rpcErrorResponse shared.ErrorResponse
  249. if err = self.codec.Decode(reply, &rpcErrorResponse); err == nil {
  250. self.lastRes = rpcErrorResponse.Error
  251. self.lastErr = err
  252. return nil
  253. } else {
  254. return err
  255. }
  256. }
  257. }
  258. return fmt.Errorf("Not implemented")
  259. }
  260. func (self *httpClient) Recv() (interface{}, error) {
  261. return self.lastRes, self.lastErr
  262. }
  263. func (self *httpClient) SupportedModules() (map[string]string, error) {
  264. var body []byte
  265. var err error
  266. payload := shared.Request{
  267. Id: 1,
  268. Jsonrpc: "2.0",
  269. Method: "modules",
  270. }
  271. if body, err = self.codec.Encode(payload); err != nil {
  272. return nil, err
  273. }
  274. req, err := http.NewRequest("POST", fmt.Sprintf("%s:%d", self.address, self.port), bytes.NewBuffer(body))
  275. if err != nil {
  276. return nil, err
  277. }
  278. req.Header.Set("Content-Type", "application/json")
  279. client := http.Client{}
  280. resp, err := client.Do(req)
  281. if err != nil {
  282. return nil, err
  283. }
  284. defer resp.Body.Close()
  285. if resp.Status == "200 OK" {
  286. reply, _ := ioutil.ReadAll(resp.Body)
  287. var rpcRes shared.SuccessResponse
  288. if err = self.codec.Decode(reply, &rpcRes); err != nil {
  289. return nil, err
  290. }
  291. result := make(map[string]string)
  292. if modules, ok := rpcRes.Result.(map[string]interface{}); ok {
  293. for a, v := range modules {
  294. result[a] = fmt.Sprintf("%s", v)
  295. }
  296. return result, nil
  297. }
  298. err = fmt.Errorf("Unable to parse module response - %v", rpcRes.Result)
  299. } else {
  300. fmt.Printf("resp.Status = %s\n", resp.Status)
  301. fmt.Printf("err = %v\n", err)
  302. }
  303. return nil, err
  304. }