tracer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. // Copyright (c) 2017-2018 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "fmt"
  17. "io"
  18. "os"
  19. "reflect"
  20. "strconv"
  21. "sync"
  22. "time"
  23. "github.com/opentracing/opentracing-go"
  24. "github.com/opentracing/opentracing-go/ext"
  25. "github.com/uber/jaeger-client-go/internal/baggage"
  26. "github.com/uber/jaeger-client-go/internal/throttler"
  27. "github.com/uber/jaeger-client-go/log"
  28. "github.com/uber/jaeger-client-go/utils"
  29. )
  30. // Tracer implements opentracing.Tracer.
  31. type Tracer struct {
  32. serviceName string
  33. hostIPv4 uint32 // this is for zipkin endpoint conversion
  34. sampler Sampler
  35. reporter Reporter
  36. metrics Metrics
  37. logger log.Logger
  38. timeNow func() time.Time
  39. randomNumber func() uint64
  40. options struct {
  41. poolSpans bool
  42. gen128Bit bool // whether to generate 128bit trace IDs
  43. zipkinSharedRPCSpan bool
  44. highTraceIDGenerator func() uint64 // custom high trace ID generator
  45. maxTagValueLength int
  46. // more options to come
  47. }
  48. // pool for Span objects
  49. spanPool sync.Pool
  50. injectors map[interface{}]Injector
  51. extractors map[interface{}]Extractor
  52. observer compositeObserver
  53. tags []Tag
  54. process Process
  55. baggageRestrictionManager baggage.RestrictionManager
  56. baggageSetter *baggageSetter
  57. debugThrottler throttler.Throttler
  58. }
  59. // NewTracer creates Tracer implementation that reports tracing to Jaeger.
  60. // The returned io.Closer can be used in shutdown hooks to ensure that the internal
  61. // queue of the Reporter is drained and all buffered spans are submitted to collectors.
  62. func NewTracer(
  63. serviceName string,
  64. sampler Sampler,
  65. reporter Reporter,
  66. options ...TracerOption,
  67. ) (opentracing.Tracer, io.Closer) {
  68. t := &Tracer{
  69. serviceName: serviceName,
  70. sampler: sampler,
  71. reporter: reporter,
  72. injectors: make(map[interface{}]Injector),
  73. extractors: make(map[interface{}]Extractor),
  74. metrics: *NewNullMetrics(),
  75. spanPool: sync.Pool{New: func() interface{} {
  76. return &Span{}
  77. }},
  78. }
  79. for _, option := range options {
  80. option(t)
  81. }
  82. // register default injectors/extractors unless they are already provided via options
  83. textPropagator := newTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
  84. t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
  85. httpHeaderPropagator := newHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
  86. t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
  87. binaryPropagator := newBinaryPropagator(t)
  88. t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
  89. // TODO remove after TChannel supports OpenTracing
  90. interopPropagator := &jaegerTraceContextPropagator{tracer: t}
  91. t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
  92. zipkinPropagator := &zipkinPropagator{tracer: t}
  93. t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
  94. if t.baggageRestrictionManager != nil {
  95. t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
  96. } else {
  97. t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
  98. }
  99. if t.debugThrottler == nil {
  100. t.debugThrottler = throttler.DefaultThrottler{}
  101. }
  102. if t.randomNumber == nil {
  103. rng := utils.NewRand(time.Now().UnixNano())
  104. t.randomNumber = func() uint64 {
  105. return uint64(rng.Int63())
  106. }
  107. }
  108. if t.timeNow == nil {
  109. t.timeNow = time.Now
  110. }
  111. if t.logger == nil {
  112. t.logger = log.NullLogger
  113. }
  114. // Set tracer-level tags
  115. t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
  116. if hostname, err := os.Hostname(); err == nil {
  117. t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
  118. }
  119. if ip, err := utils.HostIP(); err == nil {
  120. t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
  121. t.hostIPv4 = utils.PackIPAsUint32(ip)
  122. } else {
  123. t.logger.Error("Unable to determine this host's IP address: " + err.Error())
  124. }
  125. if t.options.gen128Bit {
  126. if t.options.highTraceIDGenerator == nil {
  127. t.options.highTraceIDGenerator = t.randomNumber
  128. }
  129. } else if t.options.highTraceIDGenerator != nil {
  130. t.logger.Error("Overriding high trace ID generator but not generating " +
  131. "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
  132. }
  133. if t.options.maxTagValueLength == 0 {
  134. t.options.maxTagValueLength = DefaultMaxTagValueLength
  135. }
  136. t.process = Process{
  137. Service: serviceName,
  138. UUID: strconv.FormatUint(t.randomNumber(), 16),
  139. Tags: t.tags,
  140. }
  141. if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
  142. throttler.SetProcess(t.process)
  143. }
  144. return t, t
  145. }
  146. // addCodec adds registers injector and extractor for given propagation format if not already defined.
  147. func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
  148. if _, ok := t.injectors[format]; !ok {
  149. t.injectors[format] = injector
  150. }
  151. if _, ok := t.extractors[format]; !ok {
  152. t.extractors[format] = extractor
  153. }
  154. }
  155. // StartSpan implements StartSpan() method of opentracing.Tracer.
  156. func (t *Tracer) StartSpan(
  157. operationName string,
  158. options ...opentracing.StartSpanOption,
  159. ) opentracing.Span {
  160. sso := opentracing.StartSpanOptions{}
  161. for _, o := range options {
  162. o.Apply(&sso)
  163. }
  164. return t.startSpanWithOptions(operationName, sso)
  165. }
  166. func (t *Tracer) startSpanWithOptions(
  167. operationName string,
  168. options opentracing.StartSpanOptions,
  169. ) opentracing.Span {
  170. if options.StartTime.IsZero() {
  171. options.StartTime = t.timeNow()
  172. }
  173. // Predicate whether the given span context is a valid reference
  174. // which may be used as parent / debug ID / baggage items source
  175. isValidReference := func(ctx SpanContext) bool {
  176. return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0
  177. }
  178. var references []Reference
  179. var parent SpanContext
  180. var hasParent bool // need this because `parent` is a value, not reference
  181. for _, ref := range options.References {
  182. ctx, ok := ref.ReferencedContext.(SpanContext)
  183. if !ok {
  184. t.logger.Error(fmt.Sprintf(
  185. "Reference contains invalid type of SpanReference: %s",
  186. reflect.ValueOf(ref.ReferencedContext)))
  187. continue
  188. }
  189. if !isValidReference(ctx) {
  190. continue
  191. }
  192. references = append(references, Reference{Type: ref.Type, Context: ctx})
  193. if !hasParent {
  194. parent = ctx
  195. hasParent = ref.Type == opentracing.ChildOfRef
  196. }
  197. }
  198. if !hasParent && isValidReference(parent) {
  199. // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
  200. // the FollowFromRef as the parent
  201. hasParent = true
  202. }
  203. rpcServer := false
  204. if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
  205. rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
  206. }
  207. var samplerTags []Tag
  208. var ctx SpanContext
  209. newTrace := false
  210. if !hasParent || !parent.IsValid() {
  211. newTrace = true
  212. ctx.traceID.Low = t.randomID()
  213. if t.options.gen128Bit {
  214. ctx.traceID.High = t.options.highTraceIDGenerator()
  215. }
  216. ctx.spanID = SpanID(ctx.traceID.Low)
  217. ctx.parentID = 0
  218. ctx.flags = byte(0)
  219. if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
  220. ctx.flags |= (flagSampled | flagDebug)
  221. samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
  222. } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
  223. ctx.flags |= flagSampled
  224. samplerTags = tags
  225. }
  226. } else {
  227. ctx.traceID = parent.traceID
  228. if rpcServer && t.options.zipkinSharedRPCSpan {
  229. // Support Zipkin's one-span-per-RPC model
  230. ctx.spanID = parent.spanID
  231. ctx.parentID = parent.parentID
  232. } else {
  233. ctx.spanID = SpanID(t.randomID())
  234. ctx.parentID = parent.spanID
  235. }
  236. ctx.flags = parent.flags
  237. }
  238. if hasParent {
  239. // copy baggage items
  240. if l := len(parent.baggage); l > 0 {
  241. ctx.baggage = make(map[string]string, len(parent.baggage))
  242. for k, v := range parent.baggage {
  243. ctx.baggage[k] = v
  244. }
  245. }
  246. }
  247. sp := t.newSpan()
  248. sp.context = ctx
  249. sp.observer = t.observer.OnStartSpan(sp, operationName, options)
  250. return t.startSpanInternal(
  251. sp,
  252. operationName,
  253. options.StartTime,
  254. samplerTags,
  255. options.Tags,
  256. newTrace,
  257. rpcServer,
  258. references,
  259. )
  260. }
  261. // Inject implements Inject() method of opentracing.Tracer
  262. func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
  263. c, ok := ctx.(SpanContext)
  264. if !ok {
  265. return opentracing.ErrInvalidSpanContext
  266. }
  267. if injector, ok := t.injectors[format]; ok {
  268. return injector.Inject(c, carrier)
  269. }
  270. return opentracing.ErrUnsupportedFormat
  271. }
  272. // Extract implements Extract() method of opentracing.Tracer
  273. func (t *Tracer) Extract(
  274. format interface{},
  275. carrier interface{},
  276. ) (opentracing.SpanContext, error) {
  277. if extractor, ok := t.extractors[format]; ok {
  278. return extractor.Extract(carrier)
  279. }
  280. return nil, opentracing.ErrUnsupportedFormat
  281. }
  282. // Close releases all resources used by the Tracer and flushes any remaining buffered spans.
  283. func (t *Tracer) Close() error {
  284. t.reporter.Close()
  285. t.sampler.Close()
  286. if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
  287. mgr.Close()
  288. }
  289. if throttler, ok := t.debugThrottler.(io.Closer); ok {
  290. throttler.Close()
  291. }
  292. return nil
  293. }
  294. // Tags returns a slice of tracer-level tags.
  295. func (t *Tracer) Tags() []opentracing.Tag {
  296. tags := make([]opentracing.Tag, len(t.tags))
  297. for i, tag := range t.tags {
  298. tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
  299. }
  300. return tags
  301. }
  302. // newSpan returns an instance of a clean Span object.
  303. // If options.PoolSpans is true, the spans are retrieved from an object pool.
  304. func (t *Tracer) newSpan() *Span {
  305. if !t.options.poolSpans {
  306. return &Span{}
  307. }
  308. sp := t.spanPool.Get().(*Span)
  309. sp.context = emptyContext
  310. sp.tracer = nil
  311. sp.tags = nil
  312. sp.logs = nil
  313. return sp
  314. }
  315. func (t *Tracer) startSpanInternal(
  316. sp *Span,
  317. operationName string,
  318. startTime time.Time,
  319. internalTags []Tag,
  320. tags opentracing.Tags,
  321. newTrace bool,
  322. rpcServer bool,
  323. references []Reference,
  324. ) *Span {
  325. sp.tracer = t
  326. sp.operationName = operationName
  327. sp.startTime = startTime
  328. sp.duration = 0
  329. sp.references = references
  330. sp.firstInProcess = rpcServer || sp.context.parentID == 0
  331. if len(tags) > 0 || len(internalTags) > 0 {
  332. sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
  333. copy(sp.tags, internalTags)
  334. for k, v := range tags {
  335. sp.observer.OnSetTag(k, v)
  336. if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) {
  337. continue
  338. }
  339. sp.setTagNoLocking(k, v)
  340. }
  341. }
  342. // emit metrics
  343. if sp.context.IsSampled() {
  344. t.metrics.SpansStartedSampled.Inc(1)
  345. if newTrace {
  346. // We cannot simply check for parentID==0 because in Zipkin model the
  347. // server-side RPC span has the exact same trace/span/parent IDs as the
  348. // calling client-side span, but obviously the server side span is
  349. // no longer a root span of the trace.
  350. t.metrics.TracesStartedSampled.Inc(1)
  351. } else if sp.firstInProcess {
  352. t.metrics.TracesJoinedSampled.Inc(1)
  353. }
  354. } else {
  355. t.metrics.SpansStartedNotSampled.Inc(1)
  356. if newTrace {
  357. t.metrics.TracesStartedNotSampled.Inc(1)
  358. } else if sp.firstInProcess {
  359. t.metrics.TracesJoinedNotSampled.Inc(1)
  360. }
  361. }
  362. return sp
  363. }
  364. func (t *Tracer) reportSpan(sp *Span) {
  365. t.metrics.SpansFinished.Inc(1)
  366. if sp.context.IsSampled() {
  367. t.reporter.Report(sp)
  368. }
  369. if t.options.poolSpans {
  370. t.spanPool.Put(sp)
  371. }
  372. }
  373. // randomID generates a random trace/span ID, using tracer.random() generator.
  374. // It never returns 0.
  375. func (t *Tracer) randomID() uint64 {
  376. val := t.randomNumber()
  377. for val == 0 {
  378. val = t.randomNumber()
  379. }
  380. return val
  381. }
  382. // (NB) span must hold the lock before making this call
  383. func (t *Tracer) setBaggage(sp *Span, key, value string) {
  384. t.baggageSetter.setBaggage(sp, key, value)
  385. }
  386. // (NB) span must hold the lock before making this call
  387. func (t *Tracer) isDebugAllowed(operation string) bool {
  388. return t.debugThrottler.IsAllowed(operation)
  389. }