【HadoopShuffle原理剖析】基础篇二

Shuffle原理剖析

在这里插入图片描述

Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。

Shuffle过程

  • Map端的Shuffle

    Map的输出结果首先被缓存到内存,当缓存区容量到达80%(缓冲区默认100MB),就启动溢写操作。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序和合并(combine),之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自己处理的数据。

  • 在Reduce端的Shuffle过程

    Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行合并排序后交给Reduce处理

作用
  • 保证每一个Reduce任务处理的数据大致是一致的

  • Map任务输出的key相同,一定是相同分区,并且肯定是相同的Reduce处理的,保证计算结果的准确性

  • Reduce任务的数量决定了分区的数量,Reduce任务越多计算处理的并行度也就越高

    Reduce任务的数量(默认为1)可以通过:job.setNumReduceTasks(数量)

特点
  • Map端溢写时,key相同的一定是在相同的分区
  • Map端溢写时,排序减少了Reduce的全局排序的复杂度
  • Map端溢写是,合并(combiner【可选】)减少溢写文件的体积,提高了Reduce任务在Fetch数据时的效率,它是一种MapReduce优化策略
  • Reduce端计算或者输出时,它的数据都是有序的
Shuffle源码追踪
  • MapTask

    在这里插入图片描述

  • ReduceTask

    (略)

    建议阅读

数据清洗

数据清洗指将原始数据处理成有价值的数据的过程,就称为数据清洗。

企业大数据开发的基本流程:

  1. 采集数据(flume、logstash)先保存到MQ(Kafka)中
  2. 将MQ中的暂存数据存放到HDFS中保存
  3. 数据清洗(低价值密度的数据处理)存放到HDFS
  4. 算法干预(MapReduce),计算结果保存到HDFS或者HBase
  5. 计算结果的可视化展示(Echarts、HCharts)
需求

现有某系统某天的Nginx的访问日志,格式如下:

27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90

大数据处理的算法,需要参数客户端的ip地址、请求时间、资源、响应状态码

正则表达式提取数据

Regex Expression主要作用字符串匹配抽取和替换

语法
规则解释
.匹配任意字符
\d匹配任意数字
\D匹配任意非数字
\w配置a-z和A-Z
\W匹配非a-z和A-Z
\s匹配空白符
^匹配字符串的开头
$匹配字符串的末尾
规则的匹配次数
语法解释
*规则匹配0到N次
规则匹配1次
{n}规则匹配N次
{n,m}规则匹配n到m次
+规则匹配1到N次(至少一次)
应用
# 匹配手机号码 11位数值构成
\d{11}# 邮箱地址校验  @
.+@.+
使用正则表达式提取Nginx访问日志中的四项指标

测试站点:http://regex101.com

分析后得到需要的正则表达式

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"\w*\s(.*)\sHTTP\/1.1"\s(\d{3}).*$
使用MapReduce分布式并行计算框架进行数据清洗

注意: 因为数据清洗不涉及统计计算,所以MapReduce程序通常只有map任务,而没有Reduce任务

job.setNumReduceTasks(0)

实现代码

数据清洗的Mapper

