【阿里云生活物联网架构师专题 ⑧】基于在 SpringBoot Java私有云上接收阿里云物联网设备的各种状态数据,实现m2m流转;


  • 本系列博客学习由非官方人员 半颗心脏 潜心所力所写,仅仅做个人技术交流分享,不做任何商业用途。如有不对之处,请留言,本人及时更改。

1、esp32接入阿里云物联网平台,实现天猫精灵语音控制;
2、esp8266直连接入阿里云物联网平台,实现天猫精灵找队友零配网功能和语音控制;
3、esp32 sdk 直连接入天猫精灵IOT开放平台,实现天猫精灵找队友零配网功能和语音控制;
4、如何在天猫精灵IOT开放平台二次开发智能设备的 H5控制面板;
5、分享可商用的ESP8266 SDK连接阿里云物联网生活平台的在线远程升级OTA笔记。
6、ESP8266接入阿里生活飞燕平台国际版,实现亚马逊Alexa Echo音响语音控制。
7、阿里云物联网平台的网关-子设备理论协议说明,支持Zigbee/ble等没上云能力的设备;;
8、基于在 SpringBoot Java 私有云上接收阿里云物联网平台设备的各种状态数据,实现m2m流转;

文章目录

  • 寄言
  • 一、前言
    • 什么是服务器订阅?
      • 适用场景
  • 二、分享一个好玩的IDEA插件
  • 三、配置AMQP服务端订阅
  • 四、集成开始
    • 另外,不要把我的博客作为学习标准,我的只是笔记,难有疏忽之处,如果有,请指出来,也欢迎留言哈!

在这里插入图片描述

寄言


       我写过很多物联网控制的博文系列,包括微信公众号、微信小程序控制硬件,私有云对接天猫精灵服务器,抑或是硬件端 esp8266/esp32 等系列博文,这是一个一个专题是写下我们如何在阿里云物联网上全栈开发我们的应用专题,让我们无须企业账号也可以体验设备-云端-App” 的过程;让我们变得更强,一个人担任一个公司的全部职责,全栈开发物联网攻城狮前进;

       我会带领大家轻轻松松地把自己的设备接入天猫精灵,告别 “单机时代”,走进语音控制物联网时代。 有疑问请留言区留言,或者加群大伙们讨论;写总结,写博文不容易,望大家多多体谅!

  • 自带资料:
    • git 分布式管理软件的基本使用;
    • 硬件开发:乐鑫 esp8266、esp32模块一个;具备 c 语言基础 ,不需要很熟练;
    • 移动端开发:android 端具备 javavue.js 开发语言,AndroidStudio 环境;
    • 服务器端开发:php 开发,熟悉 服务器运行、部署等原理操作;

一、前言

    将近2个月没更新博文了,罪恶感满满的!工作上频繁遇到了很多客户的一些需求,这2个月我会慢慢做博文,做视频,给大家做一些技术原理讲解,共勉物联网;

    针对不少纯软件的电商类型公司想玩转物联网开发,问到我一个问题,我们的私有云服务器如何接收来自阿里云物联网的设备的状态和数据呢?

    今天,我给大家啊,介绍下如何在 Java 语言开发的私有云上实现!

什么是服务器订阅?

    私有云可以直接订阅产品下多种类型的消息:设备上报消息、设备状态变化通知、设备生命周期变更、网关发现子设备上报、设备拓扑关系变更等。配置服务端订阅后,物联网平台会将产品下所有设备的已订阅类型的消息转发至您的服务器。

适用场景

  • 1、服务端订阅适用于单纯的接收设备数据的场景,并且适用于高并发场景。

  • 2、服务端接收产品下全部设备的订阅数据。

  • 3、如果您有多个服务器消费同一个产品的订阅消息,消息会随机转发至某个服务器。

  • 4、服务端订阅与规则引擎数据流转的使用场景和能力对比,请参见数据流转方案对比。

    今天使用的是 SpringBoot 架构语言,这个也是我擅长开发的架构,所以这篇我给大家介绍下如何集成AMQP高级消息队列协议,配置AMQP服务端订阅后,物联网平台会将产品下所有已订阅类型的消息,通过AMQP通道推送至私有云。


    AMQP服务端订阅消息流转流程图

在这里插入图片描述


二、分享一个好玩的IDEA插件

     这几个月在玩 Java 服务器,同事推荐了一个插件 Alibaba Cloud Toolkit,这是阿里巴巴针对 ECS服务器做的一款插件,能一键部署 jar 包,非常方便;

在这里插入图片描述

在这里插入图片描述
     预想在本地看到远程服务器的运行日志,插件还提供了这样的命令,其中运行此命令还需要 SSH 远程连接的账户密码:

在这里插入图片描述

     其中,restart-springboot.sh的内容如下:

