信号发生器的实现

本文受How to terminate running Python threads using signals 文章启发,但只保留了多线程相关的部分。
起因是最近想用Python 模拟信号发生器的功能,自然需要通过主线程控制子线程发送数据(开始,暂停,继续,停止等状态)。第一版是通过lock 锁加上各种标志属性来实现的,总感觉思路不太清晰。于是参考上面的链接,采用event 来重构代码,总算得到了一个自己较为满意的(可扩展的)版本。

V0.1 的设计思路

v0.0 的设计思路是通过系统定时器来向端口定时发送数据,但Windows 平台的实时性不够,详见关于采样频率的说明。于是在基本逻辑不变的情况下采用了人为设置Δt 的策略,也就是v0.1

IGenerator 接口

考虑我们的信号发生器,最重要的就是将信号及时准确地发送出去。所以我们需要一个发送信号的方法:

from time import sleep

class IGenerator(object):
    """
    ISender 接口  
    可以发送数据、被重置
    """
    def __init__(self) -> None:
        pass  

    def reset(self):  # 重置功能
        pass  

    def generate_data(self):  # 生成数据
        pass

    def send(self):   # 发送功能
        # sleep(0.1)  # 可以通过sleep 函数控制数据发送的频率
        pass

这里我们通过类来表示接口的定义,通过实现接口方法,我们可以获取不同的信号发生器子类。

通过Event() 控制线程

为了可以不间断地发送正确地数据,我们需要在一个独立的线程中调用IGenerator 对象的generate_data/send 方法,并且需要在主线程中控制子线程的开始/暂停/停止 等功能。这里我们通过threading.Event() 来实现:

from threading import Event, Thread  

class Executor(Thread):
    def __init__(self, generator: IGenerator):
        Thread.__init__(self)
        self.daemon = True  # 设置为守护线程,当主线程退出时自动结束
        self.generator = generator  # 接收一个IGenerator 对象,用于发送数据
        self.pause_flag = Event()   # 暂停线程
        self.stop_flag = Event()    # 结束线程

    def resume(self):  # 开始/继续发送
        self.pause_flag.set()

    def pause(self):   # 暂停发送  
        self.pause_flag.clear()

    def stop(self):    # 停止发送,结束线程
        self.stop_flag.set()

    def run(self):     # 线程体
        while True:
            self.pause_flag.wait()  
            if self.stop_flag.is_set():
                break
            self.generator.send()

关于上面代码关于threading.Event 有以下几点可以帮助理解:

  • threading.Event 对象作为一个特殊的标志,is_set()默认为False;
  • threading.Event 对象的wait 方法会在is_set() 标志位为True 的时候阻塞线程,直到其变为False;
  • threading.Event 对象的clear 方法可以将is_set() 设置为False;相反,set 方法可以将其设置为True

端口

信号发生器需要通过端口连接到设备,端口可以是RS232、USB,抑或是并行端口。为了便于扩展,我们就需要抽象一个端口的接口,约定他要实现的功能:

class IPort(object):
    """
    IPort 输出接口
    需要重写接口函数
    """

    def __init__(self) -> None:
        self._is_on = False
        pass

    def turn_on(self):
        # 模拟端口打开,需要避免重复打开
        if not self._is_on:
            self._is_on = True

    def turn_off(self):
        # 端口关闭
        self._is_on = False

    def wait_writable(self):  # 等待端口可用
        while True:
            sleep(0.5)
            break

    def write(self, data: any):
        print("[%s] Port send: %s" % (datetime.now(), data))

    ##################################################
    # send 方法可以不用重写  
    ##################################################
    def send(self, data: any):
        # 模拟端口发送数据的过程
        if self._is_on:  # 模拟检查端口状态,如果端口已关闭就不再发送
            self.wait_writable()
            self.write(data)

测试代码

我们新建一个简单的类来实现IGenerator 接口来验证我们的想法:

from datetime import datetime

class BaseGenerator(IGenerator):
    def __init__(self,deltaT=0.001) -> None:
        """
        deltaT: 采样周期,单位是s
        """
        super().__init__()
        self._is_on = False
        self.counter = 0
        self.deltaT = deltaT
        self.exe = Executor(self)

    def generate_data(self):
        self.counter += self.deltaT

    def reset(self):
        self.counter = 0

    def send(self):
        self.port.turn_on()   # 保证端口已经打开
        self.generate_data()  # 生成信号
        self.port.send(self.counter)  # 模拟发送数据
    

    ###########################################################    
    # 将Executor 嵌套进信号发生器,将会使我们的代码更整洁  
    # 而且只需要在基类中定义一次以下方法就好了
    ###########################################################
    def turn_on(self):  
        if not self._is_on:
            self.exe.start()  # 开启线程,此方法只能执行一次  
            self._is_on = True
    
    def resume(self):  
        self.exe.resume() 
    
    def pasue(self):
        self.exe.pause()
    
    def stop(self):  
        self.exe.stop()
        self.port.turn_off()
        self.exe = Executor(self)  # 为开始新一轮任务做准备
        self._is_on = False

