|
|
@@ -0,0 +1,122 @@
|
|
|
+import requests
|
|
|
+import time
|
|
|
+import logging
|
|
|
+from datetime import datetime, timezone
|
|
|
+
|
|
|
+# --- 配置 ---
|
|
|
+# QuestDB 实例的连接参数
|
|
|
+# 由于您使用 docker run -p 9000:9000 启动,因此 REST API 和 InfluxDB Line Protocol 都在 9000 端口
|
|
|
+QUESTDB_HOST = "127.0.0.1"
|
|
|
+QUESTDB_INFLUX_PORT = 9000
|
|
|
+QUESTDB_REST_PORT = 9000
|
|
|
+TABLE_NAME = "sensors"
|
|
|
+
|
|
|
+# InfluxDB 行协议写入的 URL
|
|
|
+INFLUX_URL = f"http://{QUESTDB_HOST}:{QUESTDB_INFLUX_PORT}/write"
|
|
|
+# QuestDB REST API 查询的 URL
|
|
|
+REST_URL = f"http://{QUESTDB_HOST}:{QUESTDB_REST_PORT}/exec"
|
|
|
+
|
|
|
+# 配置日志记录
|
|
|
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
+
|
|
|
+def write_sensor_data():
|
|
|
+ """
|
|
|
+ 使用 InfluxDB 行协议向 QuestDB 写入单条数据。
|
|
|
+ 行协议格式: <table_name>,<tag_key>=<tag_value> <field_key>=<field_value> <timestamp_ns>
|
|
|
+ - table_name: 表名 (sensors)
|
|
|
+ - tag: 索引列,用于快速过滤 (location)
|
|
|
+ - field: 数据列 (temperature, humidity)
|
|
|
+ - timestamp: 时间戳 (纳秒)
|
|
|
+ """
|
|
|
+ location = "room1"
|
|
|
+ temperature = 23.5
|
|
|
+ humidity = 45.2
|
|
|
+
|
|
|
+ # 获取当前的 UTC 时间戳(纳秒)
|
|
|
+ timestamp_ns = int(time.time_ns())
|
|
|
+
|
|
|
+ # 构建 InfluxDB 行协议字符串
|
|
|
+ # 注意:tag 的值不需要引号,field 的浮点数值会自动处理
|
|
|
+ line_protocol = f"{TABLE_NAME},location={location} temperature={temperature},humidity={humidity} {timestamp_ns}"
|
|
|
+
|
|
|
+ logging.info(f"准备写入数据: {line_protocol}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 发送 POST 请求写入数据
|
|
|
+ response = requests.post(INFLUX_URL, data=line_protocol.encode('utf-8'), timeout=5)
|
|
|
+
|
|
|
+ # 检查响应状态码
|
|
|
+ if response.status_code == 204:
|
|
|
+ logging.info("数据写入成功。")
|
|
|
+ else:
|
|
|
+ logging.error(f"数据写入失败。状态码: {response.status_code}, 响应: {response.text}")
|
|
|
+
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logging.error(f"写入数据时发生网络错误: {e}")
|
|
|
+
|
|
|
+def query_sensor_data():
|
|
|
+ """
|
|
|
+ 使用 QuestDB 的 REST API 执行 SQL 查询。
|
|
|
+ """
|
|
|
+ # 构建 SQL 查询语句
|
|
|
+ # QuestDB 使用标准 SQL
|
|
|
+ query = f"SELECT * FROM '{TABLE_NAME}' ORDER BY timestamp DESC LIMIT 10"
|
|
|
+
|
|
|
+ logging.info(f"准备执行查询: {query}")
|
|
|
+
|
|
|
+ params = {'query': query}
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 发送 GET 请求执行查询
|
|
|
+ response = requests.get(REST_URL, params=params, timeout=5)
|
|
|
+
|
|
|
+ # 检查响应状态码
|
|
|
+ if response.status_code == 200:
|
|
|
+ logging.info("数据查询成功。")
|
|
|
+ result = response.json()
|
|
|
+
|
|
|
+ # 格式化并打印查询结果
|
|
|
+ logging.info("查询结果:")
|
|
|
+ if result and result.get('dataset'):
|
|
|
+ # 从 'columns' 列表中提取表头名称
|
|
|
+ headers = [col.get('name', '') for col in result.get('columns', [])]
|
|
|
+ rows = result.get('dataset', [])
|
|
|
+
|
|
|
+ # 打印表头
|
|
|
+ if headers:
|
|
|
+ print(f"{' | '.join(headers)}")
|
|
|
+ print("-" * (len(' | '.join(headers))))
|
|
|
+
|
|
|
+ # 打印数据行
|
|
|
+ for row in rows:
|
|
|
+ # 将时间戳转换为可读格式
|
|
|
+ row[headers.index('timestamp')] = datetime.fromisoformat(row[headers.index('timestamp')].replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ print(f"{' | '.join(map(str, row))}")
|
|
|
+ else:
|
|
|
+ logging.warning("查询成功,但未返回任何数据。")
|
|
|
+
|
|
|
+ else:
|
|
|
+ logging.error(f"数据查询失败。状态码: {response.status_code}, 响应: {response.text}")
|
|
|
+
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logging.error(f"查询数据时发生网络错误: {e}")
|
|
|
+
|
|
|
+def main():
|
|
|
+ """
|
|
|
+ 主函数,演示如何调用写入和查询功能。
|
|
|
+ """
|
|
|
+ logging.info("--- QuestDB (InfluxDB 兼容) Demo 开始 ---")
|
|
|
+
|
|
|
+ # 1. 写入数据
|
|
|
+ write_sensor_data()
|
|
|
+
|
|
|
+ # 等待一秒,确保数据已被处理
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ # 2. 查询数据
|
|
|
+ query_sensor_data()
|
|
|
+
|
|
|
+ logging.info("--- Demo 结束 ---")
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|