from __future__ import annotations import serial from typing import List, Dict, Never, Iterator, Optional from enum import StrEnum import warnings import time import random class GammaDevice: # Определение типов периферийных устройств class PeripheryTypes(StrEnum): ADC = "A" # Аналого-цифровой преобразователь DAC = "D" # Цифро-аналоговый преобразователь TEMP = "C" # Температура COUNTER = "T" # Счетчик PORTB = "B" # Порт B # Класс для хранения данных ЦАП class DACStorage: parent: GammaDevice __channels: Dict[int, float] # Частный атрибут для хранения каналов def __init__(self, parent: GammaDevice): self.parent = parent # Ссылка на родительский объект GammaDevice self.__channels = {1: 0, 2: 0, 3: 0, 4: 0} # Инициализация каналов ЦАП def __getitem__(self, index: int) -> float: return self.__channels[index] # Получение значения канала def __setitem__(self, index: int, value: float): if index in (1, 2): self.__channels[index] = (int(value * 100) % 330) / 100 # Обработка значения для каналов 1 и 2 self.parent._GammaDevice__send( f"*D{index}W{f"{self.__channels[index] :04.2f}".replace(".", "")}#") # Отправка команды на устройство else: self.__channels[index] = int(value) % 256 # Обработка значения для других каналов self.parent._GammaDevice__send( f"*D{index}W{self.channels[index] :03d}#") # Отправка команды на устройство def __iter__(self) -> Iterator[int]: return self.__channels.__iter__() # Итератор по каналам comport: Optional[serial.Serial] # Последовательный порт, может быть None message: str # Сообщение для отправки/получения __periphery: Dict[PeripheryTypes, Dict[int, int | float] | DACStorage] # Словарь для периферийных устройств update_pending: bool # Флаг ожидания обновления loop_started: bool # Флаг запуска цикла exposition_time: float # Время экспозиции для цикла обновления def __init__(self, portID: str): try: self.comport = serial.Serial(portID, baudrate=230400) # Инициализация последовательного порта except serial.SerialException as e: raise e # Обработка исключения при инициализации порта self.message = "" # Инициализация сообщения self.__periphery = { GammaDevice.PeripheryTypes.ADC: {1: 0}, # Инициализация АЦП GammaDevice.PeripheryTypes.DAC: GammaDevice.DACStorage(self), # Инициализация ЦАП GammaDevice.PeripheryTypes.TEMP: {1: 0}, # Инициализация температуры GammaDevice.PeripheryTypes.COUNTER: {2: 0}, # Инициализация счетчика GammaDevice.PeripheryTypes.PORTB: {1: 0} # Инициализация порта B } self.update_pending = False # Инициализация флага ожидания обновления self.loop_started = False # Инициализация флага запуска цикла self.error_occured = False # Инициализация флага ошибки self.exposition_time = 0.1 # Инициализация времени экспозиции @property def temperature(self) -> float: return self.__periphery[GammaDevice.PeripheryTypes.TEMP][1] # Получение значения температуры @temperature.setter def temperature(self, value: float) -> Never: raise IOError( "Temperature cannot be set from userspace") # Запрет установки температуры из пользовательского пространства @property def counts(self) -> int: return int(self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]) # Получение значения счетчика @counts.setter def counts(self, value: int): if value: warnings.warn("Setting counts to non-zero values from userspace is not recommended", category=RuntimeWarning) # Предупреждение при установке счетчика self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = value % 1000 self.__send(f"*T2W{self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]}#") # Отправка команды на устройство @property def portB(self) -> bool: return True if self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] else False # Получение значения порта B @portB.setter def portB(self, value: bool): self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] = 1 if value else 0 self.__send(f"*B1W{self.__periphery[GammaDevice.PeripheryTypes.PORTB][1]}00#") # Отправка команды на устройство @property def adc(self) -> float: return self.__periphery[GammaDevice.PeripheryTypes.ADC][1] # Получение значения АЦП @adc.setter def adc(self, value: float) -> Never: raise IOError( "Temperature cannot be set from userspace") # Запрет установки АЦП из пользовательского пространства @property def dac(self) -> GammaDevice.DACStorage: result = self.__periphery[GammaDevice.PeripheryTypes.DAC] assert isinstance(result, GammaDevice.DACStorage) return result # Получение объекта DACStorage @dac.setter def dac(self, value: GammaDevice.DACStorage) -> Never: raise IOError( "DACStorage cannot be recreated from userspace") # Запрет пересоздания DACStorage из пользовательского пространства def update(self, updatedac: bool = False): if updatedac: self.__send("*D1R000#*D2R000#*D3R000#*D4R000#") # Отправка команд для обновления ЦАП else: self.__send("*C1R000#*T2R000#*B1R000#") # Отправка команд для обновления других устройств if self.comport is None: self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000, 2000) # Установка случайного значения счетчика, если последовательный порт не инициализирован def __send(self, message: str): if self.comport: self.comport.write( (message + "0" * (33 - len(message))).encode("ascii")) # Отправка сообщения на устройство def __update_values(self, log=False) -> int: commands_parsed = 0 if self.comport: self.message += self.comport.read_until(b"#").decode("utf8") # Чтение сообщения до символа '#' while "#" in self.message: args = self.message.split("#") command, self.message = args[0], "#".join(args[1:]) if "*" in command: command = command.replace("*", "") mode, deviceID, payload = command[0], command[1], command[2:] if int(deviceID) not in self.__periphery[GammaDevice.PeripheryTypes(mode)]: continue match GammaDevice.PeripheryTypes(mode): case GammaDevice.PeripheryTypes.TEMP: self.__periphery[GammaDevice.PeripheryTypes.TEMP][int(deviceID)] = int( payload) / 10 # Обновление значения температуры case GammaDevice.PeripheryTypes.COUNTER: self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = int( payload) # Обновление значения счетчика case GammaDevice.PeripheryTypes.PORTB: self.__periphery[GammaDevice.PeripheryTypes.PORTB][int(deviceID)] = int( payload[0]) # Обновление значения порта B case GammaDevice.PeripheryTypes.DAC: if int(deviceID) in (1, 2): value = int(payload[0:3]) / 100 # Обновление значения каналов 1 и 2 ЦАП else: value = int(payload[0:3]) # Обновление значения других каналов ЦАП self.__periphery[GammaDevice.PeripheryTypes.DAC]._DACStorage__channels[ int(deviceID)] = value # Обновление значения в __channels case GammaDevice.PeripheryTypes.ADC: self.__periphery[GammaDevice.PeripheryTypes.ADC][int(deviceID)] = int( payload[0:3]) / 100 # Обновление значения АЦП self.update_pending = True # Установка флага ожидания обновления if log: print(f"Parsed: {command}") commands_parsed += 1 else: self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000, 7000) # Установка случайного значения счетчика, если последовательный порт не инициализирован self.update_pending = True # Установка флага ожидания обновления return commands_parsed def read_loop(self) -> Never: while True: try: self.__update_values() except serial.SerialException as e: self.error_occured = True # Установка флага ошибки except Exception as e: print(e) time.sleep(self.exposition_time / 10) # Небольшая пауза перед следующей итерацией def update_loop(self) -> Never: self.loop_started = True # Установка флага запуска цикла while True: if self.loop_started and not self.error_occured: try: self.update() except serial.SerialException as e: self.error_occured = True # Установка флага ошибки except Exception as e: print(e) time.sleep(self.exposition_time) # Пауза перед следующей итерацией