Publisher de Datos

Implementación del componente que recibe y distribuye datos de mercado.⏱️ 3 horas

Arquitectura del Publisher

# Componentes principales
#
# FeedConnector
#    │
#    ▼
# FeedParser ──▶ PriceCache (ETS)
#    │
#    ▼
# Broadcaster ──▶ :pg groups ──▶ Subscribers

Feed Connector

defmodule MarketFeed.FeedConnector do
  use GenServer
  require Logger

  @reconnect_interval 5_000

  defstruct [:socket, :host, :port, :parser, :status]

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    host = Keyword.fetch!(opts, :host)
    port = Keyword.fetch!(opts, :port)

    state = %__MODULE__{
      host: host,
      port: port,
      parser: MarketProtocol.Parser.new(),
      status: :disconnected
    }

    send(self(), :connect)
    {:ok, state}
  end

  @impl true
  def handle_info(:connect, state) do
    case :gen_tcp.connect(state.host, state.port, [
      :binary,
      active: :once,
      nodelay: true,
      packet: :raw
    ]) do
      {:ok, socket} ->
        Logger.info("Connected to feed #{state.host}:#{state.port}")
        {:noreply, %{state | socket: socket, status: :connected}}

      {:error, reason} ->
        Logger.error("Failed to connect: #{inspect(reason)}")
        Process.send_after(self(), :connect, @reconnect_interval)
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:tcp, socket, data}, state) do
    {messages, new_parser} = MarketProtocol.Parser.feed(state.parser, data)

    Enum.each(messages, &process_message/1)

    :inet.setopts(socket, [active: :once])
    {:noreply, %{state | parser: new_parser}}
  end

  @impl true
  def handle_info({:tcp_closed, _}, state) do
    Logger.warn("Feed connection closed, reconnecting...")
    Process.send_after(self(), :connect, @reconnect_interval)
    {:noreply, %{state | socket: nil, status: :disconnected}}
  end

  defp process_message(%{type: :tick} = tick) do
    # Actualizar cache
    MarketFeed.PriceCache.update(tick.symbol, tick.price, tick.timestamp)

    # Broadcast a subscribers
    MarketFeed.Broadcaster.broadcast(tick.symbol, tick)
  end

  defp process_message(%{type: :heartbeat}) do
    Logger.debug("Heartbeat received")
  end
end

Price Cache

defmodule MarketFeed.PriceCache do
  use GenServer

  @table :price_cache

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def update(symbol, price, timestamp) do
    :ets.insert(@table, {symbol, price, timestamp})
  end

  def get(symbol) do
    case :ets.lookup(@table, symbol) do
      [{^symbol, price, timestamp}] -> {:ok, price, timestamp}
      [] -> :not_found
    end
  end

  def get_all do
    :ets.tab2list(@table)
  end

  @impl true
  def init(_) do
    table = :ets.new(@table, [
      :set,
      :named_table,
      :public,
      read_concurrency: true,
      write_concurrency: true
    ])
    {:ok, %{table: table}}
  end
end

Broadcaster

defmodule MarketFeed.Broadcaster do
  @pg_scope MarketFeed.PG

  def broadcast(symbol, tick) do
    group = {:symbol, symbol}
    message = encode_tick(tick)

    # Broadcast local primero (más eficiente)
    :pg.get_local_members(@pg_scope, group)
    |> send_to_all(message)

    # Luego a otros nodos
    remote_members = :pg.get_members(@pg_scope, group) --
                     :pg.get_local_members(@pg_scope, group)
    send_to_all(remote_members, message)
  end

  defp encode_tick(tick) do
    {:tick, tick.symbol, tick.price, tick.volume, tick.timestamp}
  end

  defp send_to_all(pids, message) do
    Enum.each(pids, &send(&1, message))
  end
end

Application y Supervision Tree

defmodule MarketFeed.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Process Groups
      {:pg, name: MarketFeed.PG},

      # Price Cache
      MarketFeed.PriceCache,

      # Feed Connector
      {MarketFeed.FeedConnector, [
        host: Application.get_env(:market_feed, :feed_host),
        port: Application.get_env(:market_feed, :feed_port)
      ]},

      # Subscriber TCP listener
      MarketFeed.SubscriberListener,

      # WebSocket endpoint
      MarketFeed.WebSocketEndpoint,

      # Telemetry
      MarketFeed.Telemetry
    ]

    opts = [strategy: :one_for_one, name: MarketFeed.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Ejercicio 22.1 Multi-feed connector Intermedio

Extiende el FeedConnector para soportar múltiples feeds:

  • DynamicSupervisor para N feeds
  • Cada feed configurable independientemente
  • Consolidación de precios de múltiples fuentes
Ejercicio 22.2 Feed simulator Básico

Crea un simulador de feed para testing:

  • Genera ticks aleatorios a tasa configurable
  • Simula símbolos con características diferentes
  • Incluye opción de latencia artificial

Conexión con el proyecto final

El publisher es el corazón del sistema. En el siguiente capítulo construiremos los subscribers que recibirán estos datos.