Process Groups
Agrupa procesos a través de nodos para pub/sub distribuido. ⏱️ 2.5 horas
¿Qué son los Process Groups?
:pg (Process Groups) es un módulo de OTP que permite agrupar
procesos bajo nombres, incluso a través de múltiples nodos. Características:
- Distribuido: grupos visibles en todo el cluster
- Eventualmente consistente: sincroniza automáticamente
- Tolerante a fallos: detecta procesos/nodos muertos
- Eficiente: replicación local para consultas rápidas
Usa :pg para implementar pub/sub distribuido: agrupa
subscribers por símbolo, luego broadcast a todos los miembros del
grupo desde cualquier nodo. Los ticks llegan a todos los interesados
sin importar en qué nodo estén.
API básica de :pg
# Iniciar pg (normalmente en tu Application)
{:ok, _} = :pg.start_link()
# Unirse a un grupo
:ok = :pg.join(:market_data, self())
# Unirse a múltiples grupos
:ok = :pg.join({:symbol, "BTCUSD"}, self())
:ok = :pg.join({:symbol, "ETHUSD"}, self())
# Obtener miembros de un grupo (solo locales)
:pg.get_local_members({:symbol, "BTCUSD"})
# => [#PID<0.123.0>, #PID<0.456.0>]
# Obtener todos los miembros (incluye remotos)
:pg.get_members({:symbol, "BTCUSD"})
# => [#PID<0.123.0>, #PID<17832.89.0>]
# Salir de un grupo
:ok = :pg.leave({:symbol, "BTCUSD"}, self())
# Listar todos los grupos conocidos
:pg.which_groups()
# => [:market_data, {:symbol, "BTCUSD"}, {:symbol, "ETHUSD"}]
Scope (namespace)
Por defecto :pg usa un scope global. Puedes crear scopes
separados para diferentes dominios:
# Iniciar con scope custom
:pg.start_link(MarketData.PG)
:pg.start_link(Trading.PG)
# Operaciones en scope específico
:pg.join(MarketData.PG, {:symbol, "BTCUSD"}, self())
:pg.get_members(MarketData.PG, {:symbol, "BTCUSD"})
# En Application supervisor
children = [
{:pg, name: MarketData.PG},
{:pg, name: Trading.PG}
]
Broadcast distribuido
defmodule MarketData.Broadcaster do
@pg_scope MarketData.PG
def broadcast(symbol, message) do
group = {:symbol, symbol}
# Enviar a todos los miembros (locales y remotos)
:pg.get_members(@pg_scope, group)
|> Enum.each(fn pid ->
send(pid, message)
end)
end
# Broadcast solo a miembros locales (más eficiente)
def broadcast_local(symbol, message) do
group = {:symbol, symbol}
:pg.get_local_members(@pg_scope, group)
|> Enum.each(fn pid ->
send(pid, message)
end)
end
# Broadcast a todos los grupos de símbolos
def broadcast_all(message) do
:pg.which_groups(@pg_scope)
|> Enum.filter(fn
{:symbol, _} -> true
_ -> false
end)
|> Enum.each(fn {:symbol, symbol} ->
broadcast(symbol, message)
end)
end
end
Subscriber con pg
defmodule MarketData.Subscriber do
use GenServer
require Logger
@pg_scope MarketData.PG
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
def subscribe(pid, symbol) do
GenServer.call(pid, {:subscribe, symbol})
end
def unsubscribe(pid, symbol) do
GenServer.call(pid, {:unsubscribe, symbol})
end
@impl true
def init(opts) do
callback = Keyword.fetch!(opts, :callback)
{:ok, %{callback: callback, subscriptions: MapSet.new()}}
end
@impl true
def handle_call({:subscribe, symbol}, _from, state) do
:ok = :pg.join(@pg_scope, {:symbol, symbol}, self())
new_subs = MapSet.put(state.subscriptions, symbol)
Logger.info("Subscribed to #{symbol}")
{:reply, :ok, %{state | subscriptions: new_subs}}
end
@impl true
def handle_call({:unsubscribe, symbol}, _from, state) do
:ok = :pg.leave(@pg_scope, {:symbol, symbol}, self())
new_subs = MapSet.delete(state.subscriptions, symbol)
Logger.info("Unsubscribed from #{symbol}")
{:reply, :ok, %{state | subscriptions: new_subs}}
end
@impl true
def handle_info({:tick, symbol, price, timestamp}, state) do
# Llamar al callback del usuario
state.callback.({symbol, price, timestamp})
{:noreply, state}
end
@impl true
def terminate(_reason, state) do
# pg limpia automáticamente cuando el proceso muere,
# pero podemos ser explícitos
Enum.each(state.subscriptions, fn symbol ->
:pg.leave(@pg_scope, {:symbol, symbol}, self())
end)
:ok
end
end
Monitoreo de grupos
defmodule MarketData.GroupMonitor do
use GenServer
require Logger
@pg_scope MarketData.PG
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_) do
# Monitorear cambios en grupos
:ok = :pg.monitor_scope(@pg_scope)
{:ok, %{}}
end
@impl true
def handle_info({:pg, :membership, @pg_scope, group, joins, leaves}, state) do
unless joins == [] do
Logger.info("Group #{inspect(group)}: #{length(joins)} joined")
end
unless leaves == [] do
Logger.info("Group #{inspect(group)}: #{length(leaves)} left")
end
# Actualizar métricas
count = length(:pg.get_members(@pg_scope, group))
:telemetry.execute(
[:market_data, :group, :size],
%{count: count},
%{group: group}
)
{:noreply, state}
end
end
Patrón: pub/sub distribuido completo
defmodule MarketData.DistributedPubSub do
use GenServer
@pg_scope MarketData.PG
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
## API Pública
def subscribe(topic) do
:pg.join(@pg_scope, topic, self())
end
def unsubscribe(topic) do
:pg.leave(@pg_scope, topic, self())
end
def publish(topic, message) do
GenServer.cast(__MODULE__, {:publish, topic, message})
end
def publish_local(topic, message) do
GenServer.cast(__MODULE__, {:publish_local, topic, message})
end
def subscribers(topic) do
:pg.get_members(@pg_scope, topic)
end
def local_subscribers(topic) do
:pg.get_local_members(@pg_scope, topic)
end
## Callbacks
@impl true
def init(_opts) do
{:ok, %{}}
end
@impl true
def handle_cast({:publish, topic, message}, state) do
:pg.get_members(@pg_scope, topic)
|> send_to_all(message)
{:noreply, state}
end
@impl true
def handle_cast({:publish_local, topic, message}, state) do
:pg.get_local_members(@pg_scope, topic)
|> send_to_all(message)
{:noreply, state}
end
defp send_to_all(pids, message) do
Enum.each(pids, &send(&1, message))
end
end
Optimización: broadcast jerárquico
Para miles de subscribers, enviar a cada uno individualmente desde un nodo es ineficiente. Mejor: enviar a un coordinador en cada nodo:
defmodule MarketData.HierarchicalBroadcast do
@pg_scope MarketData.PG
def broadcast(topic, message) do
# 1. Obtener nodos que tienen subscribers
nodes_with_members =
:pg.get_members(@pg_scope, topic)
|> Enum.map(&node/1)
|> Enum.uniq()
# 2. Enviar al coordinador de cada nodo
Enum.each(nodes_with_members, fn target_node ->
coordinator = {MarketData.LocalBroadcaster, target_node}
GenServer.cast(coordinator, {:broadcast_local, topic, message})
end)
end
end
defmodule MarketData.LocalBroadcaster do
use GenServer
@pg_scope MarketData.PG
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_) do
{:ok, %{}}
end
@impl true
def handle_cast({:broadcast_local, topic, message}, state) do
# Solo envía a subscribers locales
:pg.get_local_members(@pg_scope, topic)
|> Enum.each(&send(&1, message))
{:noreply, state}
end
end
Con pocos subscribers (<100), envío directo es más simple. Con muchos subscribers en múltiples nodos, el patrón jerárquico reduce mensajes entre nodos de O(N) a O(nodes).
Implementa un sistema de chat usando pg:
- Grupos por "room" (canal)
- Usuarios pueden unirse/salir de rooms
- Mensajes broadcast a todos en el room
- Prueba con 2+ nodos conectados
Extiende el ejercicio anterior con presencia:
- Trackear qué usuarios están en cada room
- Notificar joins/leaves a todos los miembros
- Manejar correctamente desconexiones de nodos
- Implementar "typing indicators"
Conexión con el proyecto final
:pg será fundamental para nuestro sistema distribuido:
- Grupos por símbolo: subscribers agrupados por instrumento
- Broadcast eficiente: ticks a todos los interesados
- Escalabilidad horizontal: añadir nodos aumenta capacidad
- Failover automático: pg detecta nodos/procesos muertos