
This article describes our attempt to load 15,000 production schedules with Apache NiFi and the results we achieved.

A bit about Apache NiFi

In the era of import substitution and the rapid growth of open-source solutions for data processing and ETL/ELT, Apache NiFi has firmly established itself in the import-substituted technology stack.

Thanks to its gentle learning curve and low entry barrier for pipeline developers and analysts, it has become the main ETL tool for many companies. In practice, however, especially on tasks that involve processing a large number of files and a lot of data, you quickly feel the need to understand NiFi's internals and to optimize your pipelines.

Only when dealing with complex processing cases, such as filtering, aggregation, line-by-line parsing, and transforming semi-structured or unstructured data into structured form, does it become distinctly clear that Apache NiFi is not a simple ETL tool and can be quite demanding on developers' nerves.

Application

In our practice, we have gained solid experience using Apache NiFi across different domains and data volumes. We have successfully applied NiFi to processing mobile data (where true Big Data comes into play) and to handling user events from a website (not many events, but an interesting case), and now we faced a more complex task: processing production schedules in Excel format, transforming them, and loading them into a database.

Problem Statement

Our task was to quickly load 15,000 Excel files containing production schedules accumulated over two years into a PostgreSQL database. These 15,000 files were divided into 5 different types of production schedules, each with its specific characteristics that needed to be considered.

This led to the creation of 5 different pipelines, one per schedule type. The pipelines were essentially identical in the number of processors and their settings, except for the Get File processor, where each pipeline had its own input directory, and the Replace Text processor, which was used to rename the columns in each file so that the data would be inserted into the correct PostgreSQL fields.
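
To give a sense of that per-pipeline difference, here is a rough sketch of a Replace Text configuration for renaming a header row; the column names are purely illustrative, not the real ones from our schedules:

    Replacement Strategy:  Literal Replace
    Evaluation Mode:       Line-by-Line
    Search Value:          Plan Date,Workshop,Item,Quantity
    Replacement Value:     plan_date,workshop,item,quantity

The 5 pipelines differed only in these two values and in the Get File input directory.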

Over the course of developing, debugging, testing, and running the pipelines, our NiFi developer had to restart NiFi more than 30 times due to various performance and memory issues. But more on that later.

Challenges

The challenges our team faced included sudden Java heap space errors. In hindsight, we should have expected that NiFi would struggle with such a volume of data and decomposed the task in advance. In practice, NiFi could not cope with the load and kept crashing.

Additionally, the files contained characters that were not suitable for processing and needed to be replaced.

Implementation

Initially, each pipeline consisted of 11 processors:

1) Get File

This processor reads files from the specified directory and deletes them afterward. We recommend enabling deletion only after the entire pipeline has been tested; in our case, we turned it on only once files were being successfully written to the database.
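
As a rough sketch (the directory path and file filter are illustrative), the relevant Get File settings look like this:

    Input Directory:   /data/schedules/type_1
    File Filter:       .*\.xlsx
    Keep Source File:  true   (switch to false only once the whole pipeline is verified)
    Batch Size:        10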

2) Convert Excel to Csv

To work with the Excel files we had read, we first converted them to CSV. If files arrive continuously, it makes sense to set the file-reading interval to 4 seconds or more (depending on the case), so that files are picked up in batches and loaded into NiFi's memory for further processing.

3) Split Text

To later load the data into the database, we needed to split the file into rows while preserving the first row (header). This allows us to create the necessary SQL query for the database.
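
For reference, a minimal Split Text configuration for this step looks roughly like this (we tuned it further later, see the Experiments section):

    Line Split Count:          1
    Header Line Count:         1
    Remove Trailing Newlines:  true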

4) Convert Record

After Split Text, we obtained files in txt/csv format, and to avoid file format errors in the Update Record processor, we used Convert Record to convert text/csv to proper CSV format.

5) Update Attribute

This processor was used to enrich each row with a technical date-time field, extract time, whose value was stored as a flowfile attribute for use further down the pipeline.
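
A minimal sketch of such an Update Attribute rule, using NiFi Expression Language (the attribute name extract_time and the date format are our choice):

    extract_time = ${now():format('yyyy-MM-dd HH:mm:ss')}

The attribute is then available to downstream processors as ${extract_time}.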

6) Replace Text

The first Replace Text processor added a new column called extract time.

7) Replace Text

The second Replace Text processor appended the actual extract time value to each data row.
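
One possible configuration of this pair of processors, as a sketch only (property names follow recent NiFi versions, and the column name extract_time is ours):

    Replace Text (add the column to the header line):
      Replacement Strategy:          Regex Replace
      Evaluation Mode:               Line-by-Line
      Line-by-Line Evaluation Mode:  First-Line
      Search Value:                  ^(.*)$
      Replacement Value:             $1,extract_time

    Replace Text (add the value to the data rows):
      Replacement Strategy:          Regex Replace
      Evaluation Mode:               Line-by-Line
      Line-by-Line Evaluation Mode:  Except-First-Line
      Search Value:                  ^(.*)$
      Replacement Value:             $1,${extract_time}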

8) Update Record

This processor was used to correct the date to the required format, based on the data model requirements in the database.
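
A sketch of what such an Update Record rule can look like, using a RecordPath expression (the field name and source date format are hypothetical, and function availability depends on your NiFi version):

    Replacement Value Strategy:  Record Path Value
    /plan_date = format( toDate( /plan_date, "dd.MM.yyyy" ), "yyyy-MM-dd" )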

9) Convert Record

The modified, corrected, and enriched CSV file is converted into JSON format for further processing.

10) Convert Json to SQL

The JSON file is converted into an SQL insert query. It's important to have a database table ready for loading, and the keys in the JSON file must match the columns in this table.
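
For illustration, the target table might look roughly like this (the column names are hypothetical; the point is that they must match the JSON keys produced by the pipeline):

    CREATE TABLE production_schedule (
        plan_date     date,
        workshop      text,
        item          text,
        quantity      numeric,
        extract_time  timestamp
    );

In Convert Json to SQL you then point Table Name at this table and set Statement Type to INSERT.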

11) Put SQL

Executes the SQL insert query to insert data into the database.


Experiments

Our NiFi instance was running in a Docker container, and we had forgotten to mount a volume for the data. As a result, the production schedules stored on the host machine were not visible inside the container. We had to redeploy NiFi from scratch, this time with the volume properly mounted, after which the Get File processor could see all the local files.
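
If you run NiFi in Docker, the fix is a one-line volume mount at container start; a sketch (the paths, port, and image tag are illustrative):

    docker run -d --name nifi \
      -p 8443:8443 \
      -v /opt/production-plans:/data/schedules \
      apache/nifi:1.23.2

Get File's Input Directory must then point at the path inside the container (/data/schedules here), not the host path.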

Next, in one of the pipelines we ran into column index errors in the Convert Record processor. The value separator in Convert Excel to Csv was a comma, and the data itself also contained commas, so NiFi misinterpreted the field boundaries. It could have been anticipated, but you never really expect commas in your files. As experience across projects shows, however, user-generated content can contain absolutely anything, including characters that break file processing.

We solved this problem by replacing the comma with a tilde "~" in Convert Excel to Csv. We also created two additional CSV reader and writer controller services with a tilde delimiter.
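
The controller services themselves only needed the delimiter changed; roughly (property names may differ slightly between NiFi versions):

    CSVReader (tilde):
      Value Separator:             ~
      Treat First Line as Header:  true

    CSVRecordSetWriter (tilde):
      Value Separator:             ~
      Include Header Line:         true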

Even after replacing the comma with a tilde, we kept getting index errors in Convert Record; this time the cause was a cell that contained a line break, i.e. two lines in one cell.

We solved this by adding another Replace Text processor with a regular expression that finds the '\n' inside such cells, first locating the end of the legitimate line and then the offending cell, and replaces it with a semicolon.
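
We won't reproduce our exact expression here, but as a purely illustrative sketch of the idea: if every real record is known to start with a digit (for example, a row number), a Replace Text rule like the following glues a broken cell back onto its line:

    Replacement Strategy:  Regex Replace
    Evaluation Mode:       Entire text
    Search Value:          \n(?!\d)
    Replacement Value:     ;

The real pattern depends entirely on what your records actually look like.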

We resolved the index issues, but suddenly a new error appeared: "Cannot create PoolableConnectionFactory," and the Put SQL processor stopped loading data into PostgreSQL. It turned out that the PostgreSQL database, which was also running in a container, had crashed. We had to deploy a new container (fortunately, all the data was on a mounted volume) and attach both containers to the same network, after which they could see each other again.
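
With plain Docker, attaching both containers to a shared network takes a couple of commands; the container and network names below are illustrative:

    docker network create etl-net
    docker network connect etl-net nifi
    docker network connect etl-net postgres

After that, the connection pool behind Put SQL can reach the database by container name in the JDBC URL.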

Finally, everything was configured, and in theory everything should have worked smoothly. But theory is theory, and it rarely works on the first try, especially with NiFi and its many nuances and pitfalls.

In practice, we hit a new memory error: the classic "java.lang.OutOfMemoryError: Java heap space." Yes, that old favorite, where it is sometimes far from obvious what could possibly have filled up all the memory.

We assumed the Java heap was simply too small, so we increased it to 4 GB (the machine had 16 GB, but it was also running PostgreSQL and Grafana). The crashes continued with no improvement.
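
For reference, this is what "allocated 4 GB" means in NiFi terms: the heap is set in conf/bootstrap.conf (the values below are the ones we are describing, not a recommendation):

    # conf/bootstrap.conf
    java.arg.2=-Xms4g
    java.arg.3=-Xmx4g

For the official apache/nifi Docker image the same can reportedly be set via the NIFI_JVM_HEAP_INIT and NIFI_JVM_HEAP_MAX environment variables; check your image's documentation.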

We started digging deeper, trying to understand the intricacies of processor operations because we realized that the problem lay with one of them. First, we decided to check the Split Text processor.

Let's delve into how this processor works:

If a file has 20,000 lines and the Line Split Count parameter is set to 1, Split Text turns that one file into 20,000 flowfiles at once. We therefore added two more Split Text processors in front of it: the first with Line Split Count set to 10,000 and the second to 2,000, so that each 2,000-line chunk was only then split into single-line files. This resolved the OutOfMemory issue.
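
The resulting cascade of three Split Text processors looked roughly like this:

    Split Text #1:  Line Split Count = 10000   Header Line Count = 1
    Split Text #2:  Line Split Count = 2000    Header Line Count = 1
    Split Text #3:  Line Split Count = 1       Header Line Count = 1

Each stage keeps a manageable number of flowfiles in memory instead of exploding a 20,000-line file into 20,000 flowfiles in one step.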

This solution turned out to be temporary: because five pipelines were loading the entire history of 15,000 files at once, data piled up in the queues faster than it could be processed. The error "FlowFile Repository failed to update" appeared and, as tradition dictated, NiFi crashed again. The error persisted even with only two pipelines running in parallel. With the project deadline approaching, we decided to deploy a separate container for each pipeline.

However, this too was only a partial fix: we were still pushing through a massive amount of data, on the order of 300,000 lines per second, and a new error appeared: "GC overhead limit exceeded." Analysis showed that while one processor was handling a single file, another ten were already arriving. To fix this, we measured how many seconds each pipeline needed to fully process one file (until "Total queued data" returned to "0 / 0 bytes"), added 2-5 seconds depending on file size, and set the Get File run interval to that value.
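
In other words, the throttling boiled down to simple arithmetic on the processor's Run Schedule; with made-up numbers:

    time for one file to fully drain ("Total queued data" back to 0):  ~20 s
    safety margin:                                                     +5 s
    Get File -> Scheduling -> Run Schedule:                            25 sec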

Result

As a result, we successfully loaded all 15,000 files, but the pipelines underwent changes:

1) Get File
2) Convert Excel to Csv
3) Split Text
4) Split Text
5) Split Text
6) Convert Record
7) Update Attribute
8) Replace Text
9) Replace Text
10) Replace Text
11) Update Record
12) Convert Record
13) Convert Json to SQL
14) Put SQL

Conclusion

Apache NiFi is a powerful Big Data tool for processing, enriching, orchestrating, and transporting data. The tool offers a wide range of built-in processors that data engineers can use to configure almost any data processing business logic within a visual pipeline design interface.

However, non-standard data processing cases, such as the presence of certain characters in the data, improperly filled records in the source, missing necessary spaces, and so on, along with the simultaneous loading of large volumes of data and the lack of data structuring, require special knowledge and a deep understanding of how Apache NiFi works. Industrial solutions usually demand such in-depth knowledge because data sources often present problems with extraneous, incorrect characters and large volumes of data.

This raises the valid question of whether to master all the nuances of working with NiFi in practice or to choose another data processing tool. To many developers, even writing pure Python may seem more appealing and enjoyable, as it offers more flexibility and virtually unlimited functionality. It's also important to consider project timelines, the expertise available within the team, development costs, the complexity of supporting the solution, and project risks. By weighing the pros and cons of this approach, you can make an informed decision about whether or not to adopt NiFi.