大数据Flume--入门

文章目录

  • Flume
    • Flume 定义
    • Flume 基础架构
      • Agent
      • Source
      • Sink
      • Channel
      • Event
    • Flume 安装部署
      • 安装地址
      • 安装部署
    • Flume 入门案例
      • 监控端口数据官方案例
      • 实时监控单个追加文件
      • 实时监控目录下多个新文件
      • 实时监控目录下的多个追加文件

Flume

Flume 定义

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
为什么选择Flume

Flume 基础架构

在这里插入图片描述

Agent

Agent 是一个JVM进程,它以事件的形式将数据从源头送至目的。

Agent 主要有3个部分组成,Source、Channel、Sink

Source

Source 是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种
格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。

Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

Sink 组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Channel

Channel 是位于Source 和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel 是线程安全的,可以同时处理几个Source 的写入操作和几个Sink 的读取操作。

Flume 自带两种Channel:Memory ChannelFile Channel

Memory Channel 是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适
用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数
据。

Event

传输单元,Flume 数据传输的基本单元,以Event 的形式将数据从源头送至目的地。Event 由Header Body 两部分组成,Header用来存放该event的一些属性,为K-V结构,Body 用来存放该条数据,形式为字节数组。

Flume 安装部署

安装地址

(1)Flume 官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume
(4)Flume tar包
链接:https://pan.baidu.com/s/1O_CEiuHafNyuWSsrtZaydg?pwd=kw9k
提取码:kw9k

安装部署

(1)将apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下

(2)解压apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下

