Objectives
- The NiFi log-generator canvas generates fictional firewall logs.
- Use the NiFi PutHiveStreaming processor to load the data directly into Hive.
- The issue here is that PutHiveStreaming requires the Avro file format.
- So we could:
- Ingest the files
- Process the incoming files, changing the delimiter from, say, “|” to “,”
- Infer the Avro schema using the InferAvroSchema processor
- Then use the ConvertCSVToAvro processor
- Then use the output of ConvertCSVToAvro in the PutHiveStreaming processor
- Reference https://community.hortonworks.com/articles/52856/stream-data-into-hive-like-a-king-using-nifi.html
Steps are below…
- Take an input and first perform a single string replacement, using a ReplaceText processor to change the column delimiter to the desired one, i.e. replace “|” with “,”
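The substitution ReplaceText performs here is a simple regex replace over the flowfile content. A minimal Python sketch of the equivalent logic (the sample log line is made up, but its fields match the Hive table created later):

```python
import re

def pipe_to_comma(line: str) -> str:
    """Replace every pipe delimiter with a comma, equivalent to a
    ReplaceText config of Search Value '\\|' and Replacement Value ','."""
    return re.sub(r"\|", ",", line)

# Hypothetical firewall log line: time|ip|country|status
sample = "2017-01-01 10:00:00|192.168.1.10|US|200"
print(pipe_to_comma(sample))  # 2017-01-01 10:00:00,192.168.1.10,US,200
```

Note the pipe must be escaped in the regex ("\|"), since "|" on its own is the regex alternation operator.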
Config of the ReplaceText_pipe_for_comma processor :-
To test that the search/replace is working, we can route the output above to a PutFile processor just to check.
We can now review a file in the /tmp/nifi_local file system to ensure it contains the correct delimiter; the fields should now be “,” delimited :-
We can now create the Avro schema by adding an “InferAvroSchema” processor to the canvas.
Note the config of the processor :-
The schema is defined from the “CSV Header Definition” parameter, i.e. the columns in the source file.
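For illustration, with a header definition of time,ip,country,status (matching the Hive table created later), the schema InferAvroSchema writes to the ${inferred.avro.schema} attribute would look roughly like the fragment below. The record name is an assumption, and depending on processor settings the inferred field types may be nullable unions such as ["null","string"]:

```json
{
  "type": "record",
  "name": "firewall_log",
  "fields": [
    {"name": "time",    "type": "string"},
    {"name": "ip",      "type": "string"},
    {"name": "country", "type": "string"},
    {"name": "status",  "type": "long"}
  ]
}
```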
We can now pass the successful output from InferAvroSchema to a “ConvertCSVToAvro” processor.
Config for the processor; note we use the “${inferred.avro.schema}” expression to pass the inferred Avro schema to the processor :-
Temporarily, we can write the output to the local file system to verify the Avro has been formatted correctly before passing it to the Hive Streaming processor.
At the command line, cd to /tmp/nifi_avro and view a sample file :-
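Avro files are binary, so cat will only show the embedded JSON schema header clearly. To dump the actual records as JSON, one option is the avro-tools jar; the jar path/version and file name below are placeholders:

```shell
cd /tmp/nifi_avro
ls
# Dump the records of one flowfile as JSON (jar path is a placeholder)
java -jar avro-tools-1.8.2.jar tojson <sample-file>.avro
```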
Before setting up the stream to HIVE :-
- Check ACID (transactions) has been enabled in Hive – check within Ambari
- Ensure an ORC, bucketed table has been created in Hive :
drop table default.firewall_stream;
CREATE TABLE default.firewall_stream ( time STRING, ip STRING, country STRING, status BIGINT )
CLUSTERED BY (ip) INTO 32 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
- You can use the Ambari Hive view to do this, or the Hive command line :-
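The ACID check above typically corresponds to hive-site properties like the following (set via Ambari); the names shown are for Hive 1.x on HDP, and the worker-thread count is just an example value:

```
hive.support.concurrency=true
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on=true
hive.compactor.worker.threads=1
```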
- We can now connect the converted Avro output to the PutHiveStreaming processor. Config for the processor below :-
- With the config applied and the NiFi flow running, we should now see data being streamed directly into the “firewall_stream” table. We can verify this via a SQL tool, the Hive command line, or the Hive view in Ambari.
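For example, from the Hive command line or any JDBC tool, a quick sanity check might be:

```sql
-- Confirm rows are arriving from the NiFi stream
SELECT COUNT(*) FROM default.firewall_stream;
SELECT * FROM default.firewall_stream LIMIT 10;
```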
- This also means any tool requiring access to structured data via JDBC can use the Hive store, e.g. R scripts.
- Example using a SQL tool (SQL Developer) :-
- We can also view the Hive source files within the Ambari Files view, or directly from the command line on the host.