forked from ialbert/plac
-
Notifications
You must be signed in to change notification settings - Fork 0
/
plac_tk.py
64 lines (56 loc) · 1.91 KB
/
plac_tk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from __future__ import print_function
import os
import sys
if sys.version_info < (3,):
    import Queue as queue
    from Tkinter import Tk
    from ScrolledText import ScrolledText
else:
    import queue
    from tkinter import Tk
    from tkinter.scrolledtext import ScrolledText

import plac_core
from plac_ext import Monitor, TerminatedProcess
class TkMonitor(Monitor):
    """
    An interface over a dictionary {taskno: scrolledtext widget}, with
    methods add_listener, del_listener, notify_listener and start/stop.

    Commands are taken from ``self.queue`` as sequences of the form
    ``(method_name, arg, ...)`` and dispatched to the method of that name
    from inside the Tk main loop (see read_queue).
    """

    # Delay, in milliseconds, between two polls of the command queue.
    poll_ms = 100

    def __init__(self, name, queue=None):
        """
        Forward `name` and `queue` to Monitor.__init__ and start with no
        listeners. The Tk root window is created later, by start().
        """
        Monitor.__init__(self, name, queue)
        self.widgets = {}  # taskno -> ScrolledText showing that task's output

    @plac_core.annotations(taskno=('task number', 'positional', None, int))
    def add_listener(self, taskno):
        "There is a ScrolledText for each task"
        # Requires start() to have run: the widget is parented to self.root.
        st = ScrolledText(self.root, height=5)
        st.insert('end', 'Output of task %d\n' % taskno)
        st.pack()
        self.widgets[taskno] = st

    @plac_core.annotations(taskno=('task number', 'positional', None, int))
    def del_listener(self, taskno):
        """
        Forget the widget registered for `taskno`.

        Only the dictionary entry is removed; the widget itself is not
        destroyed, so whatever it already displays stays in the window.
        Raises KeyError if `taskno` has no registered widget.
        """
        del self.widgets[taskno]

    @plac_core.annotations(taskno=('task number', 'positional', None, int))
    def notify_listener(self, taskno, msg):
        """
        Append `msg` plus a newline to the widget of task `taskno` and
        redraw it. Raises KeyError if `taskno` has no registered widget.
        """
        w = self.widgets[taskno]
        w.insert('end', msg + '\n')
        w.update()

    def start(self):
        'Start the mainloop'
        self.root = Tk()
        self.root.title(self.name)
        # Closing the window leaves the main loop instead of destroying root.
        self.root.wm_protocol("WM_DELETE_WINDOW", self.stop)
        # Kick off the polling chain as soon as the main loop is running.
        self.root.after(0, self.read_queue)
        try:
            self.root.mainloop()
        except KeyboardInterrupt:
            print('Process %d killed by CTRL-C' % os.getpid(), file=sys.stderr)
        except TerminatedProcess:
            pass

    def stop(self):
        """
        Leave the Tk main loop. Does nothing if start() has not created
        the root window yet.
        """
        root = getattr(self, 'root', None)
        if root is not None:
            root.quit()

    def read_queue(self):
        """
        Poll the command queue once, without blocking, and dispatch the
        command found there (if any) to the method it names.

        The next poll is always scheduled `poll_ms` milliseconds later,
        even when the dispatched method raises: otherwise a single failing
        command would silently end the polling chain and the monitor would
        stop reacting to the queue.
        """
        try:
            try:
                cmd_args = self.queue.get_nowait()
            except queue.Empty:
                return  # nothing to do in this tick; finally re-arms the timer
            # cmd_args[0] is the method name, the rest are its arguments.
            getattr(self, cmd_args[0])(*cmd_args[1:])
        finally:
            self.root.after(self.poll_ms, self.read_queue)