Skip to content

Commit

Permalink
fix: Change when we track and create partitions (#1228)
Browse files Browse the repository at this point in the history
* we will use the existing syn scope of the Connect module to check which users still have database connections active
* remove ets as this is no longer required
* Janitor will also tackle creating partitions
  • Loading branch information
filipecabaco authored Nov 18, 2024
1 parent 31401d2 commit 0147dbb
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 82 deletions.
1 change: 0 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ defmodule Realtime.Application do

region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
:ets.new(:active_tenants, [:named_table, :set, :public])

children =
[
Expand Down
26 changes: 0 additions & 26 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -298,32 +298,6 @@ defmodule Realtime.Tenants do
|> tap(fn _ -> Cache.invalidate_tenant_cache(tenant_id) end)
end

@doc """
Tracks the active tenant by external_id and stores the time when it was tracked in the ETS table named `:active_tenants`.
"""
@spec track_active_tenant(String.t()) :: :ok
def track_active_tenant(external_id) do
:ets.insert(:active_tenants, {external_id, NaiveDateTime.utc_now()})
:ok
end

@doc """
Lists all active tenants from the ETS table named `:active_tenants`.
"""
@spec track_active_tenant(String.t()) :: list({String.t(), NaiveDateTime.t()})
def list_active_tenants() do
:ets.tab2list(:active_tenants)
end

@doc """
Untracks the active tenant by external_id from the ETS table named `:active_tenants`.
"""
@spec untrack_active_tenant(String.t()) :: :ok
def untrack_active_tenant(external_id) do
:ets.delete(:active_tenants, external_id)
:ok
end

defp broadcast_operation_event(action, external_id) do
Phoenix.PubSub.broadcast!(
Realtime.PubSub,
Expand Down
1 change: 0 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ defmodule Realtime.Tenants.Connect do
connected_users_bucket: connected_users_bucket
} = state

Tenants.track_active_tenant(state.tenant_id)
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:invalidate_cache")
send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)

Expand Down
48 changes: 26 additions & 22 deletions lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ defmodule Realtime.Tenants.Janitor do
alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Messages
alias Realtime.Repo
alias Realtime.Tenants
alias Realtime.Tenants.Migrations

@type t :: %__MODULE__{
timer: pos_integer() | nil,
Expand Down Expand Up @@ -52,32 +52,35 @@ defmodule Realtime.Tenants.Janitor do
def init(%__MODULE__{start_after: start_after} = state) do
timer = timer(state) + start_after
Process.send_after(self(), :delete_old_messages, timer)

Logger.info("Janitor started")
{:ok, state}
end

@table_name :"syn_registry_by_name_Elixir.Realtime.Tenants.Connect"
@impl true
def handle_info(:delete_old_messages, state) do
Logger.info("Janitor started")
%{chunks: chunks, tasks: tasks} = state

{:ok, new_tasks} =
Repo.transaction(fn ->
Tenants.list_active_tenants()
|> Stream.map(&elem(&1, 0))
|> Stream.chunk_every(chunks)
|> Stream.map(fn chunks ->
task =
Task.Supervisor.async_nolink(
__MODULE__.TaskSupervisor,
fn -> run_cleanup_on_tenants(chunks) end,
ordered: false
)

{task.ref, chunks}
end)
|> Map.new()
matchspec = [
{{:"$1", :"$2", :"$3", :"$4", :"$5", Node.self()}, [], [:"$1"]}
]

new_tasks =
:ets.select(@table_name, matchspec)
|> Stream.chunk_every(chunks)
|> Stream.map(fn chunks ->
task =
Task.Supervisor.async_nolink(
__MODULE__.TaskSupervisor,
fn -> perform_mantaince_tasks(chunks) end,
ordered: false
)

{task.ref, chunks}
end)
|> Map.new()

Process.send_after(self(), :delete_old_messages, timer(state))

Expand All @@ -93,7 +96,7 @@ defmodule Realtime.Tenants.Janitor do

def handle_info({:DOWN, ref, _, _, :killed}, state) do
%{tasks: tasks} = state
{tenants, tasks} = Map.pop(tasks, ref)
tenants = Map.get(tasks, ref)

log_error(
"JanitorFailedToDeleteOldMessages",
Expand All @@ -110,18 +113,19 @@ defmodule Realtime.Tenants.Janitor do
defp timer(%{timer: timer, randomize: true}), do: timer + :timer.minutes(Enum.random(1..59))
defp timer(%{timer: timer}), do: timer

defp run_cleanup_on_tenants(tenants), do: Enum.map(tenants, &run_cleanup_on_tenant/1)
defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1)

defp run_cleanup_on_tenant(tenant_external_id) do
defp perform_mantaince_task(tenant_external_id) do
Logger.metadata(project: tenant_external_id, external_id: tenant_external_id)
Logger.info("Janitor starting realtime.messages cleanup")

with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
{:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1),
:ok <- Messages.delete_old_messages(conn) do
:ok <- Messages.delete_old_messages(conn),
:ok <- Migrations.create_partitions(conn) do
Logger.info("Janitor finished")

GenServer.stop(conn)
Tenants.untrack_active_tenant(tenant_external_id)
:ok
end
end
Expand Down
8 changes: 0 additions & 8 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Realtime.Tenants.ConnectTest do
# async: false due to the fact that multiple operations against the database will use the same connection
alias Realtime.Tenants
use Realtime.DataCase, async: false

import Mock
Expand All @@ -22,13 +21,6 @@ defmodule Realtime.Tenants.ConnectTest do
assert is_pid(db_conn)
end

test "on connect, tracks tenant as active", %{tenant: tenant} do
assert {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(500)

assert Enum.find(Tenants.list_active_tenants(), &(elem(&1, 0) == tenant.external_id))
end

test "on database disconnect, returns new connection", %{tenant: tenant} do
assert {:ok, old_conn} = Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(500)
Expand Down
53 changes: 31 additions & 22 deletions test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
defmodule Realtime.Tenants.JanitorTest do
# async: false due to using database process
alias Realtime.Tenants
use Realtime.DataCase, async: false

import Mock
import ExUnit.CaptureLog

alias Realtime.Api.Message
alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Repo
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Janitor
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Connect

setup do
dev_tenant = Tenant |> Repo.all() |> hd()
Expand All @@ -27,13 +28,11 @@ defmodule Realtime.Tenants.JanitorTest do
dev_tenant
],
fn tenant ->
tenant = Repo.preload(tenant, [:extensions])
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)
tenant = Repo.preload(tenant, :extensions)
Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(250)
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
clean_table(conn, "realtime", "messages")
Tenants.track_active_tenant(tenant.external_id)
tenant
end
)
Expand All @@ -47,13 +46,15 @@ defmodule Realtime.Tenants.JanitorTest do
)

