file_reader.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import datetime
  2. from collections import defaultdict
  3. import pandas as pd
  4. import pymongo
  5. class FileReader:
  6. __amp_n_cols = []
  7. for i in range(1, 17):
  8. __amp_n_cols.append(f'amp{i}')
  9. __amp_n_cols.append(f'n{i}')
  10. def __init__(self, cluster, single_date, path_to_files='', path_to_files_7d=''):
  11. self.cluster = cluster
  12. if cluster == 1:
  13. self.cluster_n = ''
  14. else:
  15. self.cluster_n = '2'
  16. self.path_to_files = path_to_files
  17. self.path_to_files_7d = path_to_files_7d
  18. self.single_date = single_date
  19. self.n_file_today, self.n_file_day_after = self._reading_n_file()
  20. self.n7_file_today, self.n7_file_day_after = self._reading_n7_file()
  21. def __del__(self):
  22. pass
  23. def reading_file(self, file_type) -> pd.DataFrame:
  24. file = pd.read_csv(
  25. f'{self.path_to_files}\\{file_type}\\{self.cluster_n}n_{self.single_date.month:02}' +
  26. f'-{self.single_date.day:02}.{self.single_date.year - 2000:02}',
  27. sep=r'\s[-]*\s*', header=None, skipinitialspace=True, index_col=False)
  28. file.dropna(axis=1, how='all', inplace=True)
  29. return file
  30. def preparing_n_file(self):
  31. n_file = self.reading_file(file_type='n')
  32. n_file.columns = ['time', 'number', 'sum_n', 'trigger'] + FileReader.__amp_n_cols
  33. def _reading_n_file(self):
  34. """Метод, прочитывающий n-файлы, возвращающий датафрейм дня на выходе. Или возвращающий filenotfounderror, если
  35. файла нет"""
  36. n_file = pd.read_csv(
  37. f'{self.path_to_files}\\{self.cluster_n}n_{self.single_date.month:02}' +
  38. f'-{self.single_date.day:02}.{self.single_date.year - 2000:02}',
  39. sep=' ', header=None, skipinitialspace=True, index_col=False,
  40. names=['time', 'number', 'sum_n', 'trigger'] + FileReader.__amp_n_cols)
  41. n_file.dropna(axis=1, how='all', inplace=True)
  42. time_difference = n_file['time'].diff()
  43. bad_end_time_index = time_difference[time_difference < -10000].index
  44. if any(bad_end_time_index):
  45. n_file_today = n_file[n_file.index < bad_end_time_index[0]]
  46. n_file_day_after = n_file[n_file.index >= bad_end_time_index[0]]
  47. return n_file_today, n_file_day_after
  48. return n_file, []
  49. def _reading_n7_file(self):
  50. n7_file = pd.read_csv(
  51. f'{self.path_to_files_7d}\\{self.cluster_n}n7_{self.single_date.month:02}' +
  52. f'-{self.single_date.day:02}.{self.single_date.year - 2000:02}',
  53. sep=' ', header=None, skipinitialspace=True, index_col=False)
  54. n7_file.dropna(axis=1, how='all', inplace=True)
  55. for i in range(len(n7_file[0])):
  56. if type(n7_file[0][i]) is str:
  57. n7_file.loc[i, 0] = float('.'.join(n7_file.loc[i, 0].split(',')))
  58. time_difference = n7_file[0].diff()
  59. bad_end_time_index = time_difference[time_difference < -10000].index
  60. if any(bad_end_time_index):
  61. n7_file_today = n7_file[n7_file.index < bad_end_time_index[0]]
  62. n7_file_day_after = n7_file[n7_file.index >= bad_end_time_index[0]]
  63. return n7_file_today, n7_file_day_after
  64. return n7_file, []
  65. @staticmethod
  66. def concat_n_data(cls_object, concat_n_df):
  67. cls_object.n_file_today['Date'] = [cls_object.single_date.date()] * len(cls_object.n_file_today.index)
  68. concat_n_df = pd.concat([concat_n_df, cls_object.n_file_today],
  69. ignore_index=True)
  70. if any(cls_object.n_file_day_after):
  71. cls_object.n_file_day_after['Date'] = [(cls_object.single_date + datetime.timedelta(
  72. days=1)).date()] * len(cls_object.n_file_day_after.index)
  73. concat_n_df = pd.concat([concat_n_df, cls_object.n_file_day_after],
  74. ignore_index=True)
  75. return concat_n_df
  76. def reading_p_file(self):
  77. """Метод, прочитывающий p-файлы, возвращающий датафрейм дня на выходе. Или возвращающий filenotfounderror, если
  78. файла нет"""
  79. try:
  80. p_file = pd.read_csv(
  81. f'{self.path_to_files}\\nv\\{self.cluster}p{self.single_date.date().month:02}' +
  82. f'-{self.single_date.date().day:02}.{self.single_date.date().year - 2000:02}',
  83. sep=r'\s[-]*\s*', header=None, skipinitialspace=True, engine='python')
  84. p_file.dropna(axis=1, how='all', inplace=True)
  85. corr_p_file = self.correcting_p_file(p_file)
  86. return corr_p_file
  87. except FileNotFoundError as error:
  88. print(f"File {self.path_to_files}\\nv\\{self.cluster}p{self.single_date.date().month:02}-" +
  89. f"{self.single_date.date().day:02}.{self.single_date.date().year - 2000:02} does not exist")
  90. return error.strerror
  91. @staticmethod
  92. def correcting_p_file(p_file):
  93. """Метод, корректирующий старые файлы ПРИЗМА-32, возвращающий скорректированный датафрейм"""
  94. p_file['time'] = p_file[0]
  95. del p_file[0]
  96. p_file = p_file.sort_values(by='time')
  97. if len(p_file['time']) > len(p_file['time'].unique()):
  98. """Данный костыль нужен для старых p-файлов ПРИЗМА-32(до 14-15 гг.), в которых индексы строк,
  99. по сути обозначающие 5 минут реального времени между ранами, могут повторяться. """
  100. p_file.drop_duplicates(keep=False, inplace=True)
  101. """После удаления полных дубликатов ищем повторяющиеся индексы. Сначала удаляем строки,
  102. состоящие полностью из нулей и точек (value = len(p_file.columns)), потом ищем множество
  103. дубликатов индексов и множество строк, почти полностью (value > 30) состоящих из нулей и точек.
  104. Берем пересечение этих двух множеств и удаляем находящиеся в пересечении строки"""
  105. null_row = dict(p_file.isin([0, '.']).sum(axis=1)) # Проверяем на нули и точки
  106. all_null_index = list(
  107. {key: value for key, value in null_row.items() if value == len(p_file.columns)}.keys())
  108. p_file.drop(index=all_null_index, inplace=True)
  109. null_index = list(
  110. {key: value for key, value in null_row.items() if value > len(p_file.columns) - 5}.keys())
  111. same_index = dict(p_file['time'].duplicated(keep=False))
  112. same_index_row = list({key: value for key, value in same_index.items() if value is True}.keys())
  113. bad_index = list(set(null_index) & set(same_index_row))
  114. p_file.drop(index=bad_index, inplace=True)
  115. """Также может быть, что после фильтрации осталось больше строк, чем нужно, так как в старых
  116. p-файлах может быть больше индексов, чем минут в дне. Тогда оставляем только 288"""
  117. if len(p_file.index) == 289:
  118. p_file = p_file.head(288)
  119. return p_file