S3 Standard Ingest Template

Problem

You would like to ingest data from an S3 data source into Hive tables backed by S3 external folders, without the data files traveling through the NiFi edge nodes.

Introduction

The Data Ingest S3 template is a variation of the standard Data Ingest template within Kylo. The standard template utilizes HDFS-backed Hive tables, accepts inputs from local files, and is designed to run on a Cloudera or Hortonworks sandbox. By contrast, the Data Ingest S3 template utilizes S3-backed Hive tables, accepts inputs from an S3 bucket, and is designed for use on an AWS stack utilizing EC2 and EMR. The template also performs better because data on S3 is never brought onto the NiFi node. In order to accommodate these changes, the ExecuteHQLStatement processor has been updated and a new processor, CreateElasticsearchBackedHiveTable, has been created.

1. S3 Data Ingest Template Overview

The template has two parts. The first is a non-reusable part that is created for each feed; it is responsible for determining the input location of the data in S3 and for setting properties that will be used by the reusable portion of the template. The second is the reusable template, which creates the Hive tables and merges, validates, profiles, and indexes the data.

The template is very similar to the HDFS standard ingestion template. The differences are outlined in the following sections.

1.1 Template processors pull defaults from application.properties

Creating feeds from the S3 template is simplified by adding default values into Kylo’s /opt/kylo/kylo-services/conf/application.properties.

config.s3ingest.s3.protocol
The protocol to use for your system, e.g. the Hortonworks sandbox typically uses “s3a”, while EMR using EMRFS may use “s3”.
config.s3ingest.es.jar_url
The location of the elasticsearch-hadoop jar. Use an S3 location accessible to the cluster.
config.s3ingest.apache-commons.jar_url
The location of the commons-httpclient-3.1.jar. Use an S3 location accessible to the cluster.
config.s3ingest.hiveBucket
This property is the name of the output bucket where the data ends up. Hive will generate the folder structure within it. Note: This bucket must have something in it; Hive cannot create folders within an empty S3 bucket.
config.s3ingest.es.nodes
A comma separated list of Elasticsearch nodes that will be connected to.

For example settings, see Section 2.5 below.

1.2 Non-reusable portion of template

1.2.1 List S3

Rather than fetching the data and bringing it into the NiFi node, this processor only lists the input data; the properties below determine its location, which is then passed to subsequent processors (an illustrative example follows the list).

Bucket
This is the S3 bucket where the input data is located. Note: The data files should be in a folder at the root level of the bucket.
Region
The region of the input S3 bucket.
Prefix
The “path” or “sub directory” within the bucket that will receive input files. Be sure the value ends with a trailing slash.
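
As an illustration only (the bucket, region, and folder names below are made up, not defaults of the template), a feed that reads drops from a userdata_feed/ folder might use the following values; each object listed under the prefix then flows downstream with its key carried in the filename attribute:

Bucket: myInputBucket
Region: us-east-1
Prefix: userdata_feed/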

1.2.2 Initialize Feed Parameters

Just like in the standard ingest template, this processor sets the attributes that will be used by the reusable portion of the template. Several parameters have been added to accommodate the changes made to the template for S3 integration:

InputFolderName:=<the path portion of the filename>
The input folder name will be used by the create feed partition processor in the reusable flow.
s3ingest.apache-commons.jar_url:=${config.s3ingest.apache-commons.jar_url}
The location of the commons-httpclient.jar. Use an S3 location accessible to the cluster.
s3ingest.es.jar_url:=${config.s3ingest.es.jar_url}
The location of the elasticsearch-hadoop.jar. Use an S3 location accessible to the cluster.
s3ingest.hiveBucket:=${config.s3ingest.hiveBucket}
This property is the name of the output bucket where the data ends up. Hive will generate the folder structures within it. Note: Hive cannot create folders in a fresh bucket that has never had objects written to it. Prime the pump on new S3 buckets by uploading and then deleting a file (see the sketch after this list).
s3ingest.es.nodes:=${config.s3ingest.es.nodes}
The comma separated list of node names for your elasticsearch nodes.
s3ingest.s3.protocol:=${config.s3ingest.s3.protocol}
The protocol your cluster will use to access the S3 bucket. (e.g. ‘s3a’)
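
A minimal sketch of “priming the pump” on a brand-new Hive bucket, assuming the AWS CLI is configured and hive-bucket is the name of your output bucket (illustrative name only):

touch /tmp/placeholder.txt
aws s3 cp /tmp/placeholder.txt s3://hive-bucket/placeholder.txt
aws s3 rm s3://hive-bucket/placeholder.txt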

1.2.3 DropInvalidFlowFile

When ListS3 scans a bucket, the first time it sees an object that represents the folder you specified in the Prefix, it creates a flow file. Since this flow file is not a data file, it will not process correctly in the flow and should be removed.
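
The folder placeholder is typically a zero-byte object whose key ends with a slash. Purely as a hypothetical illustration (not necessarily the processor’s actual configuration), a NiFi Expression Language check along those lines would be:

${filename:endsWith('/')}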

1.2.4 Initialize Cleanup Parameters

The cleanup flow needs to know the name of the Hive bucket in order to clean it, so the s3ingest.hiveBucket property has been added to this processor.

1.3 Reusable portion of Template

1.3.1 Register Tables

This processor creates S3-backed Hive tables for storing valid, invalid, feed, profile, and master data. Feed Root Path, Profile Root Path, and Master Root Path define the locations of their respective tables. Each of these properties uses the protocol you specified in s3ingest.s3.protocol (s3, s3n, or s3a); the protocol must be supported by your cluster distribution.
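
The generated DDL is conceptually similar to the sketch below. The column list, storage format, and bucket/path are placeholders for illustration, not the exact statements the processor produces:

CREATE EXTERNAL TABLE IF NOT EXISTS category_name.feed_name_valid (
  id STRING,
  first_name STRING,
  last_name STRING
)
PARTITIONED BY (processing_dttm STRING)
STORED AS ORC
LOCATION 's3a://myHiveBucket/category_name/feed_name/valid';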

1.3.2 Route if Data to Create ES Table

This processor routes the flow to the CreateElasticsearchBackedHiveTable processor if the metadata.table.fieldIndexString property has been set. Otherwise, the CreateElasticsearchBackedHiveTable processor is skipped.
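
A hypothetical routing expression capturing this check (shown only to illustrate the idea; the template’s actual expression may differ):

${metadata.table.fieldIndexString:isEmpty():not()}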

1.3.3 CreateElasticsearchBackedHiveTable

This processor creates an Elasticsearch-backed Hive table for indexing data that will be searchable from within the Kylo UI. A description of this processor and its properties can be found here: CreateElasticsearchBackedHiveTable. In the statement for this processor, the protocol for the S3 location may need to be updated to use a protocol supported by the distribution being used.
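
The resulting table is conceptually similar to the es-hadoop style sketch below. The index name, columns, and node value are placeholders, not the processor’s exact output:

CREATE EXTERNAL TABLE IF NOT EXISTS category_name.feed_name_index (
  id STRING,
  first_name STRING,
  processing_dttm STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
  'es.resource' = 'kylo-data/hive-data',
  'es.nodes' = 'localhost'
);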

1.3.4 Set Feed Defaults

The following property has been modified:

filename
The filename property will later be used by the Failed Flow processor when the flow file is placed into the temp location. Since the filename coming from ListS3 in the feed flow includes path information, it is stripped of that here.
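
A hedged example of the kind of NiFi Expression Language that strips the path portion (the exact expression used in the template may differ):

${filename:substringAfterLast('/')}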

1.3.5 Create Feed Partition

The ALTER TABLE statement has been modified to include the InputFolderName.
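
Conceptually, the statement now points the new partition at the S3 input folder, along the lines of this sketch (table name, timestamp, and bucket are placeholders):

ALTER TABLE category_name.feed_name_feed
ADD IF NOT EXISTS PARTITION (processing_dttm='20170621205042')
LOCATION 's3a://myInputBucket/${InputFolderName}';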

1.3.6 ExecuteHQLStatement

We have updated the ExecuteHQLStatement processor to run multiple Hive statements; they just need to be separated by a semicolon (“;”). This allows us to add the elasticsearch-hadoop jar using the config.s3ingest.es.jar_url property. This particular processor inserts the data to be indexed into the Elasticsearch-backed Hive table. It executes the following statements:

ADD JAR ${config.s3ingest.es.jar_url};
ADD JAR ${config.s3ingest.apache-commons.jar_url};
INSERT INTO TABLE ${category}.${feed}_index SELECT ${metadata.table.fieldIndexString},processing_dttm FROM ${category}.${feed}_valid

1.3.7 Merge Table

The Merge Table processor will merge the incoming data with the master table, based on the merge strategy you choose.

1.3.7.1 Sync Merge Strategy

If you encounter an error similar to:

2017-06-21 20:50:42,430 ERROR [Timer-Driven Process Thread-4] c.t.ingest.TableMergeSyncSupport Failed to execute alter table `category_name`.`feed_name_1498078145646` RENAME TO `catgeory_name`.`feed_name` with error
java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. Alter Table operation for <category_name>.<feed_name>_1498078145646 failed to move data due to: 'Renaming s3a://${hiveS3Bucket}/${hive.root.master}/<category_name>/<feed_name>_1498078145646 to s3a://hiveS3Bucket/${hive.metastore.warehouse.dir}/${category_name}.db/<feed_name> failed' See hive log file for details.

Note that hive.root.master is a feed property and that hive.metastore.warehouse.dir is a property from your hive-site.xml. In versions of Hive prior to 2.2.0, renaming a managed table moves its data even if the table was created with a LOCATION clause, and Hive derives the new location from hive.metastore.warehouse.dir plus the schema name with a .db suffix. Be sure that you have set the properties mapred.input.dir.recursive=true and hive.mapred.supports.subdirectories=true in your hive-site.xml.

1.3.8 DeleteS3Object

This processor replaces the RemoveHDFSFolder processor in standard ingest. It is analogous in that it takes attributes from earlier in the flow, uses them to determine which objects in the S3 bucket need to be removed, and performs the delete operation.
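
In effect, the processor performs the S3 equivalent of the manual cleanup below (bucket and key are illustrative only):

aws s3 rm s3://myInputBucket/userdata_feed/userdata1.csv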

2. Sandbox Walk-Through

2.1 Prerequisites

Download the required JARs that allow Hive to write table data to Elasticsearch: the elasticsearch-hadoop jar and the commons-httpclient jar. Place them in a folder within your Hive bucket (or another S3 bucket) and make them public. In the end the jars should be available in S3, and the following commands should produce a good result:

aws s3 ls s3://hive-bucket/jars/elasticsearch-hadoop-5.4.0.jar
aws s3 ls s3://hive-bucket/jars/commons-httpclient-3.1.jar
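
If the jars are sitting on your local machine, one way to stage them and make them public, assuming the AWS CLI is configured and hive-bucket is your bucket (illustrative name), is:

aws s3 cp elasticsearch-hadoop-5.4.0.jar s3://hive-bucket/jars/ --acl public-read
aws s3 cp commons-httpclient-3.1.jar s3://hive-bucket/jars/ --acl public-read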

2.2 Launch an EC2 instance using the Sandbox AMI

The S3 template was developed using the 0.8.1 sandbox but relies on code changes released in 0.8.2. Go to the AWS Marketplace, find the 0.8.2 or later sandbox for your region, and launch the instance. Wait 15 minutes or more for the NiFi and Kylo services to start. Then shut down NiFi, because the cluster configuration will change and the NiFi connections to the cluster will need to be refreshed. Shut down Kylo as well, since the application properties will be changed later:
service nifi stop
/opt/kylo/stop-kylo-apps.sh

2.3 Configuring core-site.xml and hive-site.xml

In the core-site.xml of the cluster where your data will be processed, make sure that your fs.s3 properties are set.

Note

  • for s3 use fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey
  • for s3n use fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey
  • for s3a use fs.s3a.access.key and fs.s3a.secret.key

Depending on which distribution you are using, the supported protocol may be different (s3, s3n), in which case you would need to use the equivalent properties for that protocol. Import the template using the Kylo UI, making sure to import the reusable portion as well and to overwrite any previous versions of the template.

Warning

There are times when the AWS SDK will consult the ‘s3’ properties for the keys, regardless of the protocol you use. To work around the problem, define the s3 properties in addition to your protocol-specific properties.

Open Ambari and go to HDFS -> Configs -> Advanced -> Custom core-site section. Add the fs.s3 and fs.s3a access properties.

fs.s3.awsAccessKeyId=XXX
fs.s3.awsSecretAccessKey=YYY
fs.s3a.access.key=XXX
fs.s3a.secret.key=YYY

Go to Hive -> Configs -> Advanced -> Custom hive-site section. Add the mapred.input.dir.recursive and hive.mapred.supports.subdirectories properties.

mapred.input.dir.recursive=true
hive.mapred.supports.subdirectories=true

Stop all services in the cluster, then start all services so that the new core-site.xml and hive-site.xml settings take effect.

2.4 Get NiFi Ready

service nifi start

Go into the NiFi UI, open the Process Group Configuration, and create a new AWSCredentialsProviderControllerService under the Controller Services tab. This service will be utilized by the various S3 processors to access the configured S3 buckets. Add your Access Key and Secret Key to the corresponding properties.

2.5 Get Kylo Ready

Edit /opt/kylo/kylo-services/conf/elasticsearch.properties and update your settings.

Change the search host to be the same as the host in use by the template, if not already done, e.g.:

search.host=localhost
search.clusterName=demo-cluster

Edit /opt/kylo/kylo-services/conf/application.properties and append your template defaults. Example settings:

config.s3ingest.s3.protocol=s3a
config.s3ingest.hiveBucket=hive-bucket
config.s3ingest.es.jar_url=s3a://hive-bucket/jars/elasticsearch-hadoop-5.4.0.jar
config.s3ingest.apache-commons.jar_url=s3a://hive-bucket/jars/commons-httpclient-3.1.jar
config.s3ingest.es.nodes=localhost

Start Kylo

/opt/kylo/start-kylo-apps.sh

2.6 Import the Template

Go to Admin -> Templates section of Kylo. Import the ‘S3 Data Ingest’ bundle from the kylo source repo path: samples/templates/nifi-1.0/s3_data_ingest.template.zip

2.7 Create the Data Ingest Feed

Create a category called “S3 Feeds” in which to place your new feed. Then create a feed and provide the following feed inputs:

Bucket
This is the name of your S3 bucket for input data. e.g. “myInputBucket”
Region
This is the region where your servers operate. e.g. us-east-1
s3ingest.hiveBucket
This is the name of your S3 bucket for the various Hive tables, e.g. “myHiveBucket”. It appears twice because it is initialized for both the feed flow and the cleanup flow. It should default to the value you set in application.properties.
prefix
This is the folder in the S3 input bucket to search for input files. The default value will look in a folder with the same system name as the feed you are creating: “${metadata.systemFeedName}/”

2.8 Test the Feed

Put a data file in your input bucket. Check Kylo to ensure your feed ran successfully!