[yudan@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ 

(3)修改apache-flume-1.9.0-bin 的名称为flume

[yudan@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume 

(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

[yudan@hadoop102 lib]$  rm /opt/module/flume/lib/guava-11.0.2.jar 

Flume 入门案例

监控端口数据官方案例

1)案例需求:

使用Flume监听一个端口,收集该端口数据,并打印到控制台。

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)安装netcat工具

[yudan@hadoop102 software]$ sudo yum install -y nc

(2)判断44444端口是否被占用

[yudan@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444

(3)创建Flume Agent配置文件flume-netcat-logger.conf

(4)在flume目录下创建job文件夹并进入job文件夹。

[yudan@hadoop102 flume]$ mkdir job 
[yudan@hadoop102 flume]$ cd job/ 

(5)在job文件夹下创建Flume Agent配置文件flume-netcat-logger.conf

[yudan@hadoop102 job]$ vim flume-netcat-logger.conf

(6)在flume-netcat-logger.conf 文件中添加如下内容。

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 # Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 # Describe the sink 
a1.sinks.k1.type = logger # Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

配置文件解析
(7)先开启flume监听端口

  • 第一种写法:

      [yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console 
    
  • 第二种写法:

      [yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console 
    
  • 参数说明:

    • –conf/-c:表示配置文件存储在conf/目录
    • –name/-n:表示给agent起名为a1
    • –conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf
      文件。
    • -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

(8)使用netcat工具向本机的44444端口发送内容

[yudan@hadoop102 ~]$ nc localhost 44444 
hello  
yudan 

(9)在Flume监听页面观察接收数据情况

实时监控单个追加文件

1)案例需求:实时监控Hive日志,并上传到HDFS中

2)需求分析:
实时读取本地文件到HDFS案例
3)实现步骤:

(1)Flume 要想将数据输出到HDFS,依赖Hadoop相关jar包

检查/etc/profile.d/my_env.sh 文件,确认 Hadoop和 Java 环境变量配置正确

JAVA_HOME=/opt/module/jdk1.8.0_212 
HADOOP_HOME=/opt/module/hadoop-3.1.3 
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin 
export PATH JAVA_HOME HADOOP_HOME

(2)创建flume-file-hdfs.conf 文件

[yudan@hadoop102 job]$ vim flume-file-hdfs.conf

注:要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。

# Name the components on this agent 
a2.sources = r2 
a2.sinks = k2 
a2.channels = c2 # Describe/configure the source 
a2.sources.r2.type = exec 
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log # Describe the sink 
a2.sinks.k2.type = hdfs 
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H 
#上传文件的前缀 
a2.sinks.k2.hdfs.filePrefix = logs- 
#是否按照时间滚动文件夹 
a2.sinks.k2.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a2.sinks.k2.hdfs.roundValue = 1 
#重新定义时间单位 
a2.sinks.k2.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a2.sinks.k2.hdfs.useLocalTimeStamp = true 
#积攒多少个Event才flush到HDFS一次 
a2.sinks.k2.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a2.sinks.k2.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a2.sinks.k2.hdfs.rollInterval = 60 
#设置每个文件的滚动大小 
a2.sinks.k2.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory 
a2.channels.c2.type = memory 
a2.channels.c2.capacity = 1000 
a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel 
a2.sources.r2.channels = c2 
a2.sinks.k2.channel = c2 

a2.sinks.k2.hdfs.path = hdfs://hadoop102:端口号/flume/%Y%m%d/%H
端口号是NameNode的地址,这个端口号在/opt/module/hadoop-3.1.3/etc/hadoop下core-site.xml文件中的fs.defaultFS配置过

注意:对于所有与时间相关的转义序列,Event Header中必须存在以 “timestamp”的
key(除非hdfs.useLocalTimeStamp设置为true,此方法会使用TimestampInterceptor自
动添加timestamp)。

a3.sinks.k3.hdfs.useLocalTimeStamp = true

在这里插入图片描述
(3)运行Flume

[yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

(4)开启Hadoop和Hive并操作Hive产生日志

[yudan@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh 
[yudan@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh[yudan@hadoop102 hive]$ bin/hive 
hive (default)>

(5)在HDFS上查看文件。

实时监控目录下多个新文件

1)案例需求:使用Flume监听整个目录的文件,并上传至HDFS

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)创建配置文件flume-dir-hdfs.conf

创建一个文件 
[yudan@hadoop102 job]$ vim flume-dir-hdfs.conf
# 添加以下内容a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 # Describe/configure the source 
a3.sources.r3.type = spooldir 
a3.sources.r3.spoolDir = /opt/module/flume/upload 
a3.sources.r3.fileSuffix = .COMPLETED 
a3.sources.r3.fileHeader = true 
#忽略所有以.tmp结尾的文件,不上传 
a3.sources.r3.ignorePattern = ([^ ]*\.tmp) # Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个Event才flush到HDFS一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3 

在这里插入图片描述
(2)启动监控文件夹命令

[yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf 

说明:在使用Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动。

(3)向upload文件夹中添加文件

在/opt/module/flume 目录下创建upload 文件夹

[yudan@hadoop102 flume]$ mkdir upload 

向upload文件夹中添加文件

[yudan@hadoop102 upload]$ touch 1.txt 
[yudan@hadoop102 upload]$ touch 2.tmp 
[yudan@hadoop102 upload]$ touch 3.log 

(4)查看HDFS上的数据

实时监控目录下的多个追加文件

Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

1)案例需求:使用Flume监听整个目录的实时追加文件,并上传至HDFS

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)创建配置文件flume-taildir-hdfs.conf

创建一个文件 
[yudan@hadoop102 job]$ vim flume-taildir-hdfs.conf 
# 添加如下内容 
a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 # Describe/configure the source 
a3.sources.r3.type = TAILDIR 
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json 
a3.sources.r3.filegroups = f1 f2 
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.* 
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.* # Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个Event才flush到HDFS一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3

在这里插入图片描述
(2)启动监控文件夹命令

[yudan@hadoop102 flume]$ bin/flume-ng agent -cconf/ -n a3 -f job/flume-taildir-hdfs.conf

(3)向files文件夹中追加内容

在/opt/module/flume目录下创建files文件夹 
[yudan@hadoop102 flume]$ mkdir files 向upload文件夹中添加文件 
[yudan@hadoop102 files]$ echo hello >> file1.txt 
[yudan@hadoop102 files]$ echo atguigu >> file2.txt 

(4)查看HDFS上的数据

Taildir 说明:

Taildir Source 维护了一个json 格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"} 
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode 号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2778231.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

幻兽帕鲁服务器怎么更新?进入游戏显示:加入的比赛正在运行不兼容的版本,请尝试升级游戏版本(阿里云)

幻兽帕鲁服务器怎么更新?进入游戏显示:加入的比赛正在运行不兼容的版本,请尝试升级游戏版本。这是因为游戏客户端或者服务器上的游戏服务端,没有更新版本。导致两个版本不一致,所以无法进入游戏。 最近幻兽帕鲁 官方客…

15.3 Redis入门(❤❤❤❤)

15.3 Redis入门❤❤❤❤ 1. redis简介与配置1.1 简介1.2 Windows安装1.3 Linux安装1.4 守护进程方式启动1.5 客户端启动与使用1.6 指定生成日志 2. 使用2.1 客户端redis使用命令2.2 redis存储的数据类型1. String字符串类型2. Hash键值类型3. List列表类型4. Set与Zset集合类型…

问题:超声波纵波斜入射时,当入射角大于第一临界角小于第二临界角时,在第二介质内只有折射横波。 #微信#经验分享#其他

问题:超声波纵波斜入射时,当入射角大于第一临界角小于第二临界角时,在第二介质内只有折射横波。 参考答案如图所示

面向对象编程:理解其核心概念与应用

引言 在编程的世界中,面向对象编程(Object-Oriented Programming, OOP)已成为一种主流的编程范式。它提供了一种组织和管理代码的有效方式,使得代码更加模块化、可重用和易于维护。本文将带您深入探讨面向对象编程的核心概念及其…

波奇学Linux:文件重定向和虚拟文件系统

重定向 文件描述符所对应的分配规则,从0开始,寻找最小没有使用的数组位置。 如图所示,关闭文件描述符的0,新打开的文件描述符为0,而关闭2,文件描述符为2。 重定向:文件输出的对象发生改变 例…

网络协议、网络传输认识

目录 网络协议概念 网络协议具象化理解 协议分层 TCP/IP模型 网络传输基本流程 网络协议概念 网络协议是计算机网络中用于在通信设备之间传输数据的规则集合。这些规则定义了数据的格式、传输方式、错误检测和纠正方法等,以确保不同设备之间的通信能够正确进行…

计算机网络之一

目录 1.因特网概述 1.1网络、互连网(互联网)和因特网 1.2.因特网发展的三个阶段 1.3基于ISP的三层架构的因特网 1.4.因特网的组成 2.三种交换方式 2.1电路交换 2.2分组交换 1.因特网概述 1.1网络、互连网(互联网)和因特网…

Qualcomm 蓝牙耳机 FAQ(41)---------Audio 问题分析之 ACAT Tools安装

大家好! 新的一年,在此祝大家:新年快乐!工作上步步高升!!龙年大吉!!! 也欢迎大家登录大大通平台,春节期间正常更新文章,期待你的到来&#xff0…

Docker-现代化应用部署的利器

一、容器部署的发展 今天我们来说说容器部署。我们知道容器部署的发展大致分三个阶段,下面来介绍一下不同阶段的部署方式的优缺点 物理机部署 优点是可以提供更高的性能、资源控制,也可以提供更好的数据隔离和安全性,因为不同的应用程序运行在…

(十八)springboot实战——spring securtity注解方式的授权流程源码解析

前言 在上一节内容中,我们介绍了如何在FilterSecurityInterceptor过滤器中处理用户的授权流程,并分析了其源码,spring security还提供了方法级别的授权方式,通过EnableMethodSecurity注解启用权限认证流程,只需要在方…

回归预测模型:MATLAB多项式回归

1. 多项式回归模型的基本原理 多项式回归是线性回归的一种扩展,用于分析自变量 X X X与因变量 Y Y Y之间的非线性关系。与简单的线性回归模型不同,多项式回归模型通过引入自变量的高次项来增加模型的复杂度,从而能够拟合数据中的非线性模式。…

【数据结构与算法】【腾讯阿里链表面试题】算法题--链表易懂版讲解

🎉🎉欢迎光临🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟特别推荐给大家我的最新专栏《Spring 狂野之旅:底层原理高级进阶》 &#x1f680…

飞天使-linux操作的一些技巧与知识点8-zabbix6.0 容器搭建

文章目录 安装docker安装步骤mysql下载镜像安装zabbix 使用zabbix非host模式创建 测试效果 安装docker 1. 配置官方 yum 源$ sudo yum install -y yum-utils $ sudo yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo2. 安装 Docker$ …

【02】右旋函数(C语言)

目录 题目:给定一个整数数组nums,将数组中的元素向右轮转k个位置,其中k是非负数。 1.暴力求解(轮转k次) 2. 三段逆置求解 ①逆置函数 ②轮转函数 3.空间换时间求解 题目:给定一个整数数组nums,将数组中…

【EEG信号处理】对信号进行模拟生成

生成信号的目的还是主要是为了学习和探究后面的分析方法;本文主要是对方法进行整理 瞬态 transient 瞬态信号是指的是一瞬间信号上去了,这种情况我们可以用在时域上高斯模拟 peaktime 1; % seconds width .12; ampl 9; gaus ampl * exp( -(EEG.tim…

如何实现视线(目光)的检测与实时跟踪

如何实现视线(目光)的检测与实时跟踪 核心步骤展示说明 找到人脸 检测人脸特征点 根据特征点找到人眼区域 高精度梯度算法检测瞳孔中心 根据眼睛周边特征点计算眼睛中心 瞳孔中心和眼睛中心基于视线模型计算视线方向 视线方向可视化 详细实现与说明: https://stud…

分享86个表单按钮JS特效,总有一款适合您

分享86个表单按钮JS特效,总有一款适合您 86个表单按钮JS特效下载链接:https://pan.baidu.com/s/1WwQGFPWv8464JBcuEMJZ_Q?pwd8888 提取码:8888 Python采集代码下载链接:采集代码.zip - 蓝奏云 学习知识费力气,…

第二节 zookeeper基础应用与实战

目录 1. Zookeeper命令操作 1.1 Zookeeper 数据模型 1.2 Zookeeper服务端常用命令 1.3 Zookeeper客户端常用命令 1.3.1 基本CRUD 1.3.2 创建临时&顺序节点 2. Zookeeper JavaAPI操作 2.1 Curator介绍 2.2 引入Curator 2.3 建立连接 2.4 添加节点 2.5 修改节点 …

JMM(Java内存模型)

定义 JMM即Java内存模型(Java memory model),在JSR133里指出了JMM是用来定义一个一致的、跨平台的内存模型,是缓存一致性协议,用来定义数据读写的规则。 内存可见性 在Java中,不同线程拥有各自的私有工作内存,当线程…

Python中使用opencv-python进行人脸检测

Python中使用opencv-python进行人脸检测 之前写过一篇VC中使用OpenCV进行人脸检测的博客。以数字图像处理中经常使用的lena图像为例,如下图所示: 使用OpenCV进行人脸检测十分简单,OpenCV官网给了一个Python人脸检测的示例程序,…