Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python implementation to support BAGv2 Extract and Mutations #127

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
70 changes: 70 additions & 0 deletions stetl/bagutil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os
import shutil
import zipfile

from stetl.util import Util


# Logger for this module, obtained from Util.get_log under the name 'bagutil'.
log = Util.get_log('bagutil')


class BAGUtil:
    """
    Helper functions for BAG 2.0 Extract processing

    All helpers are static: they list or extract members of a ZIP archive
    and remove temporary files or directories.
    """

    @staticmethod
    def zip_file_content(zip_file):
        """
        Return the member names stored in a ZIP archive.

        :param zip_file: path to the ZIP archive
        :return: list of member names, in the order zipfile reports them
        """
        log.info("Retrieving content from: %s" % zip_file)

        with zipfile.ZipFile(zip_file) as z:
            return list(z.namelist())

    @staticmethod
    def extract_zip_file(zip_file, temp_dir):
        """
        Extract every member of a ZIP archive below ``temp_dir``.

        :param zip_file: path to the ZIP archive
        :param temp_dir: directory the members are extracted into
        :return: list with one path per archive member, in namelist() order
        """
        extracted = []

        with zipfile.ZipFile(zip_file) as z:
            for name in z.namelist():
                log.info(
                    "Extracting %s from %s to %s" % (
                        name,
                        zip_file,
                        os.path.join(temp_dir, name),
                    )
                )

                # Record the path extract() reports instead of joining
                # temp_dir and the raw member name: zipfile sanitises member
                # names (e.g. drops '..' components and leading slashes), so
                # a naive join may name a file that was never written and
                # that lies outside temp_dir.
                extracted.append(z.extract(name, path=temp_dir))

        return extracted

    @staticmethod
    def extract_from_zip_file(name, zip_file, temp_dir):
        """
        Extract a single member of a ZIP archive below ``temp_dir``.

        :param name: member name as stored in the archive
        :param zip_file: path to the ZIP archive
        :param temp_dir: directory the member is extracted into
        :return: path of the extracted member as reported by zipfile
        :raises Exception: when ``name`` is not a member of the archive
        """
        with zipfile.ZipFile(zip_file) as z:
            if name not in z.namelist():
                raise Exception("Cannot extract file: %s" % name)

            return z.extract(name, path=temp_dir)

    @staticmethod
    def remove_temp_file(temp_file):
        """
        Delete a single file.

        :param temp_file: path of the file to delete
        :return: True once the file has been unlinked
        """
        log.info("Removing temp file: %s" % temp_file)
        os.unlink(temp_file)

        return True

    @staticmethod
    def remove_temp_dir(temp_dir):
        """
        Delete a directory tree.

        :param temp_dir: path of the directory to delete recursively
        :return: True once the tree has been removed
        """
        log.info("Removing temp dir: %s" % temp_dir)
        shutil.rmtree(temp_dir)

        return True
58 changes: 58 additions & 0 deletions stetl/filters/bagfilter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# BAG related filters

from stetl.component import Config
from stetl.util import Util
from stetl.filter import Filter
from stetl.packet import FORMAT

# Logger for this module, obtained from Util.get_log under the name "bagfilter".
log = Util.get_log("bagfilter")


class LeveringFilter(Filter):
    """
    Read the Leveringsdocument-BAG-Extract.xml file named by the incoming
    packet and wrap its text in a one-element record array, intended for
    insertion into the nlx_bag_info table.
    """

    @Config(ptype=str, default='sleutel', required=False)
    def key_column(self):
        """
        Name of the column that holds the key
        """

    @Config(ptype=str, default='levering_xml', required=False)
    def key_value(self):
        """
        Value stored in the key column
        """

    @Config(ptype=str, default='waarde', required=False)
    def value_column(self):
        """
        Name of the column that holds the value
        """

    def __init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.record_array):
        # Delegate straight to the Filter base class.
        Filter.__init__(self, configdict, section, consumes, produces)

    def invoke(self, packet):
        """
        Replace ``packet.data`` (a file path) with a single-record list
        holding the configured key and the file's text content.

        Packets without data, or marking end of stream, pass through
        untouched.
        """
        passthrough = packet.data is None or packet.is_end_of_stream()

        if not passthrough:
            with open(packet.data, 'rt') as xml_file:
                xml_text = xml_file.read()

            # A record_array (list of one record) is produced rather than a
            # bare record to avoid the ValueError described in
            # https://github.com/geopython/stetl/issues/125
            packet.data = [
                {
                    self.key_column: self.key_value,
                    self.value_column: xml_text,
                },
            ]

        return packet
