TCP y UDP
Construye servidores de red desde cero usando las primitivas de Erlang/OTP. ⏱️ 2.5 horas
Networking en la BEAM
Erlang fue diseñado para telecomunicaciones, así que el soporte de red es de primera clase. Los sockets se integran con el modelo de actores: cada conexión puede ser manejada por un proceso dedicado.
Para networking usamos directamente los módulos de Erlang:
:gen_tcp, :gen_udp, y :inet.
Elixir no tiene wrappers propios porque los de Erlang son excelentes.
TCP: Servidor básico
TCP garantiza entrega ordenada y confiable. Ideal para datos financieros donde no puedes perder un tick.
defmodule TCPServer do
require Logger
def start(port) do
# Opciones del socket
opts = [
:binary, # Recibir datos como binarios (no listas)
packet: :line, # Framing por líneas
active: false, # Modo pasivo (recv explícito)
reuseaddr: true # Reusar puerto inmediatamente
]
{:ok, listen_socket} = :gen_tcp.listen(port, opts)
Logger.info("TCP Server listening on port #{port}")
accept_loop(listen_socket)
end
defp accept_loop(listen_socket) do
# accept bloquea hasta que llegue una conexión
{:ok, client_socket} = :gen_tcp.accept(listen_socket)
Logger.info("New client connected")
# Spawn proceso para manejar este cliente
spawn(fn -> handle_client(client_socket) end)
# Continuar aceptando
accept_loop(listen_socket)
end
defp handle_client(socket) do
case :gen_tcp.recv(socket, 0) do
{:ok, data} ->
Logger.info("Received: #{String.trim(data)}")
# Echo back
:gen_tcp.send(socket, "ACK: #{data}")
handle_client(socket)
{:error, :closed} ->
Logger.info("Client disconnected")
{:error, reason} ->
Logger.error("Error: #{inspect(reason)}")
end
end
end
# Iniciar: TCPServer.start(4000)
# Probar: nc localhost 4000
Modo active vs passive
Los sockets pueden operar en tres modos:
| Modo | Comportamiento | Uso |
|---|---|---|
active: false |
Debes llamar recv explícitamente |
Control total, back-pressure natural |
active: true |
Datos llegan como mensajes al proceso | Más simple, pero sin flow control |
active: :once |
Un mensaje, luego vuelve a passive | Balance: mensajes pero con control |
Ejemplo con active: :once
defmodule ActiveOnceHandler do
use GenServer
require Logger
def start_link(socket) do
GenServer.start_link(__MODULE__, socket)
end
@impl true
def init(socket) do
# Activar para recibir un mensaje
:inet.setopts(socket, [active: :once])
{:ok, %{socket: socket, buffer: ""}}
end
@impl true
def handle_info({:tcp, socket, data}, state) do
Logger.info("Received: #{inspect(data)}")
# Procesar datos...
process_data(data)
# Re-activar para el siguiente mensaje
:inet.setopts(socket, [active: :once])
{:noreply, state}
end
@impl true
def handle_info({:tcp_closed, _socket}, state) do
Logger.info("Connection closed")
{:stop, :normal, state}
end
@impl true
def handle_info({:tcp_error, _socket, reason}, state) do
Logger.error("TCP error: #{inspect(reason)}")
{:stop, reason, state}
end
defp process_data(data) do
# Tu lógica aquí
:ok
end
end
Con active: true, un cliente malicioso puede inundar
tu proceso con mensajes más rápido de lo que puedes procesar,
llenando el mailbox. Con :once, controlas cuándo
estás listo para el siguiente mensaje.
UDP: Para datos de mercado multicast
UDP no garantiza entrega ni orden, pero tiene menor latencia. Muchos feeds de mercado usan UDP multicast para distribución masiva.
defmodule MarketDataReceiver do
use GenServer
require Logger
@multicast_group {239, 255, 0, 1} # Ejemplo de grupo multicast
@port 5000
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(:ok) do
opts = [
:binary,
active: true,
reuseaddr: true,
multicast_if: {0, 0, 0, 0},
multicast_loop: true,
add_membership: {@multicast_group, {0, 0, 0, 0}}
]
{:ok, socket} = :gen_udp.open(@port, opts)
Logger.info("Listening for multicast on #{inspect(@multicast_group)}:#{@port}")
{:ok, %{socket: socket, packets: 0}}
end
@impl true
def handle_info({:udp, _socket, ip, port, data}, state) do
# Parsear datos de mercado
case parse_market_data(data) do
{:ok, tick} ->
broadcast_tick(tick)
{:error, reason} ->
Logger.warn("Invalid packet from #{inspect(ip)}:#{port}: #{reason}")
end
{:noreply, %{state | packets: state.packets + 1}}
end
defp parse_market_data(<<
symbol::binary-size(8),
precio::float-big,
volumen::unsigned-integer-32-big,
timestamp::unsigned-integer-64-big
>>) do
{:ok, %{
symbol: String.trim_trailing(symbol, <<0>>),
precio: precio,
volumen: volumen,
timestamp: timestamp
}}
end
defp parse_market_data(_), do: {:error, :invalid_format}
defp broadcast_tick(tick) do
# Enviar a subscribers
Phoenix.PubSub.broadcast(MyApp.PubSub, "ticks", {:tick, tick})
end
end
Opciones importantes de socket
| Opción | Descripción | Valor típico |
|---|---|---|
:binary |
Recibir como binario vs lista | Siempre usar |
nodelay: true |
Desactiva Nagle (TCP) | Para baja latencia |
packet: N |
Framing: prefijo de N bytes indica longitud | 4 para protocolos binarios |
packet: :line |
Framing por newline | Protocolos texto |
buffer: N |
Tamaño del buffer de usuario | 65536+ |
recbuf: N |
Buffer de recepción del kernel | 1048576+ |
sndbuf: N |
Buffer de envío del kernel | 1048576+ |
# Opciones para baja latencia
opts = [
:binary,
packet: 4, # 4-byte length prefix
active: :once,
nodelay: true, # Disable Nagle's algorithm
buffer: 65536,
recbuf: 1048576, # 1MB receive buffer
sndbuf: 1048576 # 1MB send buffer
]
El algoritmo de Nagle agrupa pequeños paquetes para eficiencia,
pero añade latencia (hasta 200ms). Para datos financieros,
siempre usa nodelay: true.
Convierte el TCPServer en un sistema supervisado donde:
- Un Supervisor principal
- Un GenServer acceptor que acepta conexiones
- Un DynamicSupervisor para handlers de cliente
- Cada cliente es un GenServer supervisado
Crea un simulador de feed UDP que:
- Genera ticks aleatorios para varios símbolos
- Envía por UDP multicast en formato binario
- Configurable: ticks por segundo, símbolos, volatilidad
Luego crea el receiver correspondiente que parsee y muestre los ticks.
Conexión con el proyecto final
El sistema de distribución usará estas primitivas:
- TCP para clients: conexiones persistentes con subscribers
- UDP multicast para feeds: recepción de datos de exchanges
- active: :once: para flow control y evitar saturación
- Opciones optimizadas: nodelay, buffers grandes para baja latencia