SAE Teensy ECU
IIT SAE Microcontroller programming
Loading...
Searching...
No Matches
progress_bar.py
1"""Progress bar"""
2
3import os
4import threading
5import sys
6import io
7from typing import Optional, IO
8
9
10class ProgressBar:
11 """Progress bar that can be updated concurrently"""
12
13 bar_len = 10
14 maxcount = 0
15 lock = threading.Lock()
16 counter = 0
17 Lines = set()
18 run = True
19 prefix = ""
20 formatStr = "{} │{}│ {}{}\r"
21
22 class TextIO(io.TextIOWrapper):
23 """Text wrapper to capture stdout"""
24
25 def __init__(
26 self,
27 func,
28 buffer: IO[bytes],
29 encoding: str = ...,
30 errors: Optional[str] = ...,
31 newline: Optional[str] = ...,
32 line_buffering: bool = ...,
33 write_through: bool = ...,
34 ):
35 super(ProgressBar.TextIO, self).__init__(buffer, encoding, errors, newline, line_buffering, write_through)
36 self.func = func
37
38 def write(self, s) -> None:
39 self.func(s.strip("\n "))
40
41 def get_original(self) -> io.TextIOWrapper:
42 """Get the original Text Wrapper
43
44 Returns:
45 io.TextIOWrapper: Original TextIOWrapper
46 """
47 return super()
48
49 def __init__(self, maxcount: int, prefix: str):
50 """Create a new progress bar
51
52 Args:
53 maxcount (int): The target counter value
54 prefix (str): The string to prefix the bar with
55 """
56 self.maxcount = maxcount
57 self.stdout = sys.stdout
58 self.rename(prefix)
59 self.wrapper = ProgressBar.TextIO(
60 self._new_line,
61 sys.stdout.buffer,
62 sys.stdout.encoding,
63 sys.stdout.errors,
64 sys.stdout.newlines,
65 sys.stdout.line_buffering,
66 sys.stdout.write_through,
67 )
68 self.printer = threading.Thread(target=self._print_thread)
69
70 def rename(self, prefix: str) -> None:
71 """Change the prefix of the progress bar
72
73 Args:
74 prefix (str): The string to prefix the bar with
75 """
76 mx_sz = len(self.formatStr.format(prefix, " " * self.bar_len, 100.0, "%"))
77 self.bar_len = min(int(os.get_terminal_size().columns - 1 - (mx_sz / 1.25)), mx_sz)
78 self.bar_len = self.bar_len if self.bar_len > 0 else 0
79 self.prefix = prefix
80
81 def reset(self, maxcount: int = None, prefix: str = None) -> None:
82 """Reset the progress bar
83
84 Args:
85 maxcount (int): New target counter value. Defaults to current maxcount.
86 prefix (str): New string to prefix the bar with. Defaults to current prefix.
87 """
88 self.maxcount = maxcount or self.maxcount
89 if prefix:
90 self.rename(prefix)
91 with self.lock:
92 self.counter = 0
93
94 def _new_line(self, line: str) -> None:
95 """The TextIO line handler
96
97 Args:
98 line (str): line to consume
99 """
100 self.Lines.add(line)
101
102 def _progress(self, count, total, prefix="", printString: str = ""):
103 if total > 0:
104 count = min(count, total)
105 filled_len = int(round(self.bar_len * count / float(total)))
106
107 percents = round(100.0 * count / float(total), 1)
108 bar_prog = "█" * filled_len + "░" * (self.bar_len - filled_len)
109
110 pro_str = self.formatStr.format(prefix, bar_prog, percents, "%")
111 if len(printString) > 0:
112 self.stdout.write(" " * (os.get_terminal_size().columns - 1))
113 self.stdout.write("\033[F")
114 printString = printString.strip(" \n")
115 spacer = " " * (os.get_terminal_size().columns - 1 - len(printString))
116 self.stdout.write(f"{printString}{spacer}"[: os.get_terminal_size().columns - 1])
117 self.stdout.write("\n")
118 self.stdout.write(pro_str)
119 self.stdout.flush()
120
121 def _print_thread(self):
122 while self.run or len(self.Lines) > 0:
123 count = 0
124 with self.lock:
125 count = self.counter
126 if len(self.Lines) > 0:
127 self._progress(count, self.maxcount, self.prefix, self.Lines.pop())
128 else:
129 self._progress(count, self.maxcount, self.prefix)
130 self.wrapper.flush()
131
132 def start(self) -> None:
133 """Enable this progress bar"""
134 sys.stdout = self.wrapper
135 self.printer.start()
136
137 def progress(self):
138 """Increment the progress bar"""
139 with self.lock:
140 self.counter += 1
141
142 def finish(self):
143 """Disable the progress bar, setting it to 100%"""
144 self.run = False
145 self.printer.join()
146 self._progress(self.maxcount, self.maxcount, self.prefix)
147 self.wrapper.flush()
148 sys.stdout = self.wrapper.get_original()
149 print()