WebSockets Tiempo Real

Streaming de datos financieros a clientes web con WebSockets. ⏱️ 2.5 horas

¿Por qué WebSockets?

WebSockets proporcionan comunicación bidireccional full-duplex sobre una única conexión TCP. Ideal para:

📊 WebSockets vs TCP raw

Para clientes nativos (C++, Rust), TCP raw con protocolo binario es más eficiente. WebSockets añade ~2-14 bytes de overhead por frame. Sin embargo, para dashboards web y APIs públicas, WebSockets es el estándar.

Cowboy WebSocket Handler

Cowboy (el servidor HTTP/WebSocket de Erlang) viene con Ranch y es la base de Phoenix. Podemos usarlo directamente:

# mix.exs
defp deps do
  [
    {:cowboy, "~> 2.10"},
    {:jason, "~> 1.4"}
  ]
end

WebSocket Handler básico

defmodule MarketFeed.WebSocketHandler do
  require Logger

  @behaviour :cowboy_websocket

  ## Cowboy callbacks

  # Llamado durante HTTP upgrade
  @impl true
  def init(req, state) do
    Logger.info("WebSocket connection request")

    opts = %{
      idle_timeout: 60_000,      # 60s sin actividad
      compress: true             # Habilitar compresión
    }

    {:cowboy_websocket, req, state, opts}
  end

  # Llamado cuando el WebSocket está listo
  @impl true
  def websocket_init(state) do
    Logger.info("WebSocket connected")

    # Programar heartbeat
    schedule_heartbeat()

    new_state = Map.merge(state, %{
      subscriptions: MapSet.new(),
      connected_at: System.system_time(:millisecond)
    })

    {:ok, new_state}
  end

  # Recibir frames del cliente
  @impl true
  def websocket_handle({:text, json}, state) do
    case Jason.decode(json) do
      {:ok, message} ->
        handle_client_message(message, state)

      {:error, _} ->
        reply = Jason.encode!(%{"error" => "invalid_json"})
        {[{:text, reply}], state}
    end
  end

  def websocket_handle({:ping, data}, state) do
    {[{:pong, data}], state}
  end

  def websocket_handle(_frame, state) do
    {:ok, state}
  end

  # Recibir mensajes de otros procesos
  @impl true
  def websocket_info({:tick, symbol, price, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      message = %{
        "type" => "tick",
        "symbol" => symbol,
        "price" => price,
        "timestamp" => timestamp
      }
      {[{:text, Jason.encode!(message)}], state}
    else
      {:ok, state}
    end
  end

  def websocket_info(:heartbeat, state) do
    schedule_heartbeat()
    message = %{"type" => "heartbeat", "timestamp" => System.system_time(:millisecond)}
    {[{:text, Jason.encode!(message)}], state}
  end

  def websocket_info(_info, state) do
    {:ok, state}
  end

  # Desconexión
  @impl true
  def terminate(reason, _req, state) do
    Logger.info("WebSocket disconnected: #{inspect(reason)}")

    # Limpiar subscripciones
    Enum.each(state.subscriptions, fn symbol ->
      MarketFeed.PubSub.unsubscribe(symbol)
    end)

    :ok
  end

  ## Manejo de mensajes del cliente

  defp handle_client_message(%{"action" => "subscribe", "symbol" => symbol}, state) do
    MarketFeed.PubSub.subscribe(symbol)
    new_subs = MapSet.put(state.subscriptions, symbol)

    reply = Jason.encode!(%{"type" => "subscribed", "symbol" => symbol})
    {[{:text, reply}], %{state | subscriptions: new_subs}}
  end

  defp handle_client_message(%{"action" => "unsubscribe", "symbol" => symbol}, state) do
    MarketFeed.PubSub.unsubscribe(symbol)
    new_subs = MapSet.delete(state.subscriptions, symbol)

    reply = Jason.encode!(%{"type" => "unsubscribed", "symbol" => symbol})
    {[{:text, reply}], %{state | subscriptions: new_subs}}
  end

  defp handle_client_message(msg, state) do
    Logger.warn("Unknown message: #{inspect(msg)}")
    reply = Jason.encode!(%{"error" => "unknown_action"})
    {[{:text, reply}], state}
  end

  defp schedule_heartbeat do
    Process.send_after(self(), :heartbeat, 30_000)
  end
end

Configurar el router

defmodule MarketFeed.Router do
  def routes do
    [
      {"/ws", MarketFeed.WebSocketHandler, %{}},
      {"/health", MarketFeed.HealthHandler, []},
      {"/", :cowboy_static, {:priv_file, :market_feed, "index.html"}}
    ]
  end
end

defmodule MarketFeed.Application do
  use Application

  @impl true
  def start(_type, _args) do
    port = Application.get_env(:market_feed, :port, 8080)

    dispatch = :cowboy_router.compile([
      {:_, MarketFeed.Router.routes()}
    ])

    children = [
      # PubSub para broadcast
      {Registry, keys: :duplicate, name: MarketFeed.PubSub},

      # Cowboy listener
      {:cowboy.child_spec(
        :http,
        :market_feed_http,
        :ranch_tcp,
        %{socket_opts: [{:port, port}], num_acceptors: 100},
        %{env: %{dispatch: dispatch}}
      ), []}
    ]

    opts = [strategy: :one_for_one, name: MarketFeed.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

PubSub para broadcast eficiente

defmodule MarketFeed.PubSub do
  @registry __MODULE__

  def subscribe(topic) do
    Registry.register(@registry, topic, [])
  end

  def unsubscribe(topic) do
    Registry.unregister(@registry, topic)
  end

  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      Enum.each(entries, fn {pid, _} ->
        send(pid, message)
      end)
    end)
  end

  def subscriber_count(topic) do
    Registry.count_match(@registry, topic, :_)
  end
end

# Uso desde el generador de precios
defmodule MarketFeed.PriceGenerator do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    schedule_tick()
    {:ok, %{prices: %{"BTCUSD" => 67500.0, "ETHUSD" => 3800.0}}}
  end

  def handle_info(:tick, state) do
    # Generar precios aleatorios
    new_prices = Enum.map(state.prices, fn {symbol, price} ->
      change = (:rand.uniform() - 0.5) * 10
      {symbol, price + change}
    end) |> Map.new()

    timestamp = System.system_time(:millisecond)

    # Broadcast a todos los subscribers
    Enum.each(new_prices, fn {symbol, price} ->
      MarketFeed.PubSub.broadcast(symbol, {:tick, symbol, price, timestamp})
    end)

    schedule_tick()
    {:noreply, %{state | prices: new_prices}}
  end

  defp schedule_tick do
    Process.send_after(self(), :tick, 100)  # 10 ticks/segundo
  end
end

Batching para alto throughput

Enviar cada tick individualmente puede saturar el WebSocket. Batching agrupa mensajes:

defmodule MarketFeed.BatchingHandler do
  @behaviour :cowboy_websocket

  @batch_interval 50  # 50ms = 20 batches/segundo
  @max_batch_size 100

  @impl true
  def websocket_init(state) do
    schedule_flush()

    new_state = Map.merge(state, %{
      subscriptions: MapSet.new(),
      pending_ticks: []
    })

    {:ok, new_state}
  end

  # Acumular ticks en lugar de enviar inmediatamente
  @impl true
  def websocket_info({:tick, symbol, price, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      tick = %{"s" => symbol, "p" => price, "t" => timestamp}
      pending = [tick | state.pending_ticks]

      # Flush si alcanzamos max batch size
      if length(pending) >= @max_batch_size do
        send_batch(pending, state)
      else
        {:ok, %{state | pending_ticks: pending}}
      end
    else
      {:ok, state}
    end
  end

  # Flush periódico
  def websocket_info(:flush, state) do
    schedule_flush()

    if state.pending_ticks != [] do
      send_batch(state.pending_ticks, state)
    else
      {:ok, state}
    end
  end

  defp send_batch(ticks, state) do
    message = %{
      "type" => "batch",
      "ticks" => Enum.reverse(ticks)
    }
    json = Jason.encode!(message)
    {[{:text, json}], %{state | pending_ticks: []}}
  end

  defp schedule_flush do
    Process.send_after(self(), :flush, @batch_interval)
  end
end

Compresión y formatos eficientes

Per-message compression

# Cowboy soporta compresión automática
@impl true
def init(req, state) do
  opts = %{
    compress: true,              # Habilitar deflate
    max_frame_size: 1_000_000   # 1MB max frame
  }
  {:cowboy_websocket, req, state, opts}
end

MessagePack para mensajes más compactos

# mix.exs
{:msgpax, "~> 2.4"}

# En el handler
defp send_tick_msgpack(tick, state) do
  binary = Msgpax.pack!(tick)
  {[{:binary, binary}], state}
end

# Cliente JS
# const data = msgpack.decode(new Uint8Array(event.data));

Cliente JavaScript de ejemplo

// cliente.js
class MarketFeedClient {
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.handlers = new Map();
    this.reconnectDelay = 1000;
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectDelay = 1000;
    };

    this.ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      this.handleMessage(msg);
    };

    this.ws.onclose = () => {
      console.log('Disconnected, reconnecting...');
      setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    };
  }

  subscribe(symbol, callback) {
    this.handlers.set(symbol, callback);
    this.send({ action: 'subscribe', symbol });
  }

  unsubscribe(symbol) {
    this.handlers.delete(symbol);
    this.send({ action: 'unsubscribe', symbol });
  }

  send(msg) {
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg));
    }
  }

  handleMessage(msg) {
    switch (msg.type) {
      case 'tick':
        const handler = this.handlers.get(msg.symbol);
        if (handler) handler(msg);
        break;

      case 'batch':
        msg.ticks.forEach(tick => {
          const h = this.handlers.get(tick.s);
          if (h) h({ symbol: tick.s, price: tick.p, timestamp: tick.t });
        });
        break;

      case 'heartbeat':
        // Opcional: responder con pong
        break;
    }
  }
}

// Uso
const client = new MarketFeedClient('ws://localhost:8080/ws');
client.connect();
client.subscribe('BTCUSD', (tick) => {
  console.log(`BTC: $${tick.price.toFixed(2)}`);
});
Ejercicio 12.1 Order Book WebSocket Intermedio

Extiende el WebSocket handler para soportar order book data:

  • Snapshot inicial al subscribirse
  • Actualizaciones incrementales (add/modify/delete)
  • Mantener estado del book en el cliente JS
Ejercicio 12.2 WebSocket con autenticación Avanzado

Implementa autenticación para el WebSocket:

  • Token JWT en el query string o primer mensaje
  • Validar permisos de subscripción por usuario
  • Rate limiting diferenciado por tier de usuario
  • Revocación de tokens en caliente

Conexión con el proyecto final

WebSockets será una interfaz alternativa a nuestro sistema: