百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

大数据开发入门实例(大数据开发步骤和流程)

wxin55 2024-11-16 01:39 7 浏览 0 评论

最近参加了一个大数据开发的培训,整理一下在培训过程中,老师一直说的一个案例。案例比较简单,使用 MapReduce、hive、Spark 等框架进行计算,对框架有一个简单的了解。现在对这个案例进行一个简单的整理,方便后期学习。

一、数据

1.1 原始数据


1.2 数据说明

数据按照逗号(,)分割,每列数据说明如下:

序号 说明 备注

1 学号

2 姓名

3 性别

4 年龄

5 上学期成绩 学科成绩以&分割,学科=成绩

6 下学期成绩 学科成绩以&分割,学科=成绩

二、需求

统计第一学期数学成绩及格和不及格的人数

三、解决方案

数据结构相对比较简单,分析每行数据中的第一学期数学成绩,
判断其中数据成绩是否及格,如果及格,则统计及格的人数,不及格统计不及格的人数。

3.1.准备工作

在正式开始这项工作时,首先要保证自己的环境可以正常运行,hadoop 集群、mysql、hive、spark 集群可以正常运行。

3.1.1 环境准备

进入 hadoop 的 sbin 目录: 1.启动 DFS

./start-dfs.sh

2.启动 yarn

 ./start-yarn.sh

3.启动 hive,进入 hive 的 bin 目录

#启动mysql
service mysql start
# 启动hive的metastore服务
hive --service metastore &
# 启动hive的hiveserver2服务
hive --service hiveserver2 &

4.启动 spark:进入 spark 的 sbin 目录

start-all.sh

3.1.2 将需要分析的数据上传到服务器和 HDFS 中

将数据存放到 student 文件,进入 student 文件所在目录,将文件上传到 HDFS 的/data/目录下

    hdfs dfs -put student /data/

查看 HDFS 文件是否上传成功

3.2 使用 MapReduce 解决

MapReduce的思路就是将所需要进行计算的数据,拆分到不同的机器上 然后在不同的机器上计算,将不同机器上的结果汇总到一起,然后再进行计算。 在不同机器上进行计算的过程,通常称为Map阶段,汇总结果进行计算的过程,通常称为Reduce阶段。

结合MapReduce计算的基本思路,MapReduce实现一个任务主要分为Map和reduce两部分。 在实现的过程中,主要完成Map和Reduce的实现过程即可。 当然,一个MapReduce的任务,必须要有一个驱动主类将Map和Reduce调起方能执行。

3.2.1 Map 阶段

新建 StudentScoreMapper,需要继承 org.apache.hadoop.mapreduce.Mapper,并指定 Map 的输入输出类型,共有四个参数,前两个为输入数据类型,后两个为输出数据类型

public class StudentScoreMapper extends Mapper<Object,Text, Text, IntWritable>

Map 阶段具体的执行逻辑时,在 map 方法中实现的,故需要重写父类的 Map 方法,具体思路:

  1. 获取第一学期的成绩
  2. 获取该学期的数据成绩
  3. 判断数学成绩是否及格,如果及格,则使用输出<"Math_Score_Pass",1>,否则输出<"Math_Score_Not_Pass",1>
@Override
protected void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {   
    // 1. valueIn为文件中每行的数据,使用split方法进行分割   
    String[] vals = value.toString().split(",",-1);   
    // 2.每行数据中,倒数第二列为第一学期成绩   
    if(vals.length !=6 ){return;}   
    String scoreStr = vals[5];   
    // 3.每门成绩按照&分割   
    String[] scores = scoreStr.split("&");   
    if(scores.length != 4){return;}   
    // 4.获取数学成绩,学科成绩按照=分割   
    String mathScoreStr = scores[0];   
    String[] mathScore = mathScoreStr.split("=");   
    if(mathScore.length != 2){return;}   
    String score = mathScore[1];   
    // 5.判断成绩是否及格    int temp = Integer.parseInt(score);   
    if(temp >= 60){       
        context.write(outKeyPass,new IntWritable(1));   
    }else{       
        context.write(outKeyNotPass,new IntWritable(1));   
    }
}

3.2.2 Reduce 阶段

