IoT data pipeline with MQTT, NiFi and InfluxDB
1. Introduction
In this tutorial, we'll learn what it takes to create a data pipeline for IoT applications.
Along the way, we'll understand the characteristics of IoT architecture and see how to use different tools such as an MQTT broker, NiFi, and InfluxDB to build a highly scalable data pipeline for IoT applications.
2. The Internet of Things and its architecture
First, let us study some basic concepts and understand the general architecture of IoT applications.
2.1. What is the Internet of Things?
The Internet of Things (IoT) broadly refers to a network of physical objects, known as "things". For example, things can include anything from ordinary household items, like light bulbs, to complex industrial equipment. Through this network, we can connect a wide range of sensors and actuators to the Internet to exchange data:
Now, we can deploy things in very different environments. For example, the environment can be our home, or it can be something quite different, like a moving cargo truck. However, we can't really make any assumptions about the quality of power supply and network available to these devices. Consequently, this places unique requirements on IoT applications.
2.2. Introduction to IoT architecture
A typical IoT architecture is usually structured into four different layers. Let's understand how the data actually flows through these layers:
First, the sensing layer is primarily composed of the sensors that gather measurements from the environment. Then, the network layer helps aggregate the raw data and send it over the Internet for processing. Further, the data processing layer filters the raw data and generates preliminary analytics. Finally, the application layer employs powerful data-processing capabilities to perform deeper analysis and management of the data.
3. Introduction to MQTT, NiFi and InfluxDB
Now, let's go through some of the products that are used extensively in IoT settings today. Each of them offers unique features that make it suitable for the data needs of IoT applications.
3.1. MQTT
Message Queuing Telemetry Transport (MQTT) is a lightweight publish-subscribe network protocol. It is now an OASIS and ISO standard. IBM originally developed it to transfer messages between devices. MQTT is well suited for constrained environments where memory, network bandwidth, and power are scarce.
MQTT follows a client-server model, where different components can act as clients and connect to a server over TCP. This server is known as an MQTT broker. Clients can publish messages to an address known as a topic. They can also subscribe to a topic and receive all messages published to it.
In a typical IoT setup, a sensor can publish measurements like temperature to the MQTT broker, and an upstream data-processing system can subscribe to these topics to receive the data:
As we can see, the topics in MQTT are hierarchical. A system can easily subscribe to an entire hierarchy of topics by using wildcards.
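For instance, a station could publish to a topic like air-quality/london/ozone, and a subscriber to air-quality/# would receive readings for every city and pollutant. Here is a minimal sketch of how these wildcard rules work, where '+' matches exactly one level and '#' matches any number of trailing levels. The topic names are only illustrative, and this simplified matcher ignores edge cases such as $-prefixed system topics:

```java
class TopicFilter {
    // Returns true if the topic matches the filter per MQTT wildcard rules:
    // '+' matches exactly one level, '#' matches any number of trailing levels.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/");
        String[] t = topic.split("/");
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // '#' is the last level and matches the rest
            }
            if (i >= t.length || (!f[i].equals("+") && !f[i].equals(t[i]))) {
                return false;
            }
        }
        // Without a trailing '#', the filter must cover every level
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("air-quality/#", "air-quality/london/ozone"));       // true
        System.out.println(matches("air-quality/+/ozone", "air-quality/london/ozone")); // true
        System.out.println(matches("air-quality/+", "air-quality/london/ozone"));       // false
    }
}
```

In practice, the broker performs this matching for us; we only supply the filter string when subscribing.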
MQTT supports three levels of quality of service (QoS): "delivered at most once", "delivered at least once", and "delivered exactly once". QoS defines the level of agreement between the client and the server. Each client can choose the service level that suits its environment.
A client can also ask the broker to retain a message while publishing it. In some setups, an MQTT broker may require clients to authenticate with a username and password before connecting. Further, for privacy, the TCP connection can be encrypted with SSL/TLS.
There are several MQTT broker implementations and client libraries available, for example, HiveMQ, Mosquitto, and Paho MQTT. We'll use Mosquitto in the examples of this tutorial, as it's part of the Eclipse Foundation and we can easily install it on a board like the Raspberry Pi or Arduino.
3.2. Apache NiFi
Apache NiFi was originally developed as NiagaraFiles by the NSA. It facilitates the automation and management of data flow between systems, and is based on the flow-based programming model, which defines applications as a network of black-box processes.
Let's first go over some of the basic concepts. An object moving through the system in NiFi is called a FlowFile. FlowFile processors actually perform the useful work, like routing, transformation, and mediation of FlowFiles. FlowFile processors are linked with connections.
A process group is a mechanism for grouping components together to organize a data flow in NiFi. A process group can receive data through input ports and send data through output ports. A remote process group (RPG) provides a mechanism for sending data to, or receiving data from, a remote instance of NiFi.
Now, with this knowledge, let's take a look at the NiFi architecture:
NiFi is a Java-based program that runs multiple components within a JVM. The web server is the component that hosts the command and control API. The flow controller is the core component of NiFi and manages the schedule of when extensions receive resources to execute. Extensions allow NiFi to be extensible and support integration with different systems.
NiFi keeps track of the state of a FlowFile in the FlowFile repository. The actual content bytes of a FlowFile live in the content repository. Finally, the provenance event data related to a FlowFile lives in the provenance repository.
Since collecting data at the source may require a smaller footprint and low resource consumption, NiFi has a sub-project known as MiNiFi. MiNiFi provides a complementary data-collection approach to NiFi and integrates easily with it through the site-to-site (S2S) protocol:
Moreover, it enables centralized management of agents through the MiNiFi command and control (C2) protocol. Further, it helps establish data provenance by generating a complete chain-of-custody record of the information.
3.3. InfluxDB
InfluxDB is a time-series database written in Go and developed by InfluxData. It's designed for fast, highly available storage and retrieval of time-series data. It's particularly suited for handling application metrics, IoT sensor data, and real-time analytics.
To begin with, data in InfluxDB is organized by time series. A time series can contain zero or many points. A point represents a single data record with four components: a measurement, a tag set, a field set, and a timestamp:
First, the timestamp shows the UTC date and time associated with a particular point. The field set consists of one or more field-key and field-value pairs. They capture the actual data, with labels, for a point. Similarly, the tag set consists of tag-key and tag-value pairs, but they are optional. They basically act as metadata for a point and can be indexed for faster query responses.
The measurement acts as a container for the tag set, the field set, and the timestamp. Moreover, every point in InfluxDB can have a retention policy associated with it. The retention policy describes how long InfluxDB will keep the data and how many copies it will create through replication.
Finally, a database acts as a logical container for users, retention policies, continuous queries, and time-series data. We can understand a database in InfluxDB to be roughly similar to a traditional relational database.
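To make this concrete, these components come together in InfluxDB's line protocol, where a single point is written as measurement,tags fields timestamp. Below is a small sketch that formats such a point; the measurement and tag names are just illustrative, mirroring the air-quality example we build later:

```java
import java.util.Locale;

class LineProtocol {
    // Formats one point: measurement, tag set, field set, and a
    // nanosecond-precision timestamp, per InfluxDB's line protocol
    static String point(String measurement, String city, String station,
                        double value, long timestampNs) {
        return String.format(Locale.ROOT, "%s,city=%s,station=%s value=%.2f %d",
          measurement, city, station, value, timestampNs);
    }

    public static void main(String[] args) {
        // A single ozone reading, tagged with its city and station
        System.out.println(point("ozone", "london", "central", 12.5, 1609459200000000000L));
        // prints: ozone,city=london,station=central value=12.50 1609459200000000000
    }
}
```

When the timestamp is omitted, InfluxDB assigns the server's current time on write.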
Moreover, InfluxDB is part of the InfluxData platform, which offers several other products to efficiently handle time-series data. InfluxData offers it as the open-source platform InfluxDB OSS 2.0 and as the commercial product InfluxDB Cloud:
Apart from InfluxDB, the platform includes Chronograf, which offers a complete interface for the InfluxData platform. Further, it includes Telegraf, an agent for collecting and reporting metrics and events. Finally, there is Kapacitor, a real-time streaming data-processing engine.
4. Hands-on: an IoT data pipeline
Now, we've covered enough ground to use these products together to create a data pipeline for our IoT application. We'll assume that for this tutorial we're collecting air-quality-related measurements from multiple observation stations in multiple cities. For example, the measurements include ground-level ozone, carbon monoxide, sulfur dioxide, nitrogen dioxide, and aerosols.
4.1. Set up the infrastructure
First, we'll assume that every weather station in a city is equipped with all the sensing equipment. Further, these sensors are wired to a board like the Raspberry Pi to collect the analog data and digitize it. The board is connected to a wireless network to send the raw measurements upstream:
A regional control station collects data from all the weather stations in a city. We can aggregate and feed this data into some local analytics engine for quicker insights. The filtered data from all regional control centers is sent to a central command center, which is mostly hosted in the cloud.
4.2. Create an IoT architecture
Now, we're ready to design our IoT architecture for the simple air-quality application. We'll use the MQTT broker, MiNiFi Java agents, NiFi, and InfluxDB here:
As we can see, we're using the Mosquitto MQTT broker and the MiNiFi Java agent at the weather station sites. At the regional control centers, we're using NiFi servers to aggregate and route the data. Finally, we're using InfluxDB to store the measurements at the command-center level.
4.3. Perform installation
Installing the Mosquitto MQTT broker and the MiNiFi Java agent on a board like the Raspberry Pi is quite easy. However, for this tutorial, we'll install them on our local machine.
The official download page of Eclipse Mosquitto provides binaries for several platforms. Once installed, starting Mosquitto from the installation directory is quite simple:
net start mosquitto
Further, the NiFi binary can also be downloaded from its official website. We have to extract the downloaded archive into a suitable directory. Since MiNiFi will connect to NiFi using the site-to-site protocol, we have to specify the site-to-site input socket port in <NIFI_HOME>/conf/nifi.properties:
# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.port=1026
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
Then, we can start NiFi:
<NIFI_HOME>/bin/run-nifi.bat
Similarly, the MiNiFi Java or C++ agent and toolkit binaries can be downloaded from the official website. Again, we have to extract the archives into a suitable directory.
By default, MiNiFi ships with a very small set of processors. Since we'll be consuming data from MQTT, the MQTT processor has to be copied into the <MINIFI_HOME>/lib directory. These processors are packaged as NiFi Archive (NAR) files and can be found in the <NIFI_HOME>/lib directory:
COPY <NIFI_HOME>/lib/nifi-mqtt-nar-xxx.nar <MINIFI_HOME>/lib/nifi-mqtt-nar-xxx.nar
Then, we can start the MiNiFi agent:
<MINIFI_HOME>/bin/run-minifi.bat
Finally, we can download the open-source version of InfluxDB from its official site. As before, we can extract the archive and start InfluxDB with a simple command:
<INFLUXDB_HOME>/influxd.exe
We'll keep all other configuration, including the ports, at the defaults for this tutorial. With this, the installation and setup on our local machine are complete.
4.4. Define NiFi data flow
Now, we're ready to define our data flow. NiFi provides an easy-to-use interface to create and monitor data flows. It's accessible at the URL http://localhost:8080/nifi.
First, we will define the main data flow that will run on the NiFi server:
As we can see, here we've defined an input port that will receive data from the MiNiFi agents. It further sends the data through a connection to the PutInfluxDB processor, which is responsible for storing the data in InfluxDB. In the configuration of this processor, we've defined the connection URL of InfluxDB and the name of the database where the data should be sent.
4.5. Define MiNiFi data flow
Next, we'll define the data flow that will run on the MiNiFi agents. We'll use the same user interface as for NiFi and export the data flow as a template to configure it in the MiNiFi agent. Let's define the data flow for the MiNiFi agent:
Here, we've defined the ConsumeMQTT processor, which is responsible for getting data from the MQTT broker. We've provided the broker URI as well as a topic filter in the properties, so it pulls data from all topics defined under the air-quality hierarchy.
We've also defined a remote process group and connected it to the ConsumeMQTT processor. The remote process group is responsible for pushing data to NiFi through the site-to-site protocol.
We can save this data flow as a template and download it as an XML file. Let's name this file config.xml. Now, we can use the converter toolkit to convert this template from XML into YAML, which the MiNiFi agent uses:
<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml
This gives us the config.yml file, where we have to manually add the host and port of the NiFi server:
Input Ports:
- id: 19442f9d-aead-3569-b94c-1ad397e8291c
name: From MiNiFi
comment: ''
max concurrent tasks: 1
use compression: false
Properties: # Deviates from spec and will later be removed when this is autonegotiated
Port: 1026
Host Name: localhost
Now, we can place this file in the directory <MINIFI_HOME>/conf, replacing the file that may already be present there. After that, we have to restart the MiNiFi agent.
Here, we're doing a lot of manual work to create the data flow and configure it in the MiNiFi agent. For real-life scenarios, where hundreds of agents may sit in remote locations, this is impractical. However, as we saw earlier, we can automate all of this with the MiNiFi C2 server. But that's beyond the scope of this tutorial.
4.6. Test data pipeline
Finally, we're ready to test our data pipeline! Since we don't have the liberty of using real sensors, we'll create a small simulation. We'll generate the sensor data with a small Java program:
class Sensor implements Callable<Boolean> {
    String city;
    String station;
    String pollutant;
    String topic;

    Sensor(String city, String station, String pollutant, String topic) {
        this.city = city;
        this.station = station;
        this.pollutant = pollutant;
        this.topic = topic;
    }

    @Override
    public Boolean call() throws Exception {
        // Connect to the local Mosquitto broker with a random client identifier
        MqttClient publisher = new MqttClient(
          "tcp://localhost:1883", UUID.randomUUID().toString());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        publisher.connect(options);
        // Publish ten random readings in line-protocol format, one per second
        IntStream.range(0, 10).forEach(i -> {
            String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",
              pollutant,
              city,
              station,
              ThreadLocalRandom.current().nextDouble(0, 100));
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(0);
            message.setRetained(true);
            try {
                publisher.publish(topic, message);
                Thread.sleep(1000);
            } catch (MqttException | InterruptedException e) {
                e.printStackTrace();
            }
        });
        publisher.disconnect();
        return true;
    }
}
Here, we use the Eclipse Paho Java client to publish messages to the MQTT broker. Further, we can add as many sensors as we need to create our simulation:
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Boolean>> sensors = Arrays.asList(
    new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"),
    new Simulation.Sensor("london", "central", "co", "air-quality/co"),
    new Simulation.Sensor("london", "central", "so2", "air-quality/so2"),
    new Simulation.Sensor("london", "central", "no2", "air-quality/no2"),
    new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));
List<Future<Boolean>> futures = executorService.invokeAll(sensors);
If everything works as expected, we'll be able to query our data in the InfluxDB database:
For example, we can see all the points belonging to the measurement "ozone" in the database "airquality".
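As an illustration, assuming the database is named airquality, as configured in the PutInfluxDB processor, such a query in InfluxQL might look like:

```sql
SELECT * FROM "ozone" WHERE "city" = 'london'
```

Here, the tag value is quoted with single quotes and the identifiers with double quotes, following InfluxQL conventions.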
5. Conclusion
To summarize, this tutorial walked through a basic IoT use case. We also saw how to use tools like MQTT, NiFi, and InfluxDB to build a scalable data pipeline. Of course, this doesn't cover the entire breadth of an IoT application, and the possibilities for extending the data-analytics pipeline are limitless.
Further, the example we picked in this tutorial is for demonstration purposes only. The actual infrastructure and architecture of an IoT application can vary widely. Moreover, we can complete the feedback loop by pushing actionable insights back to the devices as commands.