wrapper.py 11 KB


  1. from __future__ import annotations
  2. import serial
  3. from typing import List, Dict, Never, Iterator, Optional
  4. from enum import StrEnum
  5. import warnings
  6. import time
  7. import random
  8. class GammaDevice:
  9. # Определение типов периферийных устройств
  10. class PeripheryTypes(StrEnum):
  11. ADC = "A" # Аналого-цифровой преобразователь
  12. DAC = "D" # Цифро-аналоговый преобразователь
  13. TEMP = "C" # Температура
  14. COUNTER = "T" # Счетчик
  15. PORTB = "B" # Порт B
  16. # Класс для хранения данных ЦАП
  17. class DACStorage:
  18. parent: GammaDevice
  19. __channels: Dict[int, float] # Частный атрибут для хранения каналов
  20. def __init__(self, parent: GammaDevice):
  21. self.parent = parent # Ссылка на родительский объект GammaDevice
  22. self.__channels = {1: 0, 2: 0, 3: 0, 4: 0} # Инициализация каналов ЦАП
  23. def __getitem__(self, index: int) -> float:
  24. return self.__channels[index] # Получение значения канала
  25. def __setitem__(self, index: int, value: float):
  26. if index in (1, 2):
  27. self.__channels[index] = (int(value * 100) % 330) / 100 # Обработка значения для каналов 1 и 2
  28. self.parent._GammaDevice__send(
  29. f"*D{index}W{f"{self.__channels[index] :04.2f}".replace(".", "")}#") # Отправка команды на устройство
  30. else:
  31. self.__channels[index] = int(value) % 256 # Обработка значения для других каналов
  32. self.parent._GammaDevice__send(
  33. f"*D{index}W{self.channels[index] :03d}#") # Отправка команды на устройство
  34. def __iter__(self) -> Iterator[int]:
  35. return self.__channels.__iter__() # Итератор по каналам
  36. comport: Optional[serial.Serial] # Последовательный порт, может быть None
  37. message: str # Сообщение для отправки/получения
  38. __periphery: Dict[PeripheryTypes, Dict[int, int | float] | DACStorage] # Словарь для периферийных устройств
  39. update_pending: bool # Флаг ожидания обновления
  40. loop_started: bool # Флаг запуска цикла
  41. exposition_time: float # Время экспозиции для цикла обновления
  42. def __init__(self, portID: str):
  43. try:
  44. self.comport = serial.Serial(portID, baudrate=230400) # Инициализация последовательного порта
  45. except serial.SerialException as e:
  46. raise e # Обработка исключения при инициализации порта
  47. self.message = "" # Инициализация сообщения
  48. self.__periphery = {
  49. GammaDevice.PeripheryTypes.ADC: {1: 0}, # Инициализация АЦП
  50. GammaDevice.PeripheryTypes.DAC: GammaDevice.DACStorage(self), # Инициализация ЦАП
  51. GammaDevice.PeripheryTypes.TEMP: {1: 0}, # Инициализация температуры
  52. GammaDevice.PeripheryTypes.COUNTER: {2: 0}, # Инициализация счетчика
  53. GammaDevice.PeripheryTypes.PORTB: {1: 0} # Инициализация порта B
  54. }
  55. self.update_pending = False # Инициализация флага ожидания обновления
  56. self.loop_started = False # Инициализация флага запуска цикла
  57. self.error_occured = False # Инициализация флага ошибки
  58. self.exposition_time = 0.1 # Инициализация времени экспозиции
  59. @property
  60. def temperature(self) -> float:
  61. return self.__periphery[GammaDevice.PeripheryTypes.TEMP][1] # Получение значения температуры
  62. @temperature.setter
  63. def temperature(self, value: float) -> Never:
  64. raise IOError(
  65. "Temperature cannot be set from userspace") # Запрет установки температуры из пользовательского пространства
  66. @property
  67. def counts(self) -> int:
  68. return int(self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]) # Получение значения счетчика
  69. @counts.setter
  70. def counts(self, value: int):
  71. if value:
  72. warnings.warn("Setting counts to non-zero values from userspace is not recommended",
  73. category=RuntimeWarning) # Предупреждение при установке счетчика
  74. self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = value % 1000
  75. self.__send(f"*T2W{self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]}#") # Отправка команды на устройство
  76. @property
  77. def portB(self) -> bool:
  78. return True if self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] else False # Получение значения порта B
  79. @portB.setter
  80. def portB(self, value: bool):
  81. self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] = 1 if value else 0
  82. self.__send(f"*B1W{self.__periphery[GammaDevice.PeripheryTypes.PORTB][1]}00#") # Отправка команды на устройство
  83. @property
  84. def adc(self) -> float:
  85. return self.__periphery[GammaDevice.PeripheryTypes.ADC][1] # Получение значения АЦП
  86. @adc.setter
  87. def adc(self, value: float) -> Never:
  88. raise IOError(
  89. "Temperature cannot be set from userspace") # Запрет установки АЦП из пользовательского пространства
  90. @property
  91. def dac(self) -> GammaDevice.DACStorage:
  92. result = self.__periphery[GammaDevice.PeripheryTypes.DAC]
  93. assert isinstance(result, GammaDevice.DACStorage)
  94. return result # Получение объекта DACStorage
  95. @dac.setter
  96. def dac(self, value: GammaDevice.DACStorage) -> Never:
  97. raise IOError(
  98. "DACStorage cannot be recreated from userspace") # Запрет пересоздания DACStorage из пользовательского пространства
  99. def update(self, updatedac: bool = False):
  100. if updatedac:
  101. self.__send("*D1R000#*D2R000#*D3R000#*D4R000#") # Отправка команд для обновления ЦАП
  102. else:
  103. self.__send("*C1R000#*T2R000#*B1R000#") # Отправка команд для обновления других устройств
  104. if self.comport is None:
  105. self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000,
  106. 2000) # Установка случайного значения счетчика, если последовательный порт не инициализирован
  107. def __send(self, message: str):
  108. if self.comport:
  109. self.comport.write(
  110. (message + "0" * (33 - len(message))).encode("ascii")) # Отправка сообщения на устройство
  111. def __update_values(self, log=False) -> int:
  112. commands_parsed = 0
  113. if self.comport:
  114. self.message += self.comport.read_until(b"#").decode("utf8") # Чтение сообщения до символа '#'
  115. while "#" in self.message:
  116. args = self.message.split("#")
  117. command, self.message = args[0], "#".join(args[1:])
  118. if "*" in command:
  119. command = command.replace("*", "")
  120. mode, deviceID, payload = command[0], command[1], command[2:]
  121. if int(deviceID) not in self.__periphery[GammaDevice.PeripheryTypes(mode)]:
  122. continue
  123. match GammaDevice.PeripheryTypes(mode):
  124. case GammaDevice.PeripheryTypes.TEMP:
  125. self.__periphery[GammaDevice.PeripheryTypes.TEMP][int(deviceID)] = int(
  126. payload) / 10 # Обновление значения температуры
  127. case GammaDevice.PeripheryTypes.COUNTER:
  128. self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = int(
  129. payload) # Обновление значения счетчика
  130. case GammaDevice.PeripheryTypes.PORTB:
  131. self.__periphery[GammaDevice.PeripheryTypes.PORTB][int(deviceID)] = int(
  132. payload[0]) # Обновление значения порта B
  133. case GammaDevice.PeripheryTypes.DAC:
  134. if int(deviceID) in (1, 2):
  135. value = int(payload[0:3]) / 100 # Обновление значения каналов 1 и 2 ЦАП
  136. else:
  137. value = int(payload[0:3]) # Обновление значения других каналов ЦАП
  138. self.__periphery[GammaDevice.PeripheryTypes.DAC]._DACStorage__channels[
  139. int(deviceID)] = value # Обновление значения в __channels
  140. case GammaDevice.PeripheryTypes.ADC:
  141. self.__periphery[GammaDevice.PeripheryTypes.ADC][int(deviceID)] = int(
  142. payload[0:3]) / 100 # Обновление значения АЦП
  143. self.update_pending = True # Установка флага ожидания обновления
  144. if log:
  145. print(f"Parsed: {command}")
  146. commands_parsed += 1
  147. else:
  148. self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000,
  149. 7000) # Установка случайного значения счетчика, если последовательный порт не инициализирован
  150. self.update_pending = True # Установка флага ожидания обновления
  151. return commands_parsed
  152. def read_loop(self) -> Never:
  153. while True:
  154. try:
  155. self.__update_values()
  156. except serial.SerialException as e:
  157. self.error_occured = True # Установка флага ошибки
  158. except Exception as e:
  159. print(e)
  160. time.sleep(self.exposition_time / 10) # Небольшая пауза перед следующей итерацией
  161. def update_loop(self) -> Never:
  162. self.loop_started = True # Установка флага запуска цикла
  163. while True:
  164. if self.loop_started and not self.error_occured:
  165. try:
  166. self.update()
  167. except serial.SerialException as e:
  168. self.error_occured = True # Установка флага ошибки
  169. except Exception as e:
  170. print(e)
  171. time.sleep(self.exposition_time) # Пауза перед следующей итерацией