123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- from __future__ import annotations
- import serial
- from typing import List, Dict, Never, Iterator, Optional
- from enum import StrEnum
- import warnings
- import time
- import random
- class GammaDevice:
- # Определение типов периферийных устройств
- class PeripheryTypes(StrEnum):
- ADC = "A" # Аналого-цифровой преобразователь
- DAC = "D" # Цифро-аналоговый преобразователь
- TEMP = "C" # Температура
- COUNTER = "T" # Счетчик
- PORTB = "B" # Порт B
- # Класс для хранения данных ЦАП
- class DACStorage:
- parent: GammaDevice
- __channels: Dict[int, float] # Частный атрибут для хранения каналов
- def __init__(self, parent: GammaDevice):
- self.parent = parent # Ссылка на родительский объект GammaDevice
- self.__channels = {1: 0, 2: 0, 3: 0, 4: 0} # Инициализация каналов ЦАП
- def __getitem__(self, index: int) -> float:
- return self.__channels[index] # Получение значения канала
- def __setitem__(self, index: int, value: float):
- if index in (1, 2):
- self.__channels[index] = (int(value * 100) % 330) / 100 # Обработка значения для каналов 1 и 2
- self.parent._GammaDevice__send(
- f"*D{index}W{f"{self.__channels[index] :04.2f}".replace(".", "")}#") # Отправка команды на устройство
- else:
- self.__channels[index] = int(value) % 256 # Обработка значения для других каналов
- self.parent._GammaDevice__send(
- f"*D{index}W{self.channels[index] :03d}#") # Отправка команды на устройство
- def __iter__(self) -> Iterator[int]:
- return self.__channels.__iter__() # Итератор по каналам
- comport: Optional[serial.Serial] # Последовательный порт, может быть None
- message: str # Сообщение для отправки/получения
- __periphery: Dict[PeripheryTypes, Dict[int, int | float] | DACStorage] # Словарь для периферийных устройств
- update_pending: bool # Флаг ожидания обновления
- loop_started: bool # Флаг запуска цикла
- exposition_time: float # Время экспозиции для цикла обновления
- def __init__(self, portID: str):
- try:
- self.comport = serial.Serial(portID, baudrate=230400) # Инициализация последовательного порта
- except serial.SerialException as e:
- raise e # Обработка исключения при инициализации порта
- self.message = "" # Инициализация сообщения
- self.__periphery = {
- GammaDevice.PeripheryTypes.ADC: {1: 0}, # Инициализация АЦП
- GammaDevice.PeripheryTypes.DAC: GammaDevice.DACStorage(self), # Инициализация ЦАП
- GammaDevice.PeripheryTypes.TEMP: {1: 0}, # Инициализация температуры
- GammaDevice.PeripheryTypes.COUNTER: {2: 0}, # Инициализация счетчика
- GammaDevice.PeripheryTypes.PORTB: {1: 0} # Инициализация порта B
- }
- self.update_pending = False # Инициализация флага ожидания обновления
- self.loop_started = False # Инициализация флага запуска цикла
- self.error_occured = False # Инициализация флага ошибки
- self.exposition_time = 0.1 # Инициализация времени экспозиции
- @property
- def temperature(self) -> float:
- return self.__periphery[GammaDevice.PeripheryTypes.TEMP][1] # Получение значения температуры
- @temperature.setter
- def temperature(self, value: float) -> Never:
- raise IOError(
- "Temperature cannot be set from userspace") # Запрет установки температуры из пользовательского пространства
- @property
- def counts(self) -> int:
- return int(self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]) # Получение значения счетчика
- @counts.setter
- def counts(self, value: int):
- if value:
- warnings.warn("Setting counts to non-zero values from userspace is not recommended",
- category=RuntimeWarning) # Предупреждение при установке счетчика
- self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = value % 1000
- self.__send(f"*T2W{self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2]}#") # Отправка команды на устройство
- @property
- def portB(self) -> bool:
- return True if self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] else False # Получение значения порта B
- @portB.setter
- def portB(self, value: bool):
- self.__periphery[GammaDevice.PeripheryTypes.PORTB][1] = 1 if value else 0
- self.__send(f"*B1W{self.__periphery[GammaDevice.PeripheryTypes.PORTB][1]}00#") # Отправка команды на устройство
- @property
- def adc(self) -> float:
- return self.__periphery[GammaDevice.PeripheryTypes.ADC][1] # Получение значения АЦП
- @adc.setter
- def adc(self, value: float) -> Never:
- raise IOError(
- "Temperature cannot be set from userspace") # Запрет установки АЦП из пользовательского пространства
- @property
- def dac(self) -> GammaDevice.DACStorage:
- result = self.__periphery[GammaDevice.PeripheryTypes.DAC]
- assert isinstance(result, GammaDevice.DACStorage)
- return result # Получение объекта DACStorage
- @dac.setter
- def dac(self, value: GammaDevice.DACStorage) -> Never:
- raise IOError(
- "DACStorage cannot be recreated from userspace") # Запрет пересоздания DACStorage из пользовательского пространства
- def update(self, updatedac: bool = False):
- if updatedac:
- self.__send("*D1R000#*D2R000#*D3R000#*D4R000#") # Отправка команд для обновления ЦАП
- else:
- self.__send("*C1R000#*T2R000#*B1R000#") # Отправка команд для обновления других устройств
- if self.comport is None:
- self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000,
- 2000) # Установка случайного значения счетчика, если последовательный порт не инициализирован
- def __send(self, message: str):
- if self.comport:
- self.comport.write(
- (message + "0" * (33 - len(message))).encode("ascii")) # Отправка сообщения на устройство
- def __update_values(self, log=False) -> int:
- commands_parsed = 0
- if self.comport:
- self.message += self.comport.read_until(b"#").decode("utf8") # Чтение сообщения до символа '#'
- while "#" in self.message:
- args = self.message.split("#")
- command, self.message = args[0], "#".join(args[1:])
- if "*" in command:
- command = command.replace("*", "")
- mode, deviceID, payload = command[0], command[1], command[2:]
- if int(deviceID) not in self.__periphery[GammaDevice.PeripheryTypes(mode)]:
- continue
- match GammaDevice.PeripheryTypes(mode):
- case GammaDevice.PeripheryTypes.TEMP:
- self.__periphery[GammaDevice.PeripheryTypes.TEMP][int(deviceID)] = int(
- payload) / 10 # Обновление значения температуры
- case GammaDevice.PeripheryTypes.COUNTER:
- self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = int(
- payload) # Обновление значения счетчика
- case GammaDevice.PeripheryTypes.PORTB:
- self.__periphery[GammaDevice.PeripheryTypes.PORTB][int(deviceID)] = int(
- payload[0]) # Обновление значения порта B
- case GammaDevice.PeripheryTypes.DAC:
- if int(deviceID) in (1, 2):
- value = int(payload[0:3]) / 100 # Обновление значения каналов 1 и 2 ЦАП
- else:
- value = int(payload[0:3]) # Обновление значения других каналов ЦАП
- self.__periphery[GammaDevice.PeripheryTypes.DAC]._DACStorage__channels[
- int(deviceID)] = value # Обновление значения в __channels
- case GammaDevice.PeripheryTypes.ADC:
- self.__periphery[GammaDevice.PeripheryTypes.ADC][int(deviceID)] = int(
- payload[0:3]) / 100 # Обновление значения АЦП
- self.update_pending = True # Установка флага ожидания обновления
- if log:
- print(f"Parsed: {command}")
- commands_parsed += 1
- else:
- self.__periphery[GammaDevice.PeripheryTypes.COUNTER][2] = random.randint(1000,
- 7000) # Установка случайного значения счетчика, если последовательный порт не инициализирован
- self.update_pending = True # Установка флага ожидания обновления
- return commands_parsed
- def read_loop(self) -> Never:
- while True:
- try:
- self.__update_values()
- except serial.SerialException as e:
- self.error_occured = True # Установка флага ошибки
- except Exception as e:
- print(e)
- time.sleep(self.exposition_time / 10) # Небольшая пауза перед следующей итерацией
- def update_loop(self) -> Never:
- self.loop_started = True # Установка флага запуска цикла
- while True:
- if self.loop_started and not self.error_occured:
- try:
- self.update()
- except serial.SerialException as e:
- self.error_occured = True # Установка флага ошибки
- except Exception as e:
- print(e)
- time.sleep(self.exposition_time) # Пауза перед следующей итерацией
|