Flink 实时数仓(二)【DIM 层搭建】

1、DIM 层搭建

1.1、设计要点

DIM层设计要点:

  • DIM层存的是维度表(环境信息,比如人、场、货等)
  • DIM层的数据存储在 HBase 表中
  • DIM层表名的命名规范为dim_表名

DIM 层表是用于维度关联的,要通过主键(维度外键)去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,所以这里选用 HBase 存储维度数据。

1.2、设计分析

在 ODS 层,我们需要首先把所有维表全量同步一次,之后当事实数据来了的时候就可以直接关联;现在 DIM 层,我们需要考虑的问题就是,如何把维表信息保存到 HBase:

  • 读取数据
    • Kafka 的 topic_db 主题中(包含所有46张业务表)
    • 可以使用 Flink 的 Kafka 连接器读取
  • 过滤数据
    • 从 46 张业务表中过滤出所有维表数据
      • 在代码中写死十几张维度表的表名
      • 问题:如果增加维表就需要修改代码重新编译项目,重启任务
      • 思考:如何不修改代码且不重启任务?
        • 1)定时任务:每隔一段时间加载一次配置信息(实时性不好,不可取)
        • 2)监控配置信息:
          • MySQL binlog:配置信息写到 MySQL,然后使用 FlinkCDC 监控直接创建流(数据流 connect 配置流,将配置流中的信息写入到状态中,然后数据流把状态中存在的表过滤出来
          • 文件:Flume tailDir Source -> Kafka -> Flink 的 kafka source 消费创建流(太复杂了,不可取)

注意:多个并行度下,配置流中的数据(需要过滤的表名)会被分配给多个相同的算子处理,会导致并行算子之间的状态不一致,可能导致数据丢失的问题;这就需要把配置流做成一个广播流来和数据流进行 connect,这样写入并行算子的状态就是一致的了;

        广播流的缺点就是存在冗余,而且并行度越大冗余也越大;配置信息小点还好,如果配置信息很大,那么将会占用的资源页越大;这种情况下我们的解决办法就是分流:

        对数据流和配置流都按照表名进行 keyby,相同的 key 再去做 connect,但是这种方案会产生数据倾斜;

  • 写出数据
    • 使用 Phoenix 写出到 HBase(使用 JdbcSink,如果不行就自定义 Sink)

1.3、DIM 层实现

DIM 层的主要任务:读取 Kafka 的数据 -> 简单ETL ->  保存到HBase

        Maxwell 同步过来的数据是到 Kafka 的,我们通过 Flink 自带的 Kafka 连接器进行连接读取,然后对数据先进行简单的 ETL,比如

  • 删除掉非 JSON 格式的数据(这里因为是业务数据所以一般不会有非JSON的情况出现,但是日志数据可能会存在这种情况)
  • 删除掉 type 为 bootstrap-start 和 bootstrap-complete 的数据;
  • 删除掉 type 为 delete 的数据;

最后,通过 Phoenix API 将数据插入到 HBase;

1.3.1、读取 Kafka 中的数据

        读取 Kafka 中的数据为的是创建主流,这里设计到的一个重点就是:Flink 作为消费者在从 Kafka 消费的时候需要对数据进行反序列化,而在反序列时如果使用 Flink 默认的 Kafka 反序列化器(FlinkKafkaConsumer)进行消费的话,可能会出现空指针异常:

        可以看到,反序列化方法中是直接把 kafka message 创建为一个 String 对象,但是 String 的构造器源码中明确声明构造参数不可为 null,而我们的 message 又不可避免存在一些空值,所以这里我们需要重写 FlinkKafkaConsumer 的反序列化方法:

public class MyKafkaUtil {private static final String KAFKA_SERVER = "hadoop102:9092";public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic,String groupId){Properties properties = new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);return new FlinkKafkaConsumer<String>(topic,// 反序列化格式new KafkaDeserializationSchema<String>() {@Overridepublic boolean isEndOfStream(String nextElement) {return false; // 无界流所以返回 false}@Overridepublic String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {if (record == null || record.value() == null){return null;}else {return new String(record.value());}}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}},properties);}
}

这样,我们就可以直接使用通过 Kafka 地址主题 和 组id 使用 Flink 对 Kafka 的主题进行消费:

env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

至此,我们的主流已经创建完毕; 

1.3.2、简单 ETL 

        这一步主要为的是将不必要的数据移除掉,比如非JSON数据(日志数据中才可能出现)、maxwell 的脏数据( bootstrap-start 、bootstrap-complete 和 delete)这种无意义的信息;

public class DimApp {public static void main(String[] args) {// TODO 1. 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 生产环境中设置为kafka主题的分区数// 1.1 开启checkpointenv.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次// 1.2 设置状态后端env.setStateBackend(new HashMapStateBackend());// TODO 2. 读取Kafka的topic_db主题,创建主流String topic = "topic_db";String groupId = "test";DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));// TODO 3. ETL// TODO 3.1 过滤掉非JSON数据以及Maxwell的脏数据SingleOutputStreamOperator<JSONObject> filterJsonDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {// 过滤非JSON数据JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if ("bootstrap-insert".equals(type) || "insert".equals(type) || "update".equals(type)) {out.collect(jsonObject);}} catch (Exception e) {System.out.println("发现脏数据" + value);}}});// ...}
}

关于 Flink 的并行度,生产中一般设置为 Kafka 的分区数量(使消费者数量 = 主题分区数量),而不是机器的 CPU核数(机器总不能只跑这一个任务)!

1.3.3、动态增删维表

        现在我们需要从 46 张业务表中过滤出需要的维表,而且这个维表并不是写死的,很可能会出现新增和删除,所以我们希望做到不修改代码且不重启服务的情况下实现,我们上面业说过了,最好使用监控配置信息的方式,一共有三种解决方案:

  • MySQL binlog
    • 也就是把配置信息做成表格,使用 FlinkCDC 实时监控,使用双流联结(主流和配置流),配置流把配置信息(需要同步的维度表信息,包括业务系统中的维表名、写入到phoenix的表名、字段、主键、额外信息等)写入到状态当中,然后主流再去状态中读取并处理;
  • 文件:使用 Flume 的 tailDir source 实时监听文件内容,写到 Kafka 当中,Flink 再从 Kafka 去读,这种方式太复杂了,一般不用
  • Zookeeper:通过 Zookeeper 的 watch 机制将配置信息写入到一个 znode 节点,同样比较复杂

综上,我们一般选择第一种方案:

1)创建配置表
CREATE TABLE `table_process` (`source_table` varchar(200) NOT NULL COMMENT '业务系统来源表', -- mysql业务系统中的表名`sink_table` varchar(200) DEFAULT NULL COMMENT 'phoenix输出表', -- phoenix的表名`sink_columns` varchar(2000) DEFAULT NULL COMMENT 'phoenix建表所需字段', -- 建表需要的字段,过滤主流数据字段`sink_pk` varchar(200) DEFAULT NULL COMMENT 'phoenix建表的主键字段', -- 建表使用的主键(表名做主键)`sink_extend` varchar(200) DEFAULT NULL COMMENT 'phoenix建表扩展', -- 比如预分区等信息PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

这里,我们将 source_table 作为配置表的主键,这样可以通过它获取到该表需要同步的字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。

2)创建配置表的实体类

配置流在创建的时候我们并不会直接把它转为 JSON 格式,毕竟我们还要对它进行一些处理,而操作java对象比json对象要更容易;

@Data
public class TableProcess {//来源表String sourceTable;//输出表String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend;
}
3)使用 FlinkCDC 创建配置流

