diff --git a/bin/logdaemon.py b/bin/logdaemon.py index a277ad05..e25e6afd 100644 --- a/bin/logdaemon.py +++ b/bin/logdaemon.py @@ -36,9 +36,7 @@ @author: Jens Timmerman (Ghent University) """ -from optparse import OptionParser -from vsc.utils import fancylogger -from vsc.utils.daemon import Daemon + import logging import os import pickle @@ -46,6 +44,10 @@ import sys import traceback +from optparse import OptionParser +from vsc.utils import fancylogger +from vsc.utils.daemon import Daemon + class LogDaemon(Daemon): """ This is the logging daemon, it get a logger and can log to a local file. @@ -78,21 +80,21 @@ def start(self): """ # Check for a pidfile to see if the daemon already runs try: - with open(self.pidfile, 'r') as pidf: + with open(self.pidfile, encoding='utf8') as pidf: pid = int(pidf.read().strip()) - except IOError: + except OSError: pid = None if pid: - message = "pidfile %s already exist. Daemon already running?\n" - sys.stderr.write(message % self.pidfile) + message = f"pidfile {self.pidfile} already exist. Daemon already running?\n" + sys.stderr.write(message) sys.exit(1) # get socket self.socket_.bind((self.hostname, self.port)) - print("FANCYLOG_SERVER_PID=%s" % self.pidfile) - print("FANCYLOG_SERVER=%s:%d" % (socket.gethostname(), self.socket_.getsockname()[-1])) - print("FANCYLOG_SERVER_LOGFILE=%s" % self.logfile) + print(f"FANCYLOG_SERVER_PID={self.pidfile}") + print(f"FANCYLOG_SERVER={socket.gethostname()}:{int(self.socket_.getsockname()[-1])}") + print(f"FANCYLOG_SERVER_LOGFILE={self.logfile}") sys.stdout.flush() # Start the daemon @@ -122,7 +124,7 @@ def main(args): Initiate the daemon """ # parse options - parser = OptionParser(usage="usage: %s start|stop|restart [options]" % args[0]) + parser = OptionParser(usage=f"usage: {args[0]} start|stop|restart [options]") # general options parser.add_option("-i", "--ip", dest="host", help="Ip to bind to [default: %default (all)]", default="", type="string") @@ -153,7 +155,7 @@ def main(args): if len(args) == 2: if 'stop' == args[1]: # save state before stopping? - sys.stderr.write("stopping daemon with pidfile: %s\n" % pidfile) + sys.stderr.write(f"stopping daemon with pidfile: {pidfile}\n") daemon.stop() elif 'restart' == args[1]: daemon.restart() @@ -161,7 +163,7 @@ def main(args): daemon.start() else: sys.stderr.write("Unknown command\n") - sys.stderr.write("usage: %s start|stop|restart [options]\n" % args[0]) + sys.stderr.write(f"usage: {args[0]} start|stop|restart [options]\n") sys.exit(2) sys.exit(0) else: diff --git a/lib/vsc/utils/daemon.py b/lib/vsc/utils/daemon.py index 3e1a05a7..66151b46 100644 --- a/lib/vsc/utils/daemon.py +++ b/lib/vsc/utils/daemon.py @@ -42,7 +42,7 @@ def daemonize(self): # exit first parent sys.exit(0) except OSError as err: - sys.stderr.write("fork #1 failed: %d (%s)\n" % (err.errno, err.strerror)) + sys.stderr.write(f"fork #1 failed: {int(err.errno)} ({err.strerror})\n") sys.exit(1) # decouple from parent environment @@ -57,13 +57,13 @@ def daemonize(self): # exit from second parent sys.exit(0) except OSError as err: - sys.stderr.write("fork #2 failed: %d (%s)\n" % (err.errno, err.strerror)) + sys.stderr.write(f"fork #2 failed: {int(err.errno)} ({err.strerror})\n") sys.exit(1) # redirect standard file descriptors sys.stdout.flush() sys.stderr.flush() - with open(self.stdin, 'r') as sti: + with open(self.stdin) as sti: os.dup2(sti.fileno(), sys.stdin.fileno()) with open(self.stdout, 'a+') as sto: os.dup2(sto.fileno(), sys.stdout.fileno()) @@ -74,7 +74,7 @@ def daemonize(self): atexit.register(self.delpid) pid = str(os.getpid()) with open(self.pidfile, 'w+') as pidf: - pidf.write("%s\n" % pid) + pidf.write(f"{pid}\n") def delpid(self): os.remove(self.pidfile) @@ -85,9 +85,9 @@ def start(self): """ # Check for a pidfile to see if the daemon already runs try: - with open(self.pidfile, 'r') as pidf: + with open(self.pidfile) as pidf: pid = int(pidf.read().strip()) - except IOError: + except OSError: pid = None if pid: @@ -105,9 +105,9 @@ def stop(self): """ # Get the pid from the pidfile try: - with open(self.pidfile, 'r') as pidf: + with open(self.pidfile) as pidf: pid = int(pidf.read().strip()) - except IOError: + except OSError: pid = None if not pid: diff --git a/lib/vsc/utils/dateandtime.py b/lib/vsc/utils/dateandtime.py index f47ae6bc..913789ff 100644 --- a/lib/vsc/utils/dateandtime.py +++ b/lib/vsc/utils/dateandtime.py @@ -73,7 +73,7 @@ def __init__(self, tmpdate=None, year=None, month=None, day=None): def set_details(self): """Get first/last day of the month of date""" - class MyCalendar(object): + class MyCalendar: """Backport minimal calendar.Calendar code from 2.7 to support itermonthdays in 2.4""" def __init__(self, firstweekday=0): self.firstweekday = firstweekday # 0 = Monday, 6 = Sunday @@ -132,7 +132,7 @@ def number(self, otherdate): """ if self.include is False: msg = "number: include=False not implemented" - raise(Exception(msg)) + raise NotImplementedError(msg) else: startdate, enddate = self.get_start_end(otherdate) @@ -152,7 +152,7 @@ def interval(self, otherdate): """Return time ordered list of months between date and otherdate""" if self.include is False: msg = "interval: include=False not implemented" - raise(Exception(msg)) + raise NotImplementedError(msg) else: nr = self.number(otherdate) startdate, _ = self.get_start_end(otherdate) @@ -166,13 +166,13 @@ def parser(self, txt): """Based on strings, return date: eg BEGINTHIS returns first day of the current month""" supportedtime = (BEGIN, END,) supportedshift = ['THIS', 'LAST', 'NEXT'] - regtxt = r"^(%s)(%s)?" % ('|'.join(supportedtime), '|'.join(supportedshift)) + regtxt = rf"^({'|'.join(supportedtime)})({'|'.join(supportedshift)})?" reseervedregexp = re.compile(regtxt) reg = reseervedregexp.search(txt) if not reg: - msg = "parse: no match for regexp %s for txt %s" % (regtxt, txt) - raise(Exception(msg)) + msg = f"parse: no match for regexp {regtxt} for txt {txt}" + raise ValueError(msg) shifttxt = reg.group(2) if shifttxt is None or shifttxt == 'THIS': @@ -182,8 +182,8 @@ def parser(self, txt): elif shifttxt == 'NEXT': shift = 1 else: - msg = "parse: unknown shift %s (supported: %s)" % (shifttxt, supportedshift) - raise(Exception(msg)) + msg = f"parse: unknown shift {shifttxt} (supported: {supportedshift})" + raise ValueError(msg) nm = self.get_other(shift) @@ -193,8 +193,8 @@ def parser(self, txt): elif timetxt == END: res = nm.last else: - msg = "parse: unknown time %s (supported: %s)" % (timetxt, supportedtime) - raise(Exception(msg)) + msg = f"parse: unknown time {timetxt} (supported: {supportedtime})" + raise ValueError(msg) return res @@ -235,16 +235,17 @@ def date_parser(txt): thisday -= oneday res = thisday.date() else: - msg = 'dateparser: unimplemented reservedword %s' % txt - raise(Exception(msg)) + msg = f'dateparser: unimplemented reservedword {txt}' + raise NotImplementedError(msg) else: try: datetuple = [int(x) for x in txt.split("-")] res = date(*datetuple) except: - msg = ("dateparser: failed on '%s' date txt expects a YYYY-MM-DD format or " - "reserved words %s") % (txt, ','.join(reserveddate)) - raise(Exception(msg)) + msg = ( + f"dateparser: failed on '{txt}' date txt expects a YYYY-MM-DD format or " + f"reserved words {','.join(reserveddate)}") + raise ValueError(msg) return res diff --git a/lib/vsc/utils/docs.py b/lib/vsc/utils/docs.py index 4aa00633..92842c2b 100644 --- a/lib/vsc/utils/docs.py +++ b/lib/vsc/utils/docs.py @@ -29,9 +29,6 @@ @author: Caroline De Brouwer (Ghent University) """ -INDENT_4SPACES = ' ' * 4 - - class LengthNotEqualException(ValueError): pass @@ -46,30 +43,39 @@ def mk_rst_table(titles, columns): title_cnt, col_cnt = len(titles), len(columns) if title_cnt != col_cnt: - msg = "Number of titles/columns should be equal, found %d titles and %d columns" % (title_cnt, col_cnt) + msg = f"Number of titles/columns should be equal, found {int(title_cnt)} titles and {int(col_cnt)} columns" raise LengthNotEqualException(msg) + table = [] - tmpl = [] - line = [] + separator_blocks = [] + title_items = [] + column_widths = [] - # figure out column widths for i, title in enumerate(titles): + # figure out column widths width = max(map(len, columns[i] + [title])) - # make line template - tmpl.append('{%s:{c}<%s}' % (i, width)) + column_widths.append(width) + separator_blocks.append(f"{'='*width}") + title_items.append(f'{title}'.ljust(width)) - line = [''] * col_cnt - line_tmpl = INDENT_4SPACES.join(tmpl) - table_line = line_tmpl.format(*line, c='=') + separator_line = " ".join(separator_blocks) - table.append(table_line) - table.append(line_tmpl.format(*titles, c=' ')) - table.append(table_line) + # header + table.extend([ + separator_line, + " ".join(title_items), + separator_line + ]) + # rows for row in map(list, zip(*columns)): - table.append(line_tmpl.format(*row, c=' ')) + row_items = [] + for i, item in enumerate(row): + row_items.append((item.ljust(column_widths[i]))) + table.append(" ".join(row_items)) - table.extend([table_line, '']) + # footer + table.extend([separator_line, '']) return table diff --git a/lib/vsc/utils/exceptions.py b/lib/vsc/utils/exceptions.py index 50806ca0..41ba1da7 100644 --- a/lib/vsc/utils/exceptions.py +++ b/lib/vsc/utils/exceptions.py @@ -111,7 +111,7 @@ def __init__(self, msg, *args, **kwargs): # include location info at the end of the message # for example: "Nope, giving up (at easybuild/tools/somemodule.py:123 in some_function)" - msg = "%s (at %s:%s in %s)" % (msg, relpath, frameinfo[2], frameinfo[3]) + msg = f"{msg} (at {relpath}:{frameinfo[2]} in {frameinfo[3]})" logger = kwargs.get('logger', None) # try to use logger defined in caller's environment @@ -123,4 +123,4 @@ def __init__(self, msg, *args, **kwargs): getattr(logger, self.LOGGING_METHOD_NAME)(msg) - super(LoggedException, self).__init__(msg) + super().__init__(msg) diff --git a/lib/vsc/utils/frozendict.py b/lib/vsc/utils/frozendict.py index 82f9b239..b2f469f7 100644 --- a/lib/vsc/utils/frozendict.py +++ b/lib/vsc/utils/frozendict.py @@ -43,7 +43,7 @@ def __len__(self): return len(self.__dict) def __repr__(self): - return '' % repr(self.__dict) + return f'' def __hash__(self): if self.__hash is None: diff --git a/lib/vsc/utils/groups.py b/lib/vsc/utils/groups.py index 6c269819..aea51a5d 100644 --- a/lib/vsc/utils/groups.py +++ b/lib/vsc/utils/groups.py @@ -42,11 +42,11 @@ def getgrouplist(user, groupnames=True): """ libc = cdll.LoadLibrary(find_library('libc')) - getgrouplist = libc.getgrouplist + ngetgrouplist = libc.getgrouplist # max of 50 groups should be enough as first try ngroups = 50 - getgrouplist.argtypes = [c_char_p, c_uint, POINTER(c_uint * ngroups), POINTER(c_int32)] - getgrouplist.restype = c_int32 + ngetgrouplist.argtypes = [c_char_p, c_uint, POINTER(c_uint * ngroups), POINTER(c_int32)] + ngetgrouplist.restype = c_int32 grouplist = (c_uint * ngroups)() ngrouplist = c_int32(ngroups) @@ -59,15 +59,15 @@ def getgrouplist(user, groupnames=True): # .encode() is required in Python 3, since we need to pass a bytestring to getgrouplist user_name, user_gid = user.pw_name.encode(), user.pw_gid - ct = getgrouplist(user_name, user_gid, byref(grouplist), byref(ngrouplist)) + ct = ngetgrouplist(user_name, user_gid, byref(grouplist), byref(ngrouplist)) # if a max of 50 groups was not enough, try again with exact given nr if ct < 0: - getgrouplist.argtypes = [c_char_p, c_uint, POINTER(c_uint * int(ngrouplist.value)), POINTER(c_int32)] + ngetgrouplist.argtypes = [c_char_p, c_uint, POINTER(c_uint * int(ngrouplist.value)), POINTER(c_int32)] grouplist = (c_uint * int(ngrouplist.value))() - ct = getgrouplist(user_name, user_gid, byref(grouplist), byref(ngrouplist)) + ct = ngetgrouplist(user_name, user_gid, byref(grouplist), byref(ngrouplist)) if ct < 0: - raise Exception("Could not find groups for %s: getgrouplist returned %s" % (user, ct)) + raise ValueError(f"Could not find groups for {user}: getgrouplist returned {ct}") grouplist = [grouplist[i] for i in range(ct)] if groupnames: diff --git a/lib/vsc/utils/mail.py b/lib/vsc/utils/mail.py index 9e304b34..2ba8ad37 100644 --- a/lib/vsc/utils/mail.py +++ b/lib/vsc/utils/mail.py @@ -65,7 +65,7 @@ def __init__(self, mail_host=None, mail_to=None, mail_from=None, mail_subject=No self.err = err -class VscMail(object): +class VscMail: """Class providing functionality to send out mail.""" def __init__( @@ -84,7 +84,7 @@ def __init__( mail_options = ConfigParser() if mail_config: logging.info("Reading config file: %s", mail_config) - with open(mail_config, "r") as mc: + with open(mail_config) as mc: mail_options.read_file(mc) # we can have cases where the host part is actually host:port @@ -238,8 +238,8 @@ def _replace_images_cid(self, html, images): """ for im in images: - re_src = re.compile("src=\"%s\"" % im) - (html, count) = re_src.subn("src=\"cid:%s\"" % im, html) + re_src = re.compile(f"src=\"{im}\"") + (html, count) = re_src.subn(f"src=\"cid:{im}\"", html) if count == 0: logging.error("Could not find image %s in provided HTML.", im) raise VscMailError("Could not find image") @@ -337,9 +337,9 @@ def sendHTMLMail( if images is not None: for im in images: - with open(im, 'r') as image_fp: + with open(im) as image_fp: msg_image = MIMEImage(image_fp.read(), 'jpeg') # FIXME: for now, we assume jpegs - msg_image.add_header('Content-ID', "<%s>" % im) + msg_image.add_header('Content-ID', f"<{im}>") msg_alt.attach(msg_image) msg_root.attach(msg_alt) diff --git a/lib/vsc/utils/missing.py b/lib/vsc/utils/missing.py index 1f77990e..f873b365 100644 --- a/lib/vsc/utils/missing.py +++ b/lib/vsc/utils/missing.py @@ -101,7 +101,7 @@ def find_sublist_index(ls, sub_ls): return None -class Monoid(object): +class Monoid: """A monoid is a mathematical object with a default element (mempty or null) and a default operation to combine two elements of a given data type. @@ -148,25 +148,25 @@ def __init__(self, monoid, *args, **kwargs): @type monoid: Monoid instance """ - super(MonoidDict, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.monoid = monoid def __setitem__(self, key, value): """Combine the value the dict has for the key with the new value using the mappend operation.""" - if super(MonoidDict, self).__contains__(key): - current = super(MonoidDict, self).__getitem__(key) - super(MonoidDict, self).__setitem__(key, self.monoid(current, value)) + if super().__contains__(key): + current = super().__getitem__(key) + super().__setitem__(key, self.monoid(current, value)) else: - super(MonoidDict, self).__setitem__(key, value) + super().__setitem__(key, value) def __getitem__(self, key): """ Obtain the dictionary value for the given key. If no value is present, we return the monoid's mempty (null). """ - if not super(MonoidDict, self).__contains__(key): + if not super().__contains__(key): return self.monoid.null else: - return super(MonoidDict, self).__getitem__(key) + return super().__getitem__(key) class RUDict(dict): @@ -190,8 +190,8 @@ def update(self, E=None, **F): for (k, v) in E: self.r_update(k, {k: v}) - for k in F: - self.r_update(k, {k: F[k]}) + for k, v in F.items(): + self.r_update(k, {k: v}) def r_update(self, key, other_dict): """Recursive update.""" @@ -222,7 +222,7 @@ def __init__(self, *args, **kwargs): # handle unknown keys: either ignore them or raise an exception tmpdict = dict(*args, **kwargs) - unknown_keys = [key for key in tmpdict.keys() if key not in self.KNOWN_KEYS] + unknown_keys = [key for key in tmpdict if key not in self.KNOWN_KEYS] if unknown_keys: if ignore_unknown_keys: for key in unknown_keys: @@ -233,18 +233,18 @@ def __init__(self, *args, **kwargs): logging.error("Encountered unknown keys %s (known keys: %s)", unknown_keys, self.KNOWN_KEYS) raise KeyError("Encountered unknown keys") - super(FrozenDictKnownKeys, self).__init__(tmpdict) + super().__init__(tmpdict) # pylint: disable=arguments-differ def __getitem__(self, key, *args, **kwargs): """Redefine __getitem__ to provide a better KeyError message.""" try: - return super(FrozenDictKnownKeys, self).__getitem__(key, *args, **kwargs) + return super().__getitem__(key, *args, **kwargs) except KeyError as err: if key in self.KNOWN_KEYS: raise KeyError(err) else: - tup = (key, self.__class__.__name__, self.KNOWN_KEYS) - raise KeyError("Unknown key '%s' for %s instance (known keys: %s)" % tup) + raise KeyError( + f"Unknown key '{key}' for {self.__class__.__name__} instance (known keys: { self.KNOWN_KEYS})") def shell_quote(x): @@ -274,7 +274,7 @@ def get_class_for(modulepath, class_name): try: klass = getattr(module, class_name) except AttributeError as err: - raise ImportError("Failed to import %s from %s: %s" % (class_name, modulepath, err)) + raise ImportError(f"Failed to import {class_name} from {modulepath}: {err}") return klass @@ -295,7 +295,7 @@ def get_subclasses(klass, include_base_class=False): return get_subclasses_dict(klass, include_base_class=include_base_class).keys() -class TryOrFail(object): +class TryOrFail: """ Perform the function n times, catching each exception in the exception tuple except on the last try where it will be raised again. @@ -328,8 +328,7 @@ def post_order(graph, root): Walk the graph from the given root in a post-order manner by providing the corresponding generator """ for node in graph[root]: - for child in post_order(graph, node): - yield child + yield from post_order(graph, node) yield root diff --git a/lib/vsc/utils/optcomplete.py b/lib/vsc/utils/optcomplete.py index a6782a45..d9bb9a51 100644 --- a/lib/vsc/utils/optcomplete.py +++ b/lib/vsc/utils/optcomplete.py @@ -143,7 +143,7 @@ class CompleterMissingCallArgument(Exception): """Exception to raise when call arg is missing""" -class Completer(object): +class Completer: """Base class to derive all other completer classes from. It generates an empty completion list """ @@ -157,7 +157,7 @@ def __call__(self, **kwargs): for arg in self.CALL_ARGS: all_args.append(arg) if arg not in kwargs: - msg = "%s __call__ missing mandatory arg %s" % (self.__class__.__name__, arg) + msg = f"{self.__class__.__name__} __call__ missing mandatory arg {arg}" raise CompleterMissingCallArgument(msg) if self.CALL_ARGS_OPTIONAL is not None: @@ -217,7 +217,7 @@ def _call(self, **kwargs): if SHELL == BASH: res = ['_filedir'] if self.endings: - res.append("'@(%s)'" % '|'.join(self.endings)) + res.append(f"'@({'|'.join(self.endings)})'") return " ".join(res) else: res = [] @@ -513,7 +513,7 @@ def autocomplete(parser, arg_completer=None, opt_completer=None, subcmd_complete completer = NoneCompleter() # Warn user at least, it could help him figure out the problem. elif hasattr(option, 'completer'): - msg = "Error: optparse option with a completer does not take arguments: %s" % (option) + msg = f"Error: optparse option with a completer does not take arguments: {option}" raise SystemExit(msg) except KeyError: pass @@ -549,7 +549,7 @@ def autocomplete(parser, arg_completer=None, opt_completer=None, subcmd_complete if SHELL in (BASH,): # TODO: zsh print(completions) else: - raise Exception("Commands are unsupported by this shell %s" % SHELL) + raise Exception(f"Commands are unsupported by this shell {SHELL}") else: # Filter using prefix. if prefix: @@ -567,21 +567,21 @@ def autocomplete(parser, arg_completer=None, opt_completer=None, subcmd_complete if debugfn: txt = "\n".join([ '---------------------------------------------------------', - 'CWORDS %s' % cwords, - 'CLINE %s' % cline, - 'CPOINT %s' % cpoint, - 'CWORD %s' % cword, + f'CWORDS {cwords}', + f'CLINE {cline}', + f'CPOINT {cpoint}', + f'CWORD {cword}', '', 'Short options', pformat(parser._short_opt), '', 'Long options', pformat(parser._long_opt), - 'Prefix %s' % prefix, - 'Suffix %s' % suffix, - 'completer_kwargs%s' % str(completer_kwargs), + f'Prefix {prefix}', + f'Suffix {suffix}', + f'completer_kwargs{str(completer_kwargs)}', #'completer_completions %s' % completer_completions, - 'completions %s' % completions, + f'completions {completions}', ]) if isinstance(debugfn, logging.Logger): debugfn.debug(txt) @@ -594,7 +594,7 @@ def autocomplete(parser, arg_completer=None, opt_completer=None, subcmd_complete sys.exit(1) -class CmdComplete(object): +class CmdComplete: """Simple default base class implementation for a subcommand that supports command completion. This class is assuming that there might be a method @@ -624,16 +624,16 @@ def gen_cmdline(cmd_list, partial, shebang=True): cmdline = ' '.join([shell_quote(cmd) for cmd in cmd_list]) env = [] - env.append("%s=1" % OPTCOMPLETE_ENVIRONMENT) - env.append('COMP_LINE="%s"' % cmdline) - env.append('COMP_WORDS="(%s)"' % cmdline) - env.append("COMP_POINT=%s" % len(cmdline)) - env.append("COMP_CWORD=%s" % cmd_list.index(partial)) + env.append(f"{OPTCOMPLETE_ENVIRONMENT}=1") + env.append(f'COMP_LINE="{cmdline}"') + env.append(f'COMP_WORDS="({cmdline})"') + env.append(f"COMP_POINT={len(cmdline)}") + env.append(f"COMP_CWORD={cmd_list.index(partial)}") if not shebang: env.append(shell_quote(sys.executable)) # add script - env.append('"%s"' % cmd_list[0]) + env.append(f'"{cmd_list[0]}"') return " ".join(env) diff --git a/lib/vsc/utils/patterns.py b/lib/vsc/utils/patterns.py index 620ca715..d5ec576a 100644 --- a/lib/vsc/utils/patterns.py +++ b/lib/vsc/utils/patterns.py @@ -48,5 +48,5 @@ class Singleton(ABCMeta): def __call__(cls, *args, **kwargs): if cls not in cls._instances: - cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + cls._instances[cls] = super().__call__(*args, **kwargs) return cls._instances[cls] diff --git a/lib/vsc/utils/py2vs3/__init__.py b/lib/vsc/utils/py2vs3/__init__.py index 9370c19f..c87c7d66 100644 --- a/lib/vsc/utils/py2vs3/__init__.py +++ b/lib/vsc/utils/py2vs3/__init__.py @@ -33,7 +33,7 @@ import pkg_resources pkg_resources.declare_namespace(__name__) - +import logging import sys @@ -60,6 +60,8 @@ def is_py3(): # all functionality provided by the py2 and py3 modules is made available via the vsc.utils.py2vs3 namespace if is_py3(): + logging.warning( + "DEPRECATED: vsc.utils.py2vs3 is deprecated. Please remote it and use the py3 native modules, functions, ...") from vsc.utils.py2vs3.py3 import * # noqa else: raise ImportError("py2 module unsupported and removed, please stop using it.") diff --git a/lib/vsc/utils/py2vs3/py3.py b/lib/vsc/utils/py2vs3/py3.py index c0d0e140..2a3a506d 100644 --- a/lib/vsc/utils/py2vs3/py3.py +++ b/lib/vsc/utils/py2vs3/py3.py @@ -36,12 +36,7 @@ from tempfile import TemporaryDirectory # noqa from urllib.parse import urlencode, unquote # noqa from urllib.request import HTTPError, HTTPSHandler, Request, build_opener, urlopen # noqa -try: - # py 3.10+ - from collections import Mapping # noqa -except ImportError: - # < py 3.10 - from collections.abc import Mapping # noqa +from collections.abc import Mapping # noqa FileExistsErrorExc = FileExistsError # noqa FileNotFoundErrorExc = FileNotFoundError # noqa diff --git a/lib/vsc/utils/rest.py b/lib/vsc/utils/rest.py index 2698935f..78f853d6 100644 --- a/lib/vsc/utils/rest.py +++ b/lib/vsc/utils/rest.py @@ -46,7 +46,7 @@ CENSORED_MESSAGE = '' -class Client(object): +class Client: """An implementation of a REST client""" DELETE = 'DELETE' GET = 'GET' @@ -99,7 +99,7 @@ def __init__(self, url, username=None, password=None, token=None, token_type='To if password is not None: self.auth_header = self.hash_pass(password, username) elif token is not None: - self.auth_header = '%s %s' % (token_type, token) + self.auth_header = f'{token_type} {token}' def _append_slash_to(self, url): """Append slash to specified URL, if desired and needed.""" @@ -186,21 +186,19 @@ def request(self, method, url, body, headers, content_type=None): logging.debug('cli request: %s, %s, %s, %s', method, url, body_censored, headers_censored) - # TODO: in recent python: Context manager - conn = self.get_connection(method, url, body, headers) - status = conn.code - if method == self.HEAD: - pybody = conn.headers - else: - body = conn.read() - body = body.decode('utf-8') # byte encoded response - try: - pybody = json.loads(body) - except ValueError: - pybody = body - logging.debug('reponse len: %s ', len(pybody)) - conn.close() - return status, pybody + with self.get_connection(method, url, body, headers) as conn: + status = conn.code + if method == self.HEAD: + pybody = conn.headers + else: + body = conn.read() + body = body.decode('utf-8') # byte encoded response + try: + pybody = json.loads(body) + except ValueError: + pybody = body + logging.debug('reponse len: %s ', len(pybody)) + return status, pybody @staticmethod def censor_request(secrets, payload): @@ -230,7 +228,7 @@ def hash_pass(self, password, username=None): if not username: username = self.username - credentials = '%s:%s' % (username, password) + credentials = f'{username}:{password}' credentials = credentials.encode('utf-8') encoded_credentials = base64.b64encode(credentials).strip() encoded_credentials = str(encoded_credentials, 'utf-8') @@ -253,7 +251,7 @@ def get_connection(self, method, url, body, headers): return connection -class RequestBuilder(object): +class RequestBuilder: '''RequestBuilder(client).path.to.resource.method(...) stands for RequestBuilder(client).client.method('path/to/resource, ...) @@ -294,13 +292,13 @@ def __str__(self): '''If you ever stringify this, you've (probably) messed up somewhere. So let's give a semi-helpful message. ''' - return "I don't know about %s, You probably want to do a get or other http request, use .get()" % self.url + return f"I don't know about {self.url}, You probably want to do a get or other http request, use .get()" def __repr__(self): - return '%s: %s' % (self.__class__, self.url) + return f'{self.__class__}: {self.url}' -class RestClient(object): +class RestClient: """ A client with a request builder, so you can easily create rest requests e.g. to create a github Rest API client just do diff --git a/lib/vsc/utils/run.py b/lib/vsc/utils/run.py index 1ab4a31e..a674ff5e 100644 --- a/lib/vsc/utils/run.py +++ b/lib/vsc/utils/run.py @@ -96,7 +96,7 @@ def __init__(self, *args, **kwargs): :param cmd: actual command to run (first item in list) """ - super(CmdList, self).__init__() + super().__init__() self.add(args, **kwargs) def add(self, items, tmpl_vals=None, allow_spaces=True): @@ -114,12 +114,12 @@ def add(self, items, tmpl_vals=None, allow_spaces=True): item = item % tmpl_vals if not isinstance(item, str): - raise ValueError("Non-string item %s (type %s) being added to command %s" % (item, type(item), self)) + raise ValueError(f"Non-string item {item} (type {type(item)}) being added to command {self}") if not allow_spaces and ' ' in item: - raise ValueError("Found one or more spaces in item '%s' being added to command %s" % (item, self)) + raise ValueError(f"Found one or more spaces in item '{item}' being added to command {self}") - super(CmdList, self).append(item) + super().append(item) def append(self, *args, **kwargs): raise NotImplementedError("Use add rather than append") @@ -128,14 +128,14 @@ def extend(self, *args, **kwargs): raise NotImplementedError("Use add rather than extend") -class DummyFunction(object): +class DummyFunction: def __getattr__(self, name): def dummy(*args, **kwargs): # pylint: disable=unused-argument pass return dummy -class Run(object): +class Run: """Base class for static run method""" INIT_INPUT_CLOSE = True USE_SHELL = True @@ -189,7 +189,7 @@ def __init__(self, cmd=None, **kwargs): self._post_exitcode_log_failure = self.log.error - super(Run, self).__init__(**kwargs) + super().__init__(**kwargs) def _get_log_name(self): """Set the log name""" @@ -306,7 +306,7 @@ def _start_in_path(self): self.log.raiseException("_start_in_path: provided startpath %s exists but is no directory" % self.startpath) else: - self.log.raiseException("_start_in_path: startpath %s does not exist" % self.startpath) + self.log.raiseException(f"_start_in_path: startpath {self.startpath} does not exist") def _return_to_previous_start_in_path(self): """Change to original path before the change to startpath""" @@ -351,18 +351,17 @@ def _make_popen_named_args(self, others=None): def _make_shell_command(self): """Convert cmd into shell command""" - self.log.warning(("using potentialy unsafe shell commands, use run.run or run.RunNoShell.run " - "instead of run.run_simple or run.Run.run")) + self.log.warning("using potentialy unsafe shell commands, use run.run or run.RunNoShell.run " + "instead of run.run_simple or run.Run.run") if self.cmd is None: self.log.raiseException("_make_shell_command: no cmd set.") if isinstance(self.cmd, str): self._shellcmd = self.cmd elif isinstance(self.cmd, (list, tuple,)): - self._shellcmd = " ".join([str(arg).replace(' ', '\ ') for arg in self.cmd]) + self._shellcmd = " ".join([str(arg).replace(' ', r'\ ') for arg in self.cmd]) else: - self.log.raiseException("Failed to convert cmd %s (type %s) into shell command" % - (self.cmd, type(self.cmd))) + self.log.raiseException(f"Failed to convert cmd {self.cmd} (type {type(self.cmd)}) into shell command") def _init_process(self): """Initialise the self._process""" @@ -383,7 +382,7 @@ def _init_input(self): try: self._process.stdin.write(inp) except Exception: - self.log.raiseException("_init_input: Failed write input %s to process" % self.input) + self.log.raiseException(f"_init_input: Failed write input {self.input} to process") if self.INIT_INPUT_CLOSE: self._process.stdin.close() @@ -408,7 +407,7 @@ def _cleanup_process(self): try: self._process.stdout.close() except OSError as err: - self.log.raiseException("_cleanup_process: failed to close stdout of the process: %s" % err) + self.log.raiseException(f"_cleanup_process: failed to close stdout of the process: {err}") def _read_process(self, readsize=None): """Read from process, return out""" @@ -432,7 +431,6 @@ def _post_exitcode(self): def _post_output(self): """Postprocess the output in self._process_output""" - pass def _run_return(self): """What to return""" @@ -517,20 +515,18 @@ def _make_shell_command(self): elif isinstance(self.cmd, (list, tuple,)): self._shellcmd = self.cmd else: - self.log.raiseException("Failed to convert cmd %s (type %s) into non shell command" % - (self.cmd, type(self.cmd))) + self.log.raiseException(f"Failed to convert cmd {self.cmd} (type {type(self.cmd)}) into non shell command") class RunNoWorries(Run): """When the exitcode is >0, log.debug instead of log.error""" def __init__(self, cmd, **kwargs): - super(RunNoWorries, self).__init__(cmd, **kwargs) + super().__init__(cmd, **kwargs) self._post_exitcode_log_failure = self.log.debug class RunNoShellNoWorries(RunNoShell, RunNoWorries): """When the exitcode is >0, log.debug instead of log.error""" - pass class RunLoopException(Exception): @@ -539,7 +535,7 @@ def __init__(self, code, output): self.output = output def __str__(self): - return "%s code %s output %s" % (self.__class__.__name__, self.code, self.output) + return f"{self.__class__.__name__} code {self.code} output {self.output}" class RunLoop(Run): @@ -551,7 +547,7 @@ class RunLoop(Run): LOOP_TIMEOUT_MAIN = 1 def __init__(self, cmd, **kwargs): - super(RunLoop, self).__init__(cmd, **kwargs) + super().__init__(cmd, **kwargs) self._loop_count = None self._loop_continue = None # intial state, change this to break out the loop @@ -600,13 +596,11 @@ def _wait_for_process(self): def _loop_initialise(self): """Initialisation before the loop starts""" - pass def _loop_process_output(self, output): """Process the output that is read in blocks simplest form: do nothing """ - pass def _loop_process_output_final(self, output): """Process the remaining output that is read @@ -620,7 +614,6 @@ class RunNoShellLoop(RunNoShell, RunLoop): need to read from time to time. otherwise the stdout/stderr buffer gets filled and it all stops working """ - pass class RunLoopLog(RunLoop): @@ -629,14 +622,14 @@ class RunLoopLog(RunLoop): def _wait_for_process(self): # initialise the info logger self.log.info("Going to run cmd %s", self._shellcmd) - super(RunLoopLog, self)._wait_for_process() + super()._wait_for_process() def _loop_process_output(self, output): """Process the output that is read in blocks send it to the logger. The logger need to be stream-like """ self.log.streamLog(self.LOOP_LOG_LEVEL, output) - super(RunLoopLog, self)._loop_process_output(output) + super()._loop_process_output(output) class RunNoShellLoopLog(RunNoShell, RunLoopLog): @@ -651,7 +644,7 @@ def _loop_process_output(self, output): """ sys.stdout.write(output) sys.stdout.flush() - super(RunLoopStdout, self)._loop_process_output(output) + super()._loop_process_output(output) class RunNoShellLoopStdout(RunNoShell, RunLoopStdout): @@ -667,7 +660,7 @@ def _prep_module(self, modulepath=None, extendfromlist=None): modulepath = PROCESS_MODULE_ASYNCPROCESS_PATH if extendfromlist is None: extendfromlist = ['send_all', 'recv_some'] - super(RunAsync, self)._prep_module(modulepath=modulepath, extendfromlist=extendfromlist) + super()._prep_module(modulepath=modulepath, extendfromlist=extendfromlist) def _read_process(self, readsize=None): """Read from async process, return out""" @@ -686,7 +679,7 @@ def _read_process(self, readsize=None): # non-blocking read (readsize is a maximum to return ! out = self._process_module.recv_some(self._process, maxread=readsize) return ensure_ascii_string(out) - except (IOError, Exception): + except (OSError, Exception): # recv_some may throw Exception self.log.exception("_read_process: read failed") return '' @@ -694,7 +687,6 @@ def _read_process(self, readsize=None): class RunNoShellAsync(RunNoShell, RunAsync): """Async process class""" - pass class RunFile(Run): @@ -702,7 +694,7 @@ class RunFile(Run): def __init__(self, cmd, **kwargs): self.filename = kwargs.pop('filename', None) self.filehandle = None - super(RunFile, self).__init__(cmd, **kwargs) + super().__init__(cmd, **kwargs) def _make_popen_named_args(self, others=None): if others is None: @@ -727,20 +719,20 @@ def _make_popen_named_args(self, others=None): try: self.filehandle = open(self.filename, 'w') # pylint: disable=consider-using-with except OSError: - self.log.raiseException("_make_popen_named_args: failed to open filehandle for file %s" % self.filename) + self.log.raiseException(f"_make_popen_named_args: failed to open filehandle for file {self.filename}") others = { 'stdout': self.filehandle, } - super(RunFile, self)._make_popen_named_args(others=others) + super()._make_popen_named_args(others=others) def _cleanup_process(self): """Close the filehandle""" try: self.filehandle.close() except OSError: - self.log.raiseException("_cleanup_process: failed to close filehandle for filename %s" % self.filename) + self.log.raiseException(f"_cleanup_process: failed to close filehandle for filename {self.filename}") def _read_process(self, readsize=None): """Meaningless for filehandle""" @@ -749,7 +741,6 @@ def _read_process(self, readsize=None): class RunNoShellFile(RunNoShell, RunFile): """Popen to filehandle""" - pass class RunPty(Run): @@ -766,12 +757,11 @@ def _make_popen_named_args(self, others=None): 'stdout': slave, 'stderr': slave } - super(RunPty, self)._make_popen_named_args(others=others) + super()._make_popen_named_args(others=others) class RunNoShellPty(RunNoShell, RunPty): """Pty support (eg for screen sessions)""" - pass class RunTimeout(RunLoop, RunAsync): @@ -780,7 +770,7 @@ class RunTimeout(RunLoop, RunAsync): def __init__(self, cmd, **kwargs): self.timeout = float(kwargs.pop('timeout', None)) self.start = time.time() - super(RunTimeout, self).__init__(cmd, **kwargs) + super().__init__(cmd, **kwargs) def _loop_process_output(self, output): """""" @@ -791,12 +781,11 @@ def _loop_process_output(self, output): # go out of loop raise RunLoopException(RUNRUN_TIMEOUT_EXITCODE, RUNRUN_TIMEOUT_OUTPUT) - super(RunTimeout, self)._loop_process_output(output) + super()._loop_process_output(output) class RunNoShellTimeout(RunNoShell, RunTimeout): """Run for maximum timeout seconds""" - pass class RunQA(RunLoop, RunAsync): @@ -822,7 +811,7 @@ def __init__(self, cmd, **kwargs): self._loop_previous_ouput_length = None # track length of output through loop self.hit_position = 0 - super(RunQA, self).__init__(cmd, **kwargs) + super().__init__(cmd, **kwargs) self.qa, self.qa_reg, self.no_qa = self._parse_qa(qa, qa_reg, no_qa) @@ -839,10 +828,10 @@ def _parse_qa(self, qa, qa_reg, no_qa): """ def escape_special(string): - specials = '.*+?(){}[]|\$^' - return re.sub(r"([%s])" % ''.join(['\%s' % x for x in specials]), r"\\\1", string) + specials = r'.*+?(){}[]|\$^' + return re.sub(r"([%s])" % ''.join([r'\%s' % x for x in specials]), r"\\\1", string) - SPLIT = '[\s\n]+' + SPLIT = '[\\s\n]+' REG_SPLIT = re.compile(r"" + SPLIT) def process_answers(answers): @@ -957,7 +946,7 @@ def _loop_process_output(self, output): # go out of loop raise RunLoopException(RUNRUN_QA_MAX_MISS_EXITCODE, self._process_output) - super(RunQA, self)._loop_process_output(output) + super()._loop_process_output(output) class RunNoShellQA(RunNoShell, RunQA): diff --git a/lib/vsc/utils/wrapper.py b/lib/vsc/utils/wrapper.py index 79ad6f5b..28b4d216 100644 --- a/lib/vsc/utils/wrapper.py +++ b/lib/vsc/utils/wrapper.py @@ -17,7 +17,7 @@ def proxy(self, *args): # pylint:disable=unused-argument type.__init__(cls, name, bases, dct) if cls.__wraps__: - ignore = set("__%s__" % n for n in cls.__ignore__.split()) + ignore = {f"__{n}__" for n in cls.__ignore__.split()} for name in dir(cls.__wraps__): if name.startswith("__"): if name not in ignore and name not in dct: @@ -37,7 +37,7 @@ def __init__(self, obj): elif isinstance(obj, self.__wraps__): self._obj = obj else: - raise ValueError("wrapped object must be of %s" % self.__wraps__) + raise ValueError(f"wrapped object must be of {self.__wraps__}") # provide proxy access to regular attributes of wrapped object def __getattr__(self, name): diff --git a/setup.py b/setup.py index 1fee6b44..d4b0b0ba 100755 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# -*- coding: latin-1 -*- # # Copyright 2009-2023 Ghent University # @@ -37,7 +36,7 @@ from vsc.install.shared_setup import ag, kh, jt, sdw PACKAGE = { - 'version': '3.5.8', + 'version': '3.5.9', 'author': [sdw, jt, ag, kh], 'maintainer': [sdw, jt, ag, kh], 'install_requires': [ diff --git a/test/00-import.py b/test/00-import.py index 5effb602..3e928aab 100644 --- a/test/00-import.py +++ b/test/00-import.py @@ -32,4 +32,4 @@ class ImportTest(vsc.install.commontest.CommonTest): # skip import for vsc.utils.py2vs3 modules - EXCLUDE_MODS = ['^vsc\.utils\.py2vs3'] + EXCLUDE_MODS = [r'^vsc\.utils\.py2vs3'] diff --git a/test/asyncprocess.py b/test/asyncprocess.py index aa289ec0..e59ae72b 100644 --- a/test/asyncprocess.py +++ b/test/asyncprocess.py @@ -51,7 +51,7 @@ def setUp(self): """ setup a basic shell """ self.shell = Popen('sh', stdin=p.PIPE, stdout=p.PIPE, shell=True, executable='/bin/bash') self.cwd = os.getcwd() - super(AsyncProcessTest, self).setUp() + super().setUp() def runTest(self): """ try echoing some text and see if it comes back out """ diff --git a/test/dateandtime.py b/test/dateandtime.py index 59ff7be8..c92861b1 100644 --- a/test/dateandtime.py +++ b/test/dateandtime.py @@ -28,10 +28,8 @@ @author: Stijn De Weirdt (Ghent University) """ -from __future__ import print_function import datetime -import os from vsc.install.testing import TestCase from vsc.utils.dateandtime import FancyMonth, date_parser, datetime_parser diff --git a/test/docs.py b/test/docs.py index a8b205b7..ce7fca7a 100644 --- a/test/docs.py +++ b/test/docs.py @@ -56,7 +56,6 @@ def test_mk_rst_table(self): '=' * len(t), '', ] - self.assertEqual(table, check) def suite(): diff --git a/test/exceptions.py b/test/exceptions.py index 8fe4b367..60eefdb7 100644 --- a/test/exceptions.py +++ b/test/exceptions.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright 2015-2023 Ghent University # @@ -29,7 +28,6 @@ @author: Kenneth Hoste (Ghent University) """ -import logging import os import re import tempfile @@ -61,10 +59,10 @@ def test_loggedexception_defaultlogger(self): self.assertErrorRegex(LoggedException, 'BOOM', raise_loggedexception, 'BOOM') logToFile(tmplog, enable=False) - log_re = re.compile("^%s :: BOOM( \(at .*:[0-9]+ in raise_loggedexception\))?$" % getRootLoggerName(), re.M) - with open(tmplog, 'r') as f: + log_re = re.compile(f"^{getRootLoggerName()} :: BOOM( \\(at .*:[0-9]+ in raise_loggedexception\\))?$", re.M) + with open(tmplog) as f: logtxt = f.read() - self.assertTrue(log_re.match(logtxt), "%s matches %s" % (log_re.pattern, logtxt)) + self.assertTrue(log_re.match(logtxt), f"{log_re.pattern} matches {logtxt}") # test formatting of message self.assertErrorRegex(LoggedException, 'BOOMBAF', raise_loggedexception, 'BOOM%s', 'BAF') @@ -84,7 +82,6 @@ def test_loggedexception_specifiedlogger(self): setLogFormat("%(name)s :: %(message)s") logger1 = getLogger('testlogger_one') - logger2 = getLogger('testlogger_two') # if logger is specified, it should be used logToFile(tmplog, enable=True) @@ -92,10 +89,10 @@ def test_loggedexception_specifiedlogger(self): logToFile(tmplog, enable=False) rootlog = getRootLoggerName() - log_re = re.compile("^%s.testlogger_one :: BOOM( \(at .*:[0-9]+ in raise_loggedexception\))?$" % rootlog, re.M) - with open(tmplog, 'r') as f: + log_re = re.compile(rf"^{rootlog}.testlogger_one :: BOOM( \(at .*:[0-9]+ in raise_loggedexception\))?$", re.M) + with open(tmplog) as f: logtxt = f.read() - self.assertTrue(log_re.match(logtxt), "%s matches %s" % (log_re.pattern, logtxt)) + self.assertTrue(log_re.match(logtxt), f"{log_re.pattern} matches {logtxt}") os.remove(tmplog) @@ -107,7 +104,6 @@ def test_loggedexception_callerlogger(self): # set log format, for each regex searching setLogFormat("%(name)s :: %(message)s") - logger = getLogger('testlogger_local') # if no logger is specified, logger available in calling context should be used logToFile(tmplog, enable=True) @@ -115,10 +111,10 @@ def test_loggedexception_callerlogger(self): logToFile(tmplog, enable=False) rootlog = getRootLoggerName() - log_re = re.compile("^%s(.testlogger_local)? :: BOOM( \(at .*:[0-9]+ in raise_loggedexception\))?$" % rootlog) - with open(tmplog, 'r') as f: + log_re = re.compile(rf"^{rootlog}(.testlogger_local)? :: BOOM( \(at .*:[0-9]+ in raise_loggedexception\))?$") + with open(tmplog) as f: logtxt = f.read() - self.assertTrue(log_re.match(logtxt), "%s matches %s" % (log_re.pattern, logtxt)) + self.assertTrue(log_re.match(logtxt), f"{log_re.pattern} matches {logtxt}") os.remove(tmplog) @@ -145,10 +141,10 @@ def raise_testexception(msg, *args, **kwargs): rootlogname = getRootLoggerName() - log_re = re.compile("^%s :: BOOM$" % rootlogname, re.M) - with open(tmplog, 'r') as f: + log_re = re.compile(f"^{rootlogname} :: BOOM$", re.M) + with open(tmplog) as f: logtxt = f.read() - self.assertTrue(log_re.match(logtxt), "%s matches %s" % (log_re.pattern, logtxt)) + self.assertTrue(log_re.match(logtxt), f"{log_re.pattern} matches {logtxt}") with open(tmplog, 'w') as f: f.write('') @@ -160,9 +156,9 @@ def raise_testexception(msg, *args, **kwargs): logToFile(tmplog, enable=False) log_re = re.compile(r"^%s :: BOOM \(at (?:.*?/)?vsc/install/testing.py:[0-9]+ in assertErrorRegex\)$" % rootlogname) - with open(tmplog, 'r') as f: + with open(tmplog) as f: logtxt = f.read() - self.assertTrue(log_re.match(logtxt), "%s matches %s" % (log_re.pattern, logtxt)) + self.assertTrue(log_re.match(logtxt), f"{log_re.pattern} matches {logtxt}") with open(tmplog, 'w') as f: f.write('') @@ -174,9 +170,9 @@ def raise_testexception(msg, *args, **kwargs): logToFile(tmplog, enable=False) log_re = re.compile(r"^%s :: BOOM \(at (?:.*?/)?vsc/install/testing.py:[0-9]+ in assertErrorRegex\)$" % rootlogname) - with open(tmplog, 'r') as f: + with open(tmplog) as f: logtxt = f.read() - self.assertTrue(log_re.match(logtxt), "%s matches %s" % (log_re.pattern, logtxt)) + self.assertTrue(log_re.match(logtxt), f"{log_re.pattern} matches {logtxt}") os.remove(tmplog) @@ -192,7 +188,7 @@ def test_get_callers_logger(self): self.assertTrue(callers_logger in [logger, None]) # also works when logger is 'higher up' - class Test(object): + class Test: """Dummy test class""" def foo(self, logger=None): """Dummy test method, returns logger from calling context.""" diff --git a/test/fancylogger.py b/test/fancylogger.py index 0e7d2101..d7250989 100644 --- a/test/fancylogger.py +++ b/test/fancylogger.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright 2013-2023 Ghent University # @@ -86,7 +85,7 @@ def _get_tty_stream(): stream = open(tty, 'w') if os.isatty(stream.fileno()): return stream - except IOError: + except OSError: # cannot open $TTY for writing, continue pass # give up @@ -142,11 +141,11 @@ def mk_empty_log(self): def read_log(self): self.handler.flush() - with open(self.logfn, 'r') as fih: + with open(self.logfn) as fih: return fih.read() def setUp(self): - super(FancyLoggerTest, self).setUp() + super().setUp() self._reset_fancylogger() @@ -234,9 +233,9 @@ def test_utf8_decoding(self): "Here are some UTF-8 characters: ß, ©, Ω, £.", # only UTF8 characters "This non-UTF-8 character '\x80' should be handled properly.", # contains non UTF-8 character # unicode strings - u"This is a pure ASCII text.", # pure ASCII - u"Here are some UTF8 characters: ß, ©, Ω, £.", # only UTF8 characters - u"This non-UTF8 character '\x80' should be handled properly.", # contains non UTF-8 character + "This is a pure ASCII text.", # pure ASCII + "Here are some UTF8 characters: ß, ©, Ω, £.", # only UTF8 characters + "This non-UTF8 character '\x80' should be handled properly.", # contains non UTF-8 character ] for msg in msgs: logger.critical(msg) @@ -264,7 +263,7 @@ def test_deprecated(self): max_ver = "1.0" # test whether deprecation works - msgre_tpl_error = r"DEPRECATED\s*\(since v%s\).*%s" % (max_ver, MSG) + msgre_tpl_error = r"DEPRECATED\s*\(since v{}\).*{}".format(max_ver, MSG) self.assertErrorRegex(Exception, msgre_tpl_error, logger.deprecated, MSG, "1.1", max_ver) self.assertErrorRegex(Exception, msgre_tpl_error, logger.deprecated, MSG, "1.0", max_ver) @@ -275,9 +274,9 @@ def test_deprecated(self): # no deprecation if current version is lower than max version logger.deprecated(MSG, "0.9", max_ver) - msgre_warning = re.compile(r"WARNING.*Deprecated.* will no longer work in v%s:.*%s" % (max_ver, MSG)) + msgre_warning = re.compile(r"WARNING.*Deprecated.* will no longer work in v{}:.*{}".format(max_ver, MSG)) txt = self.read_log() - self.assertTrue(msgre_warning.search(txt), "Pattern '%s' found in: %s" % (msgre_warning.pattern, txt)) + self.assertTrue(msgre_warning.search(txt), f"Pattern '{msgre_warning.pattern}' found in: {txt}") self.mk_empty_log() @@ -293,7 +292,7 @@ def test_log_callback(msg, cache=callback_cache): self.mk_empty_log() # test handling of non-UTF8 chars - msg = MSG + u'\x81' + msg = MSG + '\x81' # Python 3: unicode is supported in regular string values (no special unicode type) msgre_tpl_error = r"DEPRECATED\s*\(since v%s\).*\x81" % max_ver msgre_warning = re.compile(r"WARNING.*Deprecated.* will no longer work in v%s:.*\x81" % max_ver) @@ -342,7 +341,7 @@ def test123(exception, msg): regex = re.compile("^WARNING.*HIT.*failtest\n.*in test123.*$", re.M) txt = self.read_log() - self.assertTrue(regex.match(txt), "Pattern '%s' matches '%s'" % (regex.pattern, txt)) + self.assertTrue(regex.match(txt), f"Pattern '{regex.pattern}' matches '{txt}'") self.truncate_log() fancylogger.FancyLogger.RAISE_EXCEPTION_CLASS = KeyError @@ -351,7 +350,7 @@ def test123(exception, msg): regex = re.compile("^WARNING.*HIT.*'failkeytest'\n.*in test123.*$", re.M) txt = self.read_log() - self.assertTrue(regex.match(txt), "Pattern '%s' matches '%s'" % (regex.pattern, txt)) + self.assertTrue(regex.match(txt), f"Pattern '{regex.pattern}' matches '{txt}'") self.truncate_log() fancylogger.FancyLogger.RAISE_EXCEPTION_LOG_METHOD = lambda c, msg: c.warning(msg) @@ -360,7 +359,7 @@ def test123(exception, msg): regex = re.compile("^WARNING.*HIT.*attrtest\n.*in test123.*$", re.M) txt = self.read_log() - self.assertTrue(regex.match(txt), "Pattern '%s' matches '%s'" % (regex.pattern, txt)) + self.assertTrue(regex.match(txt), f"Pattern '{regex.pattern}' matches '{txt}'") def _stream_stdouterr(self, isstdout=True, expect_match=True): """Log to stdout or stderror, check stdout or stderror""" @@ -383,7 +382,7 @@ def _stream_stdouterr(self, isstdout=True, expect_match=True): lh = fancylogger.logToScreen(stdout=isstdout) logger = fancylogger.getLogger(name, fname=True, clsname=False) # logfn makes it unique - msg = 'TEST isstdout %s expect_match %s logfn %s' % (isstdout, expect_match, logfn) + msg = f'TEST isstdout {isstdout} expect_match {expect_match} logfn {logfn}' logger.info(msg) # restore @@ -394,7 +393,7 @@ def _stream_stdouterr(self, isstdout=True, expect_match=True): fh2 = open(logfn) txt = fh2.read().strip() fh2.close() - reg_exp = re.compile(r"INFO\s+\S+.%s.%s\s+\S+\s+%s" % (name, '_stream_stdouterr', msg)) + reg_exp = re.compile(r"INFO\s+\S+.{}.{}\s+\S+\s+{}".format(name, '_stream_stdouterr', msg)) match = reg_exp.search(txt) is not None self.assertEqual(match, expect_match) @@ -453,7 +452,7 @@ def somefunction(self): # this will only hold in debug mode, so also disable the test if __debug__: pattern = 'FancyLoggerTest' - self.assertTrue(pattern in txt, "Pattern '%s' found in: %s" % (pattern, txt)) + self.assertTrue(pattern in txt, f"Pattern '{pattern}' found in: {txt}") # restore fancylogger.logToScreen(enable=False, handler=handler) sys.stderr = _stderr @@ -470,7 +469,7 @@ def test_getDetailsLogLevels(self): (None, fancylogger.getAllExistingLoggers)]: self.assertEqual([name for name, _ in func()], [name for name, _ in fancylogger.getDetailsLogLevels(fancy)], - "Test getDetailsLogLevels fancy %s and function %s" % (fancy, func.__name__)) + f"Test getDetailsLogLevels fancy {fancy} and function {func.__name__}") self.assertEqual([name for name, _ in fancylogger.getAllFancyloggers()], [name for name, _ in fancylogger.getDetailsLogLevels()], "Test getDetailsLogLevels default fancy True and function getAllFancyloggers") @@ -492,17 +491,17 @@ def test_normal_warning_logging(self): msg = 'this is my string' logging.warning(msg) self.assertTrue(msg in stringfile.getvalue(), - msg="'%s' in '%s'" % (msg, stringfile.getvalue())) + msg=f"'{msg}' in '{stringfile.getvalue()}'") msg = 'there are many like it' logging.getLogger().warning(msg) self.assertTrue(msg in stringfile.getvalue(), - msg="'%s' in '%s'" % (msg, stringfile.getvalue())) + msg=f"'{msg}' in '{stringfile.getvalue()}'") msg = 'but this one is mine' logging.getLogger('mine').warning(msg) self.assertTrue(msg in stringfile.getvalue(), - msg="'%s' in '%s'" % (msg, stringfile.getvalue())) + msg=f"'{msg}' in '{stringfile.getvalue()}'") # make sure this test runs last, since it may mess up other tests (like test_raiseException) def test_zzz_fancylogger_as_rootlogger_logging(self): @@ -629,7 +628,7 @@ def tearDown(self): self._reset_fancylogger() - super(FancyLoggerTest, self).tearDown() + super().tearDown() class ScreenLogFormatterFactoryTest(TestCase): @@ -649,7 +648,7 @@ def setUp(self): def _generate_var_name(self): while True: rnd = randint(0, 0xffffff) - name = ('TEST_VAR_%06X' % rnd) + name = f'TEST_VAR_{rnd:06X}' if name not in os.environ: return name diff --git a/test/generaloption.py b/test/generaloption.py index 65d740aa..dfed983c 100644 --- a/test/generaloption.py +++ b/test/generaloption.py @@ -120,14 +120,14 @@ class GeneralOptionTest(TestCase): def setUp(self): """Prepare for running test.""" - super(GeneralOptionTest, self).setUp() + super().setUp() fancylogger.resetroot() self.setup = vsc_setup() self.orig_environ = copy.deepcopy(os.environ) def tearDown(self): """Clean up after running test.""" - super(GeneralOptionTest, self).tearDown() + super().tearDown() fancylogger.resetroot() os.environ = self.orig_environ @@ -179,7 +179,7 @@ def test_help_long(self): def test_help_outputformats(self): """Generate (long) rst help message""" for output_format in HELP_OUTPUT_FORMATS: - topt = TestOption1(go_args=['--help=%s' % output_format], + topt = TestOption1(go_args=[f'--help={output_format}'], go_nosystemexit=True, go_columns=100, help_to_string=True, @@ -191,7 +191,7 @@ def test_help_outputformats(self): elif output_format == 'config': self.assertTrue(helptxt.find("#level-longlevel") > -1, "Configuration option in config help") else: - self.assertTrue(helptxt.find("--level-longlevel") > -1, "Long documentation expanded in long help (format: %s)" % output_format) + self.assertTrue(helptxt.find("--level-longlevel") > -1, f"Long documentation expanded in long help (format: {output_format})") def test_help_confighelp(self): """Generate long help message""" @@ -203,11 +203,11 @@ def test_help_confighelp(self): ) for section in ['MAIN', 'base', 'level', 'ext']: - self.assertTrue(topt.parser.help_to_file.getvalue().find("[%s]" % section) > -1, - "Looking for [%s] section marker" % section) + self.assertTrue(topt.parser.help_to_file.getvalue().find(f"[{section}]") > -1, + f"Looking for [{section}] section marker") for opt in ['store-with-dash', 'level-prefix-and-dash', 'ext-strlist', 'level-level', 'debug']: - self.assertTrue(topt.parser.help_to_file.getvalue().find("#%s" % opt) > -1, - "Looking for '#%s' option marker" % opt) + self.assertTrue(topt.parser.help_to_file.getvalue().find(f"#{opt}") > -1, + f"Looking for '#{opt}' option marker") def test_dest_with_dash(self): """Test the renaming of long opts to dest""" @@ -218,7 +218,7 @@ def test_dest_with_dash(self): def test_quote(self): """Test quote/unquote""" value = 'value with whitespace' - txt = '--option=%s' % value + txt = f'--option={value}' # this looks strange, but is correct self.assertEqual(str(txt), '--option=value with whitespace') self.assertEqual(txt , shell_unquote(shell_quote(txt))) @@ -393,7 +393,7 @@ def test_ext_add(self): ('a,b', ['a', 'b']), ('a,,b', ['a', 'x', 'y', 'b']), ]: - cmd = '--ext-add-list-flex=%s' % args + cmd = f'--ext-add-list-flex={args}' topt = TestOption1(go_args=[cmd]) self.assertEqual(topt.options.ext_add_list_flex, val) self.assertEqual(topt.generate_cmd_line(ignore=r'(? 0) regex = re.compile(r"'?test/sandbox/testpkg/\*'?: No such file or directory") - self.assertTrue(regex.search(output), "Pattern '%s' found in: %s" % (regex.pattern, output)) + self.assertTrue(regex.search(output), f"Pattern '{regex.pattern}' found in: {output}") ec, output = run_simple(TEST_GLOB) self.glob_output(output, ec=ec) @@ -142,7 +141,7 @@ def test_noshell_async_stdout_glob(self): self.mock_stderr(True) self.mock_stdout(True) - ec, output = async_to_stdout("/bin/echo %s" % msg) + ec, output = async_to_stdout(f"/bin/echo {msg}") stderr, stdout = self.get_stderr(), self.get_stdout() self.mock_stderr(False) self.mock_stdout(False) @@ -203,7 +202,7 @@ def test_noshell_executable(self): # to run Python command, it's required to use the right executable (Python shell rather than default) python_cmd = shell_quote(sys.executable) - ec, output = run("""%s -c 'print ("foo")'""" % python_cmd) + ec, output = run(f"""{python_cmd} -c 'print ("foo")'""") self.assertEqual(ec, 0) self.assertTrue('foo' in output) @@ -231,7 +230,7 @@ def check_pid(pid): try: os.kill(pid, 0) except OSError as err: - sys.stderr.write("check_pid in test_timeout: %s\n" % err) + sys.stderr.write(f"check_pid in test_timeout: {err}\n") return False else: return True @@ -240,15 +239,15 @@ def check_pid(pid): def do_test(kill_pgid): depth = 2 # this is the parent - res_fn = os.path.join(self.tempdir, 'nested_kill_pgid_%s' % kill_pgid) + res_fn = os.path.join(self.tempdir, f'nested_kill_pgid_{kill_pgid}') start = time.time() RunTimeout.KILL_PGID = kill_pgid ec, output = run_timeout([SCRIPT_NESTED, str(depth), res_fn], timeout=timeout) # reset it to default RunTimeout.KILL_PGID = default stop = time.time() - self.assertEqual(ec, RUNRUN_TIMEOUT_EXITCODE, msg='run_nested kill_pgid %s stopped due to timeout' % kill_pgid) - self.assertTrue(stop - start < timeout + 1, msg='run_nested kill_pgid %s timeout within margin' % kill_pgid) # give 1 sec margin + self.assertEqual(ec, RUNRUN_TIMEOUT_EXITCODE, msg=f'run_nested kill_pgid {kill_pgid} stopped due to timeout') + self.assertTrue(stop - start < timeout + 1, msg=f'run_nested kill_pgid {kill_pgid} timeout within margin') # give 1 sec margin # make it's not too fast time.sleep(5) # there's now 6 seconds to complete the remainder @@ -262,7 +261,7 @@ def do_test(kill_pgid): pids[int(dep)] = int(pid) # pids[0] should be killed - self.assertFalse(check_pid(pids[depth]), "main depth=%s pid (pids %s) is killed by timeout" % (depth, pids,)) + self.assertFalse(check_pid(pids[depth]), f"main depth={depth} pid (pids {pids}) is killed by timeout") if kill_pgid: # others should be killed as well @@ -274,7 +273,7 @@ def do_test(kill_pgid): msg = " not" for dep, pid in enumerate(pids[:depth]): - test_fn(check_pid(pid), "depth=%s pid (pids %s) is%s killed kill_pgid %s" % (dep, pids, msg, kill_pgid)) + test_fn(check_pid(pid), f"depth={dep} pid (pids {pids}) is{msg} killed kill_pgid {kill_pgid}") # clean them all for pid in pids: @@ -304,7 +303,7 @@ def test_qa_simple(self): def test_qa_regex(self): """Test regex based q and a (works only for qa_reg)""" qa_dict = { - '\s(?P