| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package rpc
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "reflect"
- "strings"
- "sync"
- "time"
- )
const (
	// vsn is the JSON-RPC version string placed in the "jsonrpc" field of
	// messages built in this file.
	vsn = "2.0"

	// serviceMethodSeparator splits a method name into its namespace and the
	// remainder.
	serviceMethodSeparator = "_"

	// subscribeMethodSuffix is the method-name suffix that marks a subscribe call.
	subscribeMethodSuffix = "_subscribe"

	// unsubscribeMethodSuffix is the method-name suffix that marks an unsubscribe call.
	unsubscribeMethodSuffix = "_unsubscribe"

	// notificationMethodSuffix is presumably the method-name suffix used for
	// subscription notifications; it is not referenced in this part of the file.
	notificationMethodSuffix = "_subscription"

	// defaultWriteTimeout bounds a write when the context carries no deadline.
	defaultWriteTimeout = 10 * time.Second
)
// null holds the raw JSON literal "null". errorMessage uses it as the ID of
// error messages it creates.
var null = json.RawMessage("null")
// subscriptionResult pairs a subscription ID with a raw JSON result. It is
// encoded as an object with the keys "subscription" and "result"; the result
// key is left out when Result is empty.
type subscriptionResult struct {
	// ID is encoded under the "subscription" key.
	ID string `json:"subscription"`
	// Result is the already-encoded JSON payload.
	Result json.RawMessage `json:"result,omitempty"`
}
- // A value of this type can a JSON-RPC request, notification, successful response or
- // error response. Which one it is depends on the fields.
- type jsonrpcMessage struct {
- Version string `json:"jsonrpc,omitempty"`
- ID json.RawMessage `json:"id,omitempty"`
- Method string `json:"method,omitempty"`
- Params json.RawMessage `json:"params,omitempty"`
- Error *jsonError `json:"error,omitempty"`
- Result json.RawMessage `json:"result,omitempty"`
- }
- func (msg *jsonrpcMessage) isNotification() bool {
- return msg.ID == nil && msg.Method != ""
- }
- func (msg *jsonrpcMessage) isCall() bool {
- return msg.hasValidID() && msg.Method != ""
- }
- func (msg *jsonrpcMessage) isResponse() bool {
- return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil)
- }
- func (msg *jsonrpcMessage) hasValidID() bool {
- return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '['
- }
- func (msg *jsonrpcMessage) isSubscribe() bool {
- return strings.HasSuffix(msg.Method, subscribeMethodSuffix)
- }
- func (msg *jsonrpcMessage) isUnsubscribe() bool {
- return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix)
- }
- func (msg *jsonrpcMessage) namespace() string {
- elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2)
- return elem[0]
- }
- func (msg *jsonrpcMessage) String() string {
- b, _ := json.Marshal(msg)
- return string(b)
- }
- func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage {
- resp := errorMessage(err)
- resp.ID = msg.ID
- return resp
- }
- func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
- enc, err := json.Marshal(result)
- if err != nil {
- // TODO: wrap with 'internal server error'
- return msg.errorResponse(err)
- }
- return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc}
- }
- func errorMessage(err error) *jsonrpcMessage {
- msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{
- Code: defaultErrorCode,
- Message: err.Error(),
- }}
- ec, ok := err.(Error)
- if ok {
- msg.Error.Code = ec.ErrorCode()
- }
- de, ok := err.(DataError)
- if ok {
- msg.Error.Data = de.ErrorData()
- }
- return msg
- }
// jsonError is the "error" object of a JSON-RPC message. Its methods expose
// the message, code and data of the error.
type jsonError struct {
	// Code is the numeric error code.
	Code int `json:"code"`
	// Message is the human-readable error description.
	Message string `json:"message"`
	// Data is optional additional error information; omitted when nil.
	Data interface{} `json:"data,omitempty"`
}

// Error returns the error message, or a generic text containing the error
// code when the message is empty.
func (err *jsonError) Error() string {
	if err.Message != "" {
		return err.Message
	}
	return fmt.Sprintf("json-rpc error %d", err.Code)
}

// ErrorCode returns the numeric error code.
func (err *jsonError) ErrorCode() int {
	code := err.Code
	return code
}

