V 2.0.3 (#61)
* Create .readthedocs.yaml

* Remove the warning message related to tile_size when input_type = dir;
use compute instead of persist

* Add the remove_tiles option

* Add a warning if the number of tasks is greater than the number of workers

* Change the warning message for the number of workers

* Add compression for the merge option

* Add minor_version and dataformat_id for the merge step

* Change imports

* Upgrade to 2.0.3
ClementAlba authored Feb 3, 2023
1 parent 140f7b1 commit 13d4723
Showing 7 changed files with 183 additions and 98 deletions.
29 changes: 29 additions & 0 deletions .readthedocs.yaml
@@ -0,0 +1,29 @@
# .readthedocs.yaml
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details

# Required
version: 2

# Set the version of Python and other tools you might need
build:
os: ubuntu-22.04
tools:
python: "3.11"
# You can also specify other tool versions:
# nodejs: "19"
# rust: "1.64"
# golang: "1.19"

# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py

# If using Sphinx, optionally build your docs in additional formats such as PDF
# formats:
# - pdf

# Optionally declare the Python requirements required to build your docs
python:
install:
- requirements: requirements.txt
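To sanity-check this configuration before pushing, the same Sphinx build can be run locally. A minimal sketch, assuming the docs layout and requirements.txt referenced above (run pip install -r requirements.txt first; the docs path is an assumption based on the configuration key):

from sphinx.cmd.build import build_main

# Equivalent to: sphinx-build -b html docs docs/_build/html
exit_code = build_main(["-b", "html", "docs", "docs/_build/html"])
assert exit_code == 0, "docs build failed"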
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -7,7 +7,7 @@
author = 'Clément Alba'

release = '2.0'
version = '2.0.1'
version = '2.0.2'

# -- General configuration

2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = pdal-parallelizer
version = 2.0.2
version = 2.0.3
description = A simple tool for using pdal with parallel execution
long_description = file: README.rst, CHANGELOG.rst, LICENSE.rst
keywords = pdal, parallelizer, dask
92 changes: 55 additions & 37 deletions src/pdal_parallelizer/__init__.py
@@ -1,4 +1,5 @@
import json
import logging
import os
import os.path
import click
@@ -10,9 +11,12 @@
from . import do
from . import file_manager
from . import cloud
from . import tile
from matplotlib import pyplot as plt
import sys
import ntpath
from os.path import join
from math import ceil


def query_yes_no(question, default='no'):
@@ -46,7 +50,7 @@ def config_dask(n_workers, threads_per_worker, timeout):
cfg.set({'interface': 'lo'})
cfg.set({'distributed.scheduler.worker-ttl': None})
cfg.set({'distributed.comm.timeouts.connect': timeout})
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker, silence_logs=logging.ERROR)
client = Client(cluster)
return client
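For reference, silence_logs takes a standard logging level, so only worker messages at ERROR or above reach the console. A minimal sketch of the cluster setup this function wraps (worker counts are illustrative):

import logging
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=3, threads_per_worker=1,
                       silence_logs=logging.ERROR)  # suppress worker INFO/WARNING noise
client = Client(cluster)
print(client.dashboard_link)  # dashboard for watching task progress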

Expand All @@ -63,8 +67,8 @@ def process_pipelines(
buffer=None,
remove_buffer=None,
bounding_box=None,
merge_tiles=None,
process=False
merge_tiles=False,
remove_tiles=False
):
# Assertions
assert type(config) is str
@@ -76,14 +80,23 @@
if dry_run:
assert type(dry_run) is int
assert type(diagnostic) is bool
assert type(tile_size) is tuple
assert type(tile_size) is tuple and len(tile_size) == 2
if buffer:
assert type(buffer) is int
if remove_buffer:
assert type(remove_buffer) is bool
if bounding_box:
assert type(bounding_box) is tuple
assert len(bounding_box) == 4
assert type(merge_tiles) is bool
assert type(remove_tiles) is bool

with open(config, 'r') as c:
config_file = json.load(c)
input_dir = config_file.get('input')
output = config_file.get('output')
temp = config_file.get('temp')
pipeline = config_file.get('pipeline')
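A hypothetical config.json with the four keys read here (every path is a placeholder):

import json

config = {
    "input": "/data/cloud.las",        # file (input_type='single') or directory (input_type='dir')
    "output": "/data/output",          # where processed tiles are written
    "temp": "/data/temp",              # scratch space for intermediate state
    "pipeline": "/data/pipeline.json"  # PDAL pipeline applied to each tile
}

with open("config.json", "w") as f:
    json.dump(config, f, indent=2)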

if n_workers >= os.cpu_count():
answer = query_yes_no(
@@ -93,20 +106,33 @@
if not answer:
return

if tile_size == (256, 256):
answer = query_yes_no(
f'WARNING - You are using the default value of the tile_size option (256 by 256 meters). Please '
f'check if your point cloud\'s dimensions are greater than this value.\nDo you want to continue? '
if input_type == 'single':
if tile_size == (256, 256):
answer = query_yes_no(
f'WARNING - You are using the default value of the tile_size option (256 by 256 meters). Please '
f'check if your point cloud\'s dimensions are greater than this value.\nDo you want to continue? '
)
if not answer:
return

infos = cloud.compute_quickinfo(input_dir)
bounds = infos['summary']['bounds']
distX, distY = (
int(bounds['maxx'] - bounds['minx']),
int(bounds['maxy'] - bounds['miny'])
)
if not answer:
return

with open(config, 'r') as c:
config_file = json.load(c)
input_dir = config_file.get('input')
output = config_file.get('output')
temp = config_file.get('temp')
pipeline = config_file.get('pipeline')
nTilesX = ceil(distX / tile_size[0])
nTilesY = ceil(distY / tile_size[1])

if nTilesX * nTilesY > n_workers:
answer = query_yes_no(
f'WARNING - With this tile size and this number of workers, each worker will get more than one task, '
f'which can blow up the distributed memory. Please choose a larger tile size or increase '
f'your number of workers.\nDo you want to continue?'
)
if not answer:
return
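A worked example of this check: a 2000 m by 1500 m cloud cut into default 256 m tiles gives ceil(2000/256) = 8 columns and ceil(1500/256) = 6 rows, so 48 tasks, and the warning fires whenever n_workers is below that. As a sketch, with made-up extents:

from math import ceil

distX, distY = 2000, 1500             # cloud extent in meters (made up)
tile_size = (256, 256)

nTilesX = ceil(distX / tile_size[0])  # 8
nTilesY = ceil(distY / tile_size[1])  # 6
print(nTilesX * nTilesY)              # 48 tasks -> warns when n_workers < 48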

if not os.path.exists(temp):
os.mkdir(temp)
@@ -162,36 +188,28 @@
if diagnostic:
ms = MemorySampler()
with ms.sample(label='execution', client=client):
delayed = client.persist(delayed)
progress(delayed) if process else None
futures = client.compute(delayed)
client.gather(futures)
delayed = client.compute(delayed)
progress(delayed)
ms.plot()
else:
delayed = client.persist(delayed)
progress(delayed) if process else None
futures = client.compute(delayed)
client.gather(futures)
delayed = client.compute(delayed)
progress(delayed)

del delayed
del futures
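The switch from client.persist to client.compute (noted in the commit message) matters for memory: persist keeps every result resident in the workers' distributed memory, while compute returns one future per task and lets results be released as they are gathered. A minimal sketch of the pattern, independent of the real pipelines:

import dask
from distributed import Client

def inc(x):
    return x + 1

client = Client()
tasks = [dask.delayed(inc)(i) for i in range(4)]

futures = client.compute(tasks)  # one future per task
print(client.gather(futures))    # [1, 2, 3, 4]
# client.persist(tasks) would instead pin all results on the workers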

file_manager.getEmptyWeight(output_directory=output)

if merge_tiles and len(listdir(output)) > 1:
input_filename = ntpath.basename(input_dir)
merge = cloud.merge(output, input_filename)
input_filename = ntpath.basename(input_dir).split('.')[0]
writers = tile.get_writers(tile.load_pipeline(pipeline))
merge = cloud.merge(output, input_filename, writers)
if merge is not None:
merge_ppln = pdal.Pipeline(cloud.merge(output, input_filename))
merge_ppln = pdal.Pipeline(merge)
merge_ppln.execute()

plt.savefig(output + '/memory-usage.png') if diagnostic else None
if remove_tiles:
for f in os.listdir(output):
if f.split('.')[0] != input_filename:
os.remove(join(output, f))
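For reference, the string cloud.merge returns is a PDAL pipeline. For two hypothetical LAZ tiles it would look roughly like this, given the compression and writer defaults added in this commit:

merge_ppln_json = '''[
    "/output/tile_0.laz",
    "/output/tile_1.laz",
    {"type": "writers.las", "filename": "/output/cloud.laz",
     "extra_dims": "all", "compression": "laszip",
     "minor_version": 2, "dataformat_id": 3}
]'''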


if __name__ == "__main__":
process_pipelines(
config="D:/data_dev/pdal-parallelizer/config.json",
input_type="single",
timeout=500,
n_workers=3
)
plt.savefig(output + '/memory-usage.png') if diagnostic else None
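The diagnostic path pairs MemorySampler with matplotlib, as above. A self-contained sketch of that pattern with throwaway tasks:

import dask
import matplotlib.pyplot as plt
from distributed import Client
from distributed.diagnostics import MemorySampler

client = Client()
ms = MemorySampler()
tasks = [dask.delayed(sum)(range(n)) for n in (10, 100, 1000)]
with ms.sample(label='execution', client=client):
    client.gather(client.compute(tasks))
ms.plot()                        # cluster memory over time, one line per label
plt.savefig('memory-usage.png')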
48 changes: 34 additions & 14 deletions src/pdal_parallelizer/cloud.py
@@ -19,16 +19,35 @@ def crop(bounds):
return parsed


def merge(output_dir, filename):
def merge(output_dir, filename, writers):
outputs = ""
# Default values according to the pdal writers.las documentation
compression = 'none'
minor_version = 2
dataformat_id = 3

for f in listdir(output_dir):
if f.split('.').count('png') <= 0:
outputs += '"' + output_dir + '/' + f + '",'

if outputs != "":
extension = listdir(output_dir)[0].split('.')[1]
if extension == 'laz':
writers_extension = 'las'
compression = 'laszip'
else:
writers_extension = extension

try:
minor_version = writers[0]['minor_version']
dataformat_id = writers[0]['dataformat_id']
except KeyError:
pass

merge = '[' + outputs + '{"type": "writers.' + writers_extension + '", "filename":"' + output_dir + '/' + \
filename + '.' + extension + '","extra_dims": "all", "compression": "' + compression + '", ' + \
'"minor_version": ' + str(minor_version) + ', "dataformat_id": ' + str(dataformat_id) + '}]'

merge = '[' + outputs + '{"type": "writers.' + extension + '", "filename":"' + output_dir + '/' + filename + '","extra_dims": "all"}]'
return merge
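A hypothetical call, assuming the output directory holds .laz tiles and that writers comes from tile.get_writers as a list of writer dictionaries:

import pdal

writers = [{'minor_version': 4, 'dataformat_id': 6}]  # values are assumptions
ppln_json = merge('/output', 'cloud', writers)        # paths are placeholders
if ppln_json is not None:
    pdal.Pipeline(ppln_json).execute()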


@@ -37,10 +56,21 @@ def addClassFlags():
return json.loads(ferry)


def compute_quickinfo(filepath):
"""Returns some information about the cloud."""
# Get the cloud information
pdal_info = subprocess.run(['pdal', 'info', filepath, '--summary'],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
info = json.loads(pdal_info.stdout.decode())

return info
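The summary JSON printed by pdal info --summary has roughly this shape (values made up); it is what getCount, hasClassFlags and the bounds lookup rely on:

info = {
    'summary': {
        'num_points': 1_200_000,
        'bounds': {'minx': 0.0, 'maxx': 2000.0, 'miny': 0.0, 'maxy': 1500.0},
        'dimensions': 'X, Y, Z, Intensity, ClassFlags'
    }
}
print(info['summary']['bounds']['maxx'])              # 2000.0
print('ClassFlags' in info['summary']['dimensions'])  # True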


class Cloud:
def __init__(self, filepath, bounds=None):
self.filepath = filepath
self.info = self.compute_quickinfo()
self.info = compute_quickinfo(self.filepath)
self.classFlags = self.hasClassFlags()

# Get the cloud information to set its bounds
@@ -65,19 +95,9 @@ def __init__(self, filepath, bounds=None):
def getCount(self):
return self.info['summary']['num_points']

def compute_quickinfo(self):
"""Returns some information about the cloud."""
# Get the cloud information
pdal_info = subprocess.run(['pdal', 'info', self.filepath, '--summary'],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
info = json.loads(pdal_info.stdout.decode())

return info

def hasClassFlags(self):
"""Check if the cloud has the ClassFlags dimension"""
info = self.compute_quickinfo()
info = compute_quickinfo(self.filepath)
dimensions = info['summary']['dimensions']
return 'ClassFlags' in dimensions

6 changes: 4 additions & 2 deletions src/pdal_parallelizer/pdal_parallelizer_cli/__main__.py
@@ -9,7 +9,7 @@


@click.group()
@click.version_option('2.0.2')
@click.version_option('2.0.3')
def main():
"""A simple parallelization tool for 3d point clouds treatment"""
pass
@@ -27,6 +27,7 @@ def main():
@click.option('-rb', '--remove_buffer', is_flag=True, required=False)
@click.option('-bb', '--bounding_box', required=False, nargs=4, type=float)
@click.option('-mt', '--merge_tiles', required=False, is_flag=True)
@click.option('-rt', '--remove_tiles', required=False, is_flag=True)
def process_pipelines(**kwargs):
# Get all the options
config = kwargs.get('config')
@@ -40,6 +41,7 @@
remove_buffer = kwargs.get('remove_buffer')
bounding_box = kwargs.get('bounding_box')
merge_tiles = kwargs.get('merge_tiles')
remove_tiles = kwargs.get('remove_tiles')

process(
config=config,
@@ -53,7 +55,7 @@
remove_buffer=remove_buffer,
bounding_box=bounding_box,
merge_tiles=merge_tiles,
process=True
remove_tiles=remove_tiles
)
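Equivalently, the library entry point can be called directly, as the removed __main__ block in __init__.py did. A sketch with placeholder values:

from pdal_parallelizer import process_pipelines

process_pipelines(
    config='config.json',   # placeholder path
    input_type='single',
    timeout=500,
    n_workers=3,
    merge_tiles=True,
    remove_tiles=True
)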


