Ranch y Conexiones
Pool de conexiones TCP de alto rendimiento con Ranch. ⏱️ 2.5 horas
¿Por qué Ranch?
Ranch es la librería de manejo de conexiones del proyecto Cowboy, probada en producción por millones de sitios web. Proporciona:
- Pool de acceptors: múltiples procesos aceptando conexiones
- Un proceso por conexión: aislamiento total de fallos
- Hot upgrades: actualizar protocolo sin cerrar conexiones
- Soporte TCP y SSL: misma API para ambos
- Connection draining: shutdown graceful
📊 Ranch en producción
Ranch maneja millones de conexiones concurrentes en producción. WhatsApp, Discord y muchos otros sistemas críticos usan Ranch (directamente o vía Cowboy/Phoenix).
Instalación
# mix.exs
# Project dependencies: only Ranch, constrained to the "~> 2.1" requirement.
defp deps, do: [{:ranch, "~> 2.1"}]
Arquitectura de Ranch
# Estructura de procesos
#
# ranch_sup
# └── ranch_listener_sup (por cada listener)
# ├── ranch_acceptors_sup
# │ ├── acceptor 1
# │ ├── acceptor 2
# │ └── ... (num_acceptors)
# └── ranch_conns_sup
# ├── connection handler 1
# ├── connection handler 2
# └── ... (una por conexión)
Implementar un protocol handler
Ranch requiere implementar el behaviour :ranch_protocol:
defmodule MarketFeed.Handler do
  @moduledoc """
  Ranch protocol handler; `start_link/3` spawns one process per connection.

  The process completes the Ranch handshake, puts the socket in
  `active: :once` mode and then runs as a `GenServer` loop. Incoming bytes are
  accumulated in a buffer and decoded with `MarketProtocol.decode/1`; decoded
  subscribe/unsubscribe messages update the set of subscribed symbols, and
  `{:tick, symbol, price, timestamp}` messages are forwarded to the client
  only for subscribed symbols.
  """

  @behaviour :ranch_protocol

  use GenServer
  require Logger

  @doc """
  Ranch protocol callback. Spawns the connection process and returns
  `{:ok, pid}`.

  The process is started with `:proc_lib.spawn_link/3` rather than
  `GenServer.start_link/3` because `init/1` enters the GenServer loop itself
  via `:gen_server.enter_loop/3` after the handshake.
  """
  @impl :ranch_protocol
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  @doc false
  @impl GenServer
  def init({ref, transport, _opts}) do
    # Handshake: obtain the socket for this connection from Ranch.
    {:ok, socket} = :ranch.handshake(ref)

    # `active: :once` delivers a single message and then pauses delivery until
    # the socket is re-armed in handle_info/2.
    :ok =
      transport.setopts(socket,
        active: :once,
        packet: :raw,
        nodelay: true
      )

    Logger.info("Nueva conexión aceptada")

    state = %{
      socket: socket,
      transport: transport,
      buffer: <<>>,
      subscriptions: MapSet.new()
    }

    # Turns this plain proc_lib process into a GenServer; does not return.
    :gen_server.enter_loop(__MODULE__, [], state)
  end

  ## Incoming data

  @impl GenServer
  def handle_info({:tcp, socket, data}, %{transport: transport} = state) do
    new_state = process_data(data, state)

    # Re-arm the socket so the next chunk is delivered.
    transport.setopts(socket, active: :once)
    {:noreply, new_state}
  end

  def handle_info({:tcp_closed, _socket}, state) do
    Logger.info("Conexión cerrada por cliente")
    {:stop, :normal, state}
  end

  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.error("Error TCP: #{inspect(reason)}")
    {:stop, reason, state}
  end

  ## Outgoing data for this client

  def handle_info({:tick, symbol, price, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      message = MarketProtocol.encode_tick(symbol, price, 0, timestamp)
      state.transport.send(state.socket, message)
    end

    {:noreply, state}
  end

  ## Lifecycle and unknown messages

  # Sent by the drain procedure before the listener is stopped. Without this
  # clause the message would match no handle_info/2 clause and crash the
  # process with a FunctionClauseError.
  def handle_info(:shutdown_soon, state) do
    Logger.info("Drain solicitado, cerrando conexión")
    {:stop, :normal, state}
  end

  # Defining handle_info/2 replaces the default injected by `use GenServer`,
  # so unknown messages must be ignored explicitly.
  def handle_info(message, state) do
    Logger.debug("Mensaje inesperado ignorado: #{inspect(message)}")
    {:noreply, state}
  end

  ## Client message processing

  # Appends the new bytes to the buffer and decodes as many messages as possible.
  defp process_data(data, state) do
    process_buffer(%{state | buffer: state.buffer <> data})
  end

  # Decodes messages from the buffer until the decoder reports `:incomplete`.
  # NOTE(review): only `{:ok, message, rest}` and `{:incomplete, _}` are
  # handled; if MarketProtocol.decode/1 can return anything else (e.g. an error
  # tuple for malformed input) this raises CaseClauseError — confirm its contract.
  defp process_buffer(state) do
    case MarketProtocol.decode(state.buffer) do
      {:ok, message, rest} ->
        message
        |> handle_message(%{state | buffer: rest})
        |> process_buffer()

      {:incomplete, _} ->
        state
    end
  end

  defp handle_message(%{type: :subscribe, symbol: symbol}, state) do
    Logger.debug("Subscribe: #{symbol}")
    MarketFeed.PubSub.subscribe(symbol)
    %{state | subscriptions: MapSet.put(state.subscriptions, symbol)}
  end

  defp handle_message(%{type: :unsubscribe, symbol: symbol}, state) do
    Logger.debug("Unsubscribe: #{symbol}")
    MarketFeed.PubSub.unsubscribe(symbol)
    %{state | subscriptions: MapSet.delete(state.subscriptions, symbol)}
  end

  defp handle_message(msg, state) do
    # Logger.warn/1 is deprecated in favour of Logger.warning/1.
    Logger.warning("Mensaje no manejado: #{inspect(msg)}")
    state
  end
end
Iniciar el listener
defmodule MarketFeed.Application do
  @moduledoc """
  Application entry point: starts the top-level supervisor, whose only child
  is the `MarketFeed.Listener` child spec.
  """

  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(
      # The listener is started with empty options.
      [{MarketFeed.Listener, []}],
      strategy: :one_for_one,
      name: MarketFeed.Supervisor
    )
  end
end
defmodule MarketFeed.Listener do
  @moduledoc """
  Builds the child spec for the `:market_feed` Ranch TCP listener so it can be
  placed directly in a supervision tree.
  """

  @doc """
  Returns the Ranch child spec for the listener.

  Options:

    * `:port` - listening port (default `9000`)
    * `:num_acceptors` - number of acceptors (default `100`)

  Connections are handled by `MarketFeed.Handler`, started with `[]` as its
  protocol options.
  """
  def child_spec(opts) do
    transport_opts = %{
      socket_opts: [port: Keyword.get(opts, :port, 9000)],
      num_acceptors: Keyword.get(opts, :num_acceptors, 100)
    }

    :ranch.child_spec(:market_feed, :ranch_tcp, transport_opts, MarketFeed.Handler, [])
  end
end
Opciones de transporte
# TCP listener with tuned options.
tcp_socket_opts = [
  port: 9000,
  # Disable Nagle
  nodelay: true,
  # Pending connections queue
  backlog: 1024,
  # 30s send timeout
  send_timeout: 30_000,
  send_timeout_close: true
]

transport_opts = %{
  socket_opts: tcp_socket_opts,
  # Acceptor processes
  num_acceptors: 100,
  # Connection limit
  max_connections: 10_000
}

:ranch.child_spec(:my_listener, :ranch_tcp, transport_opts, MyHandler, [])
SSL/TLS
# Use the :ranch_ssl transport for secure connections.
# NOTE(review): `verify: :verify_peer` is set without `fail_if_no_peer_cert`;
# presumably clients that present no certificate are still accepted — confirm
# whether mutual TLS is intended.
tls_socket_opts = [
  port: 9443,
  certfile: "/path/to/cert.pem",
  keyfile: "/path/to/key.pem",
  cacertfile: "/path/to/ca.pem",
  verify: :verify_peer,
  versions: [:"tlsv1.3", :"tlsv1.2"]
]

ssl_opts = %{socket_opts: tls_socket_opts, num_acceptors: 100}

:ranch.child_spec(:secure_feed, :ranch_ssl, ssl_opts, MyHandler, [])
Operaciones en runtime
# Listener information
:ranch.info(:market_feed)
# => %{...
# Connection processes currently managed by the listener
:ranch.procs(:market_feed, :connections)
# => [#PID<0.123.0>, #PID<0.124.0>, ...]
# Count connections
# NOTE(review): :ranch_server looks like an internal Ranch module — confirm
# whether the count should be read from :ranch.info/1 instead.
:ranch_server.count_connections(:market_feed)
# => 5432
# Suspend the listener (no new connections are accepted)
:ranch.suspend_listener(:market_feed)
# Resume
:ranch.resume_listener(:market_feed)
# Change max_connections at runtime
:ranch.set_max_connections(:market_feed, 20_000)
# Stop the listener
# NOTE(review): the original comment said "with draining" — confirm how
# stop_listener treats connections that are still open.
:ranch.stop_listener(:market_feed)
Connection draining para deploys
defmodule MarketFeed.GracefulShutdown do
  @moduledoc """
  Drains a Ranch listener before stopping it, intended for deploys.

  The connection processes are expected to react to the `:shutdown_soon`
  message by closing themselves; this module only sends the message and polls
  the connection count.
  """

  require Logger

  # Delay between two consecutive polls of the connection count.
  @poll_interval_ms 100

  @doc """
  Drains `listener` and then stops it.

  Steps:

    1. suspend the listener so no new connections are accepted;
    2. send `:shutdown_soon` to every current connection process;
    3. poll until the connection count reaches zero or `timeout`
       milliseconds (default `30_000`) have elapsed;
    4. stop the listener, whatever the outcome of step 3.

  Returns `:ok`.
  """
  def drain(listener, timeout \\ 30_000) do
    Logger.info("Iniciando drain de #{listener}...")

    # 1. Stop accepting new connections.
    :ranch.suspend_listener(listener)

    # 2. Notify the existing connections.
    listener
    |> :ranch.procs(:connections)
    |> Enum.each(&send(&1, :shutdown_soon))

    # 3. Wait for them to close, or time out. The result is ignored on
    #    purpose: the listener is stopped either way.
    wait_for_connections(listener, timeout)

    # 4. Stop the listener along with whatever connections remain.
    :ranch.stop_listener(listener)
    Logger.info("Drain completado")
  end

  # Converts the relative timeout into an absolute monotonic deadline.
  defp wait_for_connections(listener, timeout) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(listener, deadline)
  end

  # Polls the connection count; returns :ok when it reaches zero and :timeout
  # once the deadline has passed.
  # NOTE(review): :ranch_server looks like an internal Ranch module — confirm
  # whether the count should be read from :ranch.info/1 instead.
  defp do_wait(listener, deadline) do
    count = :ranch_server.count_connections(listener)

    cond do
      count == 0 ->
        :ok

      System.monotonic_time(:millisecond) > deadline ->
        # Logger.warn/1 is deprecated in favour of Logger.warning/1.
        Logger.warning("Timeout con #{count} conexiones restantes")
        :timeout

      true ->
        Process.sleep(@poll_interval_ms)
        do_wait(listener, deadline)
    end
  end
end
Rate limiting por conexión
defmodule MarketFeed.RateLimitedHandler do
  @moduledoc """
  Ranch protocol handler that applies a per-connection rate limit.

  Every `{:tcp, socket, data}` delivery counts as one unit against a fixed
  window of `@window_ms` milliseconds. Once `@max_messages_per_second`
  deliveries have been counted in the current window, the next delivery gets
  `"RATE_LIMITED\\n"` written to the socket and the process stops with reason
  `:rate_limited`.

  Note that the counter tracks socket deliveries, not decoded protocol
  messages: one delivery may carry several messages or part of one.
  """

  @behaviour :ranch_protocol

  use GenServer
  require Logger

  @max_messages_per_second 1000
  @window_ms 1000

  @doc """
  Ranch protocol callback. Spawns the connection process and returns
  `{:ok, pid}`.
  """
  @impl :ranch_protocol
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  @doc false
  @impl GenServer
  def init({ref, transport, _opts}) do
    {:ok, socket} = :ranch.handshake(ref)

    # Assert the result so a socket that cannot be configured fails here
    # instead of leaving a process that never receives data.
    :ok = transport.setopts(socket, active: :once, nodelay: true)

    state = %{
      socket: socket,
      transport: transport,
      message_count: 0,
      window_start: System.monotonic_time(:millisecond)
    }

    # Turns this plain proc_lib process into a GenServer; does not return.
    :gen_server.enter_loop(__MODULE__, [], state)
  end

  @impl GenServer
  def handle_info({:tcp, socket, data}, state) do
    now = System.monotonic_time(:millisecond)
    state = maybe_reset_window(state, now)

    if state.message_count >= @max_messages_per_second do
      # Over the limit: tell the client and close the connection.
      state.transport.send(socket, "RATE_LIMITED\n")
      {:stop, :rate_limited, state}
    else
      process_message(data, state)

      # Re-arm the socket so the next chunk is delivered.
      state.transport.setopts(socket, active: :once)
      {:noreply, %{state | message_count: state.message_count + 1}}
    end
  end

  # A client disconnect is the normal end of a connection; without this clause
  # it would crash the process with a FunctionClauseError.
  def handle_info({:tcp_closed, _socket}, state) do
    {:stop, :normal, state}
  end

  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.error("Error TCP: #{inspect(reason)}")
    {:stop, reason, state}
  end

  # Drain notification sent before the listener is stopped.
  def handle_info(:shutdown_soon, state) do
    {:stop, :normal, state}
  end

  # Defining handle_info/2 replaces the default injected by `use GenServer`,
  # so unknown messages must be ignored explicitly.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Starts a new window (counter back to zero) once @window_ms have elapsed
  # since the current window began.
  defp maybe_reset_window(state, now) do
    if now - state.window_start >= @window_ms do
      %{state | message_count: 0, window_start: now}
    else
      state
    end
  end

  # Placeholder for the real message processing; its result is not used.
  defp process_message(_data, _state) do
    # ... process message
  end
end
Métricas con Telemetry
defmodule MarketFeed.Telemetry do
  @moduledoc """
  Translates `[:ranch, :accept, :start | :stop]` telemetry events into
  application-level `[:market_feed, ...]` events.

  NOTE(review): Ranch itself does not appear to emit telemetry events —
  confirm which component publishes `[:ranch, :accept, ...]`; otherwise these
  handlers never run.
  """

  @handler_id "market-feed-ranch"

  @events [
    [:ranch, :accept, :start],
    [:ranch, :accept, :stop]
  ]

  @doc """
  Attaches `handle_event/4` to the events listed in `@events` under the
  handler id `"market-feed-ranch"` and returns the result of
  `:telemetry.attach_many/4`.
  """
  def setup do
    # Use a remote capture (&Mod.fun/4): the telemetry documentation advises
    # against local captures and anonymous functions as handlers.
    :telemetry.attach_many(@handler_id, @events, &__MODULE__.handle_event/4, nil)
  end

  @doc """
  Telemetry handler.

    * `[:ranch, :accept, :start]` re-emits `[:market_feed, :connections]`
      with `%{count: 1}`.
    * `[:ranch, :accept, :stop]` re-emits `[:market_feed, :connection_duration]`
      with the `duration` taken from the incoming measurements.

  Both outgoing events carry `%{listener: metadata.listener}` as metadata.
  """
  def handle_event([:ranch, :accept, :start], _measurements, metadata, _config) do
    # New connection accepted.
    :telemetry.execute(
      [:market_feed, :connections],
      %{count: 1},
      %{listener: metadata.listener}
    )
  end

  def handle_event([:ranch, :accept, :stop], measurements, metadata, _config) do
    # Connection closed.
    :telemetry.execute(
      [:market_feed, :connection_duration],
      %{duration: measurements.duration},
      %{listener: metadata.listener}
    )
  end
end
Ejercicio 11.1
Echo Server con Ranch
Básico
Implementa un servidor echo simple usando Ranch:
- Cada mensaje recibido se reenvía al cliente
- Soporta múltiples conexiones simultáneas
- Loguea conexiones/desconexiones
- Mide latencia round-trip
Ejercicio 11.2
Market Data Gateway
Avanzado
Construye un gateway completo de market data:
- Acepta conexiones TCP con tu protocolo binario
- Soporta subscripción a múltiples símbolos
- Implementa heartbeat bidireccional
- Rate limiting por cliente
- Graceful shutdown con draining
Conexión con el proyecto final
Ranch será la base de nuestro servidor de distribución:
- Listener principal: acepta subscribers en puerto TCP
- Un handler por subscriber: aislamiento total de fallos
- Connection draining: deploys sin pérdida de datos
- Métricas integradas: conexiones activas, latencia, throughput