【上海大数据培训】MapReduce实战项目之美国气象站30年来气温统计

时间:2018-03-28 14:32:05   来源:上海尚学堂   阅读:

一、前言

       MapReduce是一种用于数据处理的编程模型。该模型非常简单。同一个程序Hadoop可以运行用各种语言编写的MapReduce程序。最重要的是,MapReduce程序本质上是并行的,因此可以将大规模的数据分析交给任何一个拥有足够多机器的运营商。MapReduce的优势在于处理大型数据集。


       在本文中,我们要学习一个挖掘气象数据的程序。气象数据是通过分布在美国全国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来统计分析。
 

二、分析

  我们使用的数据来自美国国家气候数据中心、美国国家海洋和大气管理局(简称 NCDCNOAA),这些数据按行并以 ASCII 格式存储,其中每一行是一条记录。 下面我们展示一行采样数据,其中重要的字段被突出显示。该行数据被分割成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。





       MapReduce 任务过程分为两个处理阶段:map 阶段和reduce阶段 。每个阶段都以键值对作为输入和输出,其类型由程序员自己来选择。程序员还需要写两个函数:map 函数和reduce 函数。

      在这里,map阶段的输入是NCDC NOAA原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。

      我们的map函数很简单。由于我们只对气象站和气温感兴趣,所以只需要取出这两个字段数据。在本实战中,map 函数只是一个数据准备阶段,通过这种方式来准备数据,使 reducer 函数继续对它进行处理:即统计出每个气象站30年来的平均气温。map 函数还是一个比较合适去除已损记录的地方,在 map 函数里面,我们可以筛掉缺失的或者错误的气温数据。(上海大数据培训
 

三、实现


上面已经分析完毕,下面我们就着手实现它。这里需要编写三块代码内容:

  1. map 函数、
  2. reduce函数
  3. 一些用来运行作业的代码。
     

3.1 Map函数


下面我们来编写 Mapper 类,实现 map() 函数,提取气象站和气温数据

public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {

    /**

     * 解析气象站数据

     */

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 每行气象数据

        String line = value.toString();

        // 每小时气温值

        int temperature = Integer.parseInt(line.substring(14, 19).trim());

        // 过滤无效数据    

        if (temperature != -9999) {         

            FileSplit fileSplit = (FileSplit) context.getInputSplit();

            // 通过文件名称提取气象站id

            String weatherStationId = fileSplit.getPath().getName().substring(5, 10);

            context.write(new Text(weatherStationId), new IntWritable(temperature));

        }

    }

       这个 Mapper 类是一个泛型类型,它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键和输出值的类型。 就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不是使用 java 内嵌的类型。这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于 Java 的 Long 类型)、Text 类型(相当于 Java 中的 String 类型)和 IntWritable 类型(相当于 Java 的 Integer 类型)。

       map() 方法的输入是一个键(key)和一个值(value),我们首先将 Text 类型的 value 转换成 Java 的 String 类型, 之后使用 substring()方法截取我们业务需要的值。map() 方法还提供了 Context 实例用于输出内容的写入。 在这种情况下,我们将气象站id按Text对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。(上海大数据培训)

 

3.2 reduce函数

下面我们来编写 Reducer类,实现reduce函数,统计每个气象站的平均气温。

public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {

    

    /**

     * 统计美国各个气象站的平均气温

     */

    public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {

        IntWritable result = new IntWritable();

        int sum = 0;

        int count = 0;

        

        // 统计每个气象站的气温值总和

        for (IntWritable val : values) {

            sum += val.get();

            count++;

        }

        

        // 求每个气象站的气温平均值

        result.set(sum / count);

        context.write(key, result);

    }

}

       同样,reduce 函数也有四个形式参数类型用于指定输入和输出类型。reduce 函数的输入类型必须匹配 map 函数的输出类型:即 Text 类型和 IntWritable 类型。 在这种情况下,reduce 函数的输出类型也必须是 Text 和 IntWritable 类型,分别是气象站id和平均气温。在 map 的输出结果中,所有相同的气象站(key)被分配到同一个reduce执行,这个平均气温就是针对同一个气象站(key),通过循环所有气温值(values)求和并求平均数所得到的。


