Skip to content

Commit

Permalink
V 2.1.0 (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
ClementAlba authored Apr 18, 2023
1 parent 836c7ee commit f38940d
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 700 deletions.
13 changes: 12 additions & 1 deletion docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ This file look like this :
"pipeline": "Your pipeline path"
}
**If you want to process a single file, you will have to put its path in the input value of the config file.**

pdal-parallelizer dry runs
..........................

Expand All @@ -23,4 +25,13 @@ Dry runs are test executions that allows the user to check if it's parameters ar

So, if you set the dry run parameter to 5, pdal-parallelizer will take the 5 biggest files of your input directory and test your parameters on them. This way you will see whether the execution is too slow or whether the memory blows up.

If everything is good, you can launch your treatment on your whole input directory (i.e. without specifying -dr option). If not, you can execute a new dry run with other options.
If everything is good, you can launch your treatment on your whole input directory (i.e. without specifying -dr option). If not, you can execute a new dry run with other options.

pdal-parallelizer good practices
................................

Sometimes, your executions may fail and you will get an error. But why?

Your PDAL pipelines will be processed through a Dask cluster with a defined number of workers. Each worker will have a small part of your machine's memory to process its tasks. However, when a PDAL pipeline is processed, it has to open your input file and write the result of your treatments. These actions can take a lot of memory and make the distributed memory blow up.

To avoid this problem, always try to have 1 task per worker. Also, when you process a single file, do not cut your data into very small tiles.
20 changes: 10 additions & 10 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = pdal-parallelizer
version = 2.0.3
version = 2.1.0
description = A simple tool for use pdal with parallel execution
long_description = file: README.rst, CHANGELOG.rst, LICENSE.rst
keywords = pdal, parallelizer, dask
Expand All @@ -13,15 +13,15 @@ package_dir=
=src
packages = find:
install_requires =
click
pdal
PDAL
dask
distributed
twine
pandas
matplotlib
pyproj
click<=8.1.3
pdal<=3.2.2
dask<=2023.1.1
distributed<=2023.1.1
twine<=4.0.2
pandas<=1.5.3
pyproj<=3.4.1
numpy<=1.24.2


[options.entry_points]
console_scripts =
Expand Down
238 changes: 102 additions & 136 deletions src/pdal_parallelizer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
import json
import logging
import os
import os.path
import click
import pdal
import sys
from os.path import join
import numpy as np
import matplotlib.pyplot as plt
from . import file_manager
from . import do
from .cloud import Cloud
from dask import config as cfg
from dask.distributed import LocalCluster, Client, progress
from distributed.diagnostics import MemorySampler
from os import listdir
from . import do
from . import file_manager
from . import cloud
from . import tile
from matplotlib import pyplot as plt
import sys
import ntpath
from os.path import join
from math import ceil


def query_yes_no(question, default='no'):
Expand All @@ -42,62 +36,19 @@ def query_yes_no(question, default='no'):


def config_dask(n_workers, threads_per_worker, timeout):
    """Configure Dask and start the local cluster used to process the pipelines.

    Sets a few Dask options to avoid worker errors due to heartbeat or
    timeout problems, then creates a single ``LocalCluster`` and returns a
    ``Client`` connected to it.

    Args:
        n_workers: Number of workers given to ``LocalCluster``.
        threads_per_worker: Number of threads per worker given to
            ``LocalCluster``.
        timeout: Value stored in ``distributed.comm.timeouts.connect``.
            When falsy, the user is prompted for it on standard input and
            the raw answer (a string) is used as-is.

    Returns:
        The ``Client`` attached to the newly created cluster.
    """
    if not timeout:
        timeout = input('After how long of inactivity do you want to kill your worker (timeout)\n')

    cfg.set({'interface': 'lo'})
    cfg.set({'distributed.scheduler.worker-ttl': None})
    cfg.set({'distributed.comm.timeouts.connect': timeout})

    # Build exactly one cluster. Creating a first LocalCluster and then
    # rebinding the name to a second one would leave the first set of worker
    # processes running with nothing referencing them.
    # memory_limit=None is forwarded to LocalCluster; per the Dask
    # documentation this disables the per-worker memory limit.
    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker, memory_limit=None,
                           silence_logs=logging.ERROR)
    client = Client(cluster)
    return client


