TCP y UDP

Construye servidores de red desde cero usando las primitivas de Erlang/OTP. ⏱️ 2.5 horas

Networking en la BEAM

Erlang fue diseñado para telecomunicaciones, así que el soporte de red es de primera clase. Los sockets se integran con el modelo de actores: cada conexión puede ser manejada por un proceso dedicado.

🔴 Módulos de Erlang

Para networking usamos directamente los módulos de Erlang: :gen_tcp, :gen_udp, y :inet. Elixir no tiene wrappers propios porque los de Erlang son excelentes.

TCP: Servidor básico

TCP garantiza entrega ordenada y confiable. Ideal para datos financieros donde no puedes perder un tick.

defmodule TCPServer do
  require Logger
  
  def start(port) do
    # Opciones del socket
    opts = [
      :binary,           # Recibir datos como binarios (no listas)
      packet: :line,     # Framing por líneas
      active: false,     # Modo pasivo (recv explícito)
      reuseaddr: true    # Reusar puerto inmediatamente
    ]
    
    {:ok, listen_socket} = :gen_tcp.listen(port, opts)
    Logger.info("TCP Server listening on port #{port}")
    
    accept_loop(listen_socket)
  end
  
  defp accept_loop(listen_socket) do
    # accept bloquea hasta que llegue una conexión
    {:ok, client_socket} = :gen_tcp.accept(listen_socket)
    Logger.info("New client connected")
    
    # Spawn proceso para manejar este cliente
    spawn(fn -> handle_client(client_socket) end)
    
    # Continuar aceptando
    accept_loop(listen_socket)
  end
  
  defp handle_client(socket) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} ->
        Logger.info("Received: #{String.trim(data)}")
        # Echo back
        :gen_tcp.send(socket, "ACK: #{data}")
        handle_client(socket)
      
      {:error, :closed} ->
        Logger.info("Client disconnected")
      
      {:error, reason} ->
        Logger.error("Error: #{inspect(reason)}")
    end
  end
end

# Iniciar: TCPServer.start(4000)
# Probar: nc localhost 4000

Modo active vs passive

Los sockets pueden operar en tres modos:

Modo Comportamiento Uso
active: false Debes llamar recv explícitamente Control total, back-pressure natural
active: true Datos llegan como mensajes al proceso Más simple, pero sin flow control
active: :once Un mensaje, luego vuelve a passive Balance: mensajes pero con control

Ejemplo con active: :once

defmodule ActiveOnceHandler do
  use GenServer
  require Logger
  
  def start_link(socket) do
    GenServer.start_link(__MODULE__, socket)
  end
  
  @impl true
  def init(socket) do
    # Activar para recibir un mensaje
    :inet.setopts(socket, [active: :once])
    {:ok, %{socket: socket, buffer: ""}}
  end
  
  @impl true
  def handle_info({:tcp, socket, data}, state) do
    Logger.info("Received: #{inspect(data)}")
    
    # Procesar datos...
    process_data(data)
    
    # Re-activar para el siguiente mensaje
    :inet.setopts(socket, [active: :once])
    {:noreply, state}
  end
  
  @impl true
  def handle_info({:tcp_closed, _socket}, state) do
    Logger.info("Connection closed")
    {:stop, :normal, state}
  end
  
  @impl true
  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.error("TCP error: #{inspect(reason)}")
    {:stop, reason, state}
  end
  
  defp process_data(data) do
    # Tu lógica aquí
    :ok
  end
end
💡 ¿Por qué active: :once?

Con active: true, un cliente malicioso puede inundar tu proceso con mensajes más rápido de lo que puedes procesar, llenando el mailbox. Con :once, controlas cuándo estás listo para el siguiente mensaje.

UDP: Para datos de mercado multicast

UDP no garantiza entrega ni orden, pero tiene menor latencia. Muchos feeds de mercado usan UDP multicast para distribución masiva.

defmodule MarketDataReceiver do
  use GenServer
  require Logger
  
  @multicast_group {239, 255, 0, 1}  # Ejemplo de grupo multicast
  @port 5000
  
  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end
  
  @impl true
  def init(:ok) do
    opts = [
      :binary,
      active: true,
      reuseaddr: true,
      multicast_if: {0, 0, 0, 0},
      multicast_loop: true,
      add_membership: {@multicast_group, {0, 0, 0, 0}}
    ]
    
    {:ok, socket} = :gen_udp.open(@port, opts)
    Logger.info("Listening for multicast on #{inspect(@multicast_group)}:#{@port}")
    
    {:ok, %{socket: socket, packets: 0}}
  end
  
  @impl true
  def handle_info({:udp, _socket, ip, port, data}, state) do
    # Parsear datos de mercado
    case parse_market_data(data) do
      {:ok, tick} ->
        broadcast_tick(tick)
      
      {:error, reason} ->
        Logger.warn("Invalid packet from #{inspect(ip)}:#{port}: #{reason}")
    end
    
    {:noreply, %{state | packets: state.packets + 1}}
  end
  
  defp parse_market_data(<<
    symbol::binary-size(8),
    precio::float-big,
    volumen::unsigned-integer-32-big,
    timestamp::unsigned-integer-64-big
  >>) do
    {:ok, %{
      symbol: String.trim_trailing(symbol, <<0>>),
      precio: precio,
      volumen: volumen,
      timestamp: timestamp
    }}
  end
  
  defp parse_market_data(_), do: {:error, :invalid_format}
  
  defp broadcast_tick(tick) do
    # Enviar a subscribers
    Phoenix.PubSub.broadcast(MyApp.PubSub, "ticks", {:tick, tick})
  end
end

Opciones importantes de socket

Opción Descripción Valor típico
:binary Recibir como binario vs lista Siempre usar
nodelay: true Desactiva Nagle (TCP) Para baja latencia
packet: N Framing: prefijo de N bytes indica longitud 4 para protocolos binarios
packet: :line Framing por newline Protocolos texto
buffer: N Tamaño del buffer de usuario 65536+
recbuf: N Buffer de recepción del kernel 1048576+
sndbuf: N Buffer de envío del kernel 1048576+
# Opciones para baja latencia
opts = [
  :binary,
  packet: 4,           # 4-byte length prefix
  active: :once,
  nodelay: true,        # Disable Nagle's algorithm
  buffer: 65536,
  recbuf: 1048576,      # 1MB receive buffer
  sndbuf: 1048576       # 1MB send buffer
]
⚠️ nodelay para finanzas

El algoritmo de Nagle agrupa pequeños paquetes para eficiencia, pero añade latencia (hasta 200ms). Para datos financieros, siempre usa nodelay: true.

Ejercicio 9.1 Servidor TCP supervisado Intermedio

Convierte el TCPServer en un sistema supervisado donde:

  • Un Supervisor principal
  • Un GenServer acceptor que acepta conexiones
  • Un DynamicSupervisor para handlers de cliente
  • Cada cliente es un GenServer supervisado
Ejercicio 9.2 Price feed simulator Avanzado

Crea un simulador de feed UDP que:

  • Genera ticks aleatorios para varios símbolos
  • Envía por UDP multicast en formato binario
  • Configurable: ticks por segundo, símbolos, volatilidad

Luego crea el receiver correspondiente que parsee y muestre los ticks.

Conexión con el proyecto final

El sistema de distribución usará estas primitivas: