SpringBoot与InfluxDB 2.*整合指南:物联网应用实践

前言

随着物联网(IoT)和大数据时代的到来,传统关系型数据库在处理时序数据、传感器数据等应用场景时存在一定局限性。InfluxDB 作为专门为时序数据设计的开源数据库,不仅能高效存储和处理数据,还拥有丰富的查询语言(Flux)和完善的生态系统,可以满足物联网应用中海量数据写入、实时查询和分析的需求。本文将带您了解 InfluxDB 的基本架构、优势,并通过 Spring Boot 整合 InfluxDB2.x 的示例来展示如何在实际项目中使用它。

InfluxDB 介绍

InfluxDB 是一款开源的时序数据库,专门设计用于高性能地存储和查询大规模的时序数据,如监控数据、传感器数据、事件日志等。它内置强大的时序数据压缩和聚合功能,支持数据保留策略(Retention Policies)、连续查询(Continuous Queries)以及灵活的查询语言 Flux。对于数据写入频繁且需要实时分析和展示的场景,InfluxDB 是一个理想的选择。

InfluxDB 的主要优势

  1. 高写入性能
    InfluxDB 采用多线程写入和内存缓存机制,能够高效处理大规模数据写入,特别适合传感器数据、监控指标等高速数据流。

  2. 时序数据优化设计
    独特的数据存储结构和压缩算法,使得数据存储空间大幅度减少,同时在执行时间范围查询、聚合分析时性能表现优异。

  3. 灵活的查询语言 Flux
    Flux 语法丰富,支持数据转换、聚合、过滤、窗口化操作,为用户提供了更高的灵活度以及更直观的数据分析体验。

  4. 完善的生态系统
    InfluxDB 与多种开源工具和商业产品良好集成,例如 Grafana 用于可视化展示,可以无缝构建监控系统和物联网平台。

物联网背景下的数据管理需求

