Hadoop DFS to Teradata

yxu
Teradata Employee


Hadoop systems [1], sometimes called MapReduce, can coexist with the Teradata Data Warehouse, allowing each subsystem to be used for its core strength when solving business problems. Integrating the Teradata Database with Hadoop turns out to be straightforward using existing Teradata utilities and SQL capabilities. There are a few options for directly integrating data from a Hadoop Distributed File System (HDFS) with a Teradata Enterprise Data Warehouse (EDW), including using SQL and FastLoad. This document focuses on using a Table Function UDF to both access and load HDFS data into the Teradata EDW. In our examples, historical data, presumably derived from HDFS, already resides in the Teradata EDW for trend analysis. We show examples in which the Table Function UDF approach is used to perform inserts or joins from HDFS into the data warehouse.

Hadoop DFS is an open-source distributed file system implementation from the Apache Software Foundation [1]. HDFS is designed to run on clusters of nodes built from low-cost hardware. It is not uncommon for the clustered nodes to number in the dozens or hundreds, and sometimes thousands. An HDFS file is chopped into blocks (usually 64 MB), each of which is replicated multiple times across the nodes in the HDFS system for fault tolerance and performance. HDFS is increasingly being used by companies to store large amounts of data, especially by dot-com companies with enormous server farms.

The Table Function UDF approach

In this approach, the Table UDF pulls data from HDFS into the EDW. Each Table UDF instance running on an AMP is responsible for retrieving a portion of the HDFS file. Data filtering and transformation can be done by the UDF as it delivers rows to the SQL processing step. The HDFS file is accessed in parallel by every UDF instance running on an AMP in the Teradata EDW.

As an example, the following SQL query calls the table UDF named HDFSUDF to load data from an HDFS file named mydfsfile.txt into a table Tab1 in Teradata. In this example, imagine that mydfsfile.txt is a 1-terabyte file spread across fifty Hadoop nodes. We can then use a SQL statement invoking the Table UDF instances to move data from Hadoop into the data warehouse. The UDF sample code is included in the Appendix.

insert into Tab1 SELECT * FROM TABLE (HDFSUDF('mydfsfile.txt')) AS T1;

 

How it works

Notice that once the Table UDF is written, it is called just like any other UDF. How the data flows from HDFS to Teradata is transparent to the users of this Table UDF. Typically the Table UDF is written so that every AMP in a Teradata system runs it when it is called in a SQL query; however, we also have the choice of writing the Table UDF so that it runs on a single AMP or a group of AMPs.

When the UDF instance is invoked on an AMP, the Table UDF instance communicates with the NameNode in HDFS, which manages the metadata about mydfsfile.txt. The Hadoop NameNode metadata includes information such as which blocks of the HDFS file are stored and replicated on which nodes. In this example, each UDF instance talks to the NameNode and finds the total size of mydfsfile.txt to be 1 TB. The Table UDF then queries the Teradata Database to discover its own numeric AMP identity and that there are 100 AMPs in total. With these facts, each UDF instance performs a simple calculation to determine the offset in mydfsfile.txt at which it will start reading data from HDFS.
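For reference, the following minimal sketch, condensed from the InitializeGenCtx method in the Appendix, shows how a UDF instance can gather these facts: its own AMP identity, the total number of AMPs, and the size of the HDFS file. The class and method names (AMPInfo, NodeInfo, FileSystem, getFileStatus) are the same ones used in the Appendix code; the wrapper class OffsetInputs is purely illustrative, and hdfsname is assumed to be an HDFS URI such as hdfs://HDFS_server.mycompany.com:19000.

import com.teradata.fnc.AMPInfo;
import com.teradata.fnc.NodeInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class OffsetInputs
{
    // Gather the three inputs each UDF instance needs before it can compute
    // its starting offset: its AMP id, the AMP count, and the HDFS file size in bytes.
    static long[] gather(String filename, String hdfsname) throws java.io.IOException
    {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", hdfsname);  // e.g. "hdfs://HDFS_server.mycompany.com:19000"
        FileSystem fs = FileSystem.get(conf);
        long fileSize = fs.getFileStatus(new Path(filename)).getLen(); // total bytes in mydfsfile.txt

        int ampId  = new AMPInfo().getAMPId();     // this AMP's numeric identity
        int ampCnt = new NodeInfo().getNumAMPs();  // total number of AMPs in the Teradata system

        return new long[] { ampId, ampCnt, fileSize };
    }
}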

For any request from the UDF instances to the Hadoop API, the HDFS NameNode identifies which DataNodes in the HDFS system are responsible for returning the requested data. The Table UDF instance running on an AMP then receives data directly from those DataNodes in HDFS that hold the requested blocks. Note that no data from the HDFS file is ever routed through the NameNode; it all flows directly from node to node for better performance. In the sample program we provide in the Appendix, we simply make the N-th AMP in the system load the N-th portion of the HDFS file. Other types of UDF-to-AMP mapping can be used depending on an application's needs.

The following figure illustrates this approach.

Issues

The default JVM shipped with Teradata 13 is IBM Java 1.5. The current Hadoop version is 0.20.0, which requires Java 1.6. Depending on your needs, either JVM version can work. In the first test example, we installed an earlier version of Hadoop (0.18.0), which runs on Java 1.5. In the second solution, we downloaded and installed IBM JVM 1.6 on every node in the Teradata system and then used the cufconfig tool to make the Teradata DBMS use the 1.6 JVM. The following shows the detailed steps:

  • a) Download IBM ibm-java-x86_64-sdk.rpm on every node in the Teradata system.
  • b) Install IBM JVM 1.6 on every Teradata node:

       rpm -ivh ibm-java-x86_64-sdk.rpm (the JVM is installed under /opt/ibm/java-x86_64-60/jre/)

  • c) Run the following command on any one Teradata node (it does not have to be run on all nodes):

            cufconfig -f 1.txt

    Here 1.txt specifies the path of the JVM that the Teradata DBMS should use and contains the single line:

                JREPath: /opt/ibm/java-x86_64-60/jre/

  • d) Run cufconfig -o to display the configuration settings and check that "JREPath: /opt/ibm/java-x86_64-60/jre/" appears in the output.

When deciding what portion of the HDFS file each AMP should load via the Table UDF approach, we must make sure that every byte in the DFS file is read exactly once across all UDF instances. Since each AMP requests data blocks from HDFS by sending the byte offset at which it should start loading, we also need to make sure that every line read by an AMP is a complete line, not a partial line, when the UDF processes the input file line by line. In the example UDF in the Appendix, the input HDFS file has a fixed row size; therefore we can easily compute the starting and ending byte offsets each AMP should read. Depending on the input file's format and an application's needs, extra care should be taken in deciding which portion of the HDFS file is loaded by which AMP.
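The arithmetic for the fixed-row-size case is small enough to show here. The sketch below mirrors the partitioning logic of InitializeGenCtx in the Appendix; the helper class and method names are illustrative only. Because every line occupies exactly rowSize bytes, any offset that is a multiple of rowSize falls on a line boundary, so no AMP ever starts or ends in the middle of a line. The ampIndex parameter is assumed to be this AMP's position (0 to ampCnt-1) in the ordered list returned by NodeInfo.getAMPIds().

class FixedRowPartition
{
    // Compute the starting byte offset and the number of lines an AMP should
    // read, mirroring the per-AMP row-count arithmetic in InitializeGenCtx.
    static long[] rangeFor(int ampIndex, int ampCnt, long fileSize, int rowSize)
    {
        long totalRows = fileSize / rowSize;  // number of complete lines in the DFS file
        long avgRows   = totalRows / ampCnt;  // "light" AMPs read this many lines
        long extras    = totalRows % ampCnt;  // the first 'extras' AMPs read one extra line

        long myRows       = avgRows + (ampIndex < extras ? 1 : 0);
        long rowsBeforeMe = (long) ampIndex * avgRows + Math.min(ampIndex, extras);

        long startOffset = rowsBeforeMe * rowSize; // first byte this AMP reads
        return new long[] { startOffset, myRows };
    }
}

For example, with a 2,500-byte file, four AMPs, and a 25-byte row size, rangeFor(0, 4, 2500, 25) yields a start offset of 0 and 25 lines for the first AMP, while rangeFor(1, 4, 2500, 25) yields offset 625 and 25 lines for the second.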

Joining relational data and HDFS data

Once HDFS data is loaded into Teradata, we can analyze it like any other data stored in the EDW. More interestingly, we can perform integrated BI over relational data stored in Teradata and external data originally stored in HDFS without first creating a table and loading the HDFS data into it, as shown in the following example.

Assume a telecommunications company owns an HDFS file called packets.txt, which stores information about network packets and has rows in the format <source_id, dest_id, timestamp>. The source and destination ID fields are TCP/IP addresses used to find spammers and hackers; they tell us who sent a request to which destination. Now assume there is a watchlist table stored in Teradata that holds a list of source_ids to be monitored and used in trend analysis. The following standard SQL query joins the packets.txt file and the watchlist table to find the source_ids in the watchlist table that have sent packets to more than 1 million unique destination IDs.

Select watchlist.source_id, count(distinct T.dest_id) as Total
From watchlist, TABLE (packets('packets.txt')) AS T
Where watchlist.source_id = T.source_id
Group by watchlist.source_id
Having Total > 1000000;

Note: in the appendix UDF code example, the TCP/IP addresses were simplified to be integers to make the example easier to understand.

Conclusion

Teradata table function UDFs can be used to directly access data in a Hadoop distributed file system. Whether the goal is to load new data into the data warehouse or simply to join the Hadoop data to existing tables to produce a report is up to the programmer. These examples show that the table UDF approach lets us apply the complex BI available through the SQL engine to both HDFS data and relational data.

References

[1] Hadoop DFS http://hadoop.apache.org/hdfs/

Appendix

This section contains a sample Table UDF Java program and associated BTEQ/SQL statements to illustrate the Table UDF approach discussed in this document. The Java program has been intentionally simplified to focus on HDFS: it reads a DFS file of fixed-size lines containing two integers. The UDF asks Teradata for the number of AMPs, asks HDFS for the size of the DFS file, and makes the N-th AMP read the N-th portion of the file. Detailed comments are provided in the Java program. The sample Java program is also attached as HDFS_UDF.zip.

import com.teradata.fnc.AMPInfo;
import com.teradata.fnc.NodeInfo;
import com.teradata.fnc.Tbl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;

import java.io.*;
import java.sql.SQLException;

/* Demonstration steps:
* 1. Install and configure Hadoop DFS.
* 1.1 This demo has been tested with Hadoop 0.20.0 and Teradata 13.0.
* 1.2 Get Hadoop from http://hadoop.apache.org/core/releases.html
* 1.3 Follow the instructions and tutorials on Hadoop's website (http://hadoop.apache.org/common/docs/r0.20.0/quickstart.html)
*
* 2. Load data into HDFS. An example command copying a local file to Hadoop is: $hadoop-0.20.0/bin/hadoop dfs -put mylocal.txt mydfstbl.txt
* If you upload the file as root, then the file's DFS path is likely '/user/root/mydfstbl.txt'.
*
* 3. Hadoop 0.20.0 or a higher version requires Java 1.6. The JVM included with TD 13.0 is 1.5. If Hadoop 0.20.0 or a higher version is used,
* IBM JVM 1.6 should first be downloaded and installed on every Teradata node. In our testing, the following commands were used to install
* IBM JVM 1.6 and make the Teradata DBMS use the newly installed IBM JVM 1.6:
* a) Download IBM ibm-java-x86_64-sdk.rpm on every node in the Teradata system.
* b) Install IBM JVM 1.6 on every Teradata node:
*    rpm -ivh ibm-java-x86_64-sdk.rpm (the JVM is installed under /opt/ibm/java-x86_64-60/jre/)
* c) Run the following command on any Teradata node (it does not have to be run on all nodes):
*    cufconfig -f 1.txt
*    Here 1.txt specifies the path of the JVM that the Teradata DBMS should use and contains the single line:
*    JREPath: /opt/ibm/java-x86_64-60/jre/
* d) Use cufconfig -o to display the configuration settings and check that "JREPath: /opt/ibm/java-x86_64-60/jre/" is in the output.
*
* 4. Prepare the .jar file.
* $/opt/ibm/java-x86_64-60/bin/javac HDFS_UDF.java (make sure Teradata's javFnc.jar and hadoop-0.20.0-core.jar
* can be found by javac, or explicitly include the two jar files in the javac command)
* $/opt/ibm/java-x86_64-60/bin/jar -cf hdfsudf.jar HDFS_UDF.class GenCtx.class
*
* 5. Use the following BTEQ script to set up a test database and install the jar files in the Teradata DBMS (change the directories if your Hadoop installation and Java UDF directories are different).
*/

/* bteq scripts */

/*

.logon NodeId/dbc

CREATE USER testdb AS PERM = 600000000000 PASSWORD = testdb;
GRANT CREATE PROCEDURE ON testdb TO testdb WITH GRANT OPTION;
GRANT DROP PROCEDURE ON testdb TO testdb WITH GRANT OPTION;
GRANT EXECUTE PROCEDURE ON testdb TO testdb WITH GRANT OPTION;
GRANT CREATE PROCEDURE ON testdb TO testdb WITH GRANT OPTION;
GRANT CREATE EXTERNAL PROCEDURE ON testdb TO testdb WITH GRANT OPTION;
GRANT ALTER EXTERNAL PROCEDURE ON testdb TO testdb WITH GRANT OPTION;
GRANT ALTER PROCEDURE on testdb TO testdb;
grant all on testdb to testdb with grant option;
grant all on SQLJ to testdb with grant option;
grant all on testdb to dbc with grant option;

*
.logoff

*
//For debugging, the following diagnostics should be set. The output files are under /tmp.
.logon NodeId/testdb;
database testdb;
diagnostic JAVALANGUAGE on for session;
diagnostic JAVA32 on for session;
diagnostic javalogging on for session;
diagnostic nocache on for session;

call sqlj.install_jar('CJ!/home2/tableudf/hdfsudf.jar', 'hdfsudf', 0);
Call sqlj.replace_jar('CJ!/home2/tableudf/hdfsudf.jar', 'hdfsudf');
call sqlj.install_jar('cj!/hadoop-0.20.0/hadoop-0.20.0-core.jar','newhadoop',0);
call sqlj.install_jar('cj!/hadoop-0.20.0/lib/commons-logging-1.0.4.jar','hadooploggingjar',0);

call SQLJ.ALTER_JAVA_PATH('hdfsudf','(*,newhadoop) (*,hadooploggingjar)');

REPLACE FUNCTION hdfs_udf( filename VARCHAR(250), hdfsname VARCHAR(250) )
RETURNS TABLE (c1 integer, c2 integer)
LANGUAGE JAVA
NO SQL
PARAMETER STYLE JAVA
EXTERNAL NAME 'hdfsudf:HDFS_UDF.GetDFSFileData';

CREATE TABLE testdb.mytab,NO FALLBACK ,
NO BEFORE JOURNAL,
NO AFTER JOURNAL,
CHECKSUM = DEFAULT
(
c1 integer,
c2 INTEGER)
NO PRIMARY INDEX ;

insert into mytab SELECT * FROM TABLE (hdfs_udf('/user/root/mydfstbl.txt','hdfs://HDFS_server.mycompany.com:19000')) AS t1;
*/

/**
* This class contains a Table UDF function to get data from Hadoop DFS.
*
* A static cache of context objects is maintained. Only the indices are stored
* in the TD scratchpad for each running UDF instance.
*/

/**GenCtx stores information necessary to build the next row by HDFS_UDF.GetDFSFileData
*/
class GenCtx implements Serializable
{

public int id; //the index to the cache element which contains the FSDataInputStream used by this AMP.
public long startpos; // the first byte in the DFS file to be read by this AMP
public long currentpos; //the next byte in the DFS file to be read by this AMP in the next round of building a database row
private long DfsRowsCnt; //the number of rows that should be read from DFS by this AMP
private long DfsRowsRead = 0; //the number of rows that have been read from DFS by this AMP
private int rowsize; // size of each line in DFS including the trailing newline character ('\n')

public GenCtx()
{
}

/**
*
* @param id
* the index to the cache element which contains the FSDataInputStream used by this AMP.
* @param startpos
* the first byte in the DFS file to be read by this AMP
* @param DfsRowsCnt
* the number of rows to be retrieved by this AMP
* @rowsize the size of the row
* @throws
*/

public GenCtx(int id, long startpos, long DfsRowsCnt, int rowsize)
{
this.id = id;
this.startpos = startpos;
this.DfsRowsCnt = DfsRowsCnt;
currentpos = startpos;
this.rowsize = rowsize;

}

/**Create a database row from reading a line in the DFS file
*
*
* @param in
* the FSDataInputStream used by the AMP.
* @param c1
* the array containing the first column value to be returned to the DBS
* @param c2
* the array containing the second column value to be returned to the DBS
* @rowsize
* @throws IOException
*/

public int CreateRow(FSDataInputStream in, int[] c1, int[] c2) throws IOException
{

if (DfsRowsRead == DfsRowsCnt) return 0; // No more rows; This AMP has loaded all rows assigned to it.

in.seek(currentpos);
BufferedReader bufferIn = new BufferedReader(new InputStreamReader(in));
String line;

//read a line from the DFS
line = bufferIn.readLine();

//parse the two integers in the line read
String[] yb = line.split("\\|");
c1[0] = Integer.parseInt(yb[0].trim());
c2[0] = Integer.parseInt(yb[1].trim());

currentpos += rowsize;
DfsRowsRead++;

return 1;
}

}

public class HDFS_UDF
{

/* We use a static array called cache to store the list of FSDataInputStreams used by AMPs to read from HDFS. We support multiple queries executed in parallel calling the same Table UDF.
* Thus we don't want a SQL query calling the UDF on an AMP to overwrite the content of the static cache array used by another SQL query calling the same UDF
* on the same AMP. last_id indicates the last used element in the cache array. If we reach the end of the array, we wrap around to the beginning of the array.
* Therefore, the total number of supported concurrent queries (all running at the same time) calling the same UDF is max_ids/(#-of-AMPs per node).
*/
private static int last_id = 0; //the last used cell in the cache array
private static final int max_ids = 1000;

//The array keeps the list of FSDataInputStream opened by all AMPs to access HDFS. Each AMP uses a FSDataInputStream to access HDFS.
private static final FSDataInputStream[] cache = new FSDataInputStream[max_ids];

private static int ROWSIZE = 25; //size of each line in DFS including the last newline character('\n')

/**GetDFSFileData is the table UDF that accesses the DFS file and returns rows
*
*
*@param filename
* the DFS file name
*@param hdfsname
* the HDFS system to be accessed (Example, "hdfs://HDFS_server.mycompany.com:19000")
*@param c1
* the array containing the first column value to be returned to the DBS
* @param c2
* the array containing the second column value to be returned to the DBS
* @throws SQLException
* @throws IOException
* @throws ClassNotFoundException
*/

public static void GetDFSFileData(String filename, String hdfsname, int[] c1, int[] c2)
throws SQLException, IOException, ClassNotFoundException
{
int status;
int[] phase = new int[1];
GenCtx obj;
Tbl tbl = new Tbl();

/* make sure the function is called in the supported context */
switch (tbl.getPhase(phase))
{
case Tbl.TBL_MODE_CONST:
/* depending on the phase decide what to do */
switch (phase[0])
{
case Tbl.TBL_PRE_INIT:

//HDFS related setup
Configuration conf = new Configuration();
conf.setClassLoader(Configuration.class.getClassLoader());

conf.set("fs.default.name", hdfsname);
conf.set("user", "root");
FileSystem fs = FileSystem.get(conf);

Path inFile = new Path(filename);

// Check if input is valid
if (!fs.exists(inFile))
throw new IOException(filename + " does not exist");
if (!fs.isFile(inFile))
throw new IOException(filename + " is not a file");
return;

case Tbl.TBL_INIT:
/* get scratch memory to keep track of things */

// Create ID for this particular SQL+AMP instance.
int id = getNewId();

// set up the information needed to build a row by this AMP
obj = InitializeGenCtx(filename, hdfsname, id);

//store the GenCtx object created which will be used to create the first row
tbl.allocCtx(obj);
tbl.setCtxObject(obj);

break;
case Tbl.TBL_BUILD:

// Get the GenCtx from the scratch pad from the last time.
obj = (GenCtx)tbl.getCtxObject();
int myid = obj.id;
status = obj.CreateRow(cache[myid], c1, c2);

if (status == 0)
throw new SQLException("no more data", "02000");

tbl.setCtxObject(obj);
break;

case Tbl.TBL_END:
int my_id = ((GenCtx)tbl.getCtxObject()).id;
cache[my_id].close();
cache[my_id] = null;

break;
}
return;

case Tbl.TBL_MODE_VARY:
throw new SQLException("Table VARY mode is not supported.");

}

}

/**
* Given the data path, decide which parts of the DFS file are to be read by this AMP.
*
*
*@param filename
* the DFS file name
*@param hdfsname
* the HDFS system to be accessed (Example, "hdfs://HDFS_server.mycompany.com:19000")
*@param id
* cache[id] is the FSDataInputStream to be used by this AMP.
* @throws SQLException
* @throws IOException
*/
private static GenCtx InitializeGenCtx(String filename, String hdfsname, int id) throws IOException, SQLException
{

//HDFS setup
Configuration conf = new Configuration();
conf.setClassLoader(Configuration.class.getClassLoader());
conf.set("fs.default.name", hdfsname);
conf.set("user", "root");

FileSystem fs = FileSystem.get(conf);

Path inFile = new Path(filename);
FileStatus fstatus = fs.getFileStatus(inFile);

FSDataInputStream in = fs.open(inFile);

/* get the number of AMPs, compute the AMP id of this AMP
* The N-th AMP reads the N-th portion of the DFS file
*/

AMPInfo amp_info = new AMPInfo();
NodeInfo node_info = new NodeInfo();

int[] amp_ids = node_info.getAMPIds();
int ampcnt = node_info.getNumAMPs(); // the number of AMPs in the Teradata system
int amp_id = amp_info.getAMPId(); //the id of this AMP

long size = fstatus.getLen(); //the size of the DFS file
long totalRowsCnt = size / ROWSIZE; // the total number of lines in the DFS file

// some "heavy" AMPs will read one more line than other "light" AMPs. avgRowCnt is the number of lines "light" AMPs will read.
long avgRowCnt = totalRowsCnt / ampcnt;

int heavyAMPId = (int)(totalRowsCnt % ampcnt); // the id of the first "heavy" AMP

long myRowsCnt2Read = avgRowCnt; //how many rows this AMP should load
long[] rowCntAMP = new long[ampcnt]; // this array records how many rows each AMP should load

for (int k = 0; k < heavyAMPId; k++)
{
rowCntAMP[k] = avgRowCnt + 1;
if (amp_id == amp_ids[k]) myRowsCnt2Read++;
}
for (int k = heavyAMPId; k < ampcnt; k++)
rowCntAMP[k] = avgRowCnt;

long rowCntBeforeMe = 0; //total number of DFS lines (counting from the beginning of the DFS file) that the AMPs before this AMP will read, i.e., the number of DFS lines this AMP should skip
for (int k = 0; k < ampcnt; k++)
{
if (amp_id == amp_ids[k])
{
break;
}
else
rowCntBeforeMe += rowCntAMP[k];

}

long startpos = rowCntBeforeMe * ROWSIZE; //the first byte in the DFS file this AMP should read.

cache[id] = in;
return new GenCtx(id, startpos, myRowsCnt2Read, ROWSIZE);
}

private synchronized static int getNewId()
{
last_id = (last_id + 1) % max_ids;
return last_id;
}

}

//a sample DFS file contains fixed size 2 columns which can be read by the Table UDF
/*
1| 1|
2| 2|
3| 3|
4| 4|
5| 5|
6| 6|
7| 7|
8| 8|
9| 9|
10| 10|
11| 11|
12| 12|
13| 13|
14| 14|
15| 15|
16| 16|
17| 17|
18| 18|
19| 19|
20| 20|
21| 21|
22| 22|
23| 23|
24| 24|
25| 25|
26| 26|
27| 27|
28| 28|
29| 29|
30| 30|
*/
7 REPLIES
Teradata Employee

Re: Hadoop DFS to Teradata

Is the file chopped into blocks manually or by HDFS?
Enthusiast

Re: Hadoop DFS to Teradata

While this is very interesting and potentially useful, I'd love to see the same technique used to source data from other (standard DBMS) brands such as Oracle, DB2, SQL Server, MySQL, or PostgreSQL.
Using FastLoad/MultiLoad is very painful because of the static control files.
yxu
Teradata Employee

Re: Hadoop DFS to Teradata

That's a great point. We've thought about that and we'll be working on that soon.
Teradata Employee

Re: Hadoop DFS to Teradata

This is a well-written article that clearly explains the relationship between Teradata and Hadoop. It also gives an excellent example.

Well done, an example for all!

Re: Hadoop DFS to Teradata

For the example under the section "Joining relational data and HDFS data", since it involves the tables on heterogeneous databases, how is the processing distributed among Hadoop and Teradata?
yxu
Teradata Employee

Re: Hadoop DFS to Teradata

The actual join is done on the Teradata EDW side. The Hadoop system serves the requested DFS data to the AMPs, directly from Hadoop nodes to Teradata AMPs in parallel. For the join, the Teradata EDW table is handled as before: it is either redistributed, duplicated, or kept local based on the Optimizer's plan. Assume the Optimizer decides to redistribute both the Teradata EDW table and the "table" created from the Table UDF. Then each AMP redistributes the rows it creates from the portion of the DFS file it retrieved through the Table UDF. After the redistribution, the join is done in parallel on every AMP as before.

Re: Hadoop DFS to Teradata

Very interesting and great article.
I'm very excited to see more about the details of "Joining relational data and HDFS data".
Is there any example code about this?