3.3  job对象、JAR包

/**

* 任务驱动方法

*

* @param arg0

* @throws Exception

*/

@Override

public int run(String[] arg0) throws Exception {

    // TODO Auto-generated method stub

    // 读取配置文件

    Configuration conf = new Configuration();

    Path mypath = new Path(arg0[1]);

    FileSystem hdfs = mypath.getFileSystem(conf);

    if (hdfs.isDirectory(mypath)) {

        hdfs.delete(mypath, true);

    }

    

    // 新建一个任务

    Job job = new Job(conf, "temperature");

    // 设置主类

    job.setJarByClass(Temperature.class);

    

    // 输入路径

    FileInputFormat.addInputPath(job, new Path(arg0[0]));

    // 输出路径

    FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

    

    // Mapper

    job.setMapperClass(TemperatureMapper.class);

    // Reducer

    job.setReducerClass(TemperatureReducer.class);

    

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);    

    // 提交任务

    return job.waitForCompletion(true)?0:1;

}

 

/**

* main 方法

*

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception {

    // 数据输入路径和输出路径

    String[] args0 = {

                "hdfs://ljc:9000/buaa/weather/",

                "hdfs://ljc:9000/buaa/weatherout/"

    };

    int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);

    System.exit(ec);

}

Configuration 类读取 Hadoop 的配置文件,如 site-core.xml、mapred-site.xml、hdfs-site.xml 等。

       Job 对象指定作业执行规范,我们可以用它来控制整个作业的运行。我们在 Hadoop 集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。 不必明确指定 JAR 文件的名称,在 Job 对象的 setJarByClass 方法中传递一个类即可,Hadoop 利用这个类来查找包含它的 JAR 文件,进而找到相关的 JAR 文件。(上海大数据培训)

     
构造 Job 对象之后,需要指定输入和输出数据的路径。

1、调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用 addInputPath() 来实现多路径的输入。

2、调用 FileOutputFormat 类中的静态方法 setOutputPath() 来指定输出路径(只能有一个输出路径)。这个方法指定的是 reduce 函数输出文件的写入目录。 在运行作业前该目录是不应该存在的,否则 Hadoop 会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是件可怕的事情)。

3、通过 setMapperClass() 和 setReducerClass() 指定 map 类型和reduce 类型。

4、通过setOutputKeyClass() 和 setOutputValueClass() 控制 map 和 reduce 函数的输出类型,正如本例所示,这两个输出类型一般都是相同的。如果不同,则通过 setMapOutputKeyClass()和setMapOutputValueClass()来设置 map 函数的输出类型。

 5、输入的类型通过 InputFormat 类来控制,我们的例子中没有设置,因为使用的是默认的 TextInputFormat(文本输入格式)。

 6、Job 中的 waitForCompletion() 方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。 waitForCompletion() 方法返回一个布尔值,表示执行的成(true)败(false),这个布尔值被转换成程序的退出代码 0 或者 1。

然后上传到Hadoop集群上Hadoop的安装路径下(可用cd $HADOOP_HOME命令快速进入目录),然后使用rz命令上传,如果没装rz命令请先运行命令:
  yum install -y rz

  然后找到刚才导出的JAR包确认上传。

  最后,执行运行命令:

  hadoop jar Temperature.jar com.hadoop.test.Temperature /weather /weatherOutput 
                                              //  第一个jar表示运行的对象是jar文件
                           //  .jar文件为我们要运行的jar文件
                           //  com.hadoop.test.Temperature为类的路径,注意请写全
                          //  /weather表示我们的输入文件,对应代码中的args[0]
                          //  /weatherOutput表示我的输出文件,对应代码中的args[1],注意必须是不存在的路径
感谢阅读上海尚学堂大数据文章,获取更多大数据相关内容或支持,请点击  上海大数据培训
参考文章:https://www.cnblogs.com/codeOfLife/p/5362502.html   【刘超★ljc】

分享:0

电话咨询

客服热线服务时间

周一至周五 9:00-21:00

周六至周日 9:00-18:00

咨询电话

021-67690939
15201841284

微信扫一扫