milvus对象存储和消息中间件的工厂设计模式分析

milvus对象存储和消息中间件的工厂设计模式分析

需求

根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote

配置文件

根据配置文件选择初始化mq和存储:

mq:type: pulsarcommon:storageType: minio

对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。

代码框架

在这里插入图片描述

工厂接口

代码路径:internal\util\dependency\factory.go

type Factory interface {msgstream.Factory// Init()给工厂传递参数。Init(p *paramtable.ComponentParam)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。

这个接口创建消息中间件对象和持久存储对象。

这里为什么不这么写:

type Factory interface {Init(p *paramtable.ComponentParam)NewMsgStream(ctx context.Context) (MsgStream, error)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

DefaultFactory

DefaultFactory结构体是dependency.Factory的实现。

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {standAlone          boolchunkManagerFactory storage.FactorymsgStreamFactory    msgstream.Factory
}// storage.Factory
// internal\storage\factory.go
type Factory interface {NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory实现了dependency.Factory接口的Init()函数。

在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {// skip if using default factoryif f.msgStreamFactory != nil {return}// 初始化chunkManagerFactoryf.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)// initialize mq client or embedded mq.// 初始化msgStreamFactoryif err := f.initMQ(f.standAlone, params); err != nil {panic(err)}
}

f.chunkManagerFactory:

return &ChunkManagerFactory{persistentStorage: persistentStorage,config:            c,}

f.msgStreamFactory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))switch mqType {case mqTypeNatsmq:f.msgStreamFactory = msgstream.NewNatsmqFactory()case mqTypeRocksmq:f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)case mqTypePulsar:f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)case mqTypeKafka:f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)}if f.msgStreamFactory == nil {return errors.New("failed to create MQ: check the milvus log for initialization failures")}return nil
}

持久存储

storage.Factory是创建持久存储的工厂接口。

storage.ChunkManagerFactory是storage.Factory的实现。

NewPersistentStorageChunkManager()接口的实现:

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {return f.newChunkManager(ctx, f.persistentStorage)
}func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {switch engine {case "local":return NewLocalChunkManager(RootPath(f.config.rootPath)), nilcase "minio":return newMinioChunkManagerWithConfig(ctx, f.config)case "remote":return NewRemoteChunkManager(ctx, f.config)default:return nil, errors.New("no chunk manager implemented with engine: " + engine)}
}

根据传入的engine新建对应的持久存储对象。

LocalChunkManager、MinioChunkManager、RemoteChunkManager。

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.ClientbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStoragebucketName stringrootPath   string
}

消息中间件

msgstream.Factory是创建mq的工厂接口。

工厂接口:

// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

实现有:

CommonFactory、KmsFactory、PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {Newer             func(context.Context) (mqwrapper.Client, error) // client constructorDispatcherFactory ProtoUDFactoryReceiveBufSize    int64MQBufSize         int64
}// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {dispatcherFactory ProtoUDFactoryconfig            *paramtable.KafkaConfigReceiveBufSize    int64MQBufSize         int64
}// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {dispatcherFactory ProtoUDFactory// the following members must be public, so that mapstructure.Decode() can access themPulsarAddress    stringPulsarWebAddress stringReceiveBufSize   int64MQBufSize        int64PulsarAuthPlugin stringPulsarAuthParams stringPulsarTenant     stringPulsarNameSpace  stringRequestTimeout   time.DurationmetricRegisterer prometheus.Registerer
}

mq产品

mq的产品接口是msgstream.MsgStream

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {Close()AsProducer(channels []string)Produce(*MsgPack) errorSetRepackFunc(repackFunc RepackFunc)GetProduceChannels() []stringBroadcast(*MsgPack) (map[string][]MessageID, error)AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) errorChan() <-chan *MsgPackSeek(ctx context.Context, offset []*MsgPosition) errorGetLatestMsgID(channel string) (MessageID, error)CheckTopicValid(channel string) errorEnableProduce(can bool)
}

具体产品实现有:

msgstream.mqMsgStream、msgstream.MqTtMsgStream

type mqMsgStream struct {ctx              context.Contextclient           mqwrapper.Clientproducers        map[string]mqwrapper.ProducerproducerChannels []stringconsumers        map[string]mqwrapper.ConsumerconsumerChannels []stringrepackFunc    RepackFuncunmarshal     UnmarshalDispatcherreceiveBuf    chan *MsgPackcloseRWMutex  *sync.RWMutexstreamCancel  func()bufSize       int64producerLock  *sync.RWMutexconsumerLock  *sync.Mutexclosed        int32onceChan      sync.OnceenableProduce atomic.Value
}// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {*mqMsgStreamchanMsgBuf         map[mqwrapper.Consumer][]TsMsgchanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPositionchanStopChan       map[mqwrapper.Consumer]chan boolchanTtMsgTime      map[mqwrapper.Consumer]TimestampchanMsgBufMutex    *sync.MutexchanTtMsgTimeMutex *sync.RWMutexchanWaitGroup      *sync.WaitGrouplastTimeStamp      TimestampsyncConsumer       chan int
}

存储产品

存储的产品接口是storag.ChunkManagere

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {// RootPath returns current root path.RootPath() string// Path returns path of @filePath.Path(ctx context.Context, filePath string) (string, error)// Size returns path of @filePath.Size(ctx context.Context, filePath string) (int64, error)// Write writes @content to @filePath.Write(ctx context.Context, filePath string, content []byte) error// MultiWrite writes multi @content to @filePath.MultiWrite(ctx context.Context, contents map[string][]byte) error// Exist returns true if @filePath exists.Exist(ctx context.Context, filePath string) (bool, error)// Read reads @filePath and returns content.Read(ctx context.Context, filePath string) ([]byte, error)// Reader return a reader for @filePathReader(ctx context.Context, filePath string) (FileReader, error)// MultiRead reads @filePath and returns content.MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)// ReadWithPrefix reads files with same @prefix and returns contents.ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.// if all bytes are read, @err is io.EOF.// return other error if read failed.ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)// Remove delete @filePath.Remove(ctx context.Context, filePath string) error// MultiRemove delete @filePaths.MultiRemove(ctx context.Context, filePaths []string) error// RemoveWithPrefix remove files with same @prefix.RemoveWithPrefix(ctx context.Context, prefix string) error
}

具体产品实现有:

LocalChunkManager、MinioChunkManager、RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.Client//	ctx        context.ContextbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStorage//	ctx        context.ContextbucketName stringrootPath   string
}

总结

从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2980230.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

ClickHouse用UDF解析XML字符串和XML文件

一.如果是读取xml文件的时候&#xff0c;文件入库需要使用文件读取UDF 创建了1个测试文件 wsdFileRead()&#xff1a; 直接读取文件内容 SELECT wsdFileRead(/home/temp/wsd_test.xml)Query id: 09b6e5fe-7169-43f7-b001-90e2eeabb8da┌─wsdFileRead(/home/temp/wsd_test.xm…

OpenHarmony实战开发-内存快照Snapshot Profiler功能使用指导。

DevEco Studio集成的DevEco Profiler性能调优工具&#xff08;以下简称为Profiler&#xff09;&#xff0c;提供Time、Allocation、Snapshot、CPU等场景化分析任务类型。内存快照&#xff08;Snapshot&#xff09;是一种用于分析应用程序内存使用情况的工具&#xff0c;通过记录…

鸟哥的Linux私房菜 总结索引 | 第二章:主机规划与磁盘分区

要安装好一部Linux主机并不是那么简单的事情&#xff0c;你必须要针对distributions的特性、服务器软件的能力、 未来的升级需求、硬件扩充性需求等等来考虑&#xff0c;还得要知道磁盘分区、文件系统、Linux操作较频繁的目录等等&#xff0c; 都得要有一定程度的了解才行 1、…

LlamaIndex 加 Ollama 实现 Agent

AI Agent 是 AIGC 落地实现的场景之一&#xff0c;与 RAG 不同&#xff0c;RAG 是对数据的扩充&#xff0c;是模型可以学习到新数据或者本地私有数据。AI Agent 是自己推理&#xff0c;自己做&#xff0c;例如你对 AI Agent 说我要知道今天上海的天气怎么样&#xff0c;由于 AI…

李沐56_门控循环单元——自学笔记

关注每一个序列 1.不是每个观察值都是同等重要 2.想只记住的观察需要&#xff1a;能关注的机制&#xff08;更新门 update gate&#xff09;、能遗忘的机制&#xff08;重置门 reset gate&#xff09; !pip install --upgrade d2l0.17.5 #d2l需要更新import torch from tor…

集群工具之HAProxy

集群工具之HAProxy HAProxy简介 它是一款实现负载均衡的调度器适用于负载特别大的web站点HAProxy的工作模式 mode http&#xff1a;只适用于web服务mode tcp&#xff1a;适用于各种服务mode health&#xff1a;仅做健康检查&#xff0c;很少使用 配置HAProxy client&#x…

Datawhale |【独家】万字长文带你梳理Llama开源家族:从Llama-1到Llama-3

本文来源公众号“Datawhale”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;【独家】万字长文带你梳理Llama开源家族&#xff1a;从Llama-1到Llama-3 0. 引言 在AI领域&#xff0c;大模型的发展正以前所未有的速度推进技术的边界…

4(第三章,数据治理)

目录 概述 业务驱动因素 目标和原则 1、可持续发展 2、嵌入式 3、可度量 基本概念 数据治理与数据管理的关系 数据治理组织 数据治理运营模型类型 数据管理岗位的类型 数据治理的成果体现 国内的数据治理 什么是数据治理 为什么进行数据治理 数据治理的必要性 …

Linux 操作系统的引导过程

Linux系统开机引导过程&#xff1a; 开机自检 检测硬件设备&#xff0c;找到能够引导系统的设备&#xff0c;比如硬盘MBR引导 运行MBR扇区里的主引导程序GRUB启动GRUB菜单 系统读取GRUB配置文件(/boot/grub2/grub.cfg)获取内核的设置和…

《内向者优势》:不要低估一个内向的人

#世界读书日 作者主页&#xff1a; &#x1f517;进朱者赤的博客 精选专栏&#xff1a;&#x1f517;经典算法 作者简介&#xff1a;阿里非典型程序员一枚 &#xff0c;记录在大厂的打怪升级之路。 一起学习Java、大数据、数据结构算法&#xff08;公众号同名&#xff09; ❤…

[RTOS 学习记录] 复杂工程项目的管理

[RTOS 学习记录] 复杂工程项目的管理 这篇文章是我阅读《嵌入式实时操作系统μCOS-II原理及应用》后的读书笔记&#xff0c;记录目的是为了个人后续回顾复习使用。 前置内容&#xff1a; 工程管理工具make及makefile 文章目录 1 批处理文件与makefile的综合使用1.1 批处理文件…

C语言学习/复习29--内存操作函数memcpy/memmove/memset/memcmp

一、内存操作函数 1.memcpy()函数 注意事项1&#xff1a;复制的数目以字节为单位 注意事项2&#xff1a;一定要保证有足够空间复制 模拟实现1 拷贝字符案例&#xff1a;由于拷贝时函数本事就以字节为单位拷贝所以该例子也可用于其他类型数据的拷贝。 模拟实现2 将自身的…

YOLOv8 关键点检测模型训练部署

文章目录 1、YOLOv8安装及使用1.2、命令行使用1.3、使用python-API模型预测1.4、pt转换ONNX 2、训练三角板关键点检测模型2.1、训练命令 3、ONNX Runtime部署 1、YOLOv8安装及使用 参考链接: 同济子豪兄视频 github原文链接 # 安装yolov8 pip install ultralytics --upgrade …

Linux-LVM与磁盘配额

一、LVM概述 Logical Volume Manager&#xff0c;逻辑卷管理 能够在保持现有数据不变的情况下动态调整磁盘容量&#xff0c;从而提高磁盘管理的灵活性 /boot分区用于存放引导文件&#xff0c;不能基于LVM创建 LVM机制的基本概念 PV&#xff08;物理卷&#xff09;&#xff…

情感识别——情感计算的模型和数据集调查

概述 情感计算指的是识别人类情感、情绪和感觉的工作&#xff0c;已经成为语言学、社会学、心理学、计算机科学和生理学等领域大量研究的主题。 本文将概述情感计算的重要性&#xff0c;涵盖思想、概念和方法。 情感计算是皮卡德于 1997 年提出的一个想法&#xff0c;此后出…

生产数据采集系统

在数字化浪潮的推动下&#xff0c;生产数据采集系统已经成为企业提升生产效率、优化运营管理的关键工具。那么&#xff0c;什么是生产数据采集系统呢&#xff1f;简单来说&#xff0c;生产数据采集系统是指通过一系列技术手段&#xff0c;实时收集、处理和分析生产线上的各类数…

STM32 I²C通信

一、IC总线通信 1.1 IC总线特点 IC&#xff08;Inter Integrated Circuit&#xff0c;集成电路总线&#xff09;&#xff0c;通过串行数据线SDA&#xff08;Serial Data&#xff09;和串行时钟线SCL&#xff08;Serial Clock&#xff09;来完成数据的传输。 特点&#xff1a;…

java泛型介绍

Java 泛型是 JDK 5 引入的一个特性&#xff0c;它允许我们在定义类、接口和方法时使用类型参数&#xff0c;从而使代码更加灵活和类型安全。泛型的主要目的是在编译期提供类型参数&#xff0c;让程序员能够在编译期间就捕获类型错误&#xff0c;而不是在运行时才发现。这样做提…

(ICML-2021)从自然语言监督中学习可迁移的视觉模型

从自然语言监督中学习可迁移的视觉模型 Title&#xff1a;Learning Transferable Visual Models From Natural Language Supervision paper是OpenAI发表在ICML 21的工作 paper链接 Abstract SOTA计算机视觉系统经过训练可以预测一组固定的预定目标类别。这种受限的监督形式限制…

[笔试训练](四)

010 Fibonacci数列_牛客题霸_牛客网 (nowcoder.com) 题目&#xff1a; 题解&#xff1a; 1.创建一个数组fib[]&#xff0c;保存范围内的所有斐波那契数&#xff0c;再求离N最近的斐波那契数。 2.创建3个数a,b,c,依次先后滚动&#xff0c;可得出所有的斐波那契数&#xff0c…