"""A Tkinter progress bar widget that worker code drives through a queue.

Progress events are pushed onto a queue by ``AsyncProgress.step`` and are
applied to the widget by a callback that re-schedules itself with
``after``.
"""
import queue
import threading
import time
import tkinter as tk
import tkinter.ttk as ttk
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional, Callable


class ClearableQueue(queue.Queue):
    """A ``queue.Queue`` that can be emptied in a single call."""

    def clear(self):
        """Discard every queued item while holding the queue's mutex."""
        with self.mutex:
            while self._qsize() != 0:
                self._get()


@contextmanager
def acquire_timeout(lock, timeout):
    """Try to acquire *lock* for at most *timeout* seconds.

    Yields the boolean result of ``lock.acquire(timeout=timeout)``. The
    body of the ``with`` statement runs whether or not the lock was
    obtained, so callers must check the yielded value. The lock is released
    on exit only if it was actually acquired.
    """
    acquired = lock.acquire(timeout=timeout)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()


def format_time(seconds):
    """Format a duration in seconds as ``'<h>h <m>min <s>s'``.

    The hour and minute parts are omitted when they are zero; minutes and
    seconds are right-aligned to a width of two characters.
    """
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    hours_part = f"{int(hours)}h " if hours > 0 else ""
    minutes_part = f"{int(minutes):2d}min " if minutes > 0 else ""
    return f"{hours_part}{minutes_part}{int(secs):2d}s"


class AsyncProgress(ttk.Frame):
    """Frame holding a label, a progress bar and a statistics label.

    Progress events are queued by ``step`` and applied to the Tk variables
    by a periodic ``after`` callback.
    """

    @dataclass(init=False, repr=False)
    class Stats:
        """Counters and timings describing one run of a progress range."""

        start_time: float
        total: int
        current: int = 0
        last_elapsed_time: float = 0
        total_elapsed_time: float = 0

        def __init__(self, total: int):
            """Record *total* and stamp the start time with ``time.time()``."""
            self.total = total
            self.start_time = time.time()

        def __repr__(self):
            """Return the one-line status text shown while running."""
            # Iterations per second, derived from the most recent iteration.
            ips = 1 / self.last_elapsed_time if self.last_elapsed_time != 0 else 0
            width = len(str(self.total))
            if ips < 1:
                rate = f"{self.last_elapsed_time:.2f}s/it"
            else:
                rate = f"{ips:.2f}it/s"
            elapsed = format_time(time.time() - self.start_time)
            remaining = format_time(
                (self.total - self.current) * self.last_elapsed_time)
            return (f"{self.current:{width}d}/{self.total} "
                    f"| {rate} "
                    f"| elapsed: {elapsed} "
                    f"| remaining: {remaining} ")

    class Range:
        """Single-use iterable wrapper that reports progress to its parent."""

        def __init__(self, parent, iterable, iter_len, update_interval, stop_event):
            """Wrap *iterable* and set the parent bar's maximum to *iter_len*.

            Args:
                parent: object providing ``pbar``, ``_start``, ``step`` and
                    ``_finish`` (an ``AsyncProgress``).
                iterable: the items to yield.
                iter_len: number of items expected from *iterable*.
                update_interval: minimum time in milliseconds between two
                    progress events sent to the parent.
                stop_event: event that ends the iteration early once set.
            """
            self.iterable = iterable
            self.iter_len = iter_len
            self.parent = parent
            self.update_interval = update_interval
            self.stop_event = stop_event
            self.parent.pbar['maximum'] = iter_len

        def __iter__(self):
            """Yield the wrapped items, sending throttled progress events."""
            last_time = time.time()
            last_update_time = 0
            # Use the explicit length rather than len(self.iterable) so that
            # unsized iterables (e.g. generators) passed to
            # AsyncProgress.iter(..., length=n) work.
            stats = AsyncProgress.Stats(self.iter_len)
            self.parent._start()
            try:
                for obj in self.iterable:
                    if self.stop_event.is_set():
                        return
                    yield obj
                    stats.current += 1
                    now = time.time()
                    stats.last_elapsed_time = now - last_time
                    last_time = now
                    # update_interval is in milliseconds.
                    if now - last_update_time > self.update_interval / 1000:
                        self.parent.step(1, stats)
                        last_update_time = now
            finally:
                stats.total_elapsed_time = time.time() - stats.start_time
                self.parent._finish(stats)
                self.close()

        def close(self):
            """Mark the range as exhausted so re-iterating yields nothing."""
            self.iterable = range(0)
            self.iter_len = 0

    def __init__(self, parent, *, width=800, height=30, update_interval=20,
                 range_update_interval=10, label=None):
        """Build the widgets and start the periodic update callback.

        Args:
            parent: the Tk parent widget.
            width: frame width passed to ``ttk.Frame``.
            height: frame height passed to ``ttk.Frame``.
            update_interval: delay in milliseconds between two runs of the
                queue-draining ``after`` callback.
            range_update_interval: update interval in milliseconds handed
                to every ``Range`` created by ``iter``/``range``.
            label: optional text shown left of the bar.
        """
        super().__init__(parent, width=width, height=height)
        self.grid_propagate(False)

        self.__event_step_queue = ClearableQueue()
        self.__lock = threading.Lock()
        self.__cancel_event = threading.Event()
        self.__stop_condition = None

        self.__tk_pbar_value = tk.IntVar()
        self.__tk_stats_str = tk.StringVar()
        self.__tk_stats_str.set("Not running")

        self.running = False
        self.update_interval = update_interval
        self.range_update_interval = range_update_interval

        self.label = ttk.Label(self, text=label if label is not None else "")
        self.pbar = ttk.Progressbar(self, variable=self.__tk_pbar_value)
        self.stats = ttk.Label(self, textvariable=self.__tk_stats_str)

        self.label.configure(font='TkFixedFont')
        self.stats.configure(font='TkFixedFont')

        self.label.grid(row=0, column=0, sticky=tk.NW)
        self.pbar.grid(row=0, column=1, sticky=tk.NW)
        self.stats.grid(row=0, column=2, sticky=tk.NW, padx=5)

        self.__schedule_update()

    def step(self, amount: int = 1, stat: Optional[Stats] = None):
        """Queue a progress event of *amount* with optional statistics."""
        self.__event_step_queue.put((amount, stat))

    def reset(self):
        """Clear pending events and restore the idle display."""
        # NOTE(review): this writes Tk variables from whichever thread calls
        # it (the demo calls it from worker threads) - confirm that is safe
        # for the Tk build in use.
        with self.__lock:
            self.running = False
            self.__event_step_queue.clear()
            self.__tk_pbar_value.set(0)
            self.__tk_stats_str.set("Not running")

    def stop(self):
        """Set the cancel event so a running range ends its iteration."""
        self.__cancel_event.set()

    def stop_if(self, condition: Callable):
        """Stop now if *condition()* is true, else keep it for later checks.

        A stored condition is evaluated by the periodic update callback
        while the bar is running.
        """
        if condition():
            self.stop()
        else:
            self.__stop_condition = condition

    def range(self, start_stop, stop=None, step=1):
        """Return a progress range over ``range(...)``.

        With a single argument it is treated as the stop value and the
        range starts at 0.
        """
        if stop is None:
            first, last = 0, start_stop
        else:
            first, last = start_stop, stop
        return self.iter(range(first, last, step))

    def iter(self, iterable, length=None):
        """Return a progress range over *iterable*.

        Args:
            iterable: the items to iterate.
            length: number of items; defaults to ``len(iterable)`` and must
                be given for iterables that do not support ``len``.

        Raises:
            RuntimeError: if the bar is already running.
        """
        if length is None:
            length = len(iterable)
        with self.__lock:
            if self.running:
                raise RuntimeError('Progressbar is already running')
            return AsyncProgress.Range(self, iterable, length,
                                       self.range_update_interval,
                                       self.__cancel_event)

    def _start(self):
        """Clear the cancel event and mark the bar as running."""
        self.__cancel_event.clear()
        with self.__lock:
            self.running = True

    def _finish(self, stat):
        """Mark the bar as stopped and queue *stat* as the only event."""
        with self.__lock:
            self.running = False
            self.__event_step_queue.clear()
            self.__event_step_queue.put((0, stat))

    def __schedule_update(self):
        """Schedule the next run of the update callback."""
        self.master.after(self.update_interval, self.__update_self)

    def __update_self(self):
        """Apply queued events to the widgets, then reschedule."""
        with acquire_timeout(self.__lock, 0.1) as acquired:
            # Only touch shared state while holding the lock; if it could
            # not be obtained in time, skip this tick and retry on the next.
            if acquired:
                self.__drain_events()
        self.__schedule_update()

    def __drain_events(self):
        """Evaluate the stop condition and apply all pending events."""
        if (self.running and self.__stop_condition is not None
                and self.__stop_condition()):
            self.stop()
        while True:
            try:
                # Non-blocking: never stall the Tk callback on an empty queue.
                amount, stat = self.__event_step_queue.get_nowait()
            except queue.Empty:
                break
            if stat is None:
                self.__tk_pbar_value.set(self.__tk_pbar_value.get() + amount)
                continue
            self.__tk_pbar_value.set(stat.current)
            if self.running:
                self.__tk_stats_str.set(stat)
            else:
                self.__tk_stats_str.set(
                    f'{stat.current}/{stat.total} | total: {stat.total_elapsed_time:.2f} seconds')


if __name__ == '__main__':
    def worker(pbar, t):
        """Demo worker: repeatedly run a 1000-step range on *pbar*."""
        print('Starting worker')
        while True:
            for i in pbar.range(1000):
                print(i)
                time.sleep(t)
            time.sleep(1)
            pbar.reset()
            time.sleep(1)
        # Not reached: the loop above has no exit.
        print("Finished worker" + str(t))

    root = tk.Tk()
    root.minsize(width=800, height=600)

    bar1 = AsyncProgress(root, label="Progressbar 1", update_interval=20, range_update_interval=10)
    bar2 = AsyncProgress(root, label="Progressbar 2", update_interval=20, range_update_interval=10)
    bar3 = AsyncProgress(root, label=None, update_interval=20, range_update_interval=10)
    bar1.grid(row=0, column=0)
    bar2.grid(row=1, column=0)
    bar3.grid(row=2, column=0)

    def start_worker():
        """Start one daemon worker thread per progress bar."""
        threading.Thread(target=worker, args=(bar1, 0.10), daemon=True).start()
        threading.Thread(target=worker, args=(bar2, 10.15), daemon=True).start()
        threading.Thread(target=worker, args=(bar3, 0.05), daemon=True).start()
        print("Worker threads started")

    root.after(0, start_worker)
    root.focus_set()
    root.mainloop()