MapReduce高级案例④

TopN

输出流量使用量在前10的用户信息

数据
13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18271575951	1527	2106	3633
18390173782	9531	2412	11943
84188413	4116	1432	5548
输出

13509468723		7335	110349	117684
13975057813		11058	48243	59301
13568436656		3597	25635	29232
13736230513		2481	24681	27162
18390173782		9531	2412	11943
13630577991		6960	690	7650
15043685818		3659	3538	7197
13992314666		3008	3720	6728
15910133277		3156	2936	6092
13560439638		918	4938	5856
排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask而言,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用达到一定阈值后,再对缓冲区的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask而言,它从每个MapTask上远程拷贝相应的数据,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。换句话说,如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类:

部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
全排序:
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因此一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
辅助排序(GroupingComparator):
在Reduce端对kkey进行分组。应用于在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
二次排序:
在自定义排序过程中,如果compareTo的判断条件为两个即为二次排序。

code

FlowBean

package com.kami.demo04;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class FlowBean implements WritableComparable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    //定义一个空参构造器,反射时会用到
    public FlowBean() {
    }

    @Override
    public int compareTo(FlowBean obj) {
//        int result;
//        if (this.sumFlow > obj.getSumFlow()) {
//            result = -1;
//        } else if (this.sumFlow < obj.getSumFlow()) {
//            result = 1;
//        } else {
//            result = 0;
//        }
//        return result;
        return Long.compare(obj.sumFlow, this.sumFlow);
    }


    /**
     * 定义序列化方法
     * dataOutput:
     * 框架给我们提供的数据出口,我们通过该对象进行数据序列化操作。
     * 温馨提示:
     * 注意输出时的数据类型顺序。我们写入的顺序为upFlow,downFlow,sumFlow.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * 定义反序列化方法
     * dataInput:
     * 框架给我们提供的数据来源,我们通过该对象进行数据反序列化操作。
     * 温馨提示:
     * 注意输入的数据类型顺序要和输出时的数据类型要一一对应,即upFlow,downFlow,sumFlow。如果你不对应可能会导致最终的结果不正确哟~
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return "\t" + upFlow +
                "\t" + downFlow +
                "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

ruaDriver

package com.kami.demo04;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class ruaDriver {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ruaDriver.class);
        job.setMapperClass(ruaMapper.class);
        job.setReducerClass(ruaReducer.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.addInputPath(job, new Path("data\\d04\\rua.txt"));
        FileOutputFormat.setOutputPath(job, new Path("output\\d10"));
        job.setNumReduceTasks(1);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

ruaMapper

package com.kami.demo04;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaMapper extends Mapper {
    // 定义一个TreeMap作为存储数据的容器(天然按key排序)
    private TreeMap flowMap = new TreeMap();
    private FlowBean kBean;
    private Text v;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        kBean = new FlowBean();
        v = new Text();
        String[] sps = value.toString().split("\t");
        //获取电话号码
        String phoneNum = sps[0];
        //获取上传流量
        upFlow = Long.parseLong(sps[1]);
        //获取下载流量
        downFlow = Long.parseLong(sps[2]);
        //获取上传下载流量he
        sumFlow = Long.parseLong(sps[3]);
        kBean.setDownFlow(downFlow);
        kBean.setUpFlow(upFlow);
        kBean.setSumFlow(sumFlow);
        v.set(phoneNum);
        flowMap.put(kBean, v);
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Iterator bean = flowMap.keySet().iterator();
        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}

ruaReducer


package com.kami.demo04;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class ruaReducer extends Reducer {

    TreeMap flowMap = new TreeMap<>();

    @Override
    protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            FlowBean bean = new FlowBean();
            bean.setDownFlow(key.getDownFlow());
            bean.setUpFlow(key.getUpFlow());
            bean.setSumFlow(key.getSumFlow());
            flowMap.put(bean, new Text(value));
            if (flowMap.size() > 10) {
                flowMap.remove(flowMap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
        System.out.println("Reduce cleanup  ");
        Iterator it = flowMap.keySet().iterator();
        while (it.hasNext()) {
            FlowBean v = it.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}

推荐阅读1:https://www.cnblogs.com/yinzhengjie2020/p/12528395.html
推荐阅读2:https://www.cnblogs.com/yinzhengjie2020/p/12520516.html
推荐阅读3:https://blog.csdn.net/weixin_35353187/article/details/82025204

点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注