GenServer
Domina la abstracción central de OTP para procesos con estado. ⏱️ 2.5 horas
¿Por qué GenServer?
En el capítulo 2 vimos cómo crear procesos con estado usando
spawn y recursión. GenServer es una abstracción que
estandariza ese patrón y añade:
- Integración con supervisores
- Manejo de timeouts y heartbeats
- Debugging y tracing automático
- Shutdown graceful
- API consistente (call/cast)
GenServer significa "Generic Server" y viene de OTP (Open Telecom Platform), el framework de Erlang para aplicaciones robustas.
Anatomía de un GenServer
defmodule PriceCache do
use GenServer
## API Cliente (funciones que llama el usuario)
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def get(pid, symbol) do
GenServer.call(pid, {:get, symbol})
end
def put(pid, symbol, precio) do
GenServer.cast(pid, {:put, symbol, precio})
end
## Callbacks del servidor (manejo de mensajes)
@impl true
def init(:ok) do
{:ok, %{}} # Estado inicial: map vacío
end
@impl true
def handle_call({:get, symbol}, _from, state) do
{:reply, Map.get(state, symbol), state}
end
@impl true
def handle_cast({:put, symbol, precio}, state) do
{:noreply, Map.put(state, symbol, precio)}
end
end
call vs cast
| GenServer.call | GenServer.cast | |
|---|---|---|
| Tipo | Síncrono | Asíncrono |
| Espera respuesta | Sí (bloquea) | No (fire & forget) |
| Callback | handle_call/3 |
handle_cast/2 |
| Retorno | {:reply, resp, state} |
{:noreply, state} |
| Timeout | 5000ms default | N/A |
| Uso | Queries, operaciones críticas | Updates, notificaciones |
# call: espera respuesta
precio = PriceCache.get(pid, "BTCUSD")
# Bloquea hasta recibir respuesta o timeout (5s por defecto)
# cast: no espera
PriceCache.put(pid, "BTCUSD", 67543.21)
# Retorna :ok inmediatamente, no sabemos si se procesó
# call con timeout personalizado
GenServer.call(pid, :operacion_lenta, 30_000) # 30 segundos
call: cuando necesitas el resultado o confirmar que la operación se completó.
cast: para updates donde no necesitas confirmación. Más eficiente, pero sin garantías.
En sistemas financieros, prefiere call para operaciones
críticas (trades, confirmaciones) y cast para
actualizaciones de alta frecuencia (ticks de precio).
handle_info: mensajes arbitrarios
handle_info/2 maneja mensajes que no vienen de call/cast:
mensajes de otros procesos, timers, señales del sistema, etc.
defmodule PriceMonitor do
use GenServer
def start_link(symbol) do
GenServer.start_link(__MODULE__, symbol)
end
@impl true
def init(symbol) do
# Programar check periódico
schedule_check()
{:ok, %{symbol: symbol, ultimo_precio: nil, checks: 0}}
end
@impl true
def handle_info(:check, state) do
IO.puts("[#{state.symbol}] Check ##{state.checks + 1}")
schedule_check()
{:noreply, %{state | checks: state.checks + 1}}
end
# Recibir mensajes de otros procesos
@impl true
def handle_info({:precio, precio}, state) do
IO.puts("[#{state.symbol}] Nuevo precio: $#{precio}")
{:noreply, %{state | ultimo_precio: precio}}
end
# Catch-all para mensajes desconocidos
@impl true
def handle_info(msg, state) do
IO.puts("Mensaje desconocido: #{inspect(msg)}")
{:noreply, state}
end
defp schedule_check do
Process.send_after(self(), :check, 5000)
end
end
Nombrar GenServers
En lugar de pasar PIDs, puedes registrar GenServers con nombres:
Nombres locales (átomo)
# Iniciar con nombre
GenServer.start_link(PriceCache, :ok, name: PriceCache)
# Usar el nombre en lugar del PID
PriceCache.get(PriceCache, "BTCUSD")
# O definirlo en start_link
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
Registry: nombres dinámicos
Para múltiples instancias del mismo GenServer (ej: un worker por símbolo), usa Registry:
# En tu Application supervisor, añadir el Registry
children = [
{Registry, keys: :unique, name: PriceWorkerRegistry},
# ... otros hijos
]
# En el GenServer
defmodule PriceWorker do
use GenServer
def start_link(symbol) do
GenServer.start_link(__MODULE__, symbol, name: via_tuple(symbol))
end
def get_price(symbol) do
GenServer.call(via_tuple(symbol), :get_price)
end
defp via_tuple(symbol) do
{:via, Registry, {PriceWorkerRegistry, symbol}}
end
# ... callbacks
end
# Uso
PriceWorker.start_link("BTCUSD")
PriceWorker.start_link("ETHUSD")
PriceWorker.get_price("BTCUSD") # Encuentra el worker por símbolo
Valores de retorno de callbacks
handle_call/3
# Responder y continuar
{:reply, respuesta, nuevo_estado}
# Responder y terminar el proceso
{:stop, razon, respuesta, nuevo_estado}
# No responder ahora (responderás después con GenServer.reply)
{:noreply, nuevo_estado}
# Continuar con timeout (handle_info(:timeout, state) se llamará)
{:reply, respuesta, nuevo_estado, timeout_ms}
# Hibernate para liberar memoria
{:reply, respuesta, nuevo_estado, :hibernate}
handle_cast/2 y handle_info/2
# Continuar normalmente
{:noreply, nuevo_estado}
# Terminar el proceso
{:stop, razon, nuevo_estado}
# Con timeout
{:noreply, nuevo_estado, timeout_ms}
# Hibernate
{:noreply, nuevo_estado, :hibernate}
:hibernate hace que el proceso libere su stack y haga
un GC completo. Útil para procesos que reciben mensajes infrecuentes.
El siguiente mensaje lo "despierta" con algo más de latencia.
Ejemplo completo: Price Feed Handler
defmodule PriceFeedHandler do
use GenServer
require Logger
@heartbeat_interval 30_000 # 30 segundos
@stale_threshold 60_000 # 1 minuto sin datos = stale
defstruct [
:symbol,
:last_price,
:last_update,
:tick_count,
subscribers: []
]
## API Cliente
def start_link(symbol) do
GenServer.start_link(__MODULE__, symbol, name: via(symbol))
end
def update_price(symbol, precio) do
GenServer.cast(via(symbol), {:update, precio})
end
def subscribe(symbol) do
GenServer.call(via(symbol), {:subscribe, self()})
end
def get_state(symbol) do
GenServer.call(via(symbol), :get_state)
end
defp via(symbol), do: {:via, Registry, {FeedRegistry, symbol}}
## Callbacks
@impl true
def init(symbol) do
Logger.info("[#{symbol}] Feed handler started")
schedule_heartbeat()
state = %__MODULE__{
symbol: symbol,
last_price: nil,
last_update: nil,
tick_count: 0
}
{:ok, state}
end
@impl true
def handle_cast({:update, precio}, state) do
now = System.monotonic_time(:millisecond)
# Notificar a subscribers
Enum.each(state.subscribers, fn pid ->
send(pid, {:tick, state.symbol, precio, now})
end)
new_state = %{state |
last_price: precio,
last_update: now,
tick_count: state.tick_count + 1
}
{:noreply, new_state}
end
@impl true
def handle_call({:subscribe, pid}, _from, state) do
Process.monitor(pid)
new_subs = [pid | state.subscribers] |> Enum.uniq()
{:reply, :ok, %{state | subscribers: new_subs}}
end
@impl true
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
@impl true
def handle_info(:heartbeat, state) do
now = System.monotonic_time(:millisecond)
cond do
state.last_update == nil ->
Logger.warn("[#{state.symbol}] No data received yet")
now - state.last_update > @stale_threshold ->
Logger.error("[#{state.symbol}] Data is STALE!")
true ->
Logger.debug("[#{state.symbol}] Heartbeat OK, #{state.tick_count} ticks")
end
schedule_heartbeat()
{:noreply, state}
end
# Subscriber died
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
Logger.info("[#{state.symbol}] Subscriber #{inspect(pid)} disconnected")
new_subs = Enum.reject(state.subscribers, &(&1 == pid))
{:noreply, %{state | subscribers: new_subs}}
end
defp schedule_heartbeat do
Process.send_after(self(), :heartbeat, @heartbeat_interval)
end
end
Implementa un GenServer que actúe como rate limiter:
check(limiter, key)- retorna:oko:rate_limited- Permite máximo N requests por ventana de tiempo (configurable)
- Usa
handle_infopara limpiar ventanas expiradas
Crea un GenServer que mantenga un order book simplificado:
- Mantiene listas de bids y asks ordenadas
add_order(symbol, side, price, qty)cancel_order(symbol, order_id)get_top_of_book(symbol)- mejor bid/askget_depth(symbol, levels)- N niveles de profundidad
Conexión con el proyecto final
GenServer será el building block principal:
- FeedHandler: un GenServer por cada feed de datos
- SubscriberConnection: un GenServer por cada cliente conectado
- PriceAggregator: GenServer que consolida precios de múltiples feeds
- Todos supervisados: para auto-recuperación de fallos