AI-Project/AsyncProgress.py

197 lines
6.2 KiB
Python

import queue
import threading
import time
import tkinter as tk
import tkinter.ttk as ttk
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional
class ClearableQueue(queue.Queue):
    """A ``queue.Queue`` whose pending items can be discarded in one call."""

    def clear(self):
        """Drop every item currently waiting in the queue.

        The queue's own bookkeeping is kept consistent: each discarded item
        is also removed from the unfinished-task count (so ``join()`` does
        not wait for work that will never be processed), and producers
        blocked in ``put()`` on a bounded queue are woken up because space
        is available again.
        """
        with self.mutex:
            removed = 0
            # Go through _qsize()/_get() rather than the underlying deque so
            # that subclasses overriding the storage hooks keep working.
            while self._qsize():
                self._get()
                removed += 1
            if not removed:
                return
            # Items handed out by get() but not yet task_done() stay counted.
            self.unfinished_tasks = max(0, self.unfinished_tasks - removed)
            if self.unfinished_tasks == 0:
                self.all_tasks_done.notify_all()
            self.not_full.notify_all()
@contextmanager
def acquire_timeout(lock, timeout):
    """Context manager that tries to take *lock* for at most *timeout* seconds.

    The value bound by ``with ... as`` is whatever ``lock.acquire`` returned,
    i.e. whether the lock is actually held inside the block. The body runs
    either way; the lock is released on exit only if it was obtained.
    """
    got_it = lock.acquire(timeout=timeout)
    if not got_it:
        # Nothing to release afterwards, so no cleanup handler is needed.
        yield got_it
        return
    try:
        yield got_it
    finally:
        lock.release()
