拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 具有MQTT,NiFi和InfluxDB的物联网数据管道

具有MQTT,NiFi和InfluxDB的物联网数据管道

白鹭 - 2021-11-09 2211 0 2

1.简介

在本教程中,我们将学习为物联网应用程序创建数据管道时需要做什么。


在此过程中,我们将了解IoT架构的特征,并了解如何利用MQTT Broker,NiFi和InfluxDB等不同工具来为IoT应用程序构建高度可扩展的数据管道。

2.物联网及其架构

首先,让我们研究一些基本概念并了解IoT应用程序的一般体系结构。

2.1。什么是物联网?

物联网(IoT)泛指物理对象的网络,称为“事物”。例如,事物可能包括从普通家用物品(例如灯泡)到复杂的工业设备的任何事物。通过这个网络,我们可以将各种各样的传感器和执行器连接到互联网上以交换数据:

iot-data-pipeline-with-mqtt-nifi-and-influxdb (1).jpg

现在,我们可以在非常不同的环境中部署事物-例如,该环境可以是我们的家,也可以是完全不同的事物,例如移动的货运卡车。但是,我们实际上无法对可用于这些设备的电源和网络的质量做出任何假设。因此,这对物联网应用提出了独特的要求。

2.2。物联网架构简介

典型的物联网架构通常将自身构造为四个不同的层。让我们了解数据实际上是如何流过这些层的:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-1 (1).jpg

首先,传感层主要由收集来自环境的测量值的传感器组成。然后,网络层帮助汇总原始数据并通过Internet发送以进行处理。此外,数据处理层过滤原始数据并生成早期分析。最后,应用程序层使用强大的数据处理功能来执行更深入的数据分析和管理。

3. MQTT,NiFi和InfluxDB简介

现在,让我们检查一下我们今天在IoT设置中广泛使用的一些产品。这些都提供了一些独特的功能,使其适合于IoT应用程序的数据需求。

3.1。 MQTT

消息队列遥测传输(MQTT)是一种轻量级的发布-订阅网络协议。现在是OASIS和ISO标准。 IBM最初开发它是为了在设备之间传输消息。 MQTT适用于内存,网络带宽和电源稀缺的受限环境。


MQTT遵循客户机-服务器模型,其中不同的组件可以充当客户机并通过TCP连接到服务器。我们知道该服务器是MQTT代理。客户端可以将消息发布到称为主题的地址。他们还可以订阅主题并接收发布到该主题的所有消息。


在典型的物联网设置中,传感器可以将温度等测量结果发布到MQTT代理,而上游数据处理系统可以订阅以下主题以接收数据:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-2 (1).jpg

如我们所见,MQTT中的主题是分层的。系统可以使用通配符轻松订阅整个主题层次结构。


MQTT支持三个级别的服务质量(QoS) 。这些是“最多交付一次”,“至少交付一次”和“恰好交付一次”。 QoS定义了客户端和服务器之间的协议级别。每个客户可以选择适合其环境的服务级别。


客户端还可以请求代理在发布时保留消息。在某些设置中,MQTT代理可能要求客户端提供用户名和密码身份验证才能进行连接。此外,出于隐私考虑,可以使用SSL / TLS对TCP连接进行加密。


有几种可用的MQTT代理实现和客户端库可供使用-例如, HiveMQ , Mosquitto和Paho MQTT 。在本教程的示例中,我们将使用Mosquitto。 Mosquitto是Eclipse Foundation的一部分,我们可以轻松地将其安装在Raspberry Pi或Arduino之类的板上。

3.2。Apache NiFi

Apache NiFi最初是由NSA开发为NiagaraFiles。它促进了系统之间数据流的自动化和管理,并且基于基于流的编程模型,该模型将应用程序定义为黑匣子流程网络。


让我们首先了解一些基本概念。在NiFi中通过系统移动的对象称为FlowFile。 FlowFile处理器实际上执行有用的工作,例如FlowFiles的路由,转换和中介。 FlowFile处理器与Connections连接。


进程组是一种将组件分组在一起以在NiFi中组织数据流的机制。进程组可以通过输入端口接收数据,并可以通过输出端口发送数据。远程进程组(RPG)提供了一种向NiFi远程实例发送数据或从其接收数据的机制。


现在,有了这些知识,让我们看一下NiFi架构:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-3 (1).jpg

NiFi是基于Java的程序,可在JVM中运行多个组件。 Web服务器是承载命令和控制API的组件。 Flow Controller是NiFi的核心组件,它管理扩展何时接收执行资源的时间表。扩展允许NiFi进行扩展,并支持与不同系统的集成。


NiFi会在FlowFile信息库中跟踪FlowFile的状态。 FlowFile的实际内容字节位于内容存储库中。最后,与FlowFile相关的出处事件数据位于出处库中。


由于从源头收集数据可能需要较小的占地面积和较低的资源消耗,因此NiFi拥有一个名为MiNiFi的子项目。 MiNiFi为NiFi提供了一种补充性的数据收集方法,并通过站点到站点(S2S)协议轻松与NiFi集成:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-4 (1).jpg

