Publisher de Datos
Implementación del componente que recibe y distribuye datos de mercado. ⏱️ 3 horas
Arquitectura del Publisher
# Componentes principales
#
# FeedConnector
#      │
#      ▼
# FeedParser ──▶ PriceCache (ETS)
#      │
#      ▼
# Broadcaster ──▶ :pg groups ──▶ Subscribers
Feed Connector
defmodule MarketFeed.FeedConnector do
  @moduledoc """
  GenServer that maintains the TCP connection to the upstream market feed.

  Received bytes are fed to `MarketProtocol.Parser`. Every decoded `:tick`
  message is stored through `MarketFeed.PriceCache.update/3` and forwarded
  with `MarketFeed.Broadcaster.broadcast/2`; `:heartbeat` messages are only
  logged. When the connection cannot be established, is closed, or reports an
  error, a new attempt is scheduled after `@reconnect_interval` milliseconds.
  """

  use GenServer
  require Logger

  # Delay between reconnection attempts, in milliseconds.
  @reconnect_interval 5_000

  # `active: :once` delivers a single `{:tcp, socket, data}` message and then
  # pauses the socket until it is re-armed, so the mailbox cannot be flooded
  # while a batch is still being processed.
  @socket_opts [:binary, active: :once, nodelay: true, packet: :raw]

  defstruct [:socket, :host, :port, :parser, :status]

  @doc """
  Starts the connector, registered under the module name.

  `opts` must contain `:host` and `:port` (raises `KeyError` otherwise).
  `:host` may be an Elixir string, a charlist, an atom, or an address tuple.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    state = %__MODULE__{
      host: Keyword.fetch!(opts, :host),
      port: Keyword.fetch!(opts, :port),
      parser: MarketProtocol.Parser.new(),
      status: :disconnected
    }

    # Connect from handle_info/2 so init/1 returns without waiting on the network.
    send(self(), :connect)
    {:ok, state}
  end

  @impl true
  def handle_info(:connect, state) do
    case :gen_tcp.connect(inet_host(state.host), state.port, @socket_opts) do
      {:ok, socket} ->
        Logger.info("Connected to feed #{format_host(state.host)}:#{state.port}")

        # Start every connection with a fresh parser: the parser is carried
        # between reads, so anything it still holds from a previous connection
        # must not be prepended to the new stream.
        fresh_parser = MarketProtocol.Parser.new()
        {:noreply, %{state | socket: socket, status: :connected, parser: fresh_parser}}

      {:error, reason} ->
        Logger.error("Failed to connect: #{inspect(reason)}")
        schedule_reconnect()
        {:noreply, state}
    end
  end

  # Data from the socket we currently own. Messages are processed before the
  # socket is re-armed, which gives natural back-pressure.
  def handle_info({:tcp, socket, data}, %__MODULE__{socket: socket} = state) do
    {messages, new_parser} = MarketProtocol.Parser.feed(state.parser, data)
    Enum.each(messages, &process_message/1)
    :inet.setopts(socket, active: :once)
    {:noreply, %{state | parser: new_parser}}
  end

  def handle_info({:tcp_closed, socket}, %__MODULE__{socket: socket} = state) do
    Logger.warning("Feed connection closed, reconnecting...")
    schedule_reconnect()
    {:noreply, %{state | socket: nil, status: :disconnected}}
  end

  # Socket-level error: drop the connection and retry. Clearing `socket` makes
  # any `:tcp_closed` that follows for the same socket fall through to the
  # catch-all below, so only one reconnect is scheduled.
  def handle_info({:tcp_error, socket, reason}, %__MODULE__{socket: socket} = state) do
    Logger.error("Feed connection error: #{inspect(reason)}, reconnecting...")
    :gen_tcp.close(socket)
    schedule_reconnect()
    {:noreply, %{state | socket: nil, status: :disconnected}}
  end

  # Messages for a socket we no longer own, or anything else unexpected, are
  # ignored instead of crashing the server with a FunctionClauseError.
  def handle_info(message, state) do
    Logger.debug("Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Updates the cache and fans the tick out to subscribers.
  defp process_message(%{type: :tick} = tick) do
    MarketFeed.PriceCache.update(tick.symbol, tick.price, tick.timestamp)
    MarketFeed.Broadcaster.broadcast(tick.symbol, tick)
  end

  defp process_message(%{type: :heartbeat}) do
    Logger.debug("Heartbeat received")
  end

  # An unknown message type must not take the whole connection down.
  defp process_message(other) do
    Logger.warning("Ignoring unsupported feed message: #{inspect(other)}")
  end

  defp schedule_reconnect do
    Process.send_after(self(), :connect, @reconnect_interval)
  end

  # :gen_tcp expects a charlist, an atom, or an address tuple; an Elixir
  # string (binary) is not accepted, so convert it.
  defp inet_host(host) when is_binary(host), do: String.to_charlist(host)
  defp inet_host(host), do: host

  # Address tuples do not implement String.Chars, so they cannot be
  # interpolated directly into a log message.
  defp format_host(host) when is_binary(host) or is_list(host) or is_atom(host),
    do: to_string(host)

  defp format_host(host), do: inspect(host)
end
Price Cache
defmodule MarketFeed.PriceCache do
  @moduledoc """
  Latest-price store backed by a named, public ETS table.

  The GenServer exists only to create and own the table. Reads and writes go
  straight to ETS from the calling process and never pass through the server.
  """

  use GenServer

  @table :price_cache

  # Options for the table: one row per symbol, addressable by name from any
  # process, with both concurrency flags enabled.
  @table_opts [
    :set,
    :named_table,
    :public,
    read_concurrency: true,
    write_concurrency: true
  ]

  ## Server

  @doc "Starts the table owner, registered under the module name."
  def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(_args) do
    tid = :ets.new(@table, @table_opts)
    {:ok, %{table: tid}}
  end

  ## Client API

  @doc "Stores `price` and `timestamp` for `symbol`, replacing any previous row."
  def update(symbol, price, timestamp) do
    row = {symbol, price, timestamp}
    :ets.insert(@table, row)
  end

  @doc "Returns `{:ok, price, timestamp}` for `symbol`, or `:not_found`."
  def get(symbol) do
    @table
    |> :ets.lookup(symbol)
    |> case do
      [{^symbol, last_price, seen_at}] -> {:ok, last_price, seen_at}
      [] -> :not_found
    end
  end

  @doc "Returns every `{symbol, price, timestamp}` row currently in the table."
  def get_all, do: :ets.tab2list(@table)
end
Broadcaster
defmodule MarketFeed.Broadcaster do
  @moduledoc """
  Fans ticks out to the processes that joined the `{:symbol, symbol}` group in
  the `MarketFeed.PG` process-group scope.
  """

  @pg_scope MarketFeed.PG

  @doc """
  Sends `tick` to every member of the group for `symbol`.

  Each member receives `{:tick, symbol, price, volume, timestamp}`, built from
  the fields of `tick`. Members on the local node are sent to first, then
  members on other nodes. Returns `:ok`.
  """
  def broadcast(symbol, tick) do
    message = encode_tick(tick)
    this_node = node()

    # Take ONE membership snapshot and split it by node. Querying the group
    # several times lets a process that joins between the queries be left out
    # of both the local and the remote list, so it would miss this tick.
    {local_members, remote_members} =
      @pg_scope
      |> :pg.get_members({:symbol, symbol})
      |> Enum.split_with(&(node(&1) == this_node))

    send_to_all(local_members, message)
    send_to_all(remote_members, message)
  end

  # Flat tuple sent to subscribers instead of the full tick map.
  defp encode_tick(tick) do
    {:tick, tick.symbol, tick.price, tick.volume, tick.timestamp}
  end

  defp send_to_all(pids, message) do
    Enum.each(pids, &send(&1, message))
  end
end
Application y Supervision Tree
defmodule MarketFeed.Application do
  @moduledoc """
  OTP application entry point: starts the supervision tree of the market feed
  publisher.
  """

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Process-group scope used by MarketFeed.Broadcaster. :pg is an Erlang
      # module without child_spec/1, so `{:pg, opts}` cannot be used here; the
      # scope name is passed positionally to :pg.start_link/1 instead.
      %{id: :pg, start: {:pg, :start_link, [MarketFeed.PG]}},
      # Price cache (owns the ETS table); started before the connector that
      # writes to it.
      MarketFeed.PriceCache,
      # Upstream feed connection.
      # NOTE(review): get_env/2 yields nil when a key is not configured;
      # consider Application.fetch_env!/2 to fail fast on missing config.
      {MarketFeed.FeedConnector,
       [
         host: Application.get_env(:market_feed, :feed_host),
         port: Application.get_env(:market_feed, :feed_port)
       ]},
      # Subscriber TCP listener
      MarketFeed.SubscriberListener,
      # WebSocket endpoint
      MarketFeed.WebSocketEndpoint,
      # Telemetry
      MarketFeed.Telemetry
    ]

    # :one_for_one — a crashed child is restarted on its own, without
    # restarting its siblings.
    opts = [strategy: :one_for_one, name: MarketFeed.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Ejercicio 22.1
Multi-feed connector
Intermedio
Extiende el FeedConnector para soportar múltiples feeds:
- DynamicSupervisor para N feeds
- Cada feed configurable independientemente
- Consolidación de precios de múltiples fuentes
Ejercicio 22.2
Feed simulator
Básico
Crea un simulador de feed para testing:
- Genera ticks aleatorios a tasa configurable
- Simula símbolos con características diferentes
- Incluye opción de latencia artificial
Conexión con el proyecto final
El publisher es el corazón del sistema. En el siguiente capítulo construiremos los subscribers que recibirán estos datos.