on_exit(fn ->
Enum.each(tenants, &Connect.shutdown(&1.external_id))
:timer.sleep(10)
Application.put_env(:realtime, :janitor_schedule_timer, timer)
end)

%{tenants: tenants}
end

test "cleans messages older than 72 hours from tenants that were active and untracks the user",
test "cleans messages older than 72 hours and creates partitions from tenants that were active and untracks the user",
%{
tenants: tenants
} do
Expand All @@ -73,20 +74,22 @@ defmodule Realtime.Tenants.JanitorTest do
|> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
|> MapSet.new()

start_supervised!(Janitor)
Process.sleep(500)

current =
Enum.map(tenants, fn tenant ->
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, res} = Repo.all(conn, from(m in Message), Message)
res
end)
|> List.flatten()
|> MapSet.new()
with_mock Migrations, create_partitions: fn _ -> :ok end do
start_supervised!(Janitor)
Process.sleep(500)

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
assert Tenants.list_active_tenants() == []
current =
Enum.map(tenants, fn tenant ->
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, res} = Repo.all(conn, from(m in Message), Message)
res
end)
|> List.flatten()
|> MapSet.new()

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
assert_called(Migrations.create_partitions(:_))
end
end

test "logs error if fails to connect to tenant" do
Expand All @@ -109,7 +112,13 @@ defmodule Realtime.Tenants.JanitorTest do
]

tenant = tenant_fixture(%{extensions: extensions})
Tenants.track_active_tenant(tenant.external_id)
# Force add a bad tenant
:ets.insert(
:"syn_registry_by_name_Elixir.Realtime.Tenants.Connect",
{tenant.external_id, :undefined, :undefined, :undefined, :undefined, Node.self()}
)

:timer.sleep(250)

assert capture_log(fn ->
start_supervised!(Janitor)
Expand Down
2 changes: 0 additions & 2 deletions test/support/data_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ defmodule Realtime.DataCase do
Sandbox.mode(Realtime.Repo, {:shared, self()})
end

:ets.match_delete(:active_tenants, :_)

{:ok, conn: Phoenix.ConnTest.build_conn()}
end

Expand Down

0 comments on commit 0147dbb

Please sign in to comment.