一、在app模块的build.gradle文件的dependencies中添加依赖
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
1) 点击Sync Now更新依赖
2) AndroidManifest.xml文件添加网络权限
<uses-permission android:name="android.permission.INTERNET"/>
二、 使用
1) 创建MqttConnect.java文件
package com.you.weight.mqtt;import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;import com.youlian.weight.api.Api;
import com.youlian.weight.app.DeviceStatus;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.ArrayList;public class MqttConnect {private String HOST;private String username;private String password;private String Tag = "MQTT";private String clientId = "";private static MqttClient mqttClient = null;private Context context;Api api = new Api();// 订阅主题public static String shutdown = "";public static String update_app = "";Handler handler = new Handler(Looper.getMainLooper());String[] topic;int[] qos = {0,0,0,0,0,0,0,0,0,0,0,0,0}; // 消息质量private static MqttConnect instance;public MqttConnect(Context context, String host, String username, String password, String clientId) {this.HOST = host;this.context = context;this.username = username;this.password = password;this.clientId = clientId;// 这里是你自己需要订阅的主题shutdown = "$oc/devices/"+username+"/user/shutdown"; // 关机update_app = "$oc/devices/"+username+"/user/update_app"; // 发送更新APPArrayList<String> topicList = new ArrayList<>();topicList.add(shutdown);topicList.add(update_app);topic = topicList.toArray(new String[0]);}/*** 单列模式,只能实例化一次* @param context* @param host* @param username* @param password* @param clientId* @return*/public static synchronized MqttConnect getInstance(Context context, String host, String username, String password, String clientId) {if (instance == null) {instance = new MqttConnect(context, host, username, password, clientId);}return instance;}/*** 客户端connect连接mqtt服务器**/public void mqttClient(){close();handler.postDelayed(new Runnable() {@Overridepublic void run() {try {MqttConnectOptions options = mqttConnectOptions(username, password);mqttClient.setCallback(new MqttInitCallback(context, HOST, username, password, clientId));mqttClient.connect(options);sub(topic,qos);uiTip("MQTT连接成功");}catch (MqttException e){uiTip("MQTT连接失败,准备重连。。。:"+e.getMessage());handler.postDelayed(new Runnable() {@Overridepublic void run() {Log.e(Tag,"开始重连。。。");mqttClient();}},3000);}}},200);}/*** 在主线程弹出消息* @param msg*/private void uiTip(String 
msg){Log.d(Tag,msg);handler.post(new Runnable() {@Overridepublic void run() {Toast.makeText(context.getApplicationContext(), msg, Toast.LENGTH_SHORT).show();}});}/*** MQTT连接参数设置*/private MqttConnectOptions mqttConnectOptions(String userName, String passWord)throws MqttException {mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(userName);options.setPassword(passWord.toCharArray());options.setConnectionTimeout(10);
// options.setAutomaticReconnect(false);options.setCleanSession(false);options.setKeepAliveInterval(60);return options;}/*** 关闭MQTT连接*/public void close(){if(mqttClient != null && mqttClient.isConnected()){try {mqttClient.close();mqttClient.disconnect();mqttClient = null;} catch (MqttException e) {Log.e(Tag,"关闭MQTT连接报错:"+e.getMessage());}}else {Log.d(Tag,"Mqtt已关闭");}}/*** 向某个主题发布消息 默认qos:1*/public static void pub(String topic, String msg) throws MqttException {MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload(msg.getBytes());MqttTopic mqttTopic = mqttClient.getTopic(topic);MqttDeliveryToken token = mqttTopic.publish(mqttMessage);token.waitForCompletion();}/*** 向某个主题发布消息** @param topic: 发布的主题* @param msg: 发布的消息* @param qos: 消息质量 Qos:0、1、2*/public void pub(String topic, String msg, int qos) throws MqttException {MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setPayload(msg.getBytes());MqttTopic mqttTopic = mqttClient.getTopic(topic);MqttDeliveryToken token = mqttTopic.publish(mqttMessage);token.waitForCompletion();}/*** 订阅某一个主题 ,此方法默认的的Qos等级为:1** @param topic 主题*/public void sub(String topic){try {mqttClient.subscribe(topic);} catch (MqttException e) {Log.e(Tag,"MQTT主题订阅失败:" + e.getMessage());}}/*** 订阅某一个主题,可携带Qos** @param topic 所要订阅的主题* @param qos* 消息质量:0最多发送一次,不保证消息能够到达接收端,也不负责重发* 1至少发送一次,确保消息能够到达接收端,但可能会导致消息重复* 2确保消息恰好被接收一次*/public void sub(String[] topic, int[] qos){try {mqttClient.subscribe(topic, qos);}catch (MqttException e){Log.e(Tag,"订阅主题失败:"+e.getMessage());}}
}
2) 创建MqttInitCallback.java文件
package com.you.weight.mqtt;import android.content.Context;
import android.util.Log;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;/*** MQTT回调*/
public class MqttInitCallback implements MqttCallback {private String Tag = "MqttInitCallback";private String HOST;private String username;private String password;private String clientId = "";private MqttConnect mqttConnect = null;private Context context = null;MqttInitCallback(Context context, String host, String username, String password, String clientId){this.context = context;this.HOST = host;this.username = username;this.password = password;this.clientId = clientId;}/*** 连接丢失*/@Overridepublic void connectionLost(Throwable cause) {Log.d(Tag,"mqtt连接断开,执行重连");mqttConnect = MqttConnect.getInstance(context, HOST, username, password, clientId);mqttConnect.mqttClient();}/*** subscribe订阅后得到的消息会执行到这里*/@Overridepublic void messageArrived(String topic, MqttMessage message){
// Log.d(Tag,"topic");
// Log.d(Tag,topic);}/*** publish发布成功后会执行到这里*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {}
}
3) 调用
// Import the MQTT helper class.
import com.you.weight.mqtt.MqttConnect;

// Field of the calling class that holds the helper.
private MqttConnect mqttConnect = null;

// Obtain the shared helper; host, username, password and clientId are supplied by the caller.
mqttConnect = MqttConnect.getInstance(MainActivity.this, host, username, password, clientId);
// Start connecting to the broker.
mqttConnect.mqttClient();