def process_pipelines(
config,
input_type,
timeout=None,
n_workers=3,
threads_per_worker=1,
dry_run=None,
diagnostic=False,
tile_size=(256, 256),
buffer=None,
remove_buffer=None,
bounding_box=None,
merge_tiles=False,
remove_tiles=False
):
# Assertions
assert type(config) is str
assert input_type == "single" or input_type == "dir"
if timeout:
assert type(timeout) is int
assert type(n_workers) is int
assert type(threads_per_worker) is int
if dry_run:
assert type(dry_run) is int
assert type(diagnostic) is bool
assert type(tile_size) is tuple and len(tile_size) == 2
if buffer:
assert type(buffer) is int
if remove_buffer:
assert type(remove_buffer) is bool
if bounding_box:
assert type(bounding_box) is tuple
assert len(bounding_box) == 4
assert type(merge_tiles) is bool
assert type(remove_tiles) is bool

with open(config, 'r') as c:
config_file = json.load(c)
input_dir = config_file.get('input')
output = config_file.get('output')
temp = config_file.get('temp')
pipeline = config_file.get('pipeline')

def trigger_warnings(n_workers, input_type, input, output, temp, tile_size):
if n_workers >= os.cpu_count():
answer = query_yes_no(
f'\nWARNING - You choose to launch {n_workers} workers but your machine has only {os.cpu_count()}'
Expand All @@ -107,6 +58,7 @@ def process_pipelines(
return

if input_type == 'single':
c = Cloud(input)
if tile_size == (256, 256):
answer = query_yes_no(
f'WARNING - You are using the default value of the tile_size option (256 by 256 meters). Please '
Expand All @@ -115,101 +67,115 @@ def process_pipelines(
if not answer:
return

infos = cloud.compute_quickinfo(input_dir)
bounds = infos['summary']['bounds']
distX, distY = (
int(bounds['maxx'] - bounds['minx']),
int(bounds['maxy'] - bounds['miny'])
)

nTilesX = ceil(distX / tile_size[0])
nTilesY = ceil(distY / tile_size[0])

if nTilesX * nTilesY > n_workers:
if input_type == 'dir':
if input == output or input == temp:
answer = query_yes_no(
f'WARNING - With this size of tiles and this number of workers, each worker will have more than one task'
f' and it can blow up the distributed memory. Please choose a larger size for your tiles or increase '
f'your number of workers.\nDo you want to continue ?'
f'WARNING - Your input folder is the same as your output or temp folder. This could be a problem. '
f'Please choose three separate directories.\n Do you want to continue ? '
)
if not answer:
return

if len(os.listdir(output)) > 0:
answer = query_yes_no(
f'WARNING - Your output directory is not empty.\n Do you want to continue ? '
)
if not answer:
return


def process_pipelines(
config,
input_type,
timeout=None,
n_workers=3,
threads_per_worker=1,
dry_run=None,
diagnostic=None,
tile_size=(256, 256),
buffer=None,
remove_buffer=None,
bounding_box=None,
merge_tiles=None,
remove_tiles=None
):
with open(config, 'r') as c:
config_file = json.load(c)
input = config_file.get('input')
output = config_file.get('output')
temp = config_file.get('temp')
pipeline = config_file.get('pipeline')

trigger_warnings(n_workers, input_type, input, output, temp, tile_size)

if not os.path.exists(temp):
os.mkdir(temp)

if not os.path.exists(output):
os.mkdir(output)

# If there is some temp file in the temp directory, these are processed
if len(listdir(temp)) != 0:
click.echo(
f'Something went wrong during previous execution, there is some temp files in your temp '
f'directory.\nBeginning of the execution\n')
# Get all the deserialized pipelines
pipeline_iterator = file_manager.getSerializedPipelines(temp_directory=temp)
# Process pipelines
delayed = do.process_serialized_pipelines(temp_dir=temp, iterator=pipeline_iterator)
else:
click.echo('Beginning of the execution\n')
# If the user don't specify the dry_run option
if not dry_run:
# If the user wants to process a single file, it is split. Else, get all the files of the input directory
iterator = do.splitCloud(filepath=input_dir,
output_dir=output,
json_pipeline=pipeline,
tile_bounds=tile_size,
buffer=buffer,
remove_buffer=remove_buffer,
bounding_box=bounding_box) if input_type == 'single' \
else file_manager.getFiles(input_directory=input_dir)
# Process pipelines
delayed = do.process_pipelines(output_dir=output, json_pipeline=pipeline, temp_dir=temp, iterator=iterator,
is_single=(input_type == 'single'))
client = config_dask(n_workers, threads_per_worker, timeout)
ms = MemorySampler()

if input_type == "single":
futures = []
c = Cloud(input, bounding_box)
if len(os.listdir(temp)) != 0:
print("Something went wrong during previous execution, there is some temp files in your temp " +
"directory.\nBeginning of the execution\n")
tiles = file_manager.get_serialized_tiles(temp)
else:
# If the user wants to process a single file, it is split and get the number of tiles given by the user.
# Else, get the number of files we want to do the test execution (not serialized)
iterator = do.splitCloud(filepath=input_dir,
output_dir=output,
json_pipeline=pipeline,
tile_bounds=tile_size,
nTiles=dry_run,
buffer=buffer,
remove_buffer=remove_buffer,
bounding_box=bounding_box) if input_type == 'single' \
else file_manager.getFiles(input_directory=input_dir, nFiles=dry_run)
# Process pipelines
delayed = do.process_pipelines(output_dir=output, json_pipeline=pipeline, iterator=iterator,
dry_run=dry_run, is_single=(input_type == 'single'))

client = config_dask(n_workers=n_workers, threads_per_worker=threads_per_worker, timeout=timeout)

click.echo('Parallelization started.\n')
tiles = c.split(tile_size, pipeline, output, buffer, dry_run)

if diagnostic:
ms = MemorySampler()
with ms.sample(label='execution', client=client):
delayed = client.compute(delayed)
progress(delayed)
ms.plot()
else:
delayed = client.compute(delayed)
progress(delayed)
print("Opening the cloud.\n")
image_array = c.load_image_array(pipeline)

if bounding_box:
image_array = image_array[np.where((image_array["X"] > bounding_box[0]) &
(image_array["X"] < bounding_box[2]) &
(image_array["Y"] > bounding_box[1]) &
(image_array["Y"] < bounding_box[3]))]

del delayed
data = do.cut_image_array(tiles, image_array, temp, dry_run)

file_manager.getEmptyWeight(output_directory=output)
print(f"Starting parallelization, visit the dashboard ({client.dashboard_link}) to follow the execution.\n")

if merge_tiles and len(listdir(output)) > 1:
input_filename = ntpath.basename(input_dir).split('.')[0]
writers = tile.get_writers(tile.load_pipeline(pipeline))
merge = cloud.merge(output, input_filename, writers)
if merge is not None:
merge_ppln = pdal.Pipeline(merge)
merge_ppln.execute()
with ms.sample("Execution"):
for (array, stages, tile) in data:
if len(array) > 0:
big_future = client.scatter(array)
futures.append(
client.submit(do.execute_stages_streaming, big_future, stages, tile, temp, remove_buffer, dry_run))
else:
if not dry_run:
os.remove(temp + "/" + tile.name + ".pickle")

client.gather(futures)

if merge_tiles:
c.merge(output, pipeline)

if remove_tiles:
for f in os.listdir(output):
if f.split('.')[0] != input_filename:
if f != os.path.basename(c.filepath) and f.split(".")[1] != "png":
os.remove(join(output, f))
else:
if len(os.listdir(temp)) != 0:
print("Something went wrong during previous execution, there is some temp files in your temp " +
"directory.\nBeginning of the execution\n")
serialized_data = file_manager.get_serialized_tiles(temp)
tasks = do.process_serialized_tiles(serialized_data, temp)
else:
print("Beginning of the execution.\n")
files = file_manager.get_files(input, dry_run)
tasks = do.process_several_clouds(files, pipeline, output, temp, buffer, dry_run)

plt.savefig(output + '/memory-usage.png') if diagnostic else None
print("Starting parallelization.\n")

with ms.sample("Execution"):
future = client.persist(tasks)
progress(future)

if diagnostic:
ms.plot()
plt.savefig(output + "/diagnostic.png")
Loading

0 comments on commit f38940d

Please sign in to comment.