Flume 的安装和使用方法

一、Flume的安装

1.下载压缩包

https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2.上传到linux中

3.解压安装包
cd
#进入加载压缩包目录sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local
# 将 apache-flume-1.7.0-bin.tar.gz 解压到/usr/local 目录下sudo mv /usr/local/apache-flume-1.7.0-bin /usr/local/flume-1.7.0
将解压后的文件夹名称从 apache-flume-1.7.0-bin 改为 flume-1.7.0cd /usr/local/sudo chown -R qiangzi:qiangzi ./flume-1.7.0
#把/usr/local/flume-1.7.0 目录的权限赋予当前登录 Linux 系统的用户,这里假设是 qiangzi 用户
4.配置环境变量
cd /usr/lcoal/sudo vim ~/.bashrc

然后在首行加入如下代码:

export JAVA_HOME=/usr/local/jdk1.8.0/;
export FLUME_HOME=/usr/local/flume-1.7.0 
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin

注意,上面的 JAVA_HOME,如果以前已经在.bashrc 文件中设置过,就不要重复添加 了,使用以前的设置即可。比如,以前设置得 JAVA_HOME 可能是“export JAVA_HOME=/usr/lib/jvm/default-java”,则使用原来的设置即可。

使环境变量生效:

[qiangzi@master local]$ source ~/.bashrc
5.修改 flume-env.sh 配置文件
cd /usr/local/flume-1.7.0/confsudo cp ./flume-env.sh.template ./flume-env.shsudo vim ./flume-env.sh

打开 flume-env.sh 文件以后,在文件的最开始位置增加一行内容,用于设置 JAVA_HOME 变量

export JAVA_HOME=/usr/local/jdk1.8.0/;
6.查看 flume 版本信息
cd /usr/local/flume-1.7.0./bin/flume-ng version #查看 flume 版本信息;

如果安装成功,出现如下图片

二、Flume的使用

1.使用 Avro 数据源测试 Flume

        Avro 可以发送一个给定的文件给 Flume,Avro 源使用 AVRO RPC 机制。请对 Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件 helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动 Flume 以后, 可以把 helloworld.txt 中的文本内容显示出来。

(1)agent 配置文件
cd /usr/local/flumesudo vim ./conf/avro.conf #在 conf 目录下编辑一个 avro.conf 空文件

然后,在 avro.conf 写入以下内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

        上面 Avro Source 参数说明如下: Avro Source 的别名是 avro,也可以使用完整类别名 称 org.apache.flume.source.AvroSource,因此,上面有一行设置是 a1.sources.r1.type = avro,表示数据源的类型是 avro。bind 绑定的 ip 地址或主机名,使用 0.0.0.0 表示绑定机 器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。port 表示绑定 的端口。a1.sources.r1.port = 4141,表示绑定的端口是 4141。a1.sinks.k1.type = logger, 表示 sinks 的类型是 logger。

(2)启动 flume agent a1
/usr/local/flume-1.7.0/bin/flume-ng agent -c /usr/local/flume-1.7.0/conf/ -f /usr/local/flume-1.7.0/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

agent 窗口

(3)创建指定文件

先打开另外一个终端,在/usr/local/flume 下写入一个文件 log.00,内容为 hello,world

echo "hello, world" > /usr/local/flume-1.7.0/log.00

前面已经启动日志控制台,这里不再启动

再打开另外一个终端,执行:

cd /usr/local/flume-1.7.0bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume-1.7.0/log.00

此时我们可以看到第一个终端(agent 窗口)下的显示,也就是在日志控制台,就会把 log.00 文件的内容打印出来:

avro source 执行成功!

2. 使用 netcat 数据源测试 Flume
(1)创建 agent 配置文件
cd /usr/local/flume-1.7.0sudo vim ./conf/example.conf   #在 conf 目录创建 example.conf

在 example.conf 里写入以下内容:

