Develop SQL MR function using AsterData Developer Express, part 2: develop SQL-MR function

Teradata Aster is an analytic platform that embeds MapReduce analytic processing with data stores. •Embedded MapReduce analytic processing and unique SQL-MapReduce® framework •Massively parallel data store for multistructured data •Intuitive tools and SQL-MapReduce libraries for rapid analytic development

Develop SQL MR function using AsterData Developer Express, part 2: develop SQL-MR function

Create SQL-Map-Reduce function using wizard

Aster Developer Express has built-in wizard which helps to create Java class for SQL-MR function

Right click on the project, select "new", "Other"

Then select the right wizard

Type in desired package and Class name

Define input schema (a list of columns and column types which our function can recognize). You can just type it in or use previously set nCluster connection. You can see how column from staging table table has been selected.

Now it's time to define output columns. We do use wizard for defining input and output columns. This wizard will generate Java code. You can easilly skip input and ouput columns step and write code on your own.

Now let's specify arguments for our function. Our SQL-MR fuction will accept two arguments: lists of EXCLUDES and INCLUDES. We can filter data from staging table using these arguments. For example: EXCLUDES('pdf') will exclude all Adobe Acrobat documents. INCLUDES ('html', 'htm') will leave only requests to HTML documents in resulting set. 

You can skip this step and write Java code later. It's not complicated. 

Create junit test

We need to write JUNIT tests for our methods in SQL-MR function. You can also create some kind of mock test for the entire SQL-MR function. 

Create Test class

Write code

We are not going to invent the wheel, so I took part of parsing code from here:

The main things are:

