全网最适合入门的面向对象编程教程:20 类和对象的 Python 实现-组合关系的实现与 CSV 文件保存
摘要:
本文主要介绍了在使用 Python 面向对象编程时,如何实现组合关系,同时对比了组合关系和继承关系的优缺点,并讲解了如何通过 csv 模块来保存 Python 接收/生成的数据。
原文链接:
FreakStudio的博客
往期推荐:
可能是全网最适合入门的面向对象编程教程:Python实现-嵌入式爱好者必看!
全网最适合入门的面向对象编程教程:00 面向对象设计方法导论
全网最适合入门的面向对象编程教程:01 面向对象编程的基本概念
全网最适合入门的面向对象编程教程:02 类和对象的Python实现-使用Python创建类
全网最适合入门的面向对象编程教程:03 类和对象的Python实现-为自定义类添加属性
全网最适合入门的面向对象编程教程:04 类和对象的 Python 实现-为自定义类添加方法
全网最适合入门的面向对象编程教程:05 类和对象的Python实现-PyCharm代码标签
全网最适合入门的面向对象编程教程:06 类和对象的 Python 实现-自定义类的数据封装
全网最适合入门的面向对象编程教程:07 类和对象的Python实现-类型注解
全网最适合入门的面向对象编程教程:08 类和对象的 Python 实现-@property 装饰器
全网最适合入门的面向对象编程教程:09 类和对象的Python实现-类之间的关系
全网最适合入门的面向对象编程教程:10 类和对象的Python实现-类的继承和里氏替换原则
全网最适合入门的面向对象编程教程:11 类和对象的Python实现-子类调用父类方法
全网最适合入门的面向对象编程教程:12 类和对象的 Python 实现-Python 使用 logging 模块输出程序运行日志
全网最适合入门的面向对象编程教程:13 类和对象的 Python 实现-可视化阅读代码神器 Sourcetrail 的安装使用
全网最适合入门的面向对象编程教程:14 类和对象的Python实现-类的静态方法和类方法
全网最适合入门的面向对象编程教程:15 类和对象的Python实现-__slots__魔法方法
全网最适合入门的面向对象编程教程:16 类和对象的Python实现-多态、方法重写与开闭原则
全网最适合入门的面向对象编程教程:17 类和对象的Python实现-鸭子类型与“file-like object“
全网最适合入门的面向对象编程教程:18 类和对象的Python实现-多重继承与PyQtGraph串口数据绘制曲线图
全网最适合入门的面向对象编程教程:19 类和对象的 Python 实现-使用 PyCharm 自动生成文件注释和函数注释
更多精彩内容可看:
CM3调试系统简析
肝了半个月,嵌入式技术栈大汇总出炉
比赛获奖的武林秘籍:07 一文速通电子设计大赛,电子人必看的获奖秘籍!
比赛获奖的武林秘籍:06 5 分钟速通比赛路演答辩,国奖选手的血泪经验!
比赛获奖的武林秘籍:05 电子计算机类比赛国奖队伍技术如何分工和学习内容
比赛获奖的武林秘籍:04 电子类比赛嵌入式开发快速必看的上手指南
比赛获奖的武林秘籍:03 好的创意选取-获得国奖的最必要前提
比赛获奖的武林秘籍:02 国奖秘籍-大学生电子计算机类竞赛快速上手的流程,小白必看
比赛获奖的武林秘籍:01 如何看待当代大学生竞赛中“卷”“祖传老项目”“找关系”的现象?
比赛获奖的武林秘籍:00 学科竞赛-工科类大学生绕不开的话题,你了解多少?
文档和代码获取:
可访问如下链接进行对文档下载:
https://github.com/leezisheng/Doc
本文档主要介绍如何使用 Python 进行面向对象编程,需要读者对 Python 语法和单片机开发具有基本了解。相比其他讲解 Python 面向对象编程的博客或书籍而言,本文档更加详细、侧重于嵌入式上位机应用,以上位机和下位机的常见串口数据收发、数据处理、动态图绘制等为应用实例,同时使用 Sourcetrail 代码软件对代码进行可视化阅读便于读者理解。
相关示例代码获取链接如下:https://github.com/leezisheng/Python-OOP-Demo
正文
前面讲了面向类与对象的继承,知道了继承是一种什么“是”什么的关系。然而类与类之间还有另一种关系,这就是组合。组合是将几个对象收集在一起生成一个新对象的行为。当一个对象是另外一个对象的一部分时,组合通常是不错的选择。
例如,汽车是由发动机、传动装置、启动装置、车前灯、挡风玻璃以及其他部件组成的,发动机又是由活塞、曲柄轴和阀门等组合而成的。**汽车是发动机等多个元器件的抽象,而发动机是活塞等元器件的抽象,二者处于不同的层次而又有彼此交互的接口,组合是提供不同抽象层的好办法。**汽车对象可以提供司机所需要的接口,同时也能够获取内在组成部分,从而为机械师提供适合操作的深层抽象。当然,如果机械师需要更多信息来诊断问题或调整发动机,这些组成部分也可以进一步被细分。
总的来说,组合就是让不同的类混合并且加入其他类中来增加功能和代码重用性,这种适用于由多个小类组成一个大类的情况,并且不需要对小类进行太多修改。在前面示例中,我们实现了主机的串口收发和绘图功能,在实际应用中,我们往往需要将传感器数据存储到文件中,以便后续的查看和处理,很明显前面的传感器数据为一维的时间序列数据,适合存储为表格类型(即列标题为索引和值),我们通常将该类数据保存为 csv 格式文件,csv 是一种字符串文件的格式,它组织数据的语法就是在字符串之间加分隔符(行与行之间是加换行符,同行字符之间是加逗号分隔),可以用任意的文本编辑器打开(如记事本),也可以用 Excel 打开,还可以通过 Excel 把文件另存为 csv 格式。**用 csv 格式存储数据,读写比较方便,易于实现,文件也会比 Excel 文件小。**但 csv 文件缺少 Excel 文件本身的很多功能,如不能嵌入图像和图表,不能生成公式等等。
操作 csv 文件我们需要借助 csv 模块,python 自带 csv 模块,不需要我们使用 pip 安装,我们可以点击如下链接查看 csv 模块使用方法:
https://docs.python.org/zh-cn/3.13/library/csv.html#csv.writer
这里,我们首先定义一个 FileIOClass 类,其中具有初始化方法、写入传感器数据到文件方法和关闭文件方法,示例代码如下:
import csv
_# 使用typing模块提供的复合注解功能_
from typing import Listclass FileIOClass:def __init__(self,path:str="G:\\Python面向对象编程\\Demo\\file.csv"):'''初始化csv文件和列标题:param path: 文件路径和文件名'''self.path = path_# path为输出路径和文件名,newline=''是为了不出现空行_self.csvFile = open(path, "w+", newline='')_# rowname为列名,index-索引,data-数据_self.rowname = ['index', 'data']_# 返回一个writer对象,将用户的数据在给定的文件型对象上转换为带分隔符的字符串_self.writer = csv.writer(self.csvFile)_# 写入csv文件的列标题_self.writer.writerow( self.rowname)def WriteFile(self,index:List[int],data:List[int])->None:''':param index: 传感器索引列表:param data: 传感器数据列表:return:'''writedatalist = []for i in range(len(data)):writedatalist.append([index[i],data[i]])_# 将列表中的每个元素将被写入CSV文件的一列中_self.writer.writerow(writedatalist[i])def CloseFile(self)->None:'''关闭文件:return: None'''self.csvFile.close()
这里,在初始化方法中,我们需要传入文件保存路径。之后创建一个 writer 对象,将用户的数据在给定的文件型对象上转换为带分隔符的字符串,同时写入 csv 文件的列标题。在 WriteFile 方法中传入数据的索引列表用于表示数据的先后顺序,之后是数据列表(这里的类型注解需要使用 typing 模块提供的复合注解功能),并循环将每个元素将被写入 CSV 文件的一列中,最后定义了文件的关闭方法。
在主函数中,我们创建 FileIOClass 对象,写入模拟传感器数据后关闭文件,以下为示例代码和运行效果:
if __name__ == '__main__':path = "G:\\Python面向对象编程\\Demo\\file.csv"data = [11,42,307,46,55,61,78,80,19,11]index = [count for count in range(len(data))]file = FileIOClass(path)file.WriteFile(index,data)file.CloseFile()
这里,我们可以直接在 MasterClass 类的初始化中创建 FileIOClass 类的实例化对象来实现组合。代码如下:
_# 文件保存路径_self.savepath = "G:\\Python面向对象编程\\Demo\\file.csv"_# 创建FileIOClass类的实例化对象_self.fileio = FileIOClass(self.savepath)
通过 sourcetrail,我们可以清晰看到类之间的组合与继承关系:
在主程序中,我们在主机接收 10 次数据后,将数据保存到 file.csv 中:
if __name__ == "__main__":
_ # 创建数据列表_datalist = []m = MasterClass(state = MasterClass.IDLE_STATE,port = "COM17",wintitle = "Basic plotting examples",plottitle = "Updating plot",width = 1000,height = 600)m.StartMaster()m.SendSensorCMD(MasterClass.SENDID_CMD)m.RecvSensorID()# 循环10次接收数据for i in range(10):m.SendSensorCMD(MasterClass.SENDVALUE_CMD)value = m.RecvSensorValue()datalist.append(value)indexlist = [count for count in range(len(datalist))]# 写入数据m.fileio.WriteFile(indexlist,datalist)m.fileio.CloseFile()
如下为运行效果:
目前,整个文件的完整代码如下,可以看到单单是这么一个简单程序就有了三百多行,对于代码查找修改来讲,非常不便。同时我们注意到,几个不同类之间似乎功能并不相同,不应该放到一个文件中。下一节我们将会说如何利用 Python 中的模块和包来组织我们的代码。
完整代码如下:
_# 串口相关库_
import serial
import serial.tools.list_ports
_# 队列相关_
import queue
import random
_# 日志输出相关库_
import logging
_# 曲线作图相关库_
import pyqtgraph as pg
import numpy as np
from pyqtgraph.Qt import QtCore
_# 文件读写相关库_
import csv
_# 使用typing模块提供的复合注解功能_
from typing import List
import time_# # 设置日志输出级别_
_# logging.basicConfig(level=logging.DEBUG)_
_# 在配置下日志输出目标文件和日志格式_
LOG_FORMAT="%(asctime)s-%(levelname)s-%(message)s"
logging.basicConfig(filename='my.log',level=logging.DEBUG,format=LOG_FORMAT)class SerialClass:_# 限定SerialClass对象只能绑定以下属性___slots__ = ('dev','_SerialClass__devstate')_# 初始化__# 使用默认参数_def __init__(self,devport:str = "COM17",devbaudrate:int = 115200,devbytesize:int = serial.EIGHTBITS,devparity :str = serial.PARITY_NONE,devstopbits:int = serial.STOPBITS_ONE):_# 直接传入serial.Serial()类_self.dev = serial.Serial()self.dev.port = devportself.dev.baudrate = devbaudrateself.dev.bytesize = devbytesizeself.dev.parity = devparityself.dev.stopbits = devstopbits_# 表示串口设备的状态-打开或者关闭__# 初始化时为关闭_self.__devstate = Falseprint("SerialClass init")logging.info("SerialClass init")_# 取值方法_@propertydef devstate(self):return self.__devstate_# 打开串口_def OpenSerial(self):print("SerialClass-OpenSerial")logging.info("SerialClass-OpenSerial")self.dev.open()self.__devstate = True_# 关闭串口_def CloseSerial(self):print("SerialClass-CloseSerial")logging.info("SerialClass-CloseSerial")self.dev.close()self.__devstate = False_# 串口读取_def ReadSerial(self):print("SerialClass-ReadSerial")logging.info("SerialClass-ReadSerial")if self.__devstate:_# 阻塞方式读取__# 按行读取_data = self.dev.readline()_# 收到为二进制数据__# 用utf-8编码将二进制数据解码为unicode字符串__# 字符串转为int类型_data = int(data.decode('utf-8', 'replace'))return data_# 串口写入_def WriteSerial(self,write_data):print("SerialClass-WriteSerial")logging.info("SerialClass-WriteSerial")if self.__devstate:_# 非阻塞方式写入_self.dev.write(write_data.encode())_# 输出换行符__# write的输入参数必须是bytes 格式__# 字符串数据需要encode()函数将其编码为二进制数据,然后才可以顺利发送__# \r\n表示换行回车_self.dev.write('\r\n'.encode())def RetSerialState(self):if self.dev.isOpen():self.__devstate = Truereturn Trueelse:self.__devstate = Falsereturn Falseclass PlotClass:_# 绘图类初始化_def __init__(self,wintitle:str="Basic plotting examples",plottitle:str="Updating plot",width:int=1000,height:int=600):'''用于初始化Plot类:param wintitle: 窗口标题:param plottitle: 图层标题:param width: 窗口宽度:param height: 窗口高度'''_# Qt应用实例对象_self.app = None_# 窗口对象_self.win = None_# 设置窗口标题_self.title = wintitle_# 设置窗口尺寸_self.width = widthself.height = height_# 传感器数据_self.value = 0_# 计数变量_self.__count = 0_# 传感器数据缓存列表_self.valuelist = []_# 绘图曲线_self.curve = None_# 图层对象_self.plotob = None_# 图层标题_self.plottitle = plottitle_# 定时器对象_self.timer = QtCore.QTimer()_# 定时时间_self.time = 0_# Qt应用和窗口初始化_self.appinit()print("PLOT INIT SUCCESS")logging.info("PLOT INIT SUCCESS")_# 应用程序初始化_def appinit(self):'''用于qt应用程序初始化,添加窗口、曲线和图层:return: None'''_# 创建一个Qt应用,并返回该应用的实例对象_self.app = pg.mkQApp("Plotting Example")_# 生成多面板图形__# show:(bool) 如果为 True,则在创建小部件后立即显示小部件。__# title:(str 或 None)如果指定,则为此小部件设置窗口标题。_self.win = pg.GraphicsLayoutWidget(show=True, title=self.title)_# 设置窗口尺寸_self.win.resize(self.width, self.height)_# 进行窗口全局设置,setConfigOptions一次性配置多项参数__# antialias启用抗锯齿,useNumba对图像进行加速_pg.setConfigOptions(antialias=True, useNumba=True)_# 添加图层_self.plotob = self.win.addPlot(title=self.plottitle)_# 添加曲线_self.curve = self.plotob.plot(pen='y')_# 接收数据_def GetValue(self,value):'''用于接收传感器数据,加入缓存列表:param value: 传感器数据:return: None'''self.value = value_# 加入数据缓存列表_self.valuelist.append(value)print("PLOT RECV DATA : "+str(self.value))logging.info("PLOT RECV DATA : "+str(self.value))_# 更新曲线数据_def DataUpdate(self):'''用于定时进行曲线更新,这里模拟绘制正弦曲线:return: None'''_# 模拟绘制正弦曲线__# 计数变量更新_self.__count = self.__count + 0.1self.value = np.sin(self.__count)self.GetValue(self.value)_# 将数据转化为图形_self.curve.setData(self.valuelist)_# 设置定时更新_def SetUpdate(self,time:int = 100):'''设置定时更新任务:param time: 定时的时间:return: None'''_# 定时器结束,触发DataUpdate方法_self.timer.timeout.connect(self.DataUpdate)_# 启动定时器_self.timer.start(time)_# 定时时间_self.time = timeprint("PLOT SET UPDATA")logging.info("PLOT SET UPDATA")_# 进入主事件循环并等待_pg.exec()class FileIOClass:def __init__(self,path:str="G:\\Python面向对象编程\\Demo\\file.csv"):'''初始化csv文件和列标题:param path: 文件路径和文件名'''self.path = path_# path为输出路径和文件名,newline=''是为了不出现空行_self.csvFile = open(path, "w+", newline='')_# rowname为列名,index-索引,data-数据_self.rowname = ['index', 'data']_# 返回一个writer对象,将用户的数据在给定的文件型对象上转换为带分隔符的字符串_self.writer = csv.writer(self.csvFile)_# 写入csv文件的列标题_self.writer.writerow(self.rowname)def WriteFile(self,index:List[int],data:List[int])->None:''':param index: 传感器索引列表:param data: 传感器数据列表:return:'''writedatalist = []for i in range(len(data)):writedatalist.append([index[i],data[i]])_# 将列表中的每个元素将被写入CSV文件的一列中_self.writer.writerow(writedatalist[i])def CloseFile(self)->None:'''关闭文件:return: None'''self.csvFile.close()class SensorClass(SerialClass):_# 类变量:__# RESPOND_MODE -响应模式-0__# LOOP_MODE -循环模式-1_RESPOND_MODE,LOOP_MODE = (0,1)_# 类变量:__# START_CMD - 开启命令 -0__# STOP_CMD - 关闭命令 -1__# SENDID_CMD - 发送ID命令 -2__# SENDVALUE_CMD - 发送数据命令 -3_START_CMD,STOP_CMD,SENDID_CMD,SENDVALUE_CMD = (0,1,2,3)_# 类的初始化_def __init__(self,port:str = "COM11",id:int = 0,state:int = RESPOND_MODE):_# 调用父类的初始化方法,super() 函数将父类和子类连接_super().__init__(port)self.sensorvalue = 0self.sensorid = idself.sensorstate = stateprint("Sensor Init")logging.info("Sensor Init")@staticmethod_# 判断传感器ID号是否正确:这里判断ID号是否在0到99之间_def IsTrueID(id:int = 0):if id >= 0 and id <= 99:print("Sensor ID True")return Trueelse:print("Sensor ID False")return False_# 传感器上电初始化_def InitSensor(self):_# 传感器上电初始化工作__# 同时输出ID号以及状态_print("Sensor %d Init complete : %d"%(self.sensorid,self.sensorstate))logging.info("Sensor %d Init complete : %d"%(self.sensorid,self.sensorstate))_# 开启传感器_def StartSensor(self):super().OpenSerial()print("Sensor %d start serial %s "%(self.sensorid,self.dev.port))logging.info("Sensor %d start serial %s "%(self.sensorid,self.dev.port))_# 停止传感器_def StopSensor(self):super().CloseSerial()print("Sensor %d close serial %s " % (self.sensorid, self.dev.port))logging.info("Sensor %d close serial %s " % (self.sensorid, self.dev.port))_# 发送传感器ID号_def SendSensorID(self):super().WriteSerial(str(self.sensorid))print("Sensor %d send id "%self.sensorid)logging.info("Sensor %d send id "%self.sensorid)_# 发送传感器数据_def SendSensorValue(self):_# 生成[1, 10]内的随机整数_data = random.randint(1, 10)super().WriteSerial(str(data))print("Sensor %d send data %d" % (self.sensorid,data))logging.info("Sensor %d send data %d" % (self.sensorid,data))_# 接收主机指令_def RecvMasterCMD(self):cmd = super().ReadSerial()print("Sensor %d recv cmd %d " % (self.sensorid,cmd))logging.info("Sensor %d recv cmd %d " % (self.sensorid,cmd))return cmdclass MasterClass(SerialClass,PlotClass):_# 类变量:__# BUSY_STATE -忙碌状态-0__# IDLE_STATE -空闲状态-1_BUSY_STATE, IDLE_STATE = (0, 1)_# 类变量:__# START_CMD - 开启命令 -0__# STOP_CMD - 关闭命令 -1__# SENDID_CMD - 发送ID命令 -2__# SENDVALUE_CMD - 发送数据命令 -3_START_CMD, STOP_CMD, SENDID_CMD, SENDVALUE_CMD = (0, 1, 2, 3)_# 类的初始化_def __init__(self,state:int = IDLE_STATE,port:str = "COM17",wintitle:str="Basic plotting examples",plottitle:str="Updating plot",width:int=1000,height:int=600):_# 分别调用不同父类的__init__方法_SerialClass.__init__(self,port)PlotClass.__init__(self,wintitle,plottitle,width,height)self.valuequeue = queue.Queue(10)self.__masterstatue = state_# 初始化完成的标志量_self.INIT_FLAG = False_# 文件保存路径_self.savepath = "G:\\Python面向对象编程\\Demo\\file.csv"_# 创建FileIOClass类的实例化对象_self.fileio = FileIOClass(self.savepath)print("MASTER INIT SUCCESSS")logging.info("MASTER INIT SUCCESSS")@classmethoddef MasterInfo(cls):print("Info : "+str(cls))_# 开启主机_def StartMaster(self):super().OpenSerial()print("START MASTER :"+self.dev.port)logging.info("START MASTER :"+self.dev.port)_# 停止主机_def StopMaster(self):super().CloseSerial()print("CLOSE MASTER :" + self.dev.port)logging.info("CLOSE MASTER :" + self.dev.port)_# 接收传感器ID号_def RecvSensorID(self):sensorid = super().ReadSerial()print("MASTER RECIEVE ID : " + str(sensorid))logging.info("MASTER RECIEVE ID : " + str(sensorid))return sensorid_# 接收传感器数据_def RecvSensorValue(self):data = super().ReadSerial()print("MASTER RECIEVE DATA : " + str(data))logging.info("MASTER RECIEVE DATA : " + str(data))self.valuequeue.put(data)return data_# 主机发送命令_def SendSensorCMD(self,cmd):super().WriteSerial(str(cmd))print("MASTER SEND CMD : " + str(cmd))logging.info("MASTER SEND CMD : " + str(cmd))_# 主机返回工作状态-_def RetMasterStatue(self):return self.__masterstatue_# 重写父类的DataUpdate方法_def DataUpdate(self):self.SendSensorCMD(self.SENDVALUE_CMD)self.value = self.RecvSensorValue()self.WriteSerial("Recv:"+str(self.value))self.GetValue(self.value)self.curve.setData(self.valuelist)print("PLOT UPDATA : " + str(self.value))logging.info("PLOT UPDATA : " + str(self.value))class DevClass(SerialClass):def __init__(self,port:str = "COM1"):super().__init__(port)_# 开启设备_def StartDev(self):super().OpenSerial()print("START Dev :" + self.dev.port)def ReadSerial(self,byte_size):if super().RetSerialState():data = self.dev.read(byte_size)data = int(data.decode('utf-8', 'replace'))return data_# 判断串口类对象的串口是否开启_
def IsSerialConnected(serialclass):return serialclass.RetSerialState()if __name__ == "__main__":_# 创建数据列表_datalist = []m = MasterClass(state = MasterClass.IDLE_STATE,port = "COM17",wintitle = "Basic plotting examples",plottitle = "Updating plot",width = 1000,height = 600)m.StartMaster()m.SendSensorCMD(MasterClass.SENDID_CMD)m.RecvSensorID()_# 循环10次接收数据_for i in range(10):m.SendSensorCMD(MasterClass.SENDVALUE_CMD)value = m.RecvSensorValue()datalist.append(value)indexlist = [count for count in range(len(datalist))]_# 写入数据_m.fileio.WriteFile(indexlist,datalist)m.fileio.CloseFile()