大数据开发入门实例(大数据开发步骤和流程)
wxin55 2024-11-16 01:39 7 浏览 0 评论
最近参加了一个大数据开发的培训,整理一下在培训过程中,老师一直说的一个案例。案例比较简单,使用 MapReduce、hive、Spark 等框架进行计算,对框架有一个简单的了解。现在对这个案例进行一个简单的整理,方便后期学习。
一、数据
1.1 原始数据
1.2 数据说明
数据按照逗号(,)分割,每列数据说明如下:
序号 说明 备注
1 学号
2 姓名
3 性别
4 年龄
5 上学期成绩 学科成绩以&分割,学科=成绩
6 下学期成绩 学科成绩以&分割,学科=成绩
二、需求
统计第一学期数学成绩及格和不及格的人数
三、解决方案
数据结构相对比较简单,分析每行数据中的第一学期数学成绩,
判断其中数据成绩是否及格,如果及格,则统计及格的人数,不及格统计不及格的人数。
3.1.准备工作
在正式开始这项工作时,首先要保证自己的环境可以正常运行,hadoop 集群、mysql、hive、spark 集群可以正常运行。
3.1.1 环境准备
进入 hadoop 的 sbin 目录: 1.启动 DFS
./start-dfs.sh
2.启动 yarn
./start-yarn.sh
3.启动 hive,进入 hive 的 bin 目录
#启动mysql
service mysql start
# 启动hive的metastore服务
hive --service metastore &
# 启动hive的hiveserver2服务
hive --service hiveserver2 &
4.启动 spark:进入 spark 的 sbin 目录
start-all.sh
3.1.2 将需要分析的数据上传到服务器和 HDFS 中
将数据存放到 student 文件,进入 student 文件所在目录,将文件上传到 HDFS 的/data/目录下
hdfs dfs -put student /data/
查看 HDFS 文件是否上传成功
3.2 使用 MapReduce 解决
MapReduce的思路就是将所需要进行计算的数据,拆分到不同的机器上 然后在不同的机器上计算,将不同机器上的结果汇总到一起,然后再进行计算。 在不同机器上进行计算的过程,通常称为Map阶段,汇总结果进行计算的过程,通常称为Reduce阶段。
结合MapReduce计算的基本思路,MapReduce实现一个任务主要分为Map和reduce两部分。 在实现的过程中,主要完成Map和Reduce的实现过程即可。 当然,一个MapReduce的任务,必须要有一个驱动主类将Map和Reduce调起方能执行。
3.2.1 Map 阶段
新建 StudentScoreMapper,需要继承 org.apache.hadoop.mapreduce.Mapper,并指定 Map 的输入输出类型,共有四个参数,前两个为输入数据类型,后两个为输出数据类型
public class StudentScoreMapper extends Mapper<Object,Text, Text, IntWritable>
Map 阶段具体的执行逻辑时,在 map 方法中实现的,故需要重写父类的 Map 方法,具体思路:
- 获取第一学期的成绩
- 获取该学期的数据成绩
- 判断数学成绩是否及格,如果及格,则使用输出<"Math_Score_Pass",1>,否则输出<"Math_Score_Not_Pass",1>
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 1. valueIn为文件中每行的数据,使用split方法进行分割
String[] vals = value.toString().split(",",-1);
// 2.每行数据中,倒数第二列为第一学期成绩
if(vals.length !=6 ){return;}
String scoreStr = vals[5];
// 3.每门成绩按照&分割
String[] scores = scoreStr.split("&");
if(scores.length != 4){return;}
// 4.获取数学成绩,学科成绩按照=分割
String mathScoreStr = scores[0];
String[] mathScore = mathScoreStr.split("=");
if(mathScore.length != 2){return;}
String score = mathScore[1];
// 5.判断成绩是否及格 int temp = Integer.parseInt(score);
if(temp >= 60){
context.write(outKeyPass,new IntWritable(1));
}else{
context.write(outKeyNotPass,new IntWritable(1));
}
}
3.2.2 Reduce 阶段
新建 StudentScoreReducer 类,继承 org.apache.hadoop.mapreduce.Reducer,同时指定 Reduce 的输入和输出类型,其中输入类型与 Map 的输出类型保持一致。
public class StudentScoreReducer extends Reducer<Text, IntWritable,Text, LongWritable>
Reduce 阶段具体的处理逻辑需要重写 reduce 方法,具体思路:就是把 Map 的输出结果,累加即可。
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
//1.经过shuffle之后 相同key的结果value 组成一个Iterable,作为reduce的输入参数进行计算
//2.遍历Iterable,将期所有的val进行累加
long sum = 0L;
for(IntWritable val : values){
sum += val.get();
}
//3.返回计算结果
context.write(key,new LongWritable(sum));
}
3.2.3 驱动类
针对 MapReduce 的任务进行一些配置,设置 Map 类、Reduce 类,任务输入输出路径等相关信息。
public static void main(String[] args) throws
IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置任务的名称
job.setJobName("StudentScoreCount");
//设置任务执行的主类
job.setJarByClass(StudentScoreMain.class);
//设置reduce任务的个数
job.setNumReduceTasks(1);
//设置Map类
job.setMapperClass(StudentScoreMapper.class);
//设置combiner,在Map端执行reduce任务
//job.setCombinerClass(StudentScoreReducer.class);
//设置Reduce类
job.setReducerClass(StudentScoreReducer.class);
//设置输出key的类
job.setOutputKeyClass(Text.class);
//设置输出value的类
job.setOutputValueClass(LongWritable.class);
//如果Map和Reduce的输出类型不一致,需要单独对map设置输出key和value的类型
//同时注释setCombinerClass方法
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Map输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
//设置Reduce输处路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
3.2.4 执行程序
将编写的代码进行打包,发送到 hadoop 的集群,提交 MapReduce 任务
hadoop jar hadoop-training-1.0-SNAPSHOT.jar \
com.hadoop.mapreduce.StudentScoreMain /data/student \
/data/studentscore/
其中,/data/student 为任务所需要处理的路径,/data/studentscore/ 为最终结果输出路径,需要保证这个文件在 HDFS 上不存在。
查看输出文件夹下的文件,其中,以 part 开始的文件为最终的数据结果
查看任务执行结果
查看任务的基本信息,执行时间为 20sec
3.2.5 源码
具体源码可以通过 Github 进行下载:hadoop-training[1]
3.3 使用 hive 解决
将数据导入到hive中,使用类SQL语句进行统计分析
3.3.1 创建数据库
create database hive_2020;
查看创建的数据库
show databases;
数据库创建成功
3.3.2 创建表结构
根据文本文件的内容格式,每行数据根据逗号(,)分割,每个字段使用 string 类型进行存储,建立 hive 对应的表,建表语句如下:
create table hive_2020.student(
sno string,
name string,
gender string,
age int,
class string,
semester1 String,
semester2 String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as textfile;
建表语句中 hive_2020 为刚创建的数据库名称,这样可以直接看到表所在的数据库,也可以使用 use hive_2020,进入到指定的数据库中,然后再执行建表语句;建表语句后面的 2 句分别制定了文件内容分割符号和存储文件的格式。
需要特别说明一下 :hive 的分割符目前只能支持单字符,对于多字符需要特殊处理。目前通用的方式主要有 2 种:
- 使用单字符分割后,然后再使用数据时,将数据在进行额外的处理
- 重新实现 hive 的 InputFormat 方法,使其支持多字符分割
3.3.3 导入数据文件
可以将本地文件或者 hdfs 上的文件导入到 hive,命令执行方式如下:
#将本地文件导入hive
load data local inpath '本地路径' into table student;
#将HDFS文件导入hive
load data inpath 'HDFS路径' into table student;
为了方便,选择将本地文件直接导入 hive 的表中
- 将本地文件导入到 hive 中
load data local inpath '/root/2020/01/student' into table student;
导入成功后,显示“OK”信息
- 查看表中数据
3.3.4 准备分析
hive 的语句基本上与 SQL 语句保持一致,如何解决上述问题就转换成:如果使用关系型数据库我们怎么解决?
- 将每个学生第一学期的数据成绩拆分出来,使用 split 函数,以&进行拆分
- 获取每个学生第一学期的数据成绩,使用 split 函数,以=进行拆分
- 如果数学成绩大于等于 60,则赋值为 1,否则赋值为 0
- 统计 1 的个数作为及格的人数,使用总数减去及格的人数,作为不合格的人数
首先,放上最终 Hive 执行的语句
select
sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_pass ,
count(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) - sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_not_pass
from student;
然后,放上 Hive 的执行结果
最后,一步一步的分析一下 Hive 的执行语句:
- 从整体上看,该语句从 student 表中查询两个属性;
- 第一个属性是 count,第二个属性是 sum 值减去第一个属性值,从而可以简单的任务,一个是计算 count 值,另外一个是计算 sum 值;
- count 和 sum 函数的计算内容相同,分析他们共同内容,if(split(split(semester1,'&')[0],'=')[1]>=60,1,0);
- 使用 if 函数和两个 split 函数,最内层的 split 函数用于拆分第一个学期学科成绩,第二个 split 用于获取数学成绩,if 函数用于判断数学成绩是否大于等于 60,如果大于等于 60,则赋值为 1,否则赋值为 0;
- 则 count 用于计算 1 的个数,sum 则是用于计算总数。
3.3.5 执行程序
一些 hive 的命令会转化成 haooop 的 MapRecude 任务去执行,查看该任务的基本执行情况
PS:一些常用的命令:
查看 hive 中的所有指定函数:show functions; 查看 hive 中的指定函数使用方法:describe function 方法名
3.4 使用 Spark 解决
spark 源码是由 scala 语言进行编码的,Spark 任务是使用 scala 语言进行编写,没有 scala 语言基础的同学,需要对 scala 有一定的了解,才能更好的完成。
3.4.1 编写程序
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark Student Score")
.master("local[2]")
.getOrCreate()
val outKeyPass = "Math_Score_Pass";
val outKeyNotPass = "Math_Score_Not_Pass";
val sc :SparkContext = spark.sparkContext val lines = sc.textFile(args(0))
// 数据格式: 200412169,gavin,male,30,0401,math=24&en=60&c=85&os=78,math=22&en=20&c=85&os=78
val studentScoreCount = lines
.map(_.split(","))
.filter(_.length == 7)
.map(t => t(5))
.map(_.split("&"))
.filter(_.length == 4)
.map(t => t(0))
.map(_.split("="))
.filter(_.length ==2)
.map(t => if (t(1).toInt >= 60) (outKeyPass,1) else (outKeyNotPass,1) )
.reduceByKey(_+_)
.saveAsTextFile(args(1))
}
使用 RDD 的方式,执行思路与 Hadoop 的思路基本一致。使用逗号分割获取第一学期的成绩,使用&分割,获取数学成绩,判断数据成绩是否大于等于 60,如果满足,则生成("Math_Score_Pass",1),否则,生成("Math_Score_Not_Pass",1),然后使用 reduceByKey 将所有的结果进行累加,最后保存到 HDFS 的指定目录。
3.4.2 执行程序
spark-submit \
--master local \
--class com.spark.StudentScore \
original-spark-training-1.0-SNAPSHOT.jar \
/data/student /data/spark/studentscore/
3.4.3 查看执行结果
查看 hdfs 结果结果目录::
hdfs dfs -ls /data/spark/studentscore/
HDFS结果
查看 HDFS 结果内容:
hdfs dfs -cat /data/spark/studentscore/par*
在结果目录中,发现产生了 2 个 part-0000* 的文件,我们要看一下 具体是什么原因产生的: 查阅相关资料,发现从 hdfs 中读取文件,源码默认的分区数是 2,分区数决定最终的结果
在默认的 textfile 中,如果从 hdfs 中读取文件,源码中默认的分区数是 2,如果想改变分区数,可以在 textfile 中设置第二个参数“分区数”
查看 textFile 源码
其中 defaultMinPartitions 为定义的默认分区数:
3.4.4 源码
spark 的实现代码可以在 github 上进行下载spark-training[2]
参考资料
[1] hadoop-training: https://github.com/fulinmao/hadoop-training/
[2] spark-training: https://github.com/fulinmao/spark-training/
相关推荐
- Java框架 —— Spring简介
-
简介一般来说,Spring指的是SpringFramework,它提供了很多功能,例如:控制反转(IOC)、依赖注入(DI)、切面编程(AOP)、事务管理(TX)主要jar包org.sprin...
- Monkey自动化测试
-
Monkey1.通过Monkey程序模拟用户触摸屏幕、滑动Trackball、按键等操作来对设备上的程序进行压力测试,检测程序多久的时间会发生异常;2.Monkey主要用于Android的压力...
- 十年之重修SpringBoot启动&自动装载&Bean加载过程
-
总结Springboot的自动装载,完全是依赖Bean的自动注册,其中默认的规则,是把需要自动装载的bean全名称编辑在spring.factories(2.7之后的版本,还支持.imports文件)...
- 一些可以显著提高大型 Java 项目启动速度的尝试
-
我们线上的业务jar包基本上普遍比较庞大,动不动一个jar包上百M,启动时间在分钟级,拖慢了我们在故障时快速扩容的响应。于是做了一些分析,看看Java程序启动慢到底慢在哪里,如何去优化,...
- class 增量发包改造为 jar 包方式发布
-
大纲class增量发包介绍项目目录结构介绍jar包方式发布落地方案class增量发包介绍当前项目的迭代修复都是通过class增量包来发版本的将改动的代码class增量打包,如下图cla...
- Flink架构及其工作原理(很详细)
-
原文链接:https://www.cnblogs.com/code2one/p/10123112.html关键词:Flink架构、面试杀手锏!更多大数据架构、实战经验,欢迎关注【大数据与机器学习】,...
- 大促系统优化之应用启动速度优化实践
-
作者:京东零售宋维飞一、前言本文记录了在大促前针对SpringBoot应用启动速度过慢而采取的优化方案,主要介绍了如何定位启动速度慢的阻塞点,以及如何解决这些问题。希望可以帮助大家了解如何定位该类问...
- Maven工程如何使用非Maven仓库jar包
-
使用Maven之前,一直都是自己手工在网上搜索需要的jar包,然后添加到工程中。以这样的方式开发,工作了好多年,曾经以为以后也会一直这样下去。直到碰上Maven,用了第一次,就抛弃老方法了。Maven...
- 【推荐】一款开源免费、功能强大的短链接生成平台
-
项目介绍reduce是一款开源免费、功能强大的短链接生成平台。部署在服务器,使用短域名解析即可提供服务。CoodyFramework首秀,自写IOC、MVC、ORM、TASK、JSON、DB连接池、...
- K8S官方java客户端之七:patch操作
-
欢迎访问我的GitHubhttps://github.com/zq2599/blog_demos内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;...
- Java 的业务逻辑验证框架 之-fluent-validator
-
开发人员在维护核心业务逻辑的同时,还需要为输入做严格的校验。当输入不合法时,能够给caller一个明确的反馈,最常见的反馈就是返回封装了result的对象或者抛出exception。一些常见...
- 互联网大厂后端必看!手把手教你替换 Spring Boot 中的日志框架
-
在互联网大厂的后端开发工作中,SpringBoot框架是搭建项目的“得力助手”,使用十分普遍。但不少开发者都遇到过这样的困扰:SpringBoot默认集成的Logback日志框架,在实际...
- 测试经理教你如何用monkey进行压力测试!
-
一、monkey是什么1、monkey程序由android系统自带,使用Java语言写成,在Android文件系统中的存放路径是:/system/framework/monkey.jar2、Mo...
- Java-Maven详解
-
一、什么是Maven?ApacheMaven是一个软件项目管理的综合工具。基于项目对象模型(POM)的概念,提供了帮助管理构建、文档、报告、依赖、发布等方法,Maven简化和标准化项目建设过程。处理...
- SpringBoot打包部署最佳实践
-
springboot介绍SpringBoot目前流行的javaweb应用开发框架,相比传统的spring开发,springboot极大简化了配置,并且遵守约定优于配置的原则即使0配置也能正常运...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- hive行转列函数 (63)
- sourcemap文件是什么 (54)
- display none 隐藏后怎么显示 (56)
- 共享锁和排他锁的区别 (51)
- httpservletrequest 获取参数 (64)
- jstl包 (64)
- qsharedmemory (50)
- watch computed (53)
- java中switch (68)
- date.now (55)
- git-bash (56)
- 盒子垂直居中 (68)
- npm是什么命令 (62)
- python中+=代表什么 (70)
- fsimage (51)
- nginx break (61)
- mysql分区表的优缺点 (53)
- centos7切换到图形界面 (55)
- 前端深拷贝 (62)
- kmp模式匹配算法 (57)
- jsjson字符串转json对象 (53)
- jdbc connection (61)
- javascript字符串转换为数字 (54)
- mybatis 使用 (73)
- 安装mysql数据库 (55)