Flume 的安装和使用方法(Spark-2.1.0)

一、Flume的安装

1.下载压缩包

https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2.上传到linux中

3.解压安装包
cd
#进入加载压缩包目录sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local
# 将 apache-flume-1.7.0-bin.tar.gz 解压到/usr/local 目录下sudo mv /usr/local/apache-flume-1.7.0-bin /usr/local/flume-1.7.0
将解压后的文件夹名称从 apache-flume-1.7.0-bin 改为 flume-1.7.0cd /usr/local/sudo chown -R qiangzi:qiangzi ./flume-1.7.0
#把/usr/local/flume-1.7.0 目录的权限赋予当前登录 Linux 系统的用户,这里假设是 qiangzi 用户
4.配置环境变量
cd /usr/local/sudo vim ~/.bashrc

然后在首行加入如下代码:

export JAVA_HOME=/usr/local/jdk1.8.0/;
export FLUME_HOME=/usr/local/flume-1.7.0 
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin

注意,上面的 JAVA_HOME,如果以前已经在.bashrc 文件中设置过,就不要重复添加 了,使用以前的设置即可。比如,以前设置得 JAVA_HOME 可能是“export JAVA_HOME=/usr/lib/jvm/default-java”,则使用原来的设置即可。

使环境变量生效:

[qiangzi@master local]$ source ~/.bashrc
5.修改 flume-env.sh 配置文件
cd /usr/local/flume-1.7.0/confsudo cp ./flume-env.sh.template ./flume-env.shsudo vim ./flume-env.sh

打开 flume-env.sh 文件以后,在文件的最开始位置增加一行内容,用于设置 JAVA_HOME 变量

export JAVA_HOME=/usr/local/jdk1.8.0/;
6.查看 flume 版本信息
cd /usr/local/flume-1.7.0./bin/flume-ng version #查看 flume 版本信息;

如果安装成功,出现如下图片

二、Flume的使用

1.使用 Avro 数据源测试 Flume

Avro 可以发送一个给定的文件给 Flume,Avro 源使用 AVRO RPC 机制。请对 Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件 helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动 Flume 以后, 可以把 helloworld.txt 中的文本内容显示出来。

(1)agent 配置文件
cd /usr/local/flumesudo vim ./conf/avro.conf #在 conf 目录下编辑一个 avro.conf 空文件

然后,在 avro.conf 写入以下内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面 Avro Source 参数说明如下: Avro Source 的别名是 avro,也可以使用完整类别名 称 org.apache.flume.source.AvroSource,因此,上面有一行设置是 a1.sources.r1.type = avro,表示数据源的类型是 avro。bind 绑定的 ip 地址或主机名,使用 0.0.0.0 表示绑定机 器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。port 表示绑定 的端口。a1.sources.r1.port = 4141,表示绑定的端口是 4141。a1.sinks.k1.type = logger, 表示 sinks 的类型是 logger。

(2)启动 flume agent a1
/usr/local/flume-1.7.0/bin/flume-ng agent -c /usr/local/flume-1.7.0/conf/ -f /usr/local/flume-1.7.0/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

agent 窗口

(3)创建指定文件

先打开另外一个终端,在/usr/local/flume 下写入一个文件 log.00,内容为 hello,world

echo "hello, world" > /usr/local/flume-1.7.0/log.00

前面已经启动日志控制台,这里不再启动

再打开另外一个终端,执行:

cd /usr/local/flume-1.7.0bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume-1.7.0/log.00

此时我们可以看到第一个终端(agent 窗口)下的显示,也就是在日志控制台,就会把 log.00 文件的内容打印出来:

avro source 执行成功!

2. 使用 netcat 数据源测试 Flume
(1)创建 agent 配置文件
cd /usr/local/flume-1.7.0
sudo vim ./conf/example.conf   #在 conf 目录创建 example.conf

在 example.conf 里写入以下内容:

