Hadoop MapReduce job to perform GeoEnrichment on BigData.
What is GeoEnrichment? An example best describes it: given a big set of customer location records, I would like each location to be GeoEnriched with the average income of the zip code that the location falls into, and with the number of people between the ages of 25 and 30 who live in that zip code.
Of course the key to this whole thing is the spatial reference data 🙂
I’ve implemented two search methods:
- Point-In-Polygon method
- Nearest Neighbor Weighted method
The Point-In-Polygon (PiP) method is fairly simple. Given a point, find the polygon it falls into, pull the selected attributes from that polygon feature, and add them to the original point.
The Nearest Neighbor Weighted (NNW) method finds all the reference points within a specified distance of a search point and weights each reference point based on its distance. The GeoEnrichment value is the sum of the weighted attribute values, where:
- w is the weight of the reference point
- d is the distance between the reference point and the search point
- R is the search radius
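The exact kernel matters less than its shape; here is a minimal Java sketch, assuming a linear distance-decay kernel w = 1 - d/R (an assumption for illustration, not necessarily this project's kernel; any function that decreases from 1 at d = 0 to 0 at d = R fits the definitions above):

// Assumed linear kernel: weight falls from 1 at the search point to 0 at radius R.
static double weight(final double d, final double r) {
    return d >= r ? 0.0 : 1.0 - d / r;
}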
To build the project, first clone and install its dependencies into the local Maven repository:

$ git clone https://github.com/kungfoo/geohash-java.git
$ cd geohash-java
$ mvn install
$ cd ..
$ git clone https://github.com/Esri/geometry-api-java.git
$ cd geometry-api-java
$ mvn install
$ cd ..
Note: In my dev environment, I had to symbolically link java to /bin/java to pass the Maven test phase:

$ sudo ln -s /usr/bin/java /bin/java (optional, depending on your dev environment)
$ mvn package
The following will generate 1,000,000 points for GeoEnrichment:
$ hadoop fs -rm -skipTrash data.tsv
$ awk -f data.awk | hadoop fs -put - data.tsv
Use the world countries for GeoEnrichment. Put the zip file (included in the data folder) that contains the world countries in shapefile format in HDFS:
$ hadoop fs -put cntry06.zip cntry06.zip
$ hadoop fs -rm -R -skipTrash output
$ hadoop jar target/GeoEnrichment-1.0-SNAPSHOT-job.jar\
 -Dcom.esri.lonField=1\
 -Dcom.esri.latField=2\
 -Dcom.esri.column="attr:POP2005:%.0f:long"\
 -Dcom.esri.searchClass=com.esri.SearchShapefileIndexPolygon\
 -Dcom.esri.writeAll=false\
 /user/cloudera/data.tsv\
 /user/cloudera/output\
 /user/cloudera/cntry06.zip
Remove the HDFS output folder, then run the Hadoop job. The longitude values are in the second (zero-based index 1) column of the input and the latitude values are in the third (index 2) column; by default, the fields are tab-separated. GeoEnrich the output with the POP2005 field values of type long from the shapefile inside cntry06.zip, and represent that value as a floating point number with no decimal places (%.0f). Use the com.esri.SearchShapefileIndexPolygon class to perform the GeoEnrichment using a point-in-polygon search, and only write to the output path the locations that fall inside the reference polygons. Finally, the input data is located in /user/cloudera/data.tsv, the job output goes to the folder /user/cloudera/output, and /user/cloudera/cntry06.zip is added as a cached archive in the DistributedCache.
The value of com.esri.column is of the form:
<column family>:<qualifier>:<printf float format>:<type>, where type is i(nteger), l(ong), f(loat) or d(ouble); for example, attr:POP2005:%.0f:long in the job invocation above.
$ echo -e "ID\tLON\tLAT\tPOP2005" > /tmp/output.txt
$ hadoop fs -cat /user/cloudera/output/part-* >> /tmp/output.txt
This is a map-only job that relies on the speed of HDFS and the distributed nature of Hadoop to plow through the logic. Once the input is decoded into latitude and longitude values, the GeoEnrichment search is invoked. I wanted to try out different search environments where the GeoEnrichment layer can be in-proc or out-of-proc. If the reference layer is small enough (e.g., the US Census Block Groups feature class is about 240,000 features), then an in-proc solution makes sense as, in theory, your Hadoop data nodes should have enough memory.
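As a minimal sketch, here is what such a map-only mapper could look like. The Search interface and class names below are illustrative assumptions (only the com.esri.* property keys come from the job invocation above), and the reflection that instantiates com.esri.searchClass is elided:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical search abstraction; the project's actual interface may differ.
interface Search {
    String search(double lon, double lat); // returns enrichment fields, or null if no match
}

public class GeoEnrichmentMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private int lonField, latField;
    private Search search;

    @Override
    protected void setup(final Context context) {
        final Configuration conf = context.getConfiguration();
        lonField = conf.getInt("com.esri.lonField", 1);
        latField = conf.getInt("com.esri.latField", 2);
        // ... instantiate 'search' from the com.esri.searchClass property ...
    }

    @Override
    protected void map(final LongWritable key, final Text value, final Context context)
            throws IOException, InterruptedException {
        // Decode the tab-separated input line into lon/lat values.
        final String[] tokens = value.toString().split("\t");
        final double lon = Double.parseDouble(tokens[lonField]);
        final double lat = Double.parseDouble(tokens[latField]);
        final String enrichment = search.search(lon, lat);
        // With com.esri.writeAll=false, only emit locations that matched a reference feature.
        if (enrichment != null) {
            context.write(new Text(value.toString() + "\t" + enrichment), NullWritable.get());
        }
    }
}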
This PiP implementation relies on GeoTools and JTS to read a polygon shapefile from the DistributedCache and create an in-memory spatial index based on the envelopes of the features. When the search function is invoked, a query is performed on that spatial index to find the overlapping feature envelopes, followed by a 'contains' operation to ensure that the point is truly inside the feature geometry.
Note: for this to work very quickly, all geometries have to be 'prepared':
// Prepared geometries cache interior structure, so repeated contains() tests are fast.
final PreparedGeometry preparedGeometry = PreparedGeometryFactory.prepare(geometry);
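A minimal sketch of this index-then-refine pattern, assuming a JTS STRtree over the prepared polygons (the project may build its index differently):

import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory;
import com.vividsolutions.jts.index.strtree.STRtree;
import java.util.List;

// Build: index each prepared polygon by its envelope.
static STRtree buildIndex(final List<Geometry> polygons) {
    final STRtree index = new STRtree();
    for (final Geometry polygon : polygons) {
        index.insert(polygon.getEnvelopeInternal(), PreparedGeometryFactory.prepare(polygon));
    }
    return index;
}

// Search: cheap envelope query first, exact contains() test second.
static PreparedGeometry findContaining(final STRtree index, final Point point) {
    for (final Object candidate : index.query(point.getEnvelopeInternal())) {
        final PreparedGeometry prepared = (PreparedGeometry) candidate;
        if (prepared.contains(point)) {
            return prepared;
        }
    }
    return null; // point is not inside any reference polygon
}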
This NNW search implementation relies on points rather than polygons. Again, an in-memory spatial index is created over all the points in the GeoEnrichment layer.
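A minimal sketch of that search, again assuming a JTS index and the linear kernel assumed earlier (the ReferencePoint holder and all names are illustrative):

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.index.strtree.STRtree;

// Hypothetical holder pairing a reference coordinate with its attribute value.
static final class ReferencePoint {
    final Coordinate coordinate;
    final double attribute;
    ReferencePoint(final Coordinate coordinate, final double attribute) {
        this.coordinate = coordinate;
        this.attribute = attribute;
    }
}

// Query the index with the search envelope, filter by true distance,
// and sum the weighted attribute values.
static double nnwSearch(final STRtree index, final Coordinate searchPoint, final double radius) {
    final Envelope searchEnvelope = new Envelope(searchPoint);
    searchEnvelope.expandBy(radius);
    double sum = 0.0;
    for (final Object candidate : index.query(searchEnvelope)) {
        final ReferencePoint ref = (ReferencePoint) candidate;
        final double d = searchPoint.distance(ref.coordinate);
        if (d < radius) {
            sum += (1.0 - d / radius) * ref.attribute; // assumed linear kernel
        }
    }
    return sum;
}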
There will be cases when the reference data is too big to fit into the mapper memory and has to be searched "externally". Based on my post on spatially indexing BigData, I will use HBase and geohashing to perform an NNW search. The design of the rowkey in an HBase table is a very important step that depends on how the table is used; in this case, the usage relies heavily on a scan of a bounded range. The reference points are placed in HBase and the rowkey is composed as follows:
|Geohash Code|Longitude|Latitude|Unique ID|
The geohash code ensures that all the reference points that are "close" together sort together for a scan. Embedding the lon/lat values makes the rowkey a bit more "unique", as, depending on the geohash level, two "close" lat/lon coordinates might produce the same geohash; it also has the benefit of not storing the coordinates in a family/qualifier value. And finally, to make the rowkey truly unique, an identifier is appended; think here of a bunch of customers with unique identifiers living in the same tall building on different floors.
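A minimal sketch of composing such a rowkey with the geohash-java library cloned earlier (the character precision of 7 and the exact field formatting are assumptions):

import ch.hsr.geohash.GeoHash;

// Compose the rowkey: geohash code + lon + lat + unique id. The geohash
// prefix drives scan locality; the coordinates and the id make the key unique.
static String rowKey(final double lon, final double lat, final String id) {
    // Precision 7 is an assumption; pick a level that matches the search radius.
    final String geohash = GeoHash.withCharacterPrecision(lat, lon, 7).toBase32();
    return geohash + String.format("%.6f%.6f", lon, lat) + id;
}

A bounded scan can then use a geohash cell's code as the start row and that prefix with its last byte incremented as the stop row, pulling back only the reference points in that cell.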
This is an in-memory PiP search where the reference polygons are stored in an HBase table in the Esri shape binary format. All the polygons are retrieved into the mapper memory space and spatially indexed using the Esri Geometry API for Java QuadTree implementation. The search uses the quad tree to locate the overlapping polygons based on their envelopes, and then a further 'contains' operation is performed to find which polygon's attributes should be used in the GeoEnrichment process.
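A minimal sketch of that flow with the Esri Geometry API QuadTree (the tree height, tolerance, and WGS84 spatial reference are assumptions):

import com.esri.core.geometry.Envelope2D;
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.Polygon;
import com.esri.core.geometry.QuadTree;
import com.esri.core.geometry.SpatialReference;
import java.util.List;

// Build: index each polygon's envelope; the element is the polygon's
// index into the in-memory list.
static QuadTree buildIndex(final Envelope2D fullExtent, final List<Polygon> polygons) {
    final QuadTree quadTree = new QuadTree(fullExtent, 8); // tree height of 8 is an assumption
    final Envelope2D envelope = new Envelope2D();
    for (int i = 0; i < polygons.size(); i++) {
        polygons.get(i).queryEnvelope2D(envelope);
        quadTree.insert(i, envelope);
    }
    return quadTree;
}

// Search: envelope candidates from the quad tree first, exact contains() second.
static int findContaining(final QuadTree quadTree, final List<Polygon> polygons, final Point point) {
    final SpatialReference wgs84 = SpatialReference.create(4326); // assumed
    final QuadTree.QuadTreeIterator iterator = quadTree.getIterator(point, 0.0);
    for (int handle = iterator.next(); handle >= 0; handle = iterator.next()) {
        final int i = quadTree.getElement(handle);
        if (GeometryEngine.contains(polygons.get(i), point, wgs84)) {
            return i; // index of the containing polygon
        }
    }
    return -1; // no containing polygon
}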
Take a look at the src/test/java folder for MapReduce map function testing examples and, more importantly, for how to start a local mini HBase cluster for scan testing and a local MapReduce cluster for job testing.
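For reference, a minimal sketch of the mini-cluster bootstrap such tests typically rely on, using HBase's HBaseTestingUtility (the table and column family names below are placeholders):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class MiniClusterTestBase {
    protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

    @BeforeClass
    public static void startCluster() throws Exception {
        UTIL.startMiniCluster(); // in-process ZooKeeper, HDFS, and HBase master/region server
        UTIL.createTable("points".getBytes(), "attr".getBytes()); // placeholder table/family names
    }

    @AfterClass
    public static void stopCluster() throws Exception {
        UTIL.shutdownMiniCluster();
    }
}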