Skip to content

Commit

Permalink
Projectors refactor (#1624)
Browse files Browse the repository at this point in the history
  • Loading branch information
CDimonaco authored Jul 13, 2023
1 parent 755ece9 commit e59a5bf
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 110 deletions.
25 changes: 16 additions & 9 deletions lib/trento/application/projectors/cluster_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ defmodule Trento.ClusterProjector do
},
fn multi ->
changeset =
ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{
ClusterReadModel
|> Repo.get!(cluster_id)
|> ClusterReadModel.changeset(%{
deregistered_at: deregistered_at
})

Expand All @@ -77,10 +79,10 @@ defmodule Trento.ClusterProjector do
cluster_id: cluster_id
},
fn multi ->
cluster = Repo.get!(ClusterReadModel, cluster_id)

changeset =
ClusterReadModel.changeset(cluster, %{
ClusterReadModel
|> Repo.get!(cluster_id)
|> ClusterReadModel.changeset(%{
deregistered_at: nil
})

Expand All @@ -102,7 +104,9 @@ defmodule Trento.ClusterProjector do
},
fn multi ->
changeset =
ClusterReadModel.changeset(%ClusterReadModel{id: id}, %{
ClusterReadModel
|> Repo.get!(id)
|> ClusterReadModel.changeset(%{
name: name,
sid: sid,
additional_sids: additional_sids,
Expand Down Expand Up @@ -135,7 +139,10 @@ defmodule Trento.ClusterProjector do
)

project(%ClusterHealthChanged{cluster_id: cluster_id, health: health}, fn multi ->
changeset = ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{health: health})
changeset =
ClusterReadModel
|> Repo.get!(cluster_id)
|> ClusterReadModel.changeset(%{health: health})

Ecto.Multi.update(multi, :cluster, changeset)
end)
Expand Down Expand Up @@ -185,9 +192,9 @@ defmodule Trento.ClusterProjector do
end

@impl true
def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, _) do
%ClusterReadModel{name: name} = Repo.get!(ClusterReadModel, cluster_id)

def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, %{
cluster: %ClusterReadModel{name: name}
}) do
TrentoWeb.Endpoint.broadcast("monitoring:clusters", "cluster_deregistered", %{
id: cluster_id,
name: name
Expand Down
64 changes: 29 additions & 35 deletions lib/trento/application/projectors/database_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ defmodule Trento.DatabaseProjector do
},
fn multi ->
changeset =
DatabaseReadModel.changeset(%DatabaseReadModel{id: sap_system_id}, %{health: health})
DatabaseReadModel
|> Repo.get!(sap_system_id)
|> DatabaseReadModel.changeset(%{health: health})

Ecto.Multi.update(multi, :database, changeset)
end
Expand Down Expand Up @@ -104,14 +106,13 @@ defmodule Trento.DatabaseProjector do
},
fn multi ->
changeset =
DatabaseInstanceReadModel.changeset(
%DatabaseInstanceReadModel{
sap_system_id: sap_system_id,
host_id: host_id,
instance_number: instance_number
},
%{health: health}
DatabaseInstanceReadModel
|> Repo.get_by(
sap_system_id: sap_system_id,
instance_number: instance_number,
host_id: host_id
)
|> DatabaseInstanceReadModel.changeset(%{health: health})

Ecto.Multi.update(multi, :database_instance, changeset)
end
Expand All @@ -127,17 +128,16 @@ defmodule Trento.DatabaseProjector do
},
fn multi ->
changeset =
DatabaseInstanceReadModel.changeset(
%DatabaseInstanceReadModel{
sap_system_id: sap_system_id,
host_id: host_id,
instance_number: instance_number
},
%{
system_replication: system_replication,
system_replication_status: system_replication_status
}
DatabaseInstanceReadModel
|> Repo.get_by(
sap_system_id: sap_system_id,
instance_number: instance_number,
host_id: host_id
)
|> DatabaseInstanceReadModel.changeset(%{
system_replication: system_replication,
system_replication_status: system_replication_status
})

Ecto.Multi.update(multi, :database_instance, changeset)
end
Expand All @@ -150,12 +150,9 @@ defmodule Trento.DatabaseProjector do
},
fn multi ->
changeset =
DatabaseReadModel.changeset(
%DatabaseReadModel{
id: sap_system_id
},
%{deregistered_at: deregistered_at}
)
DatabaseReadModel
|> Repo.get!(sap_system_id)
|> DatabaseReadModel.changeset(%{deregistered_at: deregistered_at})

Ecto.Multi.update(multi, :database, changeset)
end
Expand All @@ -167,13 +164,10 @@ defmodule Trento.DatabaseProjector do
health: health
},
fn multi ->
db = Repo.get!(DatabaseReadModel, sap_system_id)

changeset =
DatabaseReadModel.changeset(
db,
%{deregistered_at: nil, health: health}
)
DatabaseReadModel
|> Repo.get!(sap_system_id)
|> DatabaseReadModel.changeset(%{deregistered_at: nil, health: health})

Ecto.Multi.update(multi, :database, changeset)
end
Expand Down Expand Up @@ -323,12 +317,12 @@ defmodule Trento.DatabaseProjector do
sap_system_id: sap_system_id
},
_,
_
%{
database: %DatabaseReadModel{
sid: sid
}
}
) do
%DatabaseReadModel{
sid: sid
} = Repo.get(DatabaseReadModel, sap_system_id)

