diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8b9acb11f..e075de07d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,7 +8,7 @@ on: jobs: qt5: name: Build with Qt 5 - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v2 @@ -24,7 +24,7 @@ jobs: sudo apt-get update # Based on: https://github.com/happycube/ld-decode/wiki/Installation # Added: cmake libqt5opengl5-dev libqt5svg5-dev - sudo apt-get install -y --no-install-recommends git cmake make python3-distutils python3-numpy python3-scipy python3-matplotlib git qt5-default libqt5opengl5-dev libqt5svg5-dev libqwt-qt5-dev libfftw3-dev python3-numba libavformat-dev libavcodec-dev libavutil-dev ffmpeg + sudo apt-get install -y --no-install-recommends git cmake make python3-distutils python3-numpy python3-scipy python3-matplotlib git libqt5opengl5-dev libqt5svg5-dev libqwt-qt5-dev libfftw3-dev python3-numba libavformat-dev libavcodec-dev libavutil-dev ffmpeg - name: Set up build dir timeout-minutes: 1 diff --git a/lddecode/core.py b/lddecode/core.py index 2a3c4e8c2..c10858e34 100644 --- a/lddecode/core.py +++ b/lddecode/core.py @@ -6,12 +6,7 @@ import time import types -from multiprocessing import Queue, JoinableQueue, Pipe - -if platform.system() == "Darwin": - from multiprocessing import set_start_method -if platform.system() != "Windows": - from multiprocessing import Process +from queue import Queue # standard numeric/scientific libraries import numpy as np @@ -20,31 +15,20 @@ import numba from numba import njit -# Use PyFFTW's faster FFT implementation if available -if platform.system != "Windows": - try: - import pyfftw.interfaces.numpy_fft as npfft - import pyfftw.interfaces - - pyfftw.interfaces.cache.enable() - pyfftw.interfaces.cache.set_keepalive_time(10) - except ImportError: - import numpy.fft as npfft -else: - # Don't use pyfftw on windows as we have to use Thread and that causes - # issues if also using pyfftw. - import numpy.fft as npfft +# Use standard numpy fft, since it's thread-safe +import numpy.fft as npfft # internal libraries from . import efm_pll from .utils import get_git_info, ac3_pipe, ldf_pipe, traceback -from .utils import nb_mean, nb_median, nb_round, nb_min, nb_max, nb_absmax +from .utils import nb_mean, nb_median, nb_round, nb_min, nb_max, nb_abs, nb_absmax, nb_diff, n_orgt, n_orlt from .utils import polar2z, sqsum, genwave, dsa_rescale_and_clip, scale, rms from .utils import findpeaks, findpulses, calczc, inrange, roundfloat -from .utils import LRUupdate, clb_findbursts, angular_mean, phase_distance +from .utils import LRUupdate, clb_findbursts, angular_mean_helper, phase_distance from .utils import build_hilbert, unwrap_hilbert, emphasis_iir, filtfft -from .utils import fft_do_slice, fft_determine_slices, StridedCollector +from .utils import fft_do_slice, fft_determine_slices, StridedCollector, hz_to_output_array +from .utils import Pulse, nb_std, nb_gt, n_ornotrange try: # If Anaconda's numpy is installed, mkl will use all threads for fft etc @@ -60,12 +44,19 @@ # and ld-decode. Probably should just bring all logging in here... 
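Note: the import changes above replace the multiprocessing primitives (Process, JoinableQueue, Pipe) with plain threads fed from queue.Queue, and drop pyfftw in favour of numpy.fft, which is safe to call from several threads at once. A minimal sketch of the worker-pool pattern this enables is below; the names and the toy "work" are illustrative only, not the actual DemodCache API.

    import threading
    from queue import Queue

    def worker(q_in: Queue, q_out: Queue):
        # Each worker loops until it receives the None sentinel.
        while True:
            item = q_in.get()
            if item is None:
                return
            blocknum, data = item
            q_out.put((blocknum, [x * 2 for x in data]))  # stand-in for demodblock()

    q_in, q_out = Queue(), Queue()
    workers = [threading.Thread(target=worker, args=(q_in, q_out), daemon=True)
               for _ in range(4)]
    for w in workers:
        w.start()

    q_in.put((0, [1, 2, 3]))
    print(q_out.get())      # (0, [2, 4, 6])
    for _ in workers:
        q_in.put(None)      # shut the pool down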
logger = None +try: + profile +except: + def profile(fn): + return fn + +BLOCKSIZE = 32 * 1024 def calclinelen(SP, mult, mhz): if type(mhz) == str: mhz = SP[mhz] - return int(np.round(SP["line_period"] * mhz * mult)) + return int(nb_round(SP["line_period"] * mhz * mult)) # states for first field of validpulses (second field is pulse #) @@ -293,7 +284,7 @@ def __init__( self, inputfreq=40, system="NTSC", - blocklen=32 * 1024, + blocklen=BLOCKSIZE, decode_digital_audio=False, decode_analog_audio=0, has_analog_audio=True, @@ -377,6 +368,8 @@ def __init__( linelen = self.freq_hz / (1000000.0 / self.SysParams["line_period"]) self.linelen = int(np.round(linelen)) + self.samplesperline = self.freq / self.linelen + # How much horizontal sync position can deviate from previous/expected position # and still be interpreted as a horizontal sync pulse. # Too high tolerance may result in false positive sync pulses, too low may end up missing them. @@ -392,6 +385,7 @@ def __init__( # are a few unusable samples at the end. self.blockcut_end = self.Filters["F05_offset"] + def computefilters(self): """ (re)compute the filter sets """ @@ -437,6 +431,7 @@ def computefilters(self): self.computedelays() + def computeefmfilter(self): """Frequency-domain equalisation filter for the LaserDisc EFM signal. This was inspired by the input signal equaliser in WSJT-X, described in @@ -683,12 +678,16 @@ def hztoire(self, hz, spec=False): return (hz - params["ire0"]) / params["hz_ire"] def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): - rv = {} - mtf_level *= self.mtf_mult mtf_level *= self.DecoderParams["MTF_basemult"] mtf_level += self.mtf_offset + return self.demodblock_cpu(data, mtf_level, fftdata, cut) + + + def demodblock_cpu(self, data=None, mtf_level=0, fftdata=None, cut=False): + rv = {} + if fftdata is not None: indata_fft = fftdata elif data is not None: @@ -703,7 +702,7 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): rv["rfhpf"] = npfft.ifft(indata_fft * self.Filters["Frfhpf"]).real rv["rfhpf"] = rv["rfhpf"][ self.blockcut - rotdelay : -self.blockcut_end - rotdelay - ] + ].astype(np.float32) if self.system == "PAL" and self.PAL_V4300D_NotchFilter: """ This routine works around an 'interesting' issue seen with LD-V4300D players and @@ -750,11 +749,11 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): out_videopilot = npfft.ifft(demod_fft * self.Filters["FVideoPilot"]).real video_out = np.rec.array( [ - out_video, - demod, - out_video05, - out_videoburst, - out_videopilot, + out_video.astype(np.float32), + demod.astype(np.float32), + out_video05.astype(np.float32), + out_videoburst.astype(np.float32), + out_videopilot.astype(np.float32), ], names=[ "demod", @@ -766,7 +765,7 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): ) else: video_out = np.rec.array( - [out_video, demod, out_video05, out_videoburst], + [out_video.astype(np.float32), demod.astype(np.float32), out_video05.astype(np.float32), out_videoburst.astype(np.float32)], names=["demod", "demod_raw", "demod_05", "demod_burst"], ) @@ -791,10 +790,10 @@ def demodblock(self, data=None, mtf_level=0, fftdata=None, cut=False): # Demodulate and restore frequency after bin slicing a1u = unwrap_hilbert(a1, afilter.a1_freq) + afilter.low_freq - stage1_out.append(a1u) + stage1_out.append(a1u.astype(np.float32)) audio_out = np.rec.array( - [stage1_out[0], stage1_out[1]], names=["audio_left", "audio_right"] + [stage1_out[0].astype(np.float32), 
stage1_out[1].astype(np.float32)], names=["audio_left", "audio_right"] ) fdiv = video_out.shape[0] // audio_out.shape[0] @@ -950,7 +949,7 @@ def computedelays(self, mtf_level=0): fakesignal += 8192 fakesignal[6000:6005] = 0 - fakedecode = rf.demodblock(fakesignal, mtf_level=mtf_level) + fakedecode = rf.demodblock_cpu(fakesignal, mtf_level=mtf_level) vdemod = fakedecode["video"]["demod"] vdemod_raw = fakedecode["video"]["demod_raw"] @@ -986,6 +985,7 @@ def __init__( rf, infile, loader, + rf_args, cachesize=256, num_worker_threads=6, MTF_tolerance=0.05, @@ -993,6 +993,7 @@ def __init__( self.infile = infile self.loader = loader self.rf = rf + self.rf_args = rf_args self.currentMTF = 1 self.MTF_tolerance = MTF_tolerance @@ -1001,45 +1002,41 @@ def __init__( # Cache dictionary - key is block #, which holds data for that block self.lrusize = cachesize - self.prefetch = 32 # TODO: set this to proper amount for format + + # should be in self.rf, but may not be computed yet + self.bytes_per_field = int(self.rf.freq_hz / (self.rf.SysParams["FPS"] * 2)) + 1 + self.prefetch = int((self.bytes_per_field * 2) / self.blocksize) + 4 + self.lru = [] self.lock = threading.Lock() self.blocks = {} - if platform.system() == "Darwin": - set_start_method("fork") - - # Workaround to make it work on windows. - # Using Process gives a "io.BufferedReader can't be pickled error". - # Using Thread may be a bit slower due to how python threads work, - # so ideally we would want to find a better way to do this later. - thread_type = Process if platform.system() != "Windows" else threading.Thread - self.block_status = {} - self.q_in = JoinableQueue() + self.q_in = Queue() self.q_out = Queue() + self.waiting = set() + self.q_out_event = threading.Event() self.threadpipes = [] self.threads = [] + self.request = 0 + self.ended = False + + self.deqeue_thread = threading.Thread(target=self.dequeue, daemon=True) num_worker_threads = max(num_worker_threads - 1, 1) for i in range(num_worker_threads): - self.threadpipes.append(Pipe()) - t = thread_type( - target=self.worker, daemon=True, args=(self.threadpipes[-1][1],) + t = threading.Thread( + target=self.worker, daemon=True, args=() ) t.start() self.threads.append(t) - self.deqeue_thread = threading.Thread(target=self.dequeue, daemon=True) self.deqeue_thread.start() - self.request = 0 - self.ended = False - def end(self): if not self.ended: # stop workers @@ -1112,14 +1109,14 @@ def apply_newparams(self, newparams): self.rf.computefilters() - def worker(self, pipein): + def worker(self): + blocksrun = 0 + blockstime = 0 + + rf = RFDecode(**self.rf_args) + while True: - ispiped = False - if pipein.poll(): - item = pipein.recv() - ispiped = True - else: - item = self.q_in.get() + item = self.q_in.get() if item is None or item[0] == "END": return @@ -1139,18 +1136,21 @@ def worker(self, pipein): "demod" not in block or np.abs(block["MTF"] - target_MTF) > self.MTF_tolerance ): - output["demod"] = self.rf.demodblock( + st = time.time() + output["demod"] = rf.demodblock( data=block["rawinput"], fftdata=fftdata, mtf_level=target_MTF, cut=True ) + blockstime += time.time() - st + blocksrun += 1 + output["MTF"] = target_MTF output["request"] = request + # print(blocknum, output) self.q_out.put((blocknum, output)) elif item[0] == "NEWPARAMS": self.apply_newparams(item[1]) - if not ispiped: - self.q_in.task_done() def doread(self, blocknums, MTF, redo=False, prefetch=False): need_blocks = [] @@ -1194,13 +1194,17 @@ def doread(self, blocknums, MTF, redo=False, prefetch=False): if redo or 
not waiting: queuelist.append(b) need_blocks.append(b) - elif self.block_status[b]["waiting"]: + elif waiting: need_blocks.append(b) + + if not prefetch: + self.waiting.add(b) for b in queuelist: self.block_status[b] = {'MTF': MTF, 'waiting': True, 'request': self.request, 'prefetch': prefetch} self.q_in.put(("DEMOD", b, self.blocks[b], MTF, self.request)) + self.q_out_event.clear() return None if reached_end else need_blocks def dequeue(self): @@ -1227,14 +1231,20 @@ def dequeue(self): self.blocks[blocknum][k] = item[k] if 'demod' in item.keys(): - self.block_status[blocknum]['waiting'] = False + if self.block_status[blocknum]['waiting']: + self.block_status[blocknum]['waiting'] = False + + if blocknum in self.waiting: + self.waiting.remove(blocknum) + + if not len(self.waiting): + self.q_out_event.set() if "input" not in self.blocks[blocknum]: self.blocks[blocknum]["input"] = self.blocks[blocknum]["rawinput"][ self.rf.blockcut : -self.rf.blockcut_end ] - def read(self, begin, length, MTF=0, getraw = False, forceredo=False): # transpose the cache by key, not block # t = {"input": [], "fft": [], "video": [], "audio": [], "efm": [], "rfhpf": []} @@ -1263,8 +1273,10 @@ def read(self, begin, length, MTF=0, getraw = False, forceredo=False): return rv while need_blocks is not None and len(need_blocks): - time.sleep(0.001) # A crude busy loop + self.q_out_event.wait(.01) need_blocks = self.doread(toread, MTF) + if need_blocks: + self.q_out_event.clear() if need_blocks is None: # EOF @@ -1418,7 +1430,7 @@ def _downscale_audio_to_output( # Downscales to 16bit/44.1khz. It might be nice when analog audio is better to support 24/96, # but if we only support one output type, matching CD audio/digital sound is greatly preferable. -def downscale_audio(audio, lineinfo, rf, linecount, timeoffset=0, freq=44100): +def downscale_audio(audio, lineinfo, rf, linecount, timeoffset=0, freq=44100, rv=None): """downscale audio for output. 
Parameters: @@ -1456,6 +1468,10 @@ def downscale_audio(audio, lineinfo, rf, linecount, timeoffset=0, freq=44100): if failed: logger.warning("Analog audio processing error, muting samples") + if rv is not None: + rv['dsaudio'] = output16 + rv['audio_next_offset'] = arange[-1] - frametime + return output16, arange[-1] - frametime @@ -1506,8 +1522,10 @@ def __init__( # this is eventually set to 262/263 and 312/313 for audio timing self.linecount = None + #@profile def process(self): self.linelocs1, self.linebad, self.nextfieldoffset = self.compute_linelocs() + #print(self.readloc, self.linelocs1, self.nextfieldoffset) if self.linelocs1 is None: if self.nextfieldoffset is None: self.nextfieldoffset = self.rf.linelen * 200 @@ -1524,6 +1542,7 @@ def process(self): self.valid = True + @profile def get_linelen(self, line=None, linelocs=None): # compute adjusted frequency from neighboring line lengths @@ -1552,7 +1571,7 @@ def get_linelen(self, line=None, linelocs=None): return length def get_linefreq(self, line=None, linelocs=None): - return self.rf.freq * (self.get_linelen(line, linelocs) / self.rf.linelen) + return self.rf.samplesperline * self.get_linelen(line, linelocs) def usectoinpx(self, x, line=None): return x * self.get_linefreq(line) @@ -1560,6 +1579,7 @@ def usectoinpx(self, x, line=None): def inpxtousec(self, x, line=None): return x / self.get_linefreq(line) + @profile def lineslice(self, l, begin=None, length=None, linelocs=None, begin_offset=0): """ return a slice corresponding with pre-TBC line l, begin+length are uSecs """ @@ -1573,8 +1593,8 @@ def lineslice(self, l, begin=None, length=None, linelocs=None, begin_offset=0): _length = self.usectoinpx(_length) return slice( - int(np.floor(_begin + begin_offset)), - int(np.ceil(_begin + _length + begin_offset)), + int(_begin + begin_offset), + int(_begin + _length + begin_offset + 1), ) def usectooutpx(self, x): @@ -1583,7 +1603,18 @@ def usectooutpx(self, x): def outpxtousec(self, x): return x / self.rf.SysParams["outfreq"] + #@profile def hz_to_output(self, input): + if type(input) == np.ndarray: + return hz_to_output_array( + input, + self.rf.DecoderParams["ire0"], + self.rf.DecoderParams["hz_ire"], + self.rf.SysParams["outputZero"], + self.rf.DecoderParams["vsync_ire"], + self.out_scale + ) + reduced = (input - self.rf.DecoderParams["ire0"]) / self.rf.DecoderParams["hz_ire"] reduced -= self.rf.DecoderParams["vsync_ire"] @@ -1599,6 +1630,7 @@ def output_to_ire(self, output): (output - self.rf.SysParams["outputZero"]) / self.out_scale ) + self.rf.DecoderParams["vsync_ire"] + def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=False): """ return a slice corresponding with pre-TBC line l """ @@ -1615,8 +1647,9 @@ def lineslice_tbc(self, l, begin=None, length=None, linelocs=None, keepphase=Fal else self.rf.SysParams["outlinelen"] ) - return slice(int(np.round(_begin)), int(np.round(_begin + _length))) + return slice(nb_round(_begin), nb_round(_begin + _length)) + @profile def get_timings(self): pulses = self.rawpulses hsync_typical = self.usectoinpx(self.rf.SysParams["hsyncPulseUS"]) @@ -1668,8 +1701,7 @@ def get_timings(self): return LT - def pulse_qualitycheck(self, prevpulse, pulse): - + def pulse_qualitycheck(self, prevpulse: Pulse, pulse: Pulse): if prevpulse[0] > 0 and pulse[0] > 0: exprange = (0.4, 0.6) elif prevpulse[0] == 0 and pulse[0] == 0: @@ -1682,6 +1714,7 @@ def pulse_qualitycheck(self, prevpulse, pulse): return inorder + #@profile def run_vblank_state_machine(self, pulses, LT): """ Determines if 
a pulse set is a valid vblank by running a state machine """ @@ -1772,8 +1805,9 @@ def run_vblank_state_machine(self, pulses, LT): return done, validpulses + @profile def refinepulses(self): - LT = self.get_timings() + self.LT = self.get_timings() HSYNC, EQPL1, VSYNC, EQPL2 = range(4) @@ -1783,7 +1817,7 @@ def refinepulses(self): while i < len(self.rawpulses): curpulse = self.rawpulses[i] - if inrange(curpulse.len, *LT["hsync"]): + if inrange(curpulse.len, *self.LT["hsync"]): good = ( self.pulse_qualitycheck(valid_pulses[-1], (0, curpulse)) if len(valid_pulses) @@ -1793,12 +1827,12 @@ def refinepulses(self): i += 1 elif ( i > 2 - and inrange(self.rawpulses[i].len, *LT["eq"]) + and inrange(self.rawpulses[i].len, *self.LT["eq"]) and (len(valid_pulses) and valid_pulses[-1][0] == HSYNC) ): # print(i, self.rawpulses[i]) done, vblank_pulses = self.run_vblank_state_machine( - self.rawpulses[i - 2 : i + 24], LT + self.rawpulses[i - 2 : i + 24], self.LT ) if done: [valid_pulses.append(p) for p in vblank_pulses[2:]] @@ -1811,6 +1845,7 @@ def refinepulses(self): return valid_pulses + #@profile def getBlankRange(self, validpulses, start=0): vp_type = np.array([p[0] for p in validpulses]) @@ -1904,7 +1939,7 @@ def processVBlank(self, validpulses, start, limit=None): dist = (firstloc - loc_presync) / self.inlinelen # get the integer rounded X * .5H distance. then invert to determine # the half-H alignment with the sync/blank pulses - hdist = int(np.round(dist * 2)) + hdist = nb_round(dist * 2) # isfirstfield = not ((hdist % 2) == self.rf.SysParams['firstField1H'][0]) isfirstfield = (hdist % 2) == (self.rf.SysParams["firstFieldH"][1] != 1) @@ -1951,11 +1986,12 @@ def processVBlank(self, validpulses, start, limit=None): return None, None, None, 0 + #@profile def computeLineLen(self, validpulses): # determine longest run of 0's longrun = [-1, -1] currun = None - for i, v in enumerate([p[0] for p in self.validpulses]): + for i, v in enumerate([p[0] for p in validpulses]): if v != 0: if currun is not None and currun[1] > longrun[1]: longrun = currun @@ -1970,10 +2006,10 @@ def computeLineLen(self, validpulses): linelens = [] for i in range(longrun[0] + 1, longrun[0] + longrun[1]): - linelen = self.validpulses[i][1].start - self.validpulses[i - 1][1].start + linelen = validpulses[i][1].start - validpulses[i - 1][1].start if inrange(linelen / self.inlinelen, 0.95, 1.05): linelens.append( - self.validpulses[i][1].start - self.validpulses[i - 1][1].start + validpulses[i][1].start - validpulses[i - 1][1].start ) if len(linelens) > 0: @@ -1981,6 +2017,7 @@ def computeLineLen(self, validpulses): else: return self.inlinelen + def skip_check(self): """ This routine checks to see if there's a (probable) VSYNC at the end. Returns a (currently rough) probability. @@ -2015,7 +2052,8 @@ def skip_check(self): # pull the above together into a routine that (should) find line 0, the last line of # the previous field. 
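Note: computeLineLen now takes validpulses as an argument (rather than reading self.validpulses), so the mean line length can be computed once and handed to getLine0. The underlying idea — find the longest run of ordinary hsync pulses and average the spacings that fall within 5% of the nominal line length — is sketched below in a simplified, standalone form; the real method works on the (state, Pulse) tuples in validpulses.

    import numpy as np

    def mean_line_length(pulse_types, pulse_starts, nominal):
        # Find the longest run of regular hsync pulses (type 0).
        best_start, best_len = 0, 0
        run_start = None
        for i, t in enumerate(list(pulse_types) + [1]):   # sentinel closes the last run
            if t == 0 and run_start is None:
                run_start = i
            elif t != 0 and run_start is not None:
                if i - run_start > best_len:
                    best_start, best_len = run_start, i - run_start
                run_start = None

        # Average the pulse spacings within 5% of the nominal line length.
        spans = np.diff(np.asarray(pulse_starts[best_start:best_start + best_len]))
        spans = spans[np.abs(spans / nominal - 1.0) < 0.05]
        return spans.mean() if len(spans) else nominal

    print(mean_line_length([1, 0, 0, 0, 0, 1], [0, 100, 201, 299, 400, 500], 100))  # 100.0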
- def getLine0(self, validpulses): + #@profile + def getLine0(self, validpulses, meanlinelen): # Gather the local line 0 location and projected from the previous field self.sync_confidence = 100 @@ -2043,12 +2081,11 @@ def getLine0(self, validpulses): if self.vblank_next is not None: isFirstField_next = not isNotFirstField_next - meanlinelen = self.computeLineLen(validpulses) fieldlen = ( meanlinelen * self.rf.SysParams["field_lines"][0 if isFirstField_next else 1] ) - line0loc_next = int(np.round(self.vblank_next - fieldlen)) + line0loc_next = nb_round(self.vblank_next - fieldlen) if line0loc_next < 0: self.sync_confidence = 10 @@ -2178,6 +2215,7 @@ def getpulses(self): return findpulses(self.data["video"]["demod_05"], pulse_hz_min, pulse_hz_max) + #@profile def compute_linelocs(self): self.rawpulses = self.getpulses() @@ -2191,8 +2229,8 @@ def compute_linelocs(self): self.validpulses = validpulses = self.refinepulses() - - line0loc, lastlineloc, self.isFirstField = self.getLine0(validpulses) + meanlinelen = self.computeLineLen(validpulses) + line0loc, lastlineloc, self.isFirstField = self.getLine0(validpulses, meanlinelen) self.linecount = 263 if self.isFirstField else 262 # Number of lines to actually process. This is set so that the entire following @@ -2217,8 +2255,6 @@ def compute_linelocs(self): return None, None, self.inlinelen * 200 - meanlinelen = self.computeLineLen(validpulses) - # If we don't have enough data at the end, move onto the next field lastline = (self.rawpulses[-1].start - line0loc) / meanlinelen if lastline < proclines: @@ -2338,6 +2374,7 @@ def compute_linelocs(self): return rv_ll, rv_err, nextfield + #@profile def refine_linelocs_hsync(self): linelocs2 = self.linelocs1.copy() @@ -2457,6 +2494,7 @@ def computewow(self, lineinfo): return wow + #@profile def downscale( self, lineinfo=None, @@ -2474,6 +2512,20 @@ def downscale( # for video always output 263/313 lines linesout = self.outlinecount + audio_thread = None + if audio != 0 and self.rf.decode_analog_audio: + audio_rv = {} + audio_thread = threading.Thread(target=downscale_audio, args=( + self.data["audio"], + lineinfo, + self.rf, + self.linecount, + self.audio_offset, + audio, + audio_rv) + ) + audio_thread.start() + dsout = np.zeros((linesout * outwidth), dtype=np.double) # self.lineoffset is an adjustment for 0-based lines *before* downscaling so add 1 here lineoffset = self.lineoffset + 1 @@ -2497,16 +2549,6 @@ def downscale( (l - lineoffset) * outwidth : (l + 1 - lineoffset) * outwidth ] = self.rf.DecoderParams["ire0"] - if audio != 0 and self.rf.decode_analog_audio: - self.dsaudio, self.audio_next_offset = downscale_audio( - self.data["audio"], - lineinfo, - self.rf, - self.linecount, - self.audio_offset, - freq=audio - ) - if self.rf.decode_digital_audio: self.efmout = self.data["efm"][ int(self.linelocs[1]) : int(self.linelocs[self.linecount + 1]) @@ -2518,8 +2560,14 @@ def downscale( dsout = self.hz_to_output(dsout) self.dspicture = dsout + if audio_thread: + audio_thread.join() + self.dsaudio = audio_rv["dsaudio"] + self.audio_next_offset = audio_rv["audio_next_offset"] + return dsout, self.dsaudio, self.efmout + @profile def rf_tbc(self, linelocs=None): """ This outputs a TBC'd version of the input RF data, mostly intended to assist in audio processing. Outputs a uint16 array. 
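Note: downscale() now launches downscale_audio on a helper thread (passing it a result dict via the new rv parameter) so analog audio resampling overlaps with the video downscale, and joins the thread before returning. The shape of that pattern, with a stand-in worker in place of the real downscale_audio, looks roughly like this:

    import threading

    def downscale_audio_stub(samples, rv=None):
        # Stand-in for downscale_audio(): fills rv when given one, but still
        # returns its results so unthreaded callers keep working.
        out, next_offset = samples[::2], float(len(samples))
        if rv is not None:
            rv["dsaudio"] = out
            rv["audio_next_offset"] = next_offset
        return out, next_offset

    audio_rv = {}
    audio_thread = threading.Thread(target=downscale_audio_stub,
                                    args=([1, 2, 3, 4, 5, 6], audio_rv))
    audio_thread.start()

    # ... the video downscale would run here on the main thread ...

    audio_thread.join()
    print(audio_rv["dsaudio"], audio_rv["audio_next_offset"])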
@@ -2624,28 +2672,23 @@ def get_vsync_lines(self): return rv + @profile def dropout_detect_demod(self): # current field f = self isPAL = self.rf.system == "PAL" - rfstd = np.std(f.data["rfhpf"]) + rfstd = nb_std(f.data["rfhpf"]) # iserr_rf = np.full(len(f.data['video']['demod']), False, dtype=np.bool) iserr_rf1 = (f.data["rfhpf"] < (-rfstd * 3)) | ( f.data["rfhpf"] > (rfstd * 3) ) # | (f.rawdata <= -32000) - iserr_rf = np.full_like(iserr_rf1, False) - iserr_rf[self.rf.delays["video_rot"] :] = iserr_rf1[ + iserr = np.full_like(iserr_rf1, False) + iserr[self.rf.delays["video_rot"] :] = iserr_rf1[ : -self.rf.delays["video_rot"] ] - # detect absurd fluctuations in pre-deemp demod, since only dropouts can cause them - # (current np.diff has a prepend option, but not in ubuntu 18.04's version) - iserr1 = f.data["video"]["demod_raw"] > self.rf.freq_hz_half - # This didn't work right for PAL (issue #471) - # iserr1 |= f.data['video']['demod_hpf'] > 3000000 - # build sets of min/max valid levels valid_min = np.full_like( f.data["video"]["demod"], f.rf.iretohz(-70 if isPAL else -50) @@ -2660,7 +2703,7 @@ def dropout_detect_demod(self): # Account for sync pulses when checking demod - hsync_len = int(f.get_timings()['hsync'][1]) + hsync_len = int(f.LT['hsync'][1]) vsync_ire = f.rf.SysParams['vsync_ire'] vsync_lines = self.get_vsync_lines() @@ -2676,13 +2719,12 @@ def dropout_detect_demod(self): valid_min[int(f.linelocs[l]):int(f.linelocs[l]) + hsync_len] = sync_min valid_min05[int(f.linelocs[l]):int(f.linelocs[l]) + hsync_len] = sync_min_05 - iserr2 = f.data["video"]["demod"] < valid_min - iserr2 |= f.data["video"]["demod"] > valid_max - - iserr3 = f.data["video"]["demod_05"] < valid_min05 - iserr3 |= f.data["video"]["demod_05"] > valid_max05 + # detect absurd fluctuations in pre-deemp demod, since only dropouts can cause them + # (current np.diff has a prepend option, but not in ubuntu 18.04's version) + n_orgt(iserr, f.data["video"]["demod_raw"], self.rf.freq_hz_half) - iserr = iserr1 | iserr2 | iserr3 | iserr_rf + n_ornotrange(iserr, f.data["video"]["demod"], valid_min, valid_max) + n_ornotrange(iserr, f.data["video"]["demod_05"], valid_min05, valid_max05) # filter out dropouts outside actual field iserr[:int(f.linelocs[f.lineoffset + 1])] = False @@ -2692,6 +2734,7 @@ def dropout_detect_demod(self): return iserr + @profile def build_errlist(self, errmap): errlist = [] @@ -2712,6 +2755,7 @@ def build_errlist(self, errmap): return errlist + @profile def dropout_errlist_to_tbc(field, errlist): """Convert data from raw data coordinates to tbc coordinates, and splits up multi-line dropouts. @@ -2747,7 +2791,7 @@ def dropout_errlist_to_tbc(field, errlist): end_linepos = end_rf_linepos / ( field.linelocs[line + 1] - field.linelocs[line] ) - end_linepos = int(np.round(end_linepos * field.outlinelen)) + end_linepos = nb_round(end_linepos * field.outlinelen) first_line = line + 1 + lineoffset @@ -2778,6 +2822,7 @@ def dropout_errlist_to_tbc(field, errlist): return dropouts + @profile def dropout_detect(self): """ returns dropouts in three arrays, to line up with the JSON output """ @@ -2798,7 +2843,7 @@ def dropout_detect(self): return rv_lines, rv_starts, rv_ends - + @profile def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): line = _line + self.lineoffset # calczc works from integers, so get the start and remainder @@ -2834,36 +2879,17 @@ def compute_line_bursts(self, linelocs, _line, prev_phaseadjust=0): # Apply phase adjustment from previous frame/line if available. 
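Note: dropout_detect_demod now accumulates its error conditions directly into a single boolean array using the new nogil helpers (n_orgt, n_ornotrange) instead of building iserr1/iserr2/iserr3 and OR-ing them at the end. A small self-contained example of that in-place accumulation pattern, using synthetic data and illustrative thresholds:

    import numpy as np
    from numba import njit

    @njit(cache=True, nogil=True)
    def n_orgt(a, x, y):
        # a |= (x > y), updating the existing mask in place
        a |= (x > y)

    @njit(cache=True, nogil=True)
    def n_ornotrange(a, x, y, z):
        # a |= (x outside [y, z])
        a |= (x < y) | (x > z)

    demod = np.array([0.2, 0.5, 1.3, -0.1, 0.7])
    iserr = np.zeros(len(demod), dtype=np.bool_)

    n_orgt(iserr, demod, 1.0)             # absurdly high values
    n_ornotrange(iserr, demod, 0.0, 0.9)  # values outside the valid band
    print(iserr)                          # [False False  True  True False]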
phase_adjust = -prev_phaseadjust + # a proper color burst should have ~12-13 zero crossings + isrising = np.zeros(16, dtype=np.bool_) + zcs = np.zeros(16, dtype=np.float32) + # The first pass computes phase_offset, the second uses it to determine # the colo(u)r burst phase of the line. - passcount = 0 - while passcount < 2: - rising_count = 0 - phase_offset = [] - + for passcount in range(2): # this subroutine is in utils.py, broken out so it can be JIT'd - bursts = clb_findbursts( - burstarea, 0, len(burstarea) - 1, threshold - ) - - if len(bursts) == 0: - return None, None + zc_count, phase_adjust, rising_count = clb_findbursts(isrising, zcs, burstarea, 0, len(burstarea) - 1, threshold, bstart, s_rem, zcburstdiv, phase_adjust) - for prevalue, zc in bursts: - zc_cycle = ((bstart + zc - s_rem) / zcburstdiv) + phase_adjust - zc_round = nb_round(zc_cycle) - - phase_offset.append(zc_round - zc_cycle) - - if prevalue < 0: - rising_count += not (zc_round % 2) - else: - rising_count += zc_round % 2 - - phase_adjust += nb_median(np.array(phase_offset)) - passcount += 1 - - rising = (rising_count / len(bursts)) > 0.5 + rising = rising_count > (zc_count / 2) return rising, -phase_adjust @@ -2903,7 +2929,10 @@ def refine_linelocs_pilot(self, linelocs=None): zcs.append(zc) - am = angular_mean(zcs) + angles = angular_mean_helper(np.array(zcs)) + am = np.angle(np.mean(angles)) / (np.pi * 2) + if (am < 0): + am = 1 + am for l in range(0, 323): linelocs[l] += (phase_distance(zcs[l], am) * plen[l]) * 1 @@ -3392,17 +3421,19 @@ def __init__( self.fname_out = fname_out self.firstfield = None # In frame output mode, the first field goes here - self.fieldloc = 0 self.system = system - self.rf = RFDecode( - inputfreq=inputfreq, - system=system, - decode_analog_audio=analog_audio, - decode_digital_audio=digital_audio, - has_analog_audio=self.has_analog_audio, - extra_options=extra_options, - ) + self.rf_opts = { + 'inputfreq':inputfreq, + 'system':system, + 'decode_analog_audio':analog_audio, + 'decode_digital_audio':digital_audio, + 'has_analog_audio':self.has_analog_audio, + 'extra_options':extra_options, + 'blocklen': 32 * 1024, + } + + self.rf = RFDecode(**self.rf_opts) if system == "PAL": self.FieldClass = FieldPAL @@ -3425,13 +3456,10 @@ def __init__( self.audio_offset = 0 self.mtf_level = 1 - self.prevfield = None - self.curfield = None + self.fieldstack = [None, None] self.doDOD = doDOD - self.badfields = None - self.fieldinfo = [] self.leadIn = False @@ -3445,11 +3473,14 @@ def __init__( self.verboseVITS = False self.demodcache = DemodCache( - self.rf, self.infile, self.freader, num_worker_threads=self.numthreads + self.rf, self.infile, self.freader, self.rf_opts, num_worker_threads=self.numthreads ) self.bw_ratios = [] + self.decodethread = None + self.threadreturn = {} + def __del__(self): del self.demodcache @@ -3497,6 +3528,7 @@ def checkMTF(self, field, pfield=None): return np.abs(self.mtf_level - oldmtf) < 0.05 + @profile def detectLevels(self, field): # Returns sync level, 0IRE, and 100IRE levels of a field # computed from HSYNC areas and VITS @@ -3581,36 +3613,42 @@ def writeout(self, dataset): if audio is not None and self.outfile_audio is not None: self.outfile_audio.write(audio) - def decodefield(self, initphase=False, redo=False): + @profile + def decodefield(self, start, mtf_level, audio_offset, prevfield=None, initphase=False, redo=False, rv=None): """ returns field object if valid, and the offset to the next decode """ - self.readloc = int(self.fdoffset - self.rf.blockcut) - if 
self.readloc < 0: - self.readloc = 0 - self.readloc_block = self.readloc // self.blocksize - self.numblocks = (self.readlen // self.blocksize) + 2 + if rv is None: + rv = {} + + rv['field'] = None + rv['offset'] = None + + readloc = int(start - self.rf.blockcut) + if readloc < 0: + readloc = 0 - self.rawdecode = self.demodcache.read( - self.readloc_block * self.blocksize, - self.numblocks * self.blocksize, - self.mtf_level, + readloc_block = readloc // self.blocksize + numblocks = (self.readlen // self.blocksize) + 2 + + rawdecode = self.demodcache.read( + readloc_block * self.blocksize, + numblocks * self.blocksize, + mtf_level, forceredo=redo ) - if self.rawdecode is None: + if rawdecode is None: # logger.info("Failed to demodulate data") return None, None - self.indata = self.rawdecode["input"] - f = self.FieldClass( self.rf, - self.rawdecode, - audio_offset=self.audio_offset, - prevfield=self.curfield, + rawdecode, + audio_offset=audio_offset, + prevfield=prevfield, initphase=initphase, fields_written=self.fields_written, - readloc=self.rawdecode["startloc"], + readloc=rawdecode["startloc"], ) if self.use_profiler: @@ -3632,48 +3670,99 @@ def decodefield(self, initphase=False, redo=False): except Exception as e: raise e + rv['field'] = f + rv['offset'] = f.nextfieldoffset - (readloc - rawdecode["startloc"]) + if not f.valid: # logger.info("Bad data - jumping one second") - return f, f.nextfieldoffset + rv['offset'] = f.nextfieldoffset - return f, f.nextfieldoffset - (self.readloc - self.rawdecode["startloc"]) + return rv['field'], rv['offset'] + @profile def readfield(self, initphase=False): - # pretty much a retry-ing wrapper around decodefield with MTF checking - self.prevfield = self.curfield done = False adjusted = False - redo = False + redo = None + + if len(self.fieldstack) >= 2: + self.fieldstack.pop(-1) while done is False: if redo: + # Drop existing thread + if self.decodethread: + self.decodethread.join() + self.decodethread = None + + # Start new thread + self.threadreturn = {} + df_args = (redo, self.mtf_level, self.audio_offset, self.fieldstack[0], initphase, redo, self.threadreturn) + + self.decodethread = threading.Thread(target=self.decodefield, args=df_args) + # .run() does not actually run this in the background + self.decodethread.run() + f, offset = self.threadreturn['field'], self.threadreturn['offset'] + self.decodethread = None + # Only allow one redo, no matter what done = True + redo = None + elif self.decodethread and self.decodethread.ident: + self.decodethread.join() + self.decodethread = None + + f, offset = self.threadreturn['field'], self.threadreturn['offset'] + #print(f, f.valid, f.readloc, offset) + else: # assume first run + f = None + offset = 0 + + if True: + # Start new thread + self.threadreturn = {} + if f and f.valid: + self.audio_offset = f.audio_next_offset + prevfield = f + toffset = self.fdoffset + offset + # XXX: this is wrong somehow + audio_offset = f.audio_next_offset + else: + prevfield = None + toffset = self.fdoffset + audio_offset = self.audio_offset - self.fieldloc = self.fdoffset - f, offset = self.decodefield(initphase=initphase, redo=redo) + if offset: + toffset += offset - if f is None: - if offset is None: - # EOF, probably - return None + df_args = (toffset, self.mtf_level, audio_offset, prevfield, initphase, False, self.threadreturn) - self.fdoffset += offset + self.decodethread = threading.Thread(target=self.decodefield, args=df_args) + #elf.decodethread.run() + self.decodethread.start() + #self.decodethread.join() + + # 
process previous run + if f: + self.fdoffset += offset + elif offset is None: + # Probable end, so push an empty field + self.fieldstack.insert(0, None) - if f is not None and f.valid: + if f and f.valid: picture, audio, efm = f.downscale( linesout=self.output_lines, final=True, audio=self.analog_audio ) - self.audio_offset = f.audio_next_offset - metrics = self.computeMetrics(f, None, verbose=True) if "blackToWhiteRFRatio" in metrics and adjusted is False: keep = 900 if self.isCLV else 30 self.bw_ratios.append(metrics["blackToWhiteRFRatio"]) self.bw_ratios = self.bw_ratios[-keep:] - redo = f.needrerun or not self.checkMTF(f, self.prevfield) + redo = f.needrerun or not self.checkMTF(f, self.fieldstack[0]) + if redo: + redo = self.fdoffset - offset # Perform AGC changes on first fields only to prevent luma mismatch intra-field if self.useAGC and f.isFirstField and f.sync_confidence > 80: @@ -3681,9 +3770,9 @@ def readfield(self, initphase=False): actualwhiteIRE = f.rf.hztoire(ire100_hz) - sync_ire_diff = np.abs(self.rf.hztoire(sync_hz) - self.rf.DecoderParams["vsync_ire"]) - whitediff = np.abs(self.rf.hztoire(ire100_hz) - actualwhiteIRE) - ire0_diff = np.abs(self.rf.hztoire(ire0_hz)) + sync_ire_diff = nb_abs(self.rf.hztoire(sync_hz) - self.rf.DecoderParams["vsync_ire"]) + whitediff = nb_abs(self.rf.hztoire(ire100_hz) - actualwhiteIRE) + ire0_diff = nb_abs(self.rf.hztoire(ire0_hz)) acceptable_diff = 2 if self.fields_written else 0.5 @@ -3697,31 +3786,34 @@ def readfield(self, initphase=False): len(self.fieldinfo), np.round(vsync_ire, 2) )) else: - redo = True + redo = self.fdoffset - offset self.rf.DecoderParams["ire0"] = ire0_hz # Note that vsync_ire is a negative number, so (sync_hz - ire0_hz) is correct self.rf.DecoderParams["hz_ire"] = hz_ire self.rf.DecoderParams["vsync_ire"] = vsync_ire - if adjusted is False and redo is True: + if adjusted is False and redo: self.demodcache.flush_demod() adjusted = True - self.fdoffset -= offset + self.fdoffset = redo else: done = True - else: - # Probably jumping ahead - delete the previous field so - # TBC computations aren't thrown off - if self.curfield is not None and self.badfields is None: - self.badfields = (self.curfield, f) - self.curfield = None + self.fieldstack.insert(0, f) + self.audio_offset = f.audio_next_offset - if f is None or f.valid is False: - return None + if f is None and offset is None: + # EOF, probably + return None - self.curfield = f + if self.decodethread and not self.decodethread.ident and not redo: + self.decodethread.start() + elif redo: + self.decodethread = None + if f is None or f.valid is False: + return None + if f is not None and self.fname_out is not None: # Only write a FirstField first if len(self.fieldinfo) == 0 and not f.isFirstField: @@ -3839,7 +3931,7 @@ def computeMetricsPAL(self, metrics, f, fp=None): # Unforunately this is too short to get a 50IRE RF level wl_slice = f.lineslice_tbc(13, 4.7 + 15.5, 3) metrics["greyPSNR"] = self.calcpsnr(f, wl_slice) - metrics["greyIRE"] = np.mean(f.output_to_ire(f.dspicture[wl_slice])) + metrics["greyIRE"] = nb_mean(f.output_to_ire(f.dspicture[wl_slice])) else: # There's a nice long burst at 50IRE block on field2 l13 b50slice = f.lineslice_tbc(13, 36, 20) @@ -3863,7 +3955,7 @@ def computeMetricsNTSC(self, metrics, f, fp=None): ire50_slice = f.lineslice_tbc(19, 36, 10) metrics["greyPSNR"] = self.calcpsnr(f, ire50_slice) - metrics["greyIRE"] = np.mean(f.output_to_ire(f.dspicture[ire50_slice])) + metrics["greyIRE"] = nb_mean(f.output_to_ire(f.dspicture[ire50_slice])) 
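Note: readfield's redo path wraps decodefield in a Thread but calls .run(), which simply executes the target synchronously in the calling thread, while the look-ahead path uses .start(); the ident attribute (None until a thread has really been started) is what the later checks use to tell the two apart. In miniature:

    import threading

    def work(rv):
        rv["value"] = 42

    rv = {}
    t = threading.Thread(target=work, args=(rv,))
    t.run()                      # runs work() synchronously, no new thread
    print(rv["value"], t.ident)  # 42 None -> never actually started

    rv2 = {}
    t2 = threading.Thread(target=work, args=(rv2,))
    t2.start()                   # real background thread
    t2.join()                    # wait before reading the shared dict
    print(rv2["value"], t2.ident is not None)  # 42 True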
ire50_rawslice = f.lineslice(19, 36, 10) rawdata = f.rawdata[ @@ -3965,6 +4057,7 @@ def computeMetrics(self, f, fp=None, verbose=False): return metrics_rounded + @profile def buildmetadata(self, f, check_phase=True): """ returns field information JSON and whether or not a backfill field is needed """ prevfi = self.fieldinfo[-1] if len(self.fieldinfo) else None @@ -3973,8 +4066,8 @@ def buildmetadata(self, f, check_phase=True): "isFirstField": True if f.isFirstField else False, "syncConf": f.compute_syncconf(), "seqNo": len(self.fieldinfo) + 1, - "diskLoc": np.round((self.fieldloc / self.bytes_per_field) * 10) / 10, - "fileLoc": int(np.floor(self.fieldloc)), + "diskLoc": np.round((f.readloc / self.bytes_per_field) * 10) / 10, + "fileLoc": int(np.floor(f.readloc)), "medianBurstIRE": roundfloat(f.burstmedian), } @@ -4020,7 +4113,7 @@ def buildmetadata(self, f, check_phase=True): return fi, True fi["decodeFaults"] = decodeFaults - fi["vitsMetrics"] = self.computeMetrics(self.curfield, self.prevfield) + fi["vitsMetrics"] = self.computeMetrics(self.fieldstack[0], self.fieldstack[1]) fi["vbi"] = {"vbiData": [int(lc) for lc in f.linecode if lc is not None]} @@ -4033,7 +4126,7 @@ def buildmetadata(self, f, check_phase=True): # process VBI frame info data self.frameNumber = self.decodeFrameNumber(self.firstfield, f) - rawloc = np.floor((self.readloc / self.bytes_per_field) / 2) + rawloc = np.floor((f.readloc / self.bytes_per_field) / 2) disk_Type = "CLV" if self.isCLV else "CAV" disk_TimeCode = None @@ -4099,11 +4192,13 @@ def seek_getframenr(self, startfield): at file location 0 """ + curfield = None + prevfield = None + self.roughseek(startfield) for fields in range(10): - self.fieldloc = self.fdoffset - f, offset = self.decodefield(initphase=True) + f, offset = self.decodefield(self.fdoffset, 0, 0) if f is None: # If given an invalid starting location (i.e. seeking to a frame in an already cut raw file), @@ -4116,27 +4211,23 @@ def seek_getframenr(self, startfield): elif not f.valid: self.fdoffset += offset else: - self.prevfield = self.curfield - self.curfield = f + prevfield = curfield + curfield = f self.fdoffset += offset # Two fields are needed to be sure to have sufficient Philips code data # to determine frame #. - if self.prevfield is not None and f.valid: - fnum = self.decodeFrameNumber(self.prevfield, self.curfield) + if prevfield is not None and f.valid: + fnum = self.decodeFrameNumber(prevfield, curfield) if self.earlyCLV: logger.error("Cannot seek in early CLV disks w/o timecode") return None, startfield elif fnum is not None: - rawloc = np.floor((self.readloc / self.bytes_per_field) / 2) + rawloc = np.floor((f.readloc / self.bytes_per_field) / 2) logger.info("seeking: file loc %d frame # %d", rawloc, fnum) - # Clear field memory on seeks - self.prevfield = None - self.curfield = None - - return fnum, startfield + return fnum, startfield, f.readloc return None, None @@ -4152,11 +4243,11 @@ def seek(self, startframe, target): curfield = startframe * 2 for retries in range(3): - fnr, curfield = self.seek_getframenr(curfield) + fnr, curfield, readloc = self.seek_getframenr(curfield) if fnr is None: return None - cur = int((self.fieldloc / self.bytes_per_field)) + cur = int((readloc / self.bytes_per_field)) if fnr == target: logger.info("Finished seek") print("Finished seeking, starting at frame", fnr, file=sys.stderr) @@ -4167,7 +4258,7 @@ def seek(self, startframe, target): return None - def build_json(self, f): + def build_json(self): """ build up the JSON structure for file output. 
""" jout = {} jout["pcmAudioParameters"] = { @@ -4182,8 +4273,13 @@ def build_json(self, f): vp["numberOfSequentialFields"] = len(self.fieldinfo) vp["osInfo"] = f'{platform.system()}:{platform.release()}:{platform.version()}' - if f is None: - return + # get the first valid field in the stack if any + for f in self.fieldstack: + if f: + break + + if not f: + return None vp["gitBranch"] = self.branch vp["gitCommit"] = self.commit diff --git a/lddecode/main.py b/lddecode/main.py index be18f45e5..2f3acc17c 100644 --- a/lddecode/main.py +++ b/lddecode/main.py @@ -208,7 +208,7 @@ def main(args=None): "--threads", metavar="threads", type=int, - default=5, + default=4, help="number of CPU threads to use", ) @@ -377,13 +377,14 @@ def main(args=None): jsondumper = jsondump_thread(ldd, outname) def cleanup(): - jsondumper.put(ldd.build_json(ldd.curfield)) + jsondumper.put(ldd.build_json()) # logger.flush() ldd.close() jsondumper.put(None) if audio_pipe is not None: audio_pipe.close() + firstdecode = time.time() while not done and ldd.fields_written < (req_frames * 2): try: f = ldd.readfield() @@ -407,7 +408,9 @@ def cleanup(): done = True if ldd.fields_written < 100 or ((ldd.fields_written % 500) == 0): - jsondumper.put(ldd.build_json(ldd.curfield)) + jsondumper.put(ldd.build_json()) print("\nCompleted: saving JSON and exiting", file=sys.stderr) cleanup() + +# print(time.time()-firstdecode) diff --git a/lddecode/utils.py b/lddecode/utils.py index 0d6847c9b..84f54f0d3 100644 --- a/lddecode/utils.py +++ b/lddecode/utils.py @@ -642,7 +642,7 @@ def sqsum(cmplx): return np.sqrt((cmplx.real ** 2) + (cmplx.imag ** 2)) -@njit(cache=True) +@njit(cache=True,nogil=True) def calczc_findfirst(data, target, rising): if rising: for i in range(0, len(data)): @@ -658,7 +658,7 @@ def calczc_findfirst(data, target, rising): return None -@njit(cache=True) +@njit(cache=True,nogil=True) def calczc_do(data, _start_offset, target, edge=0, count=10): start_offset = max(1, int(_start_offset)) icount = int(count + 1) @@ -716,8 +716,8 @@ def build_hilbert(fft_size): return output - -def unwrap_hilbert(hilbert, freq_hz): +@njit(cache=True,nogil=True) +def unwrap_hilbert_getangles(hilbert): tangles = np.angle(hilbert) dangles = np.ediff1d(tangles, to_begin=0) @@ -725,14 +725,26 @@ def unwrap_hilbert(hilbert, freq_hz): if dangles[0] < -pi: dangles[0] += tau - tdangles2 = np.unwrap(dangles) + return dangles + +@njit(cache=True,nogil=True) +def unwrap_hilbert_fixangles(tdangles2, freq_hz): # With extremely bad data, the unwrapped angles can jump. while np.min(tdangles2) < 0: tdangles2[tdangles2 < 0] += tau while np.max(tdangles2) > tau: tdangles2[tdangles2 > tau] -= tau + return tdangles2 * (freq_hz / tau) +def unwrap_hilbert(hilbert, freq_hz): + dangles = unwrap_hilbert_getangles(hilbert) + + # This can't be run with numba + tdangles2 = np.unwrap(dangles) + + return unwrap_hilbert_fixangles(tdangles2, freq_hz) #* (freq_hz / tau) + def fft_determine_slices(center, min_bandwidth, freq_hz, bins_in): """ returns the # of sub-bins needed to get center+/-min_bandwidth. @@ -822,6 +834,17 @@ def roundfloat(fl, places=3): return np.round(fl * r) / r +@njit(nogil=True, cache=True) +def hz_to_output_array(input, ire0, hz_ire, outputZero, vsync_ire, out_scale): + reduced = (input - ire0) / hz_ire + reduced -= vsync_ire + + return ( + np.clip((reduced * out_scale) + outputZero, 0, 65535) + 0.5 + ).astype(np.uint16) + + + # Something like this should be a numpy function, but I can't find it. 
@jit(cache=True) def findareas(array, cross): @@ -903,7 +926,53 @@ def findpulses(sync_ref, _, high): ) return _to_pulses_list(pulses_starts, pulses_lengths) +if False: + @njit(cache=True,nogil=True) + def numba_pulse_qualitycheck(prevpulse: Pulse, pulse: Pulse, inlinelen: int): + + if prevpulse[0] > 0 and pulse[0] > 0: + exprange = (0.4, 0.6) + elif prevpulse[0] == 0 and pulse[0] == 0: + exprange = (0.9, 1.1) + else: # transition to/from regular hsyncs can be .5 or 1H + exprange = (0.4, 1.1) + + linelen = (pulse[1].start - prevpulse[1].start) / inlinelen + inorder = inrange(linelen, *exprange) + + return inorder + + @njit(cache=True,nogil=True) + def numba_computeLineLen(validpulses, inlinelen): + # determine longest run of 0's + longrun = [-1, -1] + currun = None + for i, v in enumerate([p[0] for p in validpulses]): + if v != 0: + if currun is not None and currun[1] > longrun[1]: + longrun = currun + currun = None + elif currun is None: + currun = [i, 0] + else: + currun[1] += 1 + + if currun is not None and currun[1] > longrun[1]: + longrun = currun + + linelens = [] + for i in range(longrun[0] + 1, longrun[0] + longrun[1]): + linelen = validpulses[i][1].start - validpulses[i - 1][1].start + if inrange(linelen / inlinelen, 0.95, 1.05): + linelens.append( + validpulses[i][1].start - validpulses[i - 1][1].start + ) + + # Can't prepare the mean here since numba np.mean doesn't work over lists + return linelens + +@njit(cache=True, nogil=True) def findpeaks(array, low=0): array2 = array.copy() array2[np.where(array2 < low)] = 0 @@ -928,52 +997,83 @@ def LRUupdate(l, k): l.insert(0, k) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_median(m): return np.median(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_round(m): return int(np.round(m)) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_mean(m): return np.mean(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_min(m): return np.min(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_max(m): return np.max(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_abs(m): return np.abs(m) -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_absmax(m): return np.max(np.abs(m)) -@njit(cache=True) +@njit(cache=True,nogil=True) +def nb_std(m): + return np.std(m) + + +@njit(cache=True,nogil=True) +def nb_diff(m): + return np.diff(m) + + +@njit(cache=True,nogil=True) def nb_mul(x, y): return x * y -@njit(cache=True) +@njit(cache=True,nogil=True) def nb_where(x): return np.where(x) -def angular_mean(x, cycle_len=1.0, zero_base=True): +@njit(cache=True,nogil=True) +def nb_gt(x, y): + return (x > y) + + +@njit(cache=True,nogil=True) +def n_orgt(a, x, y): + a |= (x > y) + + +@njit(cache=True,nogil=True) +def n_orlt(a, x, y): + a |= (x < y) + + +@njit(cache=True,nogil=True) +def n_ornotrange(a, x, y, z): + a |= (x < y) | (x > z) + + +@njit(cache=True, nogil=True) +def angular_mean_helper(x, cycle_len=1.0, zero_base=True): """ Compute the mean phase, assuming 0..1 is one phase cycle (Using this technique handles the 3.99, 5.01 issue @@ -981,18 +1081,14 @@ def angular_mean(x, cycle_len=1.0, zero_base=True): naive computation could be changed to rotate around 0.5, that breaks down when things are out of phase...) 
""" - x2 = x - np.floor(x) # not strictly necessary but slightly more precise + x2 = x - x.astype(np.int32) # not strictly necessary but slightly more precise # refer to https://en.wikipedia.org/wiki/Mean_of_circular_quantities angles = [np.e ** (1j * f * np.pi * 2 / cycle_len) for f in x2] - am = np.angle(np.mean(angles)) / (np.pi * 2) - if zero_base and (am < 0): - am = 1 + am - - return am - + return angles +@njit(cache=True) def phase_distance(x, c=0.75): """ returns the shortest path between two phases (assuming x and c are in (0..1)) """ d = (x - np.floor(x)) - c @@ -1027,25 +1123,36 @@ def dsa_rescale_and_clip(infloat): # removed so that they can be JIT'd -@njit(cache=True) -def clb_findbursts(burstarea, i, endburstarea, threshold): - out = [] - +@njit(cache=True,nogil=True) +def clb_findbursts(isrising, zcs, burstarea, i, endburstarea, threshold, bstart, s_rem, zcburstdiv, phase_adjust): + zc_count = 0 + rising_count = 0 j = i - while j < endburstarea: + + isrising.fill(False) + zcs.fill(0) + + while j < endburstarea and zc_count < len(zcs): if np.abs(burstarea[j]) > threshold: - pre = burstarea[j] zc = calczc_do(burstarea, j, 0) if zc is not None: - out.append((burstarea[j], zc)) + isrising[zc_count] = burstarea[j] < 0 + zcs[zc_count] = zc + zc_count += 1 j = int(zc) + 1 else: - return out + break else: j += 1 - return out + if zc_count: + zc_cycles = ((bstart + zcs - s_rem) / zcburstdiv) + phase_adjust + # numba doesn't support np.round over an array, so we have to do this + zc_rounds = (zc_cycles + .5).astype(np.int32) + phase_adjust += nb_median(zc_rounds - zc_cycles) + rising_count = np.sum(np.bitwise_xor(isrising, (zc_rounds % 2) != 0)) + return zc_count, phase_adjust, rising_count @njit(cache=True) def distance_from_round(x): @@ -1053,6 +1160,26 @@ def distance_from_round(x): return np.round(x) - x +def init_opencl(cl, name = None): + # Create some context on the first available GPU + if 'PYOPENCL_CTX' in os.environ: + ctx = cl.create_some_context() + else: + ctx = None + # Find the first OpenCL GPU available and use it, unless + for p in cl.get_platforms(): + for d in p.get_devices(): + if d.type & cl.device_type.GPU == 1: + continue + print("Selected device: ", d.name) + ctx = cl.Context(devices=(d,)) + break + if ctx is not None: + break + #queue = cl.CommandQueue(ctx) + return ctx + + # Write the .tbc.json file (used by lddecode and notebooks) def write_json(ldd, jsondict, outname):