influxdb.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. // Package client implements a now-deprecated client for InfluxDB;
  2. // use github.com/influxdata/influxdb/client/v2 instead.
  3. package client // import "github.com/influxdata/influxdb/client"
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/tls"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "net"
  14. "net/http"
  15. "net/url"
  16. "strconv"
  17. "strings"
  18. "time"
  19. "github.com/influxdata/influxdb/models"
  20. )
  21. const (
  22. // DefaultHost is the default host used to connect to an InfluxDB instance
  23. DefaultHost = "localhost"
  24. // DefaultPort is the default port used to connect to an InfluxDB instance
  25. DefaultPort = 8086
  26. // DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
  27. DefaultTimeout = 0
  28. )
  29. // Query is used to send a command to the server. Both Command and Database are required.
  30. type Query struct {
  31. Command string
  32. Database string
  33. // Chunked tells the server to send back chunked responses. This places
  34. // less load on the server by sending back chunks of the response rather
  35. // than waiting for the entire response all at once.
  36. Chunked bool
  37. // ChunkSize sets the maximum number of rows that will be returned per
  38. // chunk. Chunks are either divided based on their series or if they hit
  39. // the chunk size limit.
  40. //
  41. // Chunked must be set to true for this option to be used.
  42. ChunkSize int
  43. }
  44. // ParseConnectionString will parse a string to create a valid connection URL
  45. func ParseConnectionString(path string, ssl bool) (url.URL, error) {
  46. var host string
  47. var port int
  48. h, p, err := net.SplitHostPort(path)
  49. if err != nil {
  50. if path == "" {
  51. host = DefaultHost
  52. } else {
  53. host = path
  54. }
  55. // If they didn't specify a port, always use the default port
  56. port = DefaultPort
  57. } else {
  58. host = h
  59. port, err = strconv.Atoi(p)
  60. if err != nil {
  61. return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, err)
  62. }
  63. }
  64. u := url.URL{
  65. Scheme: "http",
  66. }
  67. if ssl {
  68. u.Scheme = "https"
  69. }
  70. u.Host = net.JoinHostPort(host, strconv.Itoa(port))
  71. return u, nil
  72. }
  73. // Config is used to specify what server to connect to.
  74. // URL: The URL of the server connecting to.
  75. // Username/Password are optional. They will be passed via basic auth if provided.
  76. // UserAgent: If not provided, will default "InfluxDBClient",
  77. // Timeout: If not provided, will default to 0 (no timeout)
  78. type Config struct {
  79. URL url.URL
  80. UnixSocket string
  81. Username string
  82. Password string
  83. UserAgent string
  84. Timeout time.Duration
  85. Precision string
  86. WriteConsistency string
  87. UnsafeSsl bool
  88. }
  89. // NewConfig will create a config to be used in connecting to the client
  90. func NewConfig() Config {
  91. return Config{
  92. Timeout: DefaultTimeout,
  93. }
  94. }
  95. // Client is used to make calls to the server.
  96. type Client struct {
  97. url url.URL
  98. unixSocket string
  99. username string
  100. password string
  101. httpClient *http.Client
  102. userAgent string
  103. precision string
  104. }
  105. const (
  106. // ConsistencyOne requires at least one data node acknowledged a write.
  107. ConsistencyOne = "one"
  108. // ConsistencyAll requires all data nodes to acknowledge a write.
  109. ConsistencyAll = "all"
  110. // ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
  111. ConsistencyQuorum = "quorum"
  112. // ConsistencyAny allows for hinted hand off, potentially no write happened yet.
  113. ConsistencyAny = "any"
  114. )
  115. // NewClient will instantiate and return a connected client to issue commands to the server.
  116. func NewClient(c Config) (*Client, error) {
  117. tlsConfig := &tls.Config{
  118. InsecureSkipVerify: c.UnsafeSsl,
  119. }
  120. tr := &http.Transport{
  121. TLSClientConfig: tlsConfig,
  122. }
  123. if c.UnixSocket != "" {
  124. // No need for compression in local communications.
  125. tr.DisableCompression = true
  126. tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
  127. return net.Dial("unix", c.UnixSocket)
  128. }
  129. }
  130. client := Client{
  131. url: c.URL,
  132. unixSocket: c.UnixSocket,
  133. username: c.Username,
  134. password: c.Password,
  135. httpClient: &http.Client{Timeout: c.Timeout, Transport: tr},
  136. userAgent: c.UserAgent,
  137. precision: c.Precision,
  138. }
  139. if client.userAgent == "" {
  140. client.userAgent = "InfluxDBClient"
  141. }
  142. return &client, nil
  143. }
  144. // SetAuth will update the username and passwords
  145. func (c *Client) SetAuth(u, p string) {
  146. c.username = u
  147. c.password = p
  148. }
  149. // SetPrecision will update the precision
  150. func (c *Client) SetPrecision(precision string) {
  151. c.precision = precision
  152. }
  153. // Query sends a command to the server and returns the Response
  154. func (c *Client) Query(q Query) (*Response, error) {
  155. return c.QueryContext(context.Background(), q)
  156. }
  157. // QueryContext sends a command to the server and returns the Response
  158. // It uses a context that can be cancelled by the command line client
  159. func (c *Client) QueryContext(ctx context.Context, q Query) (*Response, error) {
  160. u := c.url
  161. u.Path = "query"
  162. values := u.Query()
  163. values.Set("q", q.Command)
  164. values.Set("db", q.Database)
  165. if q.Chunked {
  166. values.Set("chunked", "true")
  167. if q.ChunkSize > 0 {
  168. values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
  169. }
  170. }
  171. if c.precision != "" {
  172. values.Set("epoch", c.precision)
  173. }
  174. u.RawQuery = values.Encode()
  175. req, err := http.NewRequest("POST", u.String(), nil)
  176. if err != nil {
  177. return nil, err
  178. }
  179. req.Header.Set("User-Agent", c.userAgent)
  180. if c.username != "" {
  181. req.SetBasicAuth(c.username, c.password)
  182. }
  183. req = req.WithContext(ctx)
  184. resp, err := c.httpClient.Do(req)
  185. if err != nil {
  186. return nil, err
  187. }
  188. defer resp.Body.Close()
  189. var response Response
  190. if q.Chunked {
  191. cr := NewChunkedResponse(resp.Body)
  192. for {
  193. r, err := cr.NextResponse()
  194. if err != nil {
  195. // If we got an error while decoding the response, send that back.
  196. return nil, err
  197. }
  198. if r == nil {
  199. break
  200. }
  201. response.Results = append(response.Results, r.Results...)
  202. if r.Err != nil {
  203. response.Err = r.Err
  204. break
  205. }
  206. }
  207. } else {
  208. dec := json.NewDecoder(resp.Body)
  209. dec.UseNumber()
  210. if err := dec.Decode(&response); err != nil {
  211. // Ignore EOF errors if we got an invalid status code.
  212. if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
  213. return nil, err
  214. }
  215. }
  216. }
  217. // If we don't have an error in our json response, and didn't get StatusOK,
  218. // then send back an error.
  219. if resp.StatusCode != http.StatusOK && response.Error() == nil {
  220. return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
  221. }
  222. return &response, nil
  223. }
  224. // Write takes BatchPoints and allows for writing of multiple points with defaults
  225. // If successful, error is nil and Response is nil
  226. // If an error occurs, Response may contain additional information if populated.
  227. func (c *Client) Write(bp BatchPoints) (*Response, error) {
  228. u := c.url
  229. u.Path = "write"
  230. var b bytes.Buffer
  231. for _, p := range bp.Points {
  232. err := checkPointTypes(p)
  233. if err != nil {
  234. return nil, err
  235. }
  236. if p.Raw != "" {
  237. if _, err := b.WriteString(p.Raw); err != nil {
  238. return nil, err
  239. }
  240. } else {
  241. for k, v := range bp.Tags {
  242. if p.Tags == nil {
  243. p.Tags = make(map[string]string, len(bp.Tags))
  244. }
  245. p.Tags[k] = v
  246. }
  247. if _, err := b.WriteString(p.MarshalString()); err != nil {
  248. return nil, err
  249. }
  250. }
  251. if err := b.WriteByte('\n'); err != nil {
  252. return nil, err
  253. }
  254. }
  255. req, err := http.NewRequest("POST", u.String(), &b)
  256. if err != nil {
  257. return nil, err
  258. }
  259. req.Header.Set("Content-Type", "")
  260. req.Header.Set("User-Agent", c.userAgent)
  261. if c.username != "" {
  262. req.SetBasicAuth(c.username, c.password)
  263. }
  264. precision := bp.Precision
  265. if precision == "" {
  266. precision = "ns"
  267. }
  268. params := req.URL.Query()
  269. params.Set("db", bp.Database)
  270. params.Set("rp", bp.RetentionPolicy)
  271. params.Set("precision", precision)
  272. params.Set("consistency", bp.WriteConsistency)
  273. req.URL.RawQuery = params.Encode()
  274. resp, err := c.httpClient.Do(req)
  275. if err != nil {
  276. return nil, err
  277. }
  278. defer resp.Body.Close()
  279. var response Response
  280. body, err := ioutil.ReadAll(resp.Body)
  281. if err != nil {
  282. return nil, err
  283. }
  284. if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
  285. var err = fmt.Errorf(string(body))
  286. response.Err = err
  287. return &response, err
  288. }
  289. return nil, nil
  290. }
  291. // WriteLineProtocol takes a string with line returns to delimit each write
  292. // If successful, error is nil and Response is nil
  293. // If an error occurs, Response may contain additional information if populated.
  294. func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
  295. u := c.url
  296. u.Path = "write"
  297. r := strings.NewReader(data)
  298. req, err := http.NewRequest("POST", u.String(), r)
  299. if err != nil {
  300. return nil, err
  301. }
  302. req.Header.Set("Content-Type", "")
  303. req.Header.Set("User-Agent", c.userAgent)
  304. if c.username != "" {
  305. req.SetBasicAuth(c.username, c.password)
  306. }
  307. params := req.URL.Query()
  308. params.Set("db", database)
  309. params.Set("rp", retentionPolicy)
  310. params.Set("precision", precision)
  311. params.Set("consistency", writeConsistency)
  312. req.URL.RawQuery = params.Encode()
  313. resp, err := c.httpClient.Do(req)
  314. if err != nil {
  315. return nil, err
  316. }
  317. defer resp.Body.Close()
  318. var response Response
  319. body, err := ioutil.ReadAll(resp.Body)
  320. if err != nil {
  321. return nil, err
  322. }
  323. if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
  324. err := fmt.Errorf(string(body))
  325. response.Err = err
  326. return &response, err
  327. }
  328. return nil, nil
  329. }
  330. // Ping will check to see if the server is up
  331. // Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
  332. func (c *Client) Ping() (time.Duration, string, error) {
  333. now := time.Now()
  334. u := c.url
  335. u.Path = "ping"
  336. req, err := http.NewRequest("GET", u.String(), nil)
  337. if err != nil {
  338. return 0, "", err
  339. }
  340. req.Header.Set("User-Agent", c.userAgent)
  341. if c.username != "" {
  342. req.SetBasicAuth(c.username, c.password)
  343. }
  344. resp, err := c.httpClient.Do(req)
  345. if err != nil {
  346. return 0, "", err
  347. }
  348. defer resp.Body.Close()
  349. version := resp.Header.Get("X-Influxdb-Version")
  350. return time.Since(now), version, nil
  351. }
  352. // Structs
  353. // Message represents a user message.
  354. type Message struct {
  355. Level string `json:"level,omitempty"`
  356. Text string `json:"text,omitempty"`
  357. }
  358. // Result represents a resultset returned from a single statement.
  359. type Result struct {
  360. Series []models.Row
  361. Messages []*Message
  362. Err error
  363. }
  364. // MarshalJSON encodes the result into JSON.
  365. func (r *Result) MarshalJSON() ([]byte, error) {
  366. // Define a struct that outputs "error" as a string.
  367. var o struct {
  368. Series []models.Row `json:"series,omitempty"`
  369. Messages []*Message `json:"messages,omitempty"`
  370. Err string `json:"error,omitempty"`
  371. }
  372. // Copy fields to output struct.
  373. o.Series = r.Series
  374. o.Messages = r.Messages
  375. if r.Err != nil {
  376. o.Err = r.Err.Error()
  377. }
  378. return json.Marshal(&o)
  379. }
  380. // UnmarshalJSON decodes the data into the Result struct
  381. func (r *Result) UnmarshalJSON(b []byte) error {
  382. var o struct {
  383. Series []models.Row `json:"series,omitempty"`
  384. Messages []*Message `json:"messages,omitempty"`
  385. Err string `json:"error,omitempty"`
  386. }
  387. dec := json.NewDecoder(bytes.NewBuffer(b))
  388. dec.UseNumber()
  389. err := dec.Decode(&o)
  390. if err != nil {
  391. return err
  392. }
  393. r.Series = o.Series
  394. r.Messages = o.Messages
  395. if o.Err != "" {
  396. r.Err = errors.New(o.Err)
  397. }
  398. return nil
  399. }
  400. // Response represents a list of statement results.
  401. type Response struct {
  402. Results []Result
  403. Err error
  404. }
  405. // MarshalJSON encodes the response into JSON.
  406. func (r *Response) MarshalJSON() ([]byte, error) {
  407. // Define a struct that outputs "error" as a string.
  408. var o struct {
  409. Results []Result `json:"results,omitempty"`
  410. Err string `json:"error,omitempty"`
  411. }
  412. // Copy fields to output struct.
  413. o.Results = r.Results
  414. if r.Err != nil {
  415. o.Err = r.Err.Error()
  416. }
  417. return json.Marshal(&o)
  418. }
  419. // UnmarshalJSON decodes the data into the Response struct
  420. func (r *Response) UnmarshalJSON(b []byte) error {
  421. var o struct {
  422. Results []Result `json:"results,omitempty"`
  423. Err string `json:"error,omitempty"`
  424. }
  425. dec := json.NewDecoder(bytes.NewBuffer(b))
  426. dec.UseNumber()
  427. err := dec.Decode(&o)
  428. if err != nil {
  429. return err
  430. }
  431. r.Results = o.Results
  432. if o.Err != "" {
  433. r.Err = errors.New(o.Err)
  434. }
  435. return nil
  436. }
  437. // Error returns the first error from any statement.
  438. // Returns nil if no errors occurred on any statements.
  439. func (r *Response) Error() error {
  440. if r.Err != nil {
  441. return r.Err
  442. }
  443. for _, result := range r.Results {
  444. if result.Err != nil {
  445. return result.Err
  446. }
  447. }
  448. return nil
  449. }
  450. // duplexReader reads responses and writes it to another writer while
  451. // satisfying the reader interface.
  452. type duplexReader struct {
  453. r io.Reader
  454. w io.Writer
  455. }
  456. func (r *duplexReader) Read(p []byte) (n int, err error) {
  457. n, err = r.r.Read(p)
  458. if err == nil {
  459. r.w.Write(p[:n])
  460. }
  461. return n, err
  462. }
  463. // ChunkedResponse represents a response from the server that
  464. // uses chunking to stream the output.
  465. type ChunkedResponse struct {
  466. dec *json.Decoder
  467. duplex *duplexReader
  468. buf bytes.Buffer
  469. }
  470. // NewChunkedResponse reads a stream and produces responses from the stream.
  471. func NewChunkedResponse(r io.Reader) *ChunkedResponse {
  472. resp := &ChunkedResponse{}
  473. resp.duplex = &duplexReader{r: r, w: &resp.buf}
  474. resp.dec = json.NewDecoder(resp.duplex)
  475. resp.dec.UseNumber()
  476. return resp
  477. }
  478. // NextResponse reads the next line of the stream and returns a response.
  479. func (r *ChunkedResponse) NextResponse() (*Response, error) {
  480. var response Response
  481. if err := r.dec.Decode(&response); err != nil {
  482. if err == io.EOF {
  483. return nil, nil
  484. }
  485. // A decoding error happened. This probably means the server crashed
  486. // and sent a last-ditch error message to us. Ensure we have read the
  487. // entirety of the connection to get any remaining error text.
  488. io.Copy(ioutil.Discard, r.duplex)
  489. return nil, errors.New(strings.TrimSpace(r.buf.String()))
  490. }
  491. r.buf.Reset()
  492. return &response, nil
  493. }
  494. // Point defines the fields that will be written to the database
  495. // Measurement, Time, and Fields are required
  496. // Precision can be specified if the time is in epoch format (integer).
  497. // Valid values for Precision are n, u, ms, s, m, and h
  498. type Point struct {
  499. Measurement string
  500. Tags map[string]string
  501. Time time.Time
  502. Fields map[string]interface{}
  503. Precision string
  504. Raw string
  505. }
  506. // MarshalJSON will format the time in RFC3339Nano
  507. // Precision is also ignored as it is only used for writing, not reading
  508. // Or another way to say it is we always send back in nanosecond precision
  509. func (p *Point) MarshalJSON() ([]byte, error) {
  510. point := struct {
  511. Measurement string `json:"measurement,omitempty"`
  512. Tags map[string]string `json:"tags,omitempty"`
  513. Time string `json:"time,omitempty"`
  514. Fields map[string]interface{} `json:"fields,omitempty"`
  515. Precision string `json:"precision,omitempty"`
  516. }{
  517. Measurement: p.Measurement,
  518. Tags: p.Tags,
  519. Fields: p.Fields,
  520. Precision: p.Precision,
  521. }
  522. // Let it omit empty if it's really zero
  523. if !p.Time.IsZero() {
  524. point.Time = p.Time.UTC().Format(time.RFC3339Nano)
  525. }
  526. return json.Marshal(&point)
  527. }
  528. // MarshalString renders string representation of a Point with specified
  529. // precision. The default precision is nanoseconds.
  530. func (p *Point) MarshalString() string {
  531. pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
  532. if err != nil {
  533. return "# ERROR: " + err.Error() + " " + p.Measurement
  534. }
  535. if p.Precision == "" || p.Precision == "ns" || p.Precision == "n" {
  536. return pt.String()
  537. }
  538. return pt.PrecisionString(p.Precision)
  539. }
  540. // UnmarshalJSON decodes the data into the Point struct
  541. func (p *Point) UnmarshalJSON(b []byte) error {
  542. var normal struct {
  543. Measurement string `json:"measurement"`
  544. Tags map[string]string `json:"tags"`
  545. Time time.Time `json:"time"`
  546. Precision string `json:"precision"`
  547. Fields map[string]interface{} `json:"fields"`
  548. }
  549. var epoch struct {
  550. Measurement string `json:"measurement"`
  551. Tags map[string]string `json:"tags"`
  552. Time *int64 `json:"time"`
  553. Precision string `json:"precision"`
  554. Fields map[string]interface{} `json:"fields"`
  555. }
  556. if err := func() error {
  557. var err error
  558. dec := json.NewDecoder(bytes.NewBuffer(b))
  559. dec.UseNumber()
  560. if err = dec.Decode(&epoch); err != nil {
  561. return err
  562. }
  563. // Convert from epoch to time.Time, but only if Time
  564. // was actually set.
  565. var ts time.Time
  566. if epoch.Time != nil {
  567. ts, err = EpochToTime(*epoch.Time, epoch.Precision)
  568. if err != nil {
  569. return err
  570. }
  571. }
  572. p.Measurement = epoch.Measurement
  573. p.Tags = epoch.Tags
  574. p.Time = ts
  575. p.Precision = epoch.Precision
  576. p.Fields = normalizeFields(epoch.Fields)
  577. return nil
  578. }(); err == nil {
  579. return nil
  580. }
  581. dec := json.NewDecoder(bytes.NewBuffer(b))
  582. dec.UseNumber()
  583. if err := dec.Decode(&normal); err != nil {
  584. return err
  585. }
  586. normal.Time = SetPrecision(normal.Time, normal.Precision)
  587. p.Measurement = normal.Measurement
  588. p.Tags = normal.Tags
  589. p.Time = normal.Time
  590. p.Precision = normal.Precision
  591. p.Fields = normalizeFields(normal.Fields)
  592. return nil
  593. }
  594. // Remove any notion of json.Number
  595. func normalizeFields(fields map[string]interface{}) map[string]interface{} {
  596. newFields := map[string]interface{}{}
  597. for k, v := range fields {
  598. switch v := v.(type) {
  599. case json.Number:
  600. jv, e := v.Float64()
  601. if e != nil {
  602. panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e))
  603. }
  604. newFields[k] = jv
  605. default:
  606. newFields[k] = v
  607. }
  608. }
  609. return newFields
  610. }
  611. // BatchPoints is used to send batched data in a single write.
  612. // Database and Points are required
  613. // If no retention policy is specified, it will use the databases default retention policy.
  614. // If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored.
  615. // If time is specified, it will be applied to any point with an empty time.
  616. // Precision can be specified if the time is in epoch format (integer).
  617. // Valid values for Precision are n, u, ms, s, m, and h
  618. type BatchPoints struct {
  619. Points []Point `json:"points,omitempty"`
  620. Database string `json:"database,omitempty"`
  621. RetentionPolicy string `json:"retentionPolicy,omitempty"`
  622. Tags map[string]string `json:"tags,omitempty"`
  623. Time time.Time `json:"time,omitempty"`
  624. Precision string `json:"precision,omitempty"`
  625. WriteConsistency string `json:"-"`
  626. }
  627. // UnmarshalJSON decodes the data into the BatchPoints struct
  628. func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
  629. var normal struct {
  630. Points []Point `json:"points"`
  631. Database string `json:"database"`
  632. RetentionPolicy string `json:"retentionPolicy"`
  633. Tags map[string]string `json:"tags"`
  634. Time time.Time `json:"time"`
  635. Precision string `json:"precision"`
  636. }
  637. var epoch struct {
  638. Points []Point `json:"points"`
  639. Database string `json:"database"`
  640. RetentionPolicy string `json:"retentionPolicy"`
  641. Tags map[string]string `json:"tags"`
  642. Time *int64 `json:"time"`
  643. Precision string `json:"precision"`
  644. }
  645. if err := func() error {
  646. var err error
  647. if err = json.Unmarshal(b, &epoch); err != nil {
  648. return err
  649. }
  650. // Convert from epoch to time.Time
  651. var ts time.Time
  652. if epoch.Time != nil {
  653. ts, err = EpochToTime(*epoch.Time, epoch.Precision)
  654. if err != nil {
  655. return err
  656. }
  657. }
  658. bp.Points = epoch.Points
  659. bp.Database = epoch.Database
  660. bp.RetentionPolicy = epoch.RetentionPolicy
  661. bp.Tags = epoch.Tags
  662. bp.Time = ts
  663. bp.Precision = epoch.Precision
  664. return nil
  665. }(); err == nil {
  666. return nil
  667. }
  668. if err := json.Unmarshal(b, &normal); err != nil {
  669. return err
  670. }
  671. normal.Time = SetPrecision(normal.Time, normal.Precision)
  672. bp.Points = normal.Points
  673. bp.Database = normal.Database
  674. bp.RetentionPolicy = normal.RetentionPolicy
  675. bp.Tags = normal.Tags
  676. bp.Time = normal.Time
  677. bp.Precision = normal.Precision
  678. return nil
  679. }
  680. // utility functions
  681. // Addr provides the current url as a string of the server the client is connected to.
  682. func (c *Client) Addr() string {
  683. if c.unixSocket != "" {
  684. return c.unixSocket
  685. }
  686. return c.url.String()
  687. }
  688. // checkPointTypes ensures no unsupported types are submitted to influxdb, returning error if they are found.
  689. func checkPointTypes(p Point) error {
  690. for _, v := range p.Fields {
  691. switch v.(type) {
  692. case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool, string, nil:
  693. return nil
  694. default:
  695. return fmt.Errorf("unsupported point type: %T", v)
  696. }
  697. }
  698. return nil
  699. }
  700. // helper functions
  701. // EpochToTime takes a unix epoch time and uses precision to return back a time.Time
  702. func EpochToTime(epoch int64, precision string) (time.Time, error) {
  703. if precision == "" {
  704. precision = "s"
  705. }
  706. var t time.Time
  707. switch precision {
  708. case "h":
  709. t = time.Unix(0, epoch*int64(time.Hour))
  710. case "m":
  711. t = time.Unix(0, epoch*int64(time.Minute))
  712. case "s":
  713. t = time.Unix(0, epoch*int64(time.Second))
  714. case "ms":
  715. t = time.Unix(0, epoch*int64(time.Millisecond))
  716. case "u":
  717. t = time.Unix(0, epoch*int64(time.Microsecond))
  718. case "n":
  719. t = time.Unix(0, epoch)
  720. default:
  721. return time.Time{}, fmt.Errorf("Unknown precision %q", precision)
  722. }
  723. return t, nil
  724. }
  725. // SetPrecision will round a time to the specified precision
  726. func SetPrecision(t time.Time, precision string) time.Time {
  727. switch precision {
  728. case "n":
  729. case "u":
  730. return t.Round(time.Microsecond)
  731. case "ms":
  732. return t.Round(time.Millisecond)
  733. case "s":
  734. return t.Round(time.Second)
  735. case "m":
  736. return t.Round(time.Minute)
  737. case "h":
  738. return t.Round(time.Hour)
  739. }
  740. return t
  741. }