Skip to content

Commit

Permalink
Improve size and memory checks on the local adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Jan 9, 2023
1 parent f7a91a8 commit 53e570d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 81 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,31 @@ on:

jobs:
nebulex_test:
name: 'Nebulex Test (Elixir ${{ matrix.elixir }} OTP ${{ matrix.otp }})'
runs-on: ubuntu-latest
name: >-
Nebulex Test (Elixir ${{ matrix.elixir }} / OTP ${{ matrix.otp }} /
OS ${{ matrix.os }})
runs-on: ${{ matrix.os }}

strategy:
matrix:
include:
- elixir: 1.14.x
otp: 25.x
os: 'ubuntu-latest'
style: true
coverage: true
sobelow: true
dialyzer: true
- elixir: 1.13.x
otp: 24.x
os: 'ubuntu-latest'
- elixir: 1.11.x
otp: 23.x
os: 'ubuntu-20.04'
inch-report: true
- elixir: 1.9.x
otp: 22.x
os: 'ubuntu-20.04'

env:
GITHUB_TOKEN: '${{ secrets.GITHUB_TOKEN }}'
Expand Down
159 changes: 96 additions & 63 deletions lib/nebulex/adapters/local/generation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,16 @@ defmodule Nebulex.Adapters.Local.Generation do

@impl true
def handle_call(:delete_all, _from, %__MODULE__{} = state) do
# Get current size
size =
state
|> Map.from_struct()
|> Local.execute(:count_all, nil, [])

# Create new generation
:ok = new_gen(state)

# Delete all objects
:ok =
state.meta_tab
|> list()
Expand All @@ -322,8 +325,13 @@ defmodule Nebulex.Adapters.Local.Generation do
end

def handle_call({:new_generation, reset_timer?}, _from, state) do
# Create new generation
:ok = new_gen(state)
{:reply, :ok, %{state | gc_heartbeat_ref: maybe_reset_timer(reset_timer?, state)}}

# Maybe reset heartbeat timer
heartbeat_ref = maybe_reset_timer(reset_timer?, state)

{:reply, :ok, %{state | gc_heartbeat_ref: heartbeat_ref}}
end

def handle_call(
Expand All @@ -348,86 +356,90 @@ defmodule Nebulex.Adapters.Local.Generation do
end

@impl true
def handle_info(:heartbeat, %__MODULE__{gc_interval: time, gc_heartbeat_ref: ref} = state) do
def handle_info(
:heartbeat,
%__MODULE__{
gc_interval: gc_interval,
gc_heartbeat_ref: heartbeat_ref
} = state
) do
# Create new generation
:ok = new_gen(state)
{:noreply, %{state | gc_heartbeat_ref: start_timer(time, ref)}}

# Reset heartbeat timer
heartbeat_ref = start_timer(gc_interval, heartbeat_ref)

{:noreply, %{state | gc_heartbeat_ref: heartbeat_ref}}
end

def handle_info(:cleanup, state) do
state =
state
|> check_size()
|> check_memory()
# Check size first, if the cleanup is done, skip checking the memory,
# otherwise, check the memory too.
{_, state} =
with {false, state} <- check_size(state) do
check_memory(state)
end

{:noreply, state}
end

defp check_size(
%__MODULE__{
meta_tab: meta_tab,
max_size: max_size,
backend: backend
} = state
)
when not is_nil(max_size) do
meta_tab
|> list()
|> Enum.reduce(0, &(backend.info(&1, :size) + &2))
|> maybe_cleanup(max_size, state)
defp check_size(%__MODULE__{max_size: max_size} = state) when not is_nil(max_size) do
maybe_cleanup(:size, state)
end

defp check_size(state), do: state
defp check_size(state) do
{false, state}
end

defp check_memory(
%__MODULE__{
meta_tab: meta_tab,
backend: backend,
allocated_memory: allocated
} = state
)
when not is_nil(allocated) do
backend
|> memory_info(meta_tab)
|> maybe_cleanup(allocated, state)
defp check_memory(%__MODULE__{allocated_memory: allocated} = state) when not is_nil(allocated) do
maybe_cleanup(:memory, state)
end

defp check_memory(state), do: state
defp check_memory(state) do
{false, state}
end

defp maybe_cleanup(
size,
max_size,
info,
%__MODULE__{
gc_cleanup_max_timeout: max_timeout,
gc_cleanup_ref: cleanup_ref,
gc_cleanup_min_timeout: min_timeout,
gc_cleanup_max_timeout: max_timeout,
gc_interval: gc_interval,
gc_heartbeat_ref: heartbeat_ref
} = state
)
when size >= max_size do
:ok = new_gen(state)
) do
case cleanup_info(info, state) do
{size, max_size} when size >= max_size ->
# Create a new generation
:ok = new_gen(state)