# 开始功能验证  
if __name__ == "__main__":
    bg = BaseGenerator()

    bg.turn_on()  # 开启
    bg.resume()   # 开始输出
    sleep(3)

    bg.pasue()    # 暂停
    sleep(3)

    bg.resume()   # 继续输出
    sleep(1.5)

    bg.reset()    # 中间重置状态
    sleep(2.5)

    bg.stop()     # 停止
    sleep(3)

可以将上面的代码保存为app.py 并在命令行中执行:

$> python ./app.py  # 执行过程中Ctrl-C 中断进程
# 信号发生器打开
# 开始发送数据
[2022-07-28 21:14:40.003383] Port send: 0.001
[2022-07-28 21:14:40.521429] Port send: 0.002
[2022-07-28 21:14:41.050418] Port send: 0.003
[2022-07-28 21:14:41.579746] Port send: 0.004
[2022-07-28 21:14:42.107661] Port send: 0.005
[2022-07-28 21:14:42.637441] Port send: 0.006
Traceback (most recent call last):
  File "./app.py", line 113, in <module>
    sleep(3)
KeyboardInterrupt
$> 
$> 
$> python ./app.py  # 正常退出
# 发生器已打开
# 开始发送数据
[2022-07-28 21:14:54.380992] Port send: 0.001
[2022-07-28 21:14:54.909388] Port send: 0.002
[2022-07-28 21:14:55.439230] Port send: 0.003
[2022-07-28 21:14:55.956788] Port send: 0.004
[2022-07-28 21:14:56.487928] Port send: 0.005
[2022-07-28 21:14:57.018972] Port send: 0.006
[2022-07-28 21:15:00.378835] Port send: 0.007
[2022-07-28 21:15:00.910546] Port send: 0.008
# 发生器即将暂停,然后再次开始发送数据
[2022-07-28 21:15:31.822823] Port send: 0.007
[2022-07-28 21:15:32.340982] Port send: 0.008
[2022-07-28 21:15:32.860573] Port send: 0.0
# 中间人为重置信号
[2022-07-28 21:15:33.378901] Port send: 0.001
[2022-07-28 21:15:33.897071] Port send: 0.002
[2022-07-28 21:15:34.414886] Port send: 0.003
[2022-07-28 21:15:34.931465] Port send: 0.004
# 信号发生器停止并退出
$>

可见程序运行符合预期。并且可以通过继承BaseGenerator 重写generate_data/send 方法产生不同波形的数据向不同的设备发送数据。这也是这篇笔记最重要的想法。


📅2022-07-22 Aachen

关于采样频率

因为通信中最重要的就是数据的准确、及时,而Python 中的sleep 函数的精确度是不稳定的,所以我们就需要给产生的信号以时间信息,这里有三种方法:

  1. 采用C/C++ 扩展,但是这是一种出力不讨好的事情,因为很难保证Python 的其他代码的执行效率;
  2. 给发送数据带上时间戳,这样会占用一部分带宽;
  3. 我们按照某个步长产生数据,接收器按照同样的步长采集数据。

经过与做电子产品设计的同学讨论,方案三是生产中最常用的方法。如果对精度要求更高,可以考虑常采用实时的设备、给数据带上时间戳等方案。
关于Windows 平台下精确延时的方案可以参见这篇笔记VOFA+ 中同样也采用Δt 来人为地调整数据的采样周期。

V0.2 版设计思路

在实现v0.1 的过程中,逐渐意识到:信号发生器本身是一个空壳子,最核心的部分是其中包含的函数部分与通信端口。于是将Generator 设计为一个线程类,通过Generator 对象直接控制线程的运行,并且引入了生成函数的概念,使得信号的产生和输出更加灵活。

端口接口

通过引入抽象类的概念,我们现在可以理直气壮地称呼端口为接口了。其中仍然只包含端口打开、关闭、等待可用、发送数据的逻辑功能。具体功能需要在子类中实现。需要注意的是,send 函数可以接收一个list 对象,也就是不止一个信号生成函数的返回值,可以在写入端口时进一步筛选和加工:

import abc
from threading import Event, Thread
from typing import List, Union