注意:FlinkCDC 把 binlog 读取过来会转为 json 格式

        // TODO 4. 使用FlinkCDC读取MySQL配置信息表创建配置流MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("hadoop102").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.table_config").startupOptions(StartupOptions.initial()) // 全部读取.deserializer(new JsonDebeziumDeserializationSchema()) // flink读取binlog会把它转为json格式,所以这里需要一共json的反序列化方式.build();DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"MysqlSource");
4)配置流形成广播流

配置流不能直接和主流联结,会造成数据丢失(多并行度下,配置信息会轮询发送到相同的算子上),所以我们需要把它转为广播流;

创建广播流需要传入一个 Map 类型的状态描述器:

  • K 必须是主流和配置流都有的信息,这样主流才能和广播流产生关联,所以这里我们使用表名做为 K;
  • V 是配置流中的数据,这里我们选择上面自定义 TableProcess 对象,这个对象包含了该表(K)的所有配置信息;
        // TODO 5. 将配置流处理为广播流// K的要求: 1.必须是主流和配置流都有的字段 2. 唯一// V: 这里的v应该是配置流中的数据,但是为了方便过滤字段(操作java对象比json对象要容易),所以这里转为Java对象MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);// 这里的泛型是广播流的类型BroadcastStream<String> configBroadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);
