diff --git a/.readthedocs.yaml b/.readthedocs.yaml
new file mode 100644
index 0000000..1ee7b94
--- /dev/null
+++ b/.readthedocs.yaml
@@ -0,0 +1,29 @@
+# .readthedocs.yaml
+# Read the Docs configuration file
+# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
+
+# Required
+version: 2
+
+# Set the version of Python and other tools you might need
+build:
+  os: ubuntu-22.04
+  tools:
+    python: "3.11"
+    # You can also specify other tool versions:
+    # nodejs: "19"
+    # rust: "1.64"
+    # golang: "1.19"
+
+# Build documentation in the docs/ directory with Sphinx
+sphinx:
+  configuration: docs/conf.py
+
+# If using Sphinx, optionally build your docs in additional formats such as PDF
+# formats:
+#    - pdf
+
+# Optionally declare the Python requirements required to build your docs
+python:
+  install:
+    - requirements: requirements.txt
diff --git a/docs/source/conf.py b/docs/source/conf.py
index bf71489..fbf7b72 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -7,7 +7,7 @@
 author = 'Clément Alba'
 
 release = '2.0'
-version = '2.0.1'
+version = '2.0.2'
 
 # -- General configuration
 
diff --git a/setup.cfg b/setup.cfg
index 0c5123f..a0dc2f7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = pdal-parallelizer
-version = 2.0.2
+version = 2.0.3
 description = A simple tool for use pdal with parallel execution
 long_description = file: README.rst, CHANGELOG.rst, LICENSE.rst
 keywords = pdal, parallelizer, dask
diff --git a/src/pdal_parallelizer/__init__.py b/src/pdal_parallelizer/__init__.py
index 3732bf0..5830236 100644
--- a/src/pdal_parallelizer/__init__.py
+++ b/src/pdal_parallelizer/__init__.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import os
 import os.path
 import click
@@ -10,9 +11,12 @@
 from . import do
 from . import file_manager
 from . import cloud
+from . import tile
 from matplotlib import pyplot as plt
 import sys
 import ntpath
+from os.path import join
+from math import ceil
 
 
 def query_yes_no(question, default='no'):
@@ -46,7 +50,7 @@ def config_dask(n_workers, threads_per_worker, timeout):
     cfg.set({'interface': 'lo'})
     cfg.set({'distributed.scheduler.worker-ttl': None})
     cfg.set({'distributed.comm.timeouts.connect': timeout})
-    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
+    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker, silence_logs=logging.ERROR)
     client = Client(cluster)
     return client
 
@@ -63,8 +67,8 @@ def process_pipelines(
         buffer=None,
         remove_buffer=None,
         bounding_box=None,
-        merge_tiles=None,
-        process=False
+        merge_tiles=False,
+        remove_tiles=False
 ):
     # Assertions
     assert type(config) is str
@@ -76,7 +80,7 @@
     if dry_run:
         assert type(dry_run) is int
     assert type(diagnostic) is bool
-    assert type(tile_size) is tuple
+    assert type(tile_size) is tuple and len(tile_size) == 2
     if buffer:
         assert type(buffer) is int
     if remove_buffer:
@@ -84,6 +88,15 @@
     if bounding_box:
         assert type(bounding_box) is tuple
         assert len(bounding_box) == 4
+    assert type(merge_tiles) is bool
+    assert type(remove_tiles) is bool
+
+    with open(config, 'r') as c:
+        config_file = json.load(c)
+        input_dir = config_file.get('input')
+        output = config_file.get('output')
+        temp = config_file.get('temp')
+        pipeline = config_file.get('pipeline')
 
     if n_workers >= os.cpu_count():
         answer = query_yes_no(
@@ -93,20 +106,33 @@
         if not answer:
             return
 
-    if tile_size == (256, 256):
-        answer = query_yes_no(
-            f'WARNING - You are using the default value of the tile_size option (256 by 256 meters). Please '
-            f'check if your points cloud\'s dimensions are greater than this value.\nDo you want to continue ? '
+    if input_type == 'single':
+        if tile_size == (256, 256):
+            answer = query_yes_no(
+                f'WARNING - You are using the default value of the tile_size option (256 by 256 meters). Please '
+                f'check if your point cloud\'s dimensions are greater than this value.\nDo you want to continue? '
+            )
+            if not answer:
+                return
+
+        infos = cloud.compute_quickinfo(input_dir)
+        bounds = infos['summary']['bounds']
+        distX, distY = (
+            int(bounds['maxx'] - bounds['minx']),
+            int(bounds['maxy'] - bounds['miny'])
         )
-        if not answer:
-            return
-
-    with open(config, 'r') as c:
-        config_file = json.load(c)
-        input_dir = config_file.get('input')
-        output = config_file.get('output')
-        temp = config_file.get('temp')
-        pipeline = config_file.get('pipeline')
+
+        nTilesX = ceil(distX / tile_size[0])
+        nTilesY = ceil(distY / tile_size[1])
+
+        if nTilesX * nTilesY > n_workers:
+            answer = query_yes_no(
+                f'WARNING - With this tile size and this number of workers, each worker will have more than one '
+                f'task, which can blow up the distributed memory. Please choose a larger tile size or increase '
+                f'your number of workers.\nDo you want to continue? '
+            )
+            if not answer:
+                return
 
     if not os.path.exists(temp):
         os.mkdir(temp)
@@ -162,36 +188,28 @@
     if diagnostic:
         ms = MemorySampler()
         with ms.sample(label='execution', client=client):
-            delayed = client.persist(delayed)
-            progress(delayed) if process else None
-            futures = client.compute(delayed)
-            client.gather(futures)
+            delayed = client.compute(delayed)
+            progress(delayed)
         ms.plot()
     else:
-        delayed = client.persist(delayed)
-        progress(delayed) if process else None
-        futures = client.compute(delayed)
-        client.gather(futures)
+        delayed = client.compute(delayed)
+        progress(delayed)
 
     del delayed
-    del futures
 
     file_manager.getEmptyWeight(output_directory=output)
 
     if merge_tiles and len(listdir(output)) > 1:
-        input_filename = ntpath.basename(input_dir)
-        merge = cloud.merge(output, input_filename)
+        input_filename = ntpath.basename(input_dir).split('.')[0]
+        writers = tile.get_writers(tile.load_pipeline(pipeline))
+        merge = cloud.merge(output, input_filename, writers)
 
         if merge is not None:
-            merge_ppln = pdal.Pipeline(cloud.merge(output, input_filename))
+            merge_ppln = pdal.Pipeline(merge)
             merge_ppln.execute()
 
-    plt.savefig(output + '/memory-usage.png') if diagnostic else None
+    if remove_tiles:
+        for f in os.listdir(output):
+            if f.split('.')[0] != input_filename:
+                os.remove(join(output, f))
-
-if __name__ == "__main__":
-    process_pipelines(
-        config="D:/data_dev/pdal-parallelizer/config.json",
-        input_type="single",
-        timeout=500,
-        n_workers=3
-    )
+    plt.savefig(output + '/memory-usage.png') if diagnostic else None
diff --git a/src/pdal_parallelizer/cloud.py b/src/pdal_parallelizer/cloud.py
index 71ed7ac..cd27b61 100644
--- a/src/pdal_parallelizer/cloud.py
+++ b/src/pdal_parallelizer/cloud.py
@@ -19,16 +19,35 @@ def crop(bounds):
     return parsed
 
 
-def merge(output_dir, filename):
+def merge(output_dir, filename, writers):
     outputs = ""
+    # Default values according to the pdal writers.las documentation
+    compression = 'none'
+    minor_version = 2
+    dataformat_id = 3
+
     for f in listdir(output_dir):
         if f.split('.').count('png') <= 0:
             outputs += '"' + output_dir + '/' + f + '",'
 
     if outputs != "":
         extension = listdir(output_dir)[0].split('.')[1]
+        if extension == 'laz':
+            writers_extension = 'las'
+            compression = 'laszip'
+        else:
+            writers_extension = extension
+
+        try:
+            minor_version = writers[0]['minor_version']
+            dataformat_id = writers[0]['dataformat_id']
+        except KeyError:
+            pass
+
+        merge = '[' + outputs + '{"type": "writers.' + writers_extension + '", "filename":"' + output_dir + '/' + \
+                filename + '.' + extension + '","extra_dims": "all", "compression": "' + compression + '", ' + \
+                '"minor_version": ' + str(minor_version) + ', "dataformat_id": ' + str(dataformat_id) + '}]'
 
-    merge = '[' + outputs + '{"type": "writers.' + extension + '", "filename":"' + output_dir + '/' + filename + '","extra_dims": "all"}]'
 
     return merge
 
@@ -37,10 +56,21 @@ def addClassFlags():
     return json.loads(ferry)
 
 
+def compute_quickinfo(filepath):
+    """Returns some information about the cloud."""
+    # Get the cloud information
+    pdal_info = subprocess.run(['pdal', 'info', filepath, '--summary'],
+                               stderr=subprocess.PIPE,
+                               stdout=subprocess.PIPE)
+    info = json.loads(pdal_info.stdout.decode())
+
+    return info
+
+
 class Cloud:
     def __init__(self, filepath, bounds=None):
         self.filepath = filepath
-        self.info = self.compute_quickinfo()
+        self.info = compute_quickinfo(self.filepath)
         self.classFlags = self.hasClassFlags()
 
         # Get the cloud information to set its bounds
@@ -65,19 +95,9 @@ def __init__(self, filepath, bounds=None):
     def getCount(self):
         return self.info['summary']['num_points']
 
-    def compute_quickinfo(self):
-        """Returns some information about the cloud."""
-        # Get the cloud information
-        pdal_info = subprocess.run(['pdal', 'info', self.filepath, '--summary'],
-                                   stderr=subprocess.PIPE,
-                                   stdout=subprocess.PIPE)
-        info = json.loads(pdal_info.stdout.decode())
-
-        return info
-
     def hasClassFlags(self):
         """Check if the cloud has the ClassFlags dimension"""
-        info = self.compute_quickinfo()
+        info = compute_quickinfo(self.filepath)
         dimensions = info['summary']['dimensions']
 
         return 'ClassFlags' in dimensions
diff --git a/src/pdal_parallelizer/pdal_parallelizer_cli/__main__.py b/src/pdal_parallelizer/pdal_parallelizer_cli/__main__.py
index 9d86a34..49bc124 100644
--- a/src/pdal_parallelizer/pdal_parallelizer_cli/__main__.py
+++ b/src/pdal_parallelizer/pdal_parallelizer_cli/__main__.py
@@ -9,7 +9,7 @@
 
 
 @click.group()
-@click.version_option('2.0.2')
+@click.version_option('2.0.3')
 def main():
     """A simple parallelization tool for 3d point clouds treatment"""
     pass
@@ -27,6 +27,7 @@ def main():
 @click.option('-rb', '--remove_buffer', is_flag=True, required=False)
 @click.option('-bb', '--bounding_box', required=False, nargs=4, type=float)
 @click.option('-mt', '--merge_tiles', required=False, is_flag=True)
+@click.option('-rt', '--remove_tiles', required=False, is_flag=True)
 def process_pipelines(**kwargs):
     # Get all the options
     config = kwargs.get('config')
@@ -40,6 +41,7 @@ def process_pipelines(**kwargs):
     remove_buffer = kwargs.get('remove_buffer')
     bounding_box = kwargs.get('bounding_box')
     merge_tiles = kwargs.get('merge_tiles')
+    remove_tiles = kwargs.get('remove_tiles')
 
     process(
         config=config,
@@ -53,7 +55,7 @@
         remove_buffer=remove_buffer,
         bounding_box=bounding_box,
         merge_tiles=merge_tiles,
-        process=True
+        remove_tiles=remove_tiles
     )
 
 
diff --git a/src/pdal_parallelizer/tile.py b/src/pdal_parallelizer/tile.py
index 55563cc..9f6d2fe 100644
--- a/src/pdal_parallelizer/tile.py
+++ b/src/pdal_parallelizer/tile.py
@@ -17,6 +17,20 @@
 from . import bounds
 
 
+def load_pipeline(pipeline):
+    with open(pipeline, 'r') as ppln:
+        p = json.load(ppln)
+    return p
+
+
+def get_readers(pipeline):
+    return list(filter(lambda x: x['type'].startswith('readers'), pipeline))
+
+
+def get_writers(pipeline):
+    return list(filter(lambda x: x['type'].startswith('writers'), pipeline))
+
+
 class Tile:
     def __init__(self, filepath, output_dir, json_pipeline, name=None, bounds=None, buffer=None, remove_buffer=False, cloud_object=None):
         self.filepath = filepath
@@ -45,52 +59,54 @@ def pipeline(self, single_file):
         """Assign a pipeline to the tile"""
         output_dir = self.output_dir
 
-        # Open the pipeline
-        with open(self.json_pipeline, 'r') as pipeline:
-            p = json.load(pipeline)
-
-            # Create the name of the temp file associated to the pipeline
-            temp_name = 'temp__' + self.getName()
-            output_filename = f'{output_dir}/{self.getName()}'
-            # Get the reader and the writer
-            reader = list(filter(lambda x: x['type'].startswith('readers'), p))
-            writer = list(filter(lambda x: x['type'].startswith('writers'), p))
+        p = load_pipeline(self.json_pipeline)
+
+        # Create the name of the temp file associated to the pipeline
+        temp_name = 'temp__' + self.getName()
+        output_filename = f'{output_dir}/{self.getName()}'
+        # Get the reader and the writer
+        reader = get_readers(p)
+        writer = get_writers(p)
+        try:
+            compression = writer[0]['compression']
+            extension = '.laz' if compression == 'laszip' or compression == 'lazperf' else '.las'
+        except KeyError:
             # Get the extension for the output
             extension = '.' + writer[0]['type'].split('.')[1] + '.las' if writer[0]['type'].split('.')[1] == 'copc' else '.' + writer[0]['type'].split('.')[1]
 
-            # If there is a buffer
-            if self.buffer:
-                # If the writer is 'writers.copc' or 'writer.laz' or 'writers.copc'
-                if writer[0]['type'].split('.')[1] in ['las', 'laz', 'copc']:
-                    # Insert the filter to assign the wittheld flag to the buffer
-                    p.insert(1, self.assign)
-                    if self.remove_buffer:
-                        # Remove the buffer
-                        p.insert(len(p) - 1, bounds.removeBuffer(self.bounds_without_buffer))
-
-            if self.cloud:
-                # If there is the ferry filter attribute in the tile instance
-                if not self.cloud.classFlags:
-                    # Insert the filter to add the ClassFlags dimension
-                    p.insert(1, cloud.addClassFlags())
-
-            # The pipeline must contain a reader AND a writer
-            if not reader:
-                sys.exit("Please add a reader to your pipeline.")
-            elif not writer:
-                sys.exit("Please add a writer to your pipeline.")
-
-            # If the input is a single file
-            if single_file:
-                # Add a crop filter to divide the cloud in small tiles
-                p.insert(1, cloud.crop(self.bounds))
-
-            # Add the filename option in the pipeline's reader to get the right file
-            reader[0]['filename'] = self.filepath
-            # Add the filename option in the pipeline's writer to write the result in the right file
-            writer[0]['filename'] = output_filename + extension
-
-            p = pdal.Pipeline(json.dumps(p))
+        # If there is a buffer
+        if self.buffer:
+            # If the writer is 'writers.las', 'writers.laz' or 'writers.copc'
+            if writer[0]['type'].split('.')[1] in ['las', 'laz', 'copc']:
+                # Insert the filter to assign the withheld flag to the buffer
+                p.insert(1, self.assign)
+                if self.remove_buffer:
+                    # Remove the buffer
+                    p.insert(len(p) - 1, bounds.removeBuffer(self.bounds_without_buffer))
+
+        if self.cloud:
+            # If the cloud does not already have the ClassFlags dimension
+            if not self.cloud.classFlags:
+                # Insert the filter to add the ClassFlags dimension
+                p.insert(1, cloud.addClassFlags())
+
+        # The pipeline must contain a reader AND a writer
+        if not reader:
+            sys.exit("Please add a reader to your pipeline.")
+        elif not writer:
+            sys.exit("Please add a writer to your pipeline.")
+
+        # If the input is a single file
+        if single_file:
+            # Add a crop filter to divide the cloud in small tiles
+            p.insert(1, cloud.crop(self.bounds))
+
+        # Add the filename option in the pipeline's reader to get the right file
+        reader[0]['filename'] = self.filepath
+        # Add the filename option in the pipeline's writer to write the result in the right file
+        writer[0]['filename'] = output_filename + extension
+
+        p = pdal.Pipeline(json.dumps(p))
 
         return p, temp_name
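
For reference, a minimal sketch (not part of the patch) of how the reworked process_pipelines API introduced above might be driven once the change is applied. The config path and option values are illustrative placeholders; the JSON config is expected to provide the "input", "output", "temp" and "pipeline" keys that process_pipelines now reads before doing anything else.

    # Hypothetical usage sketch: tile a single cloud, merge the resulting tiles,
    # then delete the per-tile outputs so only the merged cloud remains.
    from pdal_parallelizer import process_pipelines

    process_pipelines(
        config='config.json',       # illustrative path to the JSON config
        input_type='single',
        timeout=500,
        n_workers=3,
        tile_size=(256, 256),
        merge_tiles=True,           # default is now False (was merge_tiles=None)
        remove_tiles=True           # new option added by this change
    )

The CLI exposes the same behaviour through the new -rt/--remove_tiles flag alongside the existing -mt/--merge_tiles.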