瀏覽代碼

Merge branch 'parrerel_processing' of blezz-tech/sharix-open-webservice-running into unstable

共有 2 個文件被更改,包括 27 次插入22 次删除
  1. 15 22
      handlers/handlers/open_access_request_pending.py
  2. 12 0
      handlers/lib.py

+ 15 - 22
handlers/handlers/open_access_request_pending.py

@@ -6,9 +6,10 @@ from EchoBot import JabberBot
 from slixmpp.stanza import Message
 import jsonAndRequest as jsreq
 from time import sleep
-from lib import filter_interval_tickets
+from lib import run_process
 from classes.Ticket import Ticket
 from typing import List
+from concurrent.futures import ThreadPoolExecutor
 
 botname = "open_access_request_pending"
 operating_status = 320
@@ -36,9 +37,6 @@ INTERVAL_ATTEMPTS = 3
 # INTERVAL_TIME = 30 * 60  # 30 минут в секундах
 INTERVAL_TIME = 5 # Для проверки
 
-global tickets
-tickets: List[Ticket] 
-
 def message_handler(msg: Message):
     """Обработчик входящих сообщений"""
 
@@ -55,24 +53,19 @@ def start_handler():
 
     logging.info(">>>>>  %s  |---| %s  <<<<<", JID, PASSWORD)
 
-    global tickets
-    tickets = get_fake_data()
-
-    while(True):
-        # TODO: Придумать более надёжный механизм парарельной обработки тикетов
-        # А также более надёжный механизм удаления тикетов из массива
-        if tickets:
-            for ticket in tickets:
-                ticket.interval_attempts -= 1
-                if ticket.interval_attempts <= 0:
-                    interval_attempts_unlimited(ticket)
-                else:
-                    processing(ticket)
-            tickets = filter_interval_tickets(tickets)
-        
-        logging.debug(tickets)
-        logging.debug("Sleep")
-        sleep(INTERVAL_TIME)
+    tickets: List[Ticket]  = get_fake_data()
+
+    with ThreadPoolExecutor() as executor:
+        while True:
+            # Параллельная обработка всех тикетов
+            processed = list(executor.map(run_process(interval_attempts_unlimited, processing), tickets))
+            
+            # Фильтрация тикетов
+            tickets = [t for t in processed if t is not None]
+            
+            logging.debug(tickets)
+            logging.debug("Sleep")
+            sleep(INTERVAL_TIME)
 
 
 # START CUSTOM FUNCTIONS

+ 12 - 0
handlers/lib.py

@@ -3,3 +3,15 @@ def filter_interval_tickets(tickets):
     """ Очистка тикетов, у которых закончились попытки """
     return list(filter(lambda ticket: ticket.interval_attempts > 0, tickets))
 
+def run_process(interval_attempts_unlimited, processing):
+    """Обработка одного тикета с удалением."""
+    def go(ticket):    
+        ticket.interval_attempts -= 1
+        if ticket.interval_attempts <= 0:
+            interval_attempts_unlimited(ticket)
+            return None  # Маркер для удаления
+        else:
+            processing(ticket)
+        return ticket
+
+    return go