This recipe uses the MongoOutputFormat class to load data from an HDFS instance into a MongoDB collection.
Getting ready
The easiest way to get started with the Mongo Hadoop Adapter is to clone the mongo-hadoop project from GitHub and build the project configured for a specific version of Hadoop. A Git client must be installed to clone this project. This recipe assumes that you are using the CDH3 distribution of Hadoop. The official Git client can be found at http://git-scm.com/downloads.
The Mongo Hadoop Adapter can be found on GitHub at https://github.com/mongodb/mongo-hadoop. This project needs to be built for a specific version of Hadoop. The resulting JAR file must be installed on each node in the $HADOOP_HOME/lib folder. The Mongo Java Driver also needs to be installed on each node in the $HADOOP_HOME/lib folder; it can be found at https://github.com/mongodb/mongo-java-driver/downloads.
How to do it...
Complete the following steps to copy data from HDFS into MongoDB:
1. Clone the mongo-hadoop repository with the following command line:
git clone https://github.com/mongodb/mongo-hadoop.git
2. Switch to the stable release 1.0 branch:
git checkout release-1.0
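If the checkout fails because the release branches have been renamed in a newer clone of the repository, the available remote branches can be listed first (a standard Git command, not specific to this project):
git branch -r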
3. Set the Hadoop version which mongo-hadoop should target. In the folder
that mongo-hadoop was cloned to, open the build.sbt file with a text editor.
Change the following line:
hadoopRelease in ThisBuild := "default"
to
hadoopRelease in ThisBuild := "cdh3"
4. Build mongo-hadoop:
./sbt package
This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the
core/target folder.
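Before copying the JAR to the cluster, it can be worth confirming that the build actually produced it (a plain directory listing; the exact file name may differ slightly between adapter releases):
ls core/target/*.jar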
5. Download the MongoDB Java Driver Version 2.8.0 from https://github.com/mongodb/mongo-java-driver/downloads.
6. Copy mongo-hadoop and the MongoDB Java Driver to $HADOOP_HOME/lib on
each node:
cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
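The cp command works when Hadoop runs on a single machine; on a multi-node cluster both JARs must reach every node. A minimal sketch using scp, where slave1 and slave2 stand in for your own node hostnames and $HADOOP_HOME is assumed to be the same path on every machine:
for node in slave1 slave2; do
    scp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $node:$HADOOP_HOME/lib/
done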
7. Create a Java MapReduce program that reads the weblog_entries.txt file from HDFS and writes its records to MongoDB using the MongoOutputFormat class:
import java.io.*;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.*;
import org.bson.*;
import org.bson.types.ObjectId;

import com.mongodb.hadoop.*;
import com.mongodb.hadoop.util.*;

public class ExportToMongoDBFromHDFS {

    private static final Log log = LogFactory.getLog(ExportToMongoDBFromHDFS.class);

    public static class ReadWeblogs extends Mapper<LongWritable, Text, ObjectId, BSONObject> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            System.out.println("Key: " + key);
            System.out.println("Value: " + value);

            // Each weblog entry is a tab-delimited line
            String[] fields = value.toString().split("\t");

            String md5 = fields[0];
            String url = fields[1];
            String date = fields[2];
            String time = fields[3];
            String ip = fields[4];

            // Build a BSON document from the parsed fields
            BSONObject b = new BasicBSONObject();
            b.put("md5", md5);
            b.put("url", url);
            b.put("date", date);
            b.put("time", time);
            b.put("ip", ip);

            // Emit a new ObjectId as the document key along with the document
            context.write(new ObjectId(), b);
        }
    }

    public static void main(String[] args) throws Exception {

        final Configuration conf = new Configuration();
        // Point the job output at the weblogs collection in the test database
        MongoConfigUtil.setOutputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs");
        System.out.println("Configuration: " + conf);

        final Job job = new Job(conf, "Export to Mongo");

        Path in = new Path("/data/weblogs/weblog_entries.txt");
        FileInputFormat.setInputPaths(job, in);

        job.setJarByClass(ExportToMongoDBFromHDFS.class);
        job.setMapperClass(ReadWeblogs.class);

        job.setOutputKeyClass(ObjectId.class);
        job.setOutputValueClass(BSONObject.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        // Map-only job; no reducers are needed
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
8. Export as a runnable JAR file and run the job:
hadoop jar ExportToMongoDBFromHDFS.jar
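The recipe leaves producing the runnable JAR to your IDE's export function. As a rough command-line alternative, the class can be compiled against the Hadoop core JAR and the two driver JARs and then packaged with jar; the hadoop-core JAR name below is an assumption and varies between CDH3 updates:
javac -classpath $HADOOP_HOME/hadoop-core.jar:$HADOOP_HOME/lib/mongo-hadoop-core_cdh3u3-1.0.0.jar:$HADOOP_HOME/lib/mongo-2.8.0.jar ExportToMongoDBFromHDFS.java
jar cvf ExportToMongoDBFromHDFS.jar *.class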
9. From the Mongo shell, verify that the weblogs collection was populated:
db.weblogs.find();
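For a slightly fuller check, assuming the job wrote to the test database on <HOST>:<PORT> as in the output URI above, connect to that database and count the documents (the hostname and port are placeholders):
mongo <HOST>:<PORT>/test
db.weblogs.count();
db.weblogs.findOne();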