新建 StudentScoreReducer 类,继承 org.apache.hadoop.mapreduce.Reducer,同时指定 Reduce 的输入和输出类型,其中输入类型与 Map 的输出类型保持一致。

public class StudentScoreReducer extends Reducer<Text, IntWritable,Text, LongWritable>

Reduce 阶段具体的处理逻辑需要重写 reduce 方法,具体思路:就是把 Map 的输出结果,累加即可。

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {   
    //1.经过shuffle之后 相同key的结果value 组成一个Iterable,作为reduce的输入参数进行计算   
    //2.遍历Iterable,将期所有的val进行累加   
    long sum = 0L;   
    for(IntWritable val : values){
        sum += val.get();   
    }   
    //3.返回计算结果   
    context.write(key,new LongWritable(sum));
}

3.2.3 驱动类

针对 MapReduce 的任务进行一些配置,设置 Map 类、Reduce 类,任务输入输出路径等相关信息。

public static void main(String[] args) throws
IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //设置任务的名称
    job.setJobName("StudentScoreCount");
    //设置任务执行的主类
    job.setJarByClass(StudentScoreMain.class);
    //设置reduce任务的个数
    job.setNumReduceTasks(1);
    //设置Map类
    job.setMapperClass(StudentScoreMapper.class);
    //设置combiner,在Map端执行reduce任务
    //job.setCombinerClass(StudentScoreReducer.class);
    //设置Reduce类
    job.setReducerClass(StudentScoreReducer.class);
    //设置输出key的类
    job.setOutputKeyClass(Text.class);
    //设置输出value的类
    job.setOutputValueClass(LongWritable.class);
    //如果Map和Reduce的输出类型不一致,需要单独对map设置输出key和value的类型
    //同时注释setCombinerClass方法
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    //设置Map输入路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
    //设置Reduce输处路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));   
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

3.2.4 执行程序

将编写的代码进行打包,发送到 hadoop 的集群,提交 MapReduce 任务

hadoop jar hadoop-training-1.0-SNAPSHOT.jar \
com.hadoop.mapreduce.StudentScoreMain /data/student \
/data/studentscore/

其中,/data/student 为任务所需要处理的路径,/data/studentscore/ 为最终结果输出路径,需要保证这个文件在 HDFS 上不存在。

查看输出文件夹下的文件,其中,以 part 开始的文件为最终的数据结果

查看任务执行结果

查看任务的基本信息,执行时间为 20sec

3.2.5 源码

具体源码可以通过 Github 进行下载:hadoop-training[1]

3.3 使用 hive 解决

将数据导入到hive中,使用类SQL语句进行统计分析

3.3.1 创建数据库

    create database hive_2020;

查看创建的数据库

    show databases;

数据库创建成功

3.3.2 创建表结构

根据文本文件的内容格式,每行数据根据逗号(,)分割,每个字段使用 string 类型进行存储,建立 hive 对应的表,建表语句如下:

