Hadoop provides data warehousing infrastructure through Hive. Although Hive is a fairly stable distributed computing platform on a Hadoop cluster, it is still at an early stage of development and is enriched every day with new features by a large open source community. As a result, several functions that data scientists need during data crunching may not yet be available as Hive built-in functions. The Hive user defined function (UDF) is therefore an excellent mechanism that allows developers to introduce any new function to the cluster as needed. The following article explains how Java or Python can be used to write a Hive UDF.
Name of the UDF: WEEKDAY
Purpose of the Hive UDF: The UDF converts a date in MM/DD/YYYY format to the corresponding day of the week. The default Hive distribution does not provide such a function, so a custom UDF can be used whenever a Hive query has this business need.
e.g.
WEEKDAY('06/16/2014') -> Monday
WEEKDAY('06/17/2014') -> Tuesday
Java Code for the UDF:
package com.dev.hive;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * Hive UDF that converts a date string in MM/dd/yyyy format to the day of the week.
 *
 * @author Surajit Paul
 * @version 1.0
 */
public final class WeekDay extends UDF {
    /**
     * @param input - date in MM/dd/yyyy format
     * @return day of the week as Text, or null if the input is null
     */
    public Text evaluate(Text input) {
        if (input == null) return null;
        String date = input.toString();
        DateTimeFormatter formatter = DateTimeFormat.forPattern("MM/dd/yyyy");
        DateTime dt = formatter.parseDateTime(date);
        System.out.println(dt.toString()); // debug output; ends up in the task logs on the cluster
        int day = Integer.parseInt(dt.dayOfWeek().getAsString());
        String dayText = this.getWeekDay(day);
        System.out.println(dayText); // debug output
        return new Text(dayText);
    }

    /**
     * @param dayOfWeek - ISO day-of-week index (1 = Monday ... 7 = Sunday)
     * @return - Function returns day of the week in text format
     */
    private String getWeekDay(int dayOfWeek) {
        String dayName = null;
        switch (dayOfWeek) {
            case 1: dayName = "Monday";
                break;
            case 2: dayName = "Tuesday";
                break;
            case 3: dayName = "Wednesday";
                break;
            case 4: dayName = "Thursday";
                break;
            case 5: dayName = "Friday";
                break;
            case 6: dayName = "Saturday";
                break;
            case 7: dayName = "Sunday";
                break;
            default: dayName = "Invalid Day";
                break;
        }
        return dayName;
    }
}
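Before packaging the class into weekday.jar, the evaluate() method can be exercised locally with a small throwaway harness. The class below is purely illustrative and is not part of the UDF; it assumes hive-exec, hadoop-common and joda-time are on the compile-time classpath:

package com.dev.hive;

import org.apache.hadoop.io.Text;

public class WeekDayTest {
    public static void main(String[] args) {
        WeekDay udf = new WeekDay();
        // 06/16/2014 is a Monday, so the UDF is expected to return "Monday"
        System.out.println(udf.evaluate(new Text("06/16/2014")));
        // a null input should simply come back as null
        System.out.println(udf.evaluate(null));
    }
}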
UDF Registration Process using Hive CLI:
hive>ADD JAR weekday.jar;
hive>CREATE TEMPORARY FUNCTION WEEKDAY AS 'com.dev.hive.WeekDay';
The above commands register a function named WEEKDAY for the current Hive session; a temporary function is visible only in the session in which it is created.
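On newer Hive releases (0.13 onwards) the function can also be registered permanently so that it survives beyond the current session. The statement below is only a sketch; the HDFS path is a placeholder for wherever the jar has actually been uploaded:
hive>CREATE FUNCTION WEEKDAY AS 'com.dev.hive.WeekDay' USING JAR 'hdfs:///path/to/weekday.jar';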
UDF Usage:
hive>select WEEKDAY(asset_end_date), target, seat_segment, WEEKDAY(asset_create_date) from rpt_asset_registry limit 10;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1397885665200_2594, Tracking URL = http://10.37.68.217:9046/proxy/application_1397885665200_2594/
Kill Command = /home/hadoop/bin/hadoop job -kill job_1397885665200_2594
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 0
2014-06-16 18:47:16,805 Stage-1 map = 0%, reduce = 0%
2014-06-16 18:47:20,932 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.15 sec
2014-06-16 18:47:21,972 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.39 sec
2014-06-16 18:47:23,007 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.39 sec
MapReduce Total cumulative CPU time: 4 seconds 390 msec
Ended Job = job_1397885665200_2594
Counters:
MapReduce Jobs Launched:
Job 0: Map: 2 Cumulative CPU: 4.39 sec HDFS Read: 9456 HDFS Write: 470 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 390 msec
OK
Tuesday N 6+ Thursday
Wednesday Y 1-5 Friday
Thursday Y 1-5 Friday
Saturday N 1-5 Thursday
Friday N 1-5 Friday
Thursday N 1-5 Saturday
Monday PARTIAL 6+ Wednesday
Tuesday N 1-5 Thursday
Wednesday N 6+ Monday
Sunday N 6+ Tuesday
Time taken: 16.119 seconds, Fetched: 10 row(s)
Python script as Hive UDF
The purpose of streaming UDFs in Hadoop is to extend Hive's functional capabilities with non-JVM languages such as R and Python. Another advantage of using a Python script as a Hive UDF is that data scientists can focus primarily on data crunching and revealing business insights instead of getting entangled in the complexity of a compiled programming language.
We will use the following simple Python script to explain the pieces of code needed to plug a Python script into Hive as a UDF.
Python script - rpt.py
#!/usr/bin/python
import sys
from datetime import datetime

# Hive streams the selected columns to stdin as tab-separated lines
for line in sys.stdin.readlines():
    boolVal = "false"
    line = line.strip()
    fields = line.split('\t')
    asset_end_dt = datetime.strptime(fields[0], "%m/%d/%Y")
    asset_create_dt = datetime.strptime(fields[1], "%m/%d/%Y")
    if asset_end_dt >= asset_create_dt:
        boolVal = "true"
    else:
        boolVal = "false"
    # emit the output row back to Hive as tab-separated fields
    print asset_end_dt, "\t", asset_create_dt, "\t", boolVal
The following Hive command can be used to add the Python file:
hive>add file <file_path>../pypack/rpt.py;
It is essential to add the Python file as a resource so that Hive can ship it to the cluster nodes that execute the query.
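Since Hive streams each selected row to the script as a tab-separated line on stdin, the script can be sanity-checked locally before adding it as a resource. The dates below are made-up sample values, used only to illustrate the input format:
$ printf '06/20/2014\t06/16/2014\n' | python rpt.py
For this sample the script should echo back the two parsed timestamps followed by true, since the end date is not earlier than the create date.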
Hive script to read data from the source Hive table, process it with the Python script, and load the result set into the destination Hive table:
create table dev_schema.rpt_asset_extract as
select TRANSFORM(asset_end_date, asset_create_date) USING 'rpt.py' AS asset_end_date, asset_create_date, end_prior_to_create from dev_schema.rpt_asset_registry;
1. The above script reads data from the table dev_schema.rpt_asset_registry.
2. It streams the columns asset_end_date and asset_create_date to the script, which checks whether asset_end_date >= asset_create_date and records the Boolean value in the result set.
3. Finally, it creates the table dev_schema.rpt_asset_extract with the output columns.
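By default, every column emitted by TRANSFORM comes back as a STRING. If explicit output types are preferred, the AS clause also accepts a typed column list; the variant below is only a sketch of the same statement with the types spelled out, not a tested change to the job above:
create table dev_schema.rpt_asset_extract as
select TRANSFORM(asset_end_date, asset_create_date) USING 'rpt.py' AS (asset_end_date STRING, asset_create_date STRING, end_prior_to_create STRING) from dev_schema.rpt_asset_registry;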
Conclusion:
Using the Hive UDF feature, we can add far more complex functions to the Hadoop cluster, which helps reduce the complexity of Hive queries. The examples above can be used as a reference point for developing more complex Hive UDFs, either in a compiled programming language such as Java or in interpreted scripting languages such as R and Python, which are more popular in the data science community.

Note: Any comments, feedback or questions on the article would be deeply appreciated.