//We have to implement RowFunction interface.
public final class ApacheWebLogParserFunction implements RowFunction {

//Here are the fields of class.
/** If specified, rows with listed substrings will be excluded. */
private static final String ARG_CLAUSE_EXCLUDES = "EXCLUDES";

/** If specified, only rows with listed substrings will included into result set. */
private static final String ARG_CLAUSE_INCLUDES = "INCLUDES";
// These member variables will be populated with the values
// of the argument clauses passed to the SQL-MR function.

private List<String> includesArgument = new ArrayList<String>();
private List<String> excludesArgument = new ArrayList<String>();

//Constructor of our class
//You can see code that defines input and output coumns.
//As I said before you can easilly modify this code.
// The constructor establishes the RuntimeContract between
// the SQL-MR function and nCluster. During query planning,
// the function will constructed on a single node. During
// query execution, it will be constructed and run on
// one or more nodes.

public ApacheWebLogParserFunction(RuntimeContract contract) {
// Verify that the function accepts the given input schema.

List<SqlType> expectedInputTypes = new ArrayList<SqlType>();
expectedInputTypes.add(SqlType.getType("character varying"));

List<SqlType> actualInputTypes = new ArrayList<SqlType>();

for (ColumnDefinition d : contract.getInputInfo().getColumns()){
if (!actualInputTypes.equals(expectedInputTypes)) {
throw new IllegalUsageException(
"Expected input types: (character varying)");

// Construct the output schema.

List<ColumnDefinition> outputColumns = new ArrayList<ColumnDefinition>();

outputColumns.add(new ColumnDefinition("CLIENT", SqlType.getType("character varying")));
outputColumns.add(new ColumnDefinition("REMOTE_USER", SqlType.getType("character varying")));
outputColumns.add(new ColumnDefinition("REQUEST_TS", SqlType.getType("timestamp with time zone")));
outputColumns.add(new ColumnDefinition("REQUEST_VERB", SqlType.getType("character varying")));
outputColumns.add(new ColumnDefinition("REQUEST", SqlType.getType("character varying")));
outputColumns.add(new ColumnDefinition("PROTOCOL", SqlType.getType("character varying")));
outputColumns.add(new ColumnDefinition("RESPONSE_CODE", SqlType.getType("integer")));
outputColumns.add(new ColumnDefinition("CONTENT_LENGTH", SqlType.getType("integer")));
outputColumns.add(new ColumnDefinition("USER_AGENT", SqlType.getType("character varying")));

// Read argument clauses into appropriate member variables.

ArgumentClause tmpAc;
String tmpValue;

// Read argument clause 'INCLUDES'

if (contract.hasArgumentClause(ARG_CLAUSE_INCLUDES)) {
tmpAc = contract.useArgumentClause(ARG_CLAUSE_INCLUDES);

for (int i = 0; i < tmpAc.getValueCount(); i++) {
tmpValue = tmpAc.getValues().get(i);

// Read argument clause 'EXCLUDES'

if (contract.hasArgumentClause(ARG_CLAUSE_EXCLUDES)) {
tmpAc = contract.useArgumentClause(ARG_CLAUSE_EXCLUDES);

for (int i = 0; i < tmpAc.getValueCount(); i++) {
tmpValue = tmpAc.getValues().get(i);
if(!includesArgument.isEmpty() && !excludesArgument.isEmpty()){
throw new ClientVisibleException("You can use "+ARG_CLAUSE_EXCLUDES+" current values are:["+excludesArgument+"] or "+ ARG_CLAUSE_INCLUDES+" current values are:["+includesArgument+"] at one time. You can't use them together.");

System.out.println("includes["+includesArgument+"] excludes["+excludesArgument+"]");
// Add any other function-specific initialization of member variables
// or other checks here. Throw a ClientVisibleException to signal
// an error to the client.

// Complete the contract

contract.setOutputInfo(new OutputInfo(outputColumns));

* This method does processing of input rows. We did select "ROW FUNCTION", so
* each row will be processed one-by-one. Set of rows can be accessed via {@link RowIterator}
* @param inputIterator gives access to input set of rows.
* @param outputEmitter collects processed rows.
* */
public void operateOnSomeRows(RowIterator inputIterator, RowEmitter outputEmitter) {
String logLine = null;
List<String> values = null;
while (inputIterator.advanceToNextRow()) {
logLine = inputIterator.getStringAt(0);
values = splitLogLine(logLine);
if(!values.isEmpty() && checkArgClauseIncludeExclude(values.get(RESOURCE_COL_NUM))){
outputEmitter.addString(values.get(0)); //CLIENT
outputEmitter.addString(values.get(1)); //REMOTE_USER

Timestamp ts = Timestamp.fromSqlTimestamp(SqlType.timestamp(), toSqlTimestamp(toDate(values.get(2))));
outputEmitter.addTimestamp(ts); //REQUEST_TS

outputEmitter.addString(values.get(3)); //REQUEST_VERB
outputEmitter.addString(values.get(4)); //REQUEST
outputEmitter.addString(values.get(5)); //PROTOCOL
outputEmitter.addInt(Integer.valueOf(values.get(6))); //RESPONSE_CODE
// Server can return code 304, it means client should get resource from cache,
// because resource hasn't been modified. Content length won't be specified in access log.
outputEmitter.addInt(Integer.valueOf(values.get(7))); //CONTENT_LENGTH
outputEmitter.addString(values.get(8)); //USER_AGENT

}catch (Exception e) {
throw new ClientVisibleException("Error while parsing log line["+logLine+"], parsed line["+values+"]");

I did show you the key points:

constructor which accepts RuntimeContract and defines input/output columns, and a method that parses input log line and produces row for result set. As you can see the solution is rather simple and easy to understand. Feel free to download the code and modify it. The part is not interesting and I won't cover it here. Please leave you questions in comments, I'll answer you.

Deploy the function

Rather easy and quick process. I suggest you to write ant script for compilation, package and delpoyment procedures.

Now let's export JAR file.

Specify export package. Exclude everything excluding our classes.

Copy ApacheWebLogParserFunction.jar to /home/aster/demo/ext-function on queen server.

Create  install.sql file with text:

-- installApacheWebLogParserFunction
\remove ApacheWebLogParserFunction.jar
\install ApacheWebLogParserFunction.jar;

Be sure that install.sql and ApacheWebLogParserFunction.jar do have appropritate rights. I suggest you to set 777.

run install.sql

aster@linux-qvsn:~> cd /home/aster/demo/ext-function
aster@linux-qvsn:~/demo/ext-function> ls -la
total 16
drwxr-xr-x 2 root root 4096 2012-09-25 11:30 .
drwxr-xr-x 7 aster users 4096 2012-09-25 11:29 ..
-rwxrwxrwx 1 aster root 182 2012-09-25 11:00 ApacheWebLogParserFunction.jar
-rwxrwxrwx 1 aster root 77 2012-09-25 11:30 install.sql
aster@linux-qvsn:~/demo/ext-function> act
Welcome to act 4.6.2, the Aster nCluster Terminal.

Type: \copyright for distribution terms
\h for help with SQL commands
\? for help with act commands
\g or terminate with semicolon to execute query
\q to quit

beehive=> \i install.sql
beehive=> \dF
List of installed files
schemaname | filename | fileowner | uploadtime
public | ApacheWebLogParserFunction.jar | beehive | 2012-09-25 11:35:49.938061

It's time to run our function via Datasource explorer

Oups.. We got an exception! No problem, just modify the code and repeat deployment step.

Also you can inspect logs here using AMC queen console

That'a all! We've just finished writing our first SQL-MR function. Download the code, import it and try to run. Ask your questions in comments, I'll do my best to help you.


Re: Develop SQL MR function using AsterData Developer Express, part 2: develop SQL-MR function


Can you please share the complete Java Code?



Teradata Employee

Re: Develop SQL MR function using AsterData Developer Express, part 2: develop SQL-MR function

when i run query - select * from ApacheWebLogParserFunction ( on ....) ..

i get correct output

but when i run =-  select CLIENT,REMOTE_USER from  ApacheWebLogParserFunction ( on ....) ..

i get error : CLIENT is not exist???

Teradata Employee

Re: Develop SQL MR function using AsterData Developer Express, part 2: develop SQL-MR function

Not sure if you got the query to work but you have two ways to do it.  Aster likes things in lower case so if you do select client that will work...also you can quote the column names if you want to use exact case so select "CLIENT" will work also.