From 21128c0e72eb199061c1cce0e473000b8fc22c2e Mon Sep 17 00:00:00 2001 From: Chad Date: Mon, 5 Jun 2023 06:18:25 -0700 Subject: [PATCH 01/25] first bit of perf work brought back from threading branch --- lddecode/core.py | 24 ++++++++-------- lddecode/utils.py | 71 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 65 insertions(+), 30 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 2a3c4e8c2..58a2f3075 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -39,7 +39,7 @@ from . import efm_pll from .utils import get_git_info, ac3_pipe, ldf_pipe, traceback -from .utils import nb_mean, nb_median, nb_round, nb_min, nb_max, nb_absmax +from .utils import nb_mean, nb_median, nb_round, nb_min, nb_max, nb_abs, nb_absmax, nb_diff, n_orgt, n_orlt from .utils import polar2z, sqsum, genwave, dsa_rescale_and_clip, scale, rms from .utils import findpeaks, findpulses, calczc, inrange, roundfloat from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance @@ -750,11 +750,11 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): out_videopilot = npfft.ifft(demod_fft * self.Filters["FVideoPilot"]).real video_out = np.rec.array( [ - out_video, - demod, - out_video05, - out_videoburst, - out_videopilot, + out_video.astype(np.float32), + demod.astype(np.float32), + out_video05.astype(np.float32), + out_videoburst.astype(np.float32), + out_videopilot.astype(np.float32), ], names=[ "demod", @@ -766,7 +766,7 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): ) else: video_out = np.rec.array( - [out_video, demod, out_video05, out_videoburst], + [out_video.astype(np.float32), demod.astype(np.float32), out_video05.astype(np.float32), out_videoburst.astype(np.float32)], names=["demod", "demod_raw", "demod_05", "demod_burst"], ) @@ -2676,13 +2676,13 @@ def dropout_detect_demod(self): valid_min[int(f.linelocs[l]):int(f.linelocs[l]) + hsync_len] = sync_min valid_min05[int(f.linelocs[l]):int(f.linelocs[l]) + hsync_len] = sync_min_05 - iserr2 = f.data["video"]["demod"] < valid_min - iserr2 |= f.data["video"]["demod"] > valid_max + n_orlt(iserr1, f.data["video"]["demod"], valid_min) + n_orgt(iserr1, f.data["video"]["demod"], valid_max) - iserr3 = f.data["video"]["demod_05"] < valid_min05 - iserr3 |= f.data["video"]["demod_05"] > valid_max05 + n_orlt(iserr1, f.data["video"]["demod_05"], valid_min05) + n_orgt(iserr1, f.data["video"]["demod_05"], valid_max05) - iserr = iserr1 | iserr2 | iserr3 | iserr_rf + iserr = iserr1 | iserr_rf # filter out dropouts outside actual field iserr[:int(f.linelocs[f.lineoffset + 1])] = False diff --git a/lddecode/utils.py b/lddecode/utils.py index 0d6847c9b..521ab9aef 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -642,7 +642,7 @@ def sqsum(cmplx): return np.sqrt((cmplx.real ** 2) + (cmplx.imag ** 2)) -@njit(cache=True) +@njit(cache=True,nogil=True) def calczc_findfirst(data, target, rising): if rising: for i in range(0, len(data)): @@ -658,7 +658,7 @@ def calczc_findfirst(data, target, rising): return None -@njit(cache=True) +@njit(cache=True,nogil=True) def calczc_do(data, _start_offset, target, edge=0, count=10): start_offset = max(1, int(_start_offset)) icount = int(count + 1) @@ -716,21 +716,34 @@ def build_hilbert(fft_size): return output - -def unwrap_hilbert(hilbert, freq_hz): +@njit(cache=True,nogil=True) +def unwrap_hilbert_getangles(hilbert): tangles = np.angle(hilbert) dangles = np.ediff1d(tangles, to_begin=0) - # make sure unwapping goes the right way - if dangles[0] < -pi: - dangles[0] += tau + return dangles - tdangles2 = np.unwrap(dangles) +@njit(cache=True,nogil=True) +def unwrap_hilbert_fixangles(tdangles2): # With extremely bad data, the unwrapped angles can jump. while np.min(tdangles2) < 0: tdangles2[tdangles2 < 0] += tau while np.max(tdangles2) > tau: tdangles2[tdangles2 > tau] -= tau + + return tdangles2 + +def unwrap_hilbert(hilbert, freq_hz): + dangles = unwrap_hilbert_getangles(hilbert) + + # make sure unwapping goes the right way + if dangles[0] < -pi: + dangles[0] += tau + + tdangles2 = np.unwrap(dangles) + + tdangles2 = unwrap_hilbert_fixangles(tdangles2) + return tdangles2 * (freq_hz / tau) @@ -904,6 +917,7 @@ def findpulses(sync_ref, _, high): return _to_pulses_list(pulses_starts, pulses_lengths) +@njit(cache=True, nogil=True) def findpeaks(array, low=0): array2 = array.copy() array2[np.where(array2 < low)] = 0 @@ -928,51 +942,72 @@ def LRUupdate(l, k): l.insert(0, k) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_median(m): return np.median(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_round(m): return int(np.round(m)) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_mean(m): return np.mean(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_min(m): return np.min(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_max(m): return np.max(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_abs(m): return np.abs(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_absmax(m): return np.max(np.abs(m)) -@njit(cache=True) +@njit(cache=True,nogil=True) +def nb_std(m): + return np.std(m) + + +@njit(cache=True,nogil=True) +def nb_diff(m): + return np.diff(m) + + +@njit(cache=True,nogil=True) def nb_mul(x, y): return x * y -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_where(x): return np.where(x) +@njit(cache=True,nogil=True) +def n_orgt(a, x, y): + a |= (x > y) + + +@njit(cache=True,nogil=True) +def n_orlt(a, x, y): + a |= (x < y) + + +@njit(cache=True,nogil=True) def angular_mean(x, cycle_len=1.0, zero_base=True): """ Compute the mean phase, assuming 0..1 is one phase cycle @@ -992,7 +1027,7 @@ def angular_mean(x, cycle_len=1.0, zero_base=True): return am - +@njit(cache=True) def phase_distance(x, c=0.75): """ returns the shortest path between two phases (assuming x and c are in (0..1)) """ d = (x - np.floor(x)) - c From b6e9dbdfb97e792cc2fe4e249b05b6856ca42039 Mon Sep 17 00:00:00 2001 From: Chad Date: Tue, 6 Jun 2023 06:50:31 -0700 Subject: [PATCH 02/25] background audio scaler, adjust unwrap_hilbert numba'ing --- lddecode/core.py | 39 ++++++++++++++++++++++++++++----------- lddecode/utils.py | 17 ++++++++--------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 58a2f3075..d392ebff2 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -1418,7 +1418,7 @@ def _downscale_audio_to_output( # Downscales to 16bit/44.1khz. It might be nice when analog audio is better to support 24/96, # but if we only support one output type, matching CD audio/digital sound is greatly preferable. -def downscale_audio(audio, lineinfo, rf, linecount, timeoffset=0, freq=44100): +def downscale_audio(audio, lineinfo, rf, linecount, timeoffset=0, freq=44100, rv=None): """downscale audio for output. Parameters: @@ -1456,6 +1456,10 @@ def downscale_audio(audio, lineinfo, rf, linecount, timeoffset=0, freq=44100): if failed: logger.warning("Analog audio processing error, muting samples") + if rv is not None: + rv['dsaudio'] = output16 + rv['audio_next_offset'] = arange[-1] - frametime + return output16, arange[-1] - frametime @@ -2457,6 +2461,7 @@ def computewow(self, lineinfo): return wow + #@profile def downscale( self, lineinfo=None, @@ -2474,6 +2479,20 @@ def downscale( # for video always output 263/313 lines linesout = self.outlinecount + audio_thread = None + if audio != 0 and self.rf.decode_analog_audio: + audio_rv = {} + audio_thread = threading.Thread(target=downscale_audio, args=( + self.data["audio"], + lineinfo, + self.rf, + self.linecount, + self.audio_offset, + audio, + audio_rv) + ) + audio_thread.start() + dsout = np.zeros((linesout * outwidth), dtype=np.double) # self.lineoffset is an adjustment for 0-based lines *before* downscaling so add 1 here lineoffset = self.lineoffset + 1 @@ -2497,16 +2516,6 @@ def downscale( (l - lineoffset) * outwidth : (l + 1 - lineoffset) * outwidth ] = self.rf.DecoderParams["ire0"] - if audio != 0 and self.rf.decode_analog_audio: - self.dsaudio, self.audio_next_offset = downscale_audio( - self.data["audio"], - lineinfo, - self.rf, - self.linecount, - self.audio_offset, - freq=audio - ) - if self.rf.decode_digital_audio: self.efmout = self.data["efm"][ int(self.linelocs[1]) : int(self.linelocs[self.linecount + 1]) @@ -2518,6 +2527,11 @@ def downscale( dsout = self.hz_to_output(dsout) self.dspicture = dsout + if audio_thread: + audio_thread.join() + self.dsaudio = audio_rv["dsaudio"] + self.audio_next_offset = audio_rv["audio_next_offset"] + return dsout, self.dsaudio, self.efmout def rf_tbc(self, linelocs=None): @@ -3549,6 +3563,7 @@ def AC3filter(self, rftbc): blk = self.AC3Collector.get_block() + #@profile def writeout(self, dataset): f, fi, picture, audio, efm = dataset if self.digital_audio is True: @@ -3581,6 +3596,7 @@ def writeout(self, dataset): if audio is not None and self.outfile_audio is not None: self.outfile_audio.write(audio) + #@profile def decodefield(self, initphase=False, redo=False): """ returns field object if valid, and the offset to the next decode """ self.readloc = int(self.fdoffset - self.rf.blockcut) @@ -3638,6 +3654,7 @@ def decodefield(self, initphase=False, redo=False): return f, f.nextfieldoffset - (self.readloc - self.rawdecode["startloc"]) + #@profile def readfield(self, initphase=False): # pretty much a retry-ing wrapper around decodefield with MTF checking self.prevfield = self.curfield diff --git a/lddecode/utils.py b/lddecode/utils.py index 521ab9aef..7fd19b079 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -721,30 +721,29 @@ def unwrap_hilbert_getangles(hilbert): tangles = np.angle(hilbert) dangles = np.ediff1d(tangles, to_begin=0) + # make sure unwapping goes the right way + if dangles[0] < -pi: + dangles[0] += tau + return dangles @njit(cache=True,nogil=True) -def unwrap_hilbert_fixangles(tdangles2): +def unwrap_hilbert_fixangles(tdangles2, freq_hz): # With extremely bad data, the unwrapped angles can jump. while np.min(tdangles2) < 0: tdangles2[tdangles2 < 0] += tau while np.max(tdangles2) > tau: tdangles2[tdangles2 > tau] -= tau - return tdangles2 + return tdangles2 * (freq_hz / tau) def unwrap_hilbert(hilbert, freq_hz): dangles = unwrap_hilbert_getangles(hilbert) - # make sure unwapping goes the right way - if dangles[0] < -pi: - dangles[0] += tau - + # This can't be run with numba tdangles2 = np.unwrap(dangles) - tdangles2 = unwrap_hilbert_fixangles(tdangles2) - - return tdangles2 * (freq_hz / tau) + return unwrap_hilbert_fixangles(tdangles2, freq_hz) #* (freq_hz / tau) def fft_determine_slices(center, min_bandwidth, freq_hz, bins_in): From 9eb73acc3a188dc4d9f484b84a72dd9b31752eb1 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Tue, 6 Jun 2023 07:51:49 -0700 Subject: [PATCH 03/25] break out array cases of hz_to_output to numba --- lddecode/core.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lddecode/core.py b/lddecode/core.py index d392ebff2..3b5daaa9e 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -44,7 +44,7 @@ from .utils import findpeaks, findpulses, calczc, inrange, roundfloat from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft -from .utils import fft_do_slice, fft_determine_slices, StridedCollector +from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array try: # If Anaconda's numpy is installed, mkl will use all threads for fft etc @@ -1587,7 +1587,18 @@ def usectooutpx(self, x): def outpxtousec(self, x): return x / self.rf.SysParams["outfreq"] + #@profile def hz_to_output(self, input): + if type(input) == np.ndarray: + return hz_to_output_array( + input, + self.rf.DecoderParams["ire0"], + self.rf.DecoderParams["hz_ire"], + self.rf.SysParams["outputZero"], + self.rf.DecoderParams["vsync_ire"], + self.out_scale, + ) + reduced = (input - self.rf.DecoderParams["ire0"]) / self.rf.DecoderParams["hz_ire"] reduced -= self.rf.DecoderParams["vsync_ire"] @@ -2182,6 +2193,7 @@ def getpulses(self): return findpulses(self.data["video"]["demod_05"], pulse_hz_min, pulse_hz_max) + #@profile def compute_linelocs(self): self.rawpulses = self.getpulses() From 106162dc85a17d24232600825b938487086715e7 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Tue, 6 Jun 2023 07:51:52 -0700 Subject: [PATCH 04/25] break out array cases of hz_to_output to numba --- lddecode/utils.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lddecode/utils.py b/lddecode/utils.py index 7fd19b079..de7d3dc39 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -834,6 +834,17 @@ def roundfloat(fl, places=3): return np.round(fl * r) / r +@njit(nogil=True, cache=True) +def hz_to_output_array(input, ire0, hz_ire, outputZero, vsync_ire, out_scale): + reduced = (input - ire0) / hz_ire + reduced -= vsync_ire + + return ( + np.clip((reduced * out_scale) + outputZero, 0, 65535) + 0.5 + ).astype(np.uint16) + + + # Something like this should be a numpy function, but I can't find it. @jit(cache=True) def findareas(array, cross): From 7f281ca559a09db6ce3b46bb213d0ccffc6a41cb Mon Sep 17 00:00:00 2001 From: Chad Page Date: Tue, 6 Jun 2023 19:13:22 -0700 Subject: [PATCH 05/25] prep work for switching to shared data --- lddecode/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lddecode/core.py b/lddecode/core.py index 3b5daaa9e..b8c9dd3cd 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -1596,7 +1596,7 @@ def hz_to_output(self, input): self.rf.DecoderParams["hz_ire"], self.rf.SysParams["outputZero"], self.rf.DecoderParams["vsync_ire"], - self.out_scale, + self.out_scale ) reduced = (input - self.rf.DecoderParams["ire0"]) / self.rf.DecoderParams["hz_ire"] From 512dbb6975362d74a61ac299354026083f6f316d Mon Sep 17 00:00:00 2001 From: Chad Date: Tue, 6 Jun 2023 22:05:21 -0700 Subject: [PATCH 06/25] a few more float32 casts --- lddecode/core.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index b8c9dd3cd..e936160b2 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -703,7 +703,7 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): rv["rfhpf"] = npfft.ifft(indata_fft * self.Filters["Frfhpf"]).real rv["rfhpf"] = rv["rfhpf"][ self.blockcut - rotdelay : -self.blockcut_end - rotdelay - ] + ].astype(np.float32) if self.system == "PAL" and self.PAL_V4300D_NotchFilter: """ This routine works around an 'interesting' issue seen with LD-V4300D players and @@ -791,7 +791,7 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): # Demodulate and restore frequency after bin slicing a1u = unwrap_hilbert(a1, afilter.a1_freq) + afilter.low_freq - stage1_out.append(a1u) + stage1_out.append(a1u.astype(np.float32)) audio_out = np.rec.array( [stage1_out[0], stage1_out[1]], names=["audio_left", "audio_right"] @@ -1234,7 +1234,6 @@ def dequeue(self): self.rf.blockcut : -self.rf.blockcut_end ] - def read(self, begin, length, MTF=0, getraw = False, forceredo=False): # transpose the cache by key, not block # t = {"input": [], "fft": [], "video": [], "audio": [], "efm": [], "rfhpf": []} From a02557d9c47860bfc400c5e2720e905e5b197d1b Mon Sep 17 00:00:00 2001 From: Chad Page Date: Wed, 7 Jun 2023 07:31:08 -0700 Subject: [PATCH 07/25] threads for everyone, not just Windows users --- lddecode/core.py | 62 +++++++++++++----------------------------------- lddecode/main.py | 2 +- 2 files changed, 17 insertions(+), 47 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index e936160b2..b0b28d330 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -6,12 +6,7 @@ import time import types -from multiprocessing import Queue, JoinableQueue, Pipe - -if platform.system() == "Darwin": - from multiprocessing import set_start_method -if platform.system() != "Windows": - from multiprocessing import Process +from queue import Queue # standard numeric/scientific libraries import numpy as np @@ -20,20 +15,8 @@ import numba from numba import njit -# Use PyFFTW's faster FFT implementation if available -if platform.system != "Windows": - try: - import pyfftw.interfaces.numpy_fft as npfft - import pyfftw.interfaces - - pyfftw.interfaces.cache.enable() - pyfftw.interfaces.cache.set_keepalive_time(10) - except ImportError: - import numpy.fft as npfft -else: - # Don't use pyfftw on windows as we have to use Thread and that causes - # issues if also using pyfftw. - import numpy.fft as npfft +# Use standard numpy fft, since it's thread-safe +import numpy.fft as npfft # internal libraries @@ -1007,39 +990,29 @@ def __init__( self.lock = threading.Lock() self.blocks = {} - if platform.system() == "Darwin": - set_start_method("fork") - - # Workaround to make it work on windows. - # Using Process gives a "io.BufferedReader can't be pickled error". - # Using Thread may be a bit slower due to how python threads work, - # so ideally we would want to find a better way to do this later. - thread_type = Process if platform.system() != "Windows" else threading.Thread - self.block_status = {} - self.q_in = JoinableQueue() + self.q_in = Queue() self.q_out = Queue() self.threadpipes = [] self.threads = [] + self.request = 0 + self.ended = False + + self.deqeue_thread = threading.Thread(target=self.dequeue, daemon=True) num_worker_threads = max(num_worker_threads - 1, 1) for i in range(num_worker_threads): - self.threadpipes.append(Pipe()) - t = thread_type( - target=self.worker, daemon=True, args=(self.threadpipes[-1][1],) + t = threading.Thread( + target=self.worker, daemon=True, args=() ) t.start() self.threads.append(t) - self.deqeue_thread = threading.Thread(target=self.dequeue, daemon=True) self.deqeue_thread.start() - self.request = 0 - self.ended = False - def end(self): if not self.ended: # stop workers @@ -1112,14 +1085,9 @@ def apply_newparams(self, newparams): self.rf.computefilters() - def worker(self, pipein): + def worker(self): while True: - ispiped = False - if pipein.poll(): - item = pipein.recv() - ispiped = True - else: - item = self.q_in.get() + item = self.q_in.get() if item is None or item[0] == "END": return @@ -1145,12 +1113,13 @@ def worker(self, pipein): output["MTF"] = target_MTF output["request"] = request + # print(blocknum, output) self.q_out.put((blocknum, output)) elif item[0] == "NEWPARAMS": self.apply_newparams(item[1]) - if not ispiped: - self.q_in.task_done() +# if not ispiped: +# self.q_in.task_done() def doread(self, blocknums, MTF, redo=False, prefetch=False): need_blocks = [] @@ -1511,6 +1480,7 @@ def __init__( def process(self): self.linelocs1, self.linebad, self.nextfieldoffset = self.compute_linelocs() + #print(self.readloc, self.linelocs1, self.nextfieldoffset) if self.linelocs1 is None: if self.nextfieldoffset is None: self.nextfieldoffset = self.rf.linelen * 200 diff --git a/lddecode/main.py b/lddecode/main.py index be18f45e5..cc0329957 100644 --- a/lddecode/main.py +++ b/lddecode/main.py @@ -208,7 +208,7 @@ def main(args=None): "--threads", metavar="threads", type=int, - default=5, + default=4, help="number of CPU threads to use", ) From 77e86572a40208b7fbcab3a87ab938828fbdbc35 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Wed, 7 Jun 2023 20:40:40 -0700 Subject: [PATCH 08/25] some tweaks to compute_line_bursts --- lddecode/core.py | 12 ++++++------ lddecode/utils.py | 14 ++++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index b0b28d330..707499120 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -2793,7 +2793,7 @@ def dropout_detect(self): return rv_lines, rv_starts, rv_ends - + @profile def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): line = _line + self.lineoffset # calczc works from integers, so get the start and remainder @@ -2837,20 +2837,20 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): phase_offset = [] # this subroutine is in utils.py, broken out so it can be JIT'd - bursts = clb_findbursts( + risings, zcs = clb_findbursts( burstarea, 0, len(burstarea) - 1, threshold ) - if len(bursts) == 0: + if len(zcs) == 0: return None, None - for prevalue, zc in bursts: + for rising, zc in zip(risings, zcs): zc_cycle = ((bstart + zc - s_rem) / zcburstdiv) + phase_adjust zc_round = nb_round(zc_cycle) phase_offset.append(zc_round - zc_cycle) - if prevalue < 0: + if rising: rising_count += not (zc_round % 2) else: rising_count += zc_round % 2 @@ -2858,7 +2858,7 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): phase_adjust += nb_median(np.array(phase_offset)) passcount += 1 - rising = (rising_count / len(bursts)) > 0.5 + rising = (rising_count / len(zcs)) > 0.5 return rising, -phase_adjust diff --git a/lddecode/utils.py b/lddecode/utils.py index de7d3dc39..bd93c687e 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1072,24 +1072,26 @@ def dsa_rescale_and_clip(infloat): # removed so that they can be JIT'd -@njit(cache=True) +@njit(cache=True,nogil=True) def clb_findbursts(burstarea, i, endburstarea, threshold): - out = [] + rising = [] + zcs = [] j = i while j < endburstarea: if np.abs(burstarea[j]) > threshold: - pre = burstarea[j] + rising.append(burstarea[j] < 0) zc = calczc_do(burstarea, j, 0) if zc is not None: - out.append((burstarea[j], zc)) + zcs.append(zc) j = int(zc) + 1 else: - return out + return rising, zcs else: j += 1 - return out + return rising, zcs + @njit(cache=True) From 0a32d0741b9e6951cb2928f597230473c22e7928 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Wed, 7 Jun 2023 21:10:05 -0700 Subject: [PATCH 09/25] a little bit more micro-optimization --- lddecode/core.py | 18 +++++++++--------- lddecode/utils.py | 10 ++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 707499120..66c34fdff 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -1599,7 +1599,7 @@ def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=Fal else self.rf.SysParams["outlinelen"] ) - return slice(int(np.round(_begin)), int(np.round(_begin + _length))) + return slice(nb_round(_begin), nb_round(_begin + _length)) def get_timings(self): pulses = self.rawpulses @@ -1888,7 +1888,7 @@ def processVBlank(self, validpulses, start, limit=None): dist = (firstloc - loc_presync) / self.inlinelen # get the integer rounded X * .5H distance. then invert to determine # the half-H alignment with the sync/blank pulses - hdist = int(np.round(dist * 2)) + hdist = nb_round(dist * 2) # isfirstfield = not ((hdist % 2) == self.rf.SysParams['firstField1H'][0]) isfirstfield = (hdist % 2) == (self.rf.SysParams["firstFieldH"][1] != 1) @@ -2032,7 +2032,7 @@ def getLine0(self, validpulses): meanlinelen * self.rf.SysParams["field_lines"][0 if isFirstField_next else 1] ) - line0loc_next = int(np.round(self.vblank_next - fieldlen)) + line0loc_next = nb_round(self.vblank_next - fieldlen) if line0loc_next < 0: self.sync_confidence = 10 @@ -2742,7 +2742,7 @@ def dropout_errlist_to_tbc(field, errlist): end_linepos = end_rf_linepos / ( field.linelocs[line + 1] - field.linelocs[line] ) - end_linepos = int(np.round(end_linepos * field.outlinelen)) + end_linepos = nb_round(end_linepos * field.outlinelen) first_line = line + 1 + lineoffset @@ -2793,7 +2793,7 @@ def dropout_detect(self): return rv_lines, rv_starts, rv_ends - @profile + #@profile def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): line = _line + self.lineoffset # calczc works from integers, so get the start and remainder @@ -2837,14 +2837,14 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): phase_offset = [] # this subroutine is in utils.py, broken out so it can be JIT'd - risings, zcs = clb_findbursts( + bursts = clb_findbursts( burstarea, 0, len(burstarea) - 1, threshold ) - if len(zcs) == 0: + if len(bursts) == 0: return None, None - for rising, zc in zip(risings, zcs): + for rising, zc in bursts: zc_cycle = ((bstart + zc - s_rem) / zcburstdiv) + phase_adjust zc_round = nb_round(zc_cycle) @@ -2858,7 +2858,7 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): phase_adjust += nb_median(np.array(phase_offset)) passcount += 1 - rising = (rising_count / len(zcs)) > 0.5 + rising = (rising_count / len(bursts)) > 0.5 return rising, -phase_adjust diff --git a/lddecode/utils.py b/lddecode/utils.py index bd93c687e..d109de3c2 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1074,23 +1074,21 @@ def dsa_rescale_and_clip(infloat): @njit(cache=True,nogil=True) def clb_findbursts(burstarea, i, endburstarea, threshold): - rising = [] - zcs = [] + out = [] j = i while j < endburstarea: if np.abs(burstarea[j]) > threshold: - rising.append(burstarea[j] < 0) zc = calczc_do(burstarea, j, 0) if zc is not None: - zcs.append(zc) + out.append((burstarea[j] < 0, zc)) j = int(zc) + 1 else: - return rising, zcs + return out else: j += 1 - return rising, zcs + return out From cea57cb25ce5a0e12d0081cb7b09cb513fd222ff Mon Sep 17 00:00:00 2001 From: Chad Page Date: Thu, 8 Jun 2023 05:46:37 -0700 Subject: [PATCH 10/25] finally, a faster compute_line_bursts... but the actual speedup is going to be elsewhere --- lddecode/core.py | 37 +++++++++++++------------------------ lddecode/utils.py | 21 ++++++++++++--------- 2 files changed, 25 insertions(+), 33 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 66c34fdff..cf3b6f816 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -2793,7 +2793,7 @@ def dropout_detect(self): return rv_lines, rv_starts, rv_ends - #@profile + def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): line = _line + self.lineoffset # calczc works from integers, so get the start and remainder @@ -2829,36 +2829,25 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): # Apply phase adjustment from previous frame/line if available. phase_adjust = -prev_phaseadjust + # a proper color burst should have ~12-13 zero crossings + isrising = np.zeros(16, dtype=np.bool_) + zcs = np.zeros(16, dtype=np.float32) + # The first pass computes phase_offset, the second uses it to determine # the colo(u)r burst phase of the line. - passcount = 0 - while passcount < 2: - rising_count = 0 - phase_offset = [] - + for passcount in range(2): # this subroutine is in utils.py, broken out so it can be JIT'd - bursts = clb_findbursts( - burstarea, 0, len(burstarea) - 1, threshold - ) + zc_count = clb_findbursts(isrising, zcs, burstarea, 0, len(burstarea) - 1, threshold) - if len(bursts) == 0: + if zc_count == 0: return None, None - for rising, zc in bursts: - zc_cycle = ((bstart + zc - s_rem) / zcburstdiv) + phase_adjust - zc_round = nb_round(zc_cycle) - - phase_offset.append(zc_round - zc_cycle) - - if rising: - rising_count += not (zc_round % 2) - else: - rising_count += zc_round % 2 - - phase_adjust += nb_median(np.array(phase_offset)) - passcount += 1 + zc_cycles = ((bstart + zcs - s_rem) / zcburstdiv) + phase_adjust + zc_rounds = np.round(zc_cycles) + phase_adjust += nb_median(zc_rounds - zc_cycles) - rising = (rising_count / len(bursts)) > 0.5 + rising_count = np.sum(np.bitwise_xor(isrising, (zc_rounds % 2) != 0)) + rising = rising_count > (zc_count / 2) return rising, -phase_adjust diff --git a/lddecode/utils.py b/lddecode/utils.py index d109de3c2..341ab2253 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1073,24 +1073,27 @@ def dsa_rescale_and_clip(infloat): @njit(cache=True,nogil=True) -def clb_findbursts(burstarea, i, endburstarea, threshold): - out = [] - +def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold): + zc_count = 0 j = i - while j < endburstarea: + + isrising.fill(False) + zcs.fill(0) + + while j < endburstarea and zc_count < len(zcs): if np.abs(burstarea[j]) > threshold: zc = calczc_do(burstarea, j, 0) if zc is not None: - out.append((burstarea[j] < 0, zc)) + isrising[zc_count] = burstarea[j] < 0 + zcs[zc_count] = zc + zc_count += 1 j = int(zc) + 1 else: - return out + return zc_count else: j += 1 - return out - - + return zc_count @njit(cache=True) def distance_from_round(x): From 5ff50de7038fd0a3a4cd42d3796437b21101452f Mon Sep 17 00:00:00 2001 From: Chad Page Date: Thu, 8 Jun 2023 06:18:56 -0700 Subject: [PATCH 11/25] numbafied more, and *that* worked --- lddecode/core.py | 12 ++---------- lddecode/utils.py | 12 +++++++++--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index cf3b6f816..63f61fe25 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -1231,7 +1231,7 @@ def read(self, begin, length, MTF=0, getraw = False, forceredo=False): return rv while need_blocks is not None and len(need_blocks): - time.sleep(0.001) # A crude busy loop + time.sleep(0.0005) # A crude busy loop need_blocks = self.doread(toread, MTF) if need_blocks is None: @@ -2837,16 +2837,8 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): # the colo(u)r burst phase of the line. for passcount in range(2): # this subroutine is in utils.py, broken out so it can be JIT'd - zc_count = clb_findbursts(isrising, zcs, burstarea, 0, len(burstarea) - 1, threshold) + zc_count, phase_adjust, rising_count = clb_findbursts(isrising, zcs, burstarea, 0, len(burstarea) - 1, threshold, bstart, s_rem, zcburstdiv, phase_adjust) - if zc_count == 0: - return None, None - - zc_cycles = ((bstart + zcs - s_rem) / zcburstdiv) + phase_adjust - zc_rounds = np.round(zc_cycles) - phase_adjust += nb_median(zc_rounds - zc_cycles) - - rising_count = np.sum(np.bitwise_xor(isrising, (zc_rounds % 2) != 0)) rising = rising_count > (zc_count / 2) return rising, -phase_adjust diff --git a/lddecode/utils.py b/lddecode/utils.py index 341ab2253..d666b203d 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1073,7 +1073,7 @@ def dsa_rescale_and_clip(infloat): @njit(cache=True,nogil=True) -def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold): +def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold, bstart, s_rem, zcburstdiv, phase_adjust): zc_count = 0 j = i @@ -1089,11 +1089,17 @@ def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold): zc_count += 1 j = int(zc) + 1 else: - return zc_count + break else: j += 1 - return zc_count + if zc_count: + zc_cycles = ((bstart + zcs - s_rem) / zcburstdiv) + phase_adjust + zc_rounds = (zc_cycles + .5).astype(np.int32) + phase_adjust += nb_median(zc_rounds - zc_cycles) + rising_count = np.sum(np.bitwise_xor(isrising, (zc_rounds % 2) != 0)) + + return zc_count, phase_adjust, rising_count @njit(cache=True) def distance_from_round(x): From 4fcd0e5d03582220384655f81915c4cc377bb7f6 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Thu, 8 Jun 2023 06:19:57 -0700 Subject: [PATCH 12/25] numbafied more, and *that* worked --- lddecode/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lddecode/utils.py b/lddecode/utils.py index d666b203d..5bb70e5d2 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1075,6 +1075,7 @@ def dsa_rescale_and_clip(infloat): @njit(cache=True,nogil=True) def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold, bstart, s_rem, zcburstdiv, phase_adjust): zc_count = 0 + rising_count = 0 j = i isrising.fill(False) @@ -1095,6 +1096,7 @@ def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold, bstart, if zc_count: zc_cycles = ((bstart + zcs - s_rem) / zcburstdiv) + phase_adjust + # numba doesn't support np.round over an array, so we have to do this zc_rounds = (zc_cycles + .5).astype(np.int32) phase_adjust += nb_median(zc_rounds - zc_cycles) rising_count = np.sum(np.bitwise_xor(isrising, (zc_rounds % 2) != 0)) From 7ba6b0f78986490ba9ebab2fc856506097e5bde0 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Thu, 8 Jun 2023 20:23:36 -0700 Subject: [PATCH 13/25] improve waiting for blocks to arrive, and adjust prefetch size automatically --- lddecode/core.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 63f61fe25..0332f6b12 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -984,7 +984,10 @@ def __init__( # Cache dictionary - key is block #, which holds data for that block self.lrusize = cachesize - self.prefetch = 32 # TODO: set this to proper amount for format + + self.bytes_per_frame = int(self.rf.freq_hz / self.rf.SysParams["FPS"]) + self.prefetch = int(self.bytes_per_frame / self.blocksize) + 4 + self.lru = [] self.lock = threading.Lock() @@ -994,6 +997,8 @@ def __init__( self.q_in = Queue() self.q_out = Queue() + self.waiting = set() + self.q_out_event = threading.Event() self.threadpipes = [] self.threads = [] @@ -1163,13 +1168,17 @@ def doread(self, blocknums, MTF, redo=False, prefetch=False): if redo or not waiting: queuelist.append(b) need_blocks.append(b) - elif self.block_status[b]["waiting"]: + elif waiting: need_blocks.append(b) + + if not prefetch: + self.waiting.add(b) for b in queuelist: self.block_status[b] = {'MTF': MTF, 'waiting': True, 'request': self.request, 'prefetch': prefetch} self.q_in.put(("DEMOD", b, self.blocks[b], MTF, self.request)) + self.q_out_event.clear() return None if reached_end else need_blocks def dequeue(self): @@ -1196,7 +1205,14 @@ def dequeue(self): self.blocks[blocknum][k] = item[k] if 'demod' in item.keys(): - self.block_status[blocknum]['waiting'] = False + if self.block_status[blocknum]['waiting']: + self.block_status[blocknum]['waiting'] = False + + if blocknum in self.waiting: + self.waiting.remove(blocknum) + + if not len(self.waiting): + self.q_out_event.set() if "input" not in self.blocks[blocknum]: self.blocks[blocknum]["input"] = self.blocks[blocknum]["rawinput"][ @@ -1231,8 +1247,10 @@ def read(self, begin, length, MTF=0, getraw = False, forceredo=False): return rv while need_blocks is not None and len(need_blocks): - time.sleep(0.0005) # A crude busy loop + self.q_out_event.wait(.01) need_blocks = self.doread(toread, MTF) + if need_blocks: + self.q_out_event.clear() if need_blocks is None: # EOF From 5dab680313611a5331f209525fdfbd6fb7cc859f Mon Sep 17 00:00:00 2001 From: Chad Page Date: Thu, 8 Jun 2023 21:28:57 -0700 Subject: [PATCH 14/25] only compute median line length one --- lddecode/core.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 0332f6b12..3c131d3b1 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -28,6 +28,7 @@ from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array +from .utils import Pulse try: # If Anaconda's numpy is installed, mkl will use all threads for fft etc @@ -1496,6 +1497,7 @@ def __init__( # this is eventually set to 262/263 and 312/313 for audio timing self.linecount = None + @profile def process(self): self.linelocs1, self.linebad, self.nextfieldoffset = self.compute_linelocs() #print(self.readloc, self.linelocs1, self.nextfieldoffset) @@ -1551,6 +1553,7 @@ def usectoinpx(self, x, line=None): def inpxtousec(self, x, line=None): return x / self.get_linefreq(line) + @profile def lineslice(self, l, begin=None, length=None, linelocs=None, begin_offset=0): """ return a slice corresponding with pre-TBC line l, begin+length are uSecs """ @@ -1601,6 +1604,7 @@ def output_to_ire(self, output): (output - self.rf.SysParams["outputZero"]) / self.out_scale ) + self.rf.DecoderParams["vsync_ire"] + @profile def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=False): """ return a slice corresponding with pre-TBC line l """ @@ -1619,6 +1623,7 @@ def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=Fal return slice(nb_round(_begin), nb_round(_begin + _length)) + @profile def get_timings(self): pulses = self.rawpulses hsync_typical = self.usectoinpx(self.rf.SysParams["hsyncPulseUS"]) @@ -1670,8 +1675,7 @@ def get_timings(self): return LT - def pulse_qualitycheck(self, prevpulse, pulse): - + def pulse_qualitycheck(self, prevpulse: Pulse, pulse: Pulse): if prevpulse[0] > 0 and pulse[0] > 0: exprange = (0.4, 0.6) elif prevpulse[0] == 0 and pulse[0] == 0: @@ -1684,6 +1688,7 @@ def pulse_qualitycheck(self, prevpulse, pulse): return inorder + @profile def run_vblank_state_machine(self, pulses, LT): """ Determines if a pulse set is a valid vblank by running a state machine """ @@ -1774,6 +1779,7 @@ def run_vblank_state_machine(self, pulses, LT): return done, validpulses + @profile def refinepulses(self): LT = self.get_timings() @@ -1813,6 +1819,7 @@ def refinepulses(self): return valid_pulses + @profile def getBlankRange(self, validpulses, start=0): vp_type = np.array([p[0] for p in validpulses]) @@ -1953,11 +1960,12 @@ def processVBlank(self, validpulses, start, limit=None): return None, None, None, 0 + @profile def computeLineLen(self, validpulses): # determine longest run of 0's longrun = [-1, -1] currun = None - for i, v in enumerate([p[0] for p in self.validpulses]): + for i, v in enumerate([p[0] for p in validpulses]): if v != 0: if currun is not None and currun[1] > longrun[1]: longrun = currun @@ -1972,10 +1980,10 @@ def computeLineLen(self, validpulses): linelens = [] for i in range(longrun[0] + 1, longrun[0] + longrun[1]): - linelen = self.validpulses[i][1].start - self.validpulses[i - 1][1].start + linelen = validpulses[i][1].start - validpulses[i - 1][1].start if inrange(linelen / self.inlinelen, 0.95, 1.05): linelens.append( - self.validpulses[i][1].start - self.validpulses[i - 1][1].start + validpulses[i][1].start - validpulses[i - 1][1].start ) if len(linelens) > 0: @@ -1983,6 +1991,7 @@ def computeLineLen(self, validpulses): else: return self.inlinelen + def skip_check(self): """ This routine checks to see if there's a (probable) VSYNC at the end. Returns a (currently rough) probability. @@ -2017,7 +2026,8 @@ def skip_check(self): # pull the above together into a routine that (should) find line 0, the last line of # the previous field. - def getLine0(self, validpulses): + @profile + def getLine0(self, validpulses, meanlinelen): # Gather the local line 0 location and projected from the previous field self.sync_confidence = 100 @@ -2045,7 +2055,6 @@ def getLine0(self, validpulses): if self.vblank_next is not None: isFirstField_next = not isNotFirstField_next - meanlinelen = self.computeLineLen(validpulses) fieldlen = ( meanlinelen * self.rf.SysParams["field_lines"][0 if isFirstField_next else 1] @@ -2180,7 +2189,7 @@ def getpulses(self): return findpulses(self.data["video"]["demod_05"], pulse_hz_min, pulse_hz_max) - #@profile + @profile def compute_linelocs(self): self.rawpulses = self.getpulses() @@ -2194,8 +2203,8 @@ def compute_linelocs(self): self.validpulses = validpulses = self.refinepulses() - - line0loc, lastlineloc, self.isFirstField = self.getLine0(validpulses) + meanlinelen = self.computeLineLen(validpulses) + line0loc, lastlineloc, self.isFirstField = self.getLine0(validpulses, meanlinelen) self.linecount = 263 if self.isFirstField else 262 # Number of lines to actually process. This is set so that the entire following @@ -2220,8 +2229,6 @@ def compute_linelocs(self): return None, None, self.inlinelen * 200 - meanlinelen = self.computeLineLen(validpulses) - # If we don't have enough data at the end, move onto the next field lastline = (self.rawpulses[-1].start - line0loc) / meanlinelen if lastline < proclines: @@ -2341,6 +2348,7 @@ def compute_linelocs(self): return rv_ll, rv_err, nextfield + @profile def refine_linelocs_hsync(self): linelocs2 = self.linelocs1.copy() From 4b284c48de405c515dff6e8dea9e4cc6d803a3e7 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sat, 10 Jun 2023 06:46:08 -0700 Subject: [PATCH 15/25] use nb_std to save .005 second, provide fallback @profile decorator --- lddecode/core.py | 41 ++++++++++++++++++++++++-------------- lddecode/utils.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 15 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 3c131d3b1..e6532ef53 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -28,7 +28,7 @@ from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array -from .utils import Pulse +from .utils import Pulse, nb_std, nb_gt try: # If Anaconda's numpy is installed, mkl will use all threads for fft etc @@ -44,6 +44,12 @@ # and ld-decode. Probably should just bring all logging in here... logger = None +try: + profile +except: + def profile(fn): + return fn + def calclinelen(SP, mult, mhz): if type(mhz) == str: @@ -1497,7 +1503,7 @@ def __init__( # this is eventually set to 262/263 and 312/313 for audio timing self.linecount = None - @profile + #@profile def process(self): self.linelocs1, self.linebad, self.nextfieldoffset = self.compute_linelocs() #print(self.readloc, self.linelocs1, self.nextfieldoffset) @@ -1553,7 +1559,6 @@ def usectoinpx(self, x, line=None): def inpxtousec(self, x, line=None): return x / self.get_linefreq(line) - @profile def lineslice(self, l, begin=None, length=None, linelocs=None, begin_offset=0): """ return a slice corresponding with pre-TBC line l, begin+length are uSecs """ @@ -1604,7 +1609,7 @@ def output_to_ire(self, output): (output - self.rf.SysParams["outputZero"]) / self.out_scale ) + self.rf.DecoderParams["vsync_ire"] - @profile + def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=False): """ return a slice corresponding with pre-TBC line l """ @@ -1623,7 +1628,7 @@ def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=Fal return slice(nb_round(_begin), nb_round(_begin + _length)) - @profile + def get_timings(self): pulses = self.rawpulses hsync_typical = self.usectoinpx(self.rf.SysParams["hsyncPulseUS"]) @@ -1688,7 +1693,7 @@ def pulse_qualitycheck(self, prevpulse: Pulse, pulse: Pulse): return inorder - @profile + #@profile def run_vblank_state_machine(self, pulses, LT): """ Determines if a pulse set is a valid vblank by running a state machine """ @@ -1779,7 +1784,7 @@ def run_vblank_state_machine(self, pulses, LT): return done, validpulses - @profile + #@profile def refinepulses(self): LT = self.get_timings() @@ -1819,7 +1824,7 @@ def refinepulses(self): return valid_pulses - @profile + #@profile def getBlankRange(self, validpulses, start=0): vp_type = np.array([p[0] for p in validpulses]) @@ -1960,7 +1965,7 @@ def processVBlank(self, validpulses, start, limit=None): return None, None, None, 0 - @profile + #@profile def computeLineLen(self, validpulses): # determine longest run of 0's longrun = [-1, -1] @@ -2026,7 +2031,7 @@ def skip_check(self): # pull the above together into a routine that (should) find line 0, the last line of # the previous field. - @profile + #@profile def getLine0(self, validpulses, meanlinelen): # Gather the local line 0 location and projected from the previous field @@ -2189,7 +2194,7 @@ def getpulses(self): return findpulses(self.data["video"]["demod_05"], pulse_hz_min, pulse_hz_max) - @profile + #@profile def compute_linelocs(self): self.rawpulses = self.getpulses() @@ -2348,7 +2353,7 @@ def compute_linelocs(self): return rv_ll, rv_err, nextfield - @profile + #@profile def refine_linelocs_hsync(self): linelocs2 = self.linelocs1.copy() @@ -2541,6 +2546,7 @@ def downscale( return dsout, self.dsaudio, self.efmout + @profile def rf_tbc(self, linelocs=None): """ This outputs a TBC'd version of the input RF data, mostly intended to assist in audio processing. Outputs a uint16 array. @@ -2645,13 +2651,14 @@ def get_vsync_lines(self): return rv + @profile def dropout_detect_demod(self): # current field f = self isPAL = self.rf.system == "PAL" - rfstd = np.std(f.data["rfhpf"]) + rfstd = nb_std(f.data["rfhpf"]) # iserr_rf = np.full(len(f.data['video']['demod']), False, dtype=np.bool) iserr_rf1 = (f.data["rfhpf"] < (-rfstd * 3)) | ( f.data["rfhpf"] > (rfstd * 3) @@ -2663,6 +2670,7 @@ def dropout_detect_demod(self): # detect absurd fluctuations in pre-deemp demod, since only dropouts can cause them # (current np.diff has a prepend option, but not in ubuntu 18.04's version) + #iserr1 = nb_gt(f.data["video"]["demod_raw"], self.rf.freq_hz_half) iserr1 = f.data["video"]["demod_raw"] > self.rf.freq_hz_half # This didn't work right for PAL (issue #471) # iserr1 |= f.data['video']['demod_hpf'] > 3000000 @@ -2713,6 +2721,7 @@ def dropout_detect_demod(self): return iserr + @profile def build_errlist(self, errmap): errlist = [] @@ -2733,6 +2742,7 @@ def build_errlist(self, errmap): return errlist + @profile def dropout_errlist_to_tbc(field, errlist): """Convert data from raw data coordinates to tbc coordinates, and splits up multi-line dropouts. @@ -2799,6 +2809,7 @@ def dropout_errlist_to_tbc(field, errlist): return dropouts + @profile def dropout_detect(self): """ returns dropouts in three arrays, to line up with the JSON output """ @@ -3584,7 +3595,7 @@ def writeout(self, dataset): if audio is not None and self.outfile_audio is not None: self.outfile_audio.write(audio) - #@profile + @profile def decodefield(self, initphase=False, redo=False): """ returns field object if valid, and the offset to the next decode """ self.readloc = int(self.fdoffset - self.rf.blockcut) @@ -3642,7 +3653,7 @@ def decodefield(self, initphase=False, redo=False): return f, f.nextfieldoffset - (self.readloc - self.rawdecode["startloc"]) - #@profile + @profile def readfield(self, initphase=False): # pretty much a retry-ing wrapper around decodefield with MTF checking self.prevfield = self.curfield diff --git a/lddecode/utils.py b/lddecode/utils.py index 5bb70e5d2..238b73361 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -926,6 +926,51 @@ def findpulses(sync_ref, _, high): ) return _to_pulses_list(pulses_starts, pulses_lengths) +if False: + @njit(cache=True,nogil=True) + def numba_pulse_qualitycheck(prevpulse: Pulse, pulse: Pulse, inlinelen: int): + + if prevpulse[0] > 0 and pulse[0] > 0: + exprange = (0.4, 0.6) + elif prevpulse[0] == 0 and pulse[0] == 0: + exprange = (0.9, 1.1) + else: # transition to/from regular hsyncs can be .5 or 1H + exprange = (0.4, 1.1) + + linelen = (pulse[1].start - prevpulse[1].start) / inlinelen + inorder = inrange(linelen, *exprange) + + return inorder + + @njit(cache=True,nogil=True) + def numba_computeLineLen(validpulses, inlinelen): + # determine longest run of 0's + longrun = [-1, -1] + currun = None + for i, v in enumerate([p[0] for p in validpulses]): + if v != 0: + if currun is not None and currun[1] > longrun[1]: + longrun = currun + currun = None + elif currun is None: + currun = [i, 0] + else: + currun[1] += 1 + + if currun is not None and currun[1] > longrun[1]: + longrun = currun + + linelens = [] + for i in range(longrun[0] + 1, longrun[0] + longrun[1]): + linelen = validpulses[i][1].start - validpulses[i - 1][1].start + if inrange(linelen / inlinelen, 0.95, 1.05): + linelens.append( + validpulses[i][1].start - validpulses[i - 1][1].start + ) + + # Can't prepare the mean here since numba np.mean doesn't work over lists + return linelens + @njit(cache=True, nogil=True) def findpeaks(array, low=0): @@ -1007,6 +1052,11 @@ def nb_where(x): return np.where(x) +@njit(cache=True,nogil=True) +def nb_gt(x, y): + return (x > y) + + @njit(cache=True,nogil=True) def n_orgt(a, x, y): a |= (x > y) From 67eb3d869059c41d71b75e991b5d0b1b5bfd9384 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sat, 10 Jun 2023 07:17:40 -0700 Subject: [PATCH 16/25] switch to threaded demod... should be faster but isn't *yet* --- lddecode/core.py | 214 ++++++++++++++++++++++++++++++----------------- 1 file changed, 137 insertions(+), 77 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index e6532ef53..99c538c1a 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -3405,7 +3405,6 @@ def __init__( self.fname_out = fname_out self.firstfield = None # In frame output mode, the first field goes here - self.fieldloc = 0 self.system = system self.rf = RFDecode( @@ -3438,13 +3437,10 @@ def __init__( self.audio_offset = 0 self.mtf_level = 1 - self.prevfield = None - self.curfield = None + self.fieldstack = [None, None] self.doDOD = doDOD - self.badfields = None - self.fieldinfo = [] self.leadIn = False @@ -3463,6 +3459,9 @@ def __init__( self.bw_ratios = [] + self.decodethread = None + self.threadreturn = {} + def __del__(self): del self.demodcache @@ -3562,7 +3561,6 @@ def AC3filter(self, rftbc): blk = self.AC3Collector.get_block() - #@profile def writeout(self, dataset): f, fi, picture, audio, efm = dataset if self.digital_audio is True: @@ -3596,36 +3594,41 @@ def writeout(self, dataset): self.outfile_audio.write(audio) @profile - def decodefield(self, initphase=False, redo=False): + def decodefield(self, start, mtf_level, audio_offset, prevfield=None, initphase=False, redo=False, rv=None): """ returns field object if valid, and the offset to the next decode """ - self.readloc = int(self.fdoffset - self.rf.blockcut) - if self.readloc < 0: - self.readloc = 0 - self.readloc_block = self.readloc // self.blocksize - self.numblocks = (self.readlen // self.blocksize) + 2 + if rv is None: + rv = {} + + rv['field'] = None + rv['offset'] = None + + readloc = int(start - self.rf.blockcut) + if readloc < 0: + readloc = 0 - self.rawdecode = self.demodcache.read( - self.readloc_block * self.blocksize, - self.numblocks * self.blocksize, - self.mtf_level, + readloc_block = readloc // self.blocksize + numblocks = (self.readlen // self.blocksize) + 2 + + rawdecode = self.demodcache.read( + readloc_block * self.blocksize, + numblocks * self.blocksize, + mtf_level, forceredo=redo ) - if self.rawdecode is None: + if rawdecode is None: # logger.info("Failed to demodulate data") return None, None - self.indata = self.rawdecode["input"] - f = self.FieldClass( self.rf, - self.rawdecode, - audio_offset=self.audio_offset, - prevfield=self.curfield, + rawdecode, + audio_offset=audio_offset, + prevfield=prevfield, initphase=initphase, fields_written=self.fields_written, - readloc=self.rawdecode["startloc"], + readloc=rawdecode["startloc"], ) if self.use_profiler: @@ -3647,49 +3650,99 @@ def decodefield(self, initphase=False, redo=False): except Exception as e: raise e + rv['field'] = f + rv['offset'] = f.nextfieldoffset - (readloc - rawdecode["startloc"]) + if not f.valid: # logger.info("Bad data - jumping one second") - return f, f.nextfieldoffset + rv['offset'] = f.nextfieldoffset - return f, f.nextfieldoffset - (self.readloc - self.rawdecode["startloc"]) + return rv['field'], rv['offset'] @profile def readfield(self, initphase=False): - # pretty much a retry-ing wrapper around decodefield with MTF checking - self.prevfield = self.curfield done = False adjusted = False - redo = False + redo = None + + if len(self.fieldstack) >= 2: + self.fieldstack.pop(-1) while done is False: if redo: + # Drop existing thread + if self.decodethread: + self.decodethread.join() + self.decodethread = None + + # Start new thread + self.threadreturn = {} + df_args = (redo, self.mtf_level, self.audio_offset, self.fieldstack[0], initphase, redo, self.threadreturn) + + self.decodethread = threading.Thread(target=self.decodefield, args=df_args) + # .run() does not actually run this in the background + self.decodethread.run() + f, offset = self.threadreturn['field'], self.threadreturn['offset'] + self.decodethread = None + # Only allow one redo, no matter what done = True + redo = None + elif self.decodethread and self.decodethread.ident: + self.decodethread.join() + self.decodethread = None + + f, offset = self.threadreturn['field'], self.threadreturn['offset'] + #print(f, f.valid, f.readloc, offset) + else: # assume first run + f = None + offset = 0 + + if True: + # Start new thread + self.threadreturn = {} + if f and f.valid: + self.audio_offset = f.audio_next_offset + prevfield = f + toffset = self.fdoffset + offset + # XXX: this is wrong somehow + audio_offset = f.audio_next_offset + else: + prevfield = None + toffset = self.fdoffset + audio_offset = self.audio_offset - self.fieldloc = self.fdoffset - f, offset = self.decodefield(initphase=initphase, redo=redo) + if offset: + toffset += offset - if f is None: - if offset is None: - # EOF, probably - return None + df_args = (toffset, self.mtf_level, audio_offset, prevfield, initphase, False, self.threadreturn) - self.fdoffset += offset + self.decodethread = threading.Thread(target=self.decodefield, args=df_args) + #elf.decodethread.run() + self.decodethread.start() + #self.decodethread.join() + + # process previous run + if f: + self.fdoffset += offset + elif offset is None: + # Probable end, so push an empty field + self.fieldstack.insert(0, None) - if f is not None and f.valid: + if f and f.valid: picture, audio, efm = f.downscale( linesout=self.output_lines, final=True, audio=self.analog_audio ) - self.audio_offset = f.audio_next_offset - metrics = self.computeMetrics(f, None, verbose=True) if "blackToWhiteRFRatio" in metrics and adjusted is False: keep = 900 if self.isCLV else 30 self.bw_ratios.append(metrics["blackToWhiteRFRatio"]) self.bw_ratios = self.bw_ratios[-keep:] - redo = f.needrerun or not self.checkMTF(f, self.prevfield) + redo = f.needrerun or not self.checkMTF(f, self.fieldstack[0]) + if redo: + redo = self.fdoffset - offset # Perform AGC changes on first fields only to prevent luma mismatch intra-field if self.useAGC and f.isFirstField and f.sync_confidence > 80: @@ -3697,9 +3750,9 @@ def readfield(self, initphase=False): actualwhiteIRE = f.rf.hztoire(ire100_hz) - sync_ire_diff = np.abs(self.rf.hztoire(sync_hz) - self.rf.DecoderParams["vsync_ire"]) - whitediff = np.abs(self.rf.hztoire(ire100_hz) - actualwhiteIRE) - ire0_diff = np.abs(self.rf.hztoire(ire0_hz)) + sync_ire_diff = nb_abs(self.rf.hztoire(sync_hz) - self.rf.DecoderParams["vsync_ire"]) + whitediff = nb_abs(self.rf.hztoire(ire100_hz) - actualwhiteIRE) + ire0_diff = nb_abs(self.rf.hztoire(ire0_hz)) acceptable_diff = 2 if self.fields_written else 0.5 @@ -3713,31 +3766,34 @@ def readfield(self, initphase=False): len(self.fieldinfo), np.round(vsync_ire, 2) )) else: - redo = True + redo = self.fdoffset - offset self.rf.DecoderParams["ire0"] = ire0_hz # Note that vsync_ire is a negative number, so (sync_hz - ire0_hz) is correct self.rf.DecoderParams["hz_ire"] = hz_ire self.rf.DecoderParams["vsync_ire"] = vsync_ire - if adjusted is False and redo is True: + if adjusted is False and redo: self.demodcache.flush_demod() adjusted = True - self.fdoffset -= offset + self.fdoffset = redo else: done = True - else: - # Probably jumping ahead - delete the previous field so - # TBC computations aren't thrown off - if self.curfield is not None and self.badfields is None: - self.badfields = (self.curfield, f) - self.curfield = None + self.fieldstack.insert(0, f) + self.audio_offset = f.audio_next_offset - if f is None or f.valid is False: - return None + if f is None and offset is None: + # EOF, probably + return None - self.curfield = f + if self.decodethread and not self.decodethread.ident and not redo: + self.decodethread.start() + elif redo: + self.decodethread = None + if f is None or f.valid is False: + return None + if f is not None and self.fname_out is not None: # Only write a FirstField first if len(self.fieldinfo) == 0 and not f.isFirstField: @@ -3855,7 +3911,7 @@ def computeMetricsPAL(self, metrics, f, fp=None): # Unforunately this is too short to get a 50IRE RF level wl_slice = f.lineslice_tbc(13, 4.7 + 15.5, 3) metrics["greyPSNR"] = self.calcpsnr(f, wl_slice) - metrics["greyIRE"] = np.mean(f.output_to_ire(f.dspicture[wl_slice])) + metrics["greyIRE"] = nb_mean(f.output_to_ire(f.dspicture[wl_slice])) else: # There's a nice long burst at 50IRE block on field2 l13 b50slice = f.lineslice_tbc(13, 36, 20) @@ -3879,7 +3935,7 @@ def computeMetricsNTSC(self, metrics, f, fp=None): ire50_slice = f.lineslice_tbc(19, 36, 10) metrics["greyPSNR"] = self.calcpsnr(f, ire50_slice) - metrics["greyIRE"] = np.mean(f.output_to_ire(f.dspicture[ire50_slice])) + metrics["greyIRE"] = nb_mean(f.output_to_ire(f.dspicture[ire50_slice])) ire50_rawslice = f.lineslice(19, 36, 10) rawdata = f.rawdata[ @@ -3981,6 +4037,7 @@ def computeMetrics(self, f, fp=None, verbose=False): return metrics_rounded + #@profile def buildmetadata(self, f, check_phase=True): """ returns field information JSON and whether or not a backfill field is needed """ prevfi = self.fieldinfo[-1] if len(self.fieldinfo) else None @@ -3989,8 +4046,8 @@ def buildmetadata(self, f, check_phase=True): "isFirstField": True if f.isFirstField else False, "syncConf": f.compute_syncconf(), "seqNo": len(self.fieldinfo) + 1, - "diskLoc": np.round((self.fieldloc / self.bytes_per_field) * 10) / 10, - "fileLoc": int(np.floor(self.fieldloc)), + "diskLoc": np.round((f.readloc / self.bytes_per_field) * 10) / 10, + "fileLoc": int(np.floor(f.readloc)), "medianBurstIRE": roundfloat(f.burstmedian), } @@ -4036,7 +4093,7 @@ def buildmetadata(self, f, check_phase=True): return fi, True fi["decodeFaults"] = decodeFaults - fi["vitsMetrics"] = self.computeMetrics(self.curfield, self.prevfield) + fi["vitsMetrics"] = self.computeMetrics(self.fieldstack[0], self.fieldstack[1]) fi["vbi"] = {"vbiData": [int(lc) for lc in f.linecode if lc is not None]} @@ -4049,7 +4106,7 @@ def buildmetadata(self, f, check_phase=True): # process VBI frame info data self.frameNumber = self.decodeFrameNumber(self.firstfield, f) - rawloc = np.floor((self.readloc / self.bytes_per_field) / 2) + rawloc = np.floor((f.readloc / self.bytes_per_field) / 2) disk_Type = "CLV" if self.isCLV else "CAV" disk_TimeCode = None @@ -4115,11 +4172,13 @@ def seek_getframenr(self, startfield): at file location 0 """ + curfield = None + prevfield = None + self.roughseek(startfield) for fields in range(10): - self.fieldloc = self.fdoffset - f, offset = self.decodefield(initphase=True) + f, offset = self.decodefield(self.fdoffset, 0, 0) if f is None: # If given an invalid starting location (i.e. seeking to a frame in an already cut raw file), @@ -4132,27 +4191,23 @@ def seek_getframenr(self, startfield): elif not f.valid: self.fdoffset += offset else: - self.prevfield = self.curfield - self.curfield = f + prevfield = curfield + curfield = f self.fdoffset += offset # Two fields are needed to be sure to have sufficient Philips code data # to determine frame #. - if self.prevfield is not None and f.valid: - fnum = self.decodeFrameNumber(self.prevfield, self.curfield) + if prevfield is not None and f.valid: + fnum = self.decodeFrameNumber(prevfield, curfield) if self.earlyCLV: logger.error("Cannot seek in early CLV disks w/o timecode") return None, startfield elif fnum is not None: - rawloc = np.floor((self.readloc / self.bytes_per_field) / 2) + rawloc = np.floor((f.readloc / self.bytes_per_field) / 2) logger.info("seeking: file loc %d frame # %d", rawloc, fnum) - # Clear field memory on seeks - self.prevfield = None - self.curfield = None - - return fnum, startfield + return fnum, startfield, f.readloc return None, None @@ -4168,11 +4223,11 @@ def seek(self, startframe, target): curfield = startframe * 2 for retries in range(3): - fnr, curfield = self.seek_getframenr(curfield) + fnr, curfield, readloc = self.seek_getframenr(curfield) if fnr is None: return None - cur = int((self.fieldloc / self.bytes_per_field)) + cur = int((readloc / self.bytes_per_field)) if fnr == target: logger.info("Finished seek") print("Finished seeking, starting at frame", fnr, file=sys.stderr) @@ -4183,7 +4238,7 @@ def seek(self, startframe, target): return None - def build_json(self, f): + def build_json(self): """ build up the JSON structure for file output. """ jout = {} jout["pcmAudioParameters"] = { @@ -4198,8 +4253,13 @@ def build_json(self, f): vp["numberOfSequentialFields"] = len(self.fieldinfo) vp["osInfo"] = f'{platform.system()}:{platform.release()}:{platform.version()}' - if f is None: - return + # get the first valid field in the stack if any + for f in self.fieldstack: + if f: + break + + if not f: + return None vp["gitBranch"] = self.branch vp["gitCommit"] = self.commit From c7f3cfe8d0ce825cacf5718612a3015386b9293a Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sat, 10 Jun 2023 07:18:41 -0700 Subject: [PATCH 17/25] switch to threaded demod... should be faster but isn't *yet* --- lddecode/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lddecode/main.py b/lddecode/main.py index cc0329957..3918df424 100644 --- a/lddecode/main.py +++ b/lddecode/main.py @@ -377,7 +377,7 @@ def main(args=None): jsondumper = jsondump_thread(ldd, outname) def cleanup(): - jsondumper.put(ldd.build_json(ldd.curfield)) + jsondumper.put(ldd.build_json()) # logger.flush() ldd.close() jsondumper.put(None) @@ -407,7 +407,7 @@ def cleanup(): done = True if ldd.fields_written < 100 or ((ldd.fields_written % 500) == 0): - jsondumper.put(ldd.build_json(ldd.curfield)) + jsondumper.put(ldd.build_json()) print("\nCompleted: saving JSON and exiting", file=sys.stderr) cleanup() From 18bfcef9fbcf42c506227457a7f7d0a190624802 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sat, 10 Jun 2023 07:30:45 -0700 Subject: [PATCH 18/25] wip --- lddecode/core.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 99c538c1a..3645c85c4 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -1628,7 +1628,7 @@ def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=Fal return slice(nb_round(_begin), nb_round(_begin + _length)) - + @profile def get_timings(self): pulses = self.rawpulses hsync_typical = self.usectoinpx(self.rf.SysParams["hsyncPulseUS"]) @@ -1784,9 +1784,9 @@ def run_vblank_state_machine(self, pulses, LT): return done, validpulses - #@profile + @profile def refinepulses(self): - LT = self.get_timings() + self.LT = self.get_timings() HSYNC, EQPL1, VSYNC, EQPL2 = range(4) @@ -1796,7 +1796,7 @@ def refinepulses(self): while i < len(self.rawpulses): curpulse = self.rawpulses[i] - if inrange(curpulse.len, *LT["hsync"]): + if inrange(curpulse.len, *self.LT["hsync"]): good = ( self.pulse_qualitycheck(valid_pulses[-1], (0, curpulse)) if len(valid_pulses) @@ -1806,12 +1806,12 @@ def refinepulses(self): i += 1 elif ( i > 2 - and inrange(self.rawpulses[i].len, *LT["eq"]) + and inrange(self.rawpulses[i].len, *self.LT["eq"]) and (len(valid_pulses) and valid_pulses[-1][0] == HSYNC) ): # print(i, self.rawpulses[i]) done, vblank_pulses = self.run_vblank_state_machine( - self.rawpulses[i - 2 : i + 24], LT + self.rawpulses[i - 2 : i + 24], self.LT ) if done: [valid_pulses.append(p) for p in vblank_pulses[2:]] @@ -2670,8 +2670,8 @@ def dropout_detect_demod(self): # detect absurd fluctuations in pre-deemp demod, since only dropouts can cause them # (current np.diff has a prepend option, but not in ubuntu 18.04's version) - #iserr1 = nb_gt(f.data["video"]["demod_raw"], self.rf.freq_hz_half) - iserr1 = f.data["video"]["demod_raw"] > self.rf.freq_hz_half + iserr1 = nb_gt(f.data["video"]["demod_raw"], self.rf.freq_hz_half) + #iserr1 = f.data["video"]["demod_raw"] > self.rf.freq_hz_half # This didn't work right for PAL (issue #471) # iserr1 |= f.data['video']['demod_hpf'] > 3000000 @@ -2689,7 +2689,7 @@ def dropout_detect_demod(self): # Account for sync pulses when checking demod - hsync_len = int(f.get_timings()['hsync'][1]) + hsync_len = int(f.LT['hsync'][1]) vsync_ire = f.rf.SysParams['vsync_ire'] vsync_lines = self.get_vsync_lines() From afac3d8202786e362ecad419b21d1a27195fab6f Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sat, 10 Jun 2023 07:59:08 -0700 Subject: [PATCH 19/25] a little more tweaking and enable time taken --- lddecode/core.py | 29 +++++++++++++---------------- lddecode/main.py | 3 +++ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 3645c85c4..284cc8668 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -1559,6 +1559,7 @@ def usectoinpx(self, x, line=None): def inpxtousec(self, x, line=None): return x / self.get_linefreq(line) + @profile def lineslice(self, l, begin=None, length=None, linelocs=None, begin_offset=0): """ return a slice corresponding with pre-TBC line l, begin+length are uSecs """ @@ -1572,8 +1573,8 @@ def lineslice(self, l, begin=None, length=None, linelocs=None, begin_offset=0): _length = self.usectoinpx(_length) return slice( - int(np.floor(_begin + begin_offset)), - int(np.ceil(_begin + _length + begin_offset)), + int(_begin + begin_offset), + int(_begin + _length + begin_offset + 1), ) def usectooutpx(self, x): @@ -2663,18 +2664,11 @@ def dropout_detect_demod(self): iserr_rf1 = (f.data["rfhpf"] < (-rfstd * 3)) | ( f.data["rfhpf"] > (rfstd * 3) ) # | (f.rawdata <= -32000) - iserr_rf = np.full_like(iserr_rf1, False) - iserr_rf[self.rf.delays["video_rot"] :] = iserr_rf1[ + iserr = np.full_like(iserr_rf1, False) + iserr[self.rf.delays["video_rot"] :] = iserr_rf1[ : -self.rf.delays["video_rot"] ] - # detect absurd fluctuations in pre-deemp demod, since only dropouts can cause them - # (current np.diff has a prepend option, but not in ubuntu 18.04's version) - iserr1 = nb_gt(f.data["video"]["demod_raw"], self.rf.freq_hz_half) - #iserr1 = f.data["video"]["demod_raw"] > self.rf.freq_hz_half - # This didn't work right for PAL (issue #471) - # iserr1 |= f.data['video']['demod_hpf'] > 3000000 - # build sets of min/max valid levels valid_min = np.full_like( f.data["video"]["demod"], f.rf.iretohz(-70 if isPAL else -50) @@ -2705,13 +2699,15 @@ def dropout_detect_demod(self): valid_min[int(f.linelocs[l]):int(f.linelocs[l]) + hsync_len] = sync_min valid_min05[int(f.linelocs[l]):int(f.linelocs[l]) + hsync_len] = sync_min_05 - n_orlt(iserr1, f.data["video"]["demod"], valid_min) - n_orgt(iserr1, f.data["video"]["demod"], valid_max) + # detect absurd fluctuations in pre-deemp demod, since only dropouts can cause them + # (current np.diff has a prepend option, but not in ubuntu 18.04's version) + n_orgt(iserr, f.data["video"]["demod_raw"], self.rf.freq_hz_half) - n_orlt(iserr1, f.data["video"]["demod_05"], valid_min05) - n_orgt(iserr1, f.data["video"]["demod_05"], valid_max05) + n_orlt(iserr, f.data["video"]["demod"], valid_min) + n_orgt(iserr, f.data["video"]["demod"], valid_max) - iserr = iserr1 | iserr_rf + n_orlt(iserr, f.data["video"]["demod_05"], valid_min05) + n_orgt(iserr, f.data["video"]["demod_05"], valid_max05) # filter out dropouts outside actual field iserr[:int(f.linelocs[f.lineoffset + 1])] = False @@ -3509,6 +3505,7 @@ def checkMTF(self, field, pfield=None): return np.abs(self.mtf_level - oldmtf) < 0.05 + @profile def detectLevels(self, field): # Returns sync level, 0IRE, and 100IRE levels of a field # computed from HSYNC areas and VITS diff --git a/lddecode/main.py b/lddecode/main.py index 3918df424..6e48aef05 100644 --- a/lddecode/main.py +++ b/lddecode/main.py @@ -384,6 +384,7 @@ def cleanup(): if audio_pipe is not None: audio_pipe.close() + firstdecode = time.time() while not done and ldd.fields_written < (req_frames * 2): try: f = ldd.readfield() @@ -411,3 +412,5 @@ def cleanup(): print("\nCompleted: saving JSON and exiting", file=sys.stderr) cleanup() + + print(time.time()-firstdecode) From 5747b2302d36fe53f445f15ded48aff9556605d3 Mon Sep 17 00:00:00 2001 From: Chad Date: Sun, 11 Jun 2023 11:04:04 -0700 Subject: [PATCH 20/25] Add OpenCL (don't get too excited, it hogs the GIL) --- lddecode/core.py | 447 ++++++++++++++++++++++++++++++++++++++++++---- lddecode/main.py | 8 + lddecode/utils.py | 25 +++ 3 files changed, 449 insertions(+), 31 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index 284cc8668..d204a9e39 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -28,7 +28,7 @@ from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array -from .utils import Pulse, nb_std, nb_gt +from .utils import Pulse, nb_std, nb_gt, n_ornotrange, init_opencl try: # If Anaconda's numpy is installed, mkl will use all threads for fft etc @@ -50,12 +50,27 @@ def profile(fn): return fn +have_pyopencl = False +BLOCKSIZE = 32 * 1024 + +try: + if True: + import pyopencl as cl + import pyopencl.array as cla + import pyvkfft.opencl + from pyvkfft.fft import fftn as vkfftn, ifftn as vkifftn, rfftn as vkrfftn, irfftn as vkirfftn + from pyvkfft.opencl import VkFFTApp + import pyopencl.reduction as cl_reduction + have_pyopencl = True + BLOCKSIZE = 128 * 1024 +except: + pass def calclinelen(SP, mult, mhz): if type(mhz) == str: mhz = SP[mhz] - return int(np.round(SP["line_period"] * mhz * mult)) + return int(nb_round(SP["line_period"] * mhz * mult)) # states for first field of validpulses (second field is pulse #) @@ -283,7 +298,7 @@ def __init__( self, inputfreq=40, system="NTSC", - blocklen=32 * 1024, + blocklen=BLOCKSIZE, decode_digital_audio=False, decode_analog_audio=0, has_analog_audio=True, @@ -367,6 +382,8 @@ def __init__( linelen = self.freq_hz / (1000000.0 / self.SysParams["line_period"]) self.linelen = int(np.round(linelen)) + self.samplesperline = self.freq / self.linelen + # How much horizontal sync position can deviate from previous/expected position # and still be interpreted as a horizontal sync pulse. # Too high tolerance may result in false positive sync pulses, too low may end up missing them. @@ -376,12 +393,173 @@ def __init__( self.decode_digital_audio = decode_digital_audio self.decode_analog_audio = decode_analog_audio + # Optional OpenCL support + self.opencl_context = None + self.opencl_queue = None + self.opencl_initevent = None + self.OpenCL_Filters = {} + self.computefilters() # The 0.5mhz filter is rolled back to align with the data, so there # are a few unusable samples at the end. self.blockcut_end = self.Filters["F05_offset"] + + def use_opencl(self, opencl_init_event, devname = None): + self.opencl_context = init_opencl(cl, devname) + if not self.opencl_context: + print('unable to open opencl context') + return False + self.opencl_queue = cl.CommandQueue(self.opencl_context) + if not self.opencl_queue: + print('unable to open opencl queue') + return False + self.prepare_opencl() + self.computefilters() + + blockshape = (self.blocklen,) + + self.opencl_buffers = {} + self.opencl_buffers['inbuf_complex64'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + self.opencl_buffers['fftbuf'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) + self.opencl_buffers['demod_fft'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) + self.opencl_buffers['fftfilt'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) + self.opencl_buffers['hilbert'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) + self.opencl_buffers['rfhpf'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + self.opencl_buffers['rfhpf_sqsum'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['gtangles'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['demod'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['demod_clipped'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['demod_clipped_cplx'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + + self.opencl_buffers['demodtmp1'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + self.opencl_buffers['demodtmp2'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + self.opencl_buffers['demodtmp3'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + self.opencl_buffers['demodtmp4'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) + + self.opencl_buffers['demodtmp1r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['demodtmp2r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['demodtmp3r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + self.opencl_buffers['demodtmp4r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) + + opencl_init_event.set() + + def prepare_opencl(self): + # ? - move to separate file? + + self.opencl_kernels = cl.Program(self.opencl_context, """ + __kernel void int16_to_complex64(__global short *real_part, __global float2 *output) { + int gid = get_global_id(0); + output[gid].x = (float)real_part[gid]; + output[gid].y = (float)0; + } + + __kernel void complex64_getreal(__global float2 *input, __global float *output) { + int gid = get_global_id(0); + output[gid] = input[gid].x; + } + + __kernel void compute_magnitude(__global float2* data, __global float* output) { + int gid = get_global_id(0); + float2 cmplx = data[gid]; + output[gid-get_global_offset(0)] = sqrt((cmplx.x * cmplx.x) + (cmplx.y * cmplx.y)); + } + + __kernel void PAL_WibbleRemover(__global float2* fftbuf, __global float* data, float mean, float std_dev) { + int gid = get_global_id(0); + if (fabs(data[gid - get_global_offset(0)] - mean) > 3*std_dev) { + fftbuf[gid - 1] = 0; + fftbuf[gid] = 0; + fftbuf[gid + 1] = 0; + //data[gid - get_global_offset(0)] = 0; + } + } + + __kernel void compute_angle(__global float2* complex_arr, __global float* angle_arr) { + int id = get_global_id(0); + float2 complex_val = complex_arr[id]; + float angle = atan2(complex_val.y, complex_val.x); + angle_arr[id] = angle; + } + + /* This implements: + tdangles2 = np.unwrap(dangles) + # With extremely bad data, the unwrapped angles can jump. + while np.min(tdangles2) < 0: + tdangles2[tdangles2 < 0] += tau + while np.max(tdangles2) > tau: + tdangles2[tdangles2 > tau] -= tau + return tdangles2 * (freq_hz / tau) + */ + + __kernel void angle_diff_and_unwrap(__global const float* input, __global float* output, float freq_hz) { + int id = get_global_id(0); + output[0] = 0; + if (id >= 1) { + output[id] = input[id] - input[id - 1]; + while (output[id] < 0) { + output[id] += 6.283185307179586; + } + while (output[id] > 6.283185307179586) { + output[id] -= 6.283185307179586; + } + output[id] *= freq_hz / 6.283185307179586; + } + } + + __kernel void sum(__global const float *a, __global float *b, int n) { + int gid = get_global_id(0); + float sum = 0; + for (int i = gid; i < n; i += get_global_size(0)) { + sum += a[i]; + } + b[gid] = sum; + } + + __kernel void sum_of_squares(__global const float *a, __global float *b, int n) { + int gid = get_global_id(0); + float sum = 0; + for (int i = gid; i < n; i += get_global_size(0)) { + sum += a[i] * a[i]; + } + b[gid] = sum; + } + + __kernel void clip(__global const float* input, __global float* output, float _min, float _max) { + int id = get_global_id(0); + if (input[id] < _min) { + output[id] = _min; + } else if (input[id] > _max) { + output[id] = _max; + } else { + output[id] = input[id]; + } + } + + __kernel void float32_to_complex64(__global float *real_part, __global float2 *output) { + int gid = get_global_id(0); + output[gid].x = (float)real_part[gid]; + output[gid].y = (float)0; + } + """).build() + + self.opencl_fft = VkFFTApp((self.blocklen,), + np.complex64, + queue=self.opencl_queue, + ndim=1, + inplace=False, + norm=1) + + self.sum_gpu = cl_reduction.ReductionKernel(self.opencl_context, np.float32, neutral="0", + reduce_expr="a+b", map_expr="x[i]", + arguments="__global float *x") + + self.square_diff_gpu = cl_reduction.ReductionKernel(self.opencl_context, np.float32, neutral="0", + reduce_expr="a+b", map_expr="(x[i]-mean)*(x[i]-mean)", + arguments="__global float *x, float mean") + + def computefilters(self): """ (re)compute the filter sets """ @@ -427,6 +605,15 @@ def computefilters(self): self.computedelays() + if self.opencl_context: + self.OpenCL_Filters = {} + for f in self.Filters.keys(): + if type(self.Filters[f]) == np.ndarray: + self.OpenCL_Filters[f] = cla.to_device(self.opencl_queue, self.Filters[f].astype(np.complex64)) + else: + self.OpenCL_Filters[f] = self.Filters[f] + + def computeefmfilter(self): """Frequency-domain equalisation filter for the LaserDisc EFM signal. This was inspired by the input signal equaliser in WSJT-X, described in @@ -673,12 +860,178 @@ def hztoire(self, hz, spec=False): return (hz - params["ire0"]) / params["hz_ire"] def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): - rv = {} - mtf_level *= self.mtf_mult mtf_level *= self.DecoderParams["MTF_basemult"] mtf_level += self.mtf_offset + if self.opencl_context: + return self.demodblock_opencl(data, mtf_level, fftdata, cut) + else: + return self.demodblock_cpu(data, mtf_level, fftdata, cut) + + def demodblock_opencl(self, data=None, mtf_level=0, fftdata=None, cut=False): + rv = {} + + blockshape = (self.blocklen,) + if False and fftdata is not None: + indata_fft = fftdata + elif data is not None: + indata_fft = np.zeros(blockshape, dtype=np.complex64) + inbuf_int16 = cla.to_device(self.opencl_queue, data) + self.opencl_kernels.int16_to_complex64(self.opencl_queue, (self.blocklen,), None, inbuf_int16.data, self.opencl_buffers['inbuf_complex64'].data) + self.opencl_fft.fft(self.opencl_buffers['inbuf_complex64'], dest=self.opencl_buffers['fftbuf']) + self.opencl_buffers['fftbuf'].get(ary=indata_fft) + + # delete OpenCL buffers when finished to avoid resource exhaustion + del inbuf_int16 + else: + raise Exception("demodblock called without raw or FFT data") + + rotdelay = 0 + if getattr(self, "delays", None) is not None and "video_rot" in self.delays: + rotdelay = self.delays["video_rot"] + + self.opencl_fft.ifft((self.opencl_buffers['fftbuf'] * self.OpenCL_Filters['Frfhpf']), self.opencl_buffers['rfhpf']) + rv["rfhpf"] = self.opencl_buffers['rfhpf'].get()[ + self.blockcut - rotdelay : -self.blockcut_end - rotdelay + ] + + if self.system == "PAL" and self.PAL_V4300D_NotchFilter: + """ This routine works around an 'interesting' issue seen with LD-V4300D players and + some PAL digital audio disks, where there is a signal somewhere between 8.47 and 8.57mhz. + + The idea here is to look for anomolies (3 std deviations) and snip them out of the + FFT. There may be side effects, however, but generally minor compared to the + 'wibble' itself and only in certain cases. + """ + bin_start, bin_end = int(self.blocklen * (8.42 / self.freq)), int(1 + (self.blocklen * (8.6 / self.freq))) + flip_start = self.blocklen - bin_end + flip_end = self.blocklen - bin_start + N = bin_end-bin_start + + fftbuf_sqsum = cla.empty(self.opencl_queue, (N,), dtype=np.float32) + + for start, end in ((bin_start, bin_end), (flip_start, flip_end)): + global_work_offset = [start] + + self.opencl_kernels.compute_magnitude(self.opencl_queue, (N,), None, self.opencl_buffers['fftbuf'].data, fftbuf_sqsum.data, global_offset=global_work_offset) + mean = self.sum_gpu(fftbuf_sqsum).get() / N + # XXX: slow! + variance = self.square_diff_gpu(fftbuf_sqsum, np.float32(mean)).get() / N + std_dev = np.sqrt(variance) + + self.opencl_kernels.PAL_WibbleRemover(self.opencl_queue, (N,), None, self.opencl_buffers['fftbuf'].data, fftbuf_sqsum.data, np.float32(mean), np.float32(std_dev), global_offset=global_work_offset) + + del fftbuf_sqsum + + fftfilt = self.opencl_buffers['fftbuf'] * self.OpenCL_Filters["RFVideo"] + + if mtf_level != 0: + fftfilt *= self.OpenCL_Filters["MTF"] ** mtf_level + + self.opencl_fft.ifft(fftfilt, self.opencl_buffers['hilbert']) + self.opencl_kernels.compute_angle(self.opencl_queue, blockshape, None, self.opencl_buffers['hilbert'].data, self.opencl_buffers['gtangles'].data) + self.opencl_kernels.angle_diff_and_unwrap(self.opencl_queue, blockshape, None, self.opencl_buffers['gtangles'].data, self.opencl_buffers['demod'].data, np.float32(self.freq_hz)) + + # use a clipped demod for video output processing to reduce speckling impact + self.opencl_kernels.clip(self.opencl_queue, blockshape, None, self.opencl_buffers['demod'].data, self.opencl_buffers['demod_clipped'].data, np.float32(150000), np.float32(self.freq_hz*.75)) + self.opencl_kernels.float32_to_complex64(self.opencl_queue, (self.blocklen,), None, self.opencl_buffers['demod_clipped'].data, self.opencl_buffers['demod_clipped_cplx'].data) + + self.opencl_fft.fft(self.opencl_buffers['demod_clipped_cplx'], dest=self.opencl_buffers['demod_fft']) + + self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideo"], self.opencl_buffers['demodtmp1']) + self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideo05"], self.opencl_buffers['demodtmp2']) + self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideoBurst"], self.opencl_buffers['demodtmp3']) + + self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp1'].data, self.opencl_buffers['demodtmp1r'].data) + self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp2'].data, self.opencl_buffers['demodtmp2r'].data) + self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp3'].data, self.opencl_buffers['demodtmp3r'].data) + + self.opencl_queue.finish() + + (out_video, event1) = self.opencl_buffers['demodtmp1r'].get_async() + (out_video05, event2) = self.opencl_buffers['demodtmp2r'].get_async() + (out_videoburst, event3) = self.opencl_buffers['demodtmp3r'].get_async() + + for event in [event1, event2, event3]: + event.wait() + + out_video05 = np.roll(out_video05, -self.Filters["F05_offset"]) + + rv['fft'] = indata_fft + + if self.system == "PAL": + self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideoPilot"], self.opencl_buffers['demodtmp1']) + self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp1'].data, self.opencl_buffers['demodtmp1r'].data) + out_videopilot = self.opencl_buffers['demodtmp1r'].get() + + video_out = np.rec.array( + [ + out_video, + self.opencl_buffers['demod'].get().real, + out_video05, + out_videoburst, + out_videopilot, + ], + names=[ + "demod", + "demod_raw", + "demod_05", + "demod_burst", + "demod_pilot", + ], + ) + else: + video_out = np.rec.array( + [out_video, self.opencl_buffers['demod'].get().real, out_video05, out_videoburst], + names=["demod", "demod_raw", "demod_05", "demod_burst"], + ) + + rv["video"] = ( + video_out[self.blockcut : -self.blockcut_end] if cut else video_out + ) + + if self.decode_digital_audio: + self.opencl_fft.ifft(self.opencl_buffers['fftbuf'] * self.OpenCL_Filters["Fefm"], self.opencl_buffers['demodtmp1']) + self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp1'].data, self.opencl_buffers['demodtmp1r'].data) + (efm_out, event) = self.opencl_buffers['demodtmp1r'].get_async() + event.wait() + + if cut: + efm_out = efm_out[self.blockcut : -self.blockcut_end] + rv["efm"] = np.int16(np.clip(efm_out.real, -32768, 32767)) + + # NOTE: ac3 audio is filtered after RF TBC + if self.decode_analog_audio: + stage1_out = [] + for channel in ['left', 'right']: + afilter = self.audio[channel] + + # Apply first stage audio filter + a1 = npfft.ifft(afilter.slicer(indata_fft) * afilter.filt1) + # Demodulate and restore frequency after bin slicing + a1u = unwrap_hilbert(a1, afilter.a1_freq) + afilter.low_freq + + stage1_out.append(a1u) + + audio_out = np.rec.array( + [stage1_out[0].astype(np.float32), stage1_out[1].astype(np.float32)], names=["audio_left", "audio_right"] + ) + + fdiv = video_out.shape[0] // audio_out.shape[0] + rv["audio"] = ( + audio_out[self.blockcut // fdiv : -self.blockcut_end // fdiv] + if cut + else audio_out + ) + + rv['setupcount'] = self.setupcount + + return rv + + def demodblock_cpu(self, data=None, mtf_level=0, fftdata=None, cut=False): + rv = {} + if fftdata is not None: indata_fft = fftdata elif data is not None: @@ -784,7 +1137,7 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): stage1_out.append(a1u.astype(np.float32)) audio_out = np.rec.array( - [stage1_out[0], stage1_out[1]], names=["audio_left", "audio_right"] + [stage1_out[0].astype(np.float32), stage1_out[1].astype(np.float32)], names=["audio_left", "audio_right"] ) fdiv = video_out.shape[0] // audio_out.shape[0] @@ -940,7 +1293,7 @@ def computedelays(self, mtf_level=0): fakesignal += 8192 fakesignal[6000:6005] = 0 - fakedecode = rf.demodblock(fakesignal, mtf_level=mtf_level) + fakedecode = rf.demodblock_cpu(fakesignal, mtf_level=mtf_level) vdemod = fakedecode["video"]["demod"] vdemod_raw = fakedecode["video"]["demod_raw"] @@ -976,6 +1329,8 @@ def __init__( rf, infile, loader, + rf_args, + OpenCL = False, cachesize=256, num_worker_threads=6, MTF_tolerance=0.05, @@ -983,6 +1338,7 @@ def __init__( self.infile = infile self.loader = loader self.rf = rf + self.rf_args = rf_args self.currentMTF = 1 self.MTF_tolerance = MTF_tolerance @@ -992,8 +1348,9 @@ def __init__( # Cache dictionary - key is block #, which holds data for that block self.lrusize = cachesize - self.bytes_per_frame = int(self.rf.freq_hz / self.rf.SysParams["FPS"]) - self.prefetch = int(self.bytes_per_frame / self.blocksize) + 4 + # should be in self.rf, but may not be computed yet + self.bytes_per_field = int(self.rf.freq_hz / (self.rf.SysParams["FPS"] * 2)) + 1 + self.prefetch = int((self.bytes_per_field * 2) / self.blocksize) + 4 self.lru = [] @@ -1016,9 +1373,21 @@ def __init__( self.deqeue_thread = threading.Thread(target=self.dequeue, daemon=True) num_worker_threads = max(num_worker_threads - 1, 1) + opencl_worker_threads = 1 if OpenCL else 0 + opencl_init_event = threading.Event() + + for i in range(opencl_worker_threads): + t = threading.Thread( + target=self.worker, daemon=True, args=(opencl_init_event,) + ) + t.start() + opencl_init_event.wait() + self.threads.append(t) + + num_worker_threads = num_worker_threads if not OpenCL else 0 for i in range(num_worker_threads): t = threading.Thread( - target=self.worker, daemon=True, args=() + target=self.worker, daemon=True, args=(False,) ) t.start() self.threads.append(t) @@ -1097,11 +1466,23 @@ def apply_newparams(self, newparams): self.rf.computefilters() - def worker(self): + def worker(self, opencl_init_event): + blocksrun = 0 + blockstime = 0 + opencl = False + + rf = RFDecode(**self.rf_args) + + if have_pyopencl and opencl_init_event: + opencl = True + opencl_init_event.clear() + rf.use_opencl(opencl_init_event) + while True: item = self.q_in.get() if item is None or item[0] == "END": + #print(opencl, blocksrun, blockstime / blocksrun) return if item[0] == "DEMOD": @@ -1119,9 +1500,13 @@ def worker(self): "demod" not in block or np.abs(block["MTF"] - target_MTF) > self.MTF_tolerance ): - output["demod"] = self.rf.demodblock( + st = time.time() + output["demod"] = rf.demodblock( data=block["rawinput"], fftdata=fftdata, mtf_level=target_MTF, cut=True ) + blockstime += time.time() - st + blocksrun += 1 + output["MTF"] = target_MTF output["request"] = request @@ -1130,8 +1515,6 @@ def worker(self): elif item[0] == "NEWPARAMS": self.apply_newparams(item[1]) -# if not ispiped: -# self.q_in.task_done() def doread(self, blocknums, MTF, redo=False, prefetch=False): need_blocks = [] @@ -1523,6 +1906,7 @@ def process(self): self.valid = True + @profile def get_linelen(self, line=None, linelocs=None): # compute adjusted frequency from neighboring line lengths @@ -1551,7 +1935,7 @@ def get_linelen(self, line=None, linelocs=None): return length def get_linefreq(self, line=None, linelocs=None): - return self.rf.freq * (self.get_linelen(line, linelocs) / self.rf.linelen) + return self.rf.samplesperline * self.get_linelen(line, linelocs) def usectoinpx(self, x, line=None): return x * self.get_linefreq(line) @@ -2703,11 +3087,8 @@ def dropout_detect_demod(self): # (current np.diff has a prepend option, but not in ubuntu 18.04's version) n_orgt(iserr, f.data["video"]["demod_raw"], self.rf.freq_hz_half) - n_orlt(iserr, f.data["video"]["demod"], valid_min) - n_orgt(iserr, f.data["video"]["demod"], valid_max) - - n_orlt(iserr, f.data["video"]["demod_05"], valid_min05) - n_orgt(iserr, f.data["video"]["demod_05"], valid_max05) + n_ornotrange(iserr, f.data["video"]["demod"], valid_min, valid_max) + n_ornotrange(iserr, f.data["video"]["demod_05"], valid_min05, valid_max05) # filter out dropouts outside actual field iserr[:int(f.linelocs[f.lineoffset + 1])] = False @@ -2826,7 +3207,7 @@ def dropout_detect(self): return rv_lines, rv_starts, rv_ends - + @profile def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): line = _line + self.lineoffset # calczc works from integers, so get the start and remainder @@ -3358,6 +3739,7 @@ def __init__( self.digital_audio = digital_audio self.ac3 = extra_options.get("AC3", False) self.write_rf_tbc = extra_options.get("write_RF_TBC", False) + self.OpenCL = extra_options.get("OpenCL", False) self.has_analog_audio = True if system == "PAL": @@ -3403,14 +3785,17 @@ def __init__( self.firstfield = None # In frame output mode, the first field goes here self.system = system - self.rf = RFDecode( - inputfreq=inputfreq, - system=system, - decode_analog_audio=analog_audio, - decode_digital_audio=digital_audio, - has_analog_audio=self.has_analog_audio, - extra_options=extra_options, - ) + self.rf_opts = { + 'inputfreq':inputfreq, + 'system':system, + 'decode_analog_audio':analog_audio, + 'decode_digital_audio':digital_audio, + 'has_analog_audio':self.has_analog_audio, + 'extra_options':extra_options, + 'blocklen': 128 * 1024 if self.OpenCL else 32 * 1024, + } + + self.rf = RFDecode(**self.rf_opts) if system == "PAL": self.FieldClass = FieldPAL @@ -3450,7 +3835,7 @@ def __init__( self.verboseVITS = False self.demodcache = DemodCache( - self.rf, self.infile, self.freader, num_worker_threads=self.numthreads + self.rf, self.infile, self.freader, self.rf_opts, OpenCL=self.OpenCL, num_worker_threads=self.numthreads ) self.bw_ratios = [] @@ -4034,7 +4419,7 @@ def computeMetrics(self, f, fp=None, verbose=False): return metrics_rounded - #@profile + @profile def buildmetadata(self, f, check_phase=True): """ returns field information JSON and whether or not a backfill field is needed """ prevfi = self.fieldinfo[-1] if len(self.fieldinfo) else None diff --git a/lddecode/main.py b/lddecode/main.py index 6e48aef05..7dde55943 100644 --- a/lddecode/main.py +++ b/lddecode/main.py @@ -212,6 +212,13 @@ def main(args=None): help="number of CPU threads to use", ) + parser.add_argument( + "--OpenCL", + action="store_true", + default=False, + help="Use OpenCL acceleration", + ) + parser.add_argument( "-f", "--frequency", @@ -286,6 +293,7 @@ def main(args=None): "deemp_coeff": (args.deemp_low, args.deemp_high), "audio_filterwidth": args.audio_filterwidth, "AC3": args.AC3, + "OpenCL": args.OpenCL, } if vid_standard == "NTSC" and args.NTSC_color_notch_filter: diff --git a/lddecode/utils.py b/lddecode/utils.py index 238b73361..05ce4bb6c 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1067,6 +1067,11 @@ def n_orlt(a, x, y): a |= (x < y) +@njit(cache=True,nogil=True) +def n_ornotrange(a, x, y, z): + a |= (x < y) | (x > z) + + @njit(cache=True,nogil=True) def angular_mean(x, cycle_len=1.0, zero_base=True): """ Compute the mean phase, assuming 0..1 is one phase cycle @@ -1159,6 +1164,26 @@ def distance_from_round(x): return np.round(x) - x +def init_opencl(cl, name = None): + # Create some context on the first available GPU + if 'PYOPENCL_CTX' in os.environ: + ctx = cl.create_some_context() + else: + ctx = None + # Find the first OpenCL GPU available and use it, unless + for p in cl.get_platforms(): + for d in p.get_devices(): + if d.type & cl.device_type.GPU == 1: + continue + print("Selected device: ", d.name) + ctx = cl.Context(devices=(d,)) + break + if ctx is not None: + break + #queue = cl.CommandQueue(ctx) + return ctx + + # Write the .tbc.json file (used by lddecode and notebooks) def write_json(ldd, jsondict, outname): From cdb6a944666ee8c86dfc51df22789c39b6e0c5a5 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sun, 11 Jun 2023 15:33:14 -0700 Subject: [PATCH 21/25] remove opencl from perf branch --- lddecode/core.py | 377 +---------------------------------------------- lddecode/main.py | 10 +- 2 files changed, 7 insertions(+), 380 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index d204a9e39..bcfdf6782 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -28,7 +28,7 @@ from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array -from .utils import Pulse, nb_std, nb_gt, n_ornotrange, init_opencl +from .utils import Pulse, nb_std, nb_gt, n_ornotrange try: # If Anaconda's numpy is installed, mkl will use all threads for fft etc @@ -50,22 +50,8 @@ def profile(fn): return fn -have_pyopencl = False BLOCKSIZE = 32 * 1024 -try: - if True: - import pyopencl as cl - import pyopencl.array as cla - import pyvkfft.opencl - from pyvkfft.fft import fftn as vkfftn, ifftn as vkifftn, rfftn as vkrfftn, irfftn as vkirfftn - from pyvkfft.opencl import VkFFTApp - import pyopencl.reduction as cl_reduction - have_pyopencl = True - BLOCKSIZE = 128 * 1024 -except: - pass - def calclinelen(SP, mult, mhz): if type(mhz) == str: mhz = SP[mhz] @@ -393,12 +379,6 @@ def __init__( self.decode_digital_audio = decode_digital_audio self.decode_analog_audio = decode_analog_audio - # Optional OpenCL support - self.opencl_context = None - self.opencl_queue = None - self.opencl_initevent = None - self.OpenCL_Filters = {} - self.computefilters() # The 0.5mhz filter is rolled back to align with the data, so there @@ -406,160 +386,6 @@ def __init__( self.blockcut_end = self.Filters["F05_offset"] - def use_opencl(self, opencl_init_event, devname = None): - self.opencl_context = init_opencl(cl, devname) - if not self.opencl_context: - print('unable to open opencl context') - return False - self.opencl_queue = cl.CommandQueue(self.opencl_context) - if not self.opencl_queue: - print('unable to open opencl queue') - return False - self.prepare_opencl() - self.computefilters() - - blockshape = (self.blocklen,) - - self.opencl_buffers = {} - self.opencl_buffers['inbuf_complex64'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - self.opencl_buffers['fftbuf'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) - self.opencl_buffers['demod_fft'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) - self.opencl_buffers['fftfilt'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) - self.opencl_buffers['hilbert'] = cla.empty(self.opencl_queue,blockshape, dtype=np.complex64) - self.opencl_buffers['rfhpf'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - self.opencl_buffers['rfhpf_sqsum'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['gtangles'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['demod'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['demod_clipped'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['demod_clipped_cplx'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - - self.opencl_buffers['demodtmp1'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - self.opencl_buffers['demodtmp2'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - self.opencl_buffers['demodtmp3'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - self.opencl_buffers['demodtmp4'] = cla.empty(self.opencl_queue, blockshape, dtype=np.complex64) - - self.opencl_buffers['demodtmp1r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['demodtmp2r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['demodtmp3r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - self.opencl_buffers['demodtmp4r'] = cla.empty(self.opencl_queue, blockshape, dtype=np.float32) - - opencl_init_event.set() - - def prepare_opencl(self): - # ? - move to separate file? - - self.opencl_kernels = cl.Program(self.opencl_context, """ - __kernel void int16_to_complex64(__global short *real_part, __global float2 *output) { - int gid = get_global_id(0); - output[gid].x = (float)real_part[gid]; - output[gid].y = (float)0; - } - - __kernel void complex64_getreal(__global float2 *input, __global float *output) { - int gid = get_global_id(0); - output[gid] = input[gid].x; - } - - __kernel void compute_magnitude(__global float2* data, __global float* output) { - int gid = get_global_id(0); - float2 cmplx = data[gid]; - output[gid-get_global_offset(0)] = sqrt((cmplx.x * cmplx.x) + (cmplx.y * cmplx.y)); - } - - __kernel void PAL_WibbleRemover(__global float2* fftbuf, __global float* data, float mean, float std_dev) { - int gid = get_global_id(0); - if (fabs(data[gid - get_global_offset(0)] - mean) > 3*std_dev) { - fftbuf[gid - 1] = 0; - fftbuf[gid] = 0; - fftbuf[gid + 1] = 0; - //data[gid - get_global_offset(0)] = 0; - } - } - - __kernel void compute_angle(__global float2* complex_arr, __global float* angle_arr) { - int id = get_global_id(0); - float2 complex_val = complex_arr[id]; - float angle = atan2(complex_val.y, complex_val.x); - angle_arr[id] = angle; - } - - /* This implements: - tdangles2 = np.unwrap(dangles) - # With extremely bad data, the unwrapped angles can jump. - while np.min(tdangles2) < 0: - tdangles2[tdangles2 < 0] += tau - while np.max(tdangles2) > tau: - tdangles2[tdangles2 > tau] -= tau - return tdangles2 * (freq_hz / tau) - */ - - __kernel void angle_diff_and_unwrap(__global const float* input, __global float* output, float freq_hz) { - int id = get_global_id(0); - output[0] = 0; - if (id >= 1) { - output[id] = input[id] - input[id - 1]; - while (output[id] < 0) { - output[id] += 6.283185307179586; - } - while (output[id] > 6.283185307179586) { - output[id] -= 6.283185307179586; - } - output[id] *= freq_hz / 6.283185307179586; - } - } - - __kernel void sum(__global const float *a, __global float *b, int n) { - int gid = get_global_id(0); - float sum = 0; - for (int i = gid; i < n; i += get_global_size(0)) { - sum += a[i]; - } - b[gid] = sum; - } - - __kernel void sum_of_squares(__global const float *a, __global float *b, int n) { - int gid = get_global_id(0); - float sum = 0; - for (int i = gid; i < n; i += get_global_size(0)) { - sum += a[i] * a[i]; - } - b[gid] = sum; - } - - __kernel void clip(__global const float* input, __global float* output, float _min, float _max) { - int id = get_global_id(0); - if (input[id] < _min) { - output[id] = _min; - } else if (input[id] > _max) { - output[id] = _max; - } else { - output[id] = input[id]; - } - } - - __kernel void float32_to_complex64(__global float *real_part, __global float2 *output) { - int gid = get_global_id(0); - output[gid].x = (float)real_part[gid]; - output[gid].y = (float)0; - } - """).build() - - self.opencl_fft = VkFFTApp((self.blocklen,), - np.complex64, - queue=self.opencl_queue, - ndim=1, - inplace=False, - norm=1) - - self.sum_gpu = cl_reduction.ReductionKernel(self.opencl_context, np.float32, neutral="0", - reduce_expr="a+b", map_expr="x[i]", - arguments="__global float *x") - - self.square_diff_gpu = cl_reduction.ReductionKernel(self.opencl_context, np.float32, neutral="0", - reduce_expr="a+b", map_expr="(x[i]-mean)*(x[i]-mean)", - arguments="__global float *x, float mean") - - def computefilters(self): """ (re)compute the filter sets """ @@ -605,14 +431,6 @@ def computefilters(self): self.computedelays() - if self.opencl_context: - self.OpenCL_Filters = {} - for f in self.Filters.keys(): - if type(self.Filters[f]) == np.ndarray: - self.OpenCL_Filters[f] = cla.to_device(self.opencl_queue, self.Filters[f].astype(np.complex64)) - else: - self.OpenCL_Filters[f] = self.Filters[f] - def computeefmfilter(self): """Frequency-domain equalisation filter for the LaserDisc EFM signal. @@ -864,170 +682,8 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): mtf_level *= self.DecoderParams["MTF_basemult"] mtf_level += self.mtf_offset - if self.opencl_context: - return self.demodblock_opencl(data, mtf_level, fftdata, cut) - else: - return self.demodblock_cpu(data, mtf_level, fftdata, cut) - - def demodblock_opencl(self, data=None, mtf_level=0, fftdata=None, cut=False): - rv = {} - - blockshape = (self.blocklen,) - if False and fftdata is not None: - indata_fft = fftdata - elif data is not None: - indata_fft = np.zeros(blockshape, dtype=np.complex64) - inbuf_int16 = cla.to_device(self.opencl_queue, data) - self.opencl_kernels.int16_to_complex64(self.opencl_queue, (self.blocklen,), None, inbuf_int16.data, self.opencl_buffers['inbuf_complex64'].data) - self.opencl_fft.fft(self.opencl_buffers['inbuf_complex64'], dest=self.opencl_buffers['fftbuf']) - self.opencl_buffers['fftbuf'].get(ary=indata_fft) - - # delete OpenCL buffers when finished to avoid resource exhaustion - del inbuf_int16 - else: - raise Exception("demodblock called without raw or FFT data") - - rotdelay = 0 - if getattr(self, "delays", None) is not None and "video_rot" in self.delays: - rotdelay = self.delays["video_rot"] - - self.opencl_fft.ifft((self.opencl_buffers['fftbuf'] * self.OpenCL_Filters['Frfhpf']), self.opencl_buffers['rfhpf']) - rv["rfhpf"] = self.opencl_buffers['rfhpf'].get()[ - self.blockcut - rotdelay : -self.blockcut_end - rotdelay - ] - - if self.system == "PAL" and self.PAL_V4300D_NotchFilter: - """ This routine works around an 'interesting' issue seen with LD-V4300D players and - some PAL digital audio disks, where there is a signal somewhere between 8.47 and 8.57mhz. - - The idea here is to look for anomolies (3 std deviations) and snip them out of the - FFT. There may be side effects, however, but generally minor compared to the - 'wibble' itself and only in certain cases. - """ - bin_start, bin_end = int(self.blocklen * (8.42 / self.freq)), int(1 + (self.blocklen * (8.6 / self.freq))) - flip_start = self.blocklen - bin_end - flip_end = self.blocklen - bin_start - N = bin_end-bin_start - - fftbuf_sqsum = cla.empty(self.opencl_queue, (N,), dtype=np.float32) - - for start, end in ((bin_start, bin_end), (flip_start, flip_end)): - global_work_offset = [start] - - self.opencl_kernels.compute_magnitude(self.opencl_queue, (N,), None, self.opencl_buffers['fftbuf'].data, fftbuf_sqsum.data, global_offset=global_work_offset) - mean = self.sum_gpu(fftbuf_sqsum).get() / N - # XXX: slow! - variance = self.square_diff_gpu(fftbuf_sqsum, np.float32(mean)).get() / N - std_dev = np.sqrt(variance) - - self.opencl_kernels.PAL_WibbleRemover(self.opencl_queue, (N,), None, self.opencl_buffers['fftbuf'].data, fftbuf_sqsum.data, np.float32(mean), np.float32(std_dev), global_offset=global_work_offset) - - del fftbuf_sqsum - - fftfilt = self.opencl_buffers['fftbuf'] * self.OpenCL_Filters["RFVideo"] - - if mtf_level != 0: - fftfilt *= self.OpenCL_Filters["MTF"] ** mtf_level - - self.opencl_fft.ifft(fftfilt, self.opencl_buffers['hilbert']) - self.opencl_kernels.compute_angle(self.opencl_queue, blockshape, None, self.opencl_buffers['hilbert'].data, self.opencl_buffers['gtangles'].data) - self.opencl_kernels.angle_diff_and_unwrap(self.opencl_queue, blockshape, None, self.opencl_buffers['gtangles'].data, self.opencl_buffers['demod'].data, np.float32(self.freq_hz)) - - # use a clipped demod for video output processing to reduce speckling impact - self.opencl_kernels.clip(self.opencl_queue, blockshape, None, self.opencl_buffers['demod'].data, self.opencl_buffers['demod_clipped'].data, np.float32(150000), np.float32(self.freq_hz*.75)) - self.opencl_kernels.float32_to_complex64(self.opencl_queue, (self.blocklen,), None, self.opencl_buffers['demod_clipped'].data, self.opencl_buffers['demod_clipped_cplx'].data) - - self.opencl_fft.fft(self.opencl_buffers['demod_clipped_cplx'], dest=self.opencl_buffers['demod_fft']) - - self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideo"], self.opencl_buffers['demodtmp1']) - self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideo05"], self.opencl_buffers['demodtmp2']) - self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideoBurst"], self.opencl_buffers['demodtmp3']) - - self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp1'].data, self.opencl_buffers['demodtmp1r'].data) - self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp2'].data, self.opencl_buffers['demodtmp2r'].data) - self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp3'].data, self.opencl_buffers['demodtmp3r'].data) - - self.opencl_queue.finish() - - (out_video, event1) = self.opencl_buffers['demodtmp1r'].get_async() - (out_video05, event2) = self.opencl_buffers['demodtmp2r'].get_async() - (out_videoburst, event3) = self.opencl_buffers['demodtmp3r'].get_async() - - for event in [event1, event2, event3]: - event.wait() - - out_video05 = np.roll(out_video05, -self.Filters["F05_offset"]) - - rv['fft'] = indata_fft - - if self.system == "PAL": - self.opencl_fft.ifft(self.opencl_buffers['demod_fft'] * self.OpenCL_Filters["FVideoPilot"], self.opencl_buffers['demodtmp1']) - self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp1'].data, self.opencl_buffers['demodtmp1r'].data) - out_videopilot = self.opencl_buffers['demodtmp1r'].get() - - video_out = np.rec.array( - [ - out_video, - self.opencl_buffers['demod'].get().real, - out_video05, - out_videoburst, - out_videopilot, - ], - names=[ - "demod", - "demod_raw", - "demod_05", - "demod_burst", - "demod_pilot", - ], - ) - else: - video_out = np.rec.array( - [out_video, self.opencl_buffers['demod'].get().real, out_video05, out_videoburst], - names=["demod", "demod_raw", "demod_05", "demod_burst"], - ) - - rv["video"] = ( - video_out[self.blockcut : -self.blockcut_end] if cut else video_out - ) - - if self.decode_digital_audio: - self.opencl_fft.ifft(self.opencl_buffers['fftbuf'] * self.OpenCL_Filters["Fefm"], self.opencl_buffers['demodtmp1']) - self.opencl_kernels.complex64_getreal(self.opencl_queue, blockshape, None, self.opencl_buffers['demodtmp1'].data, self.opencl_buffers['demodtmp1r'].data) - (efm_out, event) = self.opencl_buffers['demodtmp1r'].get_async() - event.wait() - - if cut: - efm_out = efm_out[self.blockcut : -self.blockcut_end] - rv["efm"] = np.int16(np.clip(efm_out.real, -32768, 32767)) - - # NOTE: ac3 audio is filtered after RF TBC - if self.decode_analog_audio: - stage1_out = [] - for channel in ['left', 'right']: - afilter = self.audio[channel] + return self.demodblock_cpu(data, mtf_level, fftdata, cut) - # Apply first stage audio filter - a1 = npfft.ifft(afilter.slicer(indata_fft) * afilter.filt1) - # Demodulate and restore frequency after bin slicing - a1u = unwrap_hilbert(a1, afilter.a1_freq) + afilter.low_freq - - stage1_out.append(a1u) - - audio_out = np.rec.array( - [stage1_out[0].astype(np.float32), stage1_out[1].astype(np.float32)], names=["audio_left", "audio_right"] - ) - - fdiv = video_out.shape[0] // audio_out.shape[0] - rv["audio"] = ( - audio_out[self.blockcut // fdiv : -self.blockcut_end // fdiv] - if cut - else audio_out - ) - - rv['setupcount'] = self.setupcount - - return rv def demodblock_cpu(self, data=None, mtf_level=0, fftdata=None, cut=False): rv = {} @@ -1330,7 +986,6 @@ def __init__( infile, loader, rf_args, - OpenCL = False, cachesize=256, num_worker_threads=6, MTF_tolerance=0.05, @@ -1373,21 +1028,9 @@ def __init__( self.deqeue_thread = threading.Thread(target=self.dequeue, daemon=True) num_worker_threads = max(num_worker_threads - 1, 1) - opencl_worker_threads = 1 if OpenCL else 0 - opencl_init_event = threading.Event() - - for i in range(opencl_worker_threads): - t = threading.Thread( - target=self.worker, daemon=True, args=(opencl_init_event,) - ) - t.start() - opencl_init_event.wait() - self.threads.append(t) - - num_worker_threads = num_worker_threads if not OpenCL else 0 for i in range(num_worker_threads): t = threading.Thread( - target=self.worker, daemon=True, args=(False,) + target=self.worker, daemon=True, args=() ) t.start() self.threads.append(t) @@ -1466,23 +1109,16 @@ def apply_newparams(self, newparams): self.rf.computefilters() - def worker(self, opencl_init_event): + def worker(self): blocksrun = 0 blockstime = 0 - opencl = False rf = RFDecode(**self.rf_args) - if have_pyopencl and opencl_init_event: - opencl = True - opencl_init_event.clear() - rf.use_opencl(opencl_init_event) - while True: item = self.q_in.get() if item is None or item[0] == "END": - #print(opencl, blocksrun, blockstime / blocksrun) return if item[0] == "DEMOD": @@ -3739,7 +3375,6 @@ def __init__( self.digital_audio = digital_audio self.ac3 = extra_options.get("AC3", False) self.write_rf_tbc = extra_options.get("write_RF_TBC", False) - self.OpenCL = extra_options.get("OpenCL", False) self.has_analog_audio = True if system == "PAL": @@ -3792,7 +3427,7 @@ def __init__( 'decode_digital_audio':digital_audio, 'has_analog_audio':self.has_analog_audio, 'extra_options':extra_options, - 'blocklen': 128 * 1024 if self.OpenCL else 32 * 1024, + 'blocklen': 32 * 1024, } self.rf = RFDecode(**self.rf_opts) @@ -3835,7 +3470,7 @@ def __init__( self.verboseVITS = False self.demodcache = DemodCache( - self.rf, self.infile, self.freader, self.rf_opts, OpenCL=self.OpenCL, num_worker_threads=self.numthreads + self.rf, self.infile, self.freader, self.rf_opts, num_worker_threads=self.numthreads ) self.bw_ratios = [] diff --git a/lddecode/main.py b/lddecode/main.py index 7dde55943..2f3acc17c 100644 --- a/lddecode/main.py +++ b/lddecode/main.py @@ -212,13 +212,6 @@ def main(args=None): help="number of CPU threads to use", ) - parser.add_argument( - "--OpenCL", - action="store_true", - default=False, - help="Use OpenCL acceleration", - ) - parser.add_argument( "-f", "--frequency", @@ -293,7 +286,6 @@ def main(args=None): "deemp_coeff": (args.deemp_low, args.deemp_high), "audio_filterwidth": args.audio_filterwidth, "AC3": args.AC3, - "OpenCL": args.OpenCL, } if vid_standard == "NTSC" and args.NTSC_color_notch_filter: @@ -421,4 +413,4 @@ def cleanup(): print("\nCompleted: saving JSON and exiting", file=sys.stderr) cleanup() - print(time.time()-firstdecode) +# print(time.time()-firstdecode) From dfa248d61db32abf67c7c6a93de09e8ea34a2f55 Mon Sep 17 00:00:00 2001 From: Chad Page Date: Sun, 11 Jun 2023 19:45:11 -0700 Subject: [PATCH 22/25] Update tests.yml to use ubuntu 22.04 (part 1?) --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8b9acb11f..2ad31c8ca 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,7 +8,7 @@ on: jobs: qt5: name: Build with Qt 5 - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v2 From 574a9b6ff9ffc42a6b54abaff3d22cd9f91500ab Mon Sep 17 00:00:00 2001 From: Chad Date: Sun, 11 Jun 2023 19:55:45 -0700 Subject: [PATCH 23/25] fix PAL by demoting some of angular_mean out of numba --- lddecode/core.py | 7 +++++-- lddecode/utils.py | 12 ++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/lddecode/core.py b/lddecode/core.py index bcfdf6782..c10858e34 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -25,7 +25,7 @@ from .utils import nb_mean, nb_median, nb_round, nb_min, nb_max, nb_abs, nb_absmax, nb_diff, n_orgt, n_orlt from .utils import polar2z, sqsum, genwave, dsa_rescale_and_clip, scale, rms from .utils import findpeaks, findpulses, calczc, inrange, roundfloat -from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance +from .utils import LRUupdate, clb_findbursts, angular_mean_helper, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array from .utils import Pulse, nb_std, nb_gt, n_ornotrange @@ -2929,7 +2929,10 @@ def refine_linelocs_pilot(self, linelocs=None): zcs.append(zc) - am = angular_mean(zcs) + angles = angular_mean_helper(np.array(zcs)) + am = np.angle(np.mean(angles)) / (np.pi * 2) + if (am < 0): + am = 1 + am for l in range(0, 323): linelocs[l] += (phase_distance(zcs[l], am) * plen[l]) * 1 diff --git a/lddecode/utils.py b/lddecode/utils.py index 05ce4bb6c..9722bbb16 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1072,8 +1072,8 @@ def n_ornotrange(a, x, y, z): a |= (x < y) | (x > z) -@njit(cache=True,nogil=True) -def angular_mean(x, cycle_len=1.0, zero_base=True): +@njit(cache=True, nojit=True) +def angular_mean_helper(x, cycle_len=1.0, zero_base=True): """ Compute the mean phase, assuming 0..1 is one phase cycle (Using this technique handles the 3.99, 5.01 issue @@ -1081,16 +1081,12 @@ def angular_mean(x, cycle_len=1.0, zero_base=True): naive computation could be changed to rotate around 0.5, that breaks down when things are out of phase...) """ - x2 = x - np.floor(x) # not strictly necessary but slightly more precise + x2 = x - x.astype(np.int32) # not strictly necessary but slightly more precise # refer to https://en.wikipedia.org/wiki/Mean_of_circular_quantities angles = [np.e ** (1j * f * np.pi * 2 / cycle_len) for f in x2] - am = np.angle(np.mean(angles)) / (np.pi * 2) - if zero_base and (am < 0): - am = 1 + am - - return am + return angles @njit(cache=True) def phase_distance(x, c=0.75): From 9f985b88982c47d5360e0a928afafcbd3b944a46 Mon Sep 17 00:00:00 2001 From: Chad Date: Sun, 11 Jun 2023 20:06:27 -0700 Subject: [PATCH 24/25] remove qt5-default --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2ad31c8ca..e075de07d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,7 +24,7 @@ jobs: sudo apt-get update # Based on: https://github.com/happycube/ld-decode/wiki/Installation # Added: cmake libqt5opengl5-dev libqt5svg5-dev - sudo apt-get install -y --no-install-recommends git cmake make python3-distutils python3-numpy python3-scipy python3-matplotlib git qt5-default libqt5opengl5-dev libqt5svg5-dev libqwt-qt5-dev libfftw3-dev python3-numba libavformat-dev libavcodec-dev libavutil-dev ffmpeg + sudo apt-get install -y --no-install-recommends git cmake make python3-distutils python3-numpy python3-scipy python3-matplotlib git libqt5opengl5-dev libqt5svg5-dev libqwt-qt5-dev libfftw3-dev python3-numba libavformat-dev libavcodec-dev libavutil-dev ffmpeg - name: Set up build dir timeout-minutes: 1 From ba8dd5cdea940c7b7a9623f61f556dd738806ab9 Mon Sep 17 00:00:00 2001 From: Chad Date: Sun, 11 Jun 2023 20:08:13 -0700 Subject: [PATCH 25/25] oops. fix angular_mean_helper @njit --- lddecode/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lddecode/utils.py b/lddecode/utils.py index 9722bbb16..84f54f0d3 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -1072,7 +1072,7 @@ def n_ornotrange(a, x, y, z): a |= (x < y) | (x > z) -@njit(cache=True, nojit=True) +@njit(cache=True, nogil=True) def angular_mean_helper(x, cycle_len=1.0, zero_base=True): """ Compute the mean phase, assuming 0..1 is one phase cycle