IoT data pipeline with MQTT, NiFi and InfluxDB

1. Introduction

In this tutorial, we will learn what is required to build a data pipeline for IoT applications.

Along the way, we will understand the characteristics of IoT architecture and see how to use different tools, such as an MQTT broker, NiFi, and InfluxDB, to build a highly scalable data pipeline for IoT applications.

2. The Internet of Things and its architecture

First, let us study some basic concepts and understand the general architecture of IoT applications.

2.1. What is the Internet of Things?

The Internet of Things (IoT) generally refers to a network of physical objects, known as "things". For example, things can include anything from ordinary household items, such as light bulbs, to complex industrial equipment. Through this network, we can connect a wide range of sensors and actuators to the Internet to exchange data:

[Image: "Things" connected to the Internet]

Now, we can deploy things in very different environments. For example, the environment can be our home, or it can be something quite different, such as a moving cargo truck. However, we cannot really make any assumptions about the quality of the power supply and network available to these devices. Consequently, this places unique requirements on IoT applications.

2.2. Introduction to IoT architecture

A typical IoT architecture usually structures itself into four different layers. Let us understand how the data actually flows through these layers:

[Image: The four layers of a typical IoT architecture]

First, the sensing layer is composed mainly of the sensors that gather measurements from the environment. Then, the network layer helps aggregate the raw data and send it over the Internet for processing. Next, the data processing layer filters the raw data and generates early analytics. Finally, the application layer applies more powerful data processing capabilities to perform deeper analysis and management of the data.

3. Introduction to MQTT, NiFi and InfluxDB

Now, let's look at some of the products that are widely used in IoT settings today. They each provide some unique features that make them suitable for the data needs of an IoT application.

3.1. MQTT

Message Queuing Telemetry Transport (MQTT) is a lightweight publish-subscribe network protocol. It is now an OASIS and ISO standard. IBM originally developed it to transfer messages between devices. MQTT is well suited for constrained environments where memory, network bandwidth, and power are scarce.

MQTT follows a client-server model, where different components can act as clients and connect to the server over TCP. This server is known as an MQTT broker. Clients can publish messages to an address known as a topic. They can also subscribe to topics and receive all messages published to them.

In a typical IoT setup, a sensor can publish measurements such as temperature to the MQTT broker, while upstream data processing systems subscribe to these topics to receive the data:

[Image: Sensors publishing to hierarchical topics on an MQTT broker]

As we can see, topics in MQTT are hierarchical. A system can easily subscribe to an entire hierarchy of topics by using a wildcard.
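For instance, here is a minimal sketch using the Eclipse Paho Java client, which we will also use later in this tutorial. The broker URL and topic names are assumptions for a local setup:

// Assumes a local broker on the default port; topic names are illustrative
MqttClient subscriber = new MqttClient(
    "tcp://localhost:1883", UUID.randomUUID().toString());
subscriber.connect();
// The multi-level wildcard '#' matches every topic under air-quality
subscriber.subscribe("air-quality/#", (topic, message) ->
    System.out.println(topic + ": " + new String(message.getPayload())));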

MQTT supports three levels of quality of service (QoS): "delivered at most once" (QoS 0), "delivered at least once" (QoS 1), and "delivered exactly once" (QoS 2). QoS defines the level of agreement between the client and the server. Each client can choose the service level that suits its environment.

A client can also request that the broker retain a message when publishing it. In some setups, an MQTT broker may require username and password authentication from clients before they can connect. Furthermore, for privacy, the TCP connection can be encrypted with SSL/TLS.
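As a rough sketch of what such a connection could look like with the Paho Java client, where the credentials are hypothetical placeholders:

// Hypothetical credentials for a broker that requires authentication
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("station-user");
options.setPassword("station-secret".toCharArray());
// For an encrypted connection, use an ssl:// broker URL and a socket factory,
// e.g. options.setSocketFactory(SSLSocketFactory.getDefault());
MqttClient client = new MqttClient(
    "tcp://localhost:1883", UUID.randomUUID().toString());
client.connect(options);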

There are several MQTT broker implementations and client libraries available, for example HiveMQ, Mosquitto, and Paho MQTT. We will use Mosquitto in the examples in this tutorial. Mosquitto is part of the Eclipse Foundation, and we can easily install it on boards like the Raspberry Pi or Arduino.

3.2. Apache NiFi

Apache NiFi was originally developed by the NSA as NiagaraFiles. It facilitates the automation and management of data flows between systems and is based on the flow-based programming model, which defines applications as a network of black-box processes.

Let us first go over a few basic concepts. An object moving through the system in NiFi is called a FlowFile. FlowFile processors actually perform useful work, such as the routing, transformation, and mediation of FlowFiles. FlowFile processors are linked together with connections.

A process group is a mechanism for grouping components together to organize a data flow in NiFi. A process group can receive data through input ports and send data out through output ports. A remote process group (RPG) provides a mechanism to send data to, or receive data from, a remote instance of NiFi.

Now, with this knowledge, let's take a look at the NiFi architecture:

[Image: NiFi architecture]

NiFi is a Java-based program that runs multiple components within a JVM. The web server is the component that hosts the command and control API. The flow controller is the core component of NiFi and manages the schedule of when extensions receive resources to execute. Extensions allow NiFi to be extended and support integration with different systems.

NiFi keeps track of the state of a FlowFile in the FlowFile repository. The actual content bytes of a FlowFile live in the content repository. Finally, the provenance event data related to a FlowFile lives in the provenance repository.

Since collecting data at the source may require a smaller footprint and lower resource consumption, NiFi has a sub-project called MiNiFi. MiNiFi provides a complementary data collection approach to NiFi and integrates easily with NiFi through the site-to-site (S2S) protocol:

[Image: MiNiFi agents integrating with NiFi through the site-to-site protocol]

Moreover, it enables the central management of agents through the MiNiFi command and control (C2) protocol. It also helps establish data provenance by generating a complete chain-of-custody record.

3.3. InfluxDB

InfluxDB is a time series database written in Go and developed by InfluxData. It is designed for fast, highly available storage and retrieval of time series data. This makes it particularly suitable for handling application metrics, IoT sensor data, and real-time analytics.

To begin with, data in InfluxDB is organized in time series. A time series can contain zero or more points. A point represents a single data record with four components: a measurement, a tag set, a field set, and a timestamp:

[Image: The structure of a point in InfluxDB]

First, the timestamp shows the UTC date and time associated with a particular point. The field set consists of one or more field-key and field-value pairs; they capture the actual data for a point. Similarly, the tag set consists of tag-key and tag-value pairs, but these are optional; they essentially act as metadata for a point and can be indexed for faster query responses.
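Putting those four components together, a point is usually written in InfluxDB's line protocol. A hypothetical point for our use case might look like the following, where the trailing nanosecond timestamp is optional and is assigned by the server when omitted:

ozone,city=london,station=central value=23.45 1585042800000000000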

The measurement acts as a container for the tag set, the field set, and the timestamp. Furthermore, every point in InfluxDB can have a retention policy associated with it. A retention policy describes how long InfluxDB keeps the data and how many copies it creates through replication.
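For example, a retention policy that keeps one replicated copy of the data for a week could be created with an InfluxQL statement like the following; the policy and database names here are illustrative:

CREATE RETENTION POLICY "one_week" ON "airquality" DURATION 7d REPLICATION 1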

Finally, a database acts as a logical container for users, retention policies, continuous queries, and the time series data itself. We can think of a database in InfluxDB as roughly similar to a traditional relational database.

Moreover, InfluxDB is part of the InfluxData platform, which offers several other products to handle time series data efficiently. InfluxData offers it as the open source platform InfluxDB OSS 2.0 and as the commercial product InfluxDB Cloud:

[Image: The InfluxData platform]

Apart from InfluxDB, the platform includes Chronograf, which offers a complete interface for the InfluxData platform. It also includes Telegraf, an agent for collecting and reporting metrics and events. Finally, there is Kapacitor, a real-time streaming data processing engine.

4. Hands-on practice with an IoT data pipeline

Now, we have covered enough background to use these products together to create a data pipeline for our IoT application. For this tutorial, we will assume that we are collecting air quality measurements from multiple weather stations across several cities. For example, the measurements include ground-level ozone, carbon monoxide, sulfur dioxide, nitrogen dioxide, and aerosols.

4.1. Set up the infrastructure

First, we assume that every weather station in a city is equipped with all the sensing equipment. Furthermore, these sensors are connected to a board, such as a Raspberry Pi, that collects the analog data and digitizes it. The board is connected to a wireless transmitter to send the raw measurements upstream:

[Image: Weather stations feeding regional control stations and a central command center]

Regional control stations collect the data from all the weather stations in a city. We can aggregate this data and feed it into local analytics engines to gain faster insights. The filtered data from all the regional control centers is sent to a central command center, which is mostly hosted in the cloud.

4.2. Create an IoT architecture

Now, we are ready to design the IoT architecture for our simple air quality application. We will use an MQTT broker, MiNiFi Java agents, NiFi, and InfluxDB here:

[Image: IoT architecture using Mosquitto, MiNiFi, NiFi, and InfluxDB]

As we can see, we use the Mosquitto MQTT broker and a MiNiFi Java agent at the weather station sites. In the regional control centers, we use a NiFi server to aggregate and route the data. Finally, we use InfluxDB to store the measurements at the command center level.

4.3. Perform installation

It is quite easy to install the Mosquitto MQTT broker and the MiNiFi Java agent on boards like the Raspberry Pi. However, for this tutorial, we will install them on our local machine.

The official download page of Eclipse Mosquitto provides binaries for several platforms. Once installed, starting Mosquitto from the installation directory is quite simple:

net start mosquitto
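To quickly verify that the broker is running, we can use the command-line clients that ship with Mosquitto; the topic name here is just an illustration. In one terminal we subscribe, and in another we publish a test message:

mosquitto_sub -t "air-quality/#" -v
mosquitto_pub -t "air-quality/ozone" -m "test"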

Furthermore, the NiFi binaries can be downloaded from its official website. We must extract the downloaded archive into a suitable directory. Since MiNiFi will connect to NiFi using the site-to-site protocol, we must specify the site-to-site input socket port in <NIFI_HOME>/conf/nifi.properties:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.port=1026
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Then, we can start NiFi:

<NIFI_HOME>/bin/run-nifi.bat

Similarly, the MiNiFi Java or C++ agent and toolkit binaries can be downloaded from the official website. Again, we must extract the archives into a suitable directory.

By default, MiNiFi ships with a small number of processors. Since we will be consuming data from MQTT, we must copy the MQTT processor into the <MINIFI_HOME>/lib directory. These processors are packaged as NiFi Archive (NAR) files and can be found in the <NIFI_HOME>/lib directory:

COPY <NIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar <MINIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar

Then, we can start the MiNiFi agent:

<MINIFI_HOME>/bin/run-minifi.bat

Finally, we can download the open source version of InfluxDB from its official site. As before, we can extract the archive and start InfluxDB with a simple command:

<INFLUXDB_HOME>/influxd.exe

We should keep all other configuration, including ports, at the defaults for this tutorial. With that, the installation and setup on our local machine is complete.

4.4. Define NiFi data flow

Now, we are ready to define our data flow. NiFi provides an easy-to-use interface to create and monitor data flows. It can be accessed at the URL http://localhost:8080/nifi.

First, we will define the main data flow that will run on the NiFi server:

[Image: NiFi data flow with an input port connected to the PutInfluxDB processor]

As we can see, we define an input port here, which will receive data from the MiNiFi agents. The port sends the data through a connection to the PutInfluxDB processor, which is responsible for storing the data in InfluxDB. In this processor's configuration, we define the connection URL of InfluxDB and the name of the database the data should be written to.
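As a sketch of what this configuration might look like (the exact property names can vary between NiFi versions, and the URL and database name are assumptions for our local setup), note also that PutInfluxDB expects the target database to already exist; with the 1.x open source version we installed, it can be created in one command:

# PutInfluxDB processor properties (illustrative values)
InfluxDB connection URL : http://localhost:8086
Database Name           : airquality

# Create the database once, using the InfluxDB 1.x CLI
<INFLUXDB_HOME>/influx.exe -execute "CREATE DATABASE airquality"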

4.5. Define MiNiFi data flow

Next, we will define the data flow that will run on the MiNiFi agent. We will use the same user interface as for NiFi and export this data flow as a template to configure it in the MiNiFi agent. Let's define the data flow for the MiNiFi agent:

[Image: MiNiFi data flow with the ConsumeMQTT processor connected to a remote process group]

Here, we define the ConsumeMQTT processor, which is responsible for getting data from the MQTT broker. We provide the broker URI and the topic filter in its properties. A topic filter like air-quality/# pulls data from all the topics defined under the air-quality level.
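As an illustration, the relevant ConsumeMQTT properties might be set along these lines; the values are assumptions matching our local broker:

# ConsumeMQTT processor properties (illustrative values)
Broker URI     : tcp://localhost:1883
Topic Filter   : air-quality/#
Max Queue Size : 1000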

We also define a remote process group and connect it to the ConsumeMQTT processor. The remote process group is responsible for pushing data to NiFi through the site-to-site protocol.

We can save this data flow as a template and download it as an XML file. Let's name this file config.xml. Now, we can use the converter toolkit to convert this template from XML to YAML, the format the MiNiFi agent uses:

<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml

This gives us the config.yml file, where we have to manually add the host and port of the NiFi server:

Input Ports:
    - id: 19442f9d-aead-3569-b94c-1ad397e8291c
      name: From MiNiFi
      comment: ''
      max concurrent tasks: 1
      use compression: false
      Properties: # Deviates from spec and will later be removed when this is autonegotiated
          Port: 1026
          Host Name: localhost

Now, we can place this file in the <MINIFI_HOME>/conf directory, replacing the file that may already be there. After that, we have to restart the MiNiFi agent.

Here, we are doing a lot of manual work to create a data flow and configure it in the MiNiFi agent. For real-life scenarios, where hundreds of agents may sit in remote locations, this is impractical. However, as we saw earlier, we can automate all of this by using the MiNiFi C2 server. That is, however, beyond the scope of this tutorial.

4.6. Test data pipeline

Finally, we are ready to test our data pipeline! Since we do not have the liberty of using real sensors, we will create a small simulation. We will generate the sensor data with a small Java program:

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

class Sensor implements Callable<Boolean> {
    String city;
    String station;
    String pollutant;
    String topic;

    Sensor(String city, String station, String pollutant, String topic) {
        this.city = city;
        this.station = station;
        this.pollutant = pollutant;
        this.topic = topic;
    }

    @Override
    public Boolean call() throws Exception {
        // Connect to the local Mosquitto broker with a random client id
        MqttClient publisher = new MqttClient(
            "tcp://localhost:1883", UUID.randomUUID().toString());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        publisher.connect(options);
        // Publish ten random readings, one per second,
        // formatted using InfluxDB's line protocol
        IntStream.range(0, 10).forEach(i -> {
            String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",
                pollutant,
                city,
                station,
                ThreadLocalRandom.current().nextDouble(0, 100));
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(0);
            message.setRetained(true);
            try {
                publisher.publish(topic, message);
                Thread.sleep(1000);
            } catch (MqttException | InterruptedException e) {
                e.printStackTrace();
            }
        });
        return true;
    }
}

Here, we use the Eclipse Paho Java client to publish messages to the MQTT broker. Note that the payload is already formatted in InfluxDB's line protocol, so the PutInfluxDB processor can store it directly. We can add as many sensors as we need to create our simulation:

ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Boolean>> sensors = Arrays.asList(
    new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"),
    new Simulation.Sensor("london", "central", "co", "air-quality/co"),
    new Simulation.Sensor("london", "central", "so2", "air-quality/so2"),
    new Simulation.Sensor("london", "central", "no2", "air-quality/no2"),
    new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));
// invokeAll blocks until every sensor has finished publishing
List<Future<Boolean>> futures = executorService.invokeAll(sensors);
executorService.shutdown();

If everything works as expected, we will be able to query our data in the InfluxDB database:

[Image: Querying the air quality measurements in InfluxDB]

For example, we can see all points belonging to the measurement "ozone" in the database "airquality".
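For instance, with the InfluxDB 1.x interactive shell, a query along these lines should return the simulated points; the LIMIT clause is only there to keep the output short:

> USE airquality
> SELECT * FROM "ozone" LIMIT 5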

5. Conclusion

In summary, we have introduced a basic IoT use case in this tutorial. We also learned how to use tools like MQTT, NiFi, and InfluxDB to build a scalable data pipeline. Of course, this does not cover the full scope of IoT applications, and the possibilities for expanding the data analysis pipeline are limitless.

Also, the examples we chose in this tutorial are for demonstration purposes only. The actual infrastructure and architecture of an IoT application can vary widely. Moreover, we can close the feedback loop by pushing the actionable insights back to the devices as commands.

