Ranch y Conexiones

Pool de conexiones TCP de alto rendimiento con Ranch. ⏱️ 2.5 horas

¿Por qué Ranch?

Ranch es la librería de manejo de conexiones del proyecto Cowboy, probada en producción por millones de sitios web. Proporciona:

📊 Ranch en producción

Ranch maneja millones de conexiones concurrentes en producción. WhatsApp, Discord y muchos otros sistemas críticos usan Ranch (directamente o a través de Cowboy/Phoenix).

Instalación

# mix.exs
# Project dependencies: only Ranch (2.1 or any later 2.x release) is required.
defp deps, do: [{:ranch, "~> 2.1"}]

Arquitectura de Ranch

# Estructura de procesos
#
# ranch_sup
# └── ranch_listener_sup (por cada listener)
#     ├── ranch_acceptors_sup
#     │   ├── acceptor 1
#     │   ├── acceptor 2
#     │   └── ... (num_acceptors)
#     └── ranch_conns_sup
#         ├── connection handler 1
#         ├── connection handler 2
#         └── ... (una por conexión)

Implementar un protocol handler

Ranch requiere implementar el behaviour :ranch_protocol:

defmodule MarketFeed.Handler do
  @moduledoc """
  Per-connection Ranch protocol handler for the market feed.

  Ranch starts one process of this module for every accepted connection.
  The process takes ownership of the socket, turns itself into a GenServer
  via `:gen_server.enter_loop/3`, decodes client frames with
  `MarketProtocol.decode/1`, and forwards `{:tick, symbol, price, timestamp}`
  messages to the client for the symbols it has subscribed to.
  """

  @behaviour :ranch_protocol

  use GenServer
  require Logger

  @doc """
  Ranch protocol callback: spawns the connection process.

  `:proc_lib.spawn_link/3` is used instead of `GenServer.start_link/3`
  because `init/1` must call `:ranch.handshake/1`, which only completes after
  this function has returned the pid to Ranch.
  """
  @impl :ranch_protocol
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  @doc """
  Entry point of the spawned process.

  Completes the Ranch handshake, configures the socket and enters the
  GenServer loop. It never returns normally: `:gen_server.enter_loop/3`
  takes over the process.
  """
  @impl GenServer
  def init({ref, transport, _opts}) do
    # Handshake: obtain the socket from the acceptor
    {:ok, socket} = :ranch.handshake(ref)

    # `active: :once` delivers a single message and then pauses the socket,
    # so the client cannot flood the mailbox faster than we process data.
    :ok =
      transport.setopts(socket,
        active: :once,
        packet: :raw,
        nodelay: true
      )

    Logger.info("Nueva conexión aceptada")

    state = %{
      socket: socket,
      transport: transport,
      buffer: <<>>,
      subscriptions: MapSet.new()
    }

    :gen_server.enter_loop(__MODULE__, [], state)
  end

  ## Incoming data and socket lifecycle

  @impl GenServer
  def handle_info({:tcp, socket, data}, %{transport: transport} = state) do
    new_state = process_data(data, state)

    # Re-arm the socket only after the chunk has been processed
    transport.setopts(socket, active: :once)
    {:noreply, new_state}
  end

  def handle_info({:tcp_closed, _socket}, state) do
    Logger.info("Conexión cerrada por cliente")
    {:stop, :normal, state}
  end

  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.error("Error TCP: #{inspect(reason)}")
    {:stop, reason, state}
  end

  ## Outgoing data for this client

  def handle_info({:tick, symbol, price, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      # NOTE(review): the literal 0 is the third argument of encode_tick/4;
      # its meaning is defined by MarketProtocol — confirm.
      message = MarketProtocol.encode_tick(symbol, price, 0, timestamp)
      state.transport.send(state.socket, message)
    end

    {:noreply, state}
  end

  ## Drain requests and unexpected messages

  # MarketFeed.GracefulShutdown.drain/2 sends :shutdown_soon to every
  # connection process. Stop normally so the drain can complete instead of
  # crashing on an unmatched message.
  def handle_info(:shutdown_soon, state) do
    Logger.info("Drain solicitado, cerrando conexión")
    {:stop, :normal, state}
  end

  # Defining handle_info/2 replaces the default injected by `use GenServer`,
  # so without this clause any other message would raise FunctionClauseError
  # and kill the connection.
  def handle_info(message, state) do
    Logger.debug("Mensaje inesperado ignorado: #{inspect(message)}")
    {:noreply, state}
  end

  ## Client message processing

  # Appends the new chunk to the pending buffer and decodes whatever is complete.
  defp process_data(data, state) do
    buffer = state.buffer <> data
    process_buffer(%{state | buffer: buffer})
  end

  # Decodes and handles messages until the decoder reports an incomplete frame;
  # the undecoded remainder stays in `state.buffer` for the next chunk.
  defp process_buffer(state) do
    case MarketProtocol.decode(state.buffer) do
      {:ok, message, rest} ->
        new_state = handle_message(message, %{state | buffer: rest})
        process_buffer(new_state)

      {:incomplete, _} ->
        state
    end
  end

  defp handle_message(%{type: :subscribe, symbol: symbol}, state) do
    Logger.debug("Subscribe: #{symbol}")
    MarketFeed.PubSub.subscribe(symbol)
    %{state | subscriptions: MapSet.put(state.subscriptions, symbol)}
  end

  defp handle_message(%{type: :unsubscribe, symbol: symbol}, state) do
    Logger.debug("Unsubscribe: #{symbol}")
    MarketFeed.PubSub.unsubscribe(symbol)
    %{state | subscriptions: MapSet.delete(state.subscriptions, symbol)}
  end

  defp handle_message(msg, state) do
    # Logger.warn/1 is deprecated; Logger.warning/1 is the supported name.
    Logger.warning("Mensaje no manejado: #{inspect(msg)}")
    state
  end
end

Iniciar el listener

defmodule MarketFeed.Application do
  @moduledoc """
  OTP application entry point: starts the top-level supervisor whose only
  child is the Ranch listener described by `MarketFeed.Listener`.
  """

  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(
      # The Ranch listener is embedded in our own supervision tree
      [{MarketFeed.Listener, []}],
      strategy: :one_for_one,
      name: MarketFeed.Supervisor
    )
  end
end

defmodule MarketFeed.Listener do
  @moduledoc """
  Builds the child specification for the `:market_feed` Ranch listener so it
  can be placed directly in a supervisor's child list.
  """

  @doc """
  Returns the Ranch child spec for the listener.

  Options:

    * `:port` - TCP port to listen on (default `9000`)
    * `:num_acceptors` - number of acceptor processes (default `100`)
  """
  def child_spec(opts) do
    transport_opts = %{
      socket_opts: [port: Keyword.get(opts, :port, 9000)],
      num_acceptors: Keyword.get(opts, :num_acceptors, 100)
    }

    # Arguments: listener name, transport module (plain TCP), transport
    # options, protocol handler module, options passed to the handler.
    :ranch.child_spec(:market_feed, :ranch_tcp, transport_opts, MarketFeed.Handler, [])
  end
end

Opciones de transporte

# TCP listener with tuned options.
# Socket-level options are kept apart from the listener-level limits.
tcp_socket_opts = [
  port: 9000,
  # Disable Nagle's algorithm
  nodelay: true,
  # Pending connections queue
  backlog: 1024,
  # 30s send timeout
  send_timeout: 30_000,
  send_timeout_close: true
]

transport_opts = %{
  socket_opts: tcp_socket_opts,
  # Acceptor processes
  num_acceptors: 100,
  # Connection limit
  max_connections: 10_000
}

:ranch.child_spec(:my_listener, :ranch_tcp, transport_opts, MyHandler, [])

SSL/TLS

# Use the :ranch_ssl transport for secure connections.
# NOTE(review): on a server, `verify: :verify_peer` alone still accepts
# clients that present no certificate unless `fail_if_no_peer_cert: true`
# is also set — confirm whether client certificates are meant to be mandatory.
tls_socket_opts = [
  port: 9443,
  certfile: "/path/to/cert.pem",
  keyfile: "/path/to/key.pem",
  cacertfile: "/path/to/ca.pem",
  verify: :verify_peer,
  versions: [:"tlsv1.3", :"tlsv1.2"]
]

ssl_opts = %{socket_opts: tls_socket_opts, num_acceptors: 100}

:ranch.child_spec(:secure_feed, :ranch_ssl, ssl_opts, MyHandler, [])

Operaciones en runtime

# Listener information (returned as a map in Ranch 2.x)
:ranch.info(:market_feed)
# => %{...

# Active connection processes
:ranch.procs(:market_feed, :connections)
# => [#PID<0.123.0>, #PID<0.124.0>, ...]

# Count connections through the public API; :ranch_server is an internal
# Ranch module and is not part of the documented interface.
:ranch.info(:market_feed).active_connections
# => 5432

# Suspend the listener (stops accepting new connections; existing ones stay)
:ranch.suspend_listener(:market_feed)

# Resume accepting connections
:ranch.resume_listener(:market_feed)

# Change max_connections at runtime
:ranch.set_max_connections(:market_feed, 20_000)

# Stop the listener. This closes the listening socket and stops the remaining
# connection processes; it does NOT wait for clients to finish. To drain,
# suspend the listener and wait for connections to end before calling this.
:ranch.stop_listener(:market_feed)

Connection draining para deploys

defmodule MarketFeed.GracefulShutdown do
  @moduledoc """
  Connection draining for deploys.

  Suspends a Ranch listener, asks every live connection process to wind
  down, waits up to a timeout for them to exit and finally stops the listener.
  """

  require Logger

  # Delay between two connection-count polls while waiting
  @poll_interval_ms 100

  @doc """
  Drains `listener` and then stops it.

  `timeout` is the maximum time in milliseconds to wait for connections to
  close on their own (default 30 seconds). Connections still alive after
  that are terminated when the listener is stopped.

  Returns `:ok` (the result of the final log call), whether or not the wait
  timed out.
  """
  @spec drain(term(), non_neg_integer()) :: :ok
  def drain(listener, timeout \\ 30_000) do
    Logger.info("Iniciando drain de #{listener}...")

    # 1. Stop accepting new connections
    :ranch.suspend_listener(listener)

    # 2. Notify existing connections; handlers are expected to handle
    #    the :shutdown_soon message themselves.
    listener
    |> :ranch.procs(:connections)
    |> Enum.each(&send(&1, :shutdown_soon))

    # 3. Wait until they close or the timeout expires
    wait_for_connections(listener, timeout)

    # 4. Force-close whatever is left
    :ranch.stop_listener(listener)

    Logger.info("Drain completado")
  end

  # Converts the relative timeout into an absolute monotonic deadline.
  defp wait_for_connections(listener, timeout) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(listener, deadline)
  end

  # Polls the active connection count until it reaches zero or the deadline passes.
  defp do_wait(listener, deadline) do
    # Public API instead of the internal :ranch_server module
    %{active_connections: count} = :ranch.info(listener)

    cond do
      count == 0 ->
        :ok

      System.monotonic_time(:millisecond) > deadline ->
        # Logger.warn/1 is deprecated; Logger.warning/1 is the supported name.
        Logger.warning("Timeout con #{count} conexiones restantes")
        :timeout

      true ->
        Process.sleep(@poll_interval_ms)
        do_wait(listener, deadline)
    end
  end
end

Rate limiting por conexión

defmodule MarketFeed.RateLimitedHandler do
  @moduledoc """
  Ranch protocol handler that enforces a per-connection rate limit.

  Every `{:tcp, socket, data}` delivery counts as one message. Once
  `@max_messages_per_second` deliveries have been seen within the current
  `@window_ms` window, the client receives `"RATE_LIMITED\\n"` and the
  connection process stops with reason `:rate_limited`.

  NOTE(review): the counter tracks socket deliveries, not decoded protocol
  messages; one delivery may hold several messages or a partial one —
  confirm this is the intended unit.
  """

  @behaviour :ranch_protocol

  use GenServer

  @max_messages_per_second 1000
  @window_ms 1000

  @doc """
  Ranch protocol callback: spawns the connection process with `:proc_lib`
  so that `init/1` can perform the Ranch handshake after this returns.
  """
  @impl :ranch_protocol
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  @doc """
  Entry point of the spawned process: takes the socket from Ranch, arms it
  for one message and enters the GenServer loop (never returns normally).
  """
  @impl GenServer
  def init({ref, transport, _opts}) do
    {:ok, socket} = :ranch.handshake(ref)
    transport.setopts(socket, active: :once, nodelay: true)

    state = %{
      socket: socket,
      transport: transport,
      message_count: 0,
      window_start: System.monotonic_time(:millisecond)
    }

    :gen_server.enter_loop(__MODULE__, [], state)
  end

  @impl GenServer
  def handle_info({:tcp, socket, data}, state) do
    now = System.monotonic_time(:millisecond)
    state = maybe_reset_window(state, now)

    if state.message_count >= @max_messages_per_second do
      # Over the limit: tell the client and drop the connection
      state.transport.send(socket, "RATE_LIMITED\n")
      {:stop, :rate_limited, state}
    else
      process_message(data, state)

      # Re-arm the socket for the next delivery
      state.transport.setopts(socket, active: :once)
      {:noreply, %{state | message_count: state.message_count + 1}}
    end
  end

  # Without the clauses below, a client disconnect or socket error would hit
  # no handle_info/2 clause and crash the process with FunctionClauseError.
  def handle_info({:tcp_closed, _socket}, state) do
    {:stop, :normal, state}
  end

  def handle_info({:tcp_error, _socket, reason}, state) do
    {:stop, reason, state}
  end

  # Sent by MarketFeed.GracefulShutdown.drain/2 to every connection process.
  def handle_info(:shutdown_soon, state) do
    {:stop, :normal, state}
  end

  # Any other message is ignored rather than crashing the connection.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Starts a fresh counting window once the current one is at least
  # @window_ms old; otherwise returns the state unchanged.
  defp maybe_reset_window(state, now) do
    if now - state.window_start >= @window_ms do
      %{state | message_count: 0, window_start: now}
    else
      state
    end
  end

  # Placeholder for the real message processing; the return value is ignored.
  defp process_message(_data, _state) do
    :ok
  end
end

Métricas con Telemetry

defmodule MarketFeed.Telemetry do
  @moduledoc """
  Re-publishes `[:ranch, :accept, ...]` telemetry events as
  `[:market_feed, ...]` metrics.

  NOTE(review): Ranch itself does not appear to emit `[:ranch, :accept, :start]`
  or `[:ranch, :accept, :stop]` events; confirm that some instrumentation
  layer in the project emits them, otherwise these handlers never run.
  """

  @handler_id "market-feed-ranch"

  @events [
    [:ranch, :accept, :start],
    [:ranch, :accept, :stop]
  ]

  @doc """
  Attaches `handle_event/4` to the events listed in `@events`.

  Returns the result of `:telemetry.attach_many/4`, i.e. `:ok` or
  `{:error, :already_exists}` when the handler id is already attached.
  """
  def setup do
    # A remote capture (&Mod.fun/arity) is used on purpose: telemetry warns
    # about local captures and anonymous functions as handlers.
    :telemetry.attach_many(@handler_id, @events, &__MODULE__.handle_event/4, nil)
  end

  @doc """
  Telemetry handler.

    * `[:ranch, :accept, :start]` - emits `[:market_feed, :connections]`
      with `count: 1`.
    * `[:ranch, :accept, :stop]` - emits `[:market_feed, :connection_duration]`
      with the `duration` measurement of the source event.

  Both carry the `listener` taken from the source event metadata.
  """
  def handle_event([:ranch, :accept, :start], _measurements, metadata, _config) do
    :telemetry.execute(
      [:market_feed, :connections],
      %{count: 1},
      %{listener: metadata.listener}
    )
  end

  def handle_event([:ranch, :accept, :stop], measurements, metadata, _config) do
    # NOTE(review): the original comment read "connection closed", but the
    # event is the end of the accept span — confirm what `duration` measures.
    :telemetry.execute(
      [:market_feed, :connection_duration],
      %{duration: measurements.duration},
      %{listener: metadata.listener}
    )
  end
end
Ejercicio 11.1 Echo Server con Ranch Básico

Implementa un servidor echo simple usando Ranch:

  • Cada mensaje recibido se reenvía al cliente
  • Soporta múltiples conexiones simultáneas
  • Loguea conexiones/desconexiones
  • Mide latencia round-trip
Ejercicio 11.2 Market Data Gateway Avanzado

Construye un gateway completo de market data:

  • Acepta conexiones TCP con tu protocolo binario
  • Soporta subscripción a múltiples símbolos
  • Implementa heartbeat bidireccional
  • Rate limiting por cliente
  • Graceful shutdown con draining

Conexión con el proyecto final

Ranch será la base de nuestro servidor de distribución: