Flume入门到实践--Flume的安装与基础概念与安装实战

news/2024/9/28 7:54:30 标签: flume, 大数据

        在当今大数据时代,有效管理和处理海量日志数据对于企业获取洞察和保持运营效率至关重要。为此目的设计的强有力工具之一是 Apache Flume。本文将带您探索Flume,了解其功能、安装方法以及一些实际用例,以展示其在处理日志数据方面的有效性。

Flume简介

        Apache Flume 是一个分布式、可靠且可用的服务,旨在高效地收集、聚合和移动来自各种来源的大量日志数据到集中式数据存储。它在处理管道中扮演着关键角色,充当数据流的导管。

        Flume 常被形象地比喻为“水管”,一端连接着数据源(水源),另一端连接着数据存储(水桶)。它特别适合于实时日志数据的收集与传输。

Flume的核心组件

  1. Agent:Flume的基本工作单元,每个Flume配置文件可以包含一个或多个Agent。
  2. Source:数据的来源,可以是日志文件、网络端口等。
  3. Channel:数据的通道,用于暂存从Source到Sink的数据。
  4. Sink:数据的目的地,可以是HDFS、HBase或其他存储系统。
  5. Event:Flume中数据的基本单位。

Flume的关键特性

  1. 可靠的数据处理: Flume确保在传输过程中不会丢失数据,提供端到端的可靠性。
  2. 持久性: 它使用通道支持持久化存储,确保在故障情况下数据不会丢失。
  3. 灵活性: Flume可以通过各种源、通道和接收器进行定制,以适应特定的数据流需求。
  4. 可扩展性: 它旨在随着数据量的增加而扩展,适合大规模数据处理。

安装和配置

通过网盘分享的文件

apache-flume-1.9.0-bin.tar.gz

安装Flume

Flume的安装相对简单,以Flume 1.9.0版本为例

下载并解压

把安装包拉到/opt/modules目录下

进入/opt/modules目录下

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/

进入/opt/installs目录下

mv apache-flume-1.9.0-bin/ flume

配置环境变量

进入/etc/profile配置环境变量

export FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin

使用命令配置环境变量

echo 'export FLUME_HOME=/opt/installs/flume' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile

刷新环境变量

source /etc/profile

修改配置文件

在/opt/installs/flume/conf路径下复制并修改flume-env.sh文件。

cp flume-env.sh.template flume-env.sh

 修改 JAVA_HOME 的路径为自己的 jdk 路径。

export JAVA_HOME=/opt/installs/jdk

Flume的数据模型

Flume支持单一数据流和多数据流模型,可以根据实际需求灵活配置。

单一数据流模型

单一数据流模型包含一个Agent,适用于简单的日志收集场景。

多数据流模型

多数据流模型可以包含多个Agent,Agent之间可以进行数据的流转和处理。

Flume的使用

        Flume的使用主要依赖于配置文件,通过定义Source、Channel和Sink的组件及其关系来实现数据的流动。

编写 conf文件

flume 的使用是编写 conf文件的,运行的时候指定该文件

# 定义组件的名字
<Agent>.sources = <Source>
a1.sources=s1
<Agent>.channels = <Channel1> <Channel2>
a1.channels=c1
<Agent>.sinks = <Sink>
a1.sinks=sink1

# 设置source 和 channel 之间的关系
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
a1.sources.s1.channels=c1

# 设置sink 和 channel 之间的关系
<Agent>.sinks.<Sink>.channel = <Channel1>
a1.sinks.sink1.channel=c1

先定义agent的名字,再定义agent中三大组件的名字
接着定义各个组件之间的关联关系
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

常见的组件

 参考网址:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了

常见的Source 

常见的channel

常见的sink

总结:Kafka 可以充当flume中的各个组件。三个组件可以两两组合在一起,所以使用场景非常的多。

案例展示

1.)Avro+Memory+Logger(Avro + 内存 + 日志)

此设置适用于演示目的。它监听指定端口,通过内存收集数据,并将其记录到控制台。

先找source 中的avro看需要设置什么参数

#编写s1的类型是什么
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.32.128
a1.sources.s1.port = 4141
a1.sources.s1.channels = c1

找到channel中的memory类型,再设置一下

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#source 或者 sink 每个事务中存取 Event 的操作数量
a1.channels.c1.transactionCapacity = 10000

接着查找sink,sink的类型是logger

a1.sinks.s2.channel = c1
a1.sinks.s2.type = logger

最终合并起来的文件就是:

配置

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  avro-memory-log.conf

将配置文件的内容拷贝进去

  启动命令

flume-ng agent -c ../ -f avro-memory-log.conf -n a1 -Dflume.root.logger=INFO,console

 -c  后面跟上 配置文件的路径
-f  跟上自己编写的conf文件
-n  agent的名字
-Dflume.root.logger=INFO,console   INFO 日志输出级别  Debug,INFO,warn,error 等

接着向端口中发送数据:

flume-ng avro-client -c /opt/installs/flume/conf/ -H bigdata01 -p 4141 -F /home/hivedata/arr1.txt

给avro发消息,使用avro-client

flume是没有运行结束时间的,它一直监听某个Ip的端口,有消息就处理,没消息,就等着,反正不可能运行结束。

如果想停止,可以使用ctrl + c 终止flume

2)Exec + Memory + HDFS(执行命令 + 内存 + HDFS)

此配置用于持续监控日志文件,并将它们存储在HDFS中。

配置

以下版本演示的是没有时间语义的案例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hivedata/arr1.txt
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000


a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/event/

接着我们演示hdfs文件中含有时间转义字符怎么办?

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txt

a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间,否则会报时间戳是null的异常
a1.sinks.k1.hdfs.useLocalTimeStamp=true

flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  exec-memory-hdfs.conf

将配置文件的内容拷贝进去

 启动命令

flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

假如hdfs中有时间转义字符,必须给定时间,给定时间有两种方式,要么通过header 传递一个时间,要么使用本地时间。

假如hdfs中使用了时间转义字符,配置文件中必须二选一:
1)useLocalTimeStamp=true
2)使用时间戳拦截器

时间需要转义,没有时间无法翻译为具体的值  %d 就无法翻译为 日期

实现不断的向a.txt中存入数据的效果

echo "Hello World" >> a.txt

3)Spool +File + HDFS(Spooldir + 文件 + HDFS)

此设置非常适合处理包含多个不断更新的日志文件的目录。

配置

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /home/scripts/
a1.sources.src-1.fileHeader = true

a1.channels.ch-1.type = file

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = ch-1
a1.sinks.k1.hdfs.path = /flume/

flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  Spool_File_hdfs.conf

将配置文件的内容拷贝进去

 启动命令

flume-ng agent -c ./ -f Spool_File_hdfs.conf -n a1 -Dflume.root.logger=INFO,console

数据采集的意思:不管是一个文件还是一个文件夹,数据都是不断产生的,特别是生产环境下更是如此。

以上的采集只能采集到文件夹中是否有新的文件产生,不能采集变化的文件。

抽取一个文件夹中的所有文件,子文件夹中的文件是不抽取的,抽取过的文件,数据发生了变化,也不会再抽取一次。

假如你的channel是 file类型,必定会有临时文件产生,产生的文件在

总结:

channel 一般常用的只要两个,一个是memory,一个是file ,最高效的是memory ,也是最常用的。

 

4)TailDir+Memory+HDFS(TailDir+日志+HDFS)

这个案例演示了如何监控文件内容的变化,将变化的内容实时上传至HDFS。

配置

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符   * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*


a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs

flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  taildir_memory_hdfs.conf

将配置文件的内容拷贝进去

启动命令

flume-ng agent -c ./ -f taildir_memory_hdfs.conf -n a1 -Dflume.root.logger=INFO,console

 数据不断的发生变化,每发生一次变化,就传递一次,源源不断的抽取出来。

刚才为什么抽取了那么多个文件还没有抽取完?
由于我们刚才的一些数据非常的大,根据如下参数可以疯狂创建文件:
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize  1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

再次抽取,发现不抽了,原因是有一个文件记录了以前抽取过的内容,将其删除:
rm -rf /root/.flume/taildir_position.json

修改1.txt 里面的内容,flume会再次抽取数据
echo "hello" >> 1.txt

视频链接

01-flume的介绍_哔哩哔哩_bilibili

02-flume的安装_哔哩哔哩_bilibili