class AsyncProgress(ttk.Frame):
    """Progress bar frame that is fed through a queue.

    Callers report progress with :meth:`step` or by iterating over
    :meth:`range`. Those calls only enqueue events; a callback scheduled
    with ``after()`` on the master widget drains the queue every
    ``update_interval`` milliseconds and applies the events to the Tk
    variables backing the bar and the statistics label.

    NOTE(review): :meth:`reset` and ``Range.__init__`` still touch Tk
    objects directly from the calling thread - confirm this is acceptable
    for the Tk build in use.
    """

    @dataclass(repr=False)
    class Stats:
        """Counters and timings describing one run of a progress range."""

        total: int
        current: int = 0
        # Must be a factory: a plain ``time.time()`` default would be
        # evaluated once at class-definition time and shared by every run.
        start_time: float = field(default_factory=time.time)
        last_elapsed_time: float = 0
        total_elapsed_time: float = 0

        def __repr__(self):
            """Return the one-line status text shown while a run is active."""
            ips = 1 / self.last_elapsed_time if self.last_elapsed_time != 0 else 0
            # Pad the counter to the width of the total so the text does not
            # jitter as the number of digits grows.
            width = len(str(self.total))
            return (f"{self.current:{width}d}/{self.total} "
                    f"| {ips:.2f}it/s "
                    f"| elapsed: {time.time() - self.start_time:.2f} "
                    f"| remaining: {(self.total - self.current) * self.last_elapsed_time:.2f} ")

    class Range:
        """Iterable wrapper that reports progress to its parent while iterated."""

        def __init__(self, parent, iterable, update_interval):
            """Wrap *iterable* and size the parent's bar to its length.

            Args:
                parent: Object providing ``pbar``, ``_start``, ``step`` and
                    ``_finish`` (an ``AsyncProgress``).
                iterable: Sized iterable to walk through.
                update_interval: Minimum time in milliseconds between two
                    progress events sent to the parent.
            """
            self.iterable = iterable
            self.parent = parent
            self.update_interval = update_interval
            self.parent.pbar['maximum'] = len(iterable)

        def __iter__(self):
            """Yield the wrapped items, emitting throttled progress events.

            The parent is always told that the run has finished, including
            when the consumer stops early or an exception ends the
            iteration, so that a new range can be started afterwards.
            """
            stats = AsyncProgress.Stats(len(self.iterable))
            min_gap = self.update_interval / 1000
            last_time = time.time()
            last_update_time = 0
            self.parent._start()
            try:
                for obj in self.iterable:
                    yield obj
                    now = time.time()
                    stats.current += 1
                    stats.last_elapsed_time = now - last_time
                    last_time = now
                    if now - last_update_time > min_gap:
                        self.parent.step(1, stats)
                        last_update_time = now
            finally:
                try:
                    stats.total_elapsed_time = time.time() - stats.start_time
                    self.parent._finish(stats)
                finally:
                    self.close()

        def close(self):
            """Replace the wrapped iterable with an empty range."""
            self.iterable = range(0)

    def __init__(self, parent, *,
                 width=450,
                 height=30,
                 update_interval=20,
                 range_update_interval=10,
                 label=None):
        """Build the label / bar / statistics row and start the update loop.

        Args:
            parent: Tk master widget.
            width: Frame width passed to ``ttk.Frame``.
            height: Frame height passed to ``ttk.Frame``.
            update_interval: Delay in milliseconds between two queue drains.
            range_update_interval: Minimum delay in milliseconds between
                progress events emitted by objects returned from
                :meth:`range`.
            label: Text shown in front of the bar; ``None`` shows no text.
        """
        super().__init__(parent, width=width, height=height)
        # Keep the requested frame size instead of shrinking to the children.
        self.grid_propagate(False)
        self.__event_step_queue = ClearableQueue()
        self.__lock = threading.Lock()
        self.__tk_pbar_value = tk.IntVar()
        self.__tk_stats_str = tk.StringVar()
        self.__tk_stats_str.set("Not running")
        self.running = False
        self.update_interval = update_interval
        self.range_update_interval = range_update_interval
        self.label = ttk.Label(self, text=label if label is not None else "")
        self.pbar = ttk.Progressbar(self, variable=self.__tk_pbar_value)
        self.stats = ttk.Label(self, textvariable=self.__tk_stats_str)
        self.label.configure(font='TkFixedFont')
        self.stats.configure(font='TkFixedFont')
        self.label.grid(row=0, column=0, sticky=tk.NW)
        self.pbar.grid(row=0, column=1, sticky=tk.NW)
        self.stats.grid(row=0, column=2, sticky=tk.NW, padx=5)
        self.__schedule_update()

    def step(self, amount: int = 1, stat: Optional[Stats] = None):
        """Queue a progress event.

        Args:
            amount: Increment applied to the bar when *stat* is ``None``.
            stat: Optional statistics; when given, the bar is set to
                ``stat.current`` and the statistics label is refreshed.
        """
        self.__event_step_queue.put((amount, stat))

    def reset(self):
        """Discard pending events and return the display to its idle state."""
        with self.__lock:
            self.running = False
            self.__event_step_queue.clear()
            self.__tk_pbar_value.set(0)
            self.__tk_stats_str.set("Not running")

    def range(self, start_stop, stop=None, step=1):
        """Return a progress-reporting iterable over a ``range``.

        Accepts the same ``(stop)`` / ``(start, stop[, step])`` forms as the
        built-in ``range``.

        Raises:
            RuntimeError: If a previously returned range is still running.
        """
        with self.__lock:
            if self.running:
                raise RuntimeError('Progressbar is already running')
        if stop is None:
            stop = start_stop
            start_stop = 0
        return AsyncProgress.Range(self, range(start_stop, stop, step), self.range_update_interval)

    def _start(self):
        """Mark the bar as running."""
        with self.__lock:
            self.running = True

    def _finish(self, stat):
        """Mark the bar as idle and queue *stat* as the only pending event."""
        with self.__lock:
            self.running = False
            # Stale intermediate events must not overwrite the final state.
            self.__event_step_queue.clear()
            self.__event_step_queue.put((0, stat))

    def __schedule_update(self):
        """Arrange for the next queue drain after ``update_interval`` ms."""
        self.master.after(self.update_interval, self.__update_self)

    def __update_self(self):
        """Apply queued events to the Tk variables, then reschedule."""
        with acquire_timeout(self.__lock, 0.1) as acquired:
            # If the lock could not be obtained in time, skip this round
            # rather than touching shared state unprotected; the events stay
            # queued for the next tick.
            if acquired:
                self.__drain_events()
        self.__schedule_update()

    def __drain_events(self):
        """Consume every queued event without ever blocking the caller."""
        while True:
            try:
                amount, stat = self.__event_step_queue.get_nowait()
            except queue.Empty:
                return
            if stat is None:
                self.__tk_pbar_value.set(self.__tk_pbar_value.get() + amount)
                continue
            self.__tk_pbar_value.set(stat.current)
            if self.running:
                self.__tk_stats_str.set(stat)
            else:
                self.__tk_stats_str.set(
                    f'{stat.current}/{stat.total} | total: {stat.total_elapsed_time:.2f} seconds')
if __name__ == '__main__':
    # Manual demo: three bars, each driven by its own daemon thread.

    def worker(pbar, t):
        """Repeatedly run a ten-step range on *pbar*, sleeping *t* seconds per step."""
        print('Starting worker')
        while True:
            for index in pbar.range(10):
                print(index)
                time.sleep(t)
            # Leave the finished state visible, reset, then pause before the
            # next cycle.
            time.sleep(1)
            pbar.reset()
            time.sleep(1)
        # Not reached while the loop above is unconditional.
        print("Finished worker" + str(t))

    root = tk.Tk()

    bar_labels = ("Progressbar 1", "Progressbar 2", None)
    bars = [
        AsyncProgress(root, label=text, update_interval=20, range_update_interval=10)
        for text in bar_labels
    ]
    for row, bar in enumerate(bars):
        bar.grid(row=row, column=0)

    step_delays = (0.10, 10.15, 0.05)

    def start_worker():
        """Launch one daemon worker thread per bar."""
        for bar, delay in zip(bars, step_delays):
            threading.Thread(target=worker, args=(bar, delay), daemon=True).start()
        print("Worker threads started")

    # Start the workers from inside the event loop.
    root.after(0, start_worker)
    root.focus_set()
    root.mainloop()