merge_cluster_algorithm.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import pandas as pd
  2. import pymongo
  3. import datetime
  4. from config_info.config import *
  5. from preparing_to_merge import prepare_to_merge_12d, prepare_to_merge_7d
  6. from merge_dinods_algorithm import merge_dinods
  # Module-wide MongoDB handles, created once at import time and reused by the
  # functions below. DB_URL is not defined in this file; presumably it comes
  # from the star-import of config_info.config -- confirm.
  db_client = pymongo.MongoClient(DB_URL)
  prisma_db = db_client.get_database("prisma-32_db")
  def merge_clusters(data_cl_1, data_cl_2, time_gate_ns=80_000_000):
      """Match each cluster-1 event with the nearest-in-time cluster-2 event.

      For every row of ``data_cl_1``, the rows of ``data_cl_2`` whose
      ``time_ns`` lies within ``time_gate_ns`` of the cluster-1 time
      (inclusive on both sides) are candidates. Among them the one with the
      smallest absolute time difference is selected; on an exact tie the
      candidate that appears first in ``data_cl_2`` wins.

      Args:
          data_cl_1: DataFrame with ``_id`` and ``time_ns`` columns holding
              the cluster-1 events.
          data_cl_2: DataFrame with ``_id`` and ``time_ns`` columns holding
              the cluster-2 events.
          time_gate_ns: Half-width of the coincidence window, in the same
              units as ``time_ns``. Defaults to 80 ms (80e6 ns).

      Returns:
          dict mapping a cluster-1 ``_id`` to the ``_id`` of its matched
          cluster-2 event. Cluster-1 events with no candidate inside the
          gate are absent from the dict.

      Note:
          Nothing prevents one cluster-2 event from being matched to several
          cluster-1 events when their gates overlap.
      """
      merge_dict = {}
      # An empty frame may have no columns at all (e.g. built from an empty
      # query result), so bail out before touching '_id' / 'time_ns'.
      if data_cl_1.empty or data_cl_2.empty:
          return merge_dict

      # Fresh 0..n-1 index so that idxmin() below yields an unambiguous label.
      cl_2 = data_cl_2[['_id', 'time_ns']].reset_index(drop=True)

      # Iterate by position rather than by label: the caller's index does not
      # have to be a default RangeIndex.
      for cl_1_id, cl_1_time in zip(data_cl_1['_id'], data_cl_1['time_ns']):
          in_gate = cl_2[(cl_2['time_ns'] >= cl_1_time - time_gate_ns)
                         & (cl_2['time_ns'] <= cl_1_time + time_gate_ns)]
          if in_gate.empty:
              continue
          # Several cluster-2 events may fall inside the gate; keep the one
          # closest in time, i.e. with the smallest absolute offset.
          nearest = (in_gate['time_ns'] - cl_1_time).abs().idxmin()
          merge_dict[cl_1_id] = in_gate.at[nearest, '_id']
      return merge_dict
  def _copy_events(collection, events, mask, cluster_numbers, partner_ids=None):
      """Insert one event record per row of ``events`` into ``collection``.

      Args:
          collection: Target pymongo collection.
          events: DataFrame with ``_id`` and ``time_ns`` columns; may be an
              empty frame without any columns.
          mask: Value stored in the record's ``mask`` field.
          cluster_numbers: Values stored in ``list_of_cluster_numbers``.
          partner_ids: Optional dict mapping a source ``_id`` to the ``_id``
              of the event it was merged with. When given, the partner id is
              appended to ``list_of_ids``.

      A record whose ``_id`` already exists in the collection is reported on
      stdout and skipped; any other error propagates to the caller.
      """
      if events.empty:
          # An empty query result has no '_id' / 'time_ns' columns to read.
          return
      for source_id, time_ns in zip(events['_id'], events['time_ns']):
          id_parts = source_id.split('_')
          # New id: first and last '_'-separated parts of the source id,
          # joined by 'pe'.
          event_id = f"{id_parts[0]}_pe_{id_parts[-1]}"
          list_of_ids = [source_id]
          if partner_ids is not None:
              list_of_ids.append(partner_ids[source_id])
          new_record = {
              '_id': event_id,
              'eas_event_time_ns': int(time_ns),
              'mask': mask,
              'multiplicity': len(list_of_ids),
              'list_of_cluster_numbers': list(cluster_numbers),
              'list_of_ids': list_of_ids,
          }
          # Only the insert itself is expected to raise DuplicateKeyError.
          try:
              ins_result = collection.insert_one(new_record)
          except pymongo.errors.DuplicateKeyError:
              print(f'Ошибка - {event_id}')
          else:
              print(f'Copied - {ins_result.inserted_id}')


  def prisma_events_db_copier(date_):
      """Build the ``<date>_events`` collection from the ``<date>_12d`` data.

      Cluster-1 and cluster-2 events prepared by ``prepare_to_merge_12d`` are
      paired with ``merge_clusters``. Three groups of records are then written
      to ``<date>_events``:

      * cluster-1 documents that were not paired: mask 1, multiplicity 1;
      * cluster-2 documents that were not paired: mask 2, multiplicity 1;
      * paired cluster-1 documents: mask 3, multiplicity 2, with
        ``list_of_ids`` holding both the cluster-1 and the cluster-2 ``_id``.

      Args:
          date_: Date whose string form prefixes the collection names
              (``f'{date_}_12d'`` and ``f'{date_}_events'``).
      """
      # Reuse the module-level database handle instead of opening (and never
      # closing) a new MongoClient for every query.
      collection_prisma = prisma_db[f'{date_}_events']
      collection_12d = prisma_db[f'{date_}_12d']

      data_cl_1 = prepare_to_merge_12d(date_, cluster=1, a_crit=6, fr=4)
      data_cl_2 = prepare_to_merge_12d(date_, cluster=2, a_crit=6, fr=4)
      merge_cl = merge_clusters(data_cl_1, data_cl_2)
      merged_cl_1_ids = list(merge_cl)
      merged_cl_2_ids = list(merge_cl.values())

      # TODO: to also put the _id of 7th-dynode events into list_of_ids, merge
      # the 12d and 7d data of each cluster: prepare_to_merge_12d(date_,
      # cluster=N, a_crit=11, fr=4) and prepare_to_merge_7d(date_, cluster=N)
      # fed into merge_dinods(...).

      all_data_cl_1 = pd.DataFrame.from_records(
          collection_12d.find({'cluster': 1, '_id': {'$nin': merged_cl_1_ids}}))
      all_data_cl_2 = pd.DataFrame.from_records(
          collection_12d.find({'cluster': 2, '_id': {'$nin': merged_cl_2_ids}}))
      data_cl_merge = pd.DataFrame.from_records(
          collection_12d.find({'cluster': 1, '_id': {'$in': merged_cl_1_ids}}))

      _copy_events(collection_prisma, all_data_cl_1, mask=1, cluster_numbers=[1])
      _copy_events(collection_prisma, all_data_cl_2, mask=2, cluster_numbers=[2])
      _copy_events(collection_prisma, data_cl_merge, mask=3, cluster_numbers=[1, 2],
                   partner_ids=merge_cl)
  if __name__ == '__main__':
      # Inclusive range of calendar days to process, one copier call per day.
      # TODO: find out why the end of day 2018-04-22 is not collected.
      date_time_start = datetime.date(2021, 12, 1)
      date_time_stop = datetime.date(2021, 12, 31)
      day_count = (date_time_stop - date_time_start).days + 1
      LIST_OF_DATES = [date_time_start + datetime.timedelta(days=offset)
                       for offset in range(day_count)]
      for date in LIST_OF_DATES:
          prisma_events_db_copier(date)
      # NOTE(review): the original indentation was lost; this print is assumed
      # to run once after the loop rather than once per date -- confirm.
      print('test')