%{
state
| gc_cleanup_ref: start_timer(max_timeout, cleanup_ref, :cleanup),
gc_heartbeat_ref: start_timer(gc_interval, heartbeat_ref)
}
# Reset the heartbeat timer
heartbeat_ref = start_timer(gc_interval, heartbeat_ref)

# Reset the cleanup timer
cleanup_ref =
info
|> cleanup_info(state)
|> elem(0)
|> reset_cleanup_timer(max_size, min_timeout, max_timeout, cleanup_ref)

{true, %{state | gc_heartbeat_ref: heartbeat_ref, gc_cleanup_ref: cleanup_ref}}

{size, max_size} ->
# Reset the cleanup timer
cleanup_ref = reset_cleanup_timer(size, max_size, min_timeout, max_timeout, cleanup_ref)

{false, %{state | gc_cleanup_ref: cleanup_ref}}
end
end

defp maybe_cleanup(
size,
max_size,
%__MODULE__{
gc_cleanup_min_timeout: min_timeout,
gc_cleanup_max_timeout: max_timeout,
gc_cleanup_ref: cleanup_ref
} = state
) do
cleanup_ref =
size
|> linear_inverse_backoff(max_size, min_timeout, max_timeout)
|> start_timer(cleanup_ref, :cleanup)
defp cleanup_info(:size, %__MODULE__{backend: mod, meta_tab: tab, max_size: max}) do
{size_info(mod, tab), max}
end

%{state | gc_cleanup_ref: cleanup_ref}
defp cleanup_info(:memory, %__MODULE__{backend: mod, meta_tab: tab, allocated_memory: max}) do
{memory_info(mod, tab), max}
end

## Private Functions
Expand All @@ -447,22 +459,22 @@ defmodule Nebulex.Adapters.Local.Generation do
# Since the older generation is deleted, update evictions count
:ok = Stats.incr(stats_counter, :evictions, backend.info(older, :size))

# Update generations
:ok = Metadata.put(meta_tab, :generations, [gen_tab, newer])

# Process the older generation:
# - Delete previously stored deprecated generation
# - Flush the older generation
# - Deprecate it (mark it for deletion)
:ok = process_older_gen(meta_tab, backend, older)

# Update generations
Metadata.put(meta_tab, :generations, [gen_tab, newer])

[newer] ->
# Update generations
Metadata.put(meta_tab, :generations, [gen_tab, newer])
:ok = Metadata.put(meta_tab, :generations, [gen_tab, newer])

[] ->
# update generations
Metadata.put(meta_tab, :generations, [gen_tab])
# Update generations
:ok = Metadata.put(meta_tab, :generations, [gen_tab])
end
end

Expand All @@ -486,6 +498,7 @@ defmodule Nebulex.Adapters.Local.Generation do

defp start_timer(time, ref \\ nil, event \\ :heartbeat) do
_ = if ref, do: Process.cancel_timer(ref)

Process.send_after(self(), event, time)
end

Expand All @@ -501,6 +514,18 @@ defmodule Nebulex.Adapters.Local.Generation do
start_timer(state.gc_interval, state.gc_heartbeat_ref)
end

