AI-Project/AsyncProgress.py

236 lines
7.5 KiB
Python

import queue
import threading
import time
import tkinter as tk
import tkinter.ttk as ttk
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional, Callable
class ClearableQueue(queue.Queue):
def clear(self):
with self.mutex:
while self._qsize() != 0:
_ = self._get()
@contextmanager
def acquire_timeout(lock, timeout):
result = lock.acquire(timeout=timeout)
try:
yield result
finally:
if result:
lock.release()
def format_time(seconds):
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f'{str(int(h)) + "h " if h > 0 else ""}{"{:2d}min ".format(int(m)) if m > 0 else ""}{int(s):2d}s'
class AsyncProgress(ttk.Frame):
@dataclass(init=False, repr=False)
class Stats:
start_time: float
total: int
current: int = 0
last_elapsed_time: float = 0
total_elapsed_time: float = 0
def __init__(self, total: int):
self.total = total
self.start_time = time.time()
def __repr__(self):
ips = 1 / self.last_elapsed_time if self.last_elapsed_time != 0 else 0
return (f"{'{v:{p}d}'.format(v=self.current, p=len(str(self.total)))}/{self.total} "
f"| {self.last_elapsed_time}s/it" if ips < 1 else f"{ips:.2f}it/s "
f"| elapsed: {format_time(time.time() - self.start_time)} "
f"| remaining: {format_time((self.total - self.current) * self.last_elapsed_time):} ")
class Range:
def __init__(self, parent, iterable, iter_len, update_interval, stop_event):
self.iterable = iterable
self.iter_len = iter_len
self.parent = parent
self.update_interval = update_interval
self.stop_event = stop_event
self.parent.pbar['maximum'] = iter_len
def __iter__(self):
last_time = time.time()
last_update_time = 0
stats = AsyncProgress.Stats(len(self.iterable))
self.parent._start()
try:
for obj in self.iterable:
if self.stop_event.is_set():
return
yield obj
stats.current += 1
stats.last_elapsed_time = time.time() - last_time
last_time = time.time()
if time.time() - last_update_time > self.update_interval / 1000:
self.parent.step(1, stats)
last_update_time = time.time()
finally:
stats.total_elapsed_time = time.time() - stats.start_time
self.parent._finish(stats)
self.close()
def close(self):
self.iterable = range(0)
def __init__(self, parent, *,
width=800,
height=30,
update_interval=20,
range_update_interval=10,
label=None):
super().__init__(parent, width=width, height=height)
self.grid_propagate(False)
self.__event_step_queue = ClearableQueue()
self.__lock = threading.Lock()
self.__cancel_event = threading.Event()
self.__stop_condition = None
self.__tk_pbar_value = tk.IntVar()
self.__tk_stats_str = tk.StringVar()
self.__tk_stats_str.set("Not running")
self.running = False
self.update_interval = update_interval
self.range_update_interval = range_update_interval
self.label = ttk.Label(self, text=label if label is not None else "")
self.pbar = ttk.Progressbar(self, variable=self.__tk_pbar_value)
self.stats = ttk.Label(self, textvariable=self.__tk_stats_str)
self.label.configure(font='TkFixedFont')
self.stats.configure(font='TkFixedFont')
self.label.grid(row=0, column=0, sticky=tk.NW)
self.pbar.grid(row=0, column=1, sticky=tk.NW)
self.stats.grid(row=0, column=2, sticky=tk.NW, padx=5)
self.__schedule_update()
def step(self, amount: int = 1, stat: Optional[Stats] = None):
self.__event_step_queue.put((amount, stat))
def reset(self):
with self.__lock:
self.running = False
self.__event_step_queue.clear()
self.__tk_pbar_value.set(0)
self.__tk_stats_str.set("Not running")
def stop(self):
self.__cancel_event.set()
def stop_if(self, condition: Callable):
if (condition()):
self.stop()
else:
self.__stop_condition = condition
def range(self, start_stop, stop=None, step=1):
_start = start_stop
_stop = stop
if _stop is None:
_stop = start_stop
_start = 0
return self.iter(range(_start, _stop, step))
def iter(self, iterable, length=None):
if length is None:
length = len(iterable)
with self.__lock:
if self.running:
raise RuntimeError('Progressbar is already running')
return AsyncProgress.Range(self, iterable, length, self.range_update_interval, self.__cancel_event)
def _start(self):
self.__cancel_event.clear()
with self.__lock:
self.running = True
def _finish(self, stat):
with self.__lock:
self.running = False
self.__event_step_queue.clear()
self.__event_step_queue.put((0, stat))
def __schedule_update(self):
self.master.after(self.update_interval, self.__update_self)
def __update_self(self):
with acquire_timeout(self.__lock, 0.1):
if self.running and self.__stop_condition is not None and self.__stop_condition():
self.stop()
while not self.__event_step_queue.empty():
(amount, stat) = self.__event_step_queue.get()
if stat is not None:
self.__tk_pbar_value.set(stat.current)
if self.running:
self.__tk_stats_str.set(stat)
else:
self.__tk_stats_str.set(
f'{stat.current}/{stat.total} | total: {stat.total_elapsed_time:.2f} seconds')
else:
self.__tk_pbar_value.set(self.__tk_pbar_value.get() + amount)
self.__schedule_update()
if __name__ == '__main__':
def worker(pbar, t):
print('Starting worker')
while True:
for i in pbar.range(1000):
print(i)
time.sleep(t)
time.sleep(1)
pbar.reset()
time.sleep(1)
print("Finished worker" + str(t))
root = tk.Tk()
root.minsize(width=800, height=600)
bar1 = AsyncProgress(root, label="Progressbar 1", update_interval=20, range_update_interval=10)
bar2 = AsyncProgress(root, label="Progressbar 2", update_interval=20, range_update_interval=10)
bar3 = AsyncProgress(root, label=None, update_interval=20, range_update_interval=10)
bar1.grid(row=0, column=0)
bar2.grid(row=1, column=0)
bar3.grid(row=2, column=0)
# worker(bar)
def start_worker():
threading.Thread(target=worker, args=(bar1, 0.10), daemon=True).start()
threading.Thread(target=worker, args=(bar2, 10.15), daemon=True).start()
threading.Thread(target=worker, args=(bar3, 0.05), daemon=True).start()
print("Worker threads started")
root.after(0, start_worker)
root.focus_set()
root.mainloop()