killall java
nohup java -jar /www/wwwroot/www.aliyun.com/demo-0.0.1-SNAPSHOT.jar > nohup.log 2>&1 &

三、配置AMQP服务端订阅

    在物联网平台控制台设置服务端订阅的消息类型。

  1. 登录物联网平台控制台。
  2. 在实例概览页,找到对应的实例,单击实例进入实例详情页;

在这里插入图片描述

  1. 在左侧导航栏,选择规则引擎 > 服务端订阅。
  2. 在服务端订阅页,单击创建订阅。
  3. 在创建订阅对话框中,完成配置,单击确认。
    在这里插入图片描述

四、集成开始

     首先,先集成架包;

<!--aliyun core--><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>4.5.6</version></dependency><!--aliyun Iot--><!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-iot --><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-iot</artifactId><version>7.16.0</version></dependency><!-- IOT用于监听阿里平台消息 --><dependency><groupId>com.aliyun.openservices</groupId><artifactId>iot-client-message</artifactId><version>1.1.5</version></dependency><!-- amqp 1.0 qpid client --><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version></dependency><!-- util for base64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency>

     整个接收处理类如下:

     其中,要注意的是以下几个参数,其实在阿里云官网描述得非常清楚了点我:

参数说明
accessKey阿里云账号的AccessKey Key
accessSecret阿里云账号的AccessKey Secret
consumerGroupId消费组ID。登录物联网平台控制台,在对应实例的规则引擎 > 服务端订阅 > 消费组列表查看您的消费组ID。
iotInstanceId实例ID。仅您购买的实例需要传入;如果是公共示例,留空即可。
connectionUrl见下面的详细描述

    AMQP客户端接入物联网平台的连接地址和连接认证参数说明如下:

  • 接入域名:
    • 对于您购买的实例,接入域名请在物联网平台控制台,找到对应的实例,单击实例进入实例详情查看。
    • 公共实例的接入域名:${uid}.iot-amqp.${regionId}.aliyuncs.com,其域名中的变量说明
字段说明
uid您的阿里云账号ID。可登录物联网平台控制台,单击账号头像,跳转至安全设置页面查看
regionId您的物联网平台服务所在地域ID。在物联网平台控制台左上方可查看地域。RegionId的表达方法,请参见地域和可用区。
package com.example.demo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;@SpringBootApplication
public class DemoApplication {private final static Logger logger = LoggerFactory.getLogger(DemoApplication.class);//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(50000));public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);//参数说明,请参见AMQP客户端接入说明文档。String accessKey = "LTAI4Fg3**********GYJBLPoTo5";String accessSecret = "OT7s9vk*********K4p8KQ31qoTIL4";String consumerGroupId = "DEFAULT_GROUP";//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。String iotInstanceId = "";long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5、hmacsha1和hmacsha256。String signMethod = "hmacsha1";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。String clientId = "xuhongv";//userName组装方法,请参见AMQP客户端接入说明文档。String userName = clientId + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";//计算签名,password组装方法,请参见AMQP客户端接入说明文档。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = null;try {password = doSign(signContent, accessSecret, signMethod);} catch (Exception e) {e.printStackTrace();}//接入域名,请参见AMQP客户端接入说明文档。String connectionUrl = "failover:(amqps://13933000001932702.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=15";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF", connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = null;try {context = new InitialContext(hashtable);} catch (NamingException e) {e.printStackTrace();}ConnectionFactory cf = null;Destination queue = null;try {cf = (ConnectionFactory) context.lookup("SBCF");queue = (Destination) context.lookup("QUEUE");} catch (NamingException e) {e.printStackTrace();}// Create ConnectionConnection connection = null;try {connection = cf.createConnection(userName, password);} catch (JMSException e) {e.printStackTrace();}((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// Create Session// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)Session session = null;try {session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();} catch (JMSException e) {e.printStackTrace();}try {// Create Receiver LinkMessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);} catch (JMSException e) {e.printStackTrace();}}private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() -> processMessage(message));} catch (Exception e) {logger.error("submit task occurs exception ", e);}}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private static void processMessage(Message message) {try {byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");logger.info("receive message"+ ", topic = " + topic+ ", messageId = " + messageId+ ", content = " + content);} catch (Exception e) {logger.error("processMessage occurs error ", e);}}private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}
}
  • 当设备正常推送数据上报,服务器会接收到:

在这里插入图片描述

