Java版Flink使用指南——分流导出

大纲

  • 新建工程
  • 编码
    • Pom.xml
    • 自定义无界流
    • 分流
  • 测试
  • 工程代码

在之前的案例中,我们一直使用的是单个Sink来做数据的输出。实际上,Flink是支持多个输出流的。本文我们就来讲解如何在Flink数据输出时做分流处理。
我们将基于《Java版Flink使用指南——自定义无界流生成器》的输入流,按生成数字的奇偶性,将其分流输出到不同的RabbitMQ队列中。

新建工程

我们新建一个名字叫MultiSinkTo的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

编码

Pom.xml

因为我们要往RabbitMQ中输出,所以需要引入相关连接组件。

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
这块的代码可以见《Java版Flink使用指南——自定义无界流生成器》
它会每隔1秒钟生成一个递增的数字

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

分流

我们通过下面的代码生成数据流

		DataStreamSource<Long> longDataStreamSource = env.addSource(new UnBoundedStreamGenerator());

然后奇数发布到odd.data.to.rbtmq队列;偶数发布到even.data.to.rbtmq。
分流主要是通过filter来区分数据,然后针对不同的数据addSink来发布到不同的队列。
如果不需要区分数据,只是将相同的数据发布到不同的目的地,则可以直接多次addSink来达成。

		String host = "172.25.103.252"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();int parallelism = 1;String oddSinkQueueName = "odd.data.to.rbtmq"; RMQSink<String> oddRMQSink = new RMQSink<>(rmqConnectionConfig, oddSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 != 0).map(Object::toString).addSink(oddRMQSink).setParallelism(parallelism).name("oddSink");String evenSinkQueueName = "even.data.to.rbtmq";RMQSink<String> evenRMQSink = new RMQSink<>(rmqConnectionConfig, evenSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 == 0).map(Object::toString).addSink(evenRMQSink).setParallelism(parallelism).name("evenSink");

测试

执行一段时间后,我们看到两个队列相序增加
在这里插入图片描述
奇数队列
在这里插入图片描述
偶数队列
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3224641.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

公司内部配置GitLab,通过SSH密钥来实现免密clone、push等操作

公司内部配置GitLab&#xff0c;通过SSH密钥来实现免密clone、push等操作。以下是配置SSH密钥以实现免密更新的步骤&#xff1a; 1.生成SSH密钥 在本地计算机上打开终端或命令提示符。输入以下命令以生成一个新的SSH密钥&#xff1a;ssh-keygen -t rsa -b 4096 -C "your…

自动驾驶事故频发,安全痛点在哪里?

大数据产业创新服务媒体 ——聚焦数据 改变商业 近日&#xff0c;武汉城市留言板上出现了多条关于萝卜快跑的投诉&#xff0c;多名市民反映萝卜快跑出现无故停在马路中间、高架上占最左道低速行驶、转弯卡着不动等情况&#xff0c;导致早晚高峰时段出现拥堵。萝卜快跑是百度 A…

Mac VSCode 突然闪退、崩溃、打不开了

1、 思路历程 VSCode 作为前端常用开发工具&#xff0c;其重要性就不一一描述了。 所以 VSCode 突然打不开了&#xff0c;真的是让我一脸懵逼。 本来以为问题不大&#xff0c;于是 &#xff1a; 1、重启了一下VSCode 2、关机重启了一下电脑&#xff1b; 3、清理了一下缓存&am…

RequestContextHolder多线程获取不到request对象

RequestContextHolder多线程获取不到request对象&#xff0c;调用feign接口时&#xff0c;在Feign中的RequestInterceptor也获取不到HttpServletRequest问题解决方案。 1.RequestContextHolder多线程获取不到request对象 异常信息&#xff0c;报错如下&#xff1a; 2024-07-0…

设计模式8-桥模式

设计模式8-Bridge 桥模式 由来与目的模式定义结构代码推导1. 类和接口的定义2. 平台实现3. 业务抽象4. 使用示例总结1. 类数量过多&#xff0c;复杂度高2. 代码重复3. 不符合单一职责原则4. 缺乏扩展性改进后的设计1. 抽象和实现分离&#xff08;桥接模式&#xff09;2. 抽象类…

24/7/10总结

flex布局 父项常见属性 justify-content:设置主轴上的子元素排列方式 flex-wrap:设置子元素是否换行 align-items:设置侧轴上的子元素的排列方式&#xff08;单行&#xff09; 拉伸要把子盒子里的高度给去掉 如果两个align-items都是center并且主轴是y轴就是这种效果…

【鸿蒙学习笔记】使用动画

官方文档&#xff1a;使用动画 目录标题 属性动画&#xff1a;通用属性发生改变时而产生的属性渐变效果animationanimateTo自定义属性动画 AnimatableExtend 转场动画&#xff1a;是页面或组件的切换动画 , 显示/隐藏 切换时的动画出现/消失转场&#xff1a;实现一个组件出现或…

Allegro纠纷管理:构建和谐交易的桥梁