5)双流联结并处理

这里对主流和配置流联结后需要进行处理:

  • 把配置流中的配置信息写入到状态后端使其自动广播
    • 从配置流(FlinkCDC 读取过来 json 信息)中提取出表格信息
    • 校验 phoenix 表格(不存在就创建)
    • 写入到状态中(因为现在是广播流所以会自动广播)
  • 把主流中的非维表去除掉,以及维表中不需要的字段
    • 从状态中获得配置信息
    • 除去非维表并过滤字段,只留下配置流当中存在的字段
    • 给主流添加上配置信息中的 sinkTable 字段(因为主流不知道最终向哪个phoenix表写入)
        // TODO 6. 连接主流和广播流// 这里的泛型: 1. 非广播流的数据类型 2. 广播流的数据类型BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonDS.connect(configBroadcastStream);// TODO 7. 处理连接流,根据配置信息处理主流数据// 得到维表数据流(已经把配置流中不需要的维表字段过滤掉了,以及非维表也被过滤掉了)SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));

这里的 TableProcessFunction 是我们自定义的广播流处理函数:

// 这里的泛型: 1. 非广播流的数据类型 2. 广播流的数据类型 3. 输出类型这里选择主流的数据类型,因为毕竟我们用配置流的目的就是为了得到过滤后的主流数据
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private Connection connection;private MapStateDescriptor<String, TableProcess> mapStateDescriptor;public TableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor){this.mapStateDescriptor = mapStateDescriptor;}// 保证每个并行度创建一个连接@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}@Overridepublic void close() throws Exception {connection.close();}/*** value 的值:* {*  "before":null,*  "after":{*      "source_table":"aa",*      "sink_table":"bb",*      "sink_columns":"cc",*      "sink_pk":"id",*      "sink_extend":"xxx"},*  "source":{*      "version":"1.5.4.Final",*      "connector":"mysql",*      "name":"mysql_binlog_source",*      "ts_ms":1652513039549,*      "snapshot":"false",*      "db":"gmall-211126-config",*      "sequence":null,*      "table":"table_process",*      "server_id":0,*      "gtid":null,*      "file":"",*      "pos":0,*      "row":0,*      "thread":null,*      "query":null},*   "op":"r",*   "ts_ms":1652513039551,*   "transaction":null}*/@Overridepublic void processBroadcastElement(String value, Context context, Collector<JSONObject> collector) throws Exception {// TODO 1. 获取并解析数据,方便主流操作(把 "after" 字段的内容解析为 TableProcess 对象)JSONObject jsonObject = JSONObject.parseObject(value);TableProcess tableProcess = JSONObject.parseObject(jsonObject.getString("after"), TableProcess.class);// TODO 2. 校验表在phoenix中是否存在checkTable(tableProcess.getSinkTable(),tableProcess.getSinkColumns(),tableProcess.getSinkPk(),tableProcess.getSinkExtend());// TODO 3. 写入状态,广播出去BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);broadcastState.put(tableProcess.getSourceTable(),tableProcess);}/*** 校验并创建phoenix表: create table if not exists db.tb(xx varchar primary key,xx varchar, ...) xxx* @param sinkTable 表名* @param sinkColumns 字段* @param sinkPk 主键* @param sinkExtend 扩展字段*/private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {PreparedStatement preparedStatement = null;try {// 处理特殊字段值(null)if (sinkPk == null || "".equals(sinkPk))sinkPk = "id";if (sinkExtend == null)sinkExtend = "";// 拼接 SQLStringBuilder sql = new StringBuilder("create table if not exists ").append(GmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append("(");String[] columns = sinkColumns.split(",");for (int i = 0; i < columns.length-1; i++) {sql.append(" ").append(columns[i]).append(" varchar");if (columns[i].equals(sinkPk))sql.append(" primary key");sql.append(",");}sql.append(columns[columns.length-1]).append(") ").append(sinkExtend);// 编译 SQLpreparedStatement = connection.prepareStatement(sql.toString());// 执行 SQLpreparedStatement.execute();}catch (SQLException e){// 要停止程序必须使用运行时异常而不是编译时异常throw new RuntimeException("建表失败 " + sinkTable);}finally {// 释放资源if (preparedStatement != null){try {preparedStatement.close();} catch (SQLException e) {e.printStackTrace();}}}}// 主流数据是一个json格式(maxwell数据)/*** {*  "database":"gmall-211126-flink",*  "table":"base_trademark",*  "type":"bootstrap-insert",*  "ts":1652499295,*  "data":{*      "id":1,*      "tm_name":"三星",*      "logo_url":"/static/default.jpg"*      }* }*/@Overridepublic void processElement(JSONObject jsonObject, ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {// TODO 1. 获取广播的配置数据ReadOnlyBroadcastState<String, TableProcess> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);// 如果返回 null 说明不是维表TableProcess tableProcess = broadcastState.get(jsonObject.getString("table"));// TODO 2. 过滤字段,只留下配置流当中存在的字段if (tableProcess == null) return;filterColumns(jsonObject.getJSONObject("data"),tableProcess.getSinkColumns());// TODO 3. 补充 SinkTable 字段(因为主流中是不包含phoenix表名的)jsonObject.put("sinkTable",tableProcess.getSinkTable());collector.collect(jsonObject);}/*** 过滤字段: 将主流中配置流中有的字段留下,其它字段删除* @param data 可能是维表,也可能是事实表 {"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-insert","ts":1652499295,"data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}* @param sinkColumns phoenix 列名*/private void filterColumns(JSONObject data, String sinkColumns) {// 把 JSONObject 当做 Map 处理即可String[] split = sinkColumns.split(",");Set<String> phoenix_columns = new HashSet<>(Arrays.asList(split));for (String column : data.keySet()){if (!phoenix_columns.contains(column)){data.remove(column);}}// 简写: data.entrySet().removeIf(entry -> !phoenix_columns.contains(entry.getKey()));}}

