Using Hadoop / HBase with APEX

Posted by jholoman on August 21, 2012

This post is cross-posted from my company website, www.biascorp.com/blog

Increasingly we are getting requests from our customers surrounding Big Data. In this post we’ll explain in detail one easy way to integrate Big Data technologies with a more traditional Oracle stack.

Recently BIAS was approached by one of our large customers with an interesting problem. The specifics are abstracted, but essentially they conduct a huge number of transactions daily across many thousands of locations, using a Point-of-Sale style system. Additionally, digital images are captured and associated with each transaction.

These images are useful in certain situations but are not critical to the day-to-day operations of the business. The images are small (roughly 50 KB), consistent in size, and are base64 encoded and sent to a centralized location as XML.
The images come over the wire like this:

<transactions>
<transaction id="GUID" timestamp="timestamp">
<image1>base_64 encoded jpg</image1>
<image2>base_64 encoded jpg</image2>
</transaction>
<transaction id="GUID" timestamp="timestamp">
<image1>base_64 encoded jpg</image1>
<image2>base_64 encoded jpg</image2>
</transaction>
</transactions>

Our client wanted to explore the possibility of not storing the BLOBs directly in the database, but still wanted efficient random-access on-demand. They are already running Cloudera’s Distribution including Apache Hadoop (CDH3) and asked if we could help develop a solution to process and display the images quickly. We put together this very basic POC over a weekend based on HBase as a potential solution, and another way to utilize their Hadoop cluster.

As it was the weekend, access to sample data was not available, so I took some license and generated it myself. I started with two images of approximately 50 KB each and used those two images for every transaction in this POC.

Here’s a short Python script to generate the XML file. It was useful to be able to move the number of transactions up and down to test the MapReduce jobs I was running.

#!/usr/bin/python

import datetime
import uuid

# read the two sample images and base64 encode them (Python 2)
a = open("pic_to_shrink.jpg", "rb").read().encode("base64")
b = open("pic_to_shrink2.jpg", "rb").read().encode("base64")

d = '<?xml version="1.0" encoding="UTF-8" ?>\n'
d = d + '<transactions>\n'
i = 0
while i < 5000:
        # each transaction gets its own GUID and timestamp
        c = uuid.uuid4()
        e = datetime.datetime.now()
        d = d + '<transaction id="%s" timestamp="%s">\n' % (c, e)
        d = d + '<image1>' + a + '</image1>\n'
        d = d + '<image2>' + b + '</image2>\n'
        d = d + '</transaction>\n'
        i = i + 1
d = d + '</transactions>'
f = open("transactions.xml", 'w')
f.write(d)
f.close()

This produces XML like the following (content of the <imageN> tags trimmed):

<?xml version="1.0" encoding="UTF-8" ?>
<transactions>
<transaction id="c59d6ca3-d9a8-4e27-9ca8-274d3aa21fc3" timestamp="2012-07-26 18:05:06.875927">
<image1>/9j/4AAQSkZJRgABAQEASABIAAD/4ge4SUNDX1BST0ZJTEUAAQEAAAeoYXBwbAIgAABtbnRyUkdC
ZXNjAAABCAAAAG9kc2NtAAABeAAABWxjcHJ0AAAG5AAAADh3dHB0AAAHHAAAABRyWFlaAAAHMAAA
AFFFFABRRRQAUUUUAFFFFABRRRQAUUUUAFFFFABRRRQAUUUUAFFFFABRRRQAUUUUAf/Z</image1>
<image2>/9j/4AAQSkZJRgABAQEASABIAAD/4ge4SUNDX1BST0ZJTEUAAQEAAAeoYXBwbAIgAABtbnRyUkdC
IFhZWiAH2QACABkACwAaAAthY3NwQVBQTAAAAABhcHBsAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAA
lnnl8LagkcUaFmdjbSAKAOSSeMCgD+PsH5Afav6pv2M7H+zv+CW/wSt8YJ8NxzH6yO8h/wDQq/mC</image2>
</transaction>
</transactions>

This script generates the data in XML, which is not ideal for storage. In a production application we would likely use native Java MapReduce to handle the processing, but for the POC it was more convenient to use a combination of Hadoop Streaming, Python and Pig.

A look at the Hadoop wiki shows that Hadoop Streaming's StreamXmlRecordReader lets you configure begin and end tags that determine how records are split and sent to the map tasks.

I found a nice summary by David Hill on processing XML in Python with ElementTree. The script below borrows heavily from it.

#!/usr/bin/python
import sys
import cStringIO
import xml.etree.ElementTree as ET

if __name__ == '__main__':
    in_it = False

    for line in sys.stdin:
        line = line.strip()
        if line.find("<transaction id=") != -1:
            in_it = True
            # now that we are in a transaction, write it out to the StringIO for parsing with ElementTree
            Sbuffer = cStringIO.StringIO()
            Sbuffer.write(line)
        elif line.find("</transaction>") != -1:
            in_it = False
            Sbuffer.write(line)
            value = Sbuffer.getvalue()
            Sbuffer.close()
            Sbuffer = None
            # parse the buffered transaction and emit one tab-delimited record
            xml_root = ET.fromstring(value)
            id = xml_root.get("id")
            ts = xml_root.get("timestamp")
            image1 = xml_root.find('image1').text
            image2 = xml_root.find('image2').text
            print '%s\t%s\t%s\t%s' % (id, ts, image1, image2)
        else:
            if in_it:
                Sbuffer.write(line)

In my testing I was getting duplicate keys sent to the reducer, so the reducer below makes sure that only one line is emitted per transaction id from the original file.
I have this in the queue to research a bit more; I suspect it has to do with the StreamXmlRecordReader implementation, as there has been some other buzz about this out on the ’net.

#!/usr/bin/env python
import sys

if __name__ == '__main__':
    # Hadoop sorts the mapper output by key, so duplicate transaction ids
    # arrive on consecutive lines; emit only the first occurrence of each key.
    prev_key = None
    for line in sys.stdin:
        # just grab the key (the transaction id)
        current_key = line.split('\t')[0]
        # compare against the previous key
        if current_key != prev_key:
            current_line = line.strip()
            prev_key = current_key
            print current_line

Next, execute the streaming job:

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
-mapper ./map.py \
-file ./map.py \
-reducer ./reducer.py \
-file ./reducer.py \
-inputreader "StreamXmlRecordReader,begin=<transaction id,end=</transaction>" \
-input blog/transactions.xml \
-output blog/output

Now the data has been parsed out of XML and the output contains one tab-delimited record per line.
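
Before loading anything into HBase, it is easy to sanity-check the streaming output. This hypothetical helper (check_record.py is my name for it, not part of the original job) just confirms each record has the four expected tab-delimited fields:

#!/usr/bin/python
# check_record.py -- hypothetical sanity check, not part of the POC job.
# Usage:  hadoop fs -cat blog/output/part-00000 | head -2 | ./check_record.py
import sys

for line in sys.stdin:
    fields = line.rstrip('\n').split('\t')
    # expect four fields: id, timestamp, base64 image1, base64 image2
    if len(fields) == 4:
        print 'OK id=%s ts=%s image1=%d chars image2=%d chars' % (
            fields[0], fields[1], len(fields[2]), len(fields[3]))
    else:
        print 'Unexpected field count: %d' % len(fields)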

At this point it’s probably worthwhile to take a moment to discuss HBase and Pig.
HBase is an open-source, distributed, versioned, column-oriented store modeled after Google’s BigTable. At its core it is a map, like an associative array or a JavaScript object, indexed by a row key, a column key, and a timestamp.
The row keys are kept in sorted order, and the table is stored on a distributed filesystem, HDFS. Additionally, HBase provides a great REST interface called Stargate that will be utilized here.
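
As a rough mental model (illustrative only; this is not how HBase physically stores data), a row in the images table used below can be pictured as a nested, sorted map:

# Conceptual sketch of the 'images' table as a nested map (illustrative only):
# row key -> column family:qualifier -> timestamp -> value
images = {
    "363b8d19-d6df-46d8-beb5-321ba40b63f5": {            # row key: transaction GUID
        # cell timestamps are assigned by HBase at write time
        # (value taken from the scan output further down)
        "image:ts":     {1343353321129: "2012-07-26 18:05:07.284005"},
        "image:image1": {1343353321129: "<raw jpg bytes>"},
        "image:image2": {1343353321129: "<raw jpg bytes>"},
    },
    # ... remaining rows, kept sorted by row key
}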

A simple table with one column family can be created easily:

hbase> create 'images', {NAME => 'image'}

Pig is a higher-level platform for writing jobs against Hadoop; it provides a scripting language (Pig Latin) that makes data analysis and processing easier. In this case, I want to read the results of the last MapReduce job and load the data into HBase. Also, because the HBase REST server base64-encodes the values it returns, I want to store the original binary blob in the HBase table rather than the base64 text. To that end, I wrote a simple user-defined function (UDF) to decode from base64.

package myudfs;

import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;

public class DECODE64 extends EvalFunc<DataByteArray> {
    public DataByteArray exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            // decode the base64 chararray back into raw bytes
            String str = (String) input.get(0);
            byte[] decoded = Base64.decodeBase64(str);
            return new DataByteArray(decoded);
        } catch (Exception e) {
            throw WrappedIOException.wrap("Caught exception processing input row ", e);
        }
    }
}

And the Pig script:

#!/usr/bin/pig
REGISTER myudfs.jar;
data = LOAD '/user/jholoman/blog/output/part-00000' using PigStorage() as (id:chararray, ts:chararray, image1:chararray, image2:chararray);

A = FOREACH data generate id, ts, myudfs.DECODE64(image1), myudfs.DECODE64(image2);
store A into 'hbase://images' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('image:ts image:image1 image:image2');

With HBaseStorage, the first field of A (the transaction id) becomes the HBase row key, and the remaining fields map to the listed columns. Running the script loads the data:

$ pig image_load_to_hbase.pig

Verification that the data has been successfully loaded:

hbase(main):001:0> count 'images'
Current count: 1000, row: 363b8d19-d6df-46d8-beb5-321ba40b63f5                                                                        
Current count: 2000, row: 6aad1a37-0a4a-4f66-a8d8-115a24ac53ee                                                                        
Current count: 3000, row: 9c6605ef-4ec9-4961-9e46-6cabdab3d7ab                                                                        
Current count: 4000, row: ce2493ac-7a16-44f0-838a-fd474e38918c                                                                        
Current count: 5000, row: fffaebe5-2268-4961-8524-f34df95062c5                                                                        
5000 row(s) in 6.4160 seconds
hbase(main):002:0> scan 'images', {STARTROW => '363b8d19-d6df-46d8-beb5-321ba40b63f5', COLUMNS => 'image:ts', LIMIT => 1}
ROW                                COLUMN+CELL                                                                                        
 363b8d19-d6df-46d8-beb5-321ba40b6 column=image:ts, timestamp=1343353321129, value=2012-07-26 18:05:07.284005                         
 3f5                                                                                                                                  
1 row(s) in 0.0470 seconds

Starting the HBase REST Service is easy:

$ /usr/lib/hbase/bin/hbase-daemon.sh start rest -p 8000

And it works as expected:

$ curl -H "Accept: text/xml" http://localhost:8000/images/363b8d19-d6df-46d8-beb5-321ba40b63f5/image:ts
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><CellSet><Row key="MzYzYjhkMTktZDZkZi00NmQ4LWJlYjUtMzIxYmE0MGI2M2Y1"><Cell timestamp="1343353321129" column="aW1hZ2U6dHM=">MjAxMi0wNy0yNiAxODowNTowNy4yODQwMDU=</Cell></Row></CellSet>
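
The row key, column name and cell value in that response are all base64 encoded by Stargate. A quick Python 2 check (using the same base64 codec as the generator script) confirms they decode back to the expected values:

#!/usr/bin/python
# Decode the base64 strings returned in the Stargate response above (Python 2)
print "MzYzYjhkMTktZDZkZi00NmQ4LWJlYjUtMzIxYmE0MGI2M2Y1".decode("base64")  # row key: 363b8d19-d6df-46d8-beb5-321ba40b63f5
print "aW1hZ2U6dHM=".decode("base64")                                      # column:  image:ts
print "MjAxMi0wNy0yNiAxODowNTowNy4yODQwMDU=".decode("base64")              # value:   2012-07-26 18:05:07.284005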

We previously demonstrated the Oracle Big Data Connectors and used an APEX environment to show off the Direct Connector for HDFS. It made sense to use that environment to demonstrate displaying the images from HBase.
First, the on-demand process:

declare
  l_clob   clob;
  l_buffer varchar2(32767);
  l_amount number;
  l_offset number;
  l_rowkey varchar2(1000);
begin
  l_rowkey := apex_application.g_x01;

  -- fetch the full row for this key from the HBase REST (Stargate) interface
  l_clob := apex_web_service.make_rest_request(
              p_url         => 'http://hbase_host:8000/images/'||l_rowkey,
              p_http_method => 'GET');

  -- stream the XML response back to the browser in 32000-character chunks
  l_amount := 32000;
  l_offset := 1;
  begin
    loop
      dbms_lob.read( l_clob, l_amount, l_offset, l_buffer );
      htp.p(l_buffer);
      l_offset := l_offset + l_amount;
      l_amount := 32000;
    end loop;
  exception
    when no_data_found then
      null; -- end of CLOB reached
  end;
end;
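
For testing outside APEX, a rough Python 2 equivalent of that whole-row fetch might look like this sketch (hbase_host is the same placeholder used in the process above; the script only assumes the standard library urllib2 and ElementTree modules):

#!/usr/bin/python
# Rough equivalent of the on-demand process, for testing outside APEX (Python 2).
# 'hbase_host' is a placeholder for the host running the HBase REST service.
import urllib2
import xml.etree.ElementTree as ET

rowkey = "363b8d19-d6df-46d8-beb5-321ba40b63f5"
req = urllib2.Request("http://hbase_host:8000/images/" + rowkey,
                      headers={"Accept": "text/xml"})
response = urllib2.urlopen(req).read()

# Stargate base64 encodes column names and cell values
for cell in ET.fromstring(response).findall("Row/Cell"):
    column = cell.get("column").decode("base64")   # e.g. image:ts, image:image1, image:image2
    value = cell.text.decode("base64")             # raw cell bytes
    print "%s: %d bytes" % (column, len(value))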

And the JavaScript on the page that calls the on-demand process:

function getHBaseImages(pRowKey) {
  var image1;
  var image2;
  var ts;
  // call the getHBaseImages on-demand process, passing the row key as x01
  var get = new htmldb_Get(null, $v('pFlowId'), 'APPLICATION_PROCESS=getHBaseImages', $v('pFlowStepId'));
  get.addParam('x01', pRowKey);
  gReturn = get.get();
  get = null;

  // parse the CellSet XML returned by the HBase REST service
  gXML = apex.jQuery.parseXML(gReturn);
  vals = apex.jQuery(gXML).find('Cell');
  $(vals).each(function(i, o) {
    // column names are base64 encoded; decode to see which cell this is
    var returnCell = atob($(this).attr("column"));
    switch (returnCell) {
      case 'image:image1':
        image1 = $(this).text();
        break;
      case 'image:image2':
        image2 = $(this).text();
        break;
      case 'image:ts':
        ts = atob($(this).text());
        break;
    }
  });

  // the images stay base64 encoded so the browser can render them as data URIs
  $('.image1').attr('src', 'data:image/jpg;base64,' + image1);
  $('.image2').attr('src', 'data:image/jpg;base64,' + image2);
  $('#ts').text(ts);
}

As mentioned previously, the HBase REST service base64 encodes all the values. For the images this is no problem, as the browser can display base64-encoded images directly via data URIs. The code above calls HBase, parses the resulting XML, and sets the source for two image elements on the page plus a span for the timestamp. From here, setting up a link on the table is trivial, and on click the images are pulled from HBase.

Check this out in action!

In this post we covered one potential scenario for integrating Big Data technologies with Oracle. It didn’t take very long to put this POC together, and it demonstrates one technique for accessing data in Hadoop/HBase from APEX. Stay tuned for more Big Data examples…

Posted in APEX, Big Data, Hadoop, HBase