Subscribers Distribuidos
Implementación del lado cliente del sistema de distribución. ⏱️ 3 horas
TCP Subscriber Handler
defmodule MarketFeed.SubscriberHandler do
  @moduledoc """
  Ranch protocol handler for a single TCP subscriber connection.

  The process is started by Ranch through `start_link/3`, takes ownership of
  the socket in `init/1` and then runs as a `GenServer` via
  `:gen_server.enter_loop/3`.

  Client frames are decoded with `MarketProtocol.decode/1`. A `:subscribe`
  frame joins the `{:symbol, symbol}` group in the `MarketFeed.PG` scope and an
  `:unsubscribe` frame leaves it. `{:tick, symbol, price, volume, timestamp}`
  messages received by this process are encoded and written to the socket when
  the client is subscribed to `symbol`.
  """

  use GenServer
  require Logger

  @pg_scope MarketFeed.PG

  # `buffer` holds bytes received from the client that do not yet form a
  # complete frame; it defaults to empty so the struct stays constructible
  # with the original field set.
  defstruct [:socket, :transport, :subscriptions, :client_id, :stats, buffer: <<>>]

  @doc """
  Spawns the handler process for a new connection.

  Uses `:proc_lib.spawn_link/3` rather than `GenServer.start_link/3` because
  `init/1` performs the Ranch handshake and enters the server loop itself.
  Returns `{:ok, pid}`.
  """
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  # Does not return: after the handshake and socket setup it enters the
  # gen_server loop with the initial state.
  @impl true
  def init({ref, transport, _opts}) do
    {:ok, socket} = :ranch.handshake(ref)

    transport.setopts(socket,
      active: :once,
      nodelay: true,
      packet: :raw
    )

    client_id = generate_client_id()
    Logger.info("Subscriber connected: #{client_id}")

    state = %__MODULE__{
      socket: socket,
      transport: transport,
      subscriptions: MapSet.new(),
      client_id: client_id,
      stats: %{ticks_sent: 0, connected_at: System.system_time(:second)},
      buffer: <<>>
    }

    :gen_server.enter_loop(__MODULE__, [], state)
  end

  ## Data received from the client

  @impl true
  def handle_info({:tcp, socket, data}, state) do
    # With `packet: :raw` a chunk may contain several frames or only part of
    # one, so decode from the accumulated buffer instead of the chunk alone.
    new_state = process_client_data(state.buffer <> data, state)
    # Re-arm the socket only after the chunk was processed (`active: :once`).
    state.transport.setopts(socket, active: :once)
    {:noreply, new_state}
  end

  def handle_info({:tcp_closed, _}, state) do
    Logger.info("Subscriber disconnected: #{state.client_id}")
    {:stop, :normal, state}
  end

  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.warning("Subscriber #{state.client_id} socket error: #{inspect(reason)}")
    {:stop, {:shutdown, {:tcp_error, reason}}, state}
  end

  ## Ticks received from the publisher

  def handle_info({:tick, symbol, price, volume, timestamp}, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      message = MarketProtocol.encode_tick(symbol, price, volume, timestamp)
      state.transport.send(state.socket, message)
      new_stats = %{state.stats | ticks_sent: state.stats.ticks_sent + 1}
      {:noreply, %{state | stats: new_stats}}
    else
      {:noreply, state}
    end
  end

  # Defining handle_info/2 replaces the default injected by `use GenServer`,
  # so without this clause any unexpected message would raise
  # FunctionClauseError and drop the connection.
  def handle_info(_other, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, state) do
    Enum.each(state.subscriptions, fn symbol ->
      :pg.leave(@pg_scope, {:symbol, symbol}, self())
    end)

    :ok
  end

  ## Client message processing

  # Decodes and applies every complete frame in `buffer`; whatever remains
  # (an incomplete trailing frame) is stored in the state for the next chunk.
  # NOTE(review): the buffer is unbounded; consider capping it if frames from
  # untrusted clients can declare arbitrary lengths.
  defp process_client_data(<<>>, state), do: %{state | buffer: <<>>}

  defp process_client_data(buffer, state) do
    case MarketProtocol.decode(buffer) do
      {:ok, %{type: :subscribe, symbol: symbol}, rest} ->
        process_client_data(rest, subscribe(symbol, state))

      {:ok, %{type: :unsubscribe, symbol: symbol}, rest} ->
        process_client_data(rest, unsubscribe(symbol, state))

      {:ok, _other_message, rest} ->
        # Frames of any other type are ignored.
        process_client_data(rest, state)

      {:incomplete, _} ->
        %{state | buffer: buffer}

      _undecodable ->
        # Nothing sensible can be recovered from an undecodable stream
        # position; drop the buffered bytes and keep the connection.
        Logger.warning("Discarding undecodable data from #{state.client_id}")
        %{state | buffer: <<>>}
    end
  end

  defp subscribe(symbol, state) do
    # A pid may join the same :pg group more than once; joining only on the
    # first subscribe keeps group membership in sync with `subscriptions`.
    if not MapSet.member?(state.subscriptions, symbol) do
      :ok = :pg.join(@pg_scope, {:symbol, symbol}, self())
    end

    Logger.debug("#{state.client_id} subscribed to #{symbol}")

    # Send the current snapshot, if the cache has one.
    case MarketFeed.PriceCache.get(symbol) do
      {:ok, price, timestamp} ->
        message = MarketProtocol.encode_tick(symbol, price, 0, timestamp)
        state.transport.send(state.socket, message)

      :not_found ->
        :ok
    end

    %{state | subscriptions: MapSet.put(state.subscriptions, symbol)}
  end

  defp unsubscribe(symbol, state) do
    # :pg.leave/3 returns :not_joined for a group the pid is not in, so an
    # unsubscribe for an unknown symbol must not reach the `:ok =` match.
    if MapSet.member?(state.subscriptions, symbol) do
      :ok = :pg.leave(@pg_scope, {:symbol, symbol}, self())
      Logger.debug("#{state.client_id} unsubscribed from #{symbol}")
      %{state | subscriptions: MapSet.delete(state.subscriptions, symbol)}
    else
      state
    end
  end

  # 8 random bytes rendered as 16 hexadecimal characters.
  defp generate_client_id do
    :crypto.strong_rand_bytes(8) |> Base.encode16()
  end
end
Subscriber Listener (Ranch)
defmodule MarketFeed.SubscriberListener do
  @moduledoc """
  Builds the Ranch listener child spec that accepts subscriber connections
  and hands each one to `MarketFeed.SubscriberHandler`.
  """

  @doc """
  Returns the child spec for the `:subscriber_listener` Ranch TCP listener.

  The port is read at call time from the `:subscriber_port` key of the
  `:market_feed` application environment and defaults to 9000. The argument
  is ignored.
  """
  def child_spec(_opts) do
    transport_opts = %{
      socket_opts: [port: Application.get_env(:market_feed, :subscriber_port, 9000)],
      num_acceptors: 100,
      max_connections: 10_000
    }

    :ranch.child_spec(
      :subscriber_listener,
      :ranch_tcp,
      transport_opts,
      MarketFeed.SubscriberHandler,
      []
    )
  end
end
Cliente Elixir para testing
defmodule MarketFeed.Client do
  @moduledoc """
  Minimal TCP client for exercising the subscriber endpoint in tests.

  Options accepted by `start_link/1`:

    * `:callback` (required) - one-argument function invoked with every
      message decoded from the socket.
    * `:host` - address passed to `:gen_tcp.connect/3`, default `{127, 0, 0, 1}`.
    * `:port` - TCP port, default `9000`.
  """

  use GenServer
  require Logger

  defstruct [:socket, :callback, :buffer]

  @doc """
  Starts the client and connects to the server.

  Returns `{:error, reason}` when the connection cannot be established.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc "Asynchronously sends a subscribe request for `symbol`."
  def subscribe(client, symbol) do
    GenServer.cast(client, {:subscribe, symbol})
  end

  @doc "Asynchronously sends an unsubscribe request for `symbol`."
  def unsubscribe(client, symbol) do
    GenServer.cast(client, {:unsubscribe, symbol})
  end

  @impl true
  def init(opts) do
    host = Keyword.get(opts, :host, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 9000)
    callback = Keyword.fetch!(opts, :callback)

    # A refused or unreachable server is an expected runtime condition, so it
    # is reported through {:stop, reason} instead of a MatchError crash.
    case :gen_tcp.connect(host, port, [:binary, active: :once, nodelay: true]) do
      {:ok, socket} ->
        {:ok, %__MODULE__{socket: socket, callback: callback, buffer: <<>>}}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  def handle_cast({:subscribe, symbol}, state) do
    message = MarketProtocol.encode_subscribe(symbol)
    :gen_tcp.send(state.socket, message)
    {:noreply, state}
  end

  def handle_cast({:unsubscribe, symbol}, state) do
    message = MarketProtocol.encode_unsubscribe(symbol)
    :gen_tcp.send(state.socket, message)
    {:noreply, state}
  end

  @impl true
  def handle_info({:tcp, socket, data}, state) do
    # Prepend the bytes left over from the previous chunk before decoding.
    {messages, new_buffer} = parse_messages(state.buffer <> data, [])
    Enum.each(messages, state.callback)
    # Re-arm the socket for the next chunk (`active: :once`).
    :inet.setopts(socket, active: :once)
    {:noreply, %{state | buffer: new_buffer}}
  end

  def handle_info({:tcp_closed, _socket}, state) do
    Logger.info("MarketFeed.Client: connection closed by server")
    {:stop, :normal, state}
  end

  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.warning("MarketFeed.Client: socket error: #{inspect(reason)}")
    {:stop, {:shutdown, {:tcp_error, reason}}, state}
  end

  # Defining handle_info/2 replaces the default injected by `use GenServer`;
  # ignore anything unexpected instead of raising FunctionClauseError.
  def handle_info(_other, state), do: {:noreply, state}

  # Decodes as many complete messages as `buffer` contains. Returns
  # `{messages_in_arrival_order, remaining_bytes}`.
  defp parse_messages(<<>>, acc), do: {Enum.reverse(acc), <<>>}

  defp parse_messages(buffer, acc) do
    case MarketProtocol.decode(buffer) do
      {:ok, msg, rest} ->
        parse_messages(rest, [msg | acc])

      {:incomplete, _} ->
        {Enum.reverse(acc), buffer}

      _undecodable ->
        # Keep the messages decoded so far and drop the bytes that cannot be
        # parsed, rather than crashing with CaseClauseError.
        Logger.warning("MarketFeed.Client: discarding undecodable data")
        {Enum.reverse(acc), <<>>}
    end
  end
end
# Usage: start a client that prints every received message, then subscribe to one symbol.
print_tick = fn tick -> IO.inspect(tick, label: "Tick received") end

{:ok, client} = MarketFeed.Client.start_link(callback: print_tick)

MarketFeed.Client.subscribe(client, "BTCUSD")
Ejercicio 23.1
Rate limiting por cliente
Intermedio
Implementa rate limiting en el SubscriberHandler:
- Máximo N símbolos por cliente
- Throttling si el cliente no consume lo suficientemente rápido
- Métricas de clientes throttled
Ejercicio 23.2
Autenticación
Avanzado
Añade autenticación al flujo de conexión:
- Mensaje AUTH como primer mensaje del cliente
- Validar API key contra Mnesia/ETS
- Permisos por símbolo según nivel de suscripción
Conexión con el proyecto final
Los subscribers completan el ciclo de datos. En el último capítulo veremos cómo poner todo esto en producción.