Process Groups

Agrupa procesos a través de nodos para pub/sub distribuido. ⏱️ 2.5 horas

¿Qué son los Process Groups?

:pg (Process Groups) es un módulo de OTP que permite agrupar procesos bajo nombres, incluso a través de múltiples nodos. Características:

📊 pg en sistemas financieros

Usa :pg para implementar pub/sub distribuido: agrupa subscribers por símbolo, luego broadcast a todos los miembros del grupo desde cualquier nodo. Los ticks llegan a todos los interesados sin importar en qué nodo estén.

API básica de :pg

# Iniciar pg (normalmente en tu Application)
{:ok, _} = :pg.start_link()

# Unirse a un grupo
:ok = :pg.join(:market_data, self())

# Unirse a múltiples grupos
:ok = :pg.join({:symbol, "BTCUSD"}, self())
:ok = :pg.join({:symbol, "ETHUSD"}, self())

# Obtener miembros de un grupo (solo locales)
:pg.get_local_members({:symbol, "BTCUSD"})
# => [#PID<0.123.0>, #PID<0.456.0>]

# Obtener todos los miembros (incluye remotos)
:pg.get_members({:symbol, "BTCUSD"})
# => [#PID<0.123.0>, #PID<17832.89.0>]

# Salir de un grupo
:ok = :pg.leave({:symbol, "BTCUSD"}, self())

# Listar todos los grupos conocidos
:pg.which_groups()
# => [:market_data, {:symbol, "BTCUSD"}, {:symbol, "ETHUSD"}]

Scope (namespace)

Por defecto :pg usa un scope global. Puedes crear scopes separados para diferentes dominios:

# Iniciar con scope custom
:pg.start_link(MarketData.PG)
:pg.start_link(Trading.PG)

# Operaciones en scope específico
:pg.join(MarketData.PG, {:symbol, "BTCUSD"}, self())
:pg.get_members(MarketData.PG, {:symbol, "BTCUSD"})

# En Application supervisor
children = [
  {:pg, name: MarketData.PG},
  {:pg, name: Trading.PG}
]

Broadcast distribuido

defmodule MarketData.Broadcaster do
  @pg_scope MarketData.PG

  def broadcast(symbol, message) do
    group = {:symbol, symbol}

    # Enviar a todos los miembros (locales y remotos)
    :pg.get_members(@pg_scope, group)
    |> Enum.each(fn pid ->
      send(pid, message)
    end)
  end

  # Broadcast solo a miembros locales (más eficiente)
  def broadcast_local(symbol, message) do
    group = {:symbol, symbol}

    :pg.get_local_members(@pg_scope, group)
    |> Enum.each(fn pid ->
      send(pid, message)
    end)
  end

  # Broadcast a todos los grupos de símbolos
  def broadcast_all(message) do
    :pg.which_groups(@pg_scope)
    |> Enum.filter(fn
      {:symbol, _} -> true
      _ -> false
    end)
    |> Enum.each(fn {:symbol, symbol} ->
      broadcast(symbol, message)
    end)
  end
end

Subscriber con pg

defmodule MarketData.Subscriber do
  use GenServer
  require Logger

  @pg_scope MarketData.PG

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def subscribe(pid, symbol) do
    GenServer.call(pid, {:subscribe, symbol})
  end

  def unsubscribe(pid, symbol) do
    GenServer.call(pid, {:unsubscribe, symbol})
  end

  @impl true
  def init(opts) do
    callback = Keyword.fetch!(opts, :callback)
    {:ok, %{callback: callback, subscriptions: MapSet.new()}}
  end

  @impl true
  def handle_call({:subscribe, symbol}, _from, state) do
    :ok = :pg.join(@pg_scope, {:symbol, symbol}, self())
    new_subs = MapSet.put(state.subscriptions, symbol)
    Logger.info("Subscribed to #{symbol}")
    {:reply, :ok, %{state | subscriptions: new_subs}}
  end

  @impl true
  def handle_call({:unsubscribe, symbol}, _from, state) do
    :ok = :pg.leave(@pg_scope, {:symbol, symbol}, self())
    new_subs = MapSet.delete(state.subscriptions, symbol)
    Logger.info("Unsubscribed from #{symbol}")
    {:reply, :ok, %{state | subscriptions: new_subs}}
  end

  @impl true
  def handle_info({:tick, symbol, price, timestamp}, state) do
    # Llamar al callback del usuario
    state.callback.({symbol, price, timestamp})
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, state) do
    # pg limpia automáticamente cuando el proceso muere,
    # pero podemos ser explícitos
    Enum.each(state.subscriptions, fn symbol ->
      :pg.leave(@pg_scope, {:symbol, symbol}, self())
    end)
    :ok
  end