此外,它还可以通过MiNiFi命令和控制(C2)协议对代理进行集中管理。此外,它通过生成完整的监管信息链来帮助建立数据出处。

3.3。 InfluxDB

InfluxDB是一个用Go编写的时间序列数据库,由InfluxData开发。它设计用于快速和高可用性的时间序列数据存储和检索。这特别适合处理应用程序指标,IoT传感器数据和实时分析。


首先,InfluxDB中的数据是按时间序列组织的。时间序列可以包含零个或多个点。点代表具有四个组成部分的单个数据记录-测量,标签集,字段集和时间戳:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-5 (1).jpg

首先,时间戳显示与特定点关联的UTC日期和时间。字段集由一个或多个字段关键字和字段值对组成。他们使用点标记捕获实际数据。同样,标签集由标签键和标签值对组成,但它们是可选的。它们基本上充当点的元数据,并且可以为更快的查询响应建立索引。


该度量充当标签集,字段集和时间戳的容器。此外,InfluxDB中的每个点都可以具有与其关联的保留策略。保留策略描述了InfluxDB将保留数据多长时间,以及它将通过复制创建多少个副本。


最后,数据库充当用户,保留策略,连续查询和时间序列数据的逻辑容器。我们可以理解InfluxDB中的数据库与传统的关系数据库大致相似。


此外,InfluxDB是InfluxData平台的一部分,该平台提供了其他几种产品来有效地处理时间序列数据。 InfluxData现在提供它作为开源平台InfluxDB OSS 2.0和商业产品InfluxDB Cloud:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-6 (1).jpg

除了InfluxDB之外,该平台还包括Chronograf ,它为InfluxData平台提供了完整的接口。此外,它还包括Telegraf ,它是用于收集和报告指标和事件的代理。最后,还有一个实时流数据处理引擎Kapacitor。

4.物联网数据管道的动手实践

现在,我们已经覆盖了足够的基础知识来一起使用这些产品来为我们的物联网应用程序创建数据管道。我们假定本教程从多个城市的多个观测站收集与空气质量相关的测量值。例如,测量包括地面臭氧,一氧化碳,二氧化硫,二氧化氮和气溶胶。

4.1。设置基础架构

首先,我们假设城市中的每个气象站都配备了所有传感设备。此外,这些传感器还连接到Raspberry Pi之类的板上,以收集模拟数据并将其数字化。该评估板已连接至无线设备,以将原始测量结果发送到上游:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-7 (1).jpg

区域控制站从城市中的所有气象站收集数据。我们可以将这些数据汇总并提供给一些本地分析引擎,以更快地获得洞察。来自所有区域控制中心的已过滤数据被发送到中央命令中心,该中心主要托管在云中。

4.2。创建物联网架构

现在,我们准备为简单的空气质量应用程序设计IoT架构。我们将在此处使用MQTT代理,MiNiFi Java代理,NiFi和InfluxDB:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-8 (1).jpg

如我们所见,我们正在气象站站点上使用Mosquitto MQTT代理和MiNiFi Java代理。在区域控制中心,我们正在使用NiFi服务器来聚合和路由数据。最后,我们使用InfluxDB在命令中心级别存储度量。

4.3。执行安装

在Raspberry Pi之类的板上安装Mosquitto MQTT代理和MiNiFi Java代理非常容易。但是,对于本教程,我们将它们安装在本地计算机上。


Eclipse Mosquito的官方下载页面提供了多个平台的二进制文件。安装后,从安装目录启动Mosquitto非常简单:

net start mosquitto

此外, NiFi二进制文件也可以从其官方网站下载。我们必须将下载的存档提取到合适的目录中。由于MiNiFi将使用站点间协议连接到NiFi,因此我们必须在<NIFI_HOME> /conf/nifi.properties中指定站点间输入套接字端口:

# Site to Site properties

 nifi.remote.input.host=

 nifi.remote.input.secure=false

 nifi.remote.input.socket.port=1026

 nifi.remote.input.http.enabled=true

 nifi.remote.input.http.transaction.ttl=30 sec

然后,我们可以启动NiFi:

<NIFI_HOME>/bin/run-nifi.bat

同样,可以从官方网站下载Java或C ++ MiNiFi代理和工具包二进制文件。同样,我们必须将存档提取到合适的目录中。


默认情况下,MiNiFi配备的处理器数量很少。由于我们将使用MQTT中的数据,因此必须将MQTT处理器复制到<MINIFI_HOME> / lib目录中。这些文件打包为NiFi存档(NAR)文件,可以位于<NIFI_HOME> / lib目录中:

COPY <NIFI_HOME>/lib/nifi-mqtt-nar-xxxnar <MINIFI_HOME>/lib/nifi-mqtt-nar-xxxnar

然后,我们可以启动MiNiFi代理:

<MINIFI_HOME>/bin/run-minifi.bat

最后,我们可以从InfluxDB的官方站点下载开源版本。和以前一样,我们可以提取存档并使用简单的命令启动InfluxDB:

<INFLUXDB_HOME>/influxd.exe