2020-11-20 15:43:04.266  INFO 2108 --- [pool-1-thread-1] com.example.demo.DemoApplication         : receive message, topic = /a1quyJFAxPS/Device01/thing/event/property/post, messageId = 1329691716009393153, content = {"deviceType": "WaterloggingSensor","iotId": "PZzjOxtNUvQ2jlpATdPs000000","requestId": "123","checkFailedData": {},"productKey": "a1quoiyJF0","gmtCreate": 1605858184226,"deviceName": "Device01","items": {"BatteryLevel": {"value": 1,"time": 1605858184231},"WaterLeachState": {"value": 1,"time": 1605858184231},"GeoLocation": {"value": {"CoordinateSystem": 1,"Latitude": 11,"Longitude": 11,"Altitude": 1},"time": 1605858184231}}
}

另外,不要把我的博客作为学习标准,我的只是笔记,难有疏忽之处,如果有,请指出来,也欢迎留言哈!

  • 玩转esp8266带你飞、加群付费QQ群,不喜的朋友勿喷勿加:434878850
  • esp8266源代码学习汇总(持续更新,欢迎star):https://github.com/xuhongv/StudyInEsp8266
  • esp32源代码学习汇总(持续更新,欢迎star):https://github.com/xuhongv/StudyInEsp32
  • 关注下面微信公众号二维码,干货多多,第一时间推送!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/251933.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

软件吞噬世界,Api快速入门到放弃

正如汽车行业必须达到一定的规模&#xff0c;才能让企业只生产一个部件。软件产业现在已经足够大了&#xff0c;尤其当你接受所谓的“软件吞噬世界”此类的说法时更是如此。因此&#xff0c;和汽车工业不再生产自己的钢铁一样&#xff0c;大多数公司都希望利用API来提供更好的服…

EKS独领风骚

前言 随着公司的逐渐发展&#xff0c;开拓了更加多的子项目与小程序&#xff0c;这些都需要进行宣传&#xff0c;但是管理以及部署新的应用是一个繁琐的工程&#xff0c;部署麻烦而且更新业务的时候非常不方面。尤其面向用户的时候&#xff0c;体验感很差。于是想要使用docker…

区块链,不是元宇宙的全部

元宇宙的火爆并非偶然。无论是在投资界&#xff0c;还是在行业界&#xff0c;我们都看到了元宇宙的身影。“元宇宙”&#xff0c;俨然已经成为一个全新的风口。由于与元宇宙之间的天然联系&#xff0c;于是&#xff0c;很多人开始将元宇宙与区块链联系在一起&#xff0c;甚至还…

区块链+新零售全方位追踪记录有形商品或无形信息的流转链条

传统的溯源系统一般都是使用中心化账本模式&#xff0c;由各个市场参与者分散孤立地记录和保存&#xff0c;是一种信息孤岛模式。 在中心化账本模式下&#xff0c;谁作为中心维护这个账本变成为了问题的关键&#xff0c;无论是源头企业&#xff0c;还是渠道商保存&#xff0c;…

区块链技术下的数字藏品如何赋能实体商品

在很多人看来&#xff0c;区块链技术下的数字收藏品&#xff0c;或许可能与我们的生活相距甚远&#xff0c;然而&#xff0c;在区块链技术不断地发展与现今社会的要求之下&#xff0c;数字藏品&#xff0c;正渐渐地贴近我们的生活。 第一&#xff0c;免费赠送数字藏品、宣传推广…

交易的流动——从钱包到区块链网络

是否经常会好奇我在钱包上生成的交易是如何一步步流入区块链网络&#xff0c;并最终成为区块的一部分呢&#xff1f; 要想弄清这个问题&#xff0c;首先要搞清楚钱包的工作原理&#xff0c;以主流钱包MetaMask为例&#xff0c;在我们添加一个新的网络时&#xff0c;会需要填写…

不止于链:万向区块链“万纳链盟”,构建企业数字转型一体化方案

2022年伊始&#xff0c;为更好满足企业级客户在数字化转型关键期的实际需求&#xff0c;万向区块链汇聚技术及生态资源&#xff0c;对区块链技术架构进行全面拓展&#xff0c;基于PlatONE升级万纳链&#xff0c;推出“万纳链盟”。“万纳链盟”作为新一代区块链技术和网络平台&…

当下的区块链游戏该如何推广?一文通解全部套路

自cryptokitties诞生至今&#xff0c;已有数百款区块链游戏上线&#xff0c;如同电影市场&#xff0c;究竟应该如何排档期&#xff0c;如何营销&#xff0c;才能获得好的票房呢&#xff1f;游戏茶馆的佳伦和屁猪曾撰文&#xff0c;从渠道的角度&#xff0c;论述过区块链游戏的推…

赋能司法,区块链为正义指路

出品 | 人民数字FINTECH 责编 | 晋兆雨 头图 | 付费下载于视觉中国 随着数字化技术的发展&#xff0c;电子证据包括声音、图片、视频等形式在诉讼案件中出现的越来越频繁。 但取证容易举证难&#xff0c;电子证据证据在形成的时候并非都能对数据生成的时间、真实人员身份等予…

元宇宙:区块链成熟的开始

谈及区块链&#xff0c;我们脑海当中首先浮现出来的是&#xff0c;狂热、浮躁的场景。无论是人们试图通过区块链来实现财富自由的妄想&#xff0c;抑或是人们试图通过区块链成就「下一个互联网」的渴望&#xff0c;无不折射出区块链的年轻与冒进以及人们对于区块链认识的肤浅。…

世链投研|people的诞生堪比魔幻,背后蕴藏着怎样的经济体制?

最近一段时间&#xff0c;加密社区中有一个项目颇为火爆&#xff0c;几乎是在一夜之间光速走红&#xff0c;那就是people&#xff0c;也叫做ConstitionDao。 People的诞生过程非常神奇&#xff0c;用“魔幻”二字形容都不为过。它本是由网友在区块链平台筹集资金的个人行为&am…

元宇宙漫游指南-区块链构建元宇宙基础设施,一文搞清楚元宇宙和区块链

1、区块链和元宇宙 1.1 区块链和元宇宙的概念 1.1.1 元宇宙大事件 Roblox第一支股票上市 Roblox公司2004年成立&#xff0c;2021年3月份在纽交所直接上市。在他们自己的描述中&#xff0c;Roblox运营着一个“人类共同体验平台”&#xff0c;用户可以在其中相互互动以探索和开…

黑链 明链 暗链 简介

黑链一直以来都是SEO人员讨论的话题&#xff0c;在SEO早期&#xff0c;特别是网络安全意识淡薄的时代&#xff0c;曾被广泛应用&#xff0c;但随着百度算法不断的升级与调整&#xff0c;黑链的使用&#xff0c;逐渐在减少。 那么&#xff0c;什么是黑链&#xff0c;它与明链、暗…

UTON NFR:元宇宙是区块链时代发展的必然产物

当互联网走到新的十字路口&#xff0c;美国企业领头以元宇宙概念重构科技世界&#xff0c;以实现下一代技术领先发展。我们会发现&#xff0c;这个领域依然是媒体的热度居多&#xff0c;还不是行业的热度&#xff0c;因此&#xff0c;我们尝试从元宇宙的重要构成说起&#xff0…

SQL Server 2014 安装教程(图文教程,附下载地址)

SQL Server 2014 安装教程 解压 SQL Server 光盘镜像&#xff0c;在解压好的文件夹中&#xff0c;找到 setup.exe 文件&#xff0c;双击打开&#xff0c;选择安装。 选择 全新 SQL Server 独立安装或向现有安装添加功能。 选择 下一步。 勾选 我接受许可条款&#xff0c;并…

SQL Server 2012 下载与安装详细教程

一、SQL Server 2012下载 链接&#xff1a;https://www.123pan.com/s/VcwKVv-Zkawv 提取码&#xff1a;gEBE 下载好文件之后解压 二、软件安装 1.打开SQL Server 2012文件夹&#xff0c;双击setup.exe文件 2.左侧选择安装&#xff0c;点第一个全新SQL Server 独立安装或向…

Visual Studio 2019下载及安装的详细教程

以下内容都是给纯小白看的&#xff0c;如有大佬请多多指点。 ** 第一步&#xff1a; ** 首先先找自己电脑上的浏览器&#xff08;这里建立用谷歌浏览器&#xff09;&#xff0c;在搜索框中输入“微软”&#xff0c;打开后 点击红色箭头指向的那一个。 第二步&#xff1a; 进…

VS2015安装教程(带图解+下载地址+超详细)

https://blog.csdn.net/guxiaonuan/article/details/73775519?locationNum2&fps1 本文将教给你如何安装 VS2015&#xff0c;如果你还想了解 VS2015 的使用&#xff0c;请猛击&#xff1a;在 VS2015 下运行C语言程序 与此同时&#xff0c;我们还提供了非常优秀的C语言教程…

SQL Server 2014下载及安装教程

目录 一、下载 二、安装 1.安装sql server 2014 2.报错后&#xff0c;安装net framework 3.5 3.继续安装 4.补丁SP3安装 三、用管理工具连接sql 哔哩哔哩账号&#xff1a;我是冷空气 包含各种视频教程&#xff08;数据库、操作系统、运维、网络、存储、虚拟化、云计算、…

SQL Server 2016下载及安装教程

目录 一、下载 1.sql server 2016安装包与sp2补丁 ​编辑2.管理工具 二、安装 1.sql server2016 2.补丁SP2安装 三、用管理工具连接sql 哔哩哔哩账号&#xff1a;我是冷空气 包含各种视频教程&#xff08;数据库、操作系统、运维、网络、存储、虚拟化、云计算、编程开发…