Hadoop MapReduce programmers often find that it is more convenient and productive to have direct access from their MapReduce programs to data stored in an RDBMS such as Teradata Enterprise Data Warehouse (EDW) because:
Connecting a Java (or other language) program to a database through JDBC or ODBC is well understood by most programmers. But when dealing with Hadoop and MPP databases, two factors outside the domain of connectors become crucial: scale and balance. Hadoop and MPP databases often run on dozens or hundreds of server nodes and process tens or hundreds of terabytes per execution. For a shared-nothing architecture to reach maximum throughput, the processing workload and the data must be partitioned evenly across execution threads. Otherwise, one server gets an inordinate amount of work compared to the others, and the total elapsed time grows accordingly. Consequently, a Hadoop connector needs to support parallel efficiency.
In this document, we first describe how MapReduce programs (a.k.a. mappers) can have parallel access to Teradata EDW data using the TeradataDBInputFormat approach discussed in Section 2. In Section 3, we provide a complete example with accompanying source code, which can be used directly to access the EDW without any changes to the data warehouse or Hadoop. For many applications, programmers can use the TeradataDBInputFormat class directly, without modification. Overall, readers can get the following out of this article:
TeradataDBInputFormat class can be used without changes to Hadoop or the EDW. Step-by-step installation and deployment are included in the example.
TeradataDBInputFormat approach for specific application needs.
A common approach for a MapReduce program to access relational data is to first use the DBMS export utility to write SQL answer sets to a local file and then load that file into Hadoop.
However, there are several use cases where exporting the data and loading it into HDFS is inconvenient for the programmer. Recognizing the need to access relational data in MapReduce programs, the Apache Hadoop open source project provides the DBInputFormat and DBOutputFormat class libraries. The DBInputFormat and DBOutputFormat Java class libraries allow MapReduce programs to send SQL queries through the standard JDBC interface to the EDW in parallel. Teradata provides a version of DBInputFormat that will be part of the Cloudera Distribution for Hadoop. Note that Cloudera has a good explanation of DBInputFormat on their website. The TeradataDBInputFormat approach is inspired by, but not based on, the Apache DBInputFormat approach.
DBInputFormat and DBOutputFormat, along with their Teradata versions, are good interfaces for ad hoc, small- or medium-volume data transfers. They make it easy to copy tables from the EDW into HDFS and vice versa. One good use of these interfaces is when a mapper program needs to do table look-ups but has no need to persist the data fetched. These interfaces are not efficient for high-volume data transfers, where bulk data movement tools like Teradata Parallel Transporter are more appropriate. In many cases, queries and bulk data movement are better optimized inside the database itself. While it's an oversimplification, think of the input and output format class libraries as suited to the same kinds of workloads that BTEQ processes: they are very flexible and useful, but do not support every workload.
DBInputFormat uses JDBC to connect to relational databases, typically MySQL or Oracle. The basic idea is that a MapReduce programmer provides a SQL query via the DBInputFormat class. The DBInputFormat class then associates a modified SQL query with each mapper started by Hadoop. Each mapper sends its query through a standard JDBC driver to the DBMS, gets back a portion of the query results, and processes that portion in parallel with the other mappers. The DBInputFormat approach is correct because the union of the results of all queries sent by all mappers is equivalent to the result of the original SQL query.
The DBInputFormat approach provides two interfaces for a MapReduce program to directly access data from a DBMS; the underlying implementation is the same for both. In the first interface, a MapReduce program provides a table name T, a list P of column names to be retrieved, optional filter conditions C on the table, and the column(s) O to be used in the ORDER BY clause, in addition to the user name, password, and DBMS URL. The DBInputFormat implementation first generates a “count” query:
SELECT COUNT(*) FROM T WHERE C
and sends it to the DBMS to get the number of rows (R) in the query result. At runtime, the DBInputFormat implementation knows the number of mappers (M) started by Hadoop and associates the following query Q with each mapper. Each mapper connects to the DBMS, sends Q over a JDBC connection, and gets back the results.
SELECT P FROM T WHERE C ORDER BY O
LIMIT L OFFSET X    (Q)
Query Q asks the DBMS to evaluate the query
SELECT P FROM T WHERE C ORDER BY O
but to return only L rows, starting at offset X. In total, M queries are sent to the DBMS by the M mappers, and they are identical except for the values of L and X. For the ith mapper, where $1 \le i \le M - 1$ (that is, any mapper but the last), $L = \lfloor R/M \rfloor$ and $X = (i - 1)\lfloor R/M \rfloor$. For the last mapper, $L = R - (M - 1)\lfloor R/M \rfloor$ and $X = (M - 1)\lfloor R/M \rfloor$.
In other words, every mapper except the last receives $\lfloor R/M \rfloor$ rows, and when R is not evenly divisible by M, the last mapper also receives the $R \bmod M$ leftover rows, so it gets more rows than the others.
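The split arithmetic of the first interface fits in a few lines. The sketch below is a Java illustration of the scheme just described, not Hadoop's own source, and its class and method names are our own. It builds the count query, computes each mapper's LIMIT L and OFFSET X (mapper $i < M$ reads $\lfloor R/M \rfloor$ rows starting at $(i-1)\lfloor R/M \rfloor$; the last mapper reads everything from $(M-1)\lfloor R/M \rfloor$ on), and assembles query Q.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the LIMIT/OFFSET split arithmetic (an illustration, not the Hadoop source). */
public class LimitOffsetSplits {
    /** One mapper's share: LIMIT rows starting at OFFSET (0-based). */
    public static final class Split {
        public final long offset;
        public final long limit;
        Split(long offset, long limit) { this.offset = offset; this.limit = limit; }
    }

    /** The count query sent once to obtain R. */
    public static String countQuery(String t, String c) {
        return "SELECT COUNT(*) FROM " + t + " WHERE " + c;
    }

    /** Splits an R-row result among M mappers: L = floor(R/M), X = (i - 1) * floor(R/M). */
    public static List<Split> plan(long r, int m) {
        if (m < 1 || r < 0) throw new IllegalArgumentException("need R >= 0 and M >= 1");
        long chunk = r / m;                            // floor(R / M)
        List<Split> splits = new ArrayList<>();
        for (int i = 1; i <= m; i++) {
            long offset = (i - 1) * chunk;             // X for mapper i
            long limit = (i < m) ? chunk : r - offset; // the last mapper also takes the R mod M leftovers
            splits.add(new Split(offset, limit));
        }
        return splits;
    }

    /** Query Q for one mapper: the same SELECT, differing only in LIMIT and OFFSET. */
    public static String mapperQuery(String p, String t, String c, String o, Split s) {
        return "SELECT " + p + " FROM " + t + " WHERE " + c + " ORDER BY " + o
                + " LIMIT " + s.limit + " OFFSET " + s.offset;
    }

    public static void main(String[] args) {
        System.out.println(countQuery("table_1", "transaction_id > 1000"));
        // Suppose the count query returned R = 10 and Hadoop started M = 3 mappers
        for (Split s : plan(10, 3)) {
            System.out.println(mapperQuery("transaction_id, product_id", "table_1",
                    "transaction_id > 1000", "transaction_id", s));
        }
    }
}
```

For R = 10 and M = 3, plan returns offsets 0, 3 and 6 with limits 3, 3 and 4: the last mapper absorbs the single leftover row.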
In the second interface of the DBInputFormat class, a MapReduce program can provide an arbitrary SQL SELECT query SQ (which could involve multiple tables) whose results are the input to the mappers. The program must also provide a count query QC that returns a single integer: the number of rows returned by SQ. The DBInputFormat class sends QC to the DBMS to get the number of rows (R), and the rest of the processing is the same as described for the first interface.
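The correctness argument (the union of all mappers' results equals the original result) can be checked mechanically. The following sketch, with names of our own choosing, stands in for the ordered result of SQ with an in-memory list, gives each of the M mappers its LIMIT/OFFSET slice (mapper $i < M$ gets $\lfloor R/M \rfloor$ rows from offset $(i-1)\lfloor R/M \rfloor$, the last mapper gets the rest), and confirms that concatenating the slices reproduces the full result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** In-memory check that the M LIMIT/OFFSET slices together reproduce the whole ordered result. */
public class UnionOfSlices {
    /** Rows that mapper i (1-based) receives from an ordered result. */
    public static <T> List<T> slice(List<T> ordered, int i, int m) {
        int r = ordered.size();
        int chunk = r / m;                          // floor(R / M)
        int offset = (i - 1) * chunk;               // X
        int limit = (i < m) ? chunk : r - offset;   // L
        return ordered.subList(offset, offset + limit);
    }

    /** Concatenates what all M mappers receive, in mapper order. */
    public static <T> List<T> unionOfAllMappers(List<T> ordered, int m) {
        List<T> all = new ArrayList<>();
        for (int i = 1; i <= m; i++) {
            all.addAll(slice(ordered, i, m));
        }
        return all;
    }

    public static void main(String[] args) {
        // Stand-in for the ordered result of an arbitrary query SQ: 11 row ids, read by 4 mappers
        List<Integer> ordered = IntStream.rangeClosed(1, 11).boxed().collect(Collectors.toList());
        System.out.println(unionOfAllMappers(ordered, 4).equals(ordered)); // prints true
    }
}
```

The check relies on every mapper's query seeing the rows in the same order. If the ORDER BY columns are not unique, the DBMS is free to order tied rows differently on each execution, and rows can then be read twice or skipped; ordering by a unique key avoids this.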
Although the DBInputFormat approach clearly streamlines the process of accessing relational data from products like MySQL, its performance does not scale. There are several performance issues with the DBInputFormat approach. In both interfaces, each mapper sends essentially the same SQL query to the DBMS, with different LIMIT and OFFSET clauses, to get a subset of the relational data. This is how the DBInputFormat class achieves parallel processing of relational data by mappers, but because of the ORDER BY clause introduced into each query, the DBMS must sort the result for every query sent by a mapper, even if the program itself does not need sorted input. Furthermore, the DBMS has to execute as many queries as there are mappers in the Hadoop system, which is inefficient, especially when the number of mappers is large. These performance issues are especially serious for a parallel DBMS such as Teradata EDW, which tends to handle a high number of concurrent queries and larger datasets. In addition, the required sorting is an expensive operation in a parallel DBMS, because the rows of a table are not stored on a single node and sorting requires redistributing rows across nodes.
The DBInputFormat class cannot be used to access Teradata EDW, since LIMIT and OFFSET clauses are not in ANSI Standard SQL and are not supported by Teradata EDW. However, a newer Apache Hadoop class named DataDrivenDBInputFormat, derived from DBInputFormat, can read input data from a Teradata EDW table. DataDrivenDBInputFormat operates like DBInputFormat; the only difference is that instead of using the non-standard LIMIT and OFFSET clauses to demarcate splits, DataDrivenDBInputFormat generates WHERE clauses that separate the data into roughly equivalent shards. DataDrivenDBInputFormat therefore has all of the DBInputFormat performance issues discussed above.
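For an integer split column, the WHERE-clause idea can be illustrated as follows. As we understand it, DataDrivenDBInputFormat looks up the minimum and maximum of a split column and cuts that range into intervals, one per mapper. The sketch below is a simplified version of that idea, not Hadoop's actual splitter code; the class name, the half-open interval convention, and the example column and bounds are our own choices, and it ignores overflow for ranges near the limits of long.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch: turn an integer column's [min, max] range into M WHERE clauses. */
public class RangeSplits {
    public static List<String> whereClauses(String col, long min, long max, int m) {
        if (m < 1 || max < min) throw new IllegalArgumentException("need M >= 1 and min <= max");
        long span = max - min + 1;               // number of distinct values in [min, max]
        List<String> clauses = new ArrayList<>();
        for (int i = 0; i < m; i++) {
            long lo = min + span * i / m;        // inclusive lower bound
            long hi = min + span * (i + 1) / m;  // exclusive upper bound; the last one is max + 1
            clauses.add(col + " >= " + lo + " AND " + col + " < " + hi);
        }
        return clauses;
    }

    public static void main(String[] args) {
        // e.g. the min and max of transaction_id are 1 and 100, and Hadoop starts 4 mappers
        for (String where : whereClauses("transaction_id", 1, 100, 4)) {
            System.out.println("SELECT transaction_id, product_id FROM table_1 WHERE " + where);
        }
    }
}
```

Each shard covers an equal range of values, not an equal number of rows: if most transaction_id values fall into one interval, that mapper gets most of the work, which is the balance problem raised at the start of the article. And each of the M clauses is still a separate query the DBMS must execute.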
The Teradata connector for Hadoop, TeradataDBInputFormat, sends the SQL query Q provided by a MapReduce program to Teradata EDW only once. Q is executed once, and the results are stored in a Partitioned Primary Index (PPI) table T. Then each mapper from Hadoop sends a new query Qi, which simply asks for the ith partition on every AMP (Access Module Processor, the Teradata virtual processor that stores and processes a share of each table's rows). Depending on the number of mappers, the complexity of the SQL query provided by the MapReduce program, and the amount of data involved, the TeradataDBInputFormat approach can perform orders of magnitude better than the DBInputFormat approach, as we have seen in our internal testing.
Now we describe the architecture behind TeradataDBInputFormat. First, the TeradataDBInputFormat class sends the following statement P to the EDW, based on the query Q provided by the MapReduce program:
CREATE TABLE T AS (Q) WITH DATA
PRIMARY INDEX (c1)
PARTITION BY (c2 MOD M) + 1    (P)
The above statement asks the EDW to evaluate Q and store the results (both the table layout and the data) in a new PPI table T. The hash value of the primary index column c1 of each row in the query result determines which AMP stores that row. The value of the partition-by expression then determines the physical partition (location) of each row on that AMP. The expression uses modulo M: c2 is divided by M, and the remainder, which ranges from 0 to M − 1, is incremented by one, giving a partition number from 1 to M.
All rows on the same AMP with the same partition-by value are physically stored together and can be located directly and efficiently by the Teradata optimizer. There are different ways to choose the primary index column and the partition-by expression automatically.
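Statement P and the partition arithmetic can be sketched as follows. This is an illustration, not the connector's code: the method names are our own, the caller supplies c1, c2 and M (the article does not say how the connector chooses them), and c2 is assumed to be a non-negative integer so that c2 MOD M falls between 0 and M − 1. Using transaction_id for both c1 and c2 in main is purely illustrative.

```java
/** Sketch: assemble statement P and compute which PPI partition a row lands in. */
public class PpiPlan {
    /** Builds P: materialize query q into PPI table t, hashed on c1 and partitioned on c2. */
    public static String createStatement(String t, String q, String c1, String c2, int m) {
        return "CREATE TABLE " + t + " AS (" + q + ") WITH DATA"
                + " PRIMARY INDEX (" + c1 + ")"
                + " PARTITION BY (" + c2 + " MOD " + m + ") + 1";
    }

    /** Mirrors (c2 MOD M) + 1 for a non-negative c2: remainder 0..M-1 becomes partition 1..M. */
    public static int partitionOf(long c2, int m) {
        if (c2 < 0 || m < 1) throw new IllegalArgumentException("sketch assumes c2 >= 0 and M >= 1");
        return (int) (c2 % m) + 1;
    }

    public static void main(String[] args) {
        System.out.println(createStatement("T", "select transaction_id, product_id from table_1",
                "transaction_id", "transaction_id", 32));
        System.out.println(partitionOf(1005, 32)); // 1005 = 31 * 32 + 13, so partition 14
    }
}
```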
After query Q is evaluated and table T is created, each AMP has M partitions numbered from 1 to M (M is the number of mappers started in Hadoop). Then each mapper sends the following query Qi (1 ≤ i ≤ M) to the EDW:
SELECT * FROM T WHERE PARTITION = i    (Qi)
Teradata EDW directly locates all rows in the ith partition on every AMP in parallel and returns them to the mapper. This happens in parallel for all mappers. After all mappers have retrieved their data, table T is automatically deleted. Note that if the original SQL query simply selects data from a base table that is already a PPI table, there is no need to create another PPI table, since the existing partitions can be used directly to divide the data among the mappers.
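To see how the pieces fit together, the following toy model simulates the layout in memory: each row is sent to an AMP by hashing c1 and to a partition by (c2 MOD M) + 1, and mapper i then collects partition i from every AMP, as query Qi does. Two assumptions matter here: Java's Long.hashCode stands in for Teradata's actual row-hash function, and all c2 values are non-negative. The class and method names are our own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of the PPI layout: rows -> (AMP, partition) -> mapper. */
public class PpiSimulation {
    /** A result row reduced to its primary index value c1 and partitioning value c2. */
    public static final class Row {
        public final long c1;
        public final long c2;
        public Row(long c1, long c2) { this.c1 = c1; this.c2 = c2; }
    }

    /** The query Qi that mapper i sends. */
    public static String mapperQuery(String t, int i) {
        return "SELECT * FROM " + t + " WHERE PARTITION = " + i;
    }

    /** Rows each mapper 1..M would receive. Assumes c2 >= 0; Long.hashCode stands in for the row hash. */
    public static Map<Integer, List<Row>> rowsPerMapper(List<Row> rows, int amps, int m) {
        // storage.get(amp).get(p - 1) holds the rows stored on that AMP in partition p
        List<List<List<Row>>> storage = new ArrayList<>();
        for (int a = 0; a < amps; a++) {
            List<List<Row>> partitions = new ArrayList<>();
            for (int p = 1; p <= m; p++) partitions.add(new ArrayList<>());
            storage.add(partitions);
        }
        for (Row r : rows) {
            int amp = Math.floorMod(Long.hashCode(r.c1), amps); // hash of c1 chooses the AMP
            int partition = (int) (r.c2 % m) + 1;                // (c2 MOD M) + 1 chooses the partition
            storage.get(amp).get(partition - 1).add(r);
        }
        // Mapper i collects partition i from every AMP (Teradata does this on all AMPs in parallel)
        Map<Integer, List<Row>> byMapper = new TreeMap<>();
        for (int i = 1; i <= m; i++) {
            List<Row> mine = new ArrayList<>();
            for (List<List<Row>> amp : storage) mine.addAll(amp.get(i - 1));
            byMapper.put(i, mine);
        }
        return byMapper;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        for (long k = 0; k < 100; k++) rows.add(new Row(k, k)); // c1 = c2 = k, illustrative only
        for (Map.Entry<Integer, List<Row>> e : rowsPerMapper(rows, 4, 8).entrySet()) {
            System.out.println(mapperQuery("T", e.getKey()) + " -> " + e.getValue().size() + " rows");
        }
    }
}
```

With 100 rows whose c2 values are 0 to 99, 4 AMPs and 8 mappers, main reports 13 rows for mappers 1 to 4 and 12 rows for mappers 5 to 8. Every row reaches exactly one mapper, and the balance across mappers depends on how evenly the c2 values spread over the M remainders.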
As mentioned at the beginning of Section 2.2, if the number of mappers is large and the SQL query provided by a MapReduce program is complex (for example, involving multi-table joins and grouping), the TeradataDBInputFormat approach can perform orders of magnitude better than the DBInputFormat approach. This is because with DBInputFormat, the DBMS has to execute the same user SQL query as many times as there are mappers, sort the results each time, and send back only a portion of the final results to each mapper. In the TeradataDBInputFormat approach, by contrast, the complex SQL query is executed only once, and the results are stored in multiple partitions of a PPI table, each of which is sent to a different mapper. As mentioned before, TeradataDBInputFormat does not require any change to the Teradata EDW codebase. We have investigated a few areas where new enhancements to Teradata EDW could significantly improve the performance of the TeradataDBInputFormat approach, which we will probably discuss in a separate article.
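To make this argument concrete, here is a deliberately simple cost model; the symbols are our own, and the model is an illustration, not a measurement. Let E be the cost of evaluating the user query once, S the cost of sorting its R-row result, W the cost of writing that result into the PPI table T, and $F_i$ the cost of mapper i reading partition i. Assuming each of DBInputFormat's M queries fully evaluates and sorts the result before applying LIMIT and OFFSET:

$$C_{\text{DBInputFormat}} \approx M\,(E + S), \qquad C_{\text{TeradataDBInputFormat}} \approx E + W + \sum_{i=1}^{M} F_i$$

With M = 32 mappers, as in the example job in Section 3, the first approach pays for evaluating and sorting the query 32 times, while the second evaluates it once and then writes and reads the R result rows once each. The more expensive E is (multi-table joins, grouping), the larger the gap.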
Note that the data retrieved by a MapReduce program via the TeradataDBInputFormat approach or the DBInputFormat approach is not stored in Hadoop after the MapReduce program finishes, unless the program explicitly does so. Therefore, if some Teradata EDW data is frequently used by many MapReduce programs, it is more efficient to copy that data and materialize it in HDFS. One approach to storing a Teradata table permanently in Hadoop DFS is to use Cloudera's Sqoop, which we have integrated
One potential issue in the current implementation provided in the Appendix is column name conflicts. For example, assume the business query Q used in the DDL P in Section 2.2 is "select * from T1, T2 where T1.a=T2.b" and that T1 and T2 have columns with the same names. Currently, Teradata DBMS reports a column name conflict if we simply create a table to store this query result. For now, the workaround is to rewrite the query so that each column in the result is explicitly and uniquely named. Either the EDW or the TeradataDBInputFormat class could be enhanced to resolve such conflicts automatically, so that users would not need to change a query like "select * from T1, T2 where T1.a=T2.b".
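Until such an enhancement exists, the workaround can be automated on the client side. The hypothetical helper below (our own name, not part of the attached package) takes the column names of each table in the query, which in practice would have to be looked up from the data dictionary (this sketch does not do that), and prefixes every output column with its table name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: builds a select list whose output columns are uniquely named. */
public class UniqueSelectList {
    /** columnsByTable maps each table name to its column names, in select order. */
    public static String build(Map<String, List<String>> columnsByTable) {
        List<String> items = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : columnsByTable.entrySet()) {
            for (String col : e.getValue()) {
                // Prefix with the table name so that T1.c and T2.c no longer collide
                items.add(e.getKey() + "." + col + " AS " + e.getKey() + "_" + col);
            }
        }
        return String.join(", ", items);
    }

    public static void main(String[] args) {
        Map<String, List<String>> cols = new LinkedHashMap<>();
        cols.put("T1", Arrays.asList("a", "c"));
        cols.put("T2", Arrays.asList("b", "c"));
        System.out.println("select " + build(cols) + " from T1, T2 where T1.a=T2.b");
    }
}
```

For the example above, main prints select T1.a AS T1_a, T1.c AS T1_c, T2.b AS T2_b, T2.c AS T2_c from T1, T2 where T1.a=T2.b. The sketch does not check identifier length limits, and prefixing can still collide in contrived cases (a table T1 with a column x_y and a table T1_x with a column y both yield T1_x_y).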
The DBOutputFormat provided by Cloudera writes to the database by generating a set of INSERT statements in each reducer. The current DBOutputFormat approach, in which multiple reducers send batches of INSERT statements to the DBMS, works with the Teradata EDW without modification. For more detail, please refer to .
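As an illustration of the batched-INSERT pattern, here is a minimal sketch using only java.sql. The class and method names are our own, and it is not the DBOutputFormat source: one parameterized INSERT statement is prepared, each row is bound to its ? placeholders and added to a batch, and the whole batch is sent with a single executeBatch call.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

/** Sketch of how a reducer could write rows with batched, parameterized INSERTs. */
public class BatchedInsert {
    /** Builds "INSERT INTO t (c1, c2) VALUES (?, ?)" for the given table and columns. */
    public static String insertSql(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    /** Sends all rows as one JDBC batch; each row holds one value per column, in column order. */
    public static int[] writeBatch(Connection conn, String table, List<String> columns,
                                   List<List<Object>> rows) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(insertSql(table, columns))) {
            for (List<Object> row : rows) {
                for (int j = 0; j < columns.size(); j++) {
                    ps.setObject(j + 1, row.get(j)); // JDBC parameter indexes are 1-based
                }
                ps.addBatch();
            }
            return ps.executeBatch(); // one update count per row
        }
    }
}
```

For example, insertSql for a table t with columns a and b yields INSERT INTO t (a, b) VALUES (?, ?). How large a batch the database accepts efficiently is driver-specific and not addressed here.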
In this section, we first describe the requirements for running our TeradataDBInputFormat class and then use an example to explain how to use it.
The TeradataDBInputFormat class is implemented in Java. The enclosed package can be compiled and run in an environment with the following features:
Note that the Hadoop DFS and the Teradata DBMS may or may not be installed on the same hardware platform.
You should start by downloading the TeradataDBInputFormat connector. The JAR file (i.e., TeradataDBInputFormat.jar) should be placed into the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines. It is a good idea to also include it on any server you launch Hadoop jobs from.
Assume a MapReduce program needs to access some transaction data stored in the table table_1, defined by the following CREATE TABLE statement:
CREATE TABLE table_1 (
) PRIMARY INDEX(transaction_id).
To access the transaction and product information, a MapReduce program can simply provide a SQL query such as "select transaction_id, product_id from table_1 where transaction_id > 1000".
The following code shows how a MapReduce job using the TeradataDBInputFormat class is configured and run:
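import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class TransactionsMapReduceJob extends Configured implements Tool {
    private String query = "";
    private String output_file_path = "";
    public TransactionsMapReduceJob(final String query_, final String output_) {
        query = query_;
        output_file_path = output_;
    }
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job myJob = new Job(conf, conf.get("mapreduce.job.name"));
        myJob.setJarByClass(TransactionsMapReduceJob.class);
        // The following three steps are essential:
        // 1. Set the record class that the record reader populates
        myJob.getConfiguration().setClass("record.reader.class", TransactionTDBWritable.class, TeradataDBWritable.class);
        // 2. Store the query
        TeradataDBInputFormat.setInput(myJob.getConfiguration(), query, TransactionTDBWritable.class);
        // 3. Specify the input format class
        myJob.setInputFormatClass(TransactionDBInputFormat.class);
        myJob.setMapperClass(TransactionMapper.class);
        myJob.setOutputKeyClass(LongWritable.class);
        myJob.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(myJob, new Path(output_file_path));
        int ret = myJob.waitForCompletion(true) ? 0 : 1;
        // clean up ...
        return ret;
    }
    public static void main(String[] args) throws Exception {
        int res = 0;
        try {
            int args_num = args.length;
            // Assumption 1: the second-to-last parameter is the output file path
            String output_file_path = args[args_num - 2];
            // Assumption 2: the last parameter is the query
            String query = args[args_num - 1];
            Tool mapred_tool = new TransactionsMapReduceJob(query, output_file_path);
            res = ToolRunner.run(new Configuration(), mapred_tool, args);
        } catch (Exception e) {
            e.printStackTrace();
            res = 1;
        }
        System.exit(res);
    }
}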
The following class, TransactionTDBWritable, implements the TDBWritable interface to describe the data from Teradata EDW that the MapReduce program uses:
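import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;
import org.apache.hadoop.io.Writable;
public class TransactionTDBWritable implements TDBWritable, Writable {
    private long transaction_id = 0;
    private long product_id = 0;
    // Static code: the programmer should explicitly declare the attributes (name and type) related to the query.
    static private List<String> attribute_names = new Vector<String>();
    static private List<String> attribute_types = new Vector<String>();
    // The corresponding query:
    /// SELECT transaction_id, product_id FROM ... WHERE ...
    static {
        //1. for the first item (transaction_id)
        //2. for the second item (product_id)
    }
    /** Default constructor */
    public TransactionTDBWritable() { }
    // Fills the fields from the current row of the JDBC result set
    public void readFields(ResultSet resultSet) throws SQLException {
        transaction_id = resultSet.getLong("transaction_id");
        product_id = resultSet.getLong("product_id");
    }
    // Hadoop deserialization: reads the fields in the same order write() writes them
    public void readFields(DataInput in) throws IOException {
        transaction_id = in.readLong();
        product_id = in.readLong();
    }
    public void write(DataOutput out) throws IOException {
        out.writeLong(transaction_id);
        out.writeLong(product_id);
    }
    public void addAttrbute(String name, String type) { attribute_names.add(name); attribute_types.add(type); }
    public List<String> getAttributeNameList() { return attribute_names; }
    public List<String> getAttributeValueList() { /* ... */ }
    public long getTransactionID() { return transaction_id; }
    public long getProductID() { return product_id; }
}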
Note that TeradataDBInputFormat could be enhanced so that the above class does not need to be created manually: it could be generated automatically from the schema of the query result.
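Such a generator could start from the result schema that JDBC already exposes: ResultSetMetaData.getColumnName and getColumnType report each column's name and its java.sql.Types code. The hypothetical sketch below (not part of the attached package) turns such a name/type list into the fields and the readFields(ResultSet) method of a class like TransactionTDBWritable. Only a handful of SQL types are mapped, and the Writable and attribute methods are left out for brevity.

```java
import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: emit fields and readFields(ResultSet) from a result schema. */
public class WritableGenerator {
    /** Maps a java.sql.Types code to {Java type, ResultSet getter}; only a few types handled. */
    static String[] javaTypeFor(int sqlType) {
        switch (sqlType) {
            case Types.BIGINT:  return new String[] {"long", "getLong"};
            case Types.INTEGER: return new String[] {"int", "getInt"};
            case Types.DOUBLE:  return new String[] {"double", "getDouble"};
            case Types.CHAR:
            case Types.VARCHAR: return new String[] {"String", "getString"};
            default: throw new IllegalArgumentException("unsupported java.sql.Types code " + sqlType);
        }
    }

    /** columns maps each column name to its java.sql.Types code, in result order. */
    public static String generate(String className, Map<String, Integer> columns) {
        StringBuilder fields = new StringBuilder();
        StringBuilder reads = new StringBuilder();
        for (Map.Entry<String, Integer> c : columns.entrySet()) {
            String[] t = javaTypeFor(c.getValue());
            fields.append("    private ").append(t[0]).append(' ').append(c.getKey()).append(";\n");
            reads.append("        ").append(c.getKey()).append(" = resultSet.").append(t[1])
                 .append("(\"").append(c.getKey()).append("\");\n");
        }
        return "public class " + className + " {\n" + fields
                + "    public void readFields(java.sql.ResultSet resultSet) throws java.sql.SQLException {\n"
                + reads + "    }\n}\n";
    }

    public static void main(String[] args) {
        // Assumed schema for illustration: both columns reported as BIGINT
        Map<String, Integer> cols = new LinkedHashMap<>();
        cols.put("transaction_id", Types.BIGINT);
        cols.put("product_id", Types.BIGINT);
        System.out.print(generate("TransactionTDBWritable", cols));
    }
}
```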
A dummy class inheriting from TeradataDBInputFormat<T> is needed:
public class TransactionDBInputFormat extends TeradataDBInputFormat<TransactionTDBWritable> {
    // Nothing needs to be implemented here. This class only transfers the type
    // information of TransactionTDBWritable down to TransactionDBInputFormat's constructor.
}
TeradataDBInputFormat reads from the database and populates the fields of the TransactionTDBWritable class with the retrieved data. A mapper then receives an instance of TransactionTDBWritable as its input value and can use the retrieved DBMS data as desired. The following code shows how a mapper directly accesses DBMS data passed to it as an instance of the TransactionTDBWritable class:
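import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
public class TransactionMapper extends Mapper<LongWritable, TransactionTDBWritable, LongWritable, LongWritable> {
    @Override
    protected void map(LongWritable k1, TransactionTDBWritable v1, Context context) throws IOException, InterruptedException {
        // Emit (transaction_id, product_id) for each row read from the EDW; failures propagate to Hadoop
        context.write(new LongWritable(v1.getTransactionID()), new LongWritable(v1.getProductID()));
    }
}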
To access the Teradata DB, the mapper program also needs the database connection information, such as the DB URL, user account, and password. This information can be stored in a property XML file (TeraProps.xml in the command below), as shown in the following fragment:
<!-- Parameters about the Teradata RDBMS -->
<description>The URL of the Teradata DB the program is going to interact with</description>
<description>The account name</description>
The whole source package should first be compiled and packaged as a JAR file, for example MyMapReduceJob.jar, before it can be run on Hadoop. Assume that we put the property XML file in the same directory as the JAR file. Then we can start the MapReduce job with the following command:
$HADOOP_HOME/bin/hadoop jar MyMapReduceJob.jar TransactionsMapReduceJob -Dmapred.map.tasks=32 -conf TeraProps.xml output.tbl "select transaction_id, product_id from table_1 where transaction_id > 1000"
where:
$HADOOP_HOME stands for the Hadoop installation directory,
-Dmapred.map.tasks=32 sets the number of map tasks to 32,
-conf TeraProps.xml tells Hadoop where to find the parameters for the Teradata DBMS,
output.tbl is the path that receives the job's output, and
"select transaction_id, product_id from table_1 where transaction_id > 1000" is the user query.
MapReduce-related research and development remains active and attracts interest from both industry and academia. MapReduce is particularly interesting to parallel DBMS vendors, since both MapReduce and Teradata Data Warehouses use clusters of nodes and shared-nothing scale-out technology for large-scale data analysis. The TeradataDBInputFormat approach in this article shows how MapReduce programs can efficiently and directly have parallel access to Teradata EDW data, without the external steps of exporting data from Teradata EDW and loading it into Hadoop.
Teradata Online Documentation, http://www.info.teradata.com
Cloudera Sqoop, http://www.cloudera.com/blog/2009/06/introducing-sqoop/
The TeradataDBInputFormat class is implemented in Java, and the related source code is included in the attached zip file. The source code is composed of three parts:
The core implementation of the TeradataDBInputFormat classes, which is based on the PPI strategy described in Section 2.2. Five classes are defined:
The generation of the internal intermediate table in Teradata DBMS according to the user query.
An example to show how to use the