Running Arbitrary R Code in Parallel with Teradata AsterR


Summary

In this post I show how we can run arbitrary R code in parallel over distributed data using the Teradata AsterR package.


I begin by explaining the model of parallel processing used, which is very similar to the Map-Reduce paradigm, and which functions from the Teradata AsterR package provide that functionality.


Then I walk through the canonical word-count example, implemented with the Teradata AsterR package.



The Split-Apply-Combine Strategy


There is a set of functions within the Teradata AsterR framework, known as Map-Reduce Runners, that allow the execution of arbitrary R code in parallel over distributed data in the Aster cluster.


Those functions operate in a style known as Split-Apply-Combine, a processing paradigm very similar to Map-Reduce and widely used in R through packages such as data.table, plyr, and dplyr, as well as the "apply" family of base R functions.


In Split-Apply-Combine, the correspondence to Map-Reduce depends on the computation: sometimes the Split and Apply steps together correspond to Map and the Combine step corresponds to Reduce; other times Split corresponds to Map, and Apply together with Combine corresponds to Reduce.


For example, suppose you have a dataset with two columns representing categories and their respective quantities. The figure below illustrates how the total quantity per category is computed in the Split-Apply-Combine paradigm:



In the example above, when compared to Map-Reduce, the Split step corresponds to an identity Map, and the Apply and Combine steps together correspond to the Reduce step.
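
To make this concrete, here is the same grouped-sum computation in plain single-node base R, where split, lapply, and unlist play the Split, Apply, and Combine roles (a minimal local sketch; the data frame and column names are made up for illustration):


# SPLIT-APPLY-COMBINE IN PLAIN BASE R: TOTAL QUANTITY PER CATEGORY

df <- data.frame(category=c('a', 'b', 'a', 'b'), quantity=c(1, 2, 3, 4))

groups <- split(df$quantity, df$category)   # SPLIT: ONE GROUP PER CATEGORY

sums <- lapply(groups, sum)                 # APPLY: SUM EACH GROUP

unlist(sums)                                # COMBINE: ASSEMBLE THE NAMED RESULT

a b

4 6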



Teradata AsterR Map-Reduce Runners


Teradata AsterR currently implements three functions in the group known as Map-Reduce Runners, which can be used to implement the Split-Apply-Combine strategy described above. Here I present a basic description and examples. Note that these functions accept many more parameters than the simple examples here illustrate; please refer to the Teradata AsterR package documentation for more information.


  • ta.eval

This function executes arbitrary R code on one or more Aster VWorkers. It is a data-generating function and does not take any input data as an argument. In the example below, the function generates the vector c(1:5) and is executed twice, independently and in parallel, as specified by FUN.tasks=2 and subject to the number of VWorkers available in the cluster:


ta.eval(FUN=function(x) {1:x}, x=5, FUN.tasks=2)

[1] 1 2 3 4 5 1 2 3 4 5
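
Because ta.eval takes no input data, it is well suited to data-generating work such as simulation. As a hypothetical sketch (reusing the FUN, extra-argument, and FUN.tasks parameters exactly as in the example above; the output will vary from run to run), each task below could draw its own random sample and return the sample mean:


# HYPOTHETICAL SKETCH: EACH OF 4 TASKS DRAWS ITS OWN RANDOM SAMPLE AND RETURNS ITS MEAN

ta.eval(FUN=function(n) {mean(rnorm(n))}, n=1000000, FUN.tasks=4)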

  • ta.apply

It applies an arbitrary R function to a given Aster Virtual Data Frame. The function can be executed on rows, columns, rows followed by columns, columns followed by rows, or the entire Virtual Data Frame. It can optionally apply a combiner function to the results of the first function. The first example below computes the sum of each row in a Virtual Data Frame; each VWorker can operate on one or more rows, in parallel. The second example computes the sum of each column and then multiplies the results by 2:


d <- as.ta.data.frame(data.frame(c1=c(1:3), c2=c(4:6)))

d

  c1 c2

1  1  4

2  2  5

3  3  6


ta.apply(tadf=d, MARGIN=1, FUN=sum)

[1] 5 7 9

ta.apply(tadf=d, MARGIN=2, FUN=sum, COMBINER.FUN=function(x) {2*x})

c1 c2

12 30
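
For comparison, both results can be reproduced on an ordinary local data frame with base R's apply, which uses the same MARGIN convention; ta.apply distributes the equivalent work across VWorkers:


# EQUIVALENT LOCAL COMPUTATION WITH BASE R's apply

df <- data.frame(c1=c(1:3), c2=c(4:6))

apply(df, 1, sum)       # ROW SUMS (MARGIN=1)

[1] 5 7 9

2 * apply(df, 2, sum)   # COLUMN SUMS (MARGIN=2), THEN DOUBLED

c1 c2

12 30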

  • ta.tapply

Similar to ta.apply, but it operates independently on partitions of a given Virtual Data Frame. The example below computes the sum of each column independently for each partition, where partitions are defined by column c3:


d <- as.ta.data.frame(data.frame(c1=c(1:4), c2=c(5:8), c3=c(1,1,2,2)))

d

  c1 c2 c3

1  1  5  1

2  2  6  1

3  3  7  2

4  4  8  2


ta.tapply(tadf=d, INDEX=d[,'c3'], FUN.MARGIN=2, FUN=sum)

$`2`

c1 c2 c3

7 15  4

$`1`

c1 c2 c3

3 11  2
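
Again for comparison, the per-partition column sums can be reproduced locally with base R by splitting the data frame on c3 and applying colSums to each piece:


# EQUIVALENT LOCAL COMPUTATION: SPLIT ON c3, THEN COLUMN SUMS PER PARTITION

df <- data.frame(c1=c(1:4), c2=c(5:8), c3=c(1,1,2,2))

lapply(split(df, df$c3), colSums)

$`1`

c1 c2 c3

 3 11  2

$`2`

c1 c2 c3

 7 15  4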



A Word Count Example

Now I show an application of the ta.apply function that implements text pre-processing and word counting. The idea is to demonstrate an algorithm that could run in parallel on a very large dataset distributed across an Aster cluster.


We begin by loading some text into the Aster cluster and representing it as a Virtual Data Frame. Note that this is for demonstration purposes only; in a real situation the data would already be in an Aster table, and we would simply reference it from R as a Virtual Data Frame. In this example I'm using a news dataset comprising short paragraphs:


library(TeradataAsterR)

library(readr)

# CONNECT TO ASTER

ta.connect('AsterExpress')

# READ THE DATA AND CREATE A VIRTUAL DATA FRAME FROM IT

# DATA IS REPRESENTED IN A VIRTUAL DATA FRAME COMPRISED OF ONE COLUMN NAMED "text"

ta_data <- as.ta.data.frame(data.frame(text=sample(read_lines('en_US.news.txt'), 10000)))


# SHOW TWO LINES OF TEXT

print(ta_data, 2)


text

1     øHer father, Earl Jr., is Tiger's half brother, and she says Earl Sr., her paternal grandfather, introduced her to the game and "got me started when I was young."

2     TRENTON — The attorney for one of two troopers suspended amid allegations they led a caravan of sports cars racing to Atlantic City last month said Thursday that his own investigation shows the State Police have escorted "dozens" of similar caravans in the past, though not at high speeds.


Notice that the text contains some non-ASCII characters, punctuation, and upper-case letters, and it probably also contains numbers. So the first thing to do is to clean the text before counting words.


This will be accomplished by applying a custom text-cleaning function over each row of data with the ta.apply function. The work is executed in parallel, with each available VWorker operating on a subset of the rows. Notice that the result of the ta.apply call is written to an Aster table named "news_data_clean", and we then create a reference to it in a Virtual Data Frame named "ta_data_clean":


# FUNCTION FOR REMOVING PUNCTUATION, NUMBERS, NON-ASCII CHARACTERS, AND CONVERTING TO LOWER-CASE

cleanText <- function(x) {

  x <- gsub(pattern='[[:punct:]]', replacement='', x, perl=T)

  x <- gsub(pattern='[[:digit:]]', replacement='', x, perl=T)

  x <- tolower(x)

  x <- iconv(x, to='ASCII', sub='')

  x

}
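
Because cleanText is ordinary R, it can be sanity-checked locally before being shipped to the cluster (no Aster connection needed; the input string here is made up for illustration):


# QUICK LOCAL CHECK OF cleanText

cleanText('Her father, Earl Jr., said "No. 1!"')

[1] "her father earl jr said no "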


# APPLY THE cleanText FUNCTION ON THE ROWS OF THE GIVEN VIRTUAL DATA FRAME

# AND WRITE THE RESULT TO AN ASTER TABLE

ta.apply(ta_data, MARGIN = 1, FUN = cleanText, out.tadf = list(table='news_data_clean',

                                                               schemaName='public',

                                                               tableType='fact',

                                                               partitionKey='text',

                                                               columns=c('text')))

# CREATE A VIRTUAL DATA FRAME FOR REFERENCING THE CLEANED TEXT AND SHOW ONE LINE

ta_data_clean <- ta.data.frame('news_data_clean', schemaName = 'public')

print(ta_data_clean, 1)


text

1     her father earl jr is tigers half brother and she says earl sr her paternal grandfather introduced her to the game and got me started when i was young


Now that we have the prepared text, we can start counting words. The classic word counting algorithm could be implemented with the Split-Apply-Combine strategy as illustrated by the diagram below:



We will do exactly that by applying custom functions to the text using the ta.apply function from Teradata AsterR. Notice that the wcMapper function below corresponds to the Split and Apply steps, and the wcReducer function corresponds to the Combine step. Notice also that the intermediate results are lists, which is indicated in the ta.apply call by passing the argument FUN.result = 'list'.


The result of the ta.apply call is written to a local R object in this case (wc, in the listing below). This is fine for a word-count table of this type, since the vocabulary is usually on the order of a few tens of thousands of words:


# MAPPER FUNCTION TO ASSIGN COUNT=1 TO EACH WORD

wcMapper <- function(x) {

  # SPLIT THE TEXT INTO WORDS ON RUNS OF SPACES
  x <- unlist(strsplit(x, split='[ ]+'))

  # DROP EMPTY STRINGS LEFT OVER FROM CLEANING
  x <- x[x != '']

  # EMIT EACH WORD WITH A COUNT OF 1
  list(w=x, c=rep(1, length(x)))

}

# REDUCER FUNCTION FOR AGGREGATING WORD COUNTS

wcReducer <- function(x) {

  # ASSEMBLE THE CONCATENATED (WORD, COUNT) PAIRS INTO A DATA FRAME
  x <- data.frame(w=as.character(x$w), c=as.numeric(x$c))

  # SUM THE COUNTS GROUPED BY WORD
  aggregate(c~w, x, sum)

}
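
Both functions are also plain R, so the whole mapper-to-reducer pipeline can be tested locally on a couple of short strings before running it at scale:


# LOCAL CHECK: RUN THE MAPPER, THEN THE REDUCER, ON TWO SHORT STRINGS

wcReducer(wcMapper(c('the cat sat', 'the dog')))

    w c

1 cat 1

2 dog 1

3 sat 1

4 the 2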


# APPLY THE wcMapper FUNCTION TO A CHUNK OF TEXT, IN PARALLEL, ON EACH VWORKER

# THOSE CHUNKS OF TEXT ARE DEFINED AUTOMATICALLY BY THE FRAMEWORK WHEN PARTITIONING THE DATA

# THEN APPLY THE wcReducer FUNCTION OVER THE CONCATENATED RESULTS FROM THE wcMapper FUNCTION

wc <- ta.apply(ta_data_clean, MARGIN = 2, FUN = wcMapper, FUN.upload = 'script',

               COMBINER.FUN = wcReducer, COMBINER.FUN.upload = 'script',

               FUN.result = 'list')


# ORDER wc BY WORD COUNTS IN DECREASING ORDER

wc <- wc[order(-wc$text.c),]


# SIZE OF wc

dim(wc)

[1] 30242     2


# PRINT THE FIRST 10 ENTRIES OF wc

wc[1:10,]

      text.w text.c

27167    the  19997

27506     to   9011

1          a   8671

948      and   8583

18765     of   7610

13263     in   6688

10268    for   3706

27162   that   3442

13902     is   2809

18906     on   2679

From now on we can do anything we want with the word-count table in the local R environment. Notice that the 10 most frequent words are so-called stop-words, as expected. As an illustration, we can use the computed word-count table to plot a word cloud without those stop-words and see that the most frequent remaining words indeed represent what one would expect from a news dataset:


library(tm)

# BLANK OUT STOP-WORDS, THEN DROP THE EMPTIED ROWS
wc$text.w <- removeWords(as.character(wc$text.w), stopwords())

wc <- subset(wc, text.w != '')

library(wordcloud)

wordcloud(words=wc$text.w, freq=wc$text.c, min.freq=20, max.words=75, scale=c(4, 0.5),

          colors=brewer.pal(8, "Dark2"), random.order=F, rot.per=0)