IoTDB在Spring Boot 2中的集成与写入指南(一)

因公司业务需要,需要对设备相关的指标信息进行记录和展示。在实际尝试了MySQL、MongoDB之后,发现在大量数据面前还是过于缓慢:MongoDB查询几个月、半年的数据速率倒不是很慢,但是一旦存在分页,速度还是提不上来(也有可能是应用本身的问题),而且在使用过程中频繁的读写带来的CPU和内存压力也非常巨大。所以在讨论之后决定对时序数据库进行研究,最终选择了IoTDB这款国产数据库。废话不多说,直接上内容,下文都是基于IoTDB 1.X版本。

1、maven引用

<!-- IoTDB Java session client used by the code in this guide. -->
<dependency>
            <groupId>org.apache.iotdb</groupId>
            <artifactId>iotdb-session</artifactId>
            <version>1.1.0</version>
            <!-- Logback is excluded because it conflicted with the author's logging setup;
                 keep or drop these exclusions depending on your own project. -->
            <exclusions>
                <exclusion>
                    <artifactId>logback-core</artifactId>
                    <groupId>ch.qos.logback</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>logback-classic</artifactId>
                    <groupId>ch.qos.logback</groupId>
                </exclusion>
            </exclusions>
        </dependency>

因为存在Log相关冲突,我项目中需要剔除Logback相关包,这个根据自己实际情况来决定

2、yml配置

# Connection settings read by IoTDBSessionConfig through @Value("${spring.iotdb.*}").
spring:
    iotdb:
        # Credentials used when opening the session.
        username: root
        password: root
        # Address of the IoTDB server.
        ip: 192.168.1.127
        # Server port; 6667 is the default.
        port: 6667
        # Passed to Session.setFetchSize(...) when the session is created.
        maxSize: 100

默认的端口为6667

3、连接池配置


import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * Spring configuration that opens one IoTDB {@link Session} and exposes thin
 * insert / query / delete helpers on top of it.
 *
 * <p>Despite the original description ("thread pool"), this class holds a single
 * shared {@link Session}, not a pool.
 * NOTE(review): every caller shares that one session; confirm whether concurrent
 * use is expected and whether a pooled session would be more appropriate.
 *
 * <p>{@code @Configuration} is itself a component stereotype, so no additional
 * {@code @Component} annotation is needed for the class to be scanned.
 *
 * @author zgy
 * @date 2023-6
 */
@Configuration
public class IoTDBSessionConfig {

    private static final Logger logger = LoggerFactory.getLogger(IoTDBSessionConfig.class);

    /** Timeout, in milliseconds, passed to every statement run by {@link #query(String)}. */
    private static final long QUERY_TIMEOUT_MS = 30000L;

    @Value("${spring.iotdb.username}")
    private String username;

    @Value("${spring.iotdb.password}")
    private String password;

    @Value("${spring.iotdb.ip}")
    private String ip;

    @Value("${spring.iotdb.port}")
    private Integer port;

    /** Used as the session fetch size (see {@link #getSession()}). */
    @Value("${spring.iotdb.maxSize}")
    private Integer maxSize;

    /** The shared session; created lazily by {@link #getSession()}. */
    private static Session session;

