随后,请检查你的 MapReduce perspective中是否有一个专门的 MapReduce Servers view, 如果没有,请选择 Window -> Show View ->other, 再从弹出框中选择 MapReduce Tools 类别下面的 MapReduce Servers, 打开这个 view.
然后,请点击 MapReduce Servers view 右上角的蓝色图标,就会出现如图一所示的设置 Hadoop Server 的位置的界面。此处所说的 Hadoop server,具体到本文,就是 homer06 这台机器。在输入各项参数之后,请点击 ”Validate location” 按钮,检查是否能够正确的找到并连接上你的 Hadoop server. 如果出错,请尝试在命令行下执行命令:ssh the_hostname_of_your_hadoop_server, (或使用图形界面的 SSH 远程登录软件), 确保 ssh 能够连接成功。图一 定义 Hadoop server 的位置
2. 创立一个 MapReduce Project
在 Eclipse 中新创建一个 MapReduce Project, 将我们在第二篇文章中定义的 WordCount 类加到此 Project 中。这个类需要略作修改才能直接远程部署到我们已经搭建好的分布式环境中去运行,因为我们原来在 WordCount 程序中是通过读取命令行参数获得计算任务的输入路径和输出路径,而当前版本的 IBM MapReduce Tools 不支持远程部署时读取命令行参数。为测试的简便起见,我在程序中直接将输入路径定义为 input, 输出路径定义为 output。在测试 WordCount 程序之前,需要事先将需要做词频统计的一批文件拷贝到分布式文件系统的 input 目录下去。 完整的 WordCount 类的代码如代码清单 7 所示:
代码清单7
//import 语句省略
public class WordCount extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper
private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern=\
public void map(LongWritable key, Text value,
OutputCollector
StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one);
} } }
public static class Reduce extends MapReduceBase
implements Reducer
public void reduce(Text key, Iterator
while (values.hasNext()) { sum += values.next().get(); }
output.collect(key, new IntWritable(sum)); } }
public int run(String[] args) throws Exception {
Path tempDir = new Path(\
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf conf = new JobConf(getConf(), WordCount.class); try {
conf.setJobName(\
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class);
conf.setInputPath(new Path(args[0])); conf.setOutputPath(tempDir);
conf.setOutputFormat(SequenceFileOutputFormat.class);
JobClient.runJob(conf);
JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName(\
sortJob.setInputPath(tempDir);
sortJob.setInputFormat(SequenceFileInputFormat.class);
sortJob.setMapperClass(InverseMapper.class);
sortJob.setNumReduceTasks(1);
sortJob.setOutputPath(new Path(args[1])); sortJob.setOutputKeyClass(IntWritable.class);

