|
@@ -0,0 +1,203 @@
|
|
|
|
+from __future__ import annotations
|
|
|
|
+
|
|
|
|
+import serial
|
|
|
|
+from typing import List, Dict, Never, Iterator, Optional
|
|
|
|
+from enum import StrEnum
|
|
|
|
+import warnings
|
|
|
|
+import time
|
|
|
|
+import random
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class GammaDevice:
|
|
|
|
+ # Определение типов периферийных устройств
|
|
|
|
+ class PeripheryTypes(StrEnum):
|
|
|
|
+ ADC = "A" # Аналого-цифровой преобразователь
|
|
|
|
+ DAC = "D" # Цифро-аналоговый преобразователь
|
|
|
|
+ TEMP = "C" # Температура
|
|
|
|
+ COUNTER = "T" # Счетчик
|
|
|
|
+ PORTB = "B" # Порт B
|
|
|
|
+
|
|
|
|
+ # Класс для хранения данных ЦАП
|
|
|
|
+ class DACStorage:
|
|
|
|
+ parent: GammaDevice
|
|
|
|
+ __channels: Dict[int, float] # Частный атрибут для хранения каналов
|
|
|
|
+
|
|
|
|
+ def __init__(self, parent: GammaDevice):
|
|
|
|
+ self.parent = parent # Ссылка на родительский объект GammaDevice
|
|
|
|
+ self.__channels = {1: 0, 2: 0, 3: 0, 4: 0} # Инициализация каналов ЦАП
|
|
|
|
+
|
|
|
|
+ def __getitem__(self, index: int) -> float:
|
|
|
|
+ return self.__channels[index] # Получение значения канала
|
|
|
|
+
|
|
|
|
+ def __setitem__(self, index: int, value: float):
|
|
|
|
+ if index in (1, 2):
|
|
|
|
+ self.__channels[index] = (int(value * 100) % 330) / 100 # Обработка значения для каналов 1 и 2
|
|
|
|
+ self.parent._GammaDevice__send(
|
|
|
|
+ f"*D{index}W{f"{self.__channels[index] :04.2f}".replace(".", "")}#") # Отправка команды на устройство
|
|
|
|
+ else:
|
|
|
|
+ self.__channels[index] = int(value) % 256 # Обработка значения для других каналов
|
|
|
|
+ self.parent._GammaDevice__send(
|
|
|
|
+ f"*D{index}W{self.channels[index] :03d}#") # Отправка команды на устройство
|
|
|
|
+
|
|
|
|
+ def __iter__(self) -> Iterator[int]:
|
|
|
|
+ return self.__channels.__iter__() # Итератор по каналам
|
|
|
|
+
|
|
|
|
+ comport: Optional[serial.Serial] # Последовательный порт, может быть None
|
|
|
|
+ message: str # Сообщение для отправки/получения
|
|
|
|
+
|
|
|
|
+ __periphery: Dict[PeripheryTypes, Dict[int, int | float] | DACStorage] # Словарь для периферийных устройств
|
|
|
|
+ update_pending: bool # Флаг ожидания обновления
|
|
|
|
+ loop_started: bool # Флаг запуска цикла
|
|
|
|
+ exposition_time: float # Время экспозиции для цикла обновления
|
|
|
|
+
|
|
|
|
+ def __init__(self, portID: str):
|
|
|
|
+ try:
|
|
|
|
+ self.comport = serial.Serial(portID, baudrate=230400) # Инициализация последовательного порта
|
|
|
|
+ except serial.SerialException as e:
|
|
|
|
+ raise e # Обработка исключения при инициализации порта
|
|
|
|
+
|
|
|
|
+ self.message = "" # Инициализация сообщения
|
|
|
|
+ self.__periphery = {
|
|
|
|
+ GammaDevice.PeripheryTypes.ADC: {1: 0}, # Инициализация АЦП
|
|
|
|
+ GammaDevice.PeripheryTypes.DAC: GammaDevice.DACStorage(self), # Инициализация ЦАП
|
|
|
|
+ GammaDevice.PeripheryTypes.TEMP: {1: 0}, # Инициализация температуры
|
|
|
|
+ GammaDevice.PeripheryTypes.COUNTER: {2: 0}, # Инициализация счетчика
|
|
|
|
+ GammaDevice.PeripheryTypes.PORTB: {1: 0} # Инициализация порта B
|
|
|
|
+ }
|
|
|
|
+ self.update_pending = False # Инициализация флага ожидания обновления
|
|
|
|
+ self.loop_started = False # Инициализация флага запуска цикла
|
|
|
|
+ self.error_occured = False # Инициализация флага ошибки
|
|
|
|
+ self.exposition_time = 0.1 # Инициализация времени экспозиции
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def temperature(self) -> float:
|
|
|
|
+ return self.__periphery[GammaDevice.PeripheryTypes.TEMP][1] # Получение значения температуры
|
|
|
|
+
|
|
|
|
+ @temperature.setter
|
|
|
|
+ def temperature(self, value: float) -> Never:
|
|
|
|
+ raise IOError(
|
|
|
|
+ "Temperature cannot be set from userspace") # Запрет установки температуры из пользовательского пространства
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def counts(self) -> int:
|
|
|
|
+ return int(self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]) # Получение значения счетчика
|
|
|
|
+
|
|
|
|
+ @counts.setter
|
|
|
|
+ def counts(self, value: int):
|
|
|
|
+ if value:
|
|
|
|
+ warnings.warn("Setting counts to non-zero values from userspace is not recommended",
|
|
|
|
+ category=RuntimeWarning) # Предупреждение при установке счетчика
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = value % 1000
|
|
|
|
+ self.__send(f"*T2W{self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]}#") # Отправка команды на устройство
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def portB(self) -> bool:
|
|
|
|
+ return True if self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] else False # Получение значения порта B
|
|
|
|
+
|
|
|
|
+ @portB.setter
|
|
|
|
+ def portB(self, value: bool):
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] = 1 if value else 0
|
|
|
|
+ self.__send(f"*B1W{self.__periphery[GammaDevice.PeripheryTypes.PORTB][1]}00#") # Отправка команды на устройство
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def adc(self) -> float:
|
|
|
|
+ return self.__periphery[GammaDevice.PeripheryTypes.ADC][1] # Получение значения АЦП
|
|
|
|
+
|
|
|
|
+ @adc.setter
|
|
|
|
+ def adc(self, value: float) -> Never:
|
|
|
|
+ raise IOError(
|
|
|
|
+ "Temperature cannot be set from userspace") # Запрет установки АЦП из пользовательского пространства
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def dac(self) -> GammaDevice.DACStorage:
|
|
|
|
+ result = self.__periphery[GammaDevice.PeripheryTypes.DAC]
|
|
|
|
+ assert isinstance(result, GammaDevice.DACStorage)
|
|
|
|
+ return result # Получение объекта DACStorage
|
|
|
|
+
|
|
|
|
+ @dac.setter
|
|
|
|
+ def dac(self, value: GammaDevice.DACStorage) -> Never:
|
|
|
|
+ raise IOError(
|
|
|
|
+ "DACStorage cannot be recreated from userspace") # Запрет пересоздания DACStorage из пользовательского пространства
|
|
|
|
+
|
|
|
|
+ def update(self, updatedac: bool = False):
|
|
|
|
+ if updatedac:
|
|
|
|
+ self.__send("*D1R000#*D2R000#*D3R000#*D4R000#") # Отправка команд для обновления ЦАП
|
|
|
|
+ else:
|
|
|
|
+ self.__send("*C1R000#*T2R000#*B1R000#") # Отправка команд для обновления других устройств
|
|
|
|
+
|
|
|
|
+ if self.comport is None:
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000,
|
|
|
|
+ 2000) # Установка случайного значения счетчика, если последовательный порт не инициализирован
|
|
|
|
+
|
|
|
|
+ def __send(self, message: str):
|
|
|
|
+ if self.comport:
|
|
|
|
+ self.comport.write(
|
|
|
|
+ (message + "0" * (33 - len(message))).encode("ascii")) # Отправка сообщения на устройство
|
|
|
|
+
|
|
|
|
+ def __update_values(self, log=False) -> int:
|
|
|
|
+ commands_parsed = 0
|
|
|
|
+
|
|
|
|
+ if self.comport:
|
|
|
|
+ self.message += self.comport.read_until(b"#").decode("utf8") # Чтение сообщения до символа '#'
|
|
|
|
+
|
|
|
|
+ while "#" in self.message:
|
|
|
|
+ args = self.message.split("#")
|
|
|
|
+ command, self.message = args[0], "#".join(args[1:])
|
|
|
|
+ if "*" in command:
|
|
|
|
+ command = command.replace("*", "")
|
|
|
|
+ mode, deviceID, payload = command[0], command[1], command[2:]
|
|
|
|
+ if int(deviceID) not in self.__periphery[GammaDevice.PeripheryTypes(mode)]:
|
|
|
|
+ continue
|
|
|
|
+ match GammaDevice.PeripheryTypes(mode):
|
|
|
|
+ case GammaDevice.PeripheryTypes.TEMP:
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.TEMP][int(deviceID)] = int(
|
|
|
|
+ payload) / 10 # Обновление значения температуры
|
|
|
|
+ case GammaDevice.PeripheryTypes.COUNTER:
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = int(
|
|
|
|
+ payload) # Обновление значения счетчика
|
|
|
|
+ case GammaDevice.PeripheryTypes.PORTB:
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.PORTB][int(deviceID)] = int(
|
|
|
|
+ payload[0]) # Обновление значения порта B
|
|
|
|
+ case GammaDevice.PeripheryTypes.DAC:
|
|
|
|
+ if int(deviceID) in (1, 2):
|
|
|
|
+ value = int(payload[0:3]) / 100 # Обновление значения каналов 1 и 2 ЦАП
|
|
|
|
+ else:
|
|
|
|
+ value = int(payload[0:3]) # Обновление значения других каналов ЦАП
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.DAC]._DACStorage__channels[
|
|
|
|
+ int(deviceID)] = value # Обновление значения в __channels
|
|
|
|
+ case GammaDevice.PeripheryTypes.ADC:
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.ADC][int(deviceID)] = int(
|
|
|
|
+ payload[0:3]) / 100 # Обновление значения АЦП
|
|
|
|
+ self.update_pending = True # Установка флага ожидания обновления
|
|
|
|
+ if log:
|
|
|
|
+ print(f"Parsed: {command}")
|
|
|
|
+ commands_parsed += 1
|
|
|
|
+ else:
|
|
|
|
+ self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000,
|
|
|
|
+ 7000) # Установка случайного значения счетчика, если последовательный порт не инициализирован
|
|
|
|
+ self.update_pending = True # Установка флага ожидания обновления
|
|
|
|
+
|
|
|
|
+ return commands_parsed
|
|
|
|
+
|
|
|
|
+ def read_loop(self) -> Never:
|
|
|
|
+ while True:
|
|
|
|
+ try:
|
|
|
|
+ self.__update_values()
|
|
|
|
+ except serial.SerialException as e:
|
|
|
|
+ self.error_occured = True # Установка флага ошибки
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print(e)
|
|
|
|
+ time.sleep(self.exposition_time / 10) # Небольшая пауза перед следующей итерацией
|
|
|
|
+
|
|
|
|
+ def update_loop(self) -> Never:
|
|
|
|
+ self.loop_started = True # Установка флага запуска цикла
|
|
|
|
+ while True:
|
|
|
|
+ if self.loop_started and not self.error_occured:
|
|
|
|
+ try:
|
|
|
|
+ self.update()
|
|
|
|
+ except serial.SerialException as e:
|
|
|
|
+ self.error_occured = True # Установка флага ошибки
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print(e)
|
|
|
|
+
|
|
|
|
+ time.sleep(self.exposition_time) # Пауза перед следующей итерацией
|