#example.conf: A single-node Flume configuration 
# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
#同上,记住该端口名
# Describe the sink 
a1.sinks.k1.type = logger 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
(2)启动 flume agent (即打开日志控制台)
/usr/local/flume-1.7.0/bin/flume-ng agent --conf /usr/local/flume-1.7.0/conf --conf-file /usr/local/flume-1.7.0/conf/example.conf --name a1 -Dflume.root.logger=INFO,console

如图:

再打开一个终端,输入命令

telnet localhost 44444   #上面不能使用,使用nc
nc localhost 44444

 然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如 我们输入”hello,world”或者其他内容,得出

       第一个终端的日志控制台显示:

netcatsource 运行成功!

这里补充一点,flume 只能传递英文和字符,不能用中文,我们先可以在第二个终端输 入“中国河南省”等字样:

第一个终端的日志控制台显示:

3.使用 Flume 作为 Spark Streaming 数据源

Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集 到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写 的 Spark Streaming 应用程序对消息进行处理。

(1)配置 Flume 数据源

登录 Linux 系统,打开一个终端,执行如下命令新建一个 Flume 配置文件 flume-to-spark.conf

cd /usr/local/flume-1.7.0
cd conf
vim flume-to-spark.conf

flume-to-spark.conf 文件中写入如下内容:

在上面的配置文件中,我们把 Flume Source 类别设置为 netcat,绑定到 localhost 的 33333 端口,这样,我们后面就可以通过“telnet localhost 33333”命令向 Flume Source 发 送消息。

同时,我们把 Flume Sink 类别设置为 avro,绑定到 localhost 的 44444 端口,这样, Flume Source 把采集到的消息汇集到 Flume Sink 以后,Sink 会把消息推送给 localhost 的 44444 端口,而我们编写的 Spark Streaming 程序一直在监听 localhost 的 44444 端口,一 旦有消息到达,就会被 Spark Streaming 应用程序取走进行处理。

特别要强调的是,上述配置文件完成以后,暂时“不要”启动 Flume Agent,如果这个时 候使用“flume-ng agent”命令启动 agent,就会出现错误提示“localhost:44444 拒绝连接”,也 就是 Flume Sink 要发送消息给 localhost 的 44444 端口,但是,无法连接上 localhost 的 44444 端口。为什么会出现这个错误呢?因为,这个时候我们还没有启动 Spark Streaming 应用程序,也就没有启动 localhost 的 44444 端口,所以,Sink 是无法向这个端口发送消息 的。

