json.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package codec
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/ethereum/go-ethereum/rpc/shared"
  8. )
  9. const (
  10. READ_TIMEOUT = 60 // in seconds
  11. MAX_REQUEST_SIZE = 1024 * 1024
  12. MAX_RESPONSE_SIZE = 1024 * 1024
  13. )
  14. // Json serialization support
  15. type JsonCodec struct {
  16. c net.Conn
  17. d *json.Decoder
  18. }
  19. // Create new JSON coder instance
  20. func NewJsonCoder(conn net.Conn) ApiCoder {
  21. return &JsonCodec{
  22. c: conn,
  23. d: json.NewDecoder(conn),
  24. }
  25. }
  26. // Read incoming request and parse it to RPC request
  27. func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) {
  28. deadline := time.Now().Add(READ_TIMEOUT * time.Second)
  29. if err := self.c.SetDeadline(deadline); err != nil {
  30. return nil, false, err
  31. }
  32. var incoming json.RawMessage
  33. err = self.d.Decode(&incoming)
  34. if err == nil {
  35. isBatch = incoming[0] == '['
  36. if isBatch {
  37. requests = make([]*shared.Request, 0)
  38. err = json.Unmarshal(incoming, &requests)
  39. } else {
  40. requests = make([]*shared.Request, 1)
  41. var singleRequest shared.Request
  42. if err = json.Unmarshal(incoming, &singleRequest); err == nil {
  43. requests[0] = &singleRequest
  44. }
  45. }
  46. return
  47. }
  48. self.c.Close()
  49. return nil, false, err
  50. }
  51. func (self *JsonCodec) ReadResponse() (interface{}, error) {
  52. bytesInBuffer := 0
  53. buf := make([]byte, MAX_RESPONSE_SIZE)
  54. deadline := time.Now().Add(READ_TIMEOUT * time.Second)
  55. if err := self.c.SetDeadline(deadline); err != nil {
  56. return nil, err
  57. }
  58. for {
  59. n, err := self.c.Read(buf[bytesInBuffer:])
  60. if err != nil {
  61. return nil, err
  62. }
  63. bytesInBuffer += n
  64. var failure shared.ErrorResponse
  65. if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil {
  66. return failure, fmt.Errorf(failure.Error.Message)
  67. }
  68. var success shared.SuccessResponse
  69. if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil {
  70. return success, nil
  71. }
  72. }
  73. self.c.Close()
  74. return nil, fmt.Errorf("Unable to read response")
  75. }
  76. // Decode data
  77. func (self *JsonCodec) Decode(data []byte, msg interface{}) error {
  78. return json.Unmarshal(data, msg)
  79. }
  80. // Encode message
  81. func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) {
  82. return json.Marshal(msg)
  83. }
  84. // Parse JSON data from conn to obj
  85. func (self *JsonCodec) WriteResponse(res interface{}) error {
  86. data, err := json.Marshal(res)
  87. if err != nil {
  88. self.c.Close()
  89. return err
  90. }
  91. bytesWritten := 0
  92. for bytesWritten < len(data) {
  93. n, err := self.c.Write(data[bytesWritten:])
  94. if err != nil {
  95. self.c.Close()
  96. return err
  97. }
  98. bytesWritten += n
  99. }
  100. return nil
  101. }
  102. // Close decoder and encoder
  103. func (self *JsonCodec) Close() {
  104. self.c.Close()
  105. }