MapReduce Advanced Case ⑦

Small File Handling (Custom InputFormat)

Requirement Analysis

Small files hurt efficiency on both HDFS and MapReduce, yet in practice it is often unavoidable to process large numbers of them, so a dedicated solution is needed. The approach here is to merge many small files into a single SequenceFile: the SequenceFile holds multiple files, storing each one with the file path + name as the key and the file content as the value.
Small-file optimization generally comes down to the following approaches:
(1) Merge small files or small batches of data into large files at data-collection time, before uploading to HDFS.
(2) Before the business processing, run a MapReduce program on HDFS to merge the small files.
(3) During MapReduce processing, use CombineTextInputFormat to improve efficiency (see the sketch below).
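
As a side note to option (3), here is a minimal driver sketch that is not part of this case's code: the class name combineDriverSketch and the 4 MB split cap are assumptions for illustration, and the sketch only shows how CombineTextInputFormat would be wired into a job so that many small files are packed into fewer input splits.

package com.kami.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class combineDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(combineDriverSketch.class);
        // Pack many small files into fewer splits instead of one split per file
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at 4 MB (hypothetical value; tune to your file sizes)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
        // Mapper/Reducer classes and output key/value classes for the real job go here
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}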
This case instead uses a custom InputFormat to handle the small-file input:
(1) Define a class that extends FileInputFormat.
(2) Override the RecordReader so that it reads one complete file at a time and wraps it as a single KV pair.
(3) On the output side, use SequenceFileOutputFormat to write the merged file.

Data

a.txt
yongpeng weidong weinan
sanfeng luozong xiaoming
b.txt
longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin
c.txt
shuaige changmo zhenqiang 
dongli lingu xuanxuan

Code

ruaFileInputFormat
package com.kami.demo02;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/17
 */
public class ruaFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        ruaRecordReader recordReader = new ruaRecordReader();
        // Initialize the RecordReader
        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }

    // Return false so the file is never split: each small file is read as one complete record
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
Custom ruaRecordReader
package com.kami.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/17
 */
public class ruaRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit split;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;
//    private FSDataInputStream inputStream;

    /**
     * Initialization
     *
     * @param inputSplit         the split describing the file to be read
     * @param taskAttemptContext the task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Cast the generic InputSplit to a FileSplit
        this.split = (FileSplit) inputSplit;
        conf = taskAttemptContext.getConfiguration();
//        // Get the file path from the split
//        Path path = this.split.getPath();
//        // Get the file system from the path
//        FileSystem fileSystem = path.getFileSystem(taskAttemptContext.getConfiguration());
//        // Open the input stream
//        inputStream = fileSystem.open(path);
    }


    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Buffer sized to hold the entire file
            byte[] contents = new byte[(int) split.getLength()];
            // Get the file system from the split path
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            // Read the file content
            FSDataInputStream fis = null;
            try {
                // Open the input stream
                fis = fs.open(path);
                // Read the whole file into the byte buffer
                IOUtils.readFully(fis, contents, 0, contents.length);
                // Hand the file content to the value
                value.set(contents, 0, contents.length);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(fis);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {

    }
}
ruaDriver
package com.kami.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/17
 */
public class ruaDriver {
    public static void main(String[] args) throws Exception {
        // Local test paths; replace with command-line arguments when submitting to a cluster
        args = new String[]{"data\\d02", "output\\d02"};

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(ruaDriver.class);
//        job.setNumReduceTasks(0);

        job.setMapperClass(ruaMapper.class);
        job.setInputFormatClass(ruaFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }

}
ruaMapper
package com.kami.demo02;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/17
 */
public class ruaMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text k = new Text();

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // Get the input split for this record
        InputSplit split = context.getInputSplit();
        // Get the file path from the split
        Path path = ((FileSplit) split).getPath();
        // Use the full file path (including the file name) as the key
        k.set(path.toString());
        // Write the path as key and the raw file bytes as value
        context.write(k, value);
    }
}
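
To check the merged result, here is a minimal read-back sketch that is not part of the original case: it assumes the job above wrote its SequenceFile to output\d02\part-r-00000 (the exact part-file name may differ) and simply prints each stored key (the original file path) together with the size and text of its value.

package com.kami.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ruaSeqReader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed output location produced by ruaDriver; adjust if your part file differs
        Path path = new Path("output\\d02\\part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                // Key = original file path + name, value = raw file bytes
                System.out.println(key + " (" + value.getLength() + " bytes)");
                System.out.println(new String(value.copyBytes(), "UTF-8"));
            }
        }
    }
}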

Recommended reference: https://www.blog.kamisamak.com/index.php/2019/11/19/hadoop-mapreduce自定义inputformat输入/
Case source: https://www.cnblogs.com/frankdeng/p/9256245.html