在物联网应用中,每秒钟都会产生大量的时序数据,这些数据来自传感器、设备日志、用户交互等多种数据源。这种数据具有如下特点:

  • 实时性强
    数据采集和处理需要低延迟,确保系统能够快速响应和决策。

  • 数据量大且连续增长
    设备和传感器常年在线,数据持续不断地产生,存储和检索性能要求高。

  • 高并发写入
    数以万计的设备同时发送数据,对于数据库写入速度和稳定性提出了更高要求。

  • InfluxDB 的设计理念正好符合这些需求,通过其高效的写入能力和灵活的查询语言,可以轻松支持这些挑战。此外,通过数据保留、聚合、告警及实时分析功能,可以对设备状态、异常情况和趋势变化进行监控,从而为物联网平台构建出可靠、稳定的基础设施。

    InfluxDB2 的优势

    本文示例中使用的 InfluxDB2 版本,在 InfluxDB1 的基础上做了诸多改进和提升,为开发者提供了更多功能和更好的用户体验。以下将详细介绍 InfluxDB2 相比于 InfluxDB1 的优势:

    1. 统一的 API 与平台体验
      InfluxDB2 将写入、查询、监控和可视化等功能整合到一个统一的平台中,开发者无需再单独部署 Chronograf 或 Kapacitor 等组件。这种一体化设计大大简化了搭建和维护时序数据平台的复杂度,同时提供了统一而一致的用户操作体验。

    2. 增强的查询语言 Flux
      在 InfluxDB2 中,Flux 得到了更深度的集成和优化。Flux 除了提供数据查询外,还支持数据转换、聚合、跨数据源查询等高级操作,比传统的 InfluxQL 灵活性更高,更适合复杂业务场景下的数据分析需求。通过文章中的 FluxUtil 工具类,开发者可以更方便地拼接复杂查询语句。

    3. 改进的数据安全与授权管理
      InfluxDB2 在安全性和权限管理上也有显著改进,支持基于 Token 的认证机制、细粒度的权限控制以及用户管理。这使得在多租户环境下管理和隔离数据变得更加高效和安全,适合企业级应用的场景。

    4. 内置任务调度与告警功能
      InfluxDB2 内置了任务调度和自动化功能,支持定时查询和数据处理任务。同时,系统可根据查询结果触发告警,满足实时监控及自动响应的需求。这些特性减少了外部工具的依赖,让数据监控和自动化管理变得更简洁。

    5. 更友好的 Web UI 界面
      InfluxDB2 提供了功能强大的 Web UI,使得数据库管理、数据查询、监控仪表盘的搭建以及数据探索变得简单直观。开发者和运维人员无需通过命令行就可完成大部分操作,极大提高了工作效率。

    6. 全面支持云原生架构
      InfluxDB2 针对云原生应用场景进行优化,支持容器化部署、自动扩展与高可用性架构,更适合在微服务和分布式系统中作为时序数据存储和分析平台。对于物联网等数据高频写入、快速响应的场景,InfluxDB2 能够提供更稳定的支持。

     往期Influxdb1.*相关文章,如需请跳转。SpringBoot 整合 InfluxDB1.x 三种方式_springboot influxdb-CSDN博客

    Spring Boot 整合 InfluxDB 的示例解析

    本文接下来的示例代码展示了如何使用 Spring Boot 以及 InfluxDB Java 客户端来完成数据的写入和查询操作。主要内容如下:

    环境与依赖

  • JDK 版本:本文示例采用 JDK 17

  • InfluxDB 客户端:通过 com.influxdb:influxdb-client-java 依赖引入

  • 辅助工具库:使用 commons-io

  • 相关依赖配置在 Maven 配置文件中简单列出,读者在实际项目中需根据自己的需求调整版本号和配置参数。

    <dependency>
        <groupId>com.influxdb</groupId>
        <artifactId>influxdb-client-java</artifactId>
        <version>7.2.0</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.18.0</version>
    </dependency>

    配置信息 

    influx:
      url: http://localhost:8086
      org: 
      token: 
      bucket: 
      connectTimeout: 10000
      readTimeout: 120000
      writeTimeout: 120000

    @Data
    @ConfigurationProperties(prefix = "influx")
    public class InfluxProperties {
        private String url;
        private String token;
        private String org;
        private String bucket;
        private int connectTimeout;
        private int readTimeout;
        private int writeTimeout;
    }
    /**
     * InfluxDB 客户端配置
     * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343
     * @Date: 2025-03-20 14:55.
     */
    @Slf4j
    @Configuration
    public class InfluxConfig implements DisposableBean {
    
        private final InfluxProperties properties;
        private volatile InfluxDBClient clientInstance;
    
        @Autowired
        public InfluxConfig(InfluxProperties properties) {
            this.properties = properties;
        }
    
        @Bean
        public InfluxDBClient influxDBClient() {
            validateConfiguration();
    
            if (clientInstance == null) {
                clientInstance = createInfluxDBClient();
            }
    
            return clientInstance;
        }
    
        private InfluxDBClient createInfluxDBClient() {
            OkHttpClient customClient = new OkHttpClient.Builder()
                    .connectTimeout(properties.getConnectTimeout(), TimeUnit.MILLISECONDS) 
                    .readTimeout(properties.getReadTimeout(), TimeUnit.MILLISECONDS)    
                    .writeTimeout(properties.getWriteTimeout(), TimeUnit.MILLISECONDS)   
                    .build();
    
            InfluxDBClientOptions options = InfluxDBClientOptions.builder()
                    .url(properties.getUrl())
                    .authenticateToken(properties.getToken().toCharArray())
                    .org(properties.getOrg())
                    .bucket(properties.getBucket())
                    .okHttpClient(customClient.newBuilder()) 
                    .build();
    
            return InfluxDBClientFactory.create(options);
        }
    
        private void validateConfiguration() {
            if (!StringUtils.hasText(properties.getUrl())) {
                throw new IllegalArgumentException("InfluxDB URL 未配置");
            }
            if (!StringUtils.hasText(properties.getToken())) {
                throw new IllegalArgumentException("InfluxDB Token 未配置");
            }
            if (!StringUtils.hasText(properties.getOrg())) {
                throw new IllegalArgumentException("InfluxDB Org 未配置");
            }
            if (!StringUtils.hasText(properties.getBucket())) {
                throw new IllegalArgumentException("InfluxDB Bucket 未配置");
            }
        }
    
        @Override
        public void destroy() throws Exception {
            if (clientInstance != null) {
                clientInstance.close();
                log.info("InfluxDB 客户端已安全关闭");
            }
        }
    }

     Flux常用语法工具类

    /**
     * InfluxDB语句工具类,用于生成Flux查询语句
     * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343
     * @Date: 2025-03-20 14:03.
     */
    public class FluxUtil {
        public static String getTableName(Class<?> clazz) {
            Measurement measurement = clazz.getAnnotation(Measurement.class);
            if (measurement != null) {
                return measurement.name();
            }
            return null;
        }
        public static void appendCommonFlux(StringBuffer buffer, String bucketName, String tableName,
                                            String start, String stop) {
            appendBucketFlux(buffer, bucketName);
            appendTimeRangeFlux(buffer, start, stop);
            appendTableFlux(buffer, tableName);
        }
        public static void appendBucketFlux(StringBuffer buffer, String bucketName) {
            buffer.append("from(bucket: \"" + bucketName + "\") ");
        }
        public static void appendTableFlux(StringBuffer buffer, String tableName) {
            buffer.append("|> filter(fn: (r) => r._measurement == \"" + tableName + "\") ");
        }
        public static void appendContainsFlux(StringBuffer buffer, String fieldName, List<String> list) {
            if (list != null && !list.isEmpty()) {
                buffer.append("|> filter(fn: (r) => contains(value: r[\"" + fieldName + "\"], set: [");
                appendListAsFluxArray(buffer,list);
                buffer.append("])) ");
            }
        }
        public static void appendTagField(StringBuffer buffer, String field) {
            buffer.append("|> filter(fn: (r) => r._field == \"" + field + "\") ");
        }
        public static void appendTimeRangeFlux(StringBuffer buffer, String start, String stop) {
            if (StringUtils.isBlank(start)) {
                start = "1970-01-01T00:00:00.000Z";
            }
            if (StringUtils.isBlank(stop)) {
                buffer.append("|> range(start:" + start + ") ");
            } else {
                buffer.append("|> range(start:" + start + ", stop:" + stop + ") ");
            }
        }
        public static void appendTimeRangeLastFlux(StringBuffer buffer, int time, String unit) {
            buffer.append("|> range(start: -").append(time).append(unit).append(" )");
        }
        public static void appendDropFlux(StringBuffer buffer, String... args) {
            if (args.length == 0) {
                buffer.append("|> drop(columns: [\"host\"]) ");
                return;
            }
            buffer.append("|> drop(columns: [");
            for (int i = 0; i < args.length; i++) {
                if (i != 0) {
                    buffer.append(",");
                }
                buffer.append("\"" + args[i] + "\"");
            }
            buffer.append("]) ");
        }
        public static void appendKeepFlux(StringBuffer buffer, String... args) {
            if (args.length == 0) {
                return;
            }
            buffer.append("|> keep(columns: [");
            for (int i = 0; i < args.length; i++) {
                if (i != 0) {
                    buffer.append(",");
                }
                buffer.append("\"" + args[i] + "\"");
            }
            buffer.append("]) ");
        }
        public static void appendDuplicateFlux(StringBuffer buffer, String oldField, String newField) {
            buffer.append("|> duplicate(column: \"" + oldField + "\", as: \"" + newField + "\") ");
        }
        public static void appendRenameFlux(StringBuffer buffer, String oldField, String newField) {
            buffer.append(" |> rename(columns: {" + oldField + ": \"" + newField + "\"}) ");
        }
        public static void appendFirstFlux(StringBuffer buffer) {
            buffer.append("|> first() ");
        }
        public static void appendLastFlux(StringBuffer buffer) {
            buffer.append("|> last() ");
        }
        public static void appendLimitFlux(StringBuffer buffer, int n, int offset) {
            buffer.append("|> limit(n:" + n + ", offset: " + offset + ") ");
        }
        public static void appendGroupFlux(StringBuffer buffer, String... columns) {
            if (columns.length == 0) {
                buffer.append("|> group() ");
            } else {
                buffer.append("|> group(columns:[ ");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        buffer.append(",");
                    }
                    buffer.append("\"" + columns[i] + "\"");
                }
                buffer.append("]) ");
            }
        }
        public static void appendDistinctFlux(StringBuffer buffer, String... columns) {
            if (columns.length == 0) {
                buffer.append("|> distinct() ");
            } else {
                buffer.append("|> distinct(column:\"" + columns[0] + "\") ");
            }
        }
        public static void appendCountFlux(StringBuffer buffer) {
            buffer.append("|> count() ");
        }
        public static void appendCountFlux(StringBuffer buffer, String fieldName) {
            buffer.append("|> count(column: \"").append(fieldName).append("\") ");
        }
        public static void appendTopFlux(StringBuffer buffer, int n) {
            buffer.append("|> top(n:" + n + ") ");
        }
        public static void appendBottomFlux(StringBuffer buffer, int n) {
            buffer.append("|> bottom(n:" + n + ") ");
        }
        public static void appendSortFlux(StringBuffer buffer, boolean descFlag, String... columns) {
            if (columns.length == 0) {
                buffer.append("|> sort(columns: [\"_value\"], desc: " + descFlag + ")");
            } else {
                buffer.append("|> sort(columns:[ ");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        buffer.append(",");
                    }
                    buffer.append("\"" + columns[i] + "\"");
                }
                buffer.append("], desc: " + descFlag + ") ");
            }
        }
        public static void appendTimeShiftFlux(StringBuffer buffer) {
            buffer.append("|> timeShift(duration: 8h) ");
        }
        public static void appendFilterFlux(StringBuffer buffer, List<String> list, String operator, String join, String fieldName) {
            if (list == null || list.size() == 0) {
                return;
            }
            for (int i = 0, size = list.size(); i < size; i++) {
                if (i == 0) {
                    buffer.append("|> filter(fn: (r) =>");
                } else {
                    buffer.append(join);
                }
                buffer.append(" r." + fieldName + " " + operator + " \"" + list.get(i) + "\" ");
            }
            buffer.append(")  ");
        }
        public static void appendFilterFlux(StringBuffer buffer, Map<String, Object> map, String operator, String join) {
            Set<Entry<String, Object>> entrySet = map.entrySet();
            Iterator<Entry<String, Object>> iterator = entrySet.iterator();
            boolean flag = true;
            while (iterator.hasNext()) {
                Entry<String, Object> next = iterator.next();
                String key = next.getKey();
                Object value = next.getValue();
                if (flag) {
                    buffer.append("|> filter(fn: (r) =>");
                    flag = false;
                } else {
                    buffer.append(join);
                }
                buffer.append(" r." + key + " " + operator + " \"" + value + "\" ");
            }
            if (!flag) {
                buffer.append(")  ");
            }
        }
        public static void appendMulFilterFlux(StringBuffer buffer, List<Map<String, Object>> list, String innerJoin, String operator, String outerJoin) {
            if (list == null || list.size() == 0) {
                return;
            }
            buffer.append("|> filter(fn: (r) => ");
            boolean outerFlag = true;
            for (int i = 0; i < list.size(); i++) {
                Map<String, Object> map = list.get(i);
                Set<Entry<String, Object>> entrySet = map.entrySet();
                Iterator<Entry<String, Object>> iterator = entrySet.iterator();
                boolean innerFlag = true;
                while (iterator.hasNext()) {
                    Entry<String, Object> next = iterator.next();
                    String key = next.getKey();
                    Object value = next.getValue();
                    if (innerFlag) {
                        if (outerFlag) {
                            outerFlag = false;
                        } else {
                            buffer.append(outerJoin);
                        }
                        buffer.append(" ( ");
                        innerFlag = false;
                    } else {
                        buffer.append(innerJoin);
                    }
                    buffer.append("  r." + key + " " + operator + " \"" + value + "\"  ");
                }
                if (!innerFlag) {
                    buffer.append(" )  ");
                }
            }
            buffer.append(" )  ");
        }
        public static void appendAggregateWindowFlux(StringBuffer buffer, String step, String aggType,boolean createEmpty) {
            buffer.append("|> aggregateWindow(every: " + step + ", fn: " + aggType + ", createEmpty:"+createEmpty+") ");
        }
    
        public static void appendWindowFlux(StringBuffer buffer, String step) {
            buffer.append("|> window(every: " + step + ") ");
        }
        public static void appendAggregateFlux(StringBuffer buffer, String aggType) {
            buffer.append("|> " + aggType + "() ");
        }
        public static void appendYieldFlux(StringBuffer buffer, String name) {
            buffer.append("|> yield(name: \"" + name + "\") ");
        }
        public static void appendTruncateTimeColumn(StringBuffer buffer, String step) {
            buffer.append("|> truncateTimeColumn(unit: " + step + ") ");
        }
        public static void appendImportFlux(StringBuffer buffer, String name) {
            buffer.append("import \"" + name + "\" ");
        }
        public static void appendExistsFlux(StringBuffer buffer) {
            buffer.append("|> filter(fn: (r) => exists r._value ) ");
        }
        public static void appendZeroFlux(StringBuffer buffer) {
            buffer.append("|> filter(fn: (r) => r._value > 0) ");
        }
        public static void appendPivotFlux(StringBuffer buffer) {
            buffer.append("|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")");
        }
        public static void appendPivotFlux(StringBuffer buffer, List<String> rowKeys, List<String> columnKeys, String valueColumn) {
            buffer.append("|> pivot(rowKey: [");
            appendListAsFluxArray(buffer, rowKeys);
            buffer.append("], columnKey: [");
            appendListAsFluxArray(buffer, columnKeys);
            buffer.append("], valueColumn: \"" + valueColumn + "\") ");
        }
        private static void appendListAsFluxArray(StringBuffer buffer, List<String> list) {
            for (int i = 0; i < list.size(); i++) {
                if (i != 0) buffer.append(", ");
                buffer.append("\"" + list.get(i) + "\"");
            }
        }
        public static void appendMapFlux(StringBuffer buffer, String fieldName, String functionName) {
            buffer.append("|> map(fn: (r) => { r[\"").append(fieldName).append("\"] = ")
                    .append(functionName).append("(r[\"").append(fieldName).append("\"]); return r; }) ");
        }
    }
    

     使用示例

    @Data
    @Measurement(name = "student")
    public class Student {
        @Column(tag = true)
        String id;
        @Column()
        String name;
        @Column()
        int age;
        @Column(timestamp = true)
        Instant time;
    }

    /**
     * 测试
     * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343
     * @Date: 2024-03-12 14:31.
     */
    @Service
    public class StudentService {
        @Autowired
        InfluxProperties influxProperties;
        @Autowired
        InfluxDBClient influxDBClient;
    
        public void addStudent(){
            try {
                Student stu=new Student();
                stu.setName("李四");
                stu.setAge(22);
                stu.setId("11002");
                stu.setTime(Instant.now());
                WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
                writeApi.writeMeasurement(influxProperties.getBucket(), influxProperties.getOrg(), WritePrecision.NS, stu);
                System.out.println("新增成功");
            } catch (InfluxException e) {
                System.err.println("error:" + e.getMessage());
            }
        }
    
        public List<Student> queryStudents() {
            try {
                StringBuffer query=new StringBuffer();
                FluxUtil.appendBucketFlux(query,influxProperties.getBucket());
                FluxUtil.appendTimeRangeLastFlux(query,10,"h");
                FluxUtil.appendTableFlux(query, FluxUtil.getTableName(Student.class));
    
                StringBuffer querycount=new StringBuffer();
                querycount.append(query.toString());
    
                FluxUtil.appendPivotFlux(query);
                FluxUtil.appendGroupFlux(query);
                FluxUtil.appendLimitFlux(query,2,0);
    
                FluxUtil.appendTagField(querycount, "age");
                System.out.println(query.toString());
                List<Student> tables = influxDBClient.getQueryApi().query(query.toString(), influxProperties.getOrg(),Student.class);
    
    
                for (Student table : tables) {
                        System.out.println(table.toString());
                }
            } catch (InfluxException e) {
                System.err.println("error:" + e.getMessage());
            }
            return null;
        }
    }

    总结

    通过上述对 InfluxDB2 的详细介绍,可以看出其在操作便捷性、查询能力和安全管理等方面远优于 InfluxDB1。结合文章中的 Spring Boot 整合示例,开发者不仅可以快速上手,并且能够借助 InfluxDB2 的先进特性构建更加智能、稳定和高效的物联网数据处理系统。

    欢迎大家继续在评论区分享使用 InfluxDB2 的经验、讨论遇到的问题或提出新的功能需求。您的反馈将帮助我们进一步完善后续文章内容,共同推进时序数据管理技术的应用和发展。

    作者:TCLms

    物联沃分享整理
    物联沃-IOTWORD物联网 » SpringBoot与InfluxDB 2.*整合指南:物联网应用实践

    发表回复