WebSockets en Tiempo Real
Streaming de datos financieros a clientes web con WebSockets. ⏱️ 2.5 horas
¿Por qué WebSockets?
WebSockets proporcionan comunicación bidireccional full-duplex sobre una única conexión TCP. Ideal para:
- Datos en tiempo real: push de precios sin polling
- Baja latencia: conexión persistente, sin el overhead de cabeceras HTTP en cada mensaje (HTTP solo se usa en el handshake inicial)
- Clientes web: browsers lo soportan nativamente
- Bidireccional: subscribe/unsubscribe desde el cliente
Para clientes nativos (C++, Rust), TCP raw con protocolo binario es más eficiente. WebSockets añade ~2-14 bytes de overhead por frame. Sin embargo, para dashboards web y APIs públicas, WebSockets es el estándar.
Cowboy WebSocket Handler
Cowboy (el servidor HTTP/WebSocket de Erlang) está construido sobre Ranch (su pool de aceptadores TCP) y ha sido tradicionalmente el servidor web de Phoenix. Podemos usarlo directamente:
# mix.exs
# Project dependencies: Cowboy serves HTTP/WebSocket traffic and Jason
# encodes/decodes the JSON messages exchanged with clients.
defp deps, do: [cowboy: "~> 2.10", jason: "~> 1.4"]
WebSocket Handler básico
defmodule MarketFeed.WebSocketHandler do
  @moduledoc """
  Cowboy WebSocket handler that streams price ticks to a single client.

  Client -> server messages are JSON text frames:

      {"action": "subscribe", "symbol": "BTCUSD"}
      {"action": "unsubscribe", "symbol": "BTCUSD"}

  Server -> client messages are JSON text frames with a `"type"` of
  `"tick"`, `"heartbeat"`, `"subscribed"` or `"unsubscribed"`, or an
  `"error"` key when the request could not be handled.

  Ticks reach this process as `{:tick, symbol, price, timestamp}` messages
  and are forwarded only for symbols present in `state.subscriptions`.
  """

  @behaviour :cowboy_websocket

  require Logger

  # Connection is closed after this long without receiving anything from the client.
  @idle_timeout 60_000
  # Interval between server heartbeats; kept below @idle_timeout on purpose.
  @heartbeat_interval 30_000

  ## Cowboy callbacks

  # Called during the HTTP upgrade; switches the connection to WebSocket.
  @impl true
  def init(req, state) do
    Logger.info("WebSocket connection request")

    opts = %{
      idle_timeout: @idle_timeout,
      # Enable per-message compression.
      compress: true
    }

    {:cowboy_websocket, req, state, opts}
  end

  # Called once the WebSocket is established; sets up the per-connection state.
  @impl true
  def websocket_init(state) do
    Logger.info("WebSocket connected")

    schedule_heartbeat()

    new_state =
      Map.merge(state, %{
        subscriptions: MapSet.new(),
        connected_at: System.system_time(:millisecond)
      })

    {:ok, new_state}
  end

  # Frames received from the client.
  @impl true
  def websocket_handle({:text, json}, state) do
    case Jason.decode(json) do
      {:ok, message} ->
        handle_client_message(message, state)

      {:error, _} ->
        reply = Jason.encode!(%{"error" => "invalid_json"})
        {[{:text, reply}], state}
    end
  end

  # Cowboy answers ping frames with a pong on its own; ping/pong frames are
  # passed to the handler for information only. Replying here as well would
  # put a second pong on the wire, so every non-text frame is simply ignored.
  def websocket_handle(_frame, state) do
    {:ok, state}
  end

  # Messages received from other Erlang processes.
  @impl true
  def websocket_info({:tick, symbol, price, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      message = %{
        "type" => "tick",
        "symbol" => symbol,
        "price" => price,
        "timestamp" => timestamp
      }

      {[{:text, Jason.encode!(message)}], state}
    else
      {:ok, state}
    end
  end

  def websocket_info(:heartbeat, state) do
    schedule_heartbeat()
    message = %{"type" => "heartbeat", "timestamp" => System.system_time(:millisecond)}

    # The idle timeout only counts data *received* from the client, so a
    # listen-only client would be dropped after @idle_timeout even while we
    # keep sending ticks. A protocol-level ping makes the peer answer with a
    # pong, which keeps the connection alive. The JSON heartbeat is still
    # sent for application-level consumers.
    {[:ping, {:text, Jason.encode!(message)}], state}
  end

  def websocket_info(_info, state) do
    {:ok, state}
  end

  # Called when the connection goes away.
  @impl true
  def terminate(reason, _req, state) do
    Logger.info("WebSocket disconnected: #{inspect(reason)}")

    # Be defensive: if termination happens before websocket_init/1 has run,
    # the state is still the bare route options without :subscriptions.
    state
    |> subscriptions()
    |> Enum.each(&MarketFeed.PubSub.unsubscribe/1)

    :ok
  end

  ## Client message handling

  defp handle_client_message(%{"action" => "subscribe", "symbol" => symbol}, state)
       when is_binary(symbol) do
    # Register at most once per symbol: the PubSub registry allows duplicate
    # keys, so a second registration would deliver every tick twice.
    if not MapSet.member?(state.subscriptions, symbol) do
      MarketFeed.PubSub.subscribe(symbol)
    end

    new_subs = MapSet.put(state.subscriptions, symbol)
    reply = Jason.encode!(%{"type" => "subscribed", "symbol" => symbol})
    {[{:text, reply}], %{state | subscriptions: new_subs}}
  end

  defp handle_client_message(%{"action" => "unsubscribe", "symbol" => symbol}, state)
       when is_binary(symbol) do
    MarketFeed.PubSub.unsubscribe(symbol)
    new_subs = MapSet.delete(state.subscriptions, symbol)
    reply = Jason.encode!(%{"type" => "unsubscribed", "symbol" => symbol})
    {[{:text, reply}], %{state | subscriptions: new_subs}}
  end

  # Anything else (unknown action, missing or non-string symbol, non-object JSON).
  defp handle_client_message(msg, state) do
    Logger.warning("Unknown message: #{inspect(msg)}")
    reply = Jason.encode!(%{"error" => "unknown_action"})
    {[{:text, reply}], state}
  end

  defp subscriptions(%{subscriptions: subs}), do: subs
  defp subscriptions(_state), do: MapSet.new()

  defp schedule_heartbeat do
    Process.send_after(self(), :heartbeat, @heartbeat_interval)
  end
end
Configurar el router
defmodule MarketFeed.Router do
  @moduledoc """
  Route table for the Cowboy listener.

  Each entry is a `{path, handler, initial_state}` tuple in the format
  expected by `:cowboy_router.compile/1`.
  """

  # WebSocket endpoint, health check and the static landing page.
  @routes [
    {"/ws", MarketFeed.WebSocketHandler, %{}},
    {"/health", MarketFeed.HealthHandler, []},
    {"/", :cowboy_static, {:priv_file, :market_feed, "index.html"}}
  ]

  @doc "Returns the list of path routes served under any host."
  def routes, do: @routes
end
defmodule MarketFeed.Application do
  @moduledoc """
  OTP application entry point.

  Starts, in order: the `Registry` used as PubSub, the price generator that
  broadcasts ticks through it, and the Cowboy HTTP/WebSocket listener.
  """

  use Application

  @listener_ref :market_feed_http

  @impl true
  def start(_type, _args) do
    port = Application.get_env(:market_feed, :port, 8080)

    dispatch = :cowboy_router.compile([{:_, MarketFeed.Router.routes()}])

    children = [
      # PubSub used for broadcasting; must be up before anything publishes.
      {Registry, keys: :duplicate, name: MarketFeed.PubSub},
      # Tick source; without it subscribers never receive any data.
      MarketFeed.PriceGenerator,
      # Cowboy listener, embedded in this supervision tree.
      cowboy_child_spec(port, dispatch)
    ]

    opts = [strategy: :one_for_one, name: MarketFeed.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Builds the child spec for a clear-text (non-TLS) Cowboy listener.
  #
  # Cowboy 2.x has no `:cowboy.child_spec/5`; embedding a listener in our own
  # supervisor is done through `:ranch.child_spec/5` with the `:cowboy_clear`
  # protocol. The result is already a complete child spec, so it must be put
  # in the children list as-is rather than wrapped in a `{spec, arg}` tuple.
  defp cowboy_child_spec(port, dispatch) do
    # `connection_type: :supervisor` mirrors what `:cowboy.start_clear/3`
    # sets by default on both option maps.
    transport_opts = %{
      socket_opts: [port: port],
      num_acceptors: 100,
      connection_type: :supervisor
    }

    protocol_opts = %{
      env: %{dispatch: dispatch},
      connection_type: :supervisor
    }

    :ranch.child_spec(@listener_ref, :ranch_tcp, transport_opts, :cowboy_clear, protocol_opts)
  end
end
PubSub para broadcast eficiente
defmodule MarketFeed.PubSub do
  @moduledoc """
  Minimal topic-based PubSub on top of `Registry`.

  Expects a registry named `MarketFeed.PubSub` to be running. All functions
  that register or unregister act on the calling process.
  """

  @registry __MODULE__

  @doc """
  Subscribes the calling process to `topic`.

  The call is idempotent: if the caller is already subscribed, nothing is
  registered again and `{:error, {:already_registered, pid}}` is returned.
  Without this check a registry with duplicate keys would accept a second
  entry and `broadcast/2` would deliver every message twice to that process.
  """
  @spec subscribe(term()) :: {:ok, pid()} | {:error, {:already_registered, pid()}}
  def subscribe(topic) do
    if topic in Registry.keys(@registry, self()) do
      {:error, {:already_registered, self()}}
    else
      Registry.register(@registry, topic, [])
    end
  end

  @doc "Removes every subscription of the calling process to `topic`."
  @spec unsubscribe(term()) :: :ok
  def unsubscribe(topic) do
    Registry.unregister(@registry, topic)
  end

  @doc "Sends `message` to every process subscribed to `topic`."
  @spec broadcast(term(), term()) :: :ok
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      Enum.each(entries, fn {pid, _value} -> send(pid, message) end)
    end)
  end

  @doc "Returns the number of subscriptions registered under `topic`."
  @spec subscriber_count(term()) :: non_neg_integer()
  def subscriber_count(topic) do
    Registry.count_match(@registry, topic, :_)
  end
end
# Uso desde el generador de precios
defmodule MarketFeed.PriceGenerator do
  @moduledoc """
  GenServer that simulates a market feed.

  On every `:tick` message it moves each price by a random amount and
  broadcasts `{:tick, symbol, price, timestamp}` on the symbol's topic via
  `MarketFeed.PubSub`, then schedules the next tick.
  """

  use GenServer

  # Delay between ticks in milliseconds (10 ticks per second).
  @tick_interval_ms 100

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    schedule_tick()
    {:ok, %{prices: %{"BTCUSD" => 67500.0, "ETHUSD" => 3800.0}}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Random walk: each price moves by a uniform amount around zero.
    updated =
      Map.new(state.prices, fn {symbol, price} ->
        {symbol, price + (:rand.uniform() - 0.5) * 10}
      end)

    now = System.system_time(:millisecond)

    # Publish every new price to the subscribers of its symbol.
    for {symbol, price} <- updated do
      MarketFeed.PubSub.broadcast(symbol, {:tick, symbol, price, now})
    end

    schedule_tick()
    {:noreply, %{state | prices: updated}}
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @tick_interval_ms)
end
Batching para alto throughput
Enviar cada tick en un frame individual puede saturar el WebSocket. El batching agrupa varios ticks en un único mensaje:
defmodule MarketFeed.BatchingHandler do
  @moduledoc """
  WebSocket handler callbacks that batch ticks instead of sending one frame
  per tick.

  Ticks for subscribed symbols are accumulated and sent as one
  `{"type": "batch", "ticks": [...]}` text frame, either on the periodic
  `:flush` timer or as soon as `@max_batch_size` ticks are pending.

  Only `websocket_init/1` and `websocket_info/2` are defined in this module.
  """

  @behaviour :cowboy_websocket

  # Flush period in milliseconds: 50ms = 20 batches per second.
  @batch_interval 50
  @max_batch_size 100

  @impl true
  def websocket_init(state) do
    schedule_flush()

    new_state =
      Map.merge(state, %{
        subscriptions: MapSet.new(),
        # Newest tick first; reversed once when the batch is sent.
        pending_ticks: [],
        # Tracked separately so the size check is O(1) instead of walking
        # the list with length/1 on every tick.
        pending_count: 0
      })

    {:ok, new_state}
  end

  # Accumulate ticks instead of sending them right away.
  @impl true
  def websocket_info({:tick, symbol, price, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      tick = %{"s" => symbol, "p" => price, "t" => timestamp}
      pending = [tick | state.pending_ticks]
      count = state.pending_count + 1

      # Flush early once the batch is full.
      if count >= @max_batch_size do
        send_batch(pending, state)
      else
        {:ok, %{state | pending_ticks: pending, pending_count: count}}
      end
    else
      {:ok, state}
    end
  end

  # Periodic flush: nothing pending, just re-arm the timer.
  def websocket_info(:flush, %{pending_ticks: []} = state) do
    schedule_flush()
    {:ok, state}
  end

  def websocket_info(:flush, state) do
    schedule_flush()
    send_batch(state.pending_ticks, state)
  end

  # Ignore anything else instead of crashing the connection with a
  # FunctionClauseError on an unexpected message.
  def websocket_info(_info, state) do
    {:ok, state}
  end

  # Encodes the pending ticks (oldest first) as one text frame and clears them.
  defp send_batch(ticks, state) do
    message = %{
      "type" => "batch",
      "ticks" => Enum.reverse(ticks)
    }

    json = Jason.encode!(message)
    {[{:text, json}], %{state | pending_ticks: [], pending_count: 0}}
  end

  defp schedule_flush do
    Process.send_after(self(), :flush, @batch_interval)
  end
end
Compresión y formatos eficientes
Per-message compression
# Cowboy can compress WebSocket messages for us when asked to.
# Upgrades the request to WebSocket with deflate compression enabled and
# frames limited to 1_000_000 bytes.
@impl true
def init(req, state) do
  {:cowboy_websocket, req, state, %{compress: true, max_frame_size: 1_000_000}}
end
MessagePack para mensajes más compactos
# mix.exs: dependency entry for the Msgpax (MessagePack) library.
{:msgpax, "~> 2.4"}
# En el handler
# Packs `tick` with MessagePack and replies with it as a single binary frame,
# leaving the handler state unchanged.
defp send_tick_msgpack(tick, state) do
  frame = {:binary, Msgpax.pack!(tick)}
  {[frame], state}
end
# Cliente JS
# const data = msgpack.decode(new Uint8Array(event.data));
Cliente JavaScript de ejemplo
// cliente.js
/**
 * Browser client for the market feed WebSocket endpoint.
 *
 * Keeps one callback per symbol, reconnects with exponential backoff
 * (1s doubling up to 30s) and re-sends every subscription each time the
 * socket opens, so subscriptions survive both "subscribe before the socket
 * is open" and reconnects.
 */
class MarketFeedClient {
  /** @param {string} url WebSocket URL, e.g. ws://localhost:8080/ws */
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.handlers = new Map(); // symbol -> callback(tick)
    this.reconnectDelay = 1000;
  }

  /** Opens the socket and installs the event handlers. */
  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectDelay = 1000;
      // Subscriptions requested while the socket was not open (including
      // those from before a reconnect) were never delivered; send them now.
      for (const symbol of this.handlers.keys()) {
        this.send({ action: 'subscribe', symbol });
      }
    };

    this.ws.onmessage = (event) => {
      let msg;
      try {
        msg = JSON.parse(event.data);
      } catch (err) {
        // A malformed frame should not break the message loop.
        console.warn('Ignoring message that is not valid JSON', err);
        return;
      }
      this.handleMessage(msg);
    };

    this.ws.onclose = () => {
      console.log('Disconnected, reconnecting...');
      setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    };
  }

  /**
   * Registers `callback` for `symbol` and asks the server for its ticks.
   * Safe to call before the socket is open: the request is sent on open.
   */
  subscribe(symbol, callback) {
    this.handlers.set(symbol, callback);
    this.send({ action: 'subscribe', symbol });
  }

  /** Removes the callback for `symbol` and tells the server to stop sending it. */
  unsubscribe(symbol) {
    this.handlers.delete(symbol);
    this.send({ action: 'unsubscribe', symbol });
  }

  /** Sends `msg` as JSON if the socket is open; otherwise does nothing. */
  send(msg) {
    // `ws` is null until connect() has been called.
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg));
    }
  }

  /** Dispatches a decoded server message to the matching symbol callback. */
  handleMessage(msg) {
    switch (msg.type) {
      case 'tick': {
        const handler = this.handlers.get(msg.symbol);
        if (handler) handler(msg);
        break;
      }
      case 'batch':
        // Batched ticks use short keys (s/p/t); expand them to the same
        // shape as single tick messages before calling the handler.
        msg.ticks.forEach((tick) => {
          const h = this.handlers.get(tick.s);
          if (h) h({ symbol: tick.s, price: tick.p, timestamp: tick.t });
        });
        break;
      case 'heartbeat':
        // Optional: reply with an application-level pong.
        break;
    }
  }
}
// Uso
// Connect to the local feed and log every BTCUSD price with two decimals.
const client = new MarketFeedClient('ws://localhost:8080/ws');
client.connect();
client.subscribe('BTCUSD', function onBtcTick(tick) {
  const price = tick.price.toFixed(2);
  console.log(`BTC: $${price}`);
});
Extiende el WebSocket handler para soportar order book data:
- Snapshot inicial al subscribirse
- Actualizaciones incrementales (add/modify/delete)
- Mantener estado del book en el cliente JS
Implementa autenticación para el WebSocket:
- Token JWT en el query string o primer mensaje
- Validar permisos de subscripción por usuario
- Rate limiting diferenciado por tier de usuario
- Revocación de tokens en caliente
Conexión con el proyecto final
WebSockets será una interfaz alternativa de acceso a nuestro sistema:
- Dashboards web: visualización en tiempo real
- APIs públicas: acceso estándar para integraciones
- Debugging: interfaz simple para probar el sistema
- Complemento a TCP: TCP para baja latencia, WS para conveniencia