defp reset_cleanup_timer(size, max_size, min_timeout, max_timeout, cleanup_ref) do
size
|> linear_inverse_backoff(max_size, min_timeout, max_timeout)
|> start_timer(cleanup_ref, :cleanup)
end

defp size_info(backend, meta_tab) do
meta_tab
|> list()
|> Enum.reduce(0, &(backend.info(&1, :size) + &2))
end

defp memory_info(backend, meta_tab) do
meta_tab
|> list()
Expand All @@ -512,6 +537,14 @@ defmodule Nebulex.Adapters.Local.Generation do
end)
end

defp linear_inverse_backoff(size, _max_size, _min_timeout, max_timeout) when size <= 0 do
max_timeout
end

defp linear_inverse_backoff(size, max_size, min_timeout, _max_timeout) when size >= max_size do
min_timeout
end

defp linear_inverse_backoff(size, max_size, min_timeout, max_timeout) do
round((min_timeout - max_timeout) / max_size * size + max_timeout)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/nebulex/entry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Nebulex.Entry do
def encode(data, opts \\ []) do
data
|> :erlang.term_to_binary(opts)
|> Base.url_encode64()
|> Base.encode64()
end

@doc """
Expand All @@ -61,7 +61,7 @@ defmodule Nebulex.Entry do
@spec decode(binary, [term]) :: term
def decode(data, opts \\ []) when is_binary(data) do
data
|> Base.url_decode64!()
|> Base.decode64!()
|> :erlang.binary_to_term(opts)
end

Expand Down
42 changes: 33 additions & 9 deletions test/nebulex/adapters/local/generation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -246,39 +246,63 @@ defmodule Nebulex.Adapters.Local.GenerationTest do
LocalWithSizeLimit.start_link(
gc_interval: 3_600_000,
max_size: 3,
gc_cleanup_min_timeout: 10,
gc_cleanup_max_timeout: 1000
gc_cleanup_min_timeout: 1000,
gc_cleanup_max_timeout: 1500
)

# Initially there should be only 1 generation and no entries
assert generations_len(LocalWithSizeLimit) == 1
assert LocalWithSizeLimit.count_all() == 0

# Put some entries to exceed the max size
_ = cache_put(LocalWithSizeLimit, 1..4)

# Validate current size
assert LocalWithSizeLimit.count_all() == 4

:ok = Process.sleep(1100)
# Wait the max cleanup timeout
:ok = Process.sleep(1600)

# There should be 2 generation now
assert generations_len(LocalWithSizeLimit) == 2

# The entries should be now in the older generation
assert LocalWithSizeLimit.count_all() == 4

# Wait the min cleanup timeout since max size is exceeded
:ok = Process.sleep(1100)

assert generations_len(LocalWithSizeLimit) == 2
# Cache should be empty now
assert LocalWithSizeLimit.count_all() == 0

_ = cache_put(LocalWithSizeLimit, 5..8)
# Put some entries without exceeding the max size
_ = cache_put(LocalWithSizeLimit, 5..6)

assert LocalWithSizeLimit.count_all() == 4
# Validate current size
assert LocalWithSizeLimit.count_all() == 2

:ok = Process.sleep(1100)
# Wait the max cleanup timeout (timeout should be relative to the size)
:ok = Process.sleep(1600)

assert generations_len(LocalWithSizeLimit) == 2
# The entries should be in the newer generation yet
assert LocalWithSizeLimit.count_all() == 2

# Put some entries to exceed the max size
_ = cache_put(LocalWithSizeLimit, 7..8)

# Wait the max cleanup timeout
:ok = Process.sleep(1600)

# The entries should be in the newer generation yet
assert LocalWithSizeLimit.count_all() == 4

# Wait the min cleanup timeout since max size is exceeded
:ok = Process.sleep(1100)

assert generations_len(LocalWithSizeLimit) == 2
# Cache should be empty now
assert LocalWithSizeLimit.count_all() == 0

# Stop the cache
:ok = LocalWithSizeLimit.stop()
end
end
Expand Down
Loading

0 comments on commit 53e570d

Please sign in to comment.