#example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source                                                                                         a1.sources.r1.type = netcat                                                                                             a1.sources.r1.bind = localhost                                                                                          a1.sources.r1.port = 44444                                                                                              # Describe the sink                                                                                                     a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
(2)启动 flume agent (即打开日志控制台)
/usr/local/flume-1.7.0/bin/flume-ng agent --conf /usr/local/flume-1.7.0/conf --conf-file /usr/local/flume-1.7.0/conf/example.conf --name a1 -Dflume.root.logger=INFO,console

如图:

再打开一个终端,输入命令

telnet localhost 44444   #上面不能使用,使用nc
nc localhost 44444

 然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如 我们输入”hello,world”或者其他内容,得出

       第一个终端的日志控制台显示:

netcatsource 运行成功!

        这里补充一点,flume 只能传递英文和字符,不能用中文,我们先可以在第二个终端输 入“中国河南省”等字样:

        第一个终端的日志控制台显示:

3.使用 Flume 作为 Spark Streaming 数据源

        Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集 到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写 的 Spark Streaming 应用程序对消息进行处理。

(1)配置 Flume 数据源

        登录 Linux 系统,打开一个终端,执行如下命令新建一个 Flume 配置文件 flume-to-spark.conf

cd /usr/local/flume-1.7.0
cd conf
vim flume-to-spark.conf

        flume-to-spark.conf 文件中写入如下内容:

        在上面的配置文件中,我们把 Flume Source 类别设置为 netcat,绑定到 localhost 的 33333 端口,这样,我们后面就可以通过“telnet localhost 33333”命令向 Flume Source 发 送消息。

         同时,我们把 Flume Sink 类别设置为 avro,绑定到 localhost 的 44444 端口,这样, Flume Source 把采集到的消息汇集到 Flume Sink 以后,Sink 会把消息推送给 localhost 的 44444 端口,而我们编写的 Spark Streaming 程序一直在监听 localhost 的 44444 端口,一 旦有消息到达,就会被 Spark Streaming 应用程序取走进行处理。

        特别要强调的是,上述配置文件完成以后,暂时“不要”启动 Flume Agent,如果这个时 候使用“flume-ng agent”命令启动 agent,就会出现错误提示“localhost:44444 拒绝连接”,也 就是 Flume Sink 要发送消息给 localhost 的 44444 端口,但是,无法连接上 localhost 的 44444 端口。为什么会出现这个错误呢?因为,这个时候我们还没有启动 Spark Streaming 应用程序,也就没有启动 localhost 的 44444 端口,所以,Sink 是无法向这个端口发送消息 的。

