分布式并行编程

2026/1/17 16:30:10

随后,请检查你的 MapReduce perspective中是否有一个专门的 MapReduce Servers view, 如果没有,请选择 Window -> Show View ->other, 再从弹出框中选择 MapReduce Tools 类别下面的 MapReduce Servers, 打开这个 view.

然后,请点击 MapReduce Servers view 右上角的蓝色图标,就会出现如图一所示的设置 Hadoop Server 的位置的界面。此处所说的 Hadoop server,具体到本文,就是 homer06 这台机器。在输入各项参数之后,请点击 ”Validate location” 按钮,检查是否能够正确的找到并连接上你的 Hadoop server. 如果出错,请尝试在命令行下执行命令:ssh the_hostname_of_your_hadoop_server, (或使用图形界面的 SSH 远程登录软件), 确保 ssh 能够连接成功。图一 定义 Hadoop server 的位置

2. 创立一个 MapReduce Project

在 Eclipse 中新创建一个 MapReduce Project, 将我们在第二篇文章中定义的 WordCount 类加到此 Project 中。这个类需要略作修改才能直接远程部署到我们已经搭建好的分布式环境中去运行,因为我们原来在 WordCount 程序中是通过读取命令行参数获得计算任务的输入路径和输出路径,而当前版本的 IBM MapReduce Tools 不支持远程部署时读取命令行参数。为测试的简便起见,我在程序中直接将输入路径定义为 input, 输出路径定义为 output。在测试 WordCount 程序之前,需要事先将需要做词频统计的一批文件拷贝到分布式文件系统的 input 目录下去。 完整的 WordCount 类的代码如代码清单 7 所示:

代码清单7

//import 语句省略

public class WordCount extends Configured implements Tool {

public static class MapClass extends MapReduceBase

implements Mapper {

private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern=\

public void map(LongWritable key, Text value,

OutputCollector output, Reporter reporter) throws IOException { String line = value.toString().toLowerCase(); line = line.replaceAll(pattern, \

StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one);

} } }

public static class Reduce extends MapReduceBase

implements Reducer {

public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { int sum = 0;

while (values.hasNext()) { sum += values.next().get(); }

output.collect(key, new IntWritable(sum)); } }

public int run(String[] args) throws Exception {

Path tempDir = new Path(\

Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

JobConf conf = new JobConf(getConf(), WordCount.class); try {

conf.setJobName(\

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class);

conf.setInputPath(new Path(args[0])); conf.setOutputPath(tempDir);

conf.setOutputFormat(SequenceFileOutputFormat.class);

JobClient.runJob(conf);

JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName(\

sortJob.setInputPath(tempDir);

sortJob.setInputFormat(SequenceFileInputFormat.class);

sortJob.setMapperClass(InverseMapper.class);

sortJob.setNumReduceTasks(1);

sortJob.setOutputPath(new Path(args[1])); sortJob.setOutputKeyClass(IntWritable.class);


分布式并行编程.doc 将本文的Word文档下载到电脑
搜索更多关于: 分布式并行编程 的文档
相关推荐
相关阅读
× 游客快捷下载通道(下载后可以自由复制和排版)

下载本文档需要支付 10

支付方式:

开通VIP包月会员 特价:29元/月

注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信:xuecool-com QQ:370150219