Most Pig tutorials you will find assume that you are working with data where you know all the column names ahead of time, and that the column names themselves are just labels, rather than composites of labels and data. When working with HBase, however, it’s not uncommon for both of those assumptions to be false. Being a columnar database, it’s very common to be working with rows that have thousands of columns. Under that circumstance, it’s also common for the column names themselves to encode dimensions, such as date and counter type.

How do you solve this mismatch? If you’re in the early stages of designing a schema, you could reconsider a more row-based approach. If you have to work with an existing schema, however, you can bridge the gap with the help of Pig UDFs.

Say we have the following table:

rowkey                   cf1:20130101post  cf1:20130101comment  cf1:20130101like  cf1:20130102post  ...
ce410-00005031-00089182  147               5                    41                153
ce410-00005031-00021915  1894              33                   86                1945
5faa4-00009521-00019828  30                2                    8                 31
...

Here there is not only a composite row key, but also composite column keys. Because the date is part of the column key, there are potentially many, many columns. Enumerating them all in your Pig scripts is impractical. Notice, though, that they all live in the same column family. To load them all, you can do the following in Pig:

data = load 'hbase://table_name' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf1:*', '-loadKey true') AS (id:chararray, stats:map[int]);
illustrate data;

This will result in all of the columns being loaded into a Pig map, which is simply a collection of key/value pairs:

-----------------------------------------------------------------------------------------------------
| data         | id:chararray            | stats:map(:int)                                          |
-----------------------------------------------------------------------------------------------------
|              | ce410-00005031-00089182 | {20130101post=147,20130101comment=5,20130101like=41,...} |
-----------------------------------------------------------------------------------------------------
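
As an aside, if you only needed a handful of known qualifiers, you could dereference the map directly with Pig’s # operator instead of reaching for a UDF. A quick sketch, using one of the qualifiers from the sample table above:

-- look up a single known qualifier per row; rows without that key yield null
posts_on_jan1 = foreach data generate id, stats#'20130101post' as posts;
dump posts_on_jan1;

That only works when you can enumerate the qualifiers up front, which is exactly what we cannot do here.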

So, now you have loaded all the data, but how do you parse the column names into their respective parts so you can apply logic to the values? Here is a very simple Python implementation of a UDF that will turn that map into a bag:

@outputSchema("values:bag{t:tuple(key, value)}")
def bag_of_tuples(map_dict):
    # turn the map's key/value pairs into a bag of (key, value) tuples
    return map_dict.items()

You can include this UDF (place the above in a file called udfs.py in the current working directory for Pig) and invoke it like this:

register 'udfs.py' using jython as py;
data = load 'hbase://table_name' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf1:*', '-loadKey true') AS (id:chararray, stats:map[int]);
databag = foreach data generate id, FLATTEN(py.bag_of_tuples(stats));
illustrate databag;

This takes advantage of the built-in FLATTEN operator, which takes a bag and crosses each of its tuples with the rest of the row, producing N new rows:

------------------------------------------------------------------------------
| databag      | id:chararray            | key:bytearray   | value:bytearray |
------------------------------------------------------------------------------
|              | ce410-00005031-00089182 | 20130101post    | 147             |
------------------------------------------------------------------------------
|              | ce410-00005031-00089182 | 20130101comment | 5               |
------------------------------------------------------------------------------
|              | ce410-00005031-00089182 | 20130101like    | 41              |
------------------------------------------------------------------------------
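
At this point the composite qualifier is an ordinary field, so it can be split into its date and counter-type parts with Pig’s built-in string functions. Here is one possible sketch, assuming every qualifier starts with an eight-digit yyyyMMdd date (the parsed relation and the day/counter_type aliases are names chosen here for illustration):

-- split the composite qualifier into its date and counter-type parts, and cast the value
parsed = foreach databag generate
    id,
    REGEX_EXTRACT((chararray) key, '(\\d{8})(.*)', 1) as day,
    REGEX_EXTRACT((chararray) key, '(\\d{8})(.*)', 2) as counter_type,
    (int) value as value;

From there, ordinary grouping and aggregation apply; for example, you could group parsed by (id, day) and SUM the values to get per-day totals.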

You can then process your data as normal. To write your data back to HBase in the same format, use the built-in TOMAP function together with the same * syntax. Assuming you have produced new column names and values in your script, you can do:

...
mapped = foreach processed_data generate id, TOMAP(columnname, value) as stats;
store mapped into 'hbase://table_name' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('stats:*');
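
For completeness, here is a hypothetical stand-in for the elided step above, continuing from the parsed relation sketched earlier. It glues the date and counter type back together with CONCAT to form the new qualifier, and it keeps the row key as the first field, because HBaseStorage uses the first field of each tuple as the HBase row key when storing. None of these names come from the original script:

-- hypothetical: rebuild a composite qualifier from the parsed fields,
-- leaving the row key in the first position for HBaseStorage
processed_data = foreach parsed generate
    id,
    CONCAT(day, counter_type) as columnname,
    value;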