    /**
     * Creates (on first call) and returns the shared session.
     *
     * <p>The session is built from the {@code spring.iotdb.*} properties, opened with
     * {@code open(false)}, given {@code maxSize} as its fetch size and switched to the
     * {@code +08:00} time zone.
     *
     * @return the shared, already opened session
     * @throws IoTDBConnectionException    if the connection cannot be established
     * @throws StatementExecutionException if the server rejects the time-zone statement
     */
    @Bean
    public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
        if (session == null) {
            logger.info("正在连接iotdb.......");
            session = new Session.Builder()
                    .host(ip)
                    .port(port)
                    .username(username)
                    .password(password)
                    .version(Version.V_1_0)
                    .build();
            // NOTE(review): the boolean presumably toggles RPC compression - confirm against the IoTDB API.
            session.open(false);
            session.setFetchSize(maxSize);
            // Set the session time zone.
            session.setTimeZone("+08:00");
        }
        return session;
    }

    /**
     * Maps the class name of an entity getter's return type to the IoTDB data type.
     *
     * <p>Both the boxed names and the primitive names are recognised, because
     * {@code Class#getName()} yields e.g. {@code "int"} for a primitive getter.
     * Every other type is stored as {@code TEXT}.
     *
     * @param type fully qualified (or primitive) type name
     * @return the matching IoTDB data type
     */
    private TSDataType convertTypeByEntity(String type) {
        switch (type) {
            case "java.lang.Double":
            case "double":
                return TSDataType.DOUBLE;
            case "java.lang.Integer":
            case "int":
                return TSDataType.INT32;
            case "java.lang.Long":
            case "long":
                return TSDataType.INT64;
            case "java.lang.Boolean":
            case "boolean":
                return TSDataType.BOOLEAN;
            case "java.lang.Float":
            case "float":
                return TSDataType.FLOAT;
            default:
                return TSDataType.TEXT;
        }
    }

    /**
     * Converts a record's list of type names into IoTDB data types, preserving order.
     *
     * @param typeNames type names as produced by {@code IoTDBRecord#getTypeList()}
     * @return data types aligned index-by-index with {@code typeNames}
     */
    private List<TSDataType> toDataTypes(List<String> typeNames) {
        List<TSDataType> dataTypes = new ArrayList<>(typeNames.size());
        for (String typeName : typeNames) {
            dataTypes.add(convertTypeByEntity(typeName));
        }
        return dataTypes;
    }

    /**
     * Inserts a batch of entities.
     *
     * <p>Each entity is converted with {@code toRecord()} exactly once and its device id,
     * timestamp, measurements, data types and values are collected in a single pass, so
     * the five lists handed to the session are aligned by index. A {@code null} or empty
     * batch is a no-op. Failures are logged and not rethrown.
     *
     * @param records entities to insert
     */
    public void insertRecords(List<? extends IoTDBRecordable> records) {
        if (records == null || records.isEmpty()) {
            return;
        }
        try {
            int size = records.size();
            List<String> deviceIds = new ArrayList<>(size);
            List<Long> times = new ArrayList<>(size);
            List<List<String>> measurementsList = new ArrayList<>(size);
            List<List<TSDataType>> typesList = new ArrayList<>(size);
            List<List<Object>> valuesList = new ArrayList<>(size);
            for (IoTDBRecordable recordable : records) {
                // Convert once: toRecord() is reflection based.
                IoTDBRecord row = recordable.toRecord();
                deviceIds.add(row.getDeviceId());
                times.add(row.getTime());
                measurementsList.add(row.getMeasurementsList());
                typesList.add(toDataTypes(row.getTypeList()));
                valuesList.add(row.getValuesList());
            }
            session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("IoTDB插入异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Inserts a single entity. Failures are logged and not rethrown.
     *
     * @param recordEntity entity to insert
     */
    public void insertRecord(IoTDBRecordable recordEntity) {
        try {
            IoTDBRecord row = recordEntity.toRecord();
            session.insertRecord(row.getDeviceId(), row.getTime(), row.getMeasurementsList(),
                    toDataTypes(row.getTypeList()), row.getValuesList());
        } catch (Exception e) {
            logger.error("IoTDB插入异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Runs a query statement with a 30 second timeout.
     *
     * @param sql the query to execute
     * @return the data set returned by the session
     */
    public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
        return session.executeQueryStatement(sql, QUERY_TIMEOUT_MS);
    }

    /**
     * Deletes a storage group, e.g. {@code root.a1eaKSRpRty}.
     *
     * @param groupName name of the storage group
     */
    public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteStorageGroup(groupName);
    }

    /**
     * Deletes one time series, e.g. {@code root.a1eaKSRpRty.CA3013A303A25467.breath}
     * (per the original author: a concrete measurement).
     *
     * @param timeseries full path of the time series
     */
    public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteTimeseries(timeseries);
    }

    /**
     * Deletes several time series.
     *
     * @param timeseriesList full paths of the time series
     */
    public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteTimeseries(timeseriesList);
    }

    /**
     * Deletes several storage groups.
     *
     * @param storageGroupList names of the storage groups
     */
    public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteStorageGroups(storageGroupList);
    }

    /**
     * Deletes the data of one path before the given end time.
     *
     * @param path    path whose data is deleted
     * @param endTime end timestamp
     */
    public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(path, endTime);
    }

    /**
     * Deletes the data of several paths before the given end time.
     *
     * @param pathList paths whose data is deleted
     * @param endTime  end timestamp
     */
    public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(pathList, endTime);
    }

    /**
     * Deletes the data of several paths within a time range.
     *
     * @param pathList  paths whose data is deleted
     * @param startTime start timestamp
     * @param endTime   end timestamp
     */
    public void deleteDataByPathListAndTime(List<String> pathList, Long startTime, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(pathList, startTime, endTime);
    }
}



4、IoTDBRecord 封装

主要是对IoTDB入参进行实体封装,方便使用。



import java.util.List;

/**
 * Parameter holder for one IoTDB insert: device path, timestamp and the
 * index-aligned measurement / value / type lists.
 *
 * @author zgy
 * @date 2023-6
 */
public class IoTDBRecord {