03-flume的数据模型_哔哩哔哩_bilibili

04-flume中的conf的语法_哔哩哔哩_bilibili

05-avro+mem+log的演示_哔哩哔哩_bilibili

06-vi和touch创建文件后有何不同_哔哩哔哩_bilibili

07-exec+mem+hdfs的演示_哔哩哔哩_bilibili

08-spool+file+hdfs_哔哩哔哩_bilibili

09-taildir+mem+hdfs_哔哩哔哩_bilibili

结论

        Apache Flume是一个多功能工具,它简化了收集、聚合和移动大量日志数据的过程。其在配置上的灵活性和数据处理的稳健性使其成为处理大数据的理想选择。无论您是在实时监控日志文件还是在多个源中聚合数据,Flume都提供了一个可扩展且可靠的解决方案。本文介绍了Flume的基础概念、安装配置以及几个典型的使用案例,希望能够帮助读者更好地理解和使用Flume。


http://www.niftyadmin.cn/n/5680678.html

相关文章

Apache OFBiz SSRF漏洞CVE-2024-45507分析

Apache OFBiz介绍 Apache OFBiz 是一个功能丰富的开源电子商务平台&#xff0c;包含完整的商业解决方案&#xff0c;适用于多种行业。它提供了一套全面的服务&#xff0c;包括客户关系管理&#xff08;CRM&#xff09;、企业资源规划&#xff08;ERP&#xff09;、订单管理、产…

大数据电商数仓项目--实战(一)数据准备

第一章 数仓分层 1.1 为什么要分层 1.2 数仓命名规范 1.2.1 表命名 ODS层命名为ods_表名DIM层命名为dim_表名DWD层命名为dwd_表名DWS层命名为dws_表名DWT层命名为dwt_表名ADS层命名为ads_表名临时表命名为tmp_表名 1.2.2 表字段类型 数量类型为bigint金额类型为decimal(16…

SpringBoot开发——实现webservice服务端和客户端

文章目录 1.WebService介绍1.1. 类型1.2. 架构1.3. Web服务标准和技术2、服务端和客户端示例2.1. 添加依赖2.2. 实现WebService服务端2.2.1 定义接口2.2.2 实现接口2.2.3 配置并启动服务2.3. 实现WebService客户端大家工作多年,在工作上难免不了和传统企业打交道,而这样的企业…

py-mmcif 包atom_site 对象介绍

在 py-mmcif 包中&#xff0c;atom_site 对象用于存储蛋白质或小分子结构中每个原子的坐标及其他相关信息。它包含了每个原子的位置、类型、残基编号等详细信息&#xff0c;通常对应于 PDB 文件的 ATOM 记录。 常见的 atom_site 属性 以下是 atom_site 对象中一些常见的属性&…

Maven常见解决方案

maven引用不到本地仓库的jar&#xff0c;jar是存在的 idea中maven本地仓库jar包打包失败和无法引用的问题解决_java_脚本之家

CSS选择器的全面解析与实战应用

CSS选择器的全面解析与实战应用 一、基本选择器1.1 通配符选择器&#xff08;*&#xff09;2.标签选择器&#xff08;div&#xff09;1.3 类名选择器&#xff08;.class&#xff09;4. id选择器&#xff08;#id&#xff09; 二、 属性选择器&#xff08;attr&#xff09;三、伪…

力扣(leetcode)每日一题 2516 每种字符至少取 K 个 | 滑动窗口

2516. 每种字符至少取 K 个 给你一个由字符 a、b、c 组成的字符串 s 和一个非负整数 k 。每分钟&#xff0c;你可以选择取走 s 最左侧 还是 最右侧 的那个字符。 你必须取走每种字符 至少 k 个&#xff0c;返回需要的 最少 分钟数&#xff1b;如果无法取到&#xff0c;则返回…

SpringBoot开发——使用Hutool工具包处理日期时间详解

文章目录 1、Hutool简介2、使用Hutool工具包进行日期时间处理2.1 添加依赖2.2 使用DateUtil类2.3 高级功能2.4. 优化指南1、Hutool简介 Hutool是一个功能丰富且易用的Java工具包,通过静态方法封装,降低相关API的学习成本,提高工作效率。它涵盖了字符串、数字、集合、编码、…