至此,我们就得到了最终等待写入到 HBase 的流 dimDS;

1.3.4、写入 Phoenix

        上面我们在连接 phoenix 校验表格的时候用的是 jdbc 来访问的,而 Flink 也提供了 JdbcSink 连接器,那这里我们能不能使用呢?

        其实这里使用 JdbcSink 是可以的,但是不推荐,因为 JdbcSink 适合的是单表写入的场景,而我们的 dimDS 数据流中存放的是多个维度表的数据,这就要求当数据来的时候,我们要根据不同的表生成不同的 SQL,而这里的 addSink 方法中的 sql 语句必须是先给定的,尽管不确定的表名、字段名等可以使用占位符,但是我们不能保证所有维表的字段数量都是一样的;所以,这种方式显然不可取,那我们就只能自定义一个 Sink 了:

1)创建 Druid 连接池

Phoenix 是支持 JDBC 协议,这里为了方便连接管理我们使用 Druid 来创建连接池;

public class DruidDSUtil {private static DruidDataSource druidDataSource = null;public static DruidDataSource createDataSource() {// 创建连接池druidDataSource = new DruidDataSource();// 设置驱动全类名druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);// 设置连接 urldruidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);// 设置初始化连接池时池中连接的数量druidDataSource.setInitialSize(5);// 设置同时活跃的最大连接数druidDataSource.setMaxActive(20);// 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0druidDataSource.setMinIdle(1);// 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待druidDataSource.setMaxWait(-1);// 验证连接是否可用使用的 SQL 语句druidDataSource.setValidationQuery("select 1");// 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除// 注意,默认值为 true,如果没有设置 validationQuery,则报错// testWhileIdle is true, validationQuery not setdruidDataSource.setTestWhileIdle(true);// 借出连接时,是否测试,设置为 false,不测试,否则很影响性能druidDataSource.setTestOnBorrow(false);// 归还连接时,是否测试druidDataSource.setTestOnReturn(false);// 设置空闲连接回收器每隔 30s 运行一次druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);// 设置池中连接空闲 30min 被回收,默认值即为 30 mindruidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);return druidDataSource;}
}
2)Phoenix 工具类

