|
|
@@ -1,122 +0,0 @@
|
|
|
-import requests
|
|
|
-import time
|
|
|
-import logging
|
|
|
-from datetime import datetime, timezone
|
|
|
-
|
|
|
-# --- 配置 ---
|
|
|
-# QuestDB 实例的连接参数
|
|
|
-# 由于您使用 docker run -p 9000:9000 启动,因此 REST API 和 InfluxDB Line Protocol 都在 9000 端口
|
|
|
-QUESTDB_HOST = "127.0.0.1"
|
|
|
-QUESTDB_INFLUX_PORT = 9000
|
|
|
-QUESTDB_REST_PORT = 9000
|
|
|
-TABLE_NAME = "sensors"
|
|
|
-
|
|
|
-# InfluxDB 行协议写入的 URL
|
|
|
-INFLUX_URL = f"http://{QUESTDB_HOST}:{QUESTDB_INFLUX_PORT}/write"
|
|
|
-# QuestDB REST API 查询的 URL
|
|
|
-REST_URL = f"http://{QUESTDB_HOST}:{QUESTDB_REST_PORT}/exec"
|
|
|
-
|
|
|
-# 配置日志记录
|
|
|
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
-
|
|
|
-def write_sensor_data():
|
|
|
- """
|
|
|
- 使用 InfluxDB 行协议向 QuestDB 写入单条数据。
|
|
|
- 行协议格式: <table_name>,<tag_key>=<tag_value> <field_key>=<field_value> <timestamp_ns>
|
|
|
- - table_name: 表名 (sensors)
|
|
|
- - tag: 索引列,用于快速过滤 (location)
|
|
|
- - field: 数据列 (temperature, humidity)
|
|
|
- - timestamp: 时间戳 (纳秒)
|
|
|
- """
|
|
|
- location = "room1"
|
|
|
- temperature = 23.5
|
|
|
- humidity = 45.2
|
|
|
-
|
|
|
- # 获取当前的 UTC 时间戳(纳秒)
|
|
|
- timestamp_ns = int(time.time_ns())
|
|
|
-
|
|
|
- # 构建 InfluxDB 行协议字符串
|
|
|
- # 注意:tag 的值不需要引号,field 的浮点数值会自动处理
|
|
|
- line_protocol = f"{TABLE_NAME},location={location} temperature={temperature},humidity={humidity} {timestamp_ns}"
|
|
|
-
|
|
|
- logging.info(f"准备写入数据: {line_protocol}")
|
|
|
-
|
|
|
- try:
|
|
|
- # 发送 POST 请求写入数据
|
|
|
- response = requests.post(INFLUX_URL, data=line_protocol.encode('utf-8'), timeout=5)
|
|
|
-
|
|
|
- # 检查响应状态码
|
|
|
- if response.status_code == 204:
|
|
|
- logging.info("数据写入成功。")
|
|
|
- else:
|
|
|
- logging.error(f"数据写入失败。状态码: {response.status_code}, 响应: {response.text}")
|
|
|
-
|
|
|
- except requests.exceptions.RequestException as e:
|
|
|
- logging.error(f"写入数据时发生网络错误: {e}")
|
|
|
-
|
|
|
-def query_sensor_data():
|
|
|
- """
|
|
|
- 使用 QuestDB 的 REST API 执行 SQL 查询。
|
|
|
- """
|
|
|
- # 构建 SQL 查询语句
|
|
|
- # QuestDB 使用标准 SQL
|
|
|
- query = f"SELECT * FROM '{TABLE_NAME}' ORDER BY timestamp DESC LIMIT 10"
|
|
|
-
|
|
|
- logging.info(f"准备执行查询: {query}")
|
|
|
-
|
|
|
- params = {'query': query}
|
|
|
-
|
|
|
- try:
|
|
|
- # 发送 GET 请求执行查询
|
|
|
- response = requests.get(REST_URL, params=params, timeout=5)
|
|
|
-
|
|
|
- # 检查响应状态码
|
|
|
- if response.status_code == 200:
|
|
|
- logging.info("数据查询成功。")
|
|
|
- result = response.json()
|
|
|
-
|
|
|
- # 格式化并打印查询结果
|
|
|
- logging.info("查询结果:")
|
|
|
- if result and result.get('dataset'):
|
|
|
- # 从 'columns' 列表中提取表头名称
|
|
|
- headers = [col.get('name', '') for col in result.get('columns', [])]
|
|
|
- rows = result.get('dataset', [])
|
|
|
-
|
|
|
- # 打印表头
|
|
|
- if headers:
|
|
|
- print(f"{' | '.join(headers)}")
|
|
|
- print("-" * (len(' | '.join(headers))))
|
|
|
-
|
|
|
- # 打印数据行
|
|
|
- for row in rows:
|
|
|
- # 将时间戳转换为可读格式
|
|
|
- row[headers.index('timestamp')] = datetime.fromisoformat(row[headers.index('timestamp')].replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
- print(f"{' | '.join(map(str, row))}")
|
|
|
- else:
|
|
|
- logging.warning("查询成功,但未返回任何数据。")
|
|
|
-
|
|
|
- else:
|
|
|
- logging.error(f"数据查询失败。状态码: {response.status_code}, 响应: {response.text}")
|
|
|
-
|
|
|
- except requests.exceptions.RequestException as e:
|
|
|
- logging.error(f"查询数据时发生网络错误: {e}")
|
|
|
-
|
|
|
-def main():
|
|
|
- """
|
|
|
- 主函数,演示如何调用写入和查询功能。
|
|
|
- """
|
|
|
- logging.info("--- QuestDB (InfluxDB 兼容) Demo 开始 ---")
|
|
|
-
|
|
|
- # 1. 写入数据
|
|
|
- write_sensor_data()
|
|
|
-
|
|
|
- # 等待一秒,确保数据已被处理
|
|
|
- time.sleep(1)
|
|
|
-
|
|
|
- # 2. 查询数据
|
|
|
- query_sensor_data()
|
|
|
-
|
|
|
- logging.info("--- Demo 结束 ---")
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- main()
|