    /**
     * Node (device) path.
     */
    private String deviceId;
    /**
     * Timestamp in epoch milliseconds; defaults to the moment the record is created.
     * Uses the JDK clock directly so the class needs no extra utility import.
     */
    private long time = System.currentTimeMillis();
    /**
     * Measurement (attribute) names.
     */
    private List<String> measurementsList;
    /**
     * Measurement values.
     */
    private List<Object> valuesList;
    /**
     * Type names of the values.
     */
    private List<String> typeList;

    /** @return the node (device) path */
    public String getDeviceId() {
        return deviceId;
    }

    /** @param deviceId the node (device) path */
    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    /** @return the timestamp in epoch milliseconds */
    public long getTime() {
        return time;
    }

    /** @param time the timestamp in epoch milliseconds */
    public void setTime(long time) {
        this.time = time;
    }

    /** @return the measurement names, or {@code null} if never set */
    public List<String> getMeasurementsList() {
        return measurementsList;
    }

    /** @param measurementsList the measurement names */
    public void setMeasurementsList(List<String> measurementsList) {
        this.measurementsList = measurementsList;
    }

    /** @return the measurement values, or {@code null} if never set */
    public List<Object> getValuesList() {
        return valuesList;
    }

    /** @param valuesList the measurement values */
    public void setValuesList(List<Object> valuesList) {
        this.valuesList = valuesList;
    }

    /** @return the value type names, or {@code null} if never set */
    public List<String> getTypeList() {
        return typeList;
    }

    /** @param typeList the value type names */
    public void setTypeList(List<String> typeList) {
        this.typeList = typeList;
    }
}

5、自定义实体列名注解


import java.lang.annotation.*;

/**
 * Declares the table name an entity class is stored under.
 *
 * <p>Retained at runtime so it can be read reflectively; it may be placed on
 * types and on other annotations.
 *
 * @author zgy
 * @date 2023-6
 */
