Subscribers Distribuidos

Implementación del lado cliente del sistema de distribución. ⏱️ Duración estimada: 3 horas

TCP Subscriber Handler

defmodule MarketFeed.SubscriberHandler do
  @moduledoc """
  Ranch protocol handler that serves a single subscriber TCP connection.

  The process receives two kinds of messages:

    * `{:tcp, socket, data}` from the socket, carrying subscribe/unsubscribe
      requests that are decoded with `MarketProtocol.decode/1`;
    * `{:tick, symbol, price, volume, timestamp}`, which are encoded with
      `MarketProtocol.encode_tick/4` and written to the socket when the client
      is subscribed to `symbol`.

  Subscriptions are tracked both locally (a `MapSet` in the state) and in the
  `:pg` scope `MarketFeed.PG`, under the group `{:symbol, symbol}`.
  """

  use GenServer
  require Logger

  @pg_scope MarketFeed.PG

  # Upper bound for bytes kept while waiting for the rest of a partial frame.
  # A connection exceeding it is closed so a client cannot grow the buffer
  # without limit.
  @max_buffer_bytes 65_536

  defstruct [:socket, :transport, :subscriptions, :client_id, :stats, buffer: <<>>]

  @doc """
  Entry point called by Ranch for every accepted connection.

  Spawns the handler linked to the caller and returns `{:ok, pid}`; the
  handshake happens later, inside `init/1`.
  """
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  @doc """
  Completes the Ranch handshake, configures the socket and enters the
  `:gen_server` loop. This function does not return.
  """
  @impl true
  def init({ref, transport, _opts}) do
    {:ok, socket} = :ranch.handshake(ref)

    transport.setopts(socket,
      active: :once,
      nodelay: true,
      packet: :raw
    )

    client_id = generate_client_id()
    Logger.info("Subscriber connected: #{client_id}")

    state = %__MODULE__{
      socket: socket,
      transport: transport,
      subscriptions: MapSet.new(),
      client_id: client_id,
      stats: %{ticks_sent: 0, connected_at: System.system_time(:second)},
      buffer: <<>>
    }

    :gen_server.enter_loop(__MODULE__, [], state)
  end

  ## Data received from the client

  @impl true
  def handle_info({:tcp, socket, data}, state) do
    # With `packet: :raw` a chunk may contain several frames or only part of
    # one, so new bytes are appended to whatever was left over previously.
    new_state = process_client_data(state.buffer <> data, state)

    if byte_size(new_state.buffer) > @max_buffer_bytes do
      Logger.warning("Subscriber #{state.client_id} exceeded receive buffer limit, closing")
      {:stop, :normal, new_state}
    else
      # Re-arm the socket: `active: :once` delivers a single message per arm.
      state.transport.setopts(socket, active: :once)
      {:noreply, new_state}
    end
  end

  @impl true
  def handle_info({:tcp_closed, _}, state) do
    Logger.info("Subscriber disconnected: #{state.client_id}")
    {:stop, :normal, state}
  end

  @impl true
  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.warning("Subscriber #{state.client_id} socket error: #{inspect(reason)}")
    {:stop, :normal, state}
  end

  ## Ticks received from the publisher

  @impl true
  def handle_info({:tick, symbol, price, volume, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      message = MarketProtocol.encode_tick(symbol, price, volume, timestamp)

      case state.transport.send(state.socket, message) do
        :ok ->
          new_stats = %{state.stats | ticks_sent: state.stats.ticks_sent + 1}
          {:noreply, %{state | stats: new_stats}}

        {:error, _reason} ->
          # Nothing was sent, so the counter is left alone. The socket is in
          # active mode, so a close/error notification is expected to follow.
          {:noreply, state}
      end
    else
      {:noreply, state}
    end
  end

  # Any other message is ignored instead of crashing the connection with a
  # FunctionClauseError.
  @impl true
  def handle_info(message, state) do
    Logger.debug("#{state.client_id} ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, state) do
    Enum.each(state.subscriptions, fn symbol ->
      :pg.leave(@pg_scope, {:symbol, symbol}, self())
    end)

    :ok
  end

  ## Client message processing

  # Decodes and applies every complete message in `buffer`, returning the state
  # with any trailing partial frame stored in `:buffer`.
  defp process_client_data(buffer, state) do
    case MarketProtocol.decode(buffer) do
      {:ok, message, rest} ->
        process_client_data(rest, apply_client_message(message, state))

      {:incomplete, _} ->
        %{state | buffer: buffer}

      _other ->
        # NOTE(review): the error shape of MarketProtocol.decode/1 is not
        # visible here; undecodable input is discarded, as before.
        %{state | buffer: <<>>}
    end
  end

  defp apply_client_message(%{type: :subscribe, symbol: symbol}, state),
    do: subscribe(symbol, state)

  defp apply_client_message(%{type: :unsubscribe, symbol: symbol}, state),
    do: unsubscribe(symbol, state)

  defp apply_client_message(_message, state), do: state

  defp subscribe(symbol, state) do
    # :pg counts joins per process, so joining again for a symbol we already
    # hold would deliver every tick twice and need two leaves to undo.
    if not MapSet.member?(state.subscriptions, symbol) do
      :ok = :pg.join(@pg_scope, {:symbol, symbol}, self())
      Logger.debug("#{state.client_id} subscribed to #{symbol}")
    end

    # Send the current snapshot, if the cache has one.
    case MarketFeed.PriceCache.get(symbol) do
      {:ok, price, timestamp} ->
        message = MarketProtocol.encode_tick(symbol, price, 0, timestamp)
        state.transport.send(state.socket, message)

      :not_found ->
        :ok
    end

    %{state | subscriptions: MapSet.put(state.subscriptions, symbol)}
  end

  defp unsubscribe(symbol, state) do
    # :pg.leave/3 returns :not_joined for a group we are not in, so only leave
    # groups this connection actually joined.
    if MapSet.member?(state.subscriptions, symbol) do
      :pg.leave(@pg_scope, {:symbol, symbol}, self())
      Logger.debug("#{state.client_id} unsubscribed from #{symbol}")
      %{state | subscriptions: MapSet.delete(state.subscriptions, symbol)}
    else
      state
    end
  end

  # 8 random bytes rendered as 16 uppercase hex characters.
  defp generate_client_id do
    :crypto.strong_rand_bytes(8) |> Base.encode16()
  end
end

Subscriber Listener (Ranch)

defmodule MarketFeed.SubscriberListener do
  @moduledoc """
  Child spec for the Ranch TCP listener that accepts subscriber connections
  and hands each one to `MarketFeed.SubscriberHandler`.
  """

  @doc """
  Builds the Ranch child spec for the `:subscriber_listener` listener.

  The port is read from the `:subscriber_port` key of the `:market_feed`
  application environment at call time, defaulting to `9000`. The argument is
  ignored.
  """
  def child_spec(_opts) do
    transport_opts = %{
      socket_opts: [port: Application.get_env(:market_feed, :subscriber_port, 9000)],
      num_acceptors: 100,
      max_connections: 10_000
    }

    protocol = MarketFeed.SubscriberHandler
    protocol_opts = []

    :ranch.child_spec(
      :subscriber_listener,
      :ranch_tcp,
      transport_opts,
      protocol,
      protocol_opts
    )
  end
end

Cliente Elixir para testing

defmodule MarketFeed.Client do
  @moduledoc """
  Minimal TCP client for exercising the subscriber endpoint in tests.

  Incoming bytes are accumulated in a buffer, decoded with
  `MarketProtocol.decode/1`, and every decoded message is passed to the
  `:callback` function given at start.
  """

  use GenServer
  require Logger

  defstruct [:socket, :callback, :buffer]

  @doc """
  Starts the client and connects to the server.

  Options:

    * `:callback` (required) - one-argument function invoked for each decoded message
    * `:host` - address to connect to, defaults to `{127, 0, 0, 1}`
    * `:port` - TCP port, defaults to `9000`

  Returns `{:error, {:connect_failed, reason}}` when the connection cannot be
  established.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc "Asynchronously sends a subscribe request for `symbol`."
  def subscribe(client, symbol) do
    GenServer.cast(client, {:subscribe, symbol})
  end

  @doc "Asynchronously sends an unsubscribe request for `symbol`."
  def unsubscribe(client, symbol) do
    GenServer.cast(client, {:unsubscribe, symbol})
  end

  @impl true
  def init(opts) do
    host = Keyword.get(opts, :host, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 9000)
    callback = Keyword.fetch!(opts, :callback)

    case :gen_tcp.connect(host, port, [:binary, active: :once, nodelay: true]) do
      {:ok, socket} ->
        {:ok, %__MODULE__{socket: socket, callback: callback, buffer: <<>>}}

      {:error, reason} ->
        # Report the failure through start_link/1 instead of raising a
        # MatchError inside init/1.
        {:stop, {:connect_failed, reason}}
    end
  end

  @impl true
  def handle_cast({:subscribe, symbol}, state) do
    message = MarketProtocol.encode_subscribe(symbol)
    :gen_tcp.send(state.socket, message)
    {:noreply, state}
  end

  @impl true
  def handle_cast({:unsubscribe, symbol}, state) do
    message = MarketProtocol.encode_unsubscribe(symbol)
    :gen_tcp.send(state.socket, message)
    {:noreply, state}
  end

  @impl true
  def handle_info({:tcp, socket, data}, state) do
    buffer = state.buffer <> data
    {messages, new_buffer} = parse_messages(buffer, [])

    Enum.each(messages, state.callback)

    # Re-arm the socket: `active: :once` delivers a single message per arm.
    :inet.setopts(socket, active: :once)
    {:noreply, %{state | buffer: new_buffer}}
  end

  @impl true
  def handle_info({:tcp_closed, _socket}, state) do
    Logger.info("Connection closed by server")
    {:stop, :normal, state}
  end

  @impl true
  def handle_info({:tcp_error, _socket, reason}, state) do
    {:stop, {:tcp_error, reason}, state}
  end

  # Ignore anything else rather than crash with a FunctionClauseError.
  @impl true
  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Decodes as many complete messages as `buffer` holds. Returns the messages
  # in arrival order together with the undecoded remainder.
  defp parse_messages(buffer, acc) do
    case MarketProtocol.decode(buffer) do
      {:ok, msg, rest} ->
        parse_messages(rest, [msg | acc])

      {:incomplete, _} ->
        {Enum.reverse(acc), buffer}

      other ->
        # NOTE(review): the error shape of MarketProtocol.decode/1 is not
        # visible here; undecodable bytes are dropped so the client keeps
        # running instead of raising a CaseClauseError.
        Logger.warning("Dropping undecodable data: #{inspect(other)}")
        {Enum.reverse(acc), <<>>}
    end
  end
end

# Usage: start a client whose callback prints each decoded message,
# then subscribe it to a single symbol.
print_tick = fn tick -> IO.inspect(tick, label: "Tick received") end

{:ok, client} = MarketFeed.Client.start_link(callback: print_tick)

MarketFeed.Client.subscribe(client, "BTCUSD")
Ejercicio 23.1: Rate limiting por cliente (nivel: Intermedio)

Implementa rate limiting en el SubscriberHandler:

  • Máximo N símbolos por cliente
  • Throttling si el cliente no consume lo suficientemente rápido
  • Métricas de clientes throttled
Ejercicio 23.2: Autenticación (nivel: Avanzado)

Añade autenticación al flujo de conexión:

  • Mensaje AUTH como primer mensaje del cliente
  • Validar API key contra Mnesia/ETS
  • Permisos por símbolo según nivel de suscripción

Conexión con el proyecto final

Los subscribers completan el ciclo de datos. En el último capítulo veremos cómo poner todo esto en producción.