class IPort(abc.ABC):
    """
    IPort 信号发生器的输出接口    
    内置了一个状态标志self._is_on = False
    """

    def __init__(self) -> None:
        super().__init__()
        self._is_on = False

    @abc.abstractmethod
    def turn_on(self):
        """
        打开端口,而且只能打开一次。
        最后需要修改self._is_on = True
        """
        if not self._is_on:
            self._is_on = True

    @abc.abstractmethod
    def turn_off(self):
        """
        关闭端口。
        需要修改self._is_on = False 
        """
        self._is_on = False

    @abc.abstractmethod
    def wait_port_available(self):  # 等待端口可用
        """
        等待端口可用
        """

    @abc.abstractmethod
    def send_data(self, data: List[float]):
        """
        向端口写入数据
        可以自定义写入数据的格式
        """

    def send(self, data: List[float]):
        """
        等待端口可用后写入数据
        """
        if self._is_on:
            self.wait_port_available()
            self.send_data(data)
        else:
            raise IOError("端口未打开或不可用")

信号生成函数

信号生成函数其实是一个对象,有自己的计数器属性,但是步长需要在Generator 对象中统一管理(也可以不这样设置,但是担心使用起来会比较混乱)。也可以在子类中添加其他自定义属性,用来生成更复杂的波形:

import abc
from threading import Event, Thread
from typing import List, Union

class IFunction(abc.ABC):
    """
    信号产生函数,包含以下属性:  
    - timer: 计数器  
    - value: 输出结果的缓存  
    - deltaT: 0.001s 函数产生的步长,在Generator 中自动设定
    """

    def __init__(self, deltaT=0.001):
        self.timer = 0.
        self.value = 0.
        self.setDeltaT(deltaT)

    def setDeltaT(self, deltaT=0.001):
        self.deltaT = deltaT

    @abc.abstractmethod
    def call(self):
        """
        进行一步计算,返回信号值
        """

    @abc.abstractmethod
    def reset(self):
        """
        重置函数:计数器、当前值等
        """

信号发生器

信号发生器的基本控制逻辑没有变,还是通过Event() 事件控制线程的运行。不过将所有的控制函数都绑定在线程对象本身了,并且理论上可以添加任意多个信号生成函数和端口:

import abc
from threading import Event, Thread
from typing import List, Union

class Generator(Thread):
    """
    Generator 信号发生器类:
    本质上是一个线程,其中包含一个run() 函数,用于不间断地产生信号。此线程为守护线程,会在主线程退出后自动结束。 
    """

    def __init__(self, deltaT: float, funcs: Union[IFunction, List[IFunction]], ports: Union[IPort, List[IPort]]):
        """ 
        参数说明:  
        - deltaT: 产生信号的步长,非负数;  
        - funcs: 用于产生信号的函数规则集合;  
        - ports: 用于发送信号的(物理)端口,数据会以list 的形式传递给port 端口。  


        私有属性:  
        - self.pause_flag = Event() 默认为False,会造成self.pause_flag.wait 方法阻塞  
        - self.stop_flag = Event() 默认为False,会造成self.stop_flag.wait 方法阻塞
        """
        Thread.__init__(self)
        self.daemon = True  # 设置为守护线程,当主线程退出时自动结束
        self.deltaT = deltaT if deltaT > 0. else 0.001
        self.funcs = funcs if isinstance(funcs, list) else [funcs]
        self.ports = ports if isinstance(ports, list) else [ports]

        # 私有属性
        self.pause_flag = Event()
        self.stop_flag = Event()

        self.setDeltaT()

    def setDeltaT(self):
        """
        更改信号产生的步长
        """
        for func in self.funcs:
            func.setDeltaT(self.deltaT)

    def turn_on(self):
        for port in self.ports:
            port.turn_on()
        self.start()

    def resume(self):
        """
        继续执行
        """
        self.pause_flag.set()

    def pause(self):
        """
        暂停执行
        """
        self.pause_flag.clear()

    def reset(self):
        """
        重置所有的函数  
        在信号发生器运行时执行此函数可能造成意料之外的后果
        """
        for func in self.funcs:
            func.reset()

    def stop(self):
        """
        停止执行并退出
        """
        self.stop_flag.set()
        for port in self.ports:
            port.turn_off()

    def run(self):
        while True:
            self.pause_flag.wait()  # 暂停线程
            if self.stop_flag.is_set():
                break  # 退出线程
            data = [func.call() for func in self.funcs]
            for port in self.ports:
                port.send(data)

测试代码

可以通过查看或下载Github 中的代码熟悉逻辑,并且其中也定义了几个基本的端口和信号生成函数。

from time import sleep
from SignalGenerator import Generator, DefaultFunction, DefaultPort, DIntTFunction, CIntTFunction


bg = Generator(0.1, [DefaultFunction(), DIntTFunction(),  ## 初始化信号发生器时可以设置多路输出
               CIntTFunction()], [DefaultPort(),DefaultPort()])
bg.turn_on()  # 开启
bg.resume()   # 开始输出
sleep(3)

bg.pause()    # 暂停
sleep(3)

bg.resume()   # 继续输出
sleep(5)

bg.reset()    # 中间重置状态
sleep(2.5)

bg.stop()     # 停止
sleep(3)

经测试,代码的运行符合预期。


📅2022-08-03 Aachen