create  table hive_2020.student(
    sno string,
    name string,
    gender string,
    age int,
    class string,
    semester1 String,
    semester2 String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as textfile;

建表语句中 hive_2020 为刚创建的数据库名称,这样可以直接看到表所在的数据库,也可以使用 use hive_2020,进入到指定的数据库中,然后再执行建表语句;建表语句后面的 2 句分别制定了文件内容分割符号和存储文件的格式。

需要特别说明一下 :hive 的分割符目前只能支持单字符,对于多字符需要特殊处理。目前通用的方式主要有 2 种:

  1. 使用单字符分割后,然后再使用数据时,将数据在进行额外的处理
  2. 重新实现 hive 的 InputFormat 方法,使其支持多字符分割

3.3.3 导入数据文件

可以将本地文件或者 hdfs 上的文件导入到 hive,命令执行方式如下:

#将本地文件导入hive
load data local inpath '本地路径' into table student;
#将HDFS文件导入hive
load data  inpath 'HDFS路径' into table student;

为了方便,选择将本地文件直接导入 hive 的表中

  1. 将本地文件导入到 hive 中
    load data local inpath '/root/2020/01/student' into table student;

导入成功后,显示“OK”信息

  1. 查看表中数据

3.3.4 准备分析

hive 的语句基本上与 SQL 语句保持一致,如何解决上述问题就转换成:如果使用关系型数据库我们怎么解决?

  1. 将每个学生第一学期的数据成绩拆分出来,使用 split 函数,以&进行拆分
  2. 获取每个学生第一学期的数据成绩,使用 split 函数,以=进行拆分
  3. 如果数学成绩大于等于 60,则赋值为 1,否则赋值为 0
  4. 统计 1 的个数作为及格的人数,使用总数减去及格的人数,作为不合格的人数

首先,放上最终 Hive 执行的语句

    select
    sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_pass ,
    count(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) - sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_not_pass
    from student;

然后,放上 Hive 的执行结果

最后,一步一步的分析一下 Hive 的执行语句:

  1. 从整体上看,该语句从 student 表中查询两个属性;
  2. 第一个属性是 count,第二个属性是 sum 值减去第一个属性值,从而可以简单的任务,一个是计算 count 值,另外一个是计算 sum 值;
  3. count 和 sum 函数的计算内容相同,分析他们共同内容,if(split(split(semester1,'&')[0],'=')[1]>=60,1,0);
  4. 使用 if 函数和两个 split 函数,最内层的 split 函数用于拆分第一个学期学科成绩,第二个 split 用于获取数学成绩,if 函数用于判断数学成绩是否大于等于 60,如果大于等于 60,则赋值为 1,否则赋值为 0;
  5. 则 count 用于计算 1 的个数,sum 则是用于计算总数。

3.3.5 执行程序

一些 hive 的命令会转化成 haooop 的 MapRecude 任务去执行,查看该任务的基本执行情况

PS:一些常用的命令:

查看 hive 中的所有指定函数:show functions; 查看 hive 中的指定函数使用方法:describe function 方法名

3.4 使用 Spark 解决

spark 源码是由 scala 语言进行编码的,Spark 任务是使用 scala 语言进行编写,没有 scala 语言基础的同学,需要对 scala 有一定的了解,才能更好的完成。

3.4.1 编写程序

def main(args: Array[String]): Unit = { 
    val spark = SparkSession
    .builder()   
    .appName("Spark Student Score")   
    .master("local[2]")   
    .getOrCreate() 

    val outKeyPass = "Math_Score_Pass"; 
    val outKeyNotPass = "Math_Score_Not_Pass"; 

    val sc :SparkContext = spark.sparkContext  val lines = sc.textFile(args(0)) 
    // 数据格式: 200412169,gavin,male,30,0401,math=24&en=60&c=85&os=78,math=22&en=20&c=85&os=78  
    val studentScoreCount = lines   
        .map(_.split(","))   
        .filter(_.length == 7)   
        .map(t => t(5))   
        .map(_.split("&"))   
        .filter(_.length == 4)   
        .map(t => t(0))   
        .map(_.split("="))   
        .filter(_.length ==2)   
        .map(t => if (t(1).toInt >= 60) (outKeyPass,1) else (outKeyNotPass,1) )   
        .reduceByKey(_+_)   
        .saveAsTextFile(args(1))
 }

使用 RDD 的方式,执行思路与 Hadoop 的思路基本一致。使用逗号分割获取第一学期的成绩,使用&分割,获取数学成绩,判断数据成绩是否大于等于 60,如果满足,则生成("Math_Score_Pass",1),否则,生成("Math_Score_Not_Pass",1),然后使用 reduceByKey 将所有的结果进行累加,最后保存到 HDFS 的指定目录。

3.4.2 执行程序

    spark-submit \
    --master local \
    --class com.spark.StudentScore \
    original-spark-training-1.0-SNAPSHOT.jar \
    /data/student /data/spark/studentscore/

3.4.3 查看执行结果

查看 hdfs 结果结果目录::

    hdfs dfs -ls /data/spark/studentscore/

HDFS结果

查看 HDFS 结果内容:

    hdfs dfs -cat /data/spark/studentscore/par*


在结果目录中,发现产生了 2 个 part-0000* 的文件,我们要看一下 具体是什么原因产生的: 查阅相关资料,发现从 hdfs 中读取文件,源码默认的分区数是 2,分区数决定最终的结果

在默认的 textfile 中,如果从 hdfs 中读取文件,源码中默认的分区数是 2,如果想改变分区数,可以在 textfile 中设置第二个参数“分区数”

查看 textFile 源码


其中 defaultMinPartitions 为定义的默认分区数:

3.4.4 源码

spark 的实现代码可以在 github 上进行下载spark-training[2]

参考资料

[1] hadoop-training: https://github.com/fulinmao/hadoop-training/

[2] spark-training: https://github.com/fulinmao/spark-training/

相关推荐

Java框架 —— Spring简介

简介一般来说,Spring指的是SpringFramework,它提供了很多功能,例如:控制反转(IOC)、依赖注入(DI)、切面编程(AOP)、事务管理(TX)主要jar包org.sprin...

Monkey自动化测试

Monkey1.通过Monkey程序模拟用户触摸屏幕、滑动Trackball、按键等操作来对设备上的程序进行压力测试,检测程序多久的时间会发生异常;2.Monkey主要用于Android的压力...

十年之重修SpringBoot启动&amp;自动装载&amp;Bean加载过程

总结Springboot的自动装载,完全是依赖Bean的自动注册,其中默认的规则,是把需要自动装载的bean全名称编辑在spring.factories(2.7之后的版本,还支持.imports文件)...

一些可以显著提高大型 Java 项目启动速度的尝试

我们线上的业务jar包基本上普遍比较庞大,动不动一个jar包上百M,启动时间在分钟级,拖慢了我们在故障时快速扩容的响应。于是做了一些分析,看看Java程序启动慢到底慢在哪里,如何去优化,...

class 增量发包改造为 jar 包方式发布

大纲class增量发包介绍项目目录结构介绍jar包方式发布落地方案class增量发包介绍当前项目的迭代修复都是通过class增量包来发版本的将改动的代码class增量打包,如下图cla...

Flink架构及其工作原理(很详细)

原文链接:https://www.cnblogs.com/code2one/p/10123112.html关键词:Flink架构、面试杀手锏!更多大数据架构、实战经验,欢迎关注【大数据与机器学习】,...

大促系统优化之应用启动速度优化实践

作者:京东零售宋维飞一、前言本文记录了在大促前针对SpringBoot应用启动速度过慢而采取的优化方案,主要介绍了如何定位启动速度慢的阻塞点,以及如何解决这些问题。希望可以帮助大家了解如何定位该类问...

Maven工程如何使用非Maven仓库jar包

使用Maven之前,一直都是自己手工在网上搜索需要的jar包,然后添加到工程中。以这样的方式开发,工作了好多年,曾经以为以后也会一直这样下去。直到碰上Maven,用了第一次,就抛弃老方法了。Maven...

【推荐】一款开源免费、功能强大的短链接生成平台

项目介绍reduce是一款开源免费、功能强大的短链接生成平台。部署在服务器,使用短域名解析即可提供服务。CoodyFramework首秀,自写IOC、MVC、ORM、TASK、JSON、DB连接池、...

K8S官方java客户端之七:patch操作

欢迎访问我的GitHubhttps://github.com/zq2599/blog_demos内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;...

Java 的业务逻辑验证框架 之-fluent-validator

开发人员在维护核心业务逻辑的同时,还需要为输入做严格的校验。当输入不合法时,能够给caller一个明确的反馈,最常见的反馈就是返回封装了result的对象或者抛出exception。一些常见...

互联网大厂后端必看!手把手教你替换 Spring Boot 中的日志框架

在互联网大厂的后端开发工作中,SpringBoot框架是搭建项目的“得力助手”,使用十分普遍。但不少开发者都遇到过这样的困扰:SpringBoot默认集成的Logback日志框架,在实际...

测试经理教你如何用monkey进行压力测试!

一、monkey是什么1、monkey程序由android系统自带,使用Java语言写成,在Android文件系统中的存放路径是:/system/framework/monkey.jar2、Mo...

Java-Maven详解

一、什么是Maven?ApacheMaven是一个软件项目管理的综合工具。基于项目对象模型(POM)的概念,提供了帮助管理构建、文档、报告、依赖、发布等方法,Maven简化和标准化项目建设过程。处理...

SpringBoot打包部署最佳实践

springboot介绍SpringBoot目前流行的javaweb应用开发框架,相比传统的spring开发,springboot极大简化了配置,并且遵守约定优于配置的原则即使0配置也能正常运...

取消回复欢迎 发表评论: