Skip to content

Commit

Permalink
Merge pull request #3803 from nulib/4259-generate-export-manifest
Browse files Browse the repository at this point in the history
Write a CSV manifest of exported assets to the export destination
  • Loading branch information
mbklein authored Jan 30, 2024
2 parents f55b696 + cbabeaf commit d51978f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 31 deletions.
96 changes: 65 additions & 31 deletions app/lib/meadow/seed/export.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,37 @@ defmodule Meadow.Seed.Export do
opts.prefix
)

unless opts.skip_assets do
ingest_sheet_assets(sheet_ids)
exported_assets =
ingest_sheet_assets(sheet_ids, opts.skip_assets)
|> export_assets(opts.bucket, opts.prefix, opts.threads)
end

Logger.info("Exporting #{length(opts.works)} works")

work_ids =
export_standalone_works(opts.works, opts.bucket, opts.prefix)

unless opts.skip_assets do
work_assets(work_ids)
|> export_assets(opts.bucket, opts.prefix, opts.threads)
end
exported_assets =
[
exported_assets
| work_assets(work_ids, opts.skip_assets)
|> export_assets(opts.bucket, opts.prefix, opts.threads)
]
|> List.flatten()

exported_csv =
[{"source_bucket", "source_key", "dest_bucket", "dest_key", "status"} | exported_assets]
|> Enum.map(&Tuple.to_list/1)
|> CSV.dump_to_iodata()
|> IO.iodata_to_binary()

ExAws.S3.put_object(
opts.bucket,
Path.join([opts.prefix, "exported_assets.csv"]),
exported_csv
)
|> ExAws.request!()

{:ok, %{sheet_ids: sheet_ids, work_ids: work_ids, exported_assets: exported_assets}}
end

def export_manifest(bucket, prefix) do
Expand Down Expand Up @@ -118,7 +135,9 @@ defmodule Meadow.Seed.Export do
ids
end

def ingest_sheet_assets(ingest_sheet_ids) do
def ingest_sheet_assets(_, true), do: []

def ingest_sheet_assets(ingest_sheet_ids, _) do
from(w in Work,
join: fs in FileSet,
on: fs.work_id == w.id,
Expand All @@ -134,7 +153,9 @@ defmodule Meadow.Seed.Export do
end)
end

def work_assets(work_ids) do
def work_assets(_, true), do: []

def work_assets(work_ids, _) do
from(fs in FileSet,
where: fs.work_id in ^work_ids,
select: %{
Expand All @@ -153,16 +174,17 @@ defmodule Meadow.Seed.Export do

def export_assets(assets, bucket, prefix, 1) do
assets
|> Enum.each(&upload_asset_sync(&1, bucket, prefix))
|> Enum.map(&upload_asset_sync(&1, bucket, prefix))
end

def export_assets(assets, bucket, prefix, threads) do
assets
|> Enum.chunk_every(threads)
|> Enum.each(fn chunk ->
|> Enum.map(fn chunk ->
Enum.map(chunk, &upload_asset_async(&1, bucket, prefix))
|> Task.await_many(30_000)
end)
|> List.flatten()
end

defp upload_asset_async(asset, bucket, prefix) do
Expand All @@ -171,32 +193,44 @@ defmodule Meadow.Seed.Export do

defp upload_asset_sync(asset, bucket, prefix) do
with %{preservation_file: preservation_file, pyramid_file: pyramid_file} <- asset do
copy_asset(preservation_file, bucket, Path.join([prefix, "preservation"]))
copy_asset(pyramid_file, bucket, Path.join([prefix, "pyramid"]))
[
copy_asset(preservation_file, bucket, Path.join([prefix, "preservation"])),
copy_asset(pyramid_file, bucket, Path.join([prefix, "pyramid"]))
]
end
end

defp copy_asset(source, bucket, prefix) do
Logger.info("Copying #{source} to s3://#{bucket}/#{prefix}/")

with %URI{host: source_bucket, path: "/" <> source_key} <- URI.parse(source) do
case ExAws.S3.put_object_copy(
bucket,
Path.join([prefix, source_key]),
source_bucket,
source_key,
metadata_directive: :COPY
)
|> ExAws.request() do
{:error, {:http_error, status, _}} ->
Logger.warning("Failed to copy: HTTP error #{status}")

{:error, message} ->
Logger.warning("Failed to copy because: #{message}")

other ->
other
end
with %URI{host: source_bucket, path: "/" <> source_key} <- URI.parse(source),
key <- Path.join([prefix, source_key]) do
status =
case ExAws.S3.put_object_copy(
bucket,
key,
source_bucket,
source_key,
metadata_directive: :COPY
)
|> ExAws.request() do
{:error, {:http_error, status, _}} ->
Logger.warning("Failed to copy: HTTP error #{status}")
status

{:error, message} ->
Logger.warning("Failed to copy because: #{message}")
500

{:ok, %{status_code: status}} ->
status

other ->
Logger.warning("Unexpected response: #{inspect(other)}")
500
end

{source_bucket, source_key, bucket, key, status}
end
end

Expand Down
1 change: 1 addition & 0 deletions livebook/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM ghcr.io/livebook-dev/livebook
ENV LIVEBOOK_AWS_CREDENTIALS=true
ENV LIVEBOOK_IDENTITY_PROVIDER=custom:MeadowLivebookAuth
ENV LIVEBOOK_DISTRIBUTION=sname
ENV LIVEBOOK_IP=0.0.0.0
Expand Down

0 comments on commit d51978f

Please sign in to comment.