为了方法的复用性,我们把写入 Phoenix 的方法抽出来:

public class PhoenixUtil {/*** 将主流数据写入 phoenix* @param connection phoenix连接* @param sinkTable 表名* @param data 数据* @throws SQLException 这里的异常直接抛出去.因为工具类中的方法是给大家公用的,而不同的业务捕获到异常的处理方案是不一样的* 所以这里把处理异常的权利交给每个调用该方法的人*/public static void upsertValues(DruidPooledConnection connection, String sinkTable, JSONObject data) throws SQLException {// 1. 拼接 SQL: upsert into db.tb(id,name,sex) values ('1001','zhangsan','man')Set<String> columns = data.keySet();Collection<Object> values = data.values();String sql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "("+ StringUtils.join(columns,",") +") values ( '"+ StringUtils.join(values,"','") + "')";// 2. 预编译 SQLPreparedStatement preparedStatement = connection.prepareStatement(sql);// 3. 执行preparedStatement.execute();connection.commit();// 4. 释放资源preparedStatement.close(); // connection 在 Sink 的 invoke 里面关}
}
3)自定义 Sink

自定义 Sink ,在 open 方法中获得连接,在 invoke 中执行插入数据到 Phoenix ,然后回收连接;

// 输入数据类型应该是 dimDS 的类型,也就是主流类型 JSONObject
public class DimSinkFunction extends RichSinkFunction<JSONObject> {private DruidDataSource druidDataSource = null;@Overridepublic void open(Configuration parameters) throws Exception {druidDataSource = DruidDSUtil.createDataSource();}/*** value:* {*  "database":"gmall-211126-flink",*  "table":"base_trademark",*  "type":"bootstrap-insert",*  "ts":1652499295,*  "data":{*      "id":1,*      "tm_name":"三星"*      },*  "sinkTable": "dim_xxx"* }*/@Overridepublic void invoke(JSONObject value, Context context) throws Exception {// 获取连接DruidPooledConnection connection = druidDataSource.getConnection();// 写出数据(需要知道写出的表名、字段)String sinkTable = value.getString("sinkTable");JSONObject data = value.getJSONObject("data");// 如果插入数据失败 invoke 方法抛出的 Exception 会导致程序停止PhoenixUtil.upsertValues(connection,sinkTable,data);// 归还连接connection.close();}
}

总结