我们应该保留所有其他配置(包括端口)作为本教程的默认配置。至此,我们本地计算机上的安装和设置结束了。

4.4。定义NiFi数据流

现在,我们准备定义数据流。 NiFi提供了一个易于使用的界面来创建和监视数据流。可通过URL http:// localhost:8080 / nifi访问。


首先,我们将定义将在NiFi服务器上运行的主要数据流:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-9 (1).jpg

如我们所见,在这里,我们定义了一个输入端口,它将接收来自MiNiFi代理的数据。它还通过连接发送数据到负责将数据存储在InfluxDB中PutInfluxDB处理器。在此处理器的配置中,我们定义了InfluxDB的连接URL和要在其中发送数据的数据库名称。

4.5。定义MiNiFi数据流

接下来,我们将定义将在MiNiFi代理上运行的数据流。我们将使用NiFi相同的用户界面,并将数据流导出为模板,以便在MiNiFi代理中进行配置。让我们为MiNiFi代理定义数据流:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-10 (1).jpg

在这里,我们定义了ConsumeMQTT处理器,该处理器负责从MQTT代理获取数据。我们在属性中提供了代理URI以及主题过滤器。 air-quality等级下定义的所有主题中提取数据。


我们还定义了一个远程进程组,并将其连接到ConcumeMQTT处理器。远程进程组负责通过站点到站点协议将数据推送到NiFi。


我们可以将此数据流另存为模板,然后将其下载为XML文件。让我们将此文件命名为config.xml 。现在,我们可以使用转换器工具包将此模板从XML转换为YAML,MiNiFi代理使用该模板:

<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml

这将为我们提供config.yml文件,我们必须在其中手动添加NiFi服务器的主机和端口:

 Input Ports:

 - id: 19442f9d-aead-3569-b94c-1ad397e8291c

 name: From MiNiFi

 comment: ''

 max concurrent tasks: 1

 use compression: false

 Properties: # Deviates from spec and will later be removed when this is autonegotiated

 Port: 1026

 Host Name: localhost

现在,我们可以将该文件放置在目录<MINIFI_HOME> / conf中,替换那里可能已经存在的文件。此后,我们将不得不重新启动MiNiFi代理。


在这里,我们正在做很多手动工作来创建数据流并在MiNiFi代理中进行配置。对于在远程位置可能存在数百个代理的现实生活场景,这是不切实际的。但是,正如我们前面所看到的,我们可以使用MiNiFi C2服务器来实现此自动化。但这不在本教程的范围之内。


4.6。测试数据管道

最后,我们准备测试数据管道!由于我们没有使用真实传感器的自由,因此我们将创建一个小型仿真。我们将使用一个小的Java程序生成传感器数据

class Sensor implements Callable<Boolean> {

 String city;

 String station;

 String pollutant;

 String topic;

 Sensor(String city, String station, String pollutant, String topic) { this.city = city; this.station = station; this.pollutant = pollutant; this.topic = topic;

 } @Override

 public Boolean call() throws Exception {

 MqttClient publisher = new MqttClient( "tcp://localhost:1883", UUID.randomUUID().toString());

 MqttConnectOptions options = new MqttConnectOptions();

 options.setAutomaticReconnect(true);

 options.setCleanSession(true);

 options.setConnectionTimeout(10);

 publisher.connect(options);

 IntStream.range(0, 10).forEach(i -> {

 String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",

 pollutant,

 city,

 station,

 ThreadLocalRandom.current().nextDouble(0, 100));

 MqttMessage message = new MqttMessage(payload.getBytes());

 message.setQos(0);

 message.setRetained(true); try {

 publisher.publish(topic, message);

 Thread.sleep(1000);

 } catch (MqttException | InterruptedException e) {

 e.printStackTrace();

 }

 }); return true;

 }

 }

在这里,我们使用Eclipse Paho Java客户端来生成到MQTT代理的消息。我们可以根据需要添加任意数量的传感器来创建仿真:

ExecutorService executorService = Executors.newCachedThreadPool();

 List<Callable<Boolean>> sensors = Arrays.asList( new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"), new Simulation.Sensor("london", "central", "co", "air-quality/co"), new Simulation.Sensor("london", "central", "so2", "air-quality/so2"), new Simulation.Sensor("london", "central", "no2", "air-quality/no2"), new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));

 List<Future<Boolean>> futures = executorService.invokeAll(sensors);

如果一切正常,我们将能够在InfluxDB数据库中查询数据:

iot-data-pipeline-with-mqtt-nifi-and-influxdb-11 (1).jpg

例如,我们可以在数据库“ airquality”中看到属于测量“臭氧”的所有点。

5.结论

综上所述,我们在本教程中介绍了一个基本的IoT用例。我们还了解了如何使用MQTT,NiFi和InfluxDB之类的工具来构建可扩展的数据管道。当然,这并不涵盖物联网应用程序的全部范围,并且扩展数据分析管道的可能性是无限的。


此外,我们在本教程中选择的示例仅用于演示目的。物联网应用程序的实际基础架构和架构可能千差万别。此外,我们可以通过将可操作的见解作为命令向后推来完成反馈周期。


0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *