使用Java从MQTT获取设备数据并通过Druid连接池写入MySQL数据库(Windows系统)——物联网开发终端管理篇

物联网开发终端管理篇-java从MQTT获取设备数据,并通过Druid连接池把数据写入MySQL数据库(Windows系统)

下面来给大家做个简单的数据对接,也就是通过写JAVA代码实现MQTT协议

  1. 首页我们得搭建一个简单的IDEA项目,这个我就不做演示了
  2. 搭建完项目,我们需要准备一些jar包,jar包名如下:
  1. org.eclipse.paho.client.mqttv3-1.1.0.jar
  2. mysql-connector-java-5.1.34.jar
  3. jackson-databind-2.10.0.jar
  4. jackson-core-2.10.0.jar
  5. jackson-annotations-2.10.0.jar
  6. 如果是连接sqlserver数据库 则用   jtds-1.3.1.jar

下面就是java代码了

package com.baidai;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 接收订阅的消息
 */
public class ClientMQTT implements MqttCallback {

    public static final String HOST = "tcp://127.0.0.1:1883";//(127.0.0.1也就是EMQX的ip地址)
    private static final String clientID = "clientXX";//(这个clientXX 可以随便写)
    private String TOPIC= "testtopic";//(这个testtopic 是EMQX的订阅主题,如果你对接别的数据,别人给了你主题,改这个就行)
    private MqttClient client;
    private MqttConnectOptions options;
    private String user = "admin";//(连接登录EMQX的账号)
    private String password = "xiaofang";//(连接登录EMQX的密码)
    private String driverName = "com.mysql.cj.jdbc.Driver";//(连接MySQL数据库)
    //private String driverName = "net.sourceforge.jtds.jdbc.Driver";//(连接SQLServer数据库)
    private String url = "";//根据不同数据库填写自己的数据库地址
    private String userName = "";//填写自己的数据库名称
    private String userPwd = "";//数据库对应密码


    /*连接MQtt*/
    public void clientStart(){
        try {
            client = new MqttClient(HOST,clientID,new MemoryPersistence());
            options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setKeepAliveInterval(10);
            options.setConnectionTimeout(50);
            options.setUserName(user);
            options.setPassword(password.toCharArray());
            client.setCallback(new ClientMQTT());
            MqttTopic topic = client.getTopic(TOPIC);
            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
            options.setWill(topic,"close".getBytes(),1,true);
            client.connect(options);
            int[] Qos = {1};
            String[] topic1 = {TOPIC};
            client.subscribe(topic1,Qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接数据库
     */
    public void connection(){
        try {
            Class.forName(driverName);
            System.out.println("连接成功!!!");

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.out.println("驱动加载失败");
        }

        try {
        	//在这里可以先测试数据库能不能连接
            Connection dbcon = DriverManager.getConnection(url,userName,userPwd);
            System.out.println("数据库连接成功!");

            System.out.println("数据库连接成功!");
        } catch (SQLException e) {
            e.printStackTrace();
            System.out.println("连接失败");

        }
    }

    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println(throwable);
        //连接断掉会执行到这里
        System.out.println("连接以断,请重新连接!!!");
    }

	//接收EMQX上订阅主题的数据
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

        try {

			//获取消息返回格式
            String msg = new String(mqttMessage.getPayload());
            if(msg.equals("close")){
                return;
            }
            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode jsonNode = objectMapper.readTree(msg);

            String info= jsonNode.get("info").toString();
            String time= jsonNode.get("time").toString().replaceAll("\"", "").replaceAll("/","");
			//格式化接收过来的时间
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmss");
            Date productTime = simpleDateFormat.parse(timeStamp);
            String infoTwo = jsonNode.get("infoTwo ").toString();

			//按照对方返回过来的格式接收数据
            JsonNode infoList = objectMapper.readTree(info);
            JsonNode infoTwoList = objectMapper.readTree(infoTwo);

            for (JsonNode dataJsonNode : infoList) {
                //这里就省略了
            }
            
            for (JsonNode dataJsonNode : infoTwoList) {
                //这里就省略了
            }

			//连接数据库
            Connection dbcon = DriverManager.getConnection(url,userName,userPwd);
            Statement stmt = dbcon.createStatement();

			//如果是SqlServe不能自动生成id,可以用这个生成一个随机id
            ResultSet rs = stmt.executeQuery("select REPLACE(NEWID(), '-', '') as Id");
            String id="";
            while(rs.next()) {
                id=rs.getString("Id").toString();
            }
            String sql="insert into 表的名称 (id,name,date,number,totalNumber,nowDate)"+
                    "values(?,?,?,?,?,?)";

            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            PreparedStatement preparedStatement = dbcon.prepareStatement(sql);//预编译下SQL语句
            preparedStatement.setString( 1,id);
            preparedStatement.setString( 2,"测试");
            preparedStatement.setString( 3, dateFormat.format(time));//获取时间
            preparedStatement.setInt( 4,number);//数量
            preparedStatement.setDouble( 5,totalNumber);//总数量
            preparedStatement.setString( 6,dateFormat.format(System.currentTimeMillis()));//获取当前时间
            //这里是执行上面的sql语句的方法
            preparedStatement.executeUpdate();
            //subscribe后会执行到这里
            System.out.println("订阅消息的主题是:"+s);
            System.out.println("消息的ID是:"+mqttMessage.getId());
            System.out.println("添加成功:"+msg);
            System.out.println("添加成功SQl语句:"+preparedStatement);
        }catch (Exception e){
            System.out.println("插入错误信息:"+e);
        }

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        //publish可以执行到这里
        System.out.println("This is deliveryComplete method----->"+iMqttDeliveryToken.isComplete());
    }

    public static void main(String[] args) {
        ClientMQTT clientMQTT = new ClientMQTT();
        clientMQTT.clientStart();
        //在这里可以先测试数据库能不能连接
        //clientMQTT.connection();
    }
}

上面的代码写完,下面我们该怎么让上面的代码可以直接在服务器上面跑,不用自己一直启动idea项目呢???

生成可执行jar包, 并安装运行到服务器

1.停止运行

2.因为程序已经设置过生成jar包,所以用户直接按照下面操作即可生成可执行jar包.







然后找到idea设置的Show Excluded Files

然后可能就会生成Out文件或者class文件。文件是橙黄色的,然后右键Show in Explorer

然后找到生成的jar包的位置

如果服务器没有安装JDK,请用户先安装JDK,然后把先前生成的jar包拷贝到服务器

下一步就是打开命令窗口 cmd

进入到jar包目录

执行 java -jar MqttDataToMySQL.jar

打印所有设备的数据,说明已经运行起来了

让程序在后台运行

1. 关闭

2. 注意呀!不要犯常识性错误!把文件扩展名选中!

3. 在jar包目录新建一个后缀名字为.bat的文件

编辑里面的内容如下(MqttDataToMySQL.jar 就是jar包的名字)

@echo off
start javaw -jar MqttDataToMySQL.jar
exit

4. 双击运行即可

5. 如果要关闭,找到任务管理器java运行进程,关闭即可

6.也可以指定JDK路径运行

C:\java8\jdk1.8.0.131 为JDK的安装路径
@echo off
set JAVA_HOME=C:\java8\jdk1.8.0.131
set CLASSPATH=.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOMe%\lib\tools.jar;
set Path=%JAVA_HOME%\bin;
start javaw -jar MqttDataToMySQL.jar
exit

在服务器运行Jar包可能会出现的错误:

“Exception in thread “main” java.lang.SecurityException: Invalid signature file digest for Manifest”

解决办法

找到你打的jar包META-INF中后缀为.SF和.RSA文件并且删除,然后重新运行,就会成功!!!!赶紧去试试吧!!!

如果还不可以的话,你可以自行查看别人解决的办法点击查看解决方法

如果你想多次运行不同的jar包,你得改java代码的“clientID”,然后再重新打jar包,取不同的jar包名称,然后放到服务器上面运行

物联沃分享整理
物联沃-IOTWORD物联网 » 使用Java从MQTT获取设备数据并通过Druid连接池写入MySQL数据库(Windows系统)——物联网开发终端管理篇

发表评论