210 changes: 210 additions & 0 deletions stetl/inputs/baginput.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import os
import pprint
import re

from stetl.bagutil import BAGUtil
from stetl.component import Config
from stetl.input import Input
from stetl.util import Util
from stetl.packet import FORMAT

# Logger for this module, obtained from Util.get_log under the name 'baginput'.
log = Util.get_log('baginput')


class BAGInput(Input):
    """
    Parse BAG 2.0 Extract ZIP files into a stream of records containing zipfile path and file names.

    The archive is scanned once, in the constructor, which fills
    ``self.file_list``:

    * with ``multiprocessing`` enabled the list holds paths below
      ``temp_dir`` and ``read`` hands the whole list out in a single packet
      as ``{'file_list': [...]}``;
    * otherwise the list holds ``{'file_path': ..., 'name': ...}`` records
      and ``read`` hands out one record per call.

    produces=FORMAT.record
    """

    @Config(ptype=str, required=True, default=None)
    def file_path(self):
        """
        Path to BAG 2.0 Extract ZIP file (lvbag-extract-nl.zip)
        """
        pass

    @Config(ptype=str, required=False, default='/tmp')
    def temp_dir(self):
        """
        Path for temporary directory
        """
        pass

    @Config(ptype=bool, required=False, default=False)
    def multiprocessing(self):
        """
        Process multiple files in parallel
        """
        pass

    def __init__(self, configdict, section, produces=FORMAT.record):
        Input.__init__(self, configdict, section, produces)

        # Work queue handed out by read(); element type depends on the
        # multiprocessing setting (see class docstring).
        self.file_list = []
        # Paths of extracted files that exit() has to clean up.
        self.extracted = []

        self.process_zip_file(self.file_path)

        log.debug("file_list:\n%s" % pprint.pformat(self.file_list))

    def exit(self):
        """
        Remove every extracted file that still exists on disk.
        """
        log.info('Exit: removing temp files')

        for entry in self.extracted:
            if os.path.exists(entry):
                BAGUtil.remove_temp_file(entry)

    def _extract_and_prune(self, zip_file, zip_content):
        """
        Extract ``zip_file`` into ``temp_dir`` and drop unusable members.

        Directories, hidden files, missing files and files that are neither
        .xml nor .zip are deleted from disk and removed from ``zip_content``
        (which is modified in place). Remaining files are recorded in
        ``self.extracted`` for cleanup in ``exit``.
        """
        extracted = BAGUtil.extract_zip_file(zip_file, self.temp_dir)

        # extract_zip_file yields one path per archive member in the same
        # order as the member listing, so pair them up. Removing the exact
        # member name (instead of the path's basename) also works for nested
        # members and directory entries, where the basename is not an
        # element of zip_content and list.remove() would raise ValueError.
        # Iterate over a snapshot because zip_content shrinks in the loop.
        for name, entry in zip(list(zip_content), extracted):
            if (
                os.path.isdir(entry) or  # noqa: W504
                os.path.basename(entry).startswith('.') or  # noqa: W504
                not os.path.exists(entry)
            ):
                log.info("Not processing: %s" % entry)

                if os.path.isdir(entry):
                    BAGUtil.remove_temp_dir(entry)
                elif os.path.exists(entry):
                    BAGUtil.remove_temp_file(entry)

                zip_content.remove(name)

                continue

            if not entry.endswith(('.xml', '.zip')):
                log.warning("Skipping unsupported file: %s" % entry)

                BAGUtil.remove_temp_file(entry)

                zip_content.remove(name)

                # The file is already gone; do not track it for cleanup.
                continue

            self.extracted.append(entry)

    def process_zip_file(self, zip_file, initial=True):
        """
        Scan a ZIP file and queue its relevant members in ``self.file_list``.

        :param zip_file: path to the ZIP file to scan
        :param initial: True for the top-level archive, False for archives
            found inside another archive

        The archive is extracted into ``temp_dir`` when it is the initial
        archive or when its file name matches the
        Inactief/InOnderzoek/NietBag pattern; other archives are only
        listed. Calls itself recursively for nested archives.
        """
        zip_content = BAGUtil.zip_file_content(zip_file)

        if (
            initial is True or  # noqa: W504
            re.search(
                r'^\d{4}(?:Inactief|InOnderzoek|NietBag)\d{8}\.zip$',
                os.path.basename(zip_file),
            )
        ):
            self._extract_and_prune(zip_file, zip_content)

        if self.multiprocessing:
            # Queue plain paths below temp_dir.
            if initial is True:
                leverings_xml = 'Leveringsdocument-BAG-Extract.xml'

                if leverings_xml in zip_content:
                    self.file_list.append(
                        os.path.join(
                            self.temp_dir,
                            leverings_xml,
                        )
                    )

                names = sorted(zip_content)

                for entry in names:
                    if (
                        re.search(
                            r'^\d{4}(MUT)\d{8}-\d{8}\.zip$|^\d{4}(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$',
                            entry,
                        ) or  # noqa: W504
                        re.search(
                            r'^GEM\-WPL\-RELATIE\-\d{8}\.zip$',
                            entry,
                        )
                    ):
                        self.file_list.append(
                            os.path.join(
                                self.temp_dir,
                                entry,
                            )
                        )

                # Container archives are handled after the regular ones so
                # their members are queued last.
                for entry in names:
                    if re.search(
                        r'^\d{4}(?:Inactief|InOnderzoek|NietBag)\d{8}\.zip$',
                        entry,
                    ):
                        self.process_zip_file(
                            os.path.join(
                                self.temp_dir,
                                entry,
                            ),
                            initial=False,
                        )
            else:
                for entry in sorted(zip_content):
                    if re.search(
                        r'^\d{4}(?:IO|)(MUT)\d{8}-\d{8}\.zip$|^\d{4}(?:IA|IO|NB)(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$',
                        entry,
                    ):
                        self.file_list.append(
                            os.path.join(
                                self.temp_dir,
                                entry,
                            )
                        )
        else:
            # Queue one record per XML member, recursing into nested ZIPs.
            for entry in zip_content:
                if entry.startswith(('.', '_')):
                    continue

                if entry.endswith('.zip'):
                    self.process_zip_file(
                        os.path.join(
                            self.temp_dir,
                            entry,
                        ),
                        initial=False,
                    )
                elif entry.endswith('.xml'):
                    item = {
                        'file_path': zip_file,
                        'name': entry,
                    }

                    self.file_list.append(item)
                else:
                    log.warning("Ignoring entry: %s" % entry)

    def read(self, packet):
        """
        Put the next unit of work into ``packet``.

        Marks end of stream when the queue is empty. With multiprocessing
        the whole queue is emitted at once as ``{'file_list': [...]}`` and
        the queue is reset; otherwise the first queued entry is emitted.

        :param packet: packet to fill
        :return: the same packet
        """
        if not self.file_list:
            packet.set_end_of_stream()

            log.info("Empty file list")

            return packet

        if self.multiprocessing:
            log.info("Read: file list")

            packet.data = {
                'file_list': self.file_list,
            }

            # Start a fresh list so the next read() signals end of stream.
            self.file_list = []
        else:
            entry = self.file_list.pop(0)

            log.info("Read entry: %s" % entry)

            packet.data = entry

        return packet
Loading