Using JMSL In Apache Spark
May 25, 2016

Using JMSL in Apache Spark

Open Source
Data Mining & Big Data

Data in all domains is getting bigger — and because more data presumably equates to more accuracy in measurement, “big data” is generating the promise of new insights and understanding. However, the technical challenges posed by big data can overwhelm engineering teams who are seeking ways to optimize their work effort.

Leveraging existing algorithms in Apache Spark applications can save developers weeks, or even months of development effort within the context of this exciting new cluster computing platform. This enables organizations to focus on solving business intelligence problems in their core business.

A collection of mathematical and statistical algorithms written for Java, JMSL Numerical Library, has hundreds of advanced algorithms. This article illustrates two examples on how to use JMSL classes on Spark resilient distributed datasets (RDDs) to leverage the advanced mathematics and statistics algorithms in JMSL in distributed Spark applications written in Java.

What Is Apache Spark?

Apache Spark, available as open source software from the Apache Software Foundation, is a flexible and fast framework designed to process big data on clusters of up to thousands of nodes. First developed by Matei Zaharia at UC Berkeley's AMPLab in 2009, it was donated to Apache in 2013, where it quickly became an official Apache “Top Level” project supported by hundreds of contributors.

Spark has gained rapidly in popularity and acceptance, and, has been used in production by more than 1000 organizations. It offers three key benefits to its users:

  • Ease of use
  • Speed
  • A general-purpose engine that allows users to "combine multiple types of computations (e.g., SQL queries, text processing, and machine learning) that might previously have required different engines."

Spark is written in Scala and natively supports Java, Scala, and Python.

Using BOND.YIELDDISC in RDD Transformations

This first example demonstrates how to use a JMSL class in transformations applied to an RDD. The example can be extended to use any JMSL class in this way. In other words, JMSL classes can be executed in worker nodes in Spark applications.

Discount bonds (or zero-coupon bonds) do not pay interest during the life of the security. Instead, they sell at a discount to their value at maturity. The Bond.yielddisc() method in JMSL returns the annual yield of a discount bond. 

The input data files used for this example are stored in the Hadoop Distributed File System (HDFS). Each row in the file contains this discount bond information: settlement date, maturity date, price, and redemption (partially shown in Figure 1).

Blog - Apache Spark Fig1

In Spark, the sequence to calculate the annual yield for a large number of discount bonds is below.

1. Calculate the annual yield for each row of data in the file. This is achieved by applying a map transformation and leveraging the Bond.yielddisc() method in JMSL. This step creates a JavaRDD<Double>.

JavaRDD<Double> parsedData = data.map(
           new Function<String, Double>() {
               public Double call(String s) throws Exception {
                   String[] sarray = s.split(" ");
 
                   GregorianCalendar settlement = new GregorianCalendar();
                   settlement.setTime(DateFormat.getDateInstance(
                                  DateFormat.SHORT, Locale.US).parse(sarray[0]));
                   GregorianCalendar maturity = new GregorianCalendar();
                   maturity.setTime(DateFormat.getDateInstance(
                                  DateFormat.SHORT, Locale.US).parse(sarray[1]));
                   Double price = Double.parseDouble(sarray[2]);
                   Double redemption = Double.parseDouble(sarray[3]);
                   DayCountBasis dcb = DayCountBasis.BasisNASD;
                      
                   return new Double(Bond.yielddisc(settlement, maturity,
                                  price.doubleValue(), redemption.doubleValue(), dcb));
               }
           }
);


2. Collect the annual yields computed in the previous step and print the results. To do this, apply a collect action. Since Spark performs lazy evaluations, this is the first step in the sequence in which Spark evaluates the expressions performed on the RDDs, and returns data to the driver program.

Double[] collectedData = parsedData.collect().toArray(new Double[0]);

DecimalFormat df = new DecimalFormat("0.00000000");
for (int i = 0; i < collectedData.length; i++) {
    System.out.println("The yield on the discounted bond is "
           + df.format(collectedData[i]));
}


The output of the Spark application displays the annual yields for the discount bonds (partially shown in Figure 2).

Blog - Apache Spark Fig2

Using Apriori in RDD Transformations and Actions

This second example demonstrates how to use JMSL classes in a sequence of transformations and actions applied to RDDs. In this example, we leverage the distributed nature of Spark and the lazy evaluations in Spark to minimize the data passed to the driver program from worker nodes.

Read more about the Apriori algorithm in the JMSL in Hadoop MapReduce applications white paper >>

Association rule discovery refers to the problem of detecting associations among discrete items. In market basket analysis (a primary application for association rule discovery), the discrete items are the different products purchased together in a transaction. Companies that sell products might use market basket analysis to help make better marketing decisions. For example, knowing that two products are highly associated, a company may run a promotion on one or the other, but not both. Other application areas for association rule discovery include text mining and bioinformatics.

The Apriori algorithm is one of the most popular algorithms for association rule discovery in transactional datasets. Apriori first mines the transactions for the frequent itemsets, where an itemset (or set of items) is frequent if it appears in more than a minimum number of transactions. The number of transactions containing an itemset is known as its support, and the minimum support (as a percentage of transactions) is a control parameter in the Apriori algorithm. From the collection of frequent itemsets, the algorithm then filters for significant associations among the items.