        至此,DIM 层搭建完毕,在离线数仓的 DIM 层中,它需要在 ODS 层的基础上抽取出主维表和相关维表,然后主维表通过 left join 相关维表得到最终的 dim 层的维表;而实时数仓中我们主要是通过 Flink 代码来对数据流进行实时处理,代码的编写确实比 SQL 更有意思;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3280774.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Magento2 常用命令以及技巧

1.Magento 命令行工具 Magento2 带有一个命令行工具&#xff0c;在windows下&#xff0c;用管理员权限打开MS-DOS命令提示符&#xff0c;然后cd到Magento根目录&#xff0c;运行下面命令&#xff0c;就可 以看到这个强大的命令行工具的命令清单&#xff1a; php bin/magentoU…

【echarts】echarts-liquidfill 水球图

echarts-liquidfill3兼容echarts5 echarts-liquidfill2兼容echarts4 npm install echarts npm install echarts-liquidfill设置水球图背景色和内边框样式 var option {series: [{type: liquidFill,data: [0.6, 0.5, 0.4, 0.3],backgroundStyle: {borderWidth: 5,//边框宽度bo…

《史上最简单的SpringAI+Llama3.x教程》-04-RAG核心Embedding及向量检索Retrieval

上一节咱们顺利从本地读取了文件内容&#xff0c;并且可以使用transform工具对文件进行内容处理&#xff0c;下面咱们继续看看如何将文件进行向量化&#xff0c;并且存储到向量数据库中。 Embedding 知识扩展 Embeddings是一种将高维数据映射到低维空间的技术&#xff0c;它能…

【Webpack 踩坑】img 标签图片加载不出来

问题&#xff1a;在html的img标签路径解析错误&#xff0c;导致加载不出来 一直用框架开发&#xff0c;好久没用过webpack写原生代码&#xff0c;一下子踩了好多坑… 图片位置&#xff1a; 其中一个就是在html中写了图片地址&#xff1a; <!-- src/pages/index.html --&…

实战:ElasticSearch 索引操作命令(补充)

四.ElasticSearch 操作命令 4.1 集群信息操作命令 4.1.1 查询集群状态 &#xff08;1&#xff09;使用 Postman 客户端直接向 ES 服务器发 GET 请求 http://hlink1:9200/_cat/health?v &#xff08;2&#xff09;使用服务端进行查询 curl -XGET "hlink1:9200/_cat/h…

2024.07纪念一 debezium : spring-boot结合debezium

使用前提&#xff1a; 一、mysql开启了logibin 在mysql的安装路径下的my.ini中 【mysqlid】下 添加 log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义&#xff0c;不要和 canal 的 slaveId 重复 参考gitee的项目…

收藏!国内外GPU算力厂商详细盘点

如今&#xff0c;图形处理器&#xff08;GPU&#xff09;作为计算领域的核心部件&#xff0c;其算力性能直接决定了诸多应用场景的效率和效果。从深度学习、科学计算到视频处理&#xff0c;GPU的算力已成为衡量技术实力的重要指标。本文将详细盘点当前市场上GPU算力领先的厂商&…

iOS开发-图片上涂鸦绘制撤销功能

iOS开发-图片上涂鸦绘制撤销功能 当我们需要重新在图片上进行绘制涂鸦生成新的图&#xff0c;这里使用到了Graphics中的API功能。 Graphics Framework是一套基于C的API框架&#xff0c;使用了Quartz作为绘图引擎。它提供了低级别、轻量级、高保真度的2D渲染。 微信搜索小游戏…

单线程 和多线程区别,看打印输出1000个数字效果

执⾏过程: 加载func() -> 执⾏main -> 创建⼦线程t -> ⼦线程t启动 -> 执⾏func中的内容 |-> 继续执⾏main from threading import Thread #此线程不用安装自带。T是大写注意哟 def func():for i in range(1000):print(func,i) #定义一个函数打印 if __name__ …

<数据集>DOTA v1.0遥感航拍目标识别数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;1869张&#xff08;训练集1411&#xff0c;验证集458&#xff09; 标注数量(xml文件个数)&#xff1a;1869 标注数量(txt文件个数)&#xff1a;1869 标注类别数&#xff1a;15 标注类别名称&#xff1a;[plane, ba…

基于Python的哔哩哔哩国产动画排行数据分析系统

需要本项目的可以私信博主&#xff0c;提供完整的部署、讲解、文档、代码服务 随着经济社会的快速发展&#xff0c;中国影视产业迎来了蓬勃发展的契机&#xff0c;其中动漫产业发展尤为突出。中国拥有古老而又璀璨的文明&#xff0c;仅仅从中提取一部分就足以催生出大量精彩的…

python——joblib进行缓存记忆化-对计算结果缓存

问题场景 在前端多选框需要选取多个数据进行后端计算。 传入后端是多个数据包的对应路径。 这些数据包需要按一定顺序运行&#xff0c;通过一个Bag(path).get_start_time() 可以获得一个float时间值进行排序&#xff0c;但由于数据包的特性&#xff0c;这一操作很占用性能和时…

碰撞检测 | 矩形增量膨胀安全走廊模型(附C++/Python仿真)

目录 0 专栏介绍1 安全走廊建模的动机2 矩形增量膨胀算法3 算法仿真3.1 C实现3.2 Python实现 0 专栏介绍 &#x1f525;课设、毕设、创新竞赛必备&#xff01;&#x1f525;本专栏涉及更高阶的运动规划算法轨迹优化实战&#xff0c;包括&#xff1a;曲线生成、碰撞检测、安全走…

哪个牌子的眼镜清洗机好?买超声波清洗机有必要吗

生活中&#xff0c;我们经常忽视眼镜的清洁。你知道吗&#xff1f;眼镜如果长时间不清洁的话&#xff0c;镜片上的污垢和油脂会让视线变得模糊不清&#xff0c;甚至油污滋生的细菌还可能伤害到我们的眼睛&#xff0c;比如引起眼睛疲劳或炎症。为了保持眼镜干净&#xff0c;现在…

生鲜 52 周 MD如何助力业绩提升

生鲜 52 周 MD &#xff0c;顾名思义&#xff0c;就是以一年 52 周为周期&#xff0c;对生鲜商品进行精细化、动态化的营销规划。它不再是传统的固定化、模式化的销售方式&#xff0c;而是根据每周的季节特点、节日氛围、消费趋势以及市场变化&#xff0c;精心策划生鲜商品的种…

11 优化器

目录 1. 随机梯度下降系优化器&#xff1a;SGD 1.1 算法种类 1.2 优缺点 2 SGDM 即为SGD with momentum 动量 2.1 公式 2.2 动量的优缺点 优点 缺点 2.3 使用场景 3 AdaGrad 3.1 公式 3.2 AdaGrad的优缺点 优点 缺点 3.3 使用场景 3.4 Adam 3.4.1 Adam优化器的…

计算机网络01

文章目录 浏览器输入URL后发生了什么&#xff1f;Linux 系统是如何收发网络包的&#xff1f;Linux 网络协议栈Linux 接收网络包的流程Linux 发送网络包的流程 浏览器输入URL后发生了什么&#xff1f; URL解析 当在浏览器中输入URL后&#xff0c;浏览器首先对拿到的URL进行识别…

2024最全RabbitMQ集群方案汇总

之前在网上找rabbitmq集群方案有哪几种时&#xff0c;基本上看到的答案都不太一样&#xff0c;所以此文的主要目的是梳理一下rabbitmq集群方案&#xff0c;对rabbitmq集群方案的笔记并不是搭建的笔记。 总结了一些文章&#xff0c;rabbitmq集群大概有五种方案&#xff1a;普通…

[Linux安全运维] iptables包过滤

前言 防火墙是网络安全中非常重要的设备&#xff0c;是一种将内部网络和外部网络隔离开的技术。简单来说&#xff0c;防火墙技术就是访问控制技术&#xff0c;由规则和动作组成。 1. Linux 包过滤防火墙 1 .1 概述 iptables&#xff1a; 指的是管理Linux防火墙的命令程序&a…

Windows 下后台启动 jar 包,UTF-8 启动 jar 包_windows启动jar

转自:https://blog.csdn.net/2401_83817971/article/details/137514739 本文介绍了如何使用javaw.exe后台启动Javajar包&#xff0c;如何在Windows中管理和设置cmd编码&#xff0c;以及与Python开发相关的学习资源。包括UTF-8编码启动jar包的方法和Windows下关闭后台服务的技…