Bulk import JSON FHIR resources #187

Merged (11 commits, May 21, 2024)
11 changes: 11 additions & 0 deletions importer/README.md
@@ -147,3 +147,14 @@ The coverage report `coverage.html` will be at the working directory
- See example csv [here](/importer/csv/import/inventory.csv)
- This creates a Group resource for each inventory imported
- The first two columns __name__ and __active__ are the minimum required

### 12. Import JSON resources from file
- Run `python3 main.py --bulk_import True --json_file tests/fhir_sample.json --chunk_size 500000 --sync sort --resources_count 100 --log_level info`
- This reads a file containing a JSON array of resources and posts them to the FHIR server
- `bulk_import` (Required) must be set to True
- `json_file` (Required) points to the file containing the JSON array. The resources in the array must be separated by a single comma (no spaces), and **"id"** must always be the first attribute in each resource object; this is what the code uses to identify the beginning and end of each resource (see the hypothetical sample after this list)
- `chunk_size` (Not required) is the number of characters to read from the JSON file at a time. The file can be very large, so it is read in chunks rather than all at once. This number **MUST** be at least the size of the largest single resource in the array. The default is 1,000,000
- `sync` (Not required) defines the sync strategy. This can be either **direct** (which is the default) or **sort**
    - **Direct** reads the resources one chunk at a time, building a payload and posting it to the server before reading the next chunk. This works if referential integrity is turned off on the FHIR server
    - **Sort** reads all the resources in the file first and sorts them into different resource types. It then builds separate payloads for each resource type and posts them to the FHIR server in the order in which each resource type first appears in the JSON file. For example, if you want Patients to be synced first, make sure the first resource in the file is a Patient resource
- `resources_count` (Not required) is the number of resources to put in each bundle when posting to the FHIR server. The default is 100
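
For illustration, a minimal, hypothetical input file could look like the sample below (the IDs and resources are made up; the real `tests/fhir_sample.json` may differ): a single JSON array, exactly one comma between resources, and `"id"` as the first attribute of every object. With `--sync sort`, the type of the first resource (here, Patient) would be posted first.

```json
[{"id": "d290f1ee-6c54-4b01-90e6-d701748f0851", "resourceType": "Patient", "active": true},{"id": "8c3f2a1b-4f21-4d6e-9af5-1b2c3d4e5f60", "resourceType": "Practitioner", "active": true}]
```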
166 changes: 161 additions & 5 deletions importer/main.py
@@ -82,7 +82,7 @@ def get_access_token():
# This function makes the request to the provided url
# to create resources
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=180)
def post_request(request_type, payload, url):
def post_request(request_type, payload, url, json_payload):
logging.info("Posting request")
logging.info("Request type: " + request_type)
logging.info("Url: " + url)
@@ -92,9 +92,9 @@ def post_request(request_type, payload, url):
headers = {"Content-type": "application/json", "Authorization": access_token}

if request_type == "POST":
return requests.post(url, data=payload, headers=headers)
return requests.post(url, data=payload, json=json_payload, headers=headers)
elif request_type == "PUT":
return requests.put(url, data=payload, headers=headers)
return requests.put(url, data=payload, json=json_payload, headers=headers)
elif request_type == "GET":
return requests.get(url, headers=headers)
elif request_type == "DELETE":
@@ -103,9 +103,9 @@ def post_request(request_type, payload, url):
logging.error("Unsupported request type!")


def handle_request(request_type, payload, url):
def handle_request(request_type, payload, url, json_payload=None):
try:
response = post_request(request_type, payload, url)
response = post_request(request_type, payload, url, json_payload)
if response.status_code == 200 or response.status_code == 201:
logging.info("[" + str(response.status_code) + "]" + ": SUCCESS!")

@@ -1494,6 +1494,136 @@ def save_image(image_source_url):
return 0


def process_chunk(resources_array: list, resource_type: str):
new_arr = []
with click.progressbar(
resources_array, label="Progress::Processing chunks ... "
) as resources_array_progress:
for resource in resources_array_progress:
if not resource_type:
resource_type = resource["resourceType"]
try:
resource_id = resource["id"]
except KeyError:
if "identifier" in resource:
resource_identifier = resource["identifier"][0]["value"]
resource_id = str(
uuid.uuid5(uuid.NAMESPACE_DNS, resource_identifier)
)
else:
resource_id = str(uuid.uuid4())

item = {"resource": resource, "request": {}}
item["request"]["method"] = "PUT"
item["request"]["url"] = "/".join([resource_type, resource_id])
new_arr.append(item)

json_payload = {"resourceType": "Bundle", "type": "transaction", "entry": new_arr}

r = handle_request("POST", "", config.fhir_base_url, json_payload)
logging.info(r.text)
# TODO handle failures
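    # Illustrative note (added for clarity, values are hypothetical): the
    # json_payload assembled above is a FHIR transaction Bundle, roughly:
    # {
    #     "resourceType": "Bundle",
    #     "type": "transaction",
    #     "entry": [
    #         {
    #             "resource": {"id": "<uuid>", "resourceType": "Patient", ...},
    #             "request": {"method": "PUT", "url": "Patient/<uuid>"},
    #         },
    #     ],
    # }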


def set_resource_list(
objs: str = None,
json_list: list = None,
resource_type: str = None,
number_of_resources: int = 100,
):
if objs:
resources_array = json.loads(objs)
process_chunk(resources_array, resource_type)
if json_list:
if len(json_list) > number_of_resources:
for i in range(0, len(json_list), number_of_resources):
sub_list = json_list[i : i + number_of_resources]
process_chunk(sub_list, resource_type)
else:
process_chunk(json_list, resource_type)


def build_mapped_payloads(resource_mapping, json_file, resources_count):
with open(json_file, "r") as file:
data_dict = json.load(file)
with click.progressbar(
resource_mapping, label="Progress::Setting up ... "
) as resource_mapping_progress:
for resource_type in resource_mapping_progress:
index_positions = resource_mapping[resource_type]
resource_list = [data_dict[i] for i in index_positions]
set_resource_list(None, resource_list, resource_type, resources_count)


def build_resource_type_map(resources: str, mapping: dict, index_tracker: int = 0):
resource_list = json.loads(resources)
for index, resource in enumerate(resource_list):
resource_type = resource["resourceType"]
if resource_type in mapping.keys():
mapping[resource_type].append(index + index_tracker)
else:
mapping[resource_type] = [index + index_tracker]

global import_counter
import_counter = len(resource_list) + import_counter


def split_chunk(
chunk: str,
left_over_chunk: str,
size: int,
mapping: dict = None,
sync: str = None,
import_counter: int = 0,
):
if len(chunk) + len(left_over_chunk) < int(size):
# load can fit in one chunk, so remove closing bracket
last_bracket = chunk.rfind("}")
current_chunk = chunk[: int(last_bracket)]
next_left_over_chunk = "-"
if len(chunk.strip()) == 0:
last_bracket = left_over_chunk.rfind("}")
left_over_chunk = left_over_chunk[: int(last_bracket)]
else:
# load can't fit, so split on last full resource
split_index = chunk.rfind(
'},{"id"'
) # Assumption that this string will find the last full resource
current_chunk = chunk[:split_index]
next_left_over_chunk = chunk[int(split_index) + 2 :]
if len(chunk.strip()) == 0:
last_bracket = left_over_chunk.rfind("}")
left_over_chunk = left_over_chunk[: int(last_bracket)]

if len(left_over_chunk.strip()) == 0:
current_chunk = current_chunk[1:]

chunk_list = "[" + left_over_chunk + current_chunk + "}]"

if sync.lower() == "direct":
set_resource_list(chunk_list)
if sync.lower() == "sort":
build_resource_type_map(chunk_list, mapping, import_counter)
return next_left_over_chunk


def read_file_in_chunks(json_file: str, chunk_size: int, sync: str):
logging.info("Reading file in chunks ...")
incomplete_load = ""
mapping = {}
global import_counter
import_counter = 0
with open(json_file, "r") as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
incomplete_load = split_chunk(
chunk, incomplete_load, chunk_size, mapping, sync, import_counter
)
return mapping
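

# Sketch of the chunking flow, as assumed from split_chunk() above: each call
# receives the newly read text plus the trailing, incomplete resource left over
# from the previous read. It cuts the text at the last complete resource
# boundary ('},{"id"'), prefixes "[" plus the left-over text and closes the
# kept part with "}]" so it parses as a JSON array, then returns the incomplete
# tail to be prepended on the next read.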


class ResponseFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
@@ -1523,6 +1653,7 @@ def filter(self, record):

@click.command()
@click.option("--csv_file", required=False)
@click.option("--json_file", required=False)
@click.option("--access_token", required=False)
@click.option("--resource_type", required=False)
@click.option("--assign", required=False)
@@ -1538,8 +1669,18 @@ def filter(self, record):
@click.option("--parameter", required=False, default="_lastUpdated")
@click.option("--value", required=False, default="gt2023-01-01")
@click.option("--limit", required=False, default=1000)
@click.option("--bulk_import", required=False, default=False)
@click.option("--chunk_size", required=False, default=1000000)
@click.option("--resources_count", required=False, default=100)
@click.option(
"--sync",
type=click.Choice(["DIRECT", "SORT"], case_sensitive=False),
required=False,
default="DIRECT",
)
def main(
csv_file,
json_file,
access_token,
resource_type,
assign,
@@ -1553,6 +1694,10 @@ def main(
parameter,
value,
limit,
bulk_import,
chunk_size,
resources_count,
sync,
):
if log_level == "DEBUG":
logging.basicConfig(
@@ -1580,6 +1725,17 @@ def main(
export_resources_to_csv(resource_type, parameter, value, limit)
exit()

if bulk_import:
logging.info("Starting bulk import...")
resource_mapping = read_file_in_chunks(json_file, chunk_size, sync)
if sync.lower() == "sort":
build_mapped_payloads(resource_mapping, json_file, resources_count)
end_time = datetime.now()
logging.info("End time: " + end_time.strftime("%H:%M:%S"))
total_time = end_time - start_time
logging.info("Total time: " + str(total_time.total_seconds()) + " seconds")
exit()

# set access token
if access_token:
global global_access_token