在Allegro这个欧洲领先的电商平台&#xff0c;每天都有成千上万的交易发生&#xff0c;纠纷可能源于多种原因&#xff0c;包括但不限于商品描述不符、运输损坏、未收到货、退货退款问题等。纠纷留言便成为了连接买卖双方&#xff0c;解决争议的桥梁。在Allegro平台上&#xff0…

部署前端项目

常见部署方式有&#xff1a;静态托管服务、服务器部署 1. 静态托管服务 使用平台部署代码&#xff0c;比如 GitHub。 | 创建一个仓库&#xff0c;仓库名一般是 yourGithubName.github.io。 | 将打包后的静态文件文件上传到仓库。 | 在“Settings”&#xff08;选项&#xff0…

初始化线程的4种方式

1. 继承Thread 缺点&#xff1a;无法获取线程的运算结果。 public class ThreadTest{public static void main(String[] args){Thread01 thread new Thread01();thread.start();}public static class Thread01 extends Thread{public void run(){System.out.println("当前…

PageDTO<T>,PageQuery,BeanUtils,CollUtils的封装

一、PageDTO<T> import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.fasterxml.jackson.annotation.JsonIgnore; import com.tianji.common.utils.BeanUtils; import com.tianji.common.utils.CollUtils; import com.tianji.common.utils.…

koa + http-proxy-middleware 搭建一个带转发的静态服务器

背景 由于工作中碰到写普通页面&#xff08;未使用脚手架&#xff09;&#xff0c;需要发起接口请求&#xff0c;但普通页面又无法对接口发起正常请求&#xff0c;故编写一个Koa搭建的带转发功能的静态服务器。 起步 新建一个文件夹&#xff0c;在文件夹下打开 cmd 或者 git …

AI提示词:AI辅导「数学作业」

辅导孩子作业对许多家长来说可能是一件头疼的事&#xff0c;但这部分工作可以在一定程度上交给AI来完成。 打开ChatGPT4,输入以下内容&#xff1a; # Role 数学辅导专家## Profile - author: 姜小尘 - version: 02 - LLM: Kimi - language: 中文 - description: 专门为小学生…

iOS 开发技巧 - 使用本地 json 文件

前言 使用本地 json 文件的场景&#xff0c;在我们开发功能的阶段&#xff0c;服务端接口字段定义好了后&#xff0c;有些接口响应很慢&#xff0c;请求到响应可能要 几十秒甚至一分钟&#xff0c;我们需要频繁调用接口来调试功能&#xff1b;还有就是调用一些我们需要付费的三…

宿州降本 提质 增效 数据采集监控平台提高生产自动化水平

在当今竞争激烈的市场环境中&#xff0c;企业追求降本、提质、增效已成为发展的关键。而我们的[数据采集监控平台名称]数据采集监控平台&#xff0c;正是助力企业实现这一目标的强大工具。 LP-SCADA数据采集监控平台是工业4.0中主要的数据采集系统之一&#xff0c;主要针对产线…

限流组件都有哪些

限流组件有很多&#xff0c;下面是一些常见的限流组件&#xff1a; 1.Sentinel&#xff1a;轻量级的流量控制、熔断降级组件&#xff0c;适用于分布式系统间的流量控制、负载保护和系统防护。 1.Guava RateLimiter&#xff1a;Google 的 Guava 库提供的限流器&#xff0c;基于令…

基于STM32的智能加湿器

1.简介 基于STM32的加湿器发展前景非常乐观&#xff0c;这主要得益于其在技术、市场需求、应用场景以及政策支持等多方面的优势。STM32微控制器具备强大的处理能力和丰富的外设接口&#xff0c;能够实现精确的湿度监测和智能化控制。基于STM32的加湿器可以根据环境湿度自动调节…

2-29 基于matlab的CEEMD

基于matlab的CEEMD&#xff08;Complementary Ensemble Empirical Mode Decomposition&#xff0c;互补集合经验模态分解&#xff09;&#xff0c;先将数据精心ceemd分解&#xff0c;得到imf分量&#xff0c;然后通过相关系数帅选分量&#xff0c;在求出他们的样本熵的特征。用…

JAVA从入门到精通之入门初阶(一)

1. 认识变量 一、 首先变量名要遵循如下命名规则&#xff1a; 1. 变量名只能由字母、数字和下划线组成 2. 变量名必须以字母或下划线开头 3. 变量名大小写敏感 4. 变量名不能使用关键字&#xff0c;如const、static等 5. 变量名应具有描述性&#xff0c;以便于代码的可读性…

【人工智能】-- 搜索技术(状态空间法)

个人主页&#xff1a;欢迎来到 Papicatch的博客 课设专栏 &#xff1a;学生成绩管理系统 专业知识专栏&#xff1a; 专业知识 文章目录 &#x1f349;引言 &#x1f348;介绍 &#x1f349;状态空间法 &#x1f348;状态空间的构成 &#x1f34d;状态 &#x1f34d;算符…