TrentoWeb.Endpoint.broadcast(
@databases_topic,
"database_deregistered",
Expand Down
86 changes: 42 additions & 44 deletions lib/trento/application/projectors/host_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ defmodule Trento.HostProjector do
},
fn multi ->
changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel
|> Repo.get!(id)
|> HostReadModel.changeset(%{
deregistered_at: deregistered_at
})

Expand All @@ -71,10 +73,10 @@ defmodule Trento.HostProjector do
host_id: id
},
fn multi ->
host = Repo.get!(HostReadModel, id)

changeset =
HostReadModel.changeset(host, %{
HostReadModel
|> Repo.get!(id)
|> HostReadModel.changeset(%{
deregistered_at: nil
})

Expand All @@ -95,7 +97,8 @@ defmodule Trento.HostProjector do

Ecto.Multi.insert(multi, :host, changeset,
on_conflict: {:replace, [:cluster_id]},
conflict_target: [:id]
conflict_target: [:id],
returning: true
)
end
)
Expand Down Expand Up @@ -132,7 +135,9 @@ defmodule Trento.HostProjector do
},
fn multi ->
changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel
|> Repo.get!(id)
|> HostReadModel.changeset(%{
hostname: hostname,
ip_addresses: ip_addresses,
agent_version: agent_version
Expand Down Expand Up @@ -163,7 +168,9 @@ defmodule Trento.HostProjector do
%HeartbeatSucceded{host_id: id},
fn multi ->
changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel
|> Repo.get!(id)
|> HostReadModel.changeset(%{
heartbeat: :passing
})

Expand All @@ -175,7 +182,9 @@ defmodule Trento.HostProjector do
%HeartbeatFailed{host_id: id},
fn multi ->
changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel
|> Repo.get!(id)
|> HostReadModel.changeset(%{
heartbeat: :critical
})

Expand All @@ -187,7 +196,9 @@ defmodule Trento.HostProjector do
%ProviderUpdated{host_id: id, provider: provider, provider_data: provider_data},
fn multi ->
changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel
|> Repo.get!(id)
|> HostReadModel.changeset(%{
provider: provider,
provider_data: handle_provider_data(provider_data)
})
Expand All @@ -205,13 +216,10 @@ defmodule Trento.HostProjector do
@impl true
@spec after_update(any, any, any) :: :ok | {:error, any}
def after_update(
%HostRegistered{host_id: id},
%HostRegistered{},
_,
_
%{host: %HostReadModel{} = host}
) do
# We need to hit the database to get the cluster_id
host = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_registered",
Expand Down Expand Up @@ -239,10 +247,8 @@ defmodule Trento.HostProjector do
def after_update(
%HostDeregistered{host_id: id},
_,
_
%{host: %HostReadModel{hostname: hostname}}
) do
%HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_deregistered",
Expand All @@ -253,32 +259,30 @@ defmodule Trento.HostProjector do
)
end

def after_update(%HostAddedToCluster{}, _, %{
host: %HostReadModel{hostname: nil}
}),
do: :ok

def after_update(
%HostAddedToCluster{host_id: id, cluster_id: cluster_id},
_,
_
) do
case Repo.get!(HostReadModel, id) do
# In case the host was not registered yet, we don't want to broadcast
%HostReadModel{hostname: nil} ->
:ok

%HostReadModel{} ->
TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_details_updated",
%{
id: id,
cluster_id: cluster_id
}
)
end
TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_details_updated",
%{
id: id,
cluster_id: cluster_id
}
)
end

def after_update(
%HostRemovedFromCluster{host_id: host_id},
_,
%{host: %Trento.HostReadModel{cluster_id: nil}}
%{host: %HostReadModel{cluster_id: nil}}
) do
TrentoWeb.Endpoint.broadcast("monitoring:hosts", "host_details_updated", %{
id: host_id,
Expand All @@ -289,7 +293,7 @@ defmodule Trento.HostProjector do
def after_update(
%HostDetailsUpdated{},
_,
%{host: host}
%{host: %HostReadModel{} = host}
) do
TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
Expand All @@ -301,10 +305,8 @@ defmodule Trento.HostProjector do
def after_update(
%HeartbeatSucceded{host_id: id},
_,
_
%{host: %HostReadModel{hostname: hostname}}
) do
%HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"heartbeat_succeded",
Expand All @@ -320,10 +322,8 @@ defmodule Trento.HostProjector do
def after_update(
%HeartbeatFailed{host_id: id},
_,
_
%{host: %HostReadModel{hostname: hostname}}
) do
%HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"heartbeat_failed",
Expand All @@ -349,12 +349,10 @@ defmodule Trento.HostProjector do
end

def after_update(
%HostChecksSelected{host_id: host_id, checks: checks},
%HostChecksSelected{checks: checks},
_,
_
%{host: %HostReadModel{selected_checks: checks} = host}
) do
host = %HostReadModel{id: host_id, selected_checks: checks}

message =
HostView.render(
"host_details_updated.json",
Expand Down
Loading

0 comments on commit e59a5bf

Please sign in to comment.