TDengine语言连接器(Python版)详解
简介
taospy 是 TDengine 数据库面向 Python 语言提供的官方连接器,连接器对外提供对数据库写入、查询、订阅等多种访问接口。
安装连接器命令如下:
# 原生连接和 REST 连接
pip3 install taospy
# WebSocket 连接,可选装
pip3 install taos-ws-py
连接器代码对外开源,源码托管在 Github taos-connector-python 仓库。
连接方式
taospy提供三种连接方式,我们推荐使用 WebSocket 连接。
taospy 包的 taos 模块。taosAdapter 提供的 HTTP 接口连接 TDengine 实例,特点依赖小,不需要安装 TDengine 客户端驱动。功能上不支持 schemaless 和数据订阅等特性。对应 taospy 包的 taosrest 模块。taosAdapter 提供的 WebSocket 接口连接 TDengine 实例,特点是兼具前两种连接的优势,即性能高又依赖小。功能上 WebSocket 连接实现功能集合和原生连接有少量不同。对应 taos-ws-py 包,可以选装。连接方式的详细介绍请参考:连接方式
除了对原生接口和 REST 接口的封装,taospy 还提供了符合 Python 数据访问规范 (PEP 249) 的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。
Python 版本兼容性
支持 Python 3.0 及以上版本。
支持的平台
历史版本
Python Connector 历史版本(建议使用最新版本的 taospy):
| Python Connector 版本 | 主要变化 | TDengine 版本 |
|---|---|---|
| 2.8.0 | 移除 Apache Superset 连接驱动 | – |
| 2.7.23 | 支持 DECIMAL 数据类型 | – |
| 2.7.22 | 支持 Python 3.12 及以上版本 | – |
| 2.7.21 | Native 支持 STMT2 写入 | – |
| 2.7.19 | 支持 Apache Superset 连接 TDengine Cloud 数据源 | – |
| 2.7.18 | 支持 Apache Superset 产品连接本地 TDengine 数据源 | – |
| 2.7.16 | 新增订阅配置 (session.timeout.ms, max.poll.interval.ms) | – |
| 2.7.15 | 新增 VARBINARY 和 GEOMETRY 类型支持 | – |
| 2.7.14 | 修复已知问题 | – |
| 2.7.13 | 新增 tmq 同步提交 offset 接口 | – |
| 2.7.12 | 1. 新增 varbinary 类型支持(STMT 暂不支持 varbinary) 2. query 性能提升(感谢贡献者hadrianl) |
3.1.1.2 及更高版本 |
| 2.7.9 | 数据订阅支持获取消费进度和重置消费进度 | 3.0.2.6 及更高版本 |
| 2.7.8 | 新增 execute_many |
3.0.0.0 及更高版本 |
WebSocket Connector 历史版本:
| WebSocket Connector 版本 | 主要变化 | TDengine 版本 |
|---|---|---|
| 0.4.0 | 支持订阅新增参数动态扩展 | – |
| 0.3.9 | 修复 fetchmany 自定义行数时获取不完全的问题 | – |
| 0.3.8 | 支持 SuperSet 连接到 TDengine 云服务实例 | – |
| 0.3.5 | 修复 crypto provider 中的问题 | – |
| 0.3.4 | 支持 VARBINARY 和 GEOMETRY 数据类型 | 3.3.0.0 及更高版本 |
| 0.3.2 | 优化 WebSocket sql 查询和插入性能,修改 readme 和 文档,修复已知问题 | 3.2.3.0 及更高版本 |
| 0.2.9 | 已知问题修复 | – |
| 0.2.5 | 1. 数据订阅支持获取消费进度和重置消费进度 2. 支持 schemaless 3. 支持 STMT |
– |
| 0.2.4 | 数据订阅新增取消订阅方法 | 3.0.5.0 及更高版本 |
处理异常
Python 连接器可能会产生 4 种异常:
| Error Type | Description | Suggested Actions |
|---|---|---|
| InterfaceError | taosc 版本太低,不支持所使用的接口 | 请检查 TDengine 客户端版本 |
| ConnectionError | 数据库链接错误 | 请检查 TDengine 服务端状态和连接参数 |
| DatabaseError | 数据库错误 | 请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版 |
| OperationalError | 操作错误 | API 使用错误,请检查代码 |
| ProgrammingError | 接口调用错误 | 请检查提交的数据是否正确 |
| StatementError | stmt 相关异常 | 请检查绑定参数与 sql 是否匹配 |
| ResultError | 操作数据错误 | 请检查操作的数据与数据库中的数据类型是否匹配 |
| SchemalessError | schemaless 相关异常 | 请检查数据格式及对应的协议类型是否正确 |
| TmqError | tmq 相关异常 | 请检查 Topic 及 consumer 配置是否正确 |
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 Python 错误和异常文档。
TDengine 其他功能模块的报错,请参考 错误码
Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
import taos
try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)
# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'
数据类型映射
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
| TDengine DataType | Python DataType |
|---|---|
| TIMESTAMP | datetime |
| INT | int |
| BIGINT | int |
| FLOAT | float |
| DOUBLE | int |
| SMALLINT | int |
| TINYINT | int |
| BOOL | bool |
| BINARY | str |
| NCHAR | str |
| JSON | str |
| GEOMETRY | bytearray |
| VARBINARY | bytearray |
| DECIMAL | Decimal |
示例程序汇总
| 示例程序链接 | 示例程序内容 |
|---|---|
| bind_multi.py | 参数绑定,一次绑定多行 |
| bind_row.py | 参数绑定,一次绑定一行 |
| insert_lines.py | InfluxDB 行协议写入 |
| json_tag.py | 使用 JSON 类型的标签 |
| tmq_consumer.py | tmq 订阅 |
| native_all_type_query.py | 支持全部类型示例 |
| native_all_type_stmt.py | 参数绑定 stmt 全部类型示例 |
| test_stmt2.py | 参数绑定 stmt2 写入示例 |
示例程序源码请参考:
- 原生更多示例程序
- WebSocket 更多示例程序
关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善 (见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
- https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
- https://www.python.org/dev/peps/pep-0564/
常见问题
欢迎提问或报告问题。
API 参考
WebSocket 连接
URL 规范
[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
ws://localhost:6041localhost:6041建立连接
fn connect(dsn: Option<&str>, args: Option<&PyDict>) -> PyResult<Connection>
dsn:类型 Option<&str> 可选,数据源名称(DSN),用于指定要连接的数据库的位置和认证信息。args:类型 Option<&PyDict> 可选,以 Python 字典的形式提供,可用于设置
user:数据库的用户名password:数据库的密码。host:主机地址port:端口号database:数据库名称ConnectionError 异常。fn cursor(&self) -> PyResult<Cursor>
ConnectionError 异常。执行 SQL
fn execute(&self, sql: &str) -> PyResult<i32>
sql:待执行的 sql 语句。QueryError 异常。fn execute_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<i32>
sql:待执行的 sql 语句。reqId:用于问题追踪。QueryError 异常。fn query(&self, sql: &str) -> PyResult<TaosResult>
sql:待执行的 sql 语句。TaosResult 数据集对象。QueryError 异常。fn query_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<TaosResult>
sql:待执行的 sql 语句。reqId:用于问题追踪。TaosResult 数据集对象。QueryError 异常。数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
fn fields(&self) -> Vec<TaosField>
Vec<TaosField> 字段信息数组。fn field_count(&self) -> i32
i32 查询到的记录条数。无模式写入
fn schemaless_insert(&self, lines: Vec<String>, protocol: PySchemalessProtocol, precision: PySchemalessPrecision, ttl: i32, req_id: u64) -> PyResult<()>
lines:待写入的数据数组,无模式具体的数据格式可参考 Schemaless 写入。protocol:协议类型
PySchemalessProtocol::Line:InfluxDB 行协议(Line Protocol)。PySchemalessProtocol::Telnet:OpenTSDB 文本行协议。PySchemalessProtocol::Json:JSON 协议格式precision:时间精度
PySchemalessPrecision::Hour:小时PySchemalessPrecision::Minute:分钟PySchemalessPrecision::Second 秒PySchemalessPrecision::Millisecond:毫秒PySchemalessPrecision::Microsecond:微秒PySchemalessPrecision::Nanosecond:纳秒ttl:表过期时间,单位天。reqId:用于问题追踪。DataError 或 OperationalError 异常。参数绑定
fn statement(&self) -> PyResult<TaosStmt>
ConnectionError 异常。fn prepare(&mut self, sql: &str) -> PyResult<()>
sql:预编译的 SQL 语句。ProgrammingError 异常。fn set_tbname(&mut self, table_name: &str) -> PyResult<()>
tableName:表名,如果需要指定数据库,例如:db_name.table_name 即可。ProgrammingError 异常。fn set_tags(&mut self, tags: Vec<PyTagView>) -> PyResult<()>
paramsArray:Tags 数据。ProgrammingError 异常。fn bind_param(&mut self, params: Vec<PyColumnView>) -> PyResult<()>
paramsArray:绑定数据。ProgrammingError 异常。fn add_batch(&mut self) -> PyResult<()>
ProgrammingError 异常。fn execute(&mut self) -> PyResult<usize>
QueryError 异常。fn affect_rows(&mut self) -> PyResult<usize>
fn close(&self) -> PyResult<()>
数据订阅
创建消费者支持属性列表:
fn Consumer(conf: Option<&PyDict>, dsn: Option<&str>) -> PyResult<Self>
conf:类型 Option<&PyDict> 可选,以 Python 字典的形式提供,具体配置参见属性列表。dsn:类型 Option<&str> 可选,数据源名称(DSN),用于指定要连接的数据库的位置和认证信息。ConsumerException 异常。fn subscribe(&mut self, topics: &PyList) -> PyResult<()>
topics:订阅的主题列表。ConsumerException 异常。fn unsubscribe(&mut self)
ConsumerException 异常。fn poll(&mut self, timeout: Option<f64>) -> PyResult<Option<Message>>
timeoutMs:表示轮询的超时时间,单位毫秒。Message 每个主题对应的数据。ConsumerException 异常。fn commit(&mut self, message: &mut Message) -> PyResult<()>
message:类型 Message, 当前处理的消息的偏移量。ConsumerException 异常。fn assignment(&mut self) -> PyResult<Option<Vec<TopicAssignment>>>
Vec<TopicAssignment>,即消费者当前分配的所有分区。fn seek(&mut self, topic: &str, vg_id: i32, offset: i64) -> PyResult<()>
topic:订阅的主题。vg_id:vgroupid。offset:需要设置的偏移量。fn committed(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>
topic:订阅的主题。vg_id:vgroupid。i64,分区最后提交的偏移量。fn position(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>
topic:订阅的主题。vg_id:vgroupid。i64,分区最后提交的偏移量。fn close(&mut self)
Native 连接
建立连接
def connect(*args, **kwargs):
kwargs:以 Python 字典的形式提供,可用于设置
user:数据库的用户名password:数据库的密码。host:主机地址port:端口号database:数据库名称timezone:时区TaosConnection 连接对象。AttributeError 或 ConnectionError 异常。def cursor(self)
执行 SQL
def execute(self, operation, req_id: Optional[int] = None)
operation:待执行的 sql 语句。reqId:用于问题追踪。ProgrammingError 异常。def query(self, sql: str, req_id: Optional[int] = None) -> TaosResult
sql:待执行的 sql 语句。reqId:用于问题追踪。TaosResult 数据集对象。ProgrammingError 异常。数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
def fields(&self)
TaosFields 字段信息 list。def field_count(&self)
def fetch_all_into_dict(self)
无模式写入
def schemaless_insert(&self, lines: List[str], protocol: SmlProtocol, precision: SmlPrecision, req_id: Optional[int] = None, ttl: Optional[int] = None) -> int:
lines:待写入的数据数组,无模式具体的数据格式可参考 Schemaless 写入。protocol:协议类型
SmlProtocol.LINE_PROTOCOL:InfluxDB 行协议(Line Protocol)。SmlProtocol.TELNET_PROTOCOL:OpenTSDB 文本行协议。SmlProtocol.JSON_PROTOCOL:JSON 协议格式precision:时间精度
SmlPrecision.Hour:小时SmlPrecision.Minute:分钟SmlPrecision.Second 秒SmlPrecision.Millisecond:毫秒SmlPrecision.Microsecond:微秒SmlPrecision.Nanosecond:纳秒ttl:表过期时间,单位天。reqId:用于问题追踪。SchemalessError 异常。参数绑定
def statement2(self, sql=None, option=None)
sql:绑定的 SQL 语句,如果不为空会调用prepare函数option 传入 TaosStmt2Option 类实例选项ConnectionError 异常。def prepare(self, sql)
sql:绑定的 SQL 语句StatementError 异常。def bind_param(self, tbnames, tags, datas)
tbnames:绑定表名数组,数据类型为 listtags:绑定 tag 列值数组,数据类型为 listdatas:绑定普通列值数组,数据类型为 listStatementError 异常def bind_param_with_tables(self, tables)
tables:BindTable 独立表对象数组StatementError 异常。def execute(self) -> int:
QueryError 异常。def result(self)
def close(self)
数据订阅
def Consumer(configs)
configs:Python 字典的形式提供,具体配置参见属性列表。TmqError 异常。def subscribe(self, topics)
topics:订阅的主题列表。TmqError 异常。def unsubscribe(self)
TmqError 异常。def poll(self, timeout: float = 1.0)
timeout:表示轮询的超时时间,单位毫秒。Message 每个主题对应的数据。TmqError 异常。def commit(self, message: Message = None, offsets: [TopicPartition] = None)
message:类型 Message, 当前处理的消息的偏移量。offsets:类型 [TopicPartition], 提交一批消息的偏移量。TmqError 异常。def assignment(self)
[TopicPartition],即消费者当前分配的所有分区。def seek(self, partition)
partition:需要设置的偏移量。
topic:订阅的主题partition:分区offset:偏移量TmqError 异常。def committed(self, partitions)
partition:需要设置的偏移量。
topic:订阅的主题partition:分区partition,分区最后提交的偏移量。TmqError 异常。def position(self, partitions)
partition:需要设置的偏移量。
topic:订阅的主题partition:分区partition,分区最后提交的偏移量。def close(self)
REST 连接
def connect(**kwargs) -> TaosRestConnection
kwargs:以 Python 字典的形式提供,可用于设置
user:数据库的用户名password:数据库的密码。host:主机地址port:端口号database:数据库名称ConnectError 异常。def execute(self, sql: str, req_id: Optional[int] = None) -> Optional[int]
sql:待执行的 sql 语句。reqId:用于问题追踪。ConnectError 或 HTTPError 异常。def query(self, sql: str, req_id: Optional[int] = None) -> Result
sql:待执行的 sql 语句。reqId:用于问题追踪。Result 数据集对象。ConnectError 或 HTTPError 异常。RestClient(self, url: str, token: str = None, database: str = None, user: str = "root", password: str = "taosdata", timeout: int = None, convert_timestamp: bool = True, timezone: Union[str, datetime.tzinfo] = None)
url:taosAdapter REST 服务的 URL。user:数据库的用户名。password:数据库的密码。database:数据库名称。timezone:时区。timeout:HTTP 请求超时时间。单位为秒。convert_timestamp:是否将时间戳从 STR 类型转换为 datetime 类型。timezone:时区。ConnectError 异常。def sql(self, q: str, req_id: Optional[int] = None) -> dict
sql:待执行的 sql 语句。reqId: 用于问题追踪。ConnectError 或 HTTPError 异常。访问官网
更多内容欢迎访问 TDengine 官网
作者:TDengine (老段)