http.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package comms
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "strings"
  23. "sync"
  24. "time"
  25. "bytes"
  26. "io"
  27. "io/ioutil"
  28. "github.com/ethereum/go-ethereum/logger"
  29. "github.com/ethereum/go-ethereum/logger/glog"
  30. "github.com/ethereum/go-ethereum/rpc/codec"
  31. "github.com/ethereum/go-ethereum/rpc/shared"
  32. "github.com/rs/cors"
  33. )
  34. const (
  35. serverIdleTimeout = 10 * time.Second // idle keep-alive connections
  36. serverReadTimeout = 15 * time.Second // per-request read timeout
  37. serverWriteTimeout = 15 * time.Second // per-request read timeout
  38. )
  39. var (
  40. httpServerMu sync.Mutex
  41. httpServer *stopServer
  42. )
  43. type HttpConfig struct {
  44. ListenAddress string
  45. ListenPort uint
  46. CorsDomain string
  47. }
  48. // stopServer augments http.Server with idle connection tracking.
  49. // Idle keep-alive connections are shut down when Close is called.
  50. type stopServer struct {
  51. *http.Server
  52. l net.Listener
  53. // connection tracking state
  54. mu sync.Mutex
  55. shutdown bool // true when Stop has returned
  56. idle map[net.Conn]struct{}
  57. }
  58. type handler struct {
  59. codec codec.Codec
  60. api shared.EthereumApi
  61. }
  62. // StartHTTP starts listening for RPC requests sent via HTTP.
  63. func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error {
  64. httpServerMu.Lock()
  65. defer httpServerMu.Unlock()
  66. addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)
  67. if httpServer != nil {
  68. if addr != httpServer.Addr {
  69. return fmt.Errorf("RPC service already running on %s ", httpServer.Addr)
  70. }
  71. return nil // RPC service already running on given host/port
  72. }
  73. // Set up the request handler, wrapping it with CORS headers if configured.
  74. handler := http.Handler(&handler{codec, api})
  75. if len(cfg.CorsDomain) > 0 {
  76. opts := cors.Options{
  77. AllowedMethods: []string{"POST"},
  78. AllowedOrigins: strings.Split(cfg.CorsDomain, " "),
  79. }
  80. handler = cors.New(opts).Handler(handler)
  81. }
  82. // Start the server.
  83. s, err := listenHTTP(addr, handler)
  84. if err != nil {
  85. glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err)
  86. return err
  87. }
  88. httpServer = s
  89. return nil
  90. }
  91. func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  92. w.Header().Set("Content-Type", "application/json")
  93. // Limit request size to resist DoS
  94. if req.ContentLength > maxHttpSizeReqLength {
  95. err := fmt.Errorf("Request too large")
  96. response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
  97. sendJSON(w, &response)
  98. return
  99. }
  100. defer req.Body.Close()
  101. payload, err := ioutil.ReadAll(req.Body)
  102. if err != nil {
  103. err := fmt.Errorf("Could not read request body")
  104. response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
  105. sendJSON(w, &response)
  106. return
  107. }
  108. c := h.codec.New(nil)
  109. var rpcReq shared.Request
  110. if err = c.Decode(payload, &rpcReq); err == nil {
  111. reply, err := h.api.Execute(&rpcReq)
  112. res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
  113. sendJSON(w, &res)
  114. return
  115. }
  116. var reqBatch []shared.Request
  117. if err = c.Decode(payload, &reqBatch); err == nil {
  118. resBatch := make([]*interface{}, len(reqBatch))
  119. resCount := 0
  120. for i, rpcReq := range reqBatch {
  121. reply, err := h.api.Execute(&rpcReq)
  122. if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
  123. resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
  124. resCount += 1
  125. }
  126. }
  127. // make response omitting nil entries
  128. sendJSON(w, resBatch[:resCount])
  129. return
  130. }
  131. // invalid request
  132. err = fmt.Errorf("Could not decode request")
  133. res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
  134. sendJSON(w, res)
  135. }
  136. func sendJSON(w io.Writer, v interface{}) {
  137. if glog.V(logger.Detail) {
  138. if payload, err := json.MarshalIndent(v, "", "\t"); err == nil {
  139. glog.Infof("Sending payload: %s", payload)
  140. }
  141. }
  142. if err := json.NewEncoder(w).Encode(v); err != nil {
  143. glog.V(logger.Error).Infoln("Error sending JSON:", err)
  144. }
  145. }
  146. // Stop closes all active HTTP connections and shuts down the server.
  147. func StopHttp() {
  148. httpServerMu.Lock()
  149. defer httpServerMu.Unlock()
  150. if httpServer != nil {
  151. httpServer.Close()
  152. httpServer = nil
  153. }
  154. }
  155. func listenHTTP(addr string, h http.Handler) (*stopServer, error) {
  156. l, err := net.Listen("tcp", addr)
  157. if err != nil {
  158. return nil, err
  159. }
  160. s := &stopServer{l: l, idle: make(map[net.Conn]struct{})}
  161. s.Server = &http.Server{
  162. Addr: addr,
  163. Handler: h,
  164. ReadTimeout: serverReadTimeout,
  165. WriteTimeout: serverWriteTimeout,
  166. ConnState: s.connState,
  167. }
  168. go s.Serve(l)
  169. return s, nil
  170. }
  171. func (s *stopServer) connState(c net.Conn, state http.ConnState) {
  172. s.mu.Lock()
  173. defer s.mu.Unlock()
  174. // Close c immediately if we're past shutdown.
  175. if s.shutdown {
  176. if state != http.StateClosed {
  177. c.Close()
  178. }
  179. return
  180. }
  181. if state == http.StateIdle {
  182. s.idle[c] = struct{}{}
  183. } else {
  184. delete(s.idle, c)
  185. }
  186. }
  187. func (s *stopServer) Close() {
  188. s.mu.Lock()
  189. defer s.mu.Unlock()
  190. // Shut down the acceptor. No new connections can be created.
  191. s.l.Close()
  192. // Drop all idle connections. Non-idle connections will be
  193. // closed by connState as soon as they become idle.
  194. s.shutdown = true
  195. for c := range s.idle {
  196. glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr())
  197. c.Close()
  198. delete(s.idle, c)
  199. }
  200. }
  201. type httpClient struct {
  202. address string
  203. port uint
  204. codec codec.ApiCoder
  205. lastRes interface{}
  206. lastErr error
  207. }
  208. // Create a new in process client
  209. func NewHttpClient(cfg HttpConfig, c codec.Codec) *httpClient {
  210. return &httpClient{
  211. address: cfg.ListenAddress,
  212. port: cfg.ListenPort,
  213. codec: c.New(nil),
  214. }
  215. }
  216. func (self *httpClient) Close() {
  217. // do nothing
  218. }
  219. func (self *httpClient) Send(req interface{}) error {
  220. var body []byte
  221. var err error
  222. self.lastRes = nil
  223. self.lastErr = nil
  224. if body, err = self.codec.Encode(req); err != nil {
  225. return err
  226. }
  227. httpReq, err := http.NewRequest("POST", fmt.Sprintf("%s:%d", self.address, self.port), bytes.NewBuffer(body))
  228. if err != nil {
  229. return err
  230. }
  231. httpReq.Header.Set("Content-Type", "application/json")
  232. client := http.Client{}
  233. resp, err := client.Do(httpReq)
  234. if err != nil {
  235. return err
  236. }
  237. defer resp.Body.Close()
  238. if resp.Status == "200 OK" {
  239. reply, _ := ioutil.ReadAll(resp.Body)
  240. var rpcSuccessResponse shared.SuccessResponse
  241. if err = self.codec.Decode(reply, &rpcSuccessResponse); err == nil {
  242. self.lastRes = rpcSuccessResponse.Result
  243. self.lastErr = err
  244. return nil
  245. } else {
  246. var rpcErrorResponse shared.ErrorResponse
  247. if err = self.codec.Decode(reply, &rpcErrorResponse); err == nil {
  248. self.lastRes = rpcErrorResponse.Error
  249. self.lastErr = err
  250. return nil
  251. } else {
  252. return err
  253. }
  254. }
  255. }
  256. return fmt.Errorf("Not implemented")
  257. }
  258. func (self *httpClient) Recv() (interface{}, error) {
  259. return self.lastRes, self.lastErr
  260. }
  261. func (self *httpClient) SupportedModules() (map[string]string, error) {
  262. var body []byte
  263. var err error
  264. payload := shared.Request{
  265. Id: 1,
  266. Jsonrpc: "2.0",
  267. Method: "modules",
  268. }
  269. if body, err = self.codec.Encode(payload); err != nil {
  270. return nil, err
  271. }
  272. req, err := http.NewRequest("POST", fmt.Sprintf("%s:%d", self.address, self.port), bytes.NewBuffer(body))
  273. if err != nil {
  274. return nil, err
  275. }
  276. req.Header.Set("Content-Type", "application/json")
  277. client := http.Client{}
  278. resp, err := client.Do(req)
  279. if err != nil {
  280. return nil, err
  281. }
  282. defer resp.Body.Close()
  283. if resp.Status == "200 OK" {
  284. reply, _ := ioutil.ReadAll(resp.Body)
  285. var rpcRes shared.SuccessResponse
  286. if err = self.codec.Decode(reply, &rpcRes); err != nil {
  287. return nil, err
  288. }
  289. result := make(map[string]string)
  290. if modules, ok := rpcRes.Result.(map[string]interface{}); ok {
  291. for a, v := range modules {
  292. result[a] = fmt.Sprintf("%s", v)
  293. }
  294. return result, nil
  295. }
  296. err = fmt.Errorf("Unable to parse module response - %v", rpcRes.Result)
  297. } else {
  298. fmt.Printf("resp.Status = %s\n", resp.Status)
  299. fmt.Printf("err = %v\n", err)
  300. }
  301. return nil, err
  302. }