|
@@ -1,5 +1,9 @@
|
|
|
package com.ruoyi.system.init;
|
|
|
|
|
|
+import com.github.rholder.retry.Retryer;
|
|
|
+import com.github.rholder.retry.RetryerBuilder;
|
|
|
+import com.github.rholder.retry.StopStrategies;
|
|
|
+import com.github.rholder.retry.WaitStrategies;
|
|
|
import com.ruoyi.system.config.MQTTConfig;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
@@ -9,8 +13,12 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
import org.springframework.boot.CommandLineRunner;
|
|
|
import org.springframework.core.annotation.Order;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
/**
|
|
|
* mqtt发布AGV实时坐标
|
|
|
*/
|
|
@@ -19,22 +27,27 @@ import org.springframework.stereotype.Component;
|
|
|
@Slf4j
|
|
|
public class PublishAGVInfo implements CommandLineRunner {
|
|
|
|
|
|
- private final String topic="position";
|
|
|
+ private final String topic = "position";
|
|
|
|
|
|
- private int qos = 1;
|
|
|
+ private int qos = 1;
|
|
|
|
|
|
- private MemoryPersistence persistence = new MemoryPersistence();
|
|
|
+ private MemoryPersistence persistence = new MemoryPersistence();
|
|
|
|
|
|
private MQTTConfig mqttConfig = new MQTTConfig();
|
|
|
|
|
|
private MqttClient sampleClient;
|
|
|
|
|
|
+ @Resource
|
|
|
+ ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
|
|
+
|
|
|
+ private boolean isConnect = false;
|
|
|
+
|
|
|
@Override
|
|
|
public void run(String... args) throws Exception {
|
|
|
-// initConnect();
|
|
|
+ initConnect();
|
|
|
}
|
|
|
|
|
|
- private void initConnect(){
|
|
|
+ private void initConnect() {
|
|
|
try {
|
|
|
// 创建客户端
|
|
|
sampleClient = new MqttClient(mqttConfig.getHOST(), mqttConfig.getClientId(), persistence);
|
|
@@ -48,37 +61,78 @@ public class PublishAGVInfo implements CommandLineRunner {
|
|
|
// 建立连接
|
|
|
sampleClient.connect(connOpts);
|
|
|
log.info("MQTT 连接完成!");
|
|
|
+ isConnect = true;
|
|
|
} catch (MqttException me) {
|
|
|
- log.error("An exception occurred when initializing the MQTT connection,msg:{}",me.getMessage());
|
|
|
+ retry();
|
|
|
+ log.error("An exception occurred when initializing the MQTT connection,msg", me);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void closeConnection(){
|
|
|
+ public void closeConnection() {
|
|
|
try {
|
|
|
// 断开连接
|
|
|
sampleClient.disconnect();
|
|
|
// 关闭客户端
|
|
|
sampleClient.close();
|
|
|
- }catch (MqttException me){
|
|
|
- log.error("An exception occurred when the connection about MQTT was closed,msg:{}",me.getMessage());
|
|
|
+ } catch (MqttException me) {
|
|
|
+ log.error("An exception occurred when the connection about MQTT was closed,msg:", me);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
+ public void retry() {
|
|
|
+ threadPoolTaskExecutor.execute(() -> {
|
|
|
+ Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
|
|
|
+ .retryIfResult(Boolean.FALSE::equals)
|
|
|
+ .retryIfExceptionOfType(Exception.class)
|
|
|
+ .withStopStrategy(StopStrategies.neverStop())
|
|
|
+ .withWaitStrategy(WaitStrategies.fixedWait(7, TimeUnit.SECONDS))
|
|
|
+ .build();
|
|
|
+ try {
|
|
|
+ retryer.call(() -> {
|
|
|
+ try {
|
|
|
+ // 创建客户端
|
|
|
+ sampleClient = new MqttClient(mqttConfig.getHOST(), mqttConfig.getClientId(), persistence);
|
|
|
+ // 创建链接参数
|
|
|
+ MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
|
+ // 在重新启动和重新连接时记住状态
|
|
|
+ connOpts.setCleanSession(false);
|
|
|
+ // 设置连接的用户名
|
|
|
+ connOpts.setUserName(mqttConfig.getUserName());
|
|
|
+ connOpts.setPassword(mqttConfig.getPassword().toCharArray());
|
|
|
+ // 建立连接
|
|
|
+ sampleClient.connect(connOpts);
|
|
|
+ log.info("MQTT 连接完成!");
|
|
|
+ isConnect = true;
|
|
|
+ return true;
|
|
|
+ } catch (MqttException me) {
|
|
|
+ log.error("An exception occurred when initializing the MQTT connection,msg:", me);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("An exception occurred during the retry ,msg:", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
-
|
|
|
- public void publishMessage(String content){
|
|
|
+ public void publishMessage(String content) {
|
|
|
+ if (!isConnect){
|
|
|
+ log.error("The MQTT connection is disconnected!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
try {
|
|
|
- // 创建消息
|
|
|
- MqttMessage message = new MqttMessage(content.getBytes());
|
|
|
- // 设置消息的服务质量
|
|
|
- message.setQos(qos);
|
|
|
- // 发布消息
|
|
|
- sampleClient.publish(topic, message);
|
|
|
- }catch (MqttException me){
|
|
|
+ // 创建消息
|
|
|
+ MqttMessage message = new MqttMessage(content.getBytes());
|
|
|
+ // 设置消息的服务质量
|
|
|
+ message.setQos(qos);
|
|
|
+ // 发布消息
|
|
|
+ sampleClient.publish(topic, message);
|
|
|
+ } catch (MqttException me) {
|
|
|
+ isConnect = false;
|
|
|
closeConnection();
|
|
|
initConnect();
|
|
|
- log.error("An exception occurred in the release information,msg:{}",me.getMessage());
|
|
|
+ log.error("An exception occurred in the release information,msg:",me );
|
|
|
}
|
|
|
}
|
|
|
}
|