当前位置:首页 > Windows程序 > 正文

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

2021-05-24 Windows程序

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现

callback方式接收没有成功。所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因。

采用Callback式 发布主题

Java代码  

package com.etrip.mqtt.callback;  

  

import java.net.URISyntaxException;  

  

import org.fusesource.hawtbuf.Buffer;  

import org.fusesource.hawtbuf.UTF8Buffer;  

import org.fusesource.mqtt.client.Callback;  

import org.fusesource.mqtt.client.CallbackConnection;  

import org.fusesource.mqtt.client.Listener;  

import org.fusesource.mqtt.client.MQTT;  

import org.fusesource.mqtt.client.QoS;  

import org.fusesource.mqtt.client.Topic;  

import org.slf4j.Logger;  

import org.slf4j.LoggerFactory;  

  

/** 

 *  

 * MQTT moquette 的Server 段用于并发布主题信息 

 *  

 * 采用Callback式 发布主题  

 *  

 * @author longgangbai 

 */  

public class MQTTCallbackServer {  

      private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);  

        private final static String CONNECTION_STRING = "tcp://localhost:1883";  

        private final static boolean CLEAN_START = true;  

        private final static short KEEP_ALIVE = 30;// 低耗网络,,但是又需要及时获取数据,心跳30s  

        public  static Topic[] topics = {  

                        new Topic("china/beijing", QoS.EXACTLY_ONCE),  

                        new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  

                        new Topic("china/henan", QoS.AT_MOST_ONCE)};  

        public final  static long RECONNECTION_ATTEMPT_MAX=6;  

        public final  static long RECONNECTION_DELAY=2000;  

          

        public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  

          

          

      public static void main(String[] args)   {  

        //创建MQTT对象  

        MQTT mqtt = new MQTT();  

        try {  

            //设置mqtt broker的ip和端口  

            mqtt.setHost(CONNECTION_STRING);  

            //连接前清空会话信息  

            mqtt.setCleanSession(CLEAN_START);  

            //设置重新连接的次数  

            mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  

            //设置重连的间隔时间  

            mqtt.setReconnectDelay(RECONNECTION_DELAY);  

            //设置心跳时间  

            mqtt.setKeepAlive(KEEP_ALIVE);  

            //设置缓冲的大小  

            mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  

              

              

  

              

            //获取mqtt的连接对象BlockingConnection  

             final CallbackConnection connection = mqtt.callbackConnection();  

               

              

            //添加连接的监听事件  

            connection.listener(new Listener() {  

                

                public void onDisconnected() {  

                }  

                public void onConnected() {  

                }  

  

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/70257.html