If an itemset is frequent in one transaction data set, it does not necessarily mean that it is frequent in the entire collection of data. This is why Apriori needs two passes through the data when the data is distributed. The primary reference for this procedure is Savasere, Omiecinski, and Navathe (1995).

The input data file used for this example is stored in the HDFS. Each row in the file contains blocks of 50 transactions involving five different item IDs (partially shown in Figure 3). The data is organized in pairs: the first element is the transaction ID and the second element is the item ID. For example, in the first block of transactions, the transaction having transaction ID 1 involved the items have IDs 3, 2, and 1. In the second block, the first transaction involved items {2, 1, 4, 3}. Transaction IDs must be unique within a block.

Blog - Apache Spark Fig3

In Spark, the sequence to apply the Apriori algorithm for association rule discovery is below.

1. Create a JavaRDD from the blocks of transactions stored in the input data file. The first column of the two-dimensional array contains the transaction IDs and the second column the item IDs. Each row represents a transaction ID / item ID combination, and all records for a single transaction ID must be in adjacent rows.

JavaRDD<int[][]> dataReal = data.map(
           new Function<String, int[][]>() {
               public int[][] call(String s) {
                   String delims = "[ {},]+";
                   String[] sarray = s.replaceFirst("\\{\\{", "").
                           split(delims);
                   int[][] x = new int[sarray.length / 2][2];
                   for (int i = 0; i < (sarray.length / 2); i++) {
                       for (int j = 0; j < 2; j++) {
                           x[i][j] = Integer.parseInt(sarray[(i * 2) + j]);
                       }
                   }
                   return x;
               }
           }
);
dataReal.cache();


2. Find the frequent itemsets in the RDD created in the previous step by applying a map transformation and leveraging the Apriori.getFrequentItemsets() method in JMSL on the RDD created in the previous step. This step creates a JavaRDD<Itemsets>.

// Find the frequent itemsets.
JavaRDD<Itemsets> frequentItemsets = dataReal.map(
           new Function<int[][], Itemsets>() {
               public Itemsets call(int[][] x) {
                   return Apriori.getFrequentItemsets(x, 5, 4, 0.30);
               }
           }
);


3. Find the union of all the frequent itemsets by applying a reduce action and leveraging the Apriori.getUnion() method in JMSL on the RDD created in the previous step. The union is the collection of all the itemsets which may be frequent for all of the data, such as the candidate frequent itemsets. Since Spark performs lazy evaluations, this is the first step in the sequence in which Spark evaluates the expressions performed on the RDDs created in Step #1 and Step #2, and returns data to the driver program.

// Get the union of the frequent itemsets.
final Itemsets cis = frequentItemsets.reduce(
         new Function2<Itemsets, Itemsets, Itemsets>() {
             public Itemsets call(Itemsets x, Itemsets y) {
                 return Apriori.getUnion(x, y);
             }
         }
);


4. For every itemset in the union, count the number of times it occurs in each transaction set. This requires a second evaluation of the RDD created in Step #1. This step outputs the frequencies of the candidate itemsets per transaction set. To do this, apply a map transformation and leverage the Apriori.countFrequency() method in JMSL. This step creates a JavaRDD<int[]>

// Count the frequencies of the itemsets in the union in each of the
// data sets.
JavaRDD<int[]> frequencies = dataReal.map(
        new Function() {
            public int[] call(int[][] x) {
                   return Apriori.countFrequency(cis, x);
            }
        }
);


5. Sum the frequencies of the candidate itemsets (computed in Step #4) over all transaction sets. This is achieved by applying a reduce action and leveraging the Apriori.sum() method in JMSL on the RDD created in the previous step.

// Sum the frequencies.
int[] freq = frequencies.reduce(
        new Function2() {
            public int[] call(int[] x, int[] y) {
                   return Apriori.sum(x, y);
            }
        }
);


6. Get the frequent itemsets of the union, and generate and print the association rules, by leveraging the Apriori.updateFrequentItemsets() and Apriori.getAssociationRules() methods in JMSL from the driver program.

// Get the frequent itemset of the union.
Itemsets itemsets = Apriori.updateFrequentItemsets(cis, freq);
itemsets.print();

// Generate and print the association rules.
AssociationRule.print(Apriori.getAssociationRules(itemsets, 0.80, 2.0));


The output of the Spark application shows the frequent itemsets and the association rules (shown in Figure 4).

Using JMSL In Apache Spark Figure 4

From these results, we can conclude that product ID at index {1} is frequent (63 percent of the transactions), and several of its supersets, {0 1}, {1 2}, {1 3}, etc. are also frequent in the transactions. We may also add a post-processing step to replace the product indices with product IDs or names to generate easy-to-read reports.

Use JMSL in Your Spark Applications

The JMSL Numerical Library is the broadest collection of mathematical, statistical, financial, data mining, and charting classes available in 100 percent Java. It enables analytics to be easily and seamlessly embedded in applications, database, and business intelligence solutions.

Try Free