博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用MapReduce实现一些经典的案例
阅读量:6000 次
发布时间:2019-06-20

本文共 9694 字,大约阅读时间需要 32 分钟。

  在工作中,很多时候都是用hive或pig来自动化执行mr统计,但是我们不能忘记原始的mr。本文记录了一些通过mr来完成的经典的案例,有倒排索引、数据去重等,需要掌握。

一、使用mapreduce实现倒排索引

   倒排索引(Inverted index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。通过倒排索引,可以根据单词快速获取包含这个单词的文档列表。

   之所以称之为倒排索引,是因为文章内的单词反向检索获取文章标识,从而完成巨大文件的快速搜索。搜索引擎就是利用倒排索引来进行搜索的,此外,倒排索引也是Lucene的实现原理。

   假设有两个文件,a.txt类容为“hello you hello”,b.txt内容为“hello hans”,则倒排索引后,期望返回如下内容:

"hello" "a.txt:2;b.txt:1""you" "a.txt:1""hans" "b.txt:1"
View Code

   从后想前倒退,要输出结果“"hello" "a.txt:2;b.txt:1"”,则reduce输出为<hello,a.txt:2;b.txt:1>,输入为<hello,a.txt:2>、<hello,b.txt:1>。reduce的输入为map的输出,分一下,要map端直接输出<hello,a.txt:2>这种类型的数据是实现不了的。这时,我们可以借助combine作为中间过渡步骤来实现。combine输入数据为<hello:a.txt,1>、<hello:a.txt,1>、<hello:b.txt,1>,可以转化为符合reduce输入要求的数据,此时map端输出<hello:a.txt,1>类型的数据也是很简单的,实现过程如图1所示。

图1 mapreduce倒排索引实现原理示意图

   实现代码如下:

package com.hicoor.hadoop.mapreduce.reverse;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.SplitCompressionInputStream;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;//工具类class StringUtil {    public static String getShortPath(String filePath) {        if (filePath.length() == 0)            return filePath;        return filePath.substring(filePath.lastIndexOf("/") + 1);    }    public static String getSplitByIndex(String str, String regex, int index) {        String[] splits = str.split(regex);        if (splits.length < index)            return "";        return splits[index];    }}public class InverseIndex {    public static class ReverseWordMapper extends            Mapper
{ @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); String fileName = StringUtil.getShortPath(split.getPath() .toString()); StringTokenizer st = new StringTokenizer(value.toString()); while (st.hasMoreTokens()) { String word = st.nextToken().toLowerCase(); word = word + ":" + fileName; context.write(new Text(word), new Text("1")); } } } public static class ReverseWordCombiner extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { long sum = 0; for (Text value : values) { sum += Integer.valueOf(value.toString()); } String newKey = StringUtil.getSplitByIndex(key.toString(), ":", 0); String fileKey = StringUtil .getSplitByIndex(key.toString(), ":", 1); context.write(new Text(newKey), new Text(fileKey + ":" + String.valueOf(sum))); } } public static class ReverseWordReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { StringBuilder sb = new StringBuilder(""); for (Text v : values) { sb.append(v.toString()+" "); } context.write(key, new Text(sb.toString())); } } private static final String FILE_IN_PATH = "hdfs://hadoop0:9000/reverse/in/"; private static final String FILE_OUT_PATH = "hdfs://hadoop0:9000/reverse/out/"; public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { System.setProperty("hadoop.home.dir", "D:\\desktop\\hadoop-2.6.0"); Configuration conf = new Configuration(); // 删除已存在的输出目录 FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf); if (fileSystem.exists(new Path(FILE_OUT_PATH))) { fileSystem.delete(new Path(FILE_OUT_PATH), true); } Job job = Job.getInstance(conf, "InverseIndex"); job.setJarByClass(InverseIndex.class); job.setMapperClass(ReverseWordMapper.class); job.setCombinerClass(ReverseWordCombiner.class); job.setReducerClass(ReverseWordReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH)); FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH)); job.waitForCompletion(true); }}
View Code
二、使用mapreduce实现TopK查询

   TopK问题指在海量数据中查找某条件排名前K名的记录,如在用户存款记录中查找存款余额最大的前3名用户。当数据量不大时,可以直接加载到单机内存中进行处理,但是当数据量非常庞大时,需要借助mapreduce来分布式处理。可以使用HiveQL来处理,也可以自己编写mapduce程序来处理此问题。

   实现原理:在每个map任务中查询并返回当前处理数据最大的top k条记录,然后将所有map输出的记录交由一个reduce任务处理,查找并返回最终的top k记录,过程如图2所示。

             图2 mapreduce实现top k过程示意图

   需要注意的是,这里reduce个数只能为1个,并且不需要设置Combiner

   假设存在文件deposit1.txt和deposit2.txt,其内容分别为(列分别表示用户名与存款金额):

deposit1.txtp1    125p2    23p3    365p4    15p5    188deposit2.txtp6    236p7    115p8    18p9    785p10    214
View Code

   要求找出存款金额最大的前3位用户,参考实现代码:

package com.hicoor.hadoop.mapreduce;import java.io.IOException;import java.net.URI;import java.util.Comparator;import java.util.TreeMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MapReduceTopKDemo {    public static final int K = 3;        //默认的TreeMap是按key升序排列 此方法用于获取降序排列的TreeMap    private static TreeMap
getDescSortTreeMap() { return new TreeMap
(new Comparator
() { @Override public int compare(Long o1, Long o2) { return o2.compareTo(o1); } }); } static class TopKMapper extends Mapper
{ private TreeMap
map = getDescSortTreeMap(); @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { String line = value.toString(); if(line == null || line == "") return; String[] splits = line.split("\t"); if(splits.length < 2) return; map.put(Long.parseLong(splits[1]), splits[0]); //只保留最大的K个数据 if(map.size() > K) { //由于记录按照key降序排列 只需删除最后一个记录 map.remove(map.lastKey()); } } @Override protected void cleanup(Mapper
.Context context) throws IOException, InterruptedException { for (Long num : map.keySet()) { context.write(new LongWritable(num), new Text(map.get(num))); } } } static class TopKReducer extends Reducer
{ private TreeMap
map = getDescSortTreeMap(); @Override protected void reduce(LongWritable key, Iterable
value, Reducer
.Context context) throws IOException, InterruptedException { StringBuilder ps = new StringBuilder(); for (Text val : value) { ps.append(val.toString()); } map.put(key.get(), ps.toString()); if(map.size() > K) { map.remove(map.lastKey()); } } @Override protected void cleanup(Reducer
.Context context) throws IOException, InterruptedException { for (Long num : map.keySet()) { context.write(new Text(map.get(num)), new LongWritable(num)); } } } private final static String FILE_IN_PATH = "hdfs://cluster1/topk/in"; private final static String FILE_OUT_PATH = "hdfs://cluster1/topk/out"; /* TopK问题:在海量数据中查找某条件排名前K名的记录,如在用户存款记录中查找存款余额最大的前3名用户 * 1) 测试输入数据(列分别表示用户账户与存款余额): * p1 125 * p2 23 * p3 365 * p4 15 * p5 188 * p6 236 * p7 115 * p8 18 * p9 785 * p10 214 * 2) 输出结果: * p9 785 * p3 365 * p6 236 */ public static void main(String[] args) throws Exception { System.setProperty("hadoop.home.dir", "D:\\desktop\\hadoop-2.6.0"); Configuration conf = getHAContiguration(); // 删除已存在的输出目录 FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf); if (fileSystem.exists(new Path(FILE_OUT_PATH))) { fileSystem.delete(new Path(FILE_OUT_PATH), true); } Job job = Job.getInstance(conf, "MapReduce TopK Demo"); job.setMapperClass(TopKMapper.class); job.setJarByClass(MapReduceTopKDemo.class); job.setReducerClass(TopKReducer.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH)); FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH)); job.waitForCompletion(true); } private static Configuration getHAContiguration() { Configuration conf = new Configuration(); conf.setStrings("dfs.nameservices", "cluster1"); conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop1,hadoop2"); conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "172.19.7.31:9000"); conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop2", "172.19.7.32:9000"); conf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); return conf; }}
View Code

   执行结果为:

p9      785p3      365p6      236
View Code

转载于:https://www.cnblogs.com/hanganglin/p/4470968.html

你可能感兴趣的文章
Keepalived集群软件高级使用(工作原理和状态通知)
查看>>
OSSIM Server和Sensor间通讯问题
查看>>
关于SQL92标准和Sybase,SQLServer2000,Oracle的数据类型对比关系
查看>>
Spread for Windows Forms高级主题(7)---自定义打印的外观
查看>>
论本我、自我、超我对人工智能的启示
查看>>
DB2 XQuery学习笔记
查看>>
Java控制内存的功力
查看>>
在SQLServer处理中的一些问题及解决方法
查看>>
Windows Server 2008下 常用网络排错工具汇总
查看>>
Linux4.0平台下Oracle10g安装
查看>>
Angular Service入门
查看>>
android例子程序(ApiDemo)的简单分类整理
查看>>
利用AutoSPSourceBuilder和Autospinstaller自动安装SharePoint Server 2013图解教程——Part 2...
查看>>
巧用apply让javascript函数仅执行一次
查看>>
struts2 Result Type四个常用转跳类型
查看>>
ArcGIS IQueryFilter接口
查看>>
《国富论》西方经济学的“圣经”——自利的人性是资本主义市场经济的基本假设,财富的源泉是劳动,钱变成可再生的钱“资产”而不是负债...
查看>>
ios5--计算器
查看>>
Node.js返回JSONP
查看>>
Intellij IDEA 使用小结
查看>>