百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

flume部署安装以及案例运行(flume lzo)

wxin55 2024-11-10 12:17 9 浏览 0 评论

基本认识:

大数据阶段数据的收集来源, flume的收集数据一般是日志,比如:网站日志

flume是一个分布式的,可靠的,可用的

flume可以做离线也可以做实时分析

collecting --》source --》数据采集来源

aggregating --》channel --》数据临时缓存(只要数据被move了,那就不在存储了)

moving --》sink --》数据的转移

1、agent :source、channel、sink

(1)source:用于采集数据,将产生的数据流传输到Channel

(2)channel:连接 sources 和 sinks ,临时缓存数据

(3)sink:从Channel收集数据,将数据写到目标源

2、Events:

(1)是Flume数据传输的基本单元

(2)由header和载有数据的一个byte array构成,byte array字节数组:存储真实的数据

(3)每一个事件的大小:deserializer.maxLineLength2048字节,编码格式:UTF-8

一个source,可以绑定多个channel

一个sink,只能绑定一个channel


flume安装:

准备安装包

apache-flume-1.7.0-bin.tar.gz

解压缩

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/

配置文件:flume-env.sh

mv flume-env.sh.template flume-env.sh

配置jdk

export JAVA_HOME=/opt/bigdata/jdk1.8

测试是否成功

bin/flume-ng version

flume的flume-ng命令

Usage: bin/flume-ng [options]...

例如一些提交任务的命令(熟悉下格式):

bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties

bin/flume-ng agent -c conf -n agent -f conf/test.properties

bin/flume-ng avro-client --conf conf --host host --port 8080


配置情况选择:

1、flume安装在hadoop集群中:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

2、flume安装在hadoop集群中,而且还配置了HA:

(1)HDFS访问入口变化

(2)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(3)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录

3、flume不在hadoop集群里:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(2)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录

(3)将hadoop的一些jar包添加到flume的lib目录下(用的是什么版本拷贝什么版本)


运行官网案例:

编辑配置文件flume-test.properties(创建一个)

准备配置信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

查看系统有没有安装telnet:rpm -qa | grep telnet

没有的就安装: yum -y install telnet或yum -y install nc

文件log4j.properties显示的是日志信息配置

运行:

bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console

开启一个窗口:telnet连接端口号

telnet masterhbase 44444 (卡在那是正常的,你可以随意输入信息)

输入hello world

在flume中就可以看到数据

退出telnet:输入ctrl + ] 然后输入quit

运行实例一

需求:监控apache服务器的日志,利用flume监控某一个文件

安装httpd服务

yum -y install httpd

安装完成之后,会有个目录生成 /var/www/html

到/var/www/html这个目录下 vim index.html [随意输入内容]

启动服务: service httpd start

浏览网页:输入主机名[hostname]

日志产生的路径:/var/log/httpd/access_log

配置flume agent

source: exec

channel:memory

sink:hdfs

我们复制配置文件

编辑信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

一些配置说明


问题:hdfs上的文件一般数据文件大小要大,而且文件数量是要少


hdfs.rollInterval = 600 (这个地方最好还是设置一个时间)

hdfs.rollSize = 1048576 (1M,134217728-》128M)

hdfs.rollCount = 0

hdfs.minBlockReplicas = 1 (这个不设置的话,上面的参数有可能不会生效)


在hdfs文件上设置时间格式分层 年月日/时 每小时生成一个文件

hdfs.useLocalTimeStamp = true

hdfs.round = true

hdfs.roundValue= 1

hdfs.roundUnit = hour

将准备好的jar上传到flume/lib中

运行

查看hdfs上,不断刷新会有新的文件

查看下进程

运行实例二

利用flume监控某一个文件目录,将目录下滚动好的文件实时抽取到HDFS上

类型选择

source:spooldir

channel:file

sink:hdfs

创建配置文件flume-spooldir.properties

编写信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text


# Describe the channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置信息概念补充

1.source:spooldir(已经生成好的最终的数据文件)

(1)recursiveDirectorySearch 是否监视子目录以查找要读取的新文件

(2)includePattern 正则表达式,指定要包含的文件 (只.csv数据文件,是正则匹配)

(3)ignorePattern 正则表达式,指定要忽略的文件 (不抽取.csv数据文件,是正则匹配)

