service.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "reflect"
  22. "runtime"
  23. "strings"
  24. "sync"
  25. "unicode"
  26. "github.com/ethereum/go-ethereum/log"
  27. )
  28. var (
  29. contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
  30. errorType = reflect.TypeOf((*error)(nil)).Elem()
  31. subscriptionType = reflect.TypeOf(Subscription{})
  32. stringType = reflect.TypeOf("")
  33. )
  34. type serviceRegistry struct {
  35. mu sync.Mutex
  36. services map[string]service
  37. }
  38. // service represents a registered object.
  39. type service struct {
  40. name string // name for service
  41. callbacks map[string]*callback // registered handlers
  42. subscriptions map[string]*callback // available subscriptions/notifications
  43. }
  // callback describes a single function or method that was registered in the
  // server, together with the signature facts needed to invoke it via reflection.
  type callback struct {
  	fn          reflect.Value  // the function to invoke
  	rcvr        reflect.Value  // receiver object; only valid when fn is a method
  	argTypes    []reflect.Type // input parameter types, excluding receiver and context
  	hasCtx      bool           // whether the first parameter (after the receiver) is a context.Context
  	errPos      int            // index of the error return value, or -1 when the function cannot return an error
  	isSubscribe bool           // whether this is a subscription callback
  }
  53. func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
  54. rcvrVal := reflect.ValueOf(rcvr)
  55. if name == "" {
  56. return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
  57. }
  58. callbacks := suitableCallbacks(rcvrVal)
  59. if len(callbacks) == 0 {
  60. return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
  61. }
  62. r.mu.Lock()
  63. defer r.mu.Unlock()
  64. if r.services == nil {
  65. r.services = make(map[string]service)
  66. }
  67. svc, ok := r.services[name]
  68. if !ok {
  69. svc = service{
  70. name: name,
  71. callbacks: make(map[string]*callback),
  72. subscriptions: make(map[string]*callback),
  73. }
  74. r.services[name] = svc
  75. }
  76. for name, cb := range callbacks {
  77. if cb.isSubscribe {
  78. svc.subscriptions[name] = cb
  79. } else {
  80. svc.callbacks[name] = cb
  81. }
  82. }
  83. return nil
  84. }
  85. // callback returns the callback corresponding to the given RPC method name.
  86. func (r *serviceRegistry) callback(method string) *callback {
  87. elem := strings.SplitN(method, serviceMethodSeparator, 2)
  88. if len(elem) != 2 {
  89. return nil
  90. }
  91. r.mu.Lock()
  92. defer r.mu.Unlock()
  93. return r.services[elem[0]].callbacks[elem[1]]
  94. }
  95. // subscription returns a subscription callback in the given service.
  96. func (r *serviceRegistry) subscription(service, name string) *callback {
  97. r.mu.Lock()
  98. defer r.mu.Unlock()
  99. return r.services[service].subscriptions[name]
  100. }
  101. // suitableCallbacks iterates over the methods of the given type. It determines if a method
  102. // satisfies the criteria for a RPC callback or a subscription callback and adds it to the
  103. // collection of callbacks. See server documentation for a summary of these criteria.
  104. func suitableCallbacks(receiver reflect.Value) map[string]*callback {
  105. typ := receiver.Type()
  106. callbacks := make(map[string]*callback)
  107. for m := 0; m < typ.NumMethod(); m++ {
  108. method := typ.Method(m)
  109. if method.PkgPath != "" {
  110. continue // method not exported
  111. }
  112. cb := newCallback(receiver, method.Func)
  113. if cb == nil {
  114. continue // function invalid
  115. }
  116. name := formatName(method.Name)
  117. callbacks[name] = cb
  118. }
  119. return callbacks
  120. }
  121. // newCallback turns fn (a function) into a callback object. It returns nil if the function
  122. // is unsuitable as an RPC callback.
  123. func newCallback(receiver, fn reflect.Value) *callback {
  124. fntype := fn.Type()
  125. c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
  126. // Determine parameter types. They must all be exported or builtin types.
  127. c.makeArgTypes()
  128. // Verify return types. The function must return at most one error
  129. // and/or one other non-error value.
  130. outs := make([]reflect.Type, fntype.NumOut())
  131. for i := 0; i < fntype.NumOut(); i++ {
  132. outs[i] = fntype.Out(i)
  133. }
  134. if len(outs) > 2 {
  135. return nil
  136. }
  137. // If an error is returned, it must be the last returned value.
  138. switch {
  139. case len(outs) == 1 && isErrorType(outs[0]):
  140. c.errPos = 0
  141. case len(outs) == 2:
  142. if isErrorType(outs[0]) || !isErrorType(outs[1]) {
  143. return nil
  144. }
  145. c.errPos = 1
  146. }
  147. return c
  148. }
  149. // makeArgTypes composes the argTypes list.
  150. func (c *callback) makeArgTypes() {
  151. fntype := c.fn.Type()
  152. // Skip receiver and context.Context parameter (if present).
  153. firstArg := 0
  154. if c.rcvr.IsValid() {
  155. firstArg++
  156. }
  157. if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType {
  158. c.hasCtx = true
  159. firstArg++
  160. }
  161. // Add all remaining parameters.
  162. c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg)
  163. for i := firstArg; i < fntype.NumIn(); i++ {
  164. c.argTypes[i-firstArg] = fntype.In(i)
  165. }
  166. }
  167. // call invokes the callback.
  168. func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) {
  169. // Create the argument slice.
  170. fullargs := make([]reflect.Value, 0, 2+len(args))
  171. if c.rcvr.IsValid() {
  172. fullargs = append(fullargs, c.rcvr)
  173. }
  174. if c.hasCtx {
  175. fullargs = append(fullargs, reflect.ValueOf(ctx))
  176. }
  177. fullargs = append(fullargs, args...)
  178. // Catch panic while running the callback.
  179. defer func() {
  180. if err := recover(); err != nil {
  181. const size = 64 << 10
  182. buf := make([]byte, size)
  183. buf = buf[:runtime.Stack(buf, false)]
  184. log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf))
  185. errRes = errors.New("method handler crashed")
  186. }
  187. }()
  188. // Run the callback.
  189. results := c.fn.Call(fullargs)
  190. if len(results) == 0 {
  191. return nil, nil
  192. }
  193. if c.errPos >= 0 && !results[c.errPos].IsNil() {
  194. // Method has returned non-nil error value.
  195. err := results[c.errPos].Interface().(error)
  196. return reflect.Value{}, err
  197. }
  198. return results[0].Interface(), nil
  199. }
  200. // Is t context.Context or *context.Context?
  201. func isContextType(t reflect.Type) bool {
  202. for t.Kind() == reflect.Ptr {
  203. t = t.Elem()
  204. }
  205. return t == contextType
  206. }
  207. // Does t satisfy the error interface?
  208. func isErrorType(t reflect.Type) bool {
  209. for t.Kind() == reflect.Ptr {
  210. t = t.Elem()
  211. }
  212. return t.Implements(errorType)
  213. }
  214. // Is t Subscription or *Subscription?
  215. func isSubscriptionType(t reflect.Type) bool {
  216. for t.Kind() == reflect.Ptr {
  217. t = t.Elem()
  218. }
  219. return t == subscriptionType
  220. }
  221. // isPubSub tests whether the given method has as as first argument a context.Context and
  222. // returns the pair (Subscription, error).
  223. func isPubSub(methodType reflect.Type) bool {
  224. // numIn(0) is the receiver type
  225. if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
  226. return false
  227. }
  228. return isContextType(methodType.In(1)) &&
  229. isSubscriptionType(methodType.Out(0)) &&
  230. isErrorType(methodType.Out(1))
  231. }
  // formatName returns name with its first character converted to lower case.
  // An empty name is returned unchanged.
  func formatName(name string) string {
  	runes := []rune(name)
  	if len(runes) == 0 {
  		return string(runes)
  	}
  	runes[0] = unicode.ToLower(runes[0])
  	return string(runes)
  }
  239. }