ruff check --format plus some return types
metazool committed Oct 4, 2024
1 parent da4b4e2 commit 3667f4a
Showing 2 changed files with 49 additions and 88 deletions.
4 changes: 3 additions & 1 deletion src/cyto_ml/data/decollage.py
@@ -118,7 +118,9 @@ def output_dir(self) -> None:
os.mkdir(self.output_directory)

def do_decollage(self) -> None:
"""Not very lovely single function that replaces the work of the script."""
"""Not very lovely single function that replaces the work of the script.
See cyto_ml.pipeline.pipeline_decollage - has the same code in it
"""
# Reasonably assume that all images in a session have same spatio-temporal metadata
# extract the coords, date, possibly depth from directory name
collage_headers = headers_from_filename(self.directory)
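The comment above notes that coordinates, the date, and possibly depth are parsed out of the session directory name. A minimal sketch of that call, assuming a hypothetical directory named like the committer's FlowCam sessions; the expected output follows the regex visible in the deleted copy of headers_from_filename further down:

from cyto_ml.data.decollage import headers_from_filename

# Hypothetical session directory name: <label>_<lat>_<lon>_<YYYYMMDD>_<depth>
headers = headers_from_filename("19_10_Tank25_-2.97_54.09_20241004_10")
# {'GPSLatitude': '-2.97', 'GPSLongitude': '54.09',
#  'DateTimeOriginal': '20241004', 'GPSAltitude': '10'}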
133 changes: 46 additions & 87 deletions src/cyto_ml/pipeline/pipeline_decollage.py
@@ -1,85 +1,38 @@
import glob
import logging
import os
import re
from datetime import datetime
from typing import List

import boto3
import luigi
import pandas as pd
import requests
from dotenv import load_dotenv
from exiftool import ExifToolHelper
from exiftool.exceptions import ExifToolExecuteError
from skimage.io import imread, imsave

from cyto_ml.data.decollage import headers_from_filename, lst_metadata, window_slice, write_headers
from cyto_ml.data.s3 import boto3_client

# Set up logging
logging.basicConfig(level=logging.INFO)


# Load AWS credentials and S3 bucket name from .env file
# Rather than depend on the presence of credentials.json in the package
load_dotenv()

AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
AWS_URL_ENDPOINT = os.environ.get("AWS_URL_ENDPOINT", "")

# S3 Client
s3 = boto3.client("s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
endpoint_url=AWS_URL_ENDPOINT)


# Utility functions (kept as is from your original code)
def lst_metadata(filename: str) -> pd.DataFrame:
heads = pd.read_csv(filename, sep="|", nrows=53, skiprows=1)
colNames = list(heads["num-fields"])
meta = pd.read_csv(filename, sep="|", skiprows=55, header=None)
meta.columns = colNames
return meta


def window_slice(image, x, y, height, width):
return image[y:y + height, x:x + width]


def headers_from_filename(filename: str) -> dict:
headers = {}
pattern = r"_(-?\d+\.\d+)_(-?\d+\.\d+)_(\d{8})(?:_(\d+))?"
match = re.search(pattern, filename)
if match:
lat, lon, date, depth = match.groups()
headers["GPSLatitude"] = lat
headers["GPSLongitude"] = lon
headers["DateTimeOriginal"] = date
headers["GPSAltitude"] = depth
return headers


def write_headers(filename: str, headers: dict) -> bool:
result = None
try:
with ExifToolHelper() as et:
et.set_tags([filename], tags=headers, params=["-P", "-overwrite_original"])
result = True
except ExifToolExecuteError as err:
logging.warning(err)
result = False
return result
s3 = boto3_client()


class ReadMetadata(luigi.Task):
"""
Task to read metadata from the .lst file.
"""

directory = luigi.Parameter()

def output(self):
return luigi.LocalTarget(f'{self.directory}/metadata.csv')
def output(self) -> luigi.Target:
return luigi.LocalTarget(f"{self.directory}/metadata.csv")

def run(self):
def run(self) -> None:
files = glob.glob(f"{self.directory}/*.lst")
if len(files) == 0:
raise FileNotFoundError("No .lst file found in this directory.")
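The inline boto3 client setup deleted above is now imported from cyto_ml.data.s3. A sketch of what boto3_client presumably wraps, based only on the deleted lines; the actual module may differ:

import os

import boto3
from dotenv import load_dotenv


def boto3_client():
    # Load AWS credentials and the endpoint URL from a .env file rather than
    # depending on a credentials.json shipped with the package. This mirrors
    # the deleted inline setup; it is an assumption, not the real
    # cyto_ml.data.s3 implementation.
    load_dotenv()
    return boto3.client(
        "s3",
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", ""),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        endpoint_url=os.environ.get("AWS_URL_ENDPOINT", ""),
    )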
@@ -93,12 +46,13 @@ class CreateOutputDirectory(luigi.Task):
"""
Task to create the output directory if it does not exist.
"""

output_directory = luigi.Parameter()

def output(self):
def output(self) -> luigi.Target:
return luigi.LocalTarget(self.output_directory)

def run(self):
def run(self) -> None:
if not os.path.exists(self.output_directory):
os.mkdir(self.output_directory)
logging.info(f"Output directory created: {self.output_directory}")
@@ -110,18 +64,19 @@ class DecollageImages(luigi.Task):
"""
Task that processes the large TIFF image, extracts vignettes, and saves them with EXIF metadata.
"""

directory = luigi.Parameter()
output_directory = luigi.Parameter()
experiment_name = luigi.Parameter()

def requires(self):
def requires(self) -> List[luigi.Task]:
return [ReadMetadata(self.directory), CreateOutputDirectory(self.output_directory)]

def output(self):
def output(self) -> luigi.Target:
date = datetime.today().date()
return luigi.LocalTarget(f'{self.directory}/decollage_complete_{date}.txt')
return luigi.LocalTarget(f"{self.directory}/decollage_complete_{date}.txt")

def run(self):
def run(self) -> None:
metadata = pd.read_csv(self.input()[0].path)
collage_headers = headers_from_filename(self.directory)

@@ -145,35 +100,34 @@ def run(self):
imsave(output_file, img_sub)
write_headers(output_file, headers)

with self.output().open('w') as f:
f.write('Decollage complete')
with self.output().open("w") as f:
f.write("Decollage complete")


class UploadDecollagedImagesToS3(luigi.Task):
"""
Task to upload decollaged images to an S3 bucket.
"""

directory = luigi.Parameter()
output_directory = luigi.Parameter()
s3_bucket = luigi.Parameter()

def requires(self):
def requires(self) -> List[luigi.Task]:
return DecollageImages(
directory=self.directory,
output_directory=self.output_directory,
experiment_name="test_experiment"
directory=self.directory, output_directory=self.output_directory, experiment_name="test_experiment"
)

def output(self):
def output(self) -> luigi.Target:
date = datetime.today().date()
return luigi.LocalTarget(f'{self.directory}/s3_upload_complete_{date}.txt')
return luigi.LocalTarget(f"{self.directory}/s3_upload_complete_{date}.txt")

def run(self):
def run(self) -> None:
# Collect the list of decollaged image files from the output of DecollageImages
image_files = glob.glob(f"{self.output_directory}/*.tif")

# Prepare the files for uploading
files = [("files", (open(image_file, 'rb'))) for image_file in image_files]
files = [("files", (open(image_file, "rb"))) for image_file in image_files]

# Prepare the payload for the API request
payload = {
@@ -192,7 +146,7 @@ def run(self):
# Check if the request was successful
if response.status_code == 200:
logging.info("Files successfully uploaded via API.")
with self.output().open('w') as f:
with self.output().open("w") as f:
f.write("API upload complete")
else:
logging.error(f"API upload failed with status code {response.status_code}")
@@ -203,34 +157,39 @@ def run(self):
logging.error(f"Failed to upload files to API: {e}")
raise e

with self.output().open('w') as f:
with self.output().open("w") as f:
f.write("S3 upload complete")


class FlowCamPipeline(luigi.WrapperTask):
"""
Main wrapper task to execute the entire pipeline.
"""

directory = luigi.Parameter()
output_directory = luigi.Parameter()
experiment_name = luigi.Parameter()
s3_bucket = luigi.Parameter()

def requires(self):
def requires(self) -> luigi.Task:
return UploadDecollagedImagesToS3(
directory=self.directory,
output_directory=self.output_directory,
s3_bucket=self.s3_bucket
directory=self.directory, output_directory=self.output_directory, s3_bucket=self.s3_bucket
)


# To run the pipeline
if __name__ == '__main__':
luigi.run([
"FlowCamPipeline",
# "--local-scheduler",
"--directory", "/home/albseg/scratch/plankton_pipeline_luigi/data/19_10_Tank25_blanksremoved",
"--output-directory", "/home/albseg/scratch/plankton_pipeline_luigi/data/images_decollage",
"--experiment-name", "test",
"--s3-bucket", "test-upload-alba"
])
if __name__ == "__main__":
luigi.run(
[
"FlowCamPipeline",
# "--local-scheduler",
"--directory",
"/home/albseg/scratch/plankton_pipeline_luigi/data/19_10_Tank25_blanksremoved",
"--output-directory",
"/home/albseg/scratch/plankton_pipeline_luigi/data/images_decollage",
"--experiment-name",
"test",
"--s3-bucket",
"test-upload-alba",
]
)
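For reference, the wrapper task can also be run programmatically with luigi.build rather than luigi.run; the paths and names below are placeholders, not values from this commit:

import luigi

from cyto_ml.pipeline.pipeline_decollage import FlowCamPipeline

# Hypothetical invocation; local_scheduler=True avoids needing a luigid server.
luigi.build(
    [
        FlowCamPipeline(
            directory="/path/to/flowcam/session",
            output_directory="/path/to/decollaged/images",
            experiment_name="my_experiment",
            s3_bucket="my-test-bucket",
        )
    ],
    local_scheduler=True,
)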