end

Monitoreo de grupos

defmodule MarketData.GroupMonitor do
  use GenServer
  require Logger

  @pg_scope MarketData.PG

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Monitorear cambios en grupos
    :ok = :pg.monitor_scope(@pg_scope)
    {:ok, %{}}
  end

  @impl true
  def handle_info({:pg, :membership, @pg_scope, group, joins, leaves}, state) do
    unless joins == [] do
      Logger.info("Group #{inspect(group)}: #{length(joins)} joined")
    end

    unless leaves == [] do
      Logger.info("Group #{inspect(group)}: #{length(leaves)} left")
    end

    # Actualizar métricas
    count = length(:pg.get_members(@pg_scope, group))
    :telemetry.execute(
      [:market_data, :group, :size],
      %{count: count},
      %{group: group}
    )

    {:noreply, state}
  end
end

Patrón: pub/sub distribuido completo

defmodule MarketData.DistributedPubSub do
  use GenServer

  @pg_scope MarketData.PG

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  ## API Pública

  def subscribe(topic) do
    :pg.join(@pg_scope, topic, self())
  end

  def unsubscribe(topic) do
    :pg.leave(@pg_scope, topic, self())
  end

  def publish(topic, message) do
    GenServer.cast(__MODULE__, {:publish, topic, message})
  end

  def publish_local(topic, message) do
    GenServer.cast(__MODULE__, {:publish_local, topic, message})
  end

  def subscribers(topic) do
    :pg.get_members(@pg_scope, topic)
  end

  def local_subscribers(topic) do
    :pg.get_local_members(@pg_scope, topic)
  end

  ## Callbacks

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:publish, topic, message}, state) do
    :pg.get_members(@pg_scope, topic)
    |> send_to_all(message)

    {:noreply, state}
  end

  @impl true
  def handle_cast({:publish_local, topic, message}, state) do
    :pg.get_local_members(@pg_scope, topic)
    |> send_to_all(message)

    {:noreply, state}
  end

  defp send_to_all(pids, message) do
    Enum.each(pids, &send(&1, message))
  end
end

Optimización: broadcast jerárquico

Para miles de subscribers, enviar a cada uno individualmente desde un nodo es ineficiente. Mejor: enviar a un coordinador en cada nodo:

defmodule MarketData.HierarchicalBroadcast do
  @pg_scope MarketData.PG

  def broadcast(topic, message) do
    # 1. Obtener nodos que tienen subscribers
    nodes_with_members =
      :pg.get_members(@pg_scope, topic)
      |> Enum.map(&node/1)
      |> Enum.uniq()

    # 2. Enviar al coordinador de cada nodo
    Enum.each(nodes_with_members, fn target_node ->
      coordinator = {MarketData.LocalBroadcaster, target_node}
      GenServer.cast(coordinator, {:broadcast_local, topic, message})
    end)
  end
end

defmodule MarketData.LocalBroadcaster do
  use GenServer

  @pg_scope MarketData.PG

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:broadcast_local, topic, message}, state) do
    # Solo envía a subscribers locales
    :pg.get_local_members(@pg_scope, topic)
    |> Enum.each(&send(&1, message))

    {:noreply, state}
  end
end
💡 Cuándo usar broadcast jerárquico

Con pocos subscribers (<100), envío directo es más simple. Con muchos subscribers en múltiples nodos, el patrón jerárquico reduce mensajes entre nodos de O(N) a O(nodes).

Ejercicio 14.1 Chat distribuido con pg Intermedio

Implementa un sistema de chat usando pg:

  • Grupos por "room" (canal)
  • Usuarios pueden unirse/salir de rooms
  • Mensajes broadcast a todos en el room
  • Prueba con 2+ nodos conectados
Ejercicio 14.2 Presence distribuido Avanzado

Extiende el ejercicio anterior con presencia:

  • Trackear qué usuarios están en cada room
  • Notificar joins/leaves a todos los miembros
  • Manejar correctamente desconexiones de nodos
  • Implementar "typing indicators"

Conexión con el proyecto final

:pg será fundamental para nuestro sistema distribuido: