Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Integration
Justin Lee, Software Engineer @ MongoDB

We will cover:
- a quick briefing on what Mongo and Hadoop are all about
- what the Mongo-Hadoop Connector is
- how it works
- a tour of what it can do
(Q+A at the end)

MongoDB
- document-oriented database with dynamic schema
- stores data in JSON-like documents:
    { _id : "kosmo kramer", age : 42, location : { state : "NY", zip : "10024" }, favorite_colors : [ "red", "green" ] }
- different structure in each document
- values can be simple, like strings and ints, or nested documents
- MongoDB scales horizontally via sharding to handle lots of data and load

Hadoop
- Java-based framework for MapReduce
- excels at batch processing on large data sets by taking advantage of parallelism
Notes: MapReduce was created by Google (described in a white paper) and implemented in open source by Hadoop.

Mongo-Hadoop Connector - Why
- Lots of people are using Hadoop and Mongo separately but need integration
- Custom import/export scripts are often used to get data in and out, but custom scripts are slow and fragile
- Scalability and flexibility with changes in Hadoop or MongoDB configurations
- Need to process data across multiple sources

Mongo-Hadoop Connector
Turn MongoDB into a Hadoop-enabled filesystem: use it as the input or output for Hadoop.
[Diagram: input data from MongoDB or a .BSON file -> Hadoop Cluster -> output results to MongoDB or a .BSON file]
Notes: BSON file support is new in 1.1; BSON is the output format of mongodump.

Mongo-Hadoop Connector - Benefits + Features
- Takes advantage of full multi-core parallelism to process data in Mongo
- Full integration with the Hadoop and JVM ecosystems
- Can be used with Amazon Elastic MapReduce
- Can read and write backup files from the local filesystem, HDFS, or S3

Mongo-Hadoop Connector - Benefits + Features
- Vanilla Java MapReduce
- If you don't want to use Java, support for Hadoop Streaming: write MapReduce code in Ruby, Python, and other languages
Notes: you can write your own language binding.

Mongo-Hadoop Connector - Benefits + Features
- Support for Pig: a high-level scripting language for data analysis and building MapReduce workflows
- Support for Hive: a SQL-like language for ad-hoc queries and analysis of data sets on Hadoop-compatible filesystems
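The MapReduce model behind Hadoop boils down to three steps: map each input record to key/value pairs, group (shuffle) the pairs by key, and reduce each group to a result. Below is a minimal single-process sketch in plain Python, not Hadoop code; `map_reduce`, `word_mapper`, and `sum_reducer` are hypothetical names chosen for illustration, and the word-count input is made up. What Hadoop adds is running the map and reduce steps in parallel across a cluster.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterable, Iterator, List, Tuple

Mapper = Callable[[Any], Iterable[Tuple[Hashable, Any]]]
Reducer = Callable[[Hashable, List[Any]], Any]


def map_reduce(records: Iterable[Any], mapper: Mapper, reducer: Reducer) -> Dict[Hashable, Any]:
    """Single-process MapReduce: map -> shuffle (group by key) -> reduce."""
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for record in records:
        for key, value in mapper(record):  # map phase: one record -> many (key, value) pairs
            groups[key].append(value)      # shuffle: collect all values that share a key
    # reduce phase: each key's values are combined independently, which is
    # what allows reducers for different keys to run on different nodes
    return {key: reducer(key, values) for key, values in groups.items()}


def word_mapper(line: str) -> Iterator[Tuple[str, int]]:
    """Emit (word, 1) for every word in a line of text."""
    for word in line.split():
        yield word.lower(), 1


def sum_reducer(key: str, values: List[int]) -> int:
    """Add up all the 1s emitted for one word."""
    return sum(values)


counts = map_reduce(["big data", "Big compute"], word_mapper, sum_reducer)
print(counts)  # {'big': 2, 'data': 1, 'compute': 1}
```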

Mongo-Hadoop Connector - How it works
- The adapter examines the MongoDB input collection and calculates a set of splits from the data
- Each split gets assigned to a node in the Hadoop cluster
- In parallel, Hadoop nodes pull the data for their splits from MongoDB (or BSON) and process it locally
- Hadoop merges the results and streams the output back to MongoDB or BSON

Tour of Mongo-Hadoop, by Example
- Using Java MapReduce with Mongo-Hadoop
- Using Hadoop Streaming
- Pig and Hive with Mongo-Hadoop
- Elastic MapReduce + BSON

Input Data: Enron e-mail corpus (501k records, 1.75 GB). Each document is one email; the From header is the sender and the To header holds the recipients.
    {
      "_id" : ObjectId("4f2ad4c4d1e2d3f15a000000"),
      "body" : "Here is our forecast\n\n ",
      "filename" : "1.",
      "headers" : {
        "From" : "phillip.allen@enron.com",
        "Subject" : "Forecast Info",
        "X-bcc" : "",
        "To" : "tim.belden@enron.com",
        "X-Origin" : "Allen-P",
        "X-From" : "Phillip K Allen",
        "Date" : "Mon, 14 May 2001 16:39:00 -0700 (PDT)",
        "X-To" : "Tim Belden ",
        "Message-ID" : "",
        "Content-Type" : "text/plain; charset=us-ascii",
        "Mime-Version" : "1.0"
      }
    }

Let's use Hadoop to build a graph of (senders -> recipients) and the count of messages exchanged between each pair:
    {"_id": {"t":"bob@enron.com", "f":"alice@enron.com"}, "count" : 14}
    {"_id": {"t":"bob@enron.com", "f":"eve@enron.com"}, "count" : 9}
    {"_id": {"t":"alice@enron.com", "f":"charlie@enron.com"}, "count" : 99}
    {"_id": {"t":"charlie@enron.com", "f":"bob@enron.com"}, "count" : 48}
    {"_id": {"t":"eve@enron.com", "f":"charlie@enron.com"}, "count" : 20}
[Figure: sample, simplified data drawn as a graph of bob, alice, eve, and charlie; nodes are people, and each edge/arrow is labeled with the # of messages from A to B (14, 9, 99, 48, 20)]
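The split-then-process flow under "How it works" can be illustrated with a toy, in-memory sketch. This is not the connector's splitter: it assumes the input is a Python list of documents with sortable `_id` values, cuts the sorted keys into contiguous `(min, max)` ranges, processes each range on a thread pool (standing in for Hadoop nodes), and merges the partial results. `calculate_splits`, `process_split`, and `run_job` are hypothetical names, and summing a `count` field stands in for real per-split work.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple

Split = Tuple[Any, Optional[Any]]  # (min inclusive, max exclusive; None = open-ended)


def calculate_splits(sorted_ids: List[Any], docs_per_split: int) -> List[Split]:
    """Cut a sorted list of _id values into contiguous key ranges."""
    splits: List[Split] = []
    for start in range(0, len(sorted_ids), docs_per_split):
        end = start + docs_per_split
        upper = sorted_ids[end] if end < len(sorted_ids) else None  # last split is open-ended
        splits.append((sorted_ids[start], upper))
    return splits


def process_split(collection: List[Dict[str, Any]], split: Split) -> int:
    """Stand-in for one node's work: pull the docs in its key range, compute locally."""
    lower, upper = split
    docs = [d for d in collection
            if d["_id"] >= lower and (upper is None or d["_id"] < upper)]
    return sum(d["count"] for d in docs)


def run_job(collection: List[Dict[str, Any]], docs_per_split: int) -> Tuple[List[Split], int]:
    ids = sorted(d["_id"] for d in collection)
    splits = calculate_splits(ids, docs_per_split)
    with ThreadPoolExecutor() as pool:  # each split is processed in parallel
        partials = list(pool.map(lambda s: process_split(collection, s), splits))
    return splits, sum(partials)  # merge the partial results


collection = [{"_id": i, "count": i % 3} for i in range(10)]  # made-up data
splits, total = run_job(collection, docs_per_split=4)
print(splits)  # [(0, 4), (4, 8), (8, None)]
print(total)   # 9
```

Because the ranges are contiguous and non-overlapping, every document lands in exactly one split, so merging the partial sums gives the same answer as a single pass.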

Example 1 - Java MapReduce
A MongoDB document is passed into Hadoop MapReduce.
Map phase - each input doc gets passed through a Mapper function:

    @Override
    public void map(NullWritable key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
        BSONObject headers = (BSONObject) val.get("headers");
        if (headers != null && headers.containsKey("From") && headers.containsKey("To")) {
            String from = (String) headers.get("From");
            String to = (String) headers.get("To");
            String[] recips = to.split(",");
            for (int i = 0; i < recips.length; i++) {
                [... the rest of map() and the start of the reduce phase were lost in extraction ...]

    ... pValues, final Context pContext) throws IOException, InterruptedException {
        int sum = 0;
        for (final IntWritable value : pValues) {
            sum += value.get();
        }
        BSONObject outDoc = BasicDBObjectBuilder.start()
                .add("f", pKey.from)
                .add("t", pKey.to)
                .get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);
        pContext.write(pkeyOut, new IntWritable(sum));
    }

Example 1 - Java MapReduce (cont)
Read from MongoDB:
    mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat
    mongo.input.uri=mongodb://my-db:27017/enron.messages
Read from BSON:
    mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat
    mapred.input.dir=file:///tmp/messages.bson
    (or hdfs:///tmp/messages.bson, or s3:///tmp/messages.bson)

Example 1 - Java MapReduce (cont)
Write output to MongoDB:
    mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat
    mongo.output.uri=mongodb://my-db:27017/enron.results_out
Write output to BSON:
    mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat
    mapred.output.dir=file:///tmp/results.bson
    (or hdfs:///tmp/results.bson, or s3:///tmp/results.bson)
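Because part of the Java code above was lost, here is a hedged single-process Python sketch of the job logic the slides describe: split the To header on commas, emit one count per (sender, recipient) pair, and sum the counts per pair into documents shaped like the `{"_id": {"t": ..., "f": ...}, "count": n}` sample earlier. It does not use Hadoop or the connector; `map_email` and `reduce_pairs` are hypothetical helpers, and the sample documents are made up.

```python
from collections import Counter
from typing import Iterable, Iterator, List, Tuple

Pair = Tuple[str, str]  # (sender, recipient)


def map_email(doc: dict) -> Iterator[Tuple[Pair, int]]:
    """Map phase: emit ((from, to), 1) once per recipient in the To header."""
    headers = doc.get("headers") or {}
    if "From" in headers and "To" in headers:
        for recip in headers["To"].split(","):
            recip = recip.strip()
            if recip:  # ignore empty entries, e.g. from a trailing comma
                yield (headers["From"], recip), 1


def reduce_pairs(pairs: Iterable[Tuple[Pair, int]]) -> List[dict]:
    """Reduce phase: sum the 1s for each (from, to) key and build output docs."""
    totals: Counter = Counter()
    for key, value in pairs:
        totals[key] += value
    return [{"_id": {"f": f, "t": t}, "count": n} for (f, t), n in totals.items()]


docs = [  # made-up sample documents
    {"headers": {"From": "alice@enron.com", "To": "bob@enron.com, eve@enron.com"}},
    {"headers": {"From": "alice@enron.com", "To": "bob@enron.com"}},
    {"headers": {"From": "eve@enron.com"}},  # no To header: skipped by the mapper
]
results = reduce_pairs(pair for doc in docs for pair in map_email(doc))
for r in results:
    print(r)
```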

Results: Output Data
    mongos> db.results_out.find({"_id.t": /^kenneth.lay/})
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "15126-1267@m2.innovyx.com" }, "count" : 1 }
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "2586207@www4.imakenews.com" }, "count" : 1 }
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "40enron@enron.com" }, "count" : 2 }
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..davis@enron.com" }, "count" : 2 }
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..hughes@enron.com" }, "count" : 4 }
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..lindholm@enron.com" }, "count" : 1 }
    { "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..schroeder@enron.com" }, "count" : 1 }
    ...
    has more

Example 2 - Hadoop Streaming
Let's do the same Enron MapReduce job with Python instead of Java:
    $ pip install pymongo_hadoop

Example 2 - Hadoop Streaming (cont)
Hadoop passes data to an external process via STDIN/STDOUT.
[Diagram: Hadoop (JVM) sends map(k, v) calls over STDIN to a Python / Ruby / JS interpreter running def mapper(documents): ..., which sends its output back to Hadoop over STDOUT]

Example 2 - Hadoop Streaming (cont)
    from __future__ import print_function
    import sys
    from pymongo_hadoop import BSONMapper

    def mapper(documents):
        for doc in documents:
            headers = doc.get('headers', {})
            # skip messages without both a sender and recipients, as the Java mapper does
            if 'From' not in headers or 'To' not in headers:
                continue
            from_field = headers['From']
            recips = [x.strip() for x in headers['To'].split(',')]
            for r in recips:
                yield {'_id': {'f': from_field, 't': r}, 'count': 1}

    BSONMapper(mapper)
    print("Done Mapping.", file=sys.stderr)

BSONMapper is the pymongo_hadoop layer that translates between Hadoop Streaming and your Python mapper, passing the results back to Hadoop.

Example 2 - Hadoop Streaming (cont)
    from __future__ import print_function
    import sys
    from pymongo_hadoop import BSONReducer

    def reducer(key, values):
        print("Processing from/to %s" % str(key), file=sys.stderr)
        _count = 0
        for v in values:
            _count += v['count']
        return {'_id': key, 'count': _count}

    BSONReducer(reducer)

Surviving Hadoop: making MapReduce easier with Pig + Hive
Writing MapReduce jobs from scratch can be clunky and cumbersome.
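In the streaming setup, documents cross the STDIN/STDOUT boundary between Hadoop and the Python process. Assuming that stream is a sequence of concatenated BSON documents (the same layout as a mongodump .bson file), it can be cut into individual documents without decoding them, because every BSON document begins with its total size as a 4-byte little-endian int32 that includes those 4 bytes. A minimal standard-library sketch of that framing; `iter_bson_frames` is a hypothetical helper, not part of pymongo_hadoop, and it does not parse document contents.

```python
import io
import struct
from typing import BinaryIO, Iterator


def iter_bson_frames(stream: BinaryIO) -> Iterator[bytes]:
    """Yield raw BSON documents from a stream of concatenated documents.

    Each document starts with its total length as a little-endian int32;
    the length counts the 4 prefix bytes themselves."""
    while True:
        header = stream.read(4)
        if not header:
            return  # clean end of stream
        if len(header) < 4:
            raise ValueError("truncated length prefix")
        (length,) = struct.unpack("<i", header)
        if length < 5:  # smallest valid document: 4-byte length + 1-byte terminator
            raise ValueError("invalid BSON document length: %d" % length)
        body = stream.read(length - 4)
        if len(body) != length - 4:
            raise ValueError("truncated BSON document")
        yield header + body


# Two hand-built documents: {} and {"a": 1} (0x10 is the BSON int32 type byte)
empty_doc = b"\x05\x00\x00\x00\x00"
doc_a = b"\x0c\x00\x00\x00" + b"\x10" + b"a\x00" + struct.pack("<i", 1) + b"\x00"
frames = list(iter_bson_frames(io.BytesIO(empty_doc + doc_a)))
print([len(f) for f in frames])  # [5, 12]
```

In a Python 3 streaming script the stream would be `sys.stdin.buffer`; decoding each frame into a dict is left to a BSON library.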

Example 3 - Mongo-Hadoop and Pig
Let's do the same thing yet again, but this time using Pig.
- Pig is a powerful language that can generate sophisticated MapReduce workflows from simple scripts
- Can perform JOIN and GROUP, and execute user-defined functions (UDFs)

Example 3 - Mongo-Hadoop and Pig (cont)
Pig directives for loading data: BSONLoader and MongoLoader
    data = LOAD 'mongodb://localhost:27017/db.collection' using com.mongodb.hadoop.pig.MongoLoader;
Writing data out: BSONStorage and MongoInsertStorage
    STORE records INTO 'file:///output.bson' using com.mongodb.hadoop.pig.BSONStorage;

Example 3 - Mongo-Hadoop and Pig (cont)
Pig has its own special datatypes: Bags, Maps, and Tuples. The Mongo-Hadoop Connector intelligently converts between Pig datatypes and MongoDB datatypes:
- bags -> arrays
- maps -> objects

Example 3 - Mongo-Hadoop and Pig (cont)
    raw = LOAD 'hdfs:///messages.bson'
          using com.mongodb.hadoop.pig.BSONLoader('','headers:[]');
    send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;
    send_recip_filtered = FILTER send_recip BY to IS NOT NULL;
    send_recip_split = FOREACH send_recip_filtered GENERATE from as from, TRIM(FLATTEN(TOKENIZE(to))) as to;
    send_recip_grouped = GROUP send_recip_split BY (from, to);
    send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;
    STORE send_recip_counted INTO 'file:///enron_results.bson'
          using com.mongodb.hadoop.pig.BSONStorage;

Hive with Mongo-Hadoop
Similar idea to Pig: process your data without needing to write MapReduce code from scratch... but with SQL as the language of choice.

Hive with Mongo-Hadoop
Sample data: db.users
    > db.users.find()
    { "_id": 1, "name": "Tom", "age": 28 }
    { "_id": 2, "name": "Alice", "age": 18 }
    { "_id": 3, "name": "Bob", "age": 29 }
    { "_id": 101, "name": "Scott", "age": 10 }
    { "_id": 104, "name": "Jesse", "age": 52 }
    { "_id": 110, "name": "Mike", "age": 32 }
    ...

First, declare the collection to be accessible in Hive:
    CREATE TABLE mongo_users (id int, name string, age int)
    STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
    WITH SERDEPROPERTIES ( "mongo.columns.mapping" = "_id,name,age" )
    TBLPROPERTIES ( "mongo.uri" = "mongodb://localhost:27017/test.users" );

Hive with Mongo-Hadoop
...then you can run SQL on it, like a table:
    SELECT name, age FROM mongo_users WHERE id > 100;
You can use GROUP BY:
    SELECT age, COUNT(*) FROM mongo_users WHERE id > 100 GROUP BY age;
or JOIN multiple tables/collections together:
    SELECT * FROM mongo_users T1 JOIN user_emails T2 ON (T1.id = T2.id);
(Hive supports a subset of SQL.)

Write the output of queries back into new tables:
    INSERT OVERWRITE TABLE old_users SELECT id, name, age FROM mongo_users WHERE age > 100;
Dropping a table in Hive deletes the underlying collection in MongoDB:
    DROP TABLE mongo_users;
Use EXTERNAL when declaring your table (CREATE EXTERNAL TABLE ...) to prevent the collection from being dropped.

Usage with Amazon Elastic MapReduce
Run mongo-hadoop jobs without needing to set up or manage your own Hadoop cluster.
- Pig, Hive, and streaming work on EMR, too!
- Logs get captured into S3 files

Usage with Amazon Elastic MapReduce
First, make a bootstrap script that fetches the dependencies (the mongo-hadoop jar and the Java driver). It gets executed on each node in the cluster that EMR builds for us.
    #!/bin/sh
    wget -P /home/hadoop/lib http://central.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.2/mongo-java-driver-2.12.2.jar
    wget -P /home/hadoop/lib https://s3.amazonaws.com/mongo-hadoop-code/mongo-hadoop-core_1.1.2-1.1.0.jar
Notes: we are working on updating the Hadoop artifacts in Maven.

Example 4 - Usage with Amazon Elastic MapReduce
Put the bootstrap script, and all your code, into an S3 bucket where Amazon can see it:
    s3cp ./bootstrap.sh s3://$S3_BUCKET/bootstrap.sh
    s3mod s3://$S3_BUCKET/bootstrap.sh public-read
    s3cp $HERE/../enron/target/enron-example.jar s3://$S3_BUCKET/enron-example.jar
    s3mod s3://$S3_BUCKET/enron-example.jar public-read
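Returning to the Hive table declared earlier: the `mongo.columns.mapping` value lists MongoDB fields in the same order as the Hive columns, so `id`, `name`, and `age` read `_id`, `name`, and `age`. The hedged Python sketch below only illustrates those semantics on the sample `db.users` documents shown earlier (the rows elided by "..." are left out); it is not how MongoStorageHandler is implemented, and `to_row` is a hypothetical helper. It evaluates the first Hive query, `SELECT name, age FROM mongo_users WHERE id > 100`.

```python
from typing import Any, Dict, List, Tuple

HIVE_COLUMNS = ["id", "name", "age"]        # from CREATE TABLE mongo_users (...)
MONGO_FIELDS = "_id,name,age".split(",")    # value of mongo.columns.mapping


def to_row(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Project a MongoDB document onto Hive columns, pairing them by position."""
    return {col: doc.get(field) for col, field in zip(HIVE_COLUMNS, MONGO_FIELDS)}


users: List[Dict[str, Any]] = [  # sample documents from db.users
    {"_id": 1, "name": "Tom", "age": 28},
    {"_id": 2, "name": "Alice", "age": 18},
    {"_id": 3, "name": "Bob", "age": 29},
    {"_id": 101, "name": "Scott", "age": 10},
    {"_id": 104, "name": "Jesse", "age": 52},
    {"_id": 110, "name": "Mike", "age": 32},
]

# SELECT name, age FROM mongo_users WHERE id > 100;
rows = [to_row(d) for d in users]
result: List[Tuple[str, int]] = [(r["name"], r["age"]) for r in rows if r["id"] > 100]
print(result)  # [('Scott', 10), ('Jesse', 52), ('Mike', 32)]
```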

Example 4 - Usage with Amazon Elastic MapReduce
...then launch the job from the command line, pointing to your S3 locations. You control the type and number of instances in the cluster.
    $ elastic-mapreduce --create --jobflow ENRON000 \
        --instance-type m1.xlarge \
        --num-instances 5 \
        --bootstrap-action s3://$S3_BUCKET/bootstrap.sh \
        --log-uri s3://$S3_BUCKET/enron_logs \
        --jar s3://$S3_BUCKET/enron-example.jar \
        --arg -D --arg mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat \
        --arg -D --arg mapred.input.dir=s3n://mongo-test-data/messages.bson \
        --arg -D --arg mapred.output.dir=s3n://$S3_BUCKET/BSON_OUT \
        --arg -D --arg mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat
        # (any additional parameters here)

Example 4 - Usage with Amazon Elastic MapReduce
- Easy to kick off a Hadoop job, without needing to manage a Hadoop cluster
- Pig, Hive, and streaming work on EMR, too!
- Logs get captured into S3 files

Example 5 - New Feature: MongoUpdateWritable
In previous examples, we wrote job output data by inserting into a new collection... but we can also modify an existing output collection.
- Works by applying MongoDB update modifiers: $push, $pull, $addToSet, $inc, $set, etc.
- Can be used to do incremental MapReduce or to join two collections

Example 5 - MongoUpdateWritable
For example, let's say we have two collections.
log events (sensor_id refers to which sensor logged the event):
    {
      "_id": ObjectId("51b792d381c3e67b0a18d678"),
      "sensor_id": ObjectId("51b792d381c3e67b0a18d4a1"),
      "value": 3328.5895416489802,
      "timestamp": ISODate("2013-05-18T13:11:38.709-0400"),
      "loc": [-175.13, 51.658]
    }
sensors:
    {
      "_id": ObjectId("51b792d381c3e67b0a18d0ed"),
      "name": "730LsRkX",
      "type": "pressure",
      "owner": "steve"
    }
For each owner, we want to calculate how many events were recorded for each type of sensor that logged it.

In plain English:
- Bob's sensors for temperature have stored 1300 readings
- Bob's sensors for pressure have stored 400 readings
- Alice's sensors for humidity have stored 600 readings
- Alice's sensors for temperature have stored 700 readings
- etc.

We do this in two stages.

Stage 1 - MapReduce on the sensors collection
- read from MongoDB: the sensors collection
- map: for each sensor, emit {key: owner+type, value: _id}
- reduce: group the data from map() under each key and output {key: owner+type, val: [list of _ids]}
- insert() the new records into the Results collection in MongoDB
[Diagram: sensors (MongoDB collection) -> MapReduce -> Results (MongoDB collection); the log events collection is not read in this stage]

After stage one, the output docs look like this (_id is the sensors' owner and type; sensors is the list of IDs of sensors with this owner and type):
    {
      "_id": "alicepressure",
      "sensors": [
        ObjectId("51b792d381c3e67b0a18d475"),
        ObjectId("51b792d381c3e67b0a18d16d"),
        ObjectId("51b792d381c3e67b0a18d2bf")
      ]
    }
Now we just need to count the total number of log events recorded for any sensors that appear in the list for each owner/type group.

Stage 2 - MapReduce on the log events collection
- read from MongoDB: the log events collection
- map: for each log event, emit {key: sensor_id, value: 1}
- reduce: group the data from map() under each key; for each value in that key, update({sensors: key}, {$inc: {logs_count: 1}})
- update() the existing records in the Results collection in MongoDB
[Diagram: log events (MongoDB collection) -> MapReduce -> Results (MongoDB collection); the sensors collection is not read in this stage]
    context.write(null, new MongoUpdateWritable(
            query,   // which documents to modify
            update,  // how to modify ($inc)
            true,    // upsert
            false)); // multi

Example - MongoUpdateWritable
Result after stage 2, with logs_count now populated with the correct count:
    {
      "_id": "1UoTcvnCTztemp",
      "sensors": [
        ObjectId("51b792d381c3e67b0a18d475"),
        ObjectId("51b792d381c3e67b0a18d16d"),
        ObjectId("51b792d381c3e67b0a18d2bf")
      ],
      "logs_count": 1050616
    }
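The two-stage plan can be checked with an in-memory simulation. In this hedged Python sketch, lists of dicts stand in for the sensors, log events, and Results collections, and `apply_inc` is a hypothetical helper that mimics `update({sensors: key}, {$inc: {logs_count: n}})` with upsert=true and multi=false, including MongoDB's rule that a query on an array field matches when the array contains the value. Nothing here calls MongoDB or the connector, and all data is made up.

```python
from collections import defaultdict
from typing import Any, Dict, List

Doc = Dict[str, Any]


def stage1(sensors: List[Doc]) -> List[Doc]:
    """Map: emit (owner+type, _id); reduce: collect the _ids; insert() one doc per key."""
    groups: Dict[str, List[Any]] = defaultdict(list)
    for sensor in sensors:
        groups[sensor["owner"] + sensor["type"]].append(sensor["_id"])
    return [{"_id": key, "sensors": ids} for key, ids in groups.items()]


def matches(doc: Doc, sensor_id: Any) -> bool:
    """Query {sensors: sensor_id}: equal to the value, or contained in an array value."""
    value = doc.get("sensors")
    return value == sensor_id or (isinstance(value, list) and sensor_id in value)


def apply_inc(results: List[Doc], sensor_id: Any, amount: int) -> None:
    """Mimic update({sensors: sensor_id}, {$inc: {logs_count: amount}}), upsert=True, multi=False."""
    for doc in results:
        if matches(doc, sensor_id):
            doc["logs_count"] = doc.get("logs_count", 0) + amount
            return  # multi=False: modify only the first matching document
    # upsert: nothing matched, so insert a new doc (a real upsert would also assign an _id)
    results.append({"sensors": sensor_id, "logs_count": amount})


def stage2(log_events: List[Doc], results: List[Doc]) -> None:
    """Map: emit (sensor_id, 1); reduce: sum per sensor and apply one $inc per key."""
    counts: Dict[Any, int] = defaultdict(int)
    for event in log_events:
        counts[event["sensor_id"]] += 1
    for sensor_id, n in counts.items():
        apply_inc(results, sensor_id, n)


sensors = [  # made-up data
    {"_id": "s1", "owner": "alice", "type": "pressure"},
    {"_id": "s2", "owner": "alice", "type": "pressure"},
    {"_id": "s3", "owner": "bob", "type": "temp"},
]
log_events = [{"sensor_id": "s1"}, {"sensor_id": "s2"}, {"sensor_id": "s2"}, {"sensor_id": "s3"}]

results = stage1(sensors)
stage2(log_events, results)
print({doc["_id"]: doc["logs_count"] for doc in results})  # {'alicepressure': 3, 'bobtemp': 1}
```

One design difference from the slides: this reducer applies a single `$inc` by the per-sensor total instead of one `$inc` of 1 per value. The final counts are the same, with fewer updates sent to the database.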

New Features in v1.2 and beyond
- Continually improving Hive support
- Performance improvements: lazy BSON
- Support for multi-collection input sources
- API for adding custom splitter implementations
- and more
Notes: primarily focusing on Hive, but Pig is next; also Maven Central.

Recap
- Mongo-Hadoop: use Hadoop to do massive computations on big data sets stored in MongoDB/BSON
- Tools and APIs make it easier: Streaming, Pig, Hive, EMR, etc.
- MongoDB becomes a Hadoop-enabled filesystem

Questions?
Examples can be found on GitHub: https://github.com/mongodb/mongo-hadoop/tree/master/examples

MongoDB World
New York City, June 23-25
Save 25% with 25JustinLee
Register at world.mongodb.com