(2)Spark 的准备工作(版本2.4.8以下,新版本没有相关jar包

        Kafka 和 Flume 等高级输入源,需要依赖独立的库(jar 文件)。按照我们前面安装好 的 Spark 版本,这些 jar 包都不在里面,为了证明这一点,我们现在可以测试一下。请打开 一个新的终端,然后启动 spark-shell:

cd /usr/local/spark-2.4.8
./bin/spark-shell

        启动成功后,在 spark-shell 中执行下面 import 语句:

        你可以看到,马上会报错,因为找不到相关的 jar 包。所以,现在我们就需要下载 spark-streaming-flume_2.12-2.4.8.jar,其中2.11表示对应的Scala版本号,3.5.1表示Spark 版本号。打开下方的网址 https://mvnrepository.com/artifact/org.apache.spark?p=2,里 面有提供 spark-streaming-flume_2.12-2.4.8.jar 文件的下载。

        查看Spark和Scala版本(Spark安装版本最好在2.4.8版本之下,高版本没有相关jar包

        Spark下载网址:Index of /dist/spark (apache.org)

        进入网站,选择目录

        选择spark版本scala版本

        查看所有的版本

        选择对应版本下载

        下载好的文件上传到 Linux 登录用户目录下

        现在,我们在“/usr/local/spark-2.4.8/jars”(由于版本不匹配)目录下新建一个“flume”目录,就把这个文件复制到 Spark 目录的“/usr/local/spark-2.4.8/jars/flume”目录下。请新打开一个终端,输入下面命令:      

  

cd /usr/local/spark-2.4.8/jars
mkdir flumecd ~cp ./spark-streaming-flume_2.12-2.4.8.jar /usr/local/spark/jars/flume

        我们就成功地把 spark-streaming-flume_2.11-2.1.0.jar 文件拷贝到了 “/usr/local/spark/jars/flume”目录下。

        下面还要继续把 Flume 安装目录的 lib 目录下的所有 jar 文件复制到 “/usr/local/spark-2.4.8/jars/flume”目录下,请在终端中执行下面命令:

cd /usr/local/flume-1.7.0/lib
ls
cp ./* /usr/local/spark-2.4.8/jars/flume
(3)编写 Spark 程序使用 Flume 数据源

        新打开一个终端,然后,执行命令创建代码目录:

cd /usr/local/spark-2.4.8/mycode
mkdir flume
cd flume
mkdir -p src/main/scala
cd src/main/scala
vim FlumeEventCount.scala

        在 FlumeEventCount.scala 代码文件中输入以下代码:

package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: FlumeEventCount <host> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(host, IntParam(port)) = argsval batchInterval = Milliseconds(2000)// Create the context and set the batch sizeval sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, batchInterval)// Create a flume streamval stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)// Print out the count of events received from this server in each batchstream.count().map(cnt => "Received " + cnt + " flume events." ).print()ssc.start()ssc.awaitTermination()}
}

        保存 FlumeEventCount.scala 文件并退出 vim 编辑器。FlumeEventCount.scala 程序在 编译后运行时,需要我们提供 host 和 port 两个参数,程序会对指定的 host 和指定的 port 进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定 的端口中获取由 Flume Sink 发给该端口的消息,然后进行处理,对消息进行统计,打印出 “Received 0 flume events.”这样的信息。

        然后再使用 vim 编辑器新建 StreamingExamples.scala 文件,输入如下代码,用于控 制日志输出格式:

package org.apache.spark.examples.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}}
}

        保存 StreamingExamples.scala 文件并退出 vim 编辑器。

        在“/usr/local/spark-2.4.8/mycode/flume/src/main/scala”目录下,就有了如下两个 代码文件:

        然后,执行下面命令新建一个 simple.sbt 文件:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.4.8"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.12" % "2.4.8" 

        保存文件退出 vim 编辑器。然后执行下面命令,进行打包编译:

cd /usr/local/spark-2.4.8/mycode/flume/
/usr/local/sbt/sbt package  #执行这一步之前需要先安装 sbt 插件

        如果看到类似如下的屏幕信息,就表示打包成功了:

        打包成功后,就可以执行程序测试效果了

(4)测试程序效果

        关闭之前打开的所有终端。首先,请新建第 1 个 Linux 终端,启动 Spark Streaming 应 用程序,命令如下:

cd /usr/local/spark-2.4.8./bin/spark-submit --driver-class-path /usr/local/spark-2.4.8/jars/*:/usr/local/spark-2.4.8/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark-2.4.8/mycode/flume/target/scala-2.11/simpleproject_2.11-1.0.jar localhost 44444

        通过上面命令,我们为应用程序提供 host 和 port 两个参数的值分别为 localhost 和 44444,程序会对 localhost 的 44444 端口进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定的端口中获取由 Flume Sink 发给该端口的消息, 然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。

        执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔 2 秒钟刷新一次信息, 大量信息中会包含如下重要信息:

        因为目前 Flume 还没有启动,没有给 FlumeEventCount 发送任何消息,所以 Flume Events 的数量是 0。第 1 个终端不要关闭,让它一直处于监听状态。

        现在,另外新建第 2 个终端,在这个新的终端中启动 Flume Agent,命令如 下:

cd /usr/local/flume-1.7.0
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

        启动 agent 以后,该 agent 就会一直监听 localhost 的 33333 端口,这样,我们下面就 可以通过“telnet localhost 33333”命令向 Flume Source 发送消息。第 2 个终端也不要关闭, 让它一直处于监听状态。

        另外新建第 3 个终端,执行如下命令:

telnet localhost 33333
#或者
nc localhost 33333

        执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息 都会被 Flume 监听到,Flume 把消息采集到以后汇集到 Sink,然后由 Sink 发送给 Spark 的 FlumeEventCount 程序进行处理。然后,你就可以在运行 FlumeEventCount 的前面那个 终端窗口内看到类似如下的统计结果:

        从屏幕信息中可以看出,我们在 telnet 那个终端内发送的消息,都被成功发送到 Spark 进行处理了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3015001.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

119. 再谈接口幂等性

文章目录 0. 前言1. insert前先select2. 加悲观锁3. 加乐观锁5. 加唯一索引【配合 &#xff08;1. insert前先select &#xff09;最常用 】6. 建防重表6. 根据状态机7. 加分布式锁8. 获取token 0. 前言 在 93. 通用防重幂等设计 一文中&#xff0c;已经介绍过幂等的使用。该文…

力扣:63. 不同路径 II

63. 不同路径 II 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。 现在考虑网格中有障碍物。那么…

C# Web控件与数据感应之 BaseDataList 类

目录 关于数据感应 BaseDataList 类 范例运行环境 pageview 方法 设计 实现 调用示例 数据源 调用 小结 关于数据感应 数据感应也即数据捆绑&#xff0c;是一种动态的&#xff0c;Web控件与数据源之间的交互&#xff0c;本文将继续介绍以与数据库提取数据并捆绑控件…

CI522/CI523电动车NFC一键启动开发资料

Ci522是一颗工作在13.56MHz频率下的非接触式读写芯片&#xff0c;支持读A卡&#xff08;CI523支持读A/B卡&#xff09;&#xff0c;可做智能门锁、电动车NFC一键启动、玩具NFC开锁等应用。为部分要求低成本&#xff0c;PCB小体积的产品提供了可靠的选择。 Ci522与Si522/MFRC52…

第3章 WebServer重构

3.1 重构原生Web服务框架 3.1.1 分析原生Web服务框架 在服务端代码的 ClientHandler 中&#xff0c;请求解析、处理请求、返回响应的代码混杂在一起&#xff0c;这样的设计会导致代码难以维护和理解。为了提高代码的可读性、可维护性和可扩展性&#xff0c;我们需要对这些代码…

UDP广播

1、UDP广播 1.1、广播的概念 广播&#xff1a;由一台主机向该主机所在子网内的所有主机发送数据的方式 例如 &#xff1a;192.168.3.103主机发送广播信息&#xff0c;则192.168.3.1~192.168.3.254所有主机都可以接收到数据 广播只能用UDP或原始IP实现&#xff0c;不能用TCP…

漏洞挖掘 | EDU证书站任意密码重置

1.前言&#xff1a; 挖了一段时间EDU老破小的站&#xff0c;也该拿证书站下手了。下手的第一个目标&#xff0c;那必然是漏洞排行榜第一的某交大&#xff01;&#xff01;&#xff01; 2.信息搜集 想快速挖到漏洞&#xff0c;必须信息搜集全面。如果信息搜集不到位不全面&…

明星中药企业系列洞察(二)丨百年御药同仁堂,为什么被称为我国最“硬”的老字号?

从最初的同仁堂药室、同仁堂药店到现在的北京同仁堂集团&#xff0c;经历了清王朝由强盛到衰弱、几次外敌入侵、军阀混战到新民主主义革命的历史沧桑&#xff0c;其所有制形式、企业性质、管理方式也都发生了根本性的变化&#xff0c;但同仁堂经历数代而不衰&#xff0c;在海内…

蓝桥杯练习系统(算法训练)ALGO-947 贫穷的城市

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述 某城市有n个小镇&#xff0c;编号是1~n。由于贫穷和缺乏城市规划的人才&#xff0c;每个小镇有且仅有一段单向的公路通往别…

[Linux] GDB使用指南----包含CentOS7下安装以及使用

什么是GDB&#xff1f; GDB 是由 GUN 软件系统社区提供的调试工具&#xff0c;同 GCC 配套组成了一套完整的开发环境&#xff0c;GDB 是 Linux 和许多 类Unix系统的标准开发环境。可以用来调试C、C、Go、java、 objective-c、PHP等语言。 GDB的作用 程序启动时&#xff0c;可…

400 Bad Request问题

总结&#xff1a;请求路径写错了 400 问题 原地址&#xff0c;deleteSetmeal的参数应该改为param 更改请求地址正确后即可

视频质量评估

视频质量评估 一、全参考客观视频质量评价方法三、MSSIM四、STRRED五、VMAF六、MOS 一、全参考客观视频质量评价方法 全参考客观视频质量评价方法是指把原始参考视频与失真视频在每一个对应帧中的每一个对应像素之问进行比较。准确的讲&#xff0c;这种方法得到的并不是真正的…

Chromium编译指南2024 Windows11篇-Git工具准备(四)

前言 在《Chromium编译指南2024&#xff08;三&#xff09;》中&#xff0c;我们已经完成了对 Chromium 编译环境的其他相关环境变量的设置&#xff0c; 接下来&#xff0c;我们将进一步探讨如何初始化配置 Git&#xff0c;为获取 Chromium 源代码做好准备。 1. 配置Git 用户…

AI伦理和安全风险管理终极指南

人工智能&#xff08;AI&#xff09;正在迅速改变各个领域的软件开发和部署。驱动这一转变的两个关键群体为人工智能开发者和人工智能集成商。开发人员处于创建基础人工智能技术的最前沿&#xff0c;包括生成式人工智能&#xff08;GenAI&#xff09;模型、自然语言处理&#x…

VBA在Excel中字母、数字的相互转化

VBA在Excel中字母、数字的相互转化 字母转数字的方法 数字转字母的方法 众所周知,Excel表中的行以数字展示,列用字母展示,如下图: 编程时,很多时候需要将列的字母转变为数字使用,如cells(num1,num2).value等,不知大家是怎么将字母转化为数字的,Excel是否有其他方式…

今天看到一个有意思的问题:个人网站被恶意大量访问,怎么办(文末附GPT指令优化)

目录 问题描述 一、GPT 3.5 二、通义千问 三、讯飞星火 四、文心一言 五、Kimi 六、智谱清言 个人分析&#xff1a; 问题描述 大家好&#xff01;我的个人网站每天晚上7点30到11点被固定的十几个IP大量下载exe&#xff0c;造成网站带宽不够&#xff0c;怎么办! 已经把…

耕耘未来——揭秘第一产业的无限潜能

在浩瀚的科技宇宙中&#xff0c;当火星探测器的每一次着陆都能激起全球狂欢&#xff0c;当虚拟现实的浪潮让我们触碰未来&#xff0c;有一个领域&#xff0c;以其恒久不变的坚韧&#xff0c;默默地滋养着人类文明的根脉——这就是第一产业&#xff0c;那片古老而又充满生机的土…

护眼灯有没有护眼的效果?一键查看这五大护眼效果极佳的护眼台灯

在数字时代&#xff0c;护眼灯已成为保护视力的重要工具。但消费者常问&#xff1a;护眼灯有没有护眼的效果&#xff1f;挑选到技术过关的护眼台灯是能够很好地起到护眼效果的。本文将并重点介绍五款具有卓越护眼功能的台灯。这些精选灯具不仅在照明效果上表现出色&#xff0c;…

C#里如何设置输出路径,不要net7.0-windows

官网介绍&#xff1a; 更改生成输出目录 - Visual Studio (Windows) | Microsoft Learn <PropertyGroup> <AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath> <AppendRuntimeIdentifierToOutputPath>false</Appen…

10000字讲解IoC 思想以及五大注解

文章目录 IoC 思想通过案例讲解 IoC1.传统的开发方式 SpringIoC 和 DI五大注解ControllerServiceComponentRepositoryConfiguration 为什么要有这么多的类注解类注解之间的关系方法注解 Bean重命名 bean扫描路径 IoC 思想 什么是 Spring 呢&#xff1f; 我们经常听到的都是说…