(2)Spark 的准备工作(版本2.4.8以下,新版本没有相关jar包

Kafka 和 Flume 等高级输入源,需要依赖独立的库(jar 文件)。按照我们前面安装好 的 Spark 版本,这些 jar 包都不在里面,为了证明这一点,我们现在可以测试一下。请打开 一个新的终端,然后启动 spark-shell:

cd /usr/local/spark-2.1.0
./bin/spark-shell

启动成功后,在 spark-shell 中执行下面 import 语句:

你可以看到,马上会报错,因为找不到相关的 jar 包。所以,现在我们就需要下载 spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示对应的Scala版本号,2.1.0表示Spark 版本号。打开下方的网址 http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0,里 面有提供 spark-streaming-flume_2.11-2.1.0.jar 文件的下载。

查看Spark和Scala版本(Spark安装版本最好在2.4.8版本之下,高版本没有相关jar包

Spark下载网址:Index of /dist/spark (apache.org)

进入网站,点击红框内的jar下载

下载好的文件上传到 Linux 登录用户目录下

现在,我们在“/usr/local/spark-2.1.0/jars”(由于版本不匹配)目录下新建一个“flume”目录,就把这个文件复制到 Spark 目录的“/usr/local/spark-2.1.0/jars/flume”目录下。请新打开一个终端,输入下面命令:      

cd /usr/local/spark-2.1.0/jars
mkdir flume
cd ~
mv ./spark-streaming-flume_2.12-2.4.8.jar /usr/local/spark-2.1.0/jars/flume

我们就成功地把 spark-streaming-flume_2.11-2.1.0.jar 文件拷贝到了 “/usr/local/spark/jars/flume”目录下。

下面还要继续把 Flume 安装目录的 lib 目录下的所有 jar 文件复制到 “/usr/local/spark-2.4.8/jars/flume”目录下,请在终端中执行下面命令:

cd /usr/local/flume-1.7.0/lib
ls
cp ./* /usr/local/spark-2.1.0/jars/flume
(3)编写 Spark 程序使用 Flume 数据源

新打开一个终端,然后,执行命令创建代码目录:

cd /usr/local/spark-2.1.0/mycode
mkdir flume
cd flume
mkdir -p src/main/scala
cd src/main/scala
vim FlumeEventCount.scala

在 FlumeEventCount.scala 代码文件中输入以下代码:

package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: FlumeEventCount <host> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(host, IntParam(port)) = argsval batchInterval = Milliseconds(2000)// Create the context and set the batch sizeval sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, batchInterval)// Create a flume streamval stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)// Print out the count of events received from this server in each batchstream.count().map(cnt => "Received " + cnt + " flume events." ).print()ssc.start()ssc.awaitTermination()}
}

保存 FlumeEventCount.scala 文件并退出 vim 编辑器。FlumeEventCount.scala 程序在 编译后运行时,需要我们提供 host 和 port 两个参数,程序会对指定的 host 和指定的 port 进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定 的端口中获取由 Flume Sink 发给该端口的消息,然后进行处理,对消息进行统计,打印出 “Received 0 flume events.”这样的信息。

然后再使用 vim 编辑器新建 StreamingExamples.scala 文件,输入如下代码,用于控 制日志输出格式:

package org.apache.spark.examples.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}}
}

保存 StreamingExamples.scala 文件并退出 vim 编辑器。

在“/usr/local/spark-2.4.8/mycode/flume/src/main/scala”目录下,就有了如下两个 代码文件:

然后,执行下面命令新建一个 simple.sbt 文件:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0" 

查看目录

保存文件退出 vim 编辑器。然后执行下面命令,进行打包编译:

cd /usr/local/spark-2.1.0/mycode/flume/
/usr/local/sbt/sbt package  #执行这一步之前需要先安装 sbt 插件

如果看到类似如下的屏幕信息,就表示打包成功了:

打包成功后,就可以执行程序测试效果了

(4)测试程序效果

关闭之前打开的所有终端。首先,请新建第 1 个 Linux 终端,启动 Spark Streaming 应 用程序,命令如下:

cd /usr/local/spark-2.1.0./bin/spark-submit --driver-class-path /usr/local/spark-2.1.0/jars/*:/usr/local/spark-2.1.0/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark-2.1.0/mycode/flume/target/scala-2.11/simple-project_2.11-1.0.jar localhost 44444

通过上面命令,我们为应用程序提供 host 和 port 两个参数的值分别为 localhost 和 44444,程序会对 localhost 的 44444 端口进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定的端口中获取由 Flume Sink 发给该端口的消息, 然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。

执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔 2 秒钟刷新一次信息, 大量信息中会包含如下重要信息:

因为目前 Flume 还没有启动,没有给 FlumeEventCount 发送任何消息,所以 Flume Events 的数量是 0。第 1 个终端不要关闭,让它一直处于监听状态。

现在,另外新建第 2 个终端,在这个新的终端中启动 Flume Agent,命令如 下:

cd /usr/local/flume-1.7.0
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

启动 agent 以后,该 agent 就会一直监听 localhost 的 33333 端口,这样,我们下面就 可以通过“telnet localhost 33333”命令向 Flume Source 发送消息。第 2 个终端也不要关闭, 让它一直处于监听状态。

另外新建第 3 个终端,执行如下命令:

nc localhost 33333

执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息 都会被 Flume 监听到,Flume 把消息采集到以后汇集到 Sink,然后由 Sink 发送给 Spark 的 FlumeEventCount 程序进行处理。然后,你就可以在运行 FlumeEventCount 的前面那个 终端窗口内看到类似如下的统计结果:

从屏幕信息中可以看出,我们在 telnet 那个终端内发送的消息,都被成功发送到 Spark 进行处理了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3031620.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

文旅行业| 某景区导游培养和管理项目成功案例纪实

——整合导游资源并进行统一管理&#xff0c;构建完善的培养与管理机制&#xff0c;发挥景区导游价值 【客户行业】文旅行业&#xff1b;景区&#xff1b;文旅企业 【问题类型】人才培养&#xff1b;人员管理 【客户背景】 南方某5A级景区&#xff0c;作为国内极具代表性和特…

经常睡不好觉?试试用上华为手环9新升级的睡眠监测功能

睡眠问题是不是经常困扰着你呢&#xff1f;听说&#xff0c;华为手环9的睡眠监测功能升级了&#xff0c;无论是入睡前、睡眠中还是睡醒后&#xff0c;都能够帮助我们改善睡眠&#xff0c;让我们告别糟糕的睡眠质量&#xff01; 睡觉前&#xff0c;打开华为手环9的睡眠模式&…

寻找最大价值的矿堆 - 矩阵

系列文章目录 文章目录 系列文章目录前言一、题目描述二、输入描述三、输出描述四、Java代码五、测试用例 前言 本人最近再练习算法&#xff0c;所以会发布一些解题思路&#xff0c;希望大家多指教 一、题目描述 给你一个由’0’(空地)、‘1’(银矿)、‘2’(金矿)组成的地图…

自动化测试基础 --- Jmeter

前置环境安装 首先我们需要知道如何下载Jmeter 这里贴上下载网站Apache JMeter - Download Apache JMeter 我们直接解压,然后在bin目录下找到jemter.bat即可启动使用 成功打开之后就是这个界面 每次打开可以用这种方式切换成简体中文 或者直接修改properties文件修改对应的语言…

第七届精武杯部分wp

第一部分&#xff1a;计算机和手机取证 1.请综合分析计算机和手机检材&#xff0c;计算机最近一次登录的账户名是 答案&#xff1a;admin 创建虚拟机时直接给出了用户名 2. 请综合分析计算机和手机检材&#xff0c;计算机最近一次插入的USB存储设备串号是 答案&#xff1a…

01面向类的讲解

指针指向类成员使用 代码&#xff1a; #include<iostream> using namespace std;class Test { public:void func() { cout << "call Test::func" << endl; }static void static_func();int ma;static int mb; //不依赖对象 }; void Test::static…

探索GitHub上的GPTs项目:泄露和被破解的GPT提示

GPTs项目是一个在GitHub上由用户linexjlin发起的开源项目&#xff0c;专注于提供泄露的GPT&#xff08;生成式预训练转换器&#xff09;提示。这些提示用于指导和优化AI模型的输出&#xff0c;进而提升代码生成的质量和效率。项目页面提供了丰富的功能和资源&#xff0c;旨在帮…

全套停车场管理系统报价多少钱?停车场管理系统由哪些设备组成?

随着城市化进程的加快&#xff0c;汽车保有量的不断攀升&#xff0c;停车场的管理和运营成为城市基础设施建设的重要组成部分。一个高效、智能的停车场收费系统不仅能提升停车效率&#xff0c;还能增强用户体验&#xff0c;对城市的交通管理起到关键作用。本文将为您详细介绍全…

mac 讨厌百度网盘怎么办

一、别拦我 首先请允许我泄个愤&#xff0c;tmd百度网盘下个1g的文件下载速度竟然超不过200k&#xff0c;只要不放在所有已打开软件的最前面&#xff0c;它就给你降到10k以内&#xff0c;关键是你慢就慢了&#xff0c;我也不是很着急&#xff0c;关键是你日常下载失败并且总是…

AI代理和AgentOps生态系统的剖析

1、AI代理的构成&#xff1a;AI代理能够根据用户的一般性指令自行做出决策和采取行动。 主要包含四个部分&#xff1a; &#xff08;1&#xff09;大模型&#xff08;LLM&#xff09; &#xff08;2&#xff09;工具&#xff1a;如网络搜索、代码执行等 &#xff08;3&#x…

在Qt工具栏上实现矩阵并排的按钮效果源码

如果这个要用MFC去实现头皮都得掉一层&#xff0c;建议大家以后要写GUI方面的小工具尽量转QT或其他吧&#xff0c;MFC真不适合搞这种花里胡哨的界面. 在Qt工具栏上实现矩阵并排的按钮效果源码如下&#xff1a; #include "mainwindow.h" #include "ui_mainwind…

初识指针(4)<C语言>

前言 前面的文章&#xff0c;已经对指针的基础概念以及运用有了初步了解&#xff0c;我们可以进一步探究指针比较深入的知识&#xff0c;下文将主要介绍&#xff1a;使用指针数组模拟二维数组、字符指针变量、数组指针、二维数组传参的本质、函数指针、typedef关键字等。 目录…

RustDesk 自建服务器部署和使用教程

RustDesk 是一个强大的开源远程桌面软件&#xff0c;是中国开发者的作品&#xff0c;它使用 Rust 编程语言构建&#xff0c;提供安全、高效、跨平台的远程访问体验。可以说是目前全球最火的开源远程桌面软件了&#xff0c;GitHub 星星数量达到了惊人的 64k&#xff01; 与 Team…

洪水仿真模拟(ArcGIS),水利数字孪生新利器

这两天ArcGIS Pro的官方账号释放了一个名为“Flood Simulation in ArcGIS Pro”的洪水模拟功能视频。根据视频详情页的介绍&#xff0c;该洪水仿真模拟功能会作为新功能出现在ArcGIS Pro 3.3中。 由于我目前从事的主要应用方向都是弱GIS的领域&#xff0c;所以我已经很久没有再…

图片逐层矢量化

摘要 图像光栅化是计算机图形学中一个成熟的技术&#xff0c;而图像向量化&#xff0c;即光栅化的逆过程&#xff0c;仍然是一个主要的挑战。最近&#xff0c;基于深度学习的先进模型实现了向量化和向量图的语义插值&#xff0c;并展示了生成新图形的更好拓扑结构。然而&#…

MYSQL中的DQL

语法&#xff1a; select 字段列表 from 表名列表 where 条件列表 group by 分组字段列表 having 分组后条件列表 order by 排序字段 limit 分页参数 条件查询 语法&#xff1a; 查询多个字段&#xff1a;select 字段1&#xff0c;字段2 from表名 查询所有字段&#xff1a…

GitHub搭建免费博客

一、GitHub仓库准备 ​ 搭建博客需要准备两个仓库。一个存放博客图床的仓库&#xff0c;另一个存放博客网站的仓库。 1.1、图床创建 新建仓库 第一步&#xff1a; ​ 第二步&#xff1a; 生成Token令牌 点击右上角头像->Settings->下拉&#xff0c;直到左侧到底&#…

使用IDA自带python patch的一道例题

首先看见就是迷宫 迷宫解出的路径&#xff0c;放在zip的文件可以得到一个硬编码 然后在原程序中&#xff0c;有一处很离谱 这个debugbreak就是IDA分析错误导致的 我们点进去发现里面全是nop 然后我们把我们得到的硬编码放在010里面&#xff0c;再用IDA打开 重新编译看汇编 你…

ACC-UNet: A Completely Convolutional UNet Model for the 2020s

文章目录 ACC-UNet: A Completely Convolutional UNet Model for the 2020s摘要方法实验结果 ACC-UNet: A Completely Convolutional UNet Model for the 2020s 摘要 这十年以来&#xff0c;计算机视觉领域引入了 Vision Transformer&#xff0c;标志着广泛的计算机视觉发生了…