package com.baizhi.dataclean;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {/*** @param key* @param value   nginx访问日志中的一行记录(原始数据)* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"\\w*\\s(.*)\\sHTTP\\/1.1\"\\s(\\d{3}).*$";String line = value.toString();final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);final Matcher matcher = pattern.matcher(line);while (matcher.find()) {// 四项关键指标  ip 请求时间 请求资源 响应状态码String clientIp = matcher.group(1);// yyyy-MM-dd HH:mm:ssString accessTime = matcher.group(2);String accessResource = matcher.group(3);String status = matcher.group(4);// 30/May/2013:17:38:21 +0800// 30/05/2013:17:38:21SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);try {Date date = sdf.parse(accessTime);SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String finalDate = sdf2.format(date);context.write(new Text(clientIp + " " + finalDate + " " + accessResource + " " + status), null);} catch (ParseException e) {e.printStackTrace();}}}
}

初始化类

package com.baizhi.dataclean;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;public class DataCleanApplication {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration(), "data clean");job.setJarByClass(DataCleanApplication.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);TextInputFormat.setInputPaths(job,new Path("file:///E:/access.log"));TextOutputFormat.setOutputPath(job,new Path("file:///E:/final"));job.setMapperClass(DataCleanMapper.class);// 注意:数据清洗通常只有map任务而没有reducejob.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.waitForCompletion(true);}
}

数据倾斜

数据分区默认策略

数据倾斜指大量的key相同的数据交由一个reduce任务统计计算,造成”闲的闲死,忙的忙死“这样的现象。不符合分布式并行计算的设计初衷的。

现象
  • 某一个reduce运行特别耗时
  • Reduce任务内存突然溢出
解决方案
  • 增大Reduce任务机器JVM的内存(硬件的水平扩展)
  • 增加Reduce任务的数量,每个Reduce任务只负责极少部分的数据处理,并且Reduce任务的数量增加提高了数据计算的并行度

Reduce任务的正确数量: 0.95或者1.75 * (NodeManage数量 * 每个节点最大容器数量)

  • 自定义分区规则Partitioner
package com.baizhi.partition;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;/*** 自定义分区规则*/
public class CustomPartitioner extends Partitioner<Text, LongWritable> {/*** @param key* @param value* @param i     numReduceTasks* @return 分区序号*/public int getPartition(Text key, LongWritable value, int i) {if (key.toString().equals("CN-GD")) return 0;else if (key.toString().equals("CN-GX")) return 1;else if (key.toString().equals("CN-HK")) return 2;else if (key.toString().equals("JP-TY")) return 3;else return 4;}
}
  • 合适使用Combiner,将key相同的value进行整合合并

在combiner合并时,v必须得能支持迭代计算,并且不能够影响Reduce任务的输入

combiner通常就是Reducer任务

// 优化策略:combiner合并操作
job.setCombinerClass(MyReducer.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3280574.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

基于FPGA的数字信号处理(20)--半加器和全减器

1、前言 既然有半加器和全加器&#xff0c;那自然也有半减器和全减器了。尽管在电路中减法的实现基本都是 补码 加法 的形式&#xff0c;但是正所谓技多不压身&#xff0c;了解一下半减器和全减器还是有一定作用的&#xff0c;至少能扩宽知识面嘛。 2、半减器 最简单的减法器叫…

RFID分体式天线如何选购?

分体式读写器天线具有可与RFID读写器分离部署&#xff0c;在远距离读写、集成应用等场景下拥有十分广泛的应用。下面我们就跟大家一起来了解一下&#xff0c;分体式天线选购的要点有哪些? 分体式天线的选购要点主要包括以下几个方面&#xff1a; 一、明确使用需求 应用场景…

机器学习(五) -- 无监督学习(2) --降维2

系列文章目录及链接 上篇&#xff1a;机器学习&#xff08;五&#xff09; -- 无监督学习&#xff08;2&#xff09; --降维1 下篇&#xff1a; 前言 tips&#xff1a;标题前有“***”的内容为补充内容&#xff0c;是给好奇心重的宝宝看的&#xff0c;可自行跳过。文章内容被…

仪器内部压力不稳定的原因分析

仪器的液路压力波动可能由多种原因引起&#xff0c;具体分析如下&#xff1a; 气泡的影响&#xff1a; 流动相未平衡或柱箱温度不稳定时&#xff0c;容易在色谱柱内产生气泡。泵作用下&#xff0c;流动相中的空气可能会分离出来&#xff0c;留在泵体内排不出去。使用梯度程序时…

vmware 设置ip

要用xshell连接vmware虚拟机&#xff0c;要指定虚拟机的ip地址。 进入虚拟机&#xff0c;用ifconfig命令&#xff0c;看下ip 试了下连接192.168.122.1 连接不上&#xff0c;需要重新设置一个。 1&#xff0c;查看电脑的IP地址 winr,输入cmd&#xff0c;再输入ipconfig 看下本…

Java刷题: 丑数判断

题目 丑数 就是只包含质因数 2、3 和 5 的正整数。 给你一个整数 n &#xff0c;请你判断 n 是否为 丑数 。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 解题思路 我觉得刷题是为了扩宽思考的广度。看到这题的时候&#xff0c;我的大脑是发懵的…

letcode - string

翻转字符串 344. 反转字符串 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/reverse-string/ class Solution { public:void reverseString(vector<char>& s) {reverse(s.begin(),s.end());//直接上逆置接口} }; 函数签名: void reverseStr…

redis的代码开发

redis是什么? 前提:官网地址https://redis.io 1.Redis是一个开源的,key,value格式的,内存型数据结构存储系统;它可用作数据库、缓存和消息中间件。 value支持多种类型的数据结构如strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglo…

VMware虚拟机安装及虚拟机下安装ubuntu(附安装包)

VMware虚拟机安装及虚拟机下安装ubuntu 0 前期准备1 VMware安装2 VMware虚拟机下安装ubuntu2.1 配置虚拟机2.2 安装虚拟机ubuntu 3 在虚拟机中卸载Ubuntu参考 0 前期准备 1、VMware Wworkstation Pro安装包下载 官网-添加链接描述 百度网盘分享&#xff1a; 链接: VMware 提取…

【实践出真知】使用Docusaurus将md文档组织起来就是一个网站(写API文档,写教程、写日记、写博客的有福了)

文章目录 前言一、Docusaurus 是什么&#xff1f;二、一键生成网站框架并预览1. 系统需求2. 脚手架项目网站&#xff08;一键生成网站框架&#xff09;3. 生成的目录内容4. 网站运行与展示 总结 前言 前段时间&#xff0c;学习Flet&#xff0c;访问到Flet中文网&#xff0c;被…

Golang `os/signal`包详解:全面掌握信号处理技巧

Golang os/signal包详解&#xff1a;全面掌握信号处理技巧 1. 引言2. os/signal包简介2.1 基本功能2.2 主要用途2.3 基本概念2.4 使用方法 3. 信号的类型和使用场景3.1 常见的操作系统信号3.2 信号的使用场景3.2.1 优雅关闭程序3.2.2 重新加载配置文件3.2.3 处理子进程退出 4. …

Chapter 21 深入理解JSON

欢迎大家订阅【Python从入门到精通】专栏&#xff0c;一起探索Python的无限可能&#xff01; 文章目录 前言一、JSON数据格式1. 什么是JSON&#xff1f;2. JSON数据的格式 二、JSON格式数据转化三、格式化JSON数据的在线工具 前言 在当今数据驱动的世界中&#xff0c;JSON&…

【iOS】线程同步读写安全技术(锁、信号量、同步串行队列)

目录 多线程安全隐患存钱取钱问题卖票问题 解决方案1. 锁自旋锁OSSpinLockos_unfair_lockatomic 互斥锁pthread_mutex_t条件pthread_cond_t&#xff08;线程检查器&#xff09;NSLock&NSRecursiveLock&#xff08;递归锁&#xff09;NSCondition&#xff08;条件锁&#xf…

C++ | Leetcode C++题解之第307题区域和检索-数组可修改

题目&#xff1a; 题解&#xff1a; class NumArray { private:vector<int> tree;vector<int> &nums;int lowBit(int x) {return x & -x;}void add(int index, int val) {while (index < tree.size()) {tree[index] val;index lowBit(index);}}int p…

3.2.微调

微调 ​ 对于一些样本数量有限的数据集&#xff0c;如果使用较大的模型&#xff0c;可能很快过拟合&#xff0c;较小的模型可能效果不好。这个问题的一个解决方案是收集更多数据&#xff0c;但其实在很多情况下这是很难做到的。 ​ 另一种方法就是迁移学习(transfer learning…

FFplay介绍及命令使用指南

&#x1f60e; 作者介绍&#xff1a;欢迎来到我的主页&#x1f448;&#xff0c;我是程序员行者孙&#xff0c;一个热爱分享技术的制能工人。计算机本硕&#xff0c;人工制能研究生。公众号&#xff1a;AI Sun&#xff08;领取大厂面经等资料&#xff09;&#xff0c;欢迎加我的…

微软Win11 24H2最新可选更新补丁26100.1301来袭!

系统之家于7月31日发出最新报道&#xff0c;微软针对Win11 24H2用户推出七月最新的可选更新KB5040529&#xff0c;本次更新为开始菜单引入了全新的账号管理器&#xff0c;也改进了任务栏上的小组件图标。接下来跟随系统之家小编来看看本次更新的详细内容吧&#xff01;【推荐下…

不同类型游戏安全风险对抗概览(下)| FPS以及小游戏等外挂问题,一文读懂!

FPS 游戏安全问题 由于射击类游戏本身需要大量数值计算&#xff0c;游戏方会将部分计算存放于本地客户端&#xff0c;而这为外挂攻击者提供了攻击的温床。可以说&#xff0c;射击类游戏是所有游戏中被外挂攻击最为频繁的游戏类型。 根据网易易盾游戏安全部门检测数据显示&#…

【排序算法】Java实现三大非比较排序:计数排序、桶排序、基数排序

非比较排序概念 非比较排序是一种排序算法&#xff0c;它不通过比较元素之间的大小关系来进行排序&#xff0c;而是基于元素的特征或属性进行排序。这种方法在特定情况下可以比比较排序方法&#xff08;如快速排序、归并排序等&#xff09;更有效率&#xff0c;尤其是在处理大…

【原创】java+ssm+mysql医生信息管理系统设计与实现

个人主页&#xff1a;程序员杨工 个人简介&#xff1a;从事软件开发多年&#xff0c;前后端均有涉猎&#xff0c;具有丰富的开发经验 博客内容&#xff1a;全栈开发&#xff0c;分享Java、Python、Php、小程序、前后端、数据库经验和实战 开发背景&#xff1a; 随着信息技术的…