@Target(value = {ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(value = RetentionPolicy.RUNTIME)
@Documented
public @interface IoTTableName {

    /**
     * Table name for the annotated entity; an empty string when not specified.
     */
    String value() default "";

}

6、接口获取信息封装

主要对实体类的信息进行解析成IoTDB入参实体信息,方便使用入库

注:其中getIoTDBTime用于获取存入的时间戳。我们在实体类中建立IoTDBTime字段存储存入的时间戳,然后在此处封装时获取该时间戳,如果不存在则默认使用当前时间。该方式主要是为了解决批量插入的问题:时间戳应该为封装实体时的时间,不然会导致时间戳一致,数据库中只存入一条信息。


import cn.hutool.core.date.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
 * Base interface for entities that can be written to IoTDB.
 *
 * <p>The default {@link #toRecord()} turns the implementing object into an
 * {@link IoTDBRecord} by reflection, so implementers normally only need the
 * {@link IoTTableName} annotation and a public getter per field.
 *
 * @Author: ZGY
 * @Date: 2023/5/29 15:27
 * @FileName: Recordable
 */
public interface IoTDBRecordable {
    Logger logger = LoggerFactory.getLogger(IoTDBRecordable.class);

    /**
     * Builds the insert parameters for this entity.
     *
     * <ul>
     *   <li>Timestamp: taken from a public {@code getIoTDBTime()} getter when the class has
     *       one and it returns a non-null value; otherwise the record keeps its default.</li>
     *   <li>Device id: the value of the class's {@link IoTTableName} annotation.</li>
     *   <li>Measurements / values / types: one entry per declared instance field, read
     *       through the matching public {@code getXxx()} getter. Static and synthetic
     *       fields are skipped because they are not entity data.</li>
     * </ul>
     *
     * <p>If reading the fields fails, the error is logged and the three lists are left unset.
     *
     * @return the populated record
     * @throws IllegalStateException if the implementing class has no {@link IoTTableName}
     */
    default IoTDBRecord toRecord() {
        IoTDBRecord ioTDBRecord = new IoTDBRecord();
        Class<?> entityClass = this.getClass();

        // Optional explicit timestamp supplied by the entity.
        try {
            Object ioTDBTime = entityClass.getMethod("getIoTDBTime").invoke(this);
            if (ioTDBTime instanceof Long) {
                ioTDBRecord.setTime((Long) ioTDBTime);
            } else if (ioTDBTime != null) {
                ioTDBRecord.setTime(Long.parseLong(ioTDBTime.toString()));
            }
        } catch (NoSuchMethodException ignored) {
            // The entity does not expose getIoTDBTime(): keep the record's default timestamp.
        } catch (Exception e) {
            logger.error("IoTDB实体类转换异常:{}", e.getMessage(), e);
        }

        IoTTableName name = entityClass.getAnnotation(IoTTableName.class);
        if (name == null) {
            throw new IllegalStateException(
                    entityClass.getName() + " must be annotated with @IoTTableName");
        }
        ioTDBRecord.setDeviceId(name.value());

        List<String> measurements = new ArrayList<>();
        List<Object> records = new ArrayList<>();
        List<String> types = new ArrayList<>();
        try {
            for (Field field : entityClass.getDeclaredFields()) {
                if (field.isSynthetic() || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                    continue;
                }
                String fieldName = field.getName();
                // Locale.ROOT keeps the getter name stable regardless of the JVM default locale.
                String getterName = "get"
                        + fieldName.substring(0, 1).toUpperCase(java.util.Locale.ROOT)
                        + fieldName.substring(1);
                Method getter = entityClass.getMethod(getterName);
                measurements.add(fieldName);
                records.add(getter.invoke(this));
                types.add(getter.getReturnType().getName());
            }
            ioTDBRecord.setMeasurementsList(measurements);
            ioTDBRecord.setValuesList(records);
            ioTDBRecord.setTypeList(types);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("IoTDB实体类转换异常:{}", e.getMessage(), e);
        }
        return ioTDBRecord;
    }
}

7、节点路径参数类

统一定义方便使用,比如接收syslog信息



/**
 * Central definition of the IoTDB node paths referenced by the entity classes.
 */
public class IoTDBTableParam {

    /** Node path used for syslog records. */
    public static final String SYSLOG_IOT_TABLE = "root.syslog";

}

8、实体类举例

定义syslog实体类进行举例,通过@IoTTableName引入统一配置的参数信息来进行动态识别



import com.IoTDB.IoTDBRecordable;
import com.IoTDB.IoTDBTableParam;
import com.IoTDB.IoTTableName;

/**
 * Example entity: one syslog message stored under
 * {@link IoTDBTableParam#SYSLOG_IOT_TABLE}.
 *
 * <p>Every declared field, including {@code IoTDBTime}, has a public getter so that
 * the reflective {@code toRecord()} of {@link IoTDBRecordable} can read it.
 */
@IoTTableName(value = IoTDBTableParam.SYSLOG_IOT_TABLE)
public class IoTDBSysLog implements IoTDBRecordable {
    /** Primary key. */
    private String id;
    /**
     * Facility display name.
     */
    private String facilityName;
    /**
     * Facility key.
     */
    private Integer facilityKey;
    /**
     * Level display name.
     */
    private String levelName;
    /**
     * Level key.
     */
    private Integer levelKey;
    /**
     * IP the log came from.
     */
    private String logIp;
    /**
     * Log message.
     */
    private String logMsg;
    private String createTime;
    /**
     * Insert timestamp for IoTDB, in epoch milliseconds; captured when the entity is
     * created. The field name is kept as-is because the reflective conversion derives
     * the measurement name from it.
     */
    private Long IoTDBTime = System.currentTimeMillis();

    public Long getIoTDBTime() {
        return IoTDBTime;
    }

    public void setIoTDBTime(Long ioTDBTime) {
        IoTDBTime = ioTDBTime;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getFacilityName() {
        return facilityName;
    }

    public void setFacilityName(String facilityName) {
        this.facilityName = facilityName;
    }

    public Integer getFacilityKey() {
        return facilityKey;
    }

    public void setFacilityKey(Integer facilityKey) {
        this.facilityKey = facilityKey;
    }

    public String getLevelName() {
        return levelName;
    }

    public void setLevelName(String levelName) {
        this.levelName = levelName;
    }

    public Integer getLevelKey() {
        return levelKey;
    }

    public void setLevelKey(Integer levelKey) {
        this.levelKey = levelKey;
    }

    public String getLogIp() {
        return logIp;
    }

    public void setLogIp(String logIp) {
        this.logIp = logIp;
    }

    public String getLogMsg() {
        return logMsg;
    }

    public void setLogMsg(String logMsg) {
        this.logMsg = logMsg;
    }

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }
}

9、插入数据测试

我们写一个测试

    /**
     * Smoke test: fills one syslog entity and writes it to IoTDB through
     * {@code iotDBSessionConfig} (a field declared outside this snippet).
     * The method name is historical; nothing here touches MongoDB.
     */
    @Test
    public void testMongoDB(){
        IoTDBSysLog entry = new IoTDBSysLog();

        // Identity and message.
        entry.setId("123");
        entry.setLogIp("1.1.1.1");
        entry.setLogMsg("hello");

        // Facility and level.
        entry.setFacilityName("信息");
        entry.setFacilityKey(1);
        entry.setLevelName("提示信息");
        entry.setLevelKey(3);

        entry.setCreateTime(DateUtil.now());

        // Write to the IoTDB history store.
        iotDBSessionConfig.insertRecord(entry);
    }

插入成功!!!查询的话,我们下次再进行处理。

作者:zhu1361

物联沃分享整理
物联沃-IOTWORD物联网 » IoTDB在Spring Boot 2中的集成与写入指南(一)

发表评论