(4)缺点:不能对目录文件进行修改,如果有追加内容的文本文件,是不允许的(有可能不会被抽取,有可能会有错误)


2.flume监控目录,支持文件修改,并记录文件状态

(1)source:taildir (类似exec + spooldir的组合)

(2)filegroups :设置source组 可设置多个 filegroups = f1

(3)filegroups.:设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件

(4)positionFile:设置定位文件的位置,以JSON格式写入给定位置文件上每个文件的最后读取位置

3.Memory Channel是一个不稳定的channel,它在内存中存储所有事件,

如果进程异常停止,内存中的数据将不能让恢复,而且受内存大小的限制。

4.flie channel:是一个持久化的channel,数据安全并且只要磁盘空间足够,它就可以将数据存储到磁盘上

5.checkpointDir:检查数据完整性,存放检查点目录,可以检测出哪些数据已被抽取,哪些还没有

6.dataDirs:存放数据的目录,dataDirs可以是多个目录,以逗号隔开,用独立的多个磁盘上的多个目录可以提高file channel的性能。

7.hdfs上数据默认是二进制的文件类型:bin/hdfs dfs -text /

8.可以修改hdfs.fileType 改为DataStream(数据流)hdfs.writeFormat = Text 改为文本格式

9.当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;hdfs.codeC压缩编码解码器 --》snappy压缩

10.batchSize默认值:100 每个批次刷新到HDFS上的events数量;


创建目录

mkdir –p /data/logs

模拟数据

cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/

查看数据

运行

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties

查看下HDFS

运行实例三

将hive的一些jar拷贝过来 flume的lib目录下

配置flume agent

source:netcat

channel:Memory

sink:hive

启动hive的元数据服务:

/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &

创建库和表 (表必须是CLUSTERED BY ,INTO BUCKETS)

create database flume_test;

use flume_test;

create table flume_user(

user_id int,

user_name string,

user_age int

)CLUSTERED BY (user_id) INTO 2 BUCKETS

row format delimited fields terminated by '\t'

stored as orc;

准备配置文件flume-sink-hive.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置概念补充

1.serializer: 负责解析事件中的字段并将它们映射到hive表中的列

(2)DELIMITED 普通文本

(2)json json文件 (不需要配置,JSON中的对象名称直接映射到Hive表中具有相同名称的列, 内部使用

org.apache.hive.hcatalog.data.JsonSerDe)


2.DELIMITED:

serializer.delimiter:传入数据中的字段分隔符,用双引号括起来,例如"\t"

serializer.fieldnames:从输入字段到hive表中的列的映射,指定为hive表列名称的逗号分隔列表

serializer.serdeSeparator :输出字段分隔符,单引号括起来,例如'\t'

hive参数设置vim hive-site.xml:


<property>
??? <name>hive.metastore.uris</name>
??? <value>thrift://masterhbase:9083</value>
</property>
<property>
??? <name>hive.txn.manager</name>
??? <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
??? <name>hive.compactor.initiator.on</name>
??? <value>true</value>
</property>
<property>
??? <name>hive.compactor.worker.threads</name>
??? <value>1</value>
</property>
<property>
??? <name>hive.support.concurrency</name>
??? <value>true</value>
</property>
<property>
??? <name>hive.enforce.bucketing</name>
??? <value>true</value>
</property>
<property>
??? <name> hive.exec.dynamic.partition.mode</name>
??? <value>nonstrict</value>
</property>
<property>
??? <name>hive.in.test</name>
??? <value>true</value>
</property>


解决报错问题

(1)报错:

Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

打开一部分事务支持

-》协同配置

hive.compactor.initiator.on=true; -》运行启动程序和清除线程,用于打开所需参数的完整列表事务

hive.compactor.worker.threads=1; -》增加工作线程的数量将减少花费的时间

hive.support.concurrency=true; -》是否支持并发,默认是false

hive.enforce.bucketing=true; -》是否启用bucketing,写入table数据时会启动分桶

hive.exec.dynamic.partition.mode=nonstrict; -》设置非严格模式

(2)启动metastore时报错:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

配置以下属性:这个是用来创建COMPACTION_QUEUE这张表的


hive.in.test

true


(3)再启动metastore时报错:

Error rolling back: Can't call rollback when autocommit=true

去掉以下属性:


hive.in.test

true

