# This should be set appropriately for the system
# eg. cuda118 for a machine with a GPU and CUDA 11.8+
IO.puts(System.get_env("XLA_TARGET"))
Mix.install(
  [
    {:bumblebee, github: "elixir-nx/bumblebee", branch: "main", override: true},
    {:exla, ">= 0.0.0"},
    {:kino_bumblebee, "~> 0.1.0"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)
Nx.global_default_backend(EXLA.Backend)
Nx.Defn.global_default_options(compiler: EXLA)
To start, we're going to try to get Stable Diffusion running using Bumblebee and Nx.Serving. This should (!) be pretty straightforward; we'll use the example notebook here.
repository_id = "CompVis/stable-diffusion-v1-4"
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/clip-vit-large-patch14"})
{:ok, clip} = Bumblebee.load_model({:hf, repository_id, subdir: "text_encoder"})
{:ok, unet} =
  Bumblebee.load_model({:hf, repository_id, subdir: "unet"},
    params_filename: "diffusion_pytorch_model.bin"
  )
{:ok, vae} =
  Bumblebee.load_model({:hf, repository_id, subdir: "vae"},
    architecture: :decoder,
    params_filename: "diffusion_pytorch_model.bin"
  )
{:ok, scheduler} = Bumblebee.load_scheduler({:hf, repository_id, subdir: "scheduler"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, repository_id, subdir: "feature_extractor"})
{:ok, safety_checker} = Bumblebee.load_model({:hf, repository_id, subdir: "safety_checker"})
:ok
%Bumblebee.Diffusion.DdimScheduler{}
serving =
  Bumblebee.Diffusion.StableDiffusion.text_to_image(clip, unet, vae, tokenizer, scheduler,
    num_steps: 20,
    num_images_per_prompt: 2,
    safety_checker: safety_checker,
    safety_checker_featurizer: featurizer,
    compile: [batch_size: 1, sequence_length: 60],
    defn_options: [compiler: EXLA],
    seed: 0
  )
text_input =
  Kino.Input.text("Prompt", default: "numbat, forest, high quality, detailed, digital art")
prompt = Kino.Input.read(text_input)
output = Nx.Serving.run(serving, prompt)
for result <- output.results do
  Kino.Image.new(result.image)
end
|> Kino.Layout.grid(columns: 2)
That worked! If you got a cuDNN error in the last step (as I did), check your XLA_TARGET and make sure you haven't picked one that requires a newer cuDNN version than you have installed. If so, downgrade your XLA_TARGET or upgrade cuDNN.
Now that we know the basics are working (which is a huge step, since it means CUDA, XLA and Nx are all set up correctly), we're going to go a little deeper. Right now, a lot of the details of Stable Diffusion are hidden behind the Bumblebee.Diffusion.StableDiffusion.text_to_image function. We're going to break this function down into its parts in this notebook so we can start modifying the pieces in the next step.
For this breakdown, we're going to ignore some of the unnecessary/less interesting bits, like the safety checker and generating multiple images per prompt.
This breakdown is based on the code underlying Bumblebee.Diffusion.StableDiffusion.text_to_image and this notebook from fast.ai.
Let's start by processing the prompt. In this case, processing means turning the natural language text into text embeddings.
The way SD works is that we create outputs for two prompts: the conditional prompt, which is the prompt we give it, and the unconditional prompt, which is simply the empty string. The final result is a weighted combination of these two outputs (this is known as classifier-free guidance).
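To make that combination concrete, here's a toy scalar version of the guidance formula we'll implement later as SDHelper.apply_guidance/3. The toy_* names and numbers below are invented purely for illustration:

toy_guidance_scale = 7.5
toy_uncond = 0.10
toy_cond = 0.25

# Start from the unconditional prediction and push it toward the
# conditional prediction, scaled by the guidance strength.
toy_uncond + toy_guidance_scale * (toy_cond - toy_uncond)
# => roughly 1.225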
prompt = Kino.Input.read(text_input)
seq_length = 60
batch_size = 2
num_steps = 20
guidance_scale = 7.5
tokenizer_options = [
  length: seq_length,
  return_token_type_ids: false,
  return_attention_mask: false
]
cond_tokens = Bumblebee.Text.ClipTokenizer.apply(tokenizer, prompt, tokenizer_options)
uncond_tokens = Bumblebee.Text.ClipTokenizer.apply(tokenizer, "", tokenizer_options)
# Since cond_tokens and uncond_tokens are maps, this concats the corresponding keys correctly
tokens = Bumblebee.Utils.Nx.composite_concatenate(uncond_tokens, cond_tokens)
%{hidden_state: text_embeddings} = Axon.predict(clip.model, clip.params, tokens)
# Shape = {2, 60, 768}
text_embeddings
We have the text embeddings for the conditional and unconditional prompt, but we need to replicate these to match our batch size. We're going to put our batch size in the 2nd dimension so later we can easily split the output into the conditional and unconditional parts.
text_embeddings =
  text_embeddings
  |> Nx.new_axis(1)
  |> Nx.tile([1, batch_size, 1, 1])
  |> Nx.reshape({:auto, seq_length, 768})
The final shape of text_embeddings is {batch_size * 2, 60, 768}, with the first batch_size entries being the embeddings for the empty prompt (unconditional) and the second batch_size entries being the embeddings for our target prompt (conditional).
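To make the axis bookkeeping easier to follow, here's the same new_axis/tile/reshape sequence on a tiny made-up tensor (toy_embeddings stands in for the real embeddings, with 2 prompts, sequence length 3 and hidden size 4):

toy_embeddings = Nx.iota({2, 3, 4}, type: :f32)

toy_embeddings                 # {2, 3, 4}
|> Nx.new_axis(1)              # {2, 1, 3, 4}
|> Nx.tile([1, 2, 1, 1])       # {2, 2, 3, 4}: each prompt repeated twice (our toy batch_size)
|> Nx.reshape({:auto, 3, 4})   # {4, 3, 4}
|> Nx.shape()
# => {4, 3, 4}: rows 0-1 are copies of the first (unconditional) row,
# rows 2-3 are copies of the second (conditional) row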
Next, we'll create our starting random latent vectors, one for each generation we're going to do. For this model, we can look at the spec to determine these are 64x64x4 tensors. So our final output is going to have shape {batch_size, 64, 64, 4}, one latent for each image we're going to generate.
latents_shape = {batch_size, unet.spec.sample_size, unet.spec.sample_size, unet.spec.in_channels}
key = Nx.Random.key(0)
{latents, _new_key} = Nx.Random.normal(key, shape: latents_shape)
latents
Now that we have all the pieces, we can do a single step of our eventual loop. We'll initialize the scheduler and then predict the noise in our current (totally noisy) latents. We can even try to visualize our outputs, though don't expect much... yet.
defmodule SDHelper do
  import Nx.Defn

  # We need this in a module so we can use `Nx.Defn`, which makes the
  # arithmetic operators below work on tensors
  defn apply_guidance(noise_pred_uncond, noise_pred_cond, guidance_scale) do
    noise_pred_uncond + guidance_scale * (noise_pred_cond - noise_pred_uncond)
  end

  # TODO: remove this once https://github.com/elixir-nx/bumblebee/issues/123 lands in main
  defn scheduler_step(step_fn, scheduler_state, latents, noise_pred) do
    step_fn.(scheduler_state, latents, noise_pred)
  end
end
{scheduler_state, timesteps} = Bumblebee.scheduler_init(scheduler, num_steps, latents_shape)
unet_inputs = %{
  # One batch of latents for the unconditional prompt and one for the conditional prompt
  "sample" => Nx.concatenate([latents, latents]),
  "timestep" => timesteps[0],
  "encoder_hidden_state" => text_embeddings
}
%{sample: noise_pred} = Axon.predict(unet.model, unet.params, unet_inputs)
noise_pred_uncond = noise_pred[0..(batch_size - 1)]
noise_pred_cond = noise_pred[batch_size..-1//1]
noise_pred = SDHelper.apply_guidance(noise_pred_uncond, noise_pred_cond, guidance_scale)
scheduler_step_fn = &Bumblebee.scheduler_step(scheduler, &1, &2, &3)
{_state, new_latents} =
  SDHelper.scheduler_step(scheduler_step_fn, scheduler_state, latents, noise_pred)
Now that we have our new latents after one step of the diffusion process, we can run them through the VAE to see what the image looks like. But we're not expecting much since it's just a single step.
# Scaling before we pass it to the VAE
new_latents = Nx.multiply(new_latents, 1 / 0.18215)
%{sample: image} = Axon.predict(vae.model, vae.params, new_latents)
images = NxImage.from_continuous(image, -1, 1)
Kino.Layout.grid(
  [
    Kino.Image.new(images[0]),
    Kino.Image.new(images[1])
  ],
  boxed: true,
  columns: 2
)
It's a noisy mess, but that's what we expected. Let's put this into a loop and run all 20 steps so we can get a real image. We'll use a while loop in defn for a performance gain. To do that, we'll have to use the pre-built versions of our models to avoid issues passing them into the defn.
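As a quick refresher on how while works inside defn, the loop threads an accumulator tuple through each step of a generator, much like our diffusion loop will thread the scheduler state and latents. The WhileDemo module and sum_elements function below are made-up names for illustration only and aren't part of the pipeline:

defmodule WhileDemo do
  import Nx.Defn

  # Sum the elements of a 1-D tensor, threading the running total
  # (and a step counter) through the loop as the accumulator.
  defn sum_elements(xs) do
    {total, _count} =
      while {total = 0.0, count = 0}, x <- xs do
        {total + x, count + 1}
      end

    total
  end
end

WhileDemo.sum_elements(Nx.tensor([1.0, 2.0, 3.0]))
# => an f32 scalar tensor equal to 6.0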
{_, unet_predict} = Axon.build(unet.model, compiler: EXLA)
{_, vae_predict} = Axon.build(vae.model, compiler: EXLA)
defmodule SDLoop do
  import Nx.Defn

  defn run(
         guidance_scale,
         latents,
         timesteps,
         text_embeddings,
         unet_predict,
         unet_params,
         scheduler_step_fn,
         scheduler_state
       ) do
    {scheduler_state, latents, _, _, _} =
      while {scheduler_state, latents, unet_params, text_embeddings, guidance_scale},
            timestep <- timesteps do
        unet_inputs = %{
          "sample" => Nx.concatenate([latents, latents]),
          "timestep" => timestep,
          "encoder_hidden_state" => text_embeddings
        }

        %{sample: noise_pred} = unet_predict.(unet_params, unet_inputs)

        batch_size = div(Nx.axis_size(noise_pred, 0), 2)
        noise_pred_uncond = noise_pred[0..(batch_size - 1)]
        noise_pred_cond = noise_pred[batch_size..-1//1]
        noise_pred = SDHelper.apply_guidance(noise_pred_uncond, noise_pred_cond, guidance_scale)

        {scheduler_state, latents} =
          SDHelper.scheduler_step(scheduler_step_fn, scheduler_state, latents, noise_pred)

        {scheduler_state, latents, unet_params, text_embeddings, guidance_scale}
      end

    {scheduler_state, latents}
  end
end
{_final_scheduler_state, final_latents} =
  SDLoop.run(
    guidance_scale,
    latents,
    timesteps,
    text_embeddings,
    unet_predict,
    unet.params,
    scheduler_step_fn,
    scheduler_state
  )
Now final_latents represents the latents for our image after many steps of the diffusion denoising process. Let's run them through the VAE to see what they look like.
# Scaling before we pass it to the VAE
final_latents = Nx.multiply(final_latents, 1 / 0.18215)
%{sample: image} = Axon.predict(vae.model, vae.params, final_latents)
images = NxImage.from_continuous(image, -1, 1)
Kino.Layout.grid(
  [
    Kino.Image.new(images[0]),
    Kino.Image.new(images[1])
  ],
  boxed: true,
  columns: 2
)
Success! It matches our original generation using Bumblebee and Nx.Serving because we used the same seed of 0.
The advantage of breaking the model down like this is that we can now use the additional control to add features. Let's do that now.
Waiting for the image to generate with no feedback takes too much patience. Let's make it so we can see the intermediate results as they're generated, by running the diffusion loop in chunks of timesteps and rendering the latents after each chunk.
frame = Kino.Frame.new() |> Kino.render()
defmodule SDRenderer do
  def render_latents(latents, vae) do
    latents = Nx.multiply(latents, 1 / 0.18215)
    %{sample: image} = Axon.predict(vae.model, vae.params, latents)
    images = NxImage.from_continuous(image, -1, 1)
    Enum.map(0..(Nx.axis_size(images, 0) - 1), &Kino.Image.new(images[&1]))
  end

  def render_latents(latents, vae, frame) do
    kino_images = render_latents(latents, vae)
    image_grid = Kino.Layout.grid(kino_images, boxed: true, columns: 2)
    Kino.Frame.render(frame, image_grid)
  end
end
chunked_timesteps =
  timesteps
  |> Nx.to_flat_list()
  |> Enum.chunk_every(4)
  |> Enum.map(&Nx.tensor/1)

{_, final_latents} =
  Enum.reduce(chunked_timesteps, {scheduler_state, latents}, fn timesteps,
                                                                {scheduler_state, latents} ->
    {scheduler_state, latents} =
      SDLoop.run(
        guidance_scale,
        latents,
        timesteps,
        text_embeddings,
        unet_predict,
        unet.params,
        scheduler_step_fn,
        scheduler_state
      )

    SDRenderer.render_latents(latents, vae, frame)
    {scheduler_state, latents}
  end)
With controllable SD inference, we can try something new - image2image! First, we'll need both the VAE decoder and encoder. We already have the decoder from earlier, so let's load the encoder.
vae_decoder = vae
{:ok, vae_encoder} =
  Bumblebee.load_model({:hf, repository_id, subdir: "vae"},
    architecture: :encoder,
    params_filename: "diffusion_pytorch_model.bin"
  )
:ok
Next, we can use Kino to create a control for uploading an image, which we'll center crop and then preprocess into the right Nx format.
image = Kino.Input.image("Source image", size: {512, 512}, fit: :crop)
We need to extract the binary from the uploaded image, convert it to an Nx tensor in HWC format, rescale it from the 0 to 255 range to the -1 to 1 range, and add a batch dimension.
%{data: content, format: _, height: height, width: width} = Kino.Input.read(image)
source_image =
  Nx.from_binary(content, :u8)
  |> Nx.reshape({height, width, 3})

image_tensor =
  source_image
  |> NxImage.to_continuous(-1, 1)
  |> Nx.new_axis(0)
The image tensor is ready to pass to our VAE. A VAE doesn't directly output the latents - instead it outputs the distribution from which we sample the latents, so we need a sampling function.
sample = fn posterior ->
  z = Nx.random_normal(Nx.shape(posterior.mean))
  Nx.add(posterior.mean, Nx.multiply(posterior.std, z))
end
%{latent_dist: posterior} = Axon.predict(vae_encoder.model, vae_encoder.params, image_tensor)
# Scale the latents
latent = Nx.multiply(sample.(posterior), 0.18215)
We can make sure that we did everything right by running the decoder on our latent. We should get back our original image.
frame = Kino.Frame.new() |> Kino.render()
SDRenderer.render_latents(latent, vae_decoder, frame)
Looks right! The way image2image works is, instead of starting with a random latent like we did earlier, we're going to start with a noisy version of the latent for our source image (which we just calculated). So first we need to add the right amount of noise to that latent, where the right amount is determined by the scheduler. Then we run the diffusion process and get our new image.
To add the "right amount of noise", we'll have to use the scheduler to compute the noise. The function below is a port from the python diffusers library. We're going to use the DdimScheduler
because it's simpler to understand.
num_steps = 40
scheduler = %Bumblebee.Diffusion.DdimScheduler{
  beta_start: 0.00085,
  beta_end: 0.012,
  clip_denoised_sample: false,
  alpha_clip_strategy: :alpha_zero
}
latents_shape = {1, unet.spec.sample_size, unet.spec.sample_size, unet.spec.in_channels}
{scheduler_state, timesteps} = Bumblebee.scheduler_init(scheduler, num_steps, latents_shape)
defmodule Noiser do
  import Nx.Defn

  defn add_noise(scheduler_state, original_samples, noise, timesteps) do
    alpha_bars = scheduler_state.alpha_bars

    sqrt_alpha_bars =
      (alpha_bars[timesteps] ** 0.5)
      |> Nx.flatten()
      |> expand_dims(Nx.rank(original_samples))

    sqrt_one_minus_alpha_bars =
      ((1 - alpha_bars[timesteps]) ** 0.5)
      |> Nx.flatten()
      |> expand_dims(Nx.rank(original_samples))

    sqrt_alpha_bars * original_samples + sqrt_one_minus_alpha_bars * noise
  end

  # Adds dimensions at the end until the tensor rank matches `rank`
  defn expand_dims(tensor, rank) do
    if Nx.rank(tensor) < rank do
      expand_dims(Nx.new_axis(tensor, -1), rank)
    else
      tensor
    end
  end
end
sampling_step = 15
key = Nx.Random.key(0)
{noise, _new_key} = Nx.Random.normal(key, shape: Nx.shape(latent))
noisy_latent =
  Noiser.add_noise(scheduler_state, latent, noise, Nx.new_axis(timesteps[sampling_step], 0))
# Note how we have to update the scheduler_state to reflect we've "done" some iterations
scheduler_state = %{scheduler_state | iteration: sampling_step}
SDRenderer.render_latents(noisy_latent, vae_decoder, Kino.render(Kino.Frame.new()))
A noisy image! We set the sampling_step to 15, so it's as if we had already run the diffusion process for 15 steps and the noisy_latent was the result. Now we can run the remaining steps, starting from step 15, to finish the diffusion process and get our new image.
We'll also need new text embeddings to guide the process.
im2im_text_input =
  Kino.Input.text("Prompt", default: "numbat, forest, high quality, detailed, digital art")
defmodule SDEmbeddings do
  def get_embeddings(text_input, clip, tokenizer, seq_length, batch_size) do
    prompt = Kino.Input.read(text_input)

    tokenizer_options = [
      length: seq_length,
      return_token_type_ids: false,
      return_attention_mask: false
    ]

    cond_tokens = Bumblebee.Text.ClipTokenizer.apply(tokenizer, prompt, tokenizer_options)
    uncond_tokens = Bumblebee.Text.ClipTokenizer.apply(tokenizer, "", tokenizer_options)
    tokens = Bumblebee.Utils.Nx.composite_concatenate(uncond_tokens, cond_tokens)

    %{hidden_state: text_embeddings} = Axon.predict(clip.model, clip.params, tokens)

    text_embeddings
    |> Nx.new_axis(1)
    |> Nx.tile([1, batch_size, 1, 1])
    |> Nx.reshape({:auto, seq_length, 768})
  end
end
im2im_embeddings = SDEmbeddings.get_embeddings(im2im_text_input, clip, tokenizer, 60, 1)
scheduler_step_fn = &Bumblebee.scheduler_step(scheduler, &1, &2, &3)
frame = Kino.Frame.new() |> Kino.render()
render = &Kino.Frame.render(frame, Kino.Layout.grid(&1, boxed: true, columns: 2))
source_image_kino = Kino.Image.new(source_image)
render.([source_image_kino | SDRenderer.render_latents(noisy_latent, vae_decoder)])
chunked_timesteps =
  timesteps[sampling_step..-1//1]
  |> Nx.to_flat_list()
  |> Enum.chunk_every(4)
  |> Enum.map(&Nx.tensor/1)
{_, final_latents} =
  Enum.reduce(chunked_timesteps, {scheduler_state, noisy_latent}, fn timesteps,
                                                                     {scheduler_state, latents} ->
    {scheduler_state, latents} =
      SDLoop.run(
        # guidance_scale of 15 for this image2image run
        15,
        latents,
        timesteps,
        im2im_embeddings,
        unet_predict,
        unet.params,
        scheduler_step_fn,
        scheduler_state
      )

    kino_images = SDRenderer.render_latents(latents, vae)

    [source_image_kino | kino_images]
    |> render.()

    {scheduler_state, latents}
  end)
:ok