// ErrorData returns the additional error data, which may be nil.
func (err *jsonError) ErrorData() interface{} {
	data := err.Data
	return data
}
// Conn is the subset of net.Conn methods that is sufficient for ServerCodec:
// reading, writing, closing and setting a write deadline.
type Conn interface {
	io.ReadWriteCloser

	// SetWriteDeadline sets the deadline for subsequent writes.
	SetWriteDeadline(deadline time.Time) error
}
// deadlineCloser is what jsonCodec needs from its underlying connection apart
// from encoding and decoding: closing it and setting a write deadline.
type deadlineCloser interface {
	io.Closer

	// SetWriteDeadline sets the deadline for subsequent writes.
	SetWriteDeadline(deadline time.Time) error
}
// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description
// of the peer address of a connection. If a Conn also implements
// ConnRemoteAddr, this description is used in log messages.
type ConnRemoteAddr interface {
	// RemoteAddr returns a description of the peer address.
	RemoteAddr() string
}
- // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
- // support for parsing arguments and serializing (result) objects.
- type jsonCodec struct {
- remote string
- closer sync.Once // close closed channel once
- closeCh chan interface{} // closed on Close
- decode func(v interface{}) error // decoder to allow multiple transports
- encMu sync.Mutex // guards the encoder
- encode func(v interface{}) error // encoder to allow multiple transports
- conn deadlineCloser
- }
- // NewFuncCodec creates a codec which uses the given functions to read and write. If conn
- // implements ConnRemoteAddr, log messages will use it to include the remote address of
- // the connection.
- func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
- codec := &jsonCodec{
- closeCh: make(chan interface{}),
- encode: encode,
- decode: decode,
- conn: conn,
- }
- if ra, ok := conn.(ConnRemoteAddr); ok {
- codec.remote = ra.RemoteAddr()
- }
- return codec
- }
- // NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
- // messages will use it to include the remote address of the connection.
- func NewCodec(conn Conn) ServerCodec {
- enc := json.NewEncoder(conn)
- dec := json.NewDecoder(conn)
- dec.UseNumber()
- return NewFuncCodec(conn, enc.Encode, dec.Decode)
- }
- func (c *jsonCodec) remoteAddr() string {
- return c.remote
- }
- func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) {
- // Decode the next JSON object in the input stream.
- // This verifies basic syntax, etc.
- var rawmsg json.RawMessage
- if err := c.decode(&rawmsg); err != nil {
- return nil, false, err
- }
- messages, batch = parseMessage(rawmsg)
- for i, msg := range messages {
- if msg == nil {
- // Message is JSON 'null'. Replace with zero value so it
- // will be treated like any other invalid message.
- messages[i] = new(jsonrpcMessage)
- }
- }
- return messages, batch, nil
- }
- func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error {
- c.encMu.Lock()
- defer c.encMu.Unlock()
- deadline, ok := ctx.Deadline()
- if !ok {
- deadline = time.Now().Add(defaultWriteTimeout)
- }
- c.conn.SetWriteDeadline(deadline)
- return c.encode(v)
- }
- func (c *jsonCodec) close() {
- c.closer.Do(func() {
- close(c.closeCh)
- c.conn.Close()
- })
- }
- // Closed returns a channel which will be closed when Close is called
- func (c *jsonCodec) closed() <-chan interface{} {
- return c.closeCh
- }
- // parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error
- // checks in this function because the raw message has already been syntax-checked when it
- // is called. Any non-JSON-RPC messages in the input return the zero value of
- // jsonrpcMessage.
- func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) {
- if !isBatch(raw) {
- msgs := []*jsonrpcMessage{{}}
- json.Unmarshal(raw, &msgs[0])
- return msgs, false
- }
- dec := json.NewDecoder(bytes.NewReader(raw))
- dec.Token() // skip '['
- var msgs []*jsonrpcMessage
- for dec.More() {
- msgs = append(msgs, new(jsonrpcMessage))
- dec.Decode(&msgs[len(msgs)-1])
- }
- return msgs, true
- }
// isBatch reports whether the first non-whitespace character of raw is '['.
// Input that is empty or consists only of whitespace is not a batch.
func isBatch(raw json.RawMessage) bool {
	for _, b := range raw {
		switch b {
		case ' ', '\t', '\n', '\r':
			// skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt)
			continue
		}
		return b == '['
	}
	return false
}
- // parsePositionalArguments tries to parse the given args to an array of values with the
- // given types. It returns the parsed values or an error when the args could not be
- // parsed. Missing optional arguments are returned as reflect.Zero values.
- func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) {
- dec := json.NewDecoder(bytes.NewReader(rawArgs))
- var args []reflect.Value
- tok, err := dec.Token()
- switch {
- case err == io.EOF || tok == nil && err == nil:
- // "params" is optional and may be empty. Also allow "params":null even though it's
- // not in the spec because our own client used to send it.
- case err != nil:
- return nil, err
- case tok == json.Delim('['):
- // Read argument array.
- if args, err = parseArgumentArray(dec, types); err != nil {
- return nil, err
- }
- default:
- return nil, errors.New("non-array args")
- }
- // Set any missing args to nil.
- for i := len(args); i < len(types); i++ {
- if types[i].Kind() != reflect.Ptr {
- return nil, fmt.Errorf("missing value for required argument %d", i)
- }
- args = append(args, reflect.Zero(types[i]))
- }
- return args, nil
- }
// parseArgumentArray decodes the remaining elements of a JSON array from dec
// into values of the given types, in order. dec must be positioned just after
// the opening '[' of the array; the closing ']' is consumed before returning.
//
// It returns an error if the array holds more elements than there are types,
// if an element cannot be decoded into its type, or if reading the end of the
// array fails. The values decoded so far are returned alongside the error.
// Fewer elements than types is not an error here; the caller fills in the
// rest.
func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) {
	args := make([]reflect.Value, 0, len(types))
	for i := 0; dec.More(); i++ {
		if i >= len(types) {
			return args, fmt.Errorf("too many arguments, want at most %d", len(types))
		}
		argval := reflect.New(types[i])
		if err := dec.Decode(argval.Interface()); err != nil {
			return args, fmt.Errorf("invalid argument %d: %v", i, err)
		}
		// No nil check is needed on argval: reflect.New always yields a
		// non-nil pointer, so the former `argval.IsNil()` test could never
		// fire. A JSON null decodes to a nil pointer for pointer types and
		// leaves the zero value in place for all other types.
		args = append(args, argval.Elem())
	}
	// Read end of args array.
	_, err := dec.Token()
	return args, err
}
// parseSubscriptionName extracts the subscription name from an encoded
// argument array. The input must be a JSON array whose first element is a
// string; that string is returned. Any other input yields an error.
func parseSubscriptionName(rawArgs json.RawMessage) (string, error) {
	dec := json.NewDecoder(bytes.NewReader(rawArgs))

	// A token error leaves the token nil, which fails the comparison below.
	opening, _ := dec.Token()
	if opening != json.Delim('[') {
		return "", errors.New("non-array args")
	}

	// Likewise, a nil or non-string first element fails the type assertion.
	first, _ := dec.Token()
	if name, isString := first.(string); isString {
		return name, nil
	}
	return "", errors.New("expected subscription name as first argument")
}
|