之前没有安装,先安装

启动flume agent

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

使用nc去连接,然后输入数据,数据以制表符分割

Hive中可以看到数据

运行实例四(hive)

创建表

create table emp(

empno int,

ename string,

job string,

mgr int,

hiredate string,

sal double,

comm double,

deptno int

)CLUSTERED BY (empno) INTO 2 BUCKETS

row format delimited fields terminated by '\t'

stored as orc;

准备配置信息flume-sink-hive2.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

运行flume

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties

查看数据


相关推荐

ES6中 Promise的使用场景?(es6promise用法例子)

一、介绍Promise,译为承诺,是异步编程的一种解决方案,比传统的解决方案(回调函数)更加合理和更加强大在以往我们如果处理多层异步操作,我们往往会像下面那样编写我们的代码doSomething(f...

JavaScript 对 Promise 并发的处理方法

Promise对象代表一个未来的值,它有三种状态:pending待定,这是Promise的初始状态,它可能成功,也可能失败,前途未卜fulfilled已完成,这是一种成功的状态,此时可以获取...

Promise的九大方法(promise的实例方法)

1、promise.resolv静态方法Promise.resolve(value)可以认为是newPromise方法的语法糖,比如Promise.resolve(42)可以认为是以下代码的语...

360前端一面~面试题解析(360前端开发面试题)

1.组件库按需加载怎么做的,具体打包配了什么-按需加载实现:借助打包工具(如Webpack的require.context或ES模块动态导入),在使用组件时才引入对应的代码。例如在V...

前端面试-Promise 的 finally 怎么实现的?如何在工作中使用?

Promise的finally方法是一个非常有用的工具,它无论Promise是成功(fulfilled)还是失败(rejected)都会执行,且不改变Promise的最终结果。它的实现原...

最简单手写Promise,30行代码理解Promise核心原理和发布订阅模式

看了全网手写Promise的,大部分对于新手还是比较难理解的,其中几个比较难的点:状态还未改变时通过发布订阅模式去收集事件实例化的时候通过调用构造函数里传出来的方法去修改类里面的状态,这个叫Re...

前端分享-Promise可以中途取消啦(promise可以取消吗)

传统Promise就像一台需要手动组装的设备,每次使用都要重新接线。而Promise.withResolvers的出现,相当于给开发者发了一个智能遥控器,可以随时随地控制异步操作。它解决了三大...

手写 Promise(手写输入法 中文)

前言都2020年了,Promise大家肯定都在用了,但是估计很多人对其原理还是一知半解,今天就让我们一起实现一个符合PromiseA+规范的Promise。附PromiseA+规范地址...

什么是 Promise.allSettled()!新手老手都要会?

Promise.allSettled()方法返回一个在所有给定的promise都已经fulfilled或rejected后的promise,并带有一个对象数组,每个对象表示对应的pr...

前端面试-关于Promise解析与高频面试题示范

Promise是啥,直接上图:Promise就是处理异步函数的API,它可以包裹一个异步函数,在异步函数完成时抛出完成状态,让代码结束远古时无限回掉的窘境。配合async/await语法糖,可...

宇宙厂:为什么前端离不开 Promise.withResolvers() ?

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.为什么需要Promise.with...

Promise 新增了一个超实用的 API!

在JavaScript的世界里,Promise一直是处理异步操作的神器。而现在,随着ES2025的发布,Promise又迎来了一个超实用的新成员——Promise.try()!这个新方法简...

一次搞懂 Promise 异步处理(promise 异步顺序执行)

PromisePromise就像这个词的表面意识一样,表示一种承诺、许诺,会在后面给出一个结果,成功或者失败。现在已经成为了主流的异步编程的操作方式,写进了标准里面。状态Promise有且仅有...

Promise 核心机制详解(promise机制的实现原理)

一、Promise的核心状态机Promise本质上是一个状态机,其行为由内部状态严格管控。每个Promise实例在创建时处于Pending(等待)状态,此时异步操作尚未完成。当异步操作成功...

javascript——Promise(js实现promise)

1.PromiseES6开始支持,Promise对象用于一个异步操作的最终完成(包括成功和失败)及结果值的表示。简单说就是处理异步请求的。之所以叫Promise,就是我承诺,如果成功则怎么处理,失败怎...

取消回复欢迎 发表评论: