Arquitectura del Sistema

Diseño completo de un sistema de distribución de datos financieros de baja latencia. ⏱️ 3 horas

El proyecto: FinFeed

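Vamos a construir FinFeed, un sistema de distribución de datos financieros en tiempo real. El sistema recibe datos de fuentes externas en nodos de ingesta (Feed Nodes), los reparte por símbolo mediante process groups (`:pg`) y los entrega a los suscriptores a través de nodos de salida (Client Nodes), tal como muestra el diagrama: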

Diagrama de arquitectura

┌─────────────────────────────────────────────────────────────────┐
│                         FINFEED CLUSTER                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │  Feed Node  │    │  Feed Node  │    │  Feed Node  │          │
│  │  (Ingress)  │    │  (Ingress)  │    │  (Ingress)  │          │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘          │
│         │                  │                  │                 │
│         └──────────────────┼──────────────────┘                 │
│                            │                                    │
│                            ▼                                    │
│                    ┌───────────────┐                            │
│                    │  PG (Process  │                            │
│                    │   Groups)     │                            │
│                    └───────┬───────┘                            │
│                            │                                    │
│         ┌──────────────────┼──────────────────┐                 │
│         │                  │                  │                 │
│         ▼                  ▼                  ▼                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │ Client Node │    │ Client Node │    │ Client Node │          │
│  │  (Egress)   │    │  (Egress)   │    │  (Egress)   │          │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘          │
│         │                  │                  │                 │
└─────────┼──────────────────┼──────────────────┼─────────────────┘
          │                  │                  │
          ▼                  ▼                  ▼
    ┌───────────┐      ┌───────────┐      ┌───────────┐
    │Subscribers│      │Subscribers│      │Subscribers│
    └───────────┘      └───────────┘      └───────────┘

Componentes principales

1. FeedIngestor

Se conecta a fuentes de datos externas, normaliza los mensajes y los publica por símbolo:

defmodule FinFeed.FeedIngestor do
  @moduledoc """
  Connects to an external market-data feed, parses the raw data with a
  pluggable parser module and fans every parsed message out to the `:pg`
  group `{:symbol, msg.symbol}` in the `FinFeed.PG` scope.

  ## Options

    * `:name` (required) - registered in `FinFeed.Registry` under `{:ingestor, name}`.
    * `:parser` - module exposing `parse/1`, which must return `{:ok, messages}`
      or `{:error, reason}`; defaults to `FinFeed.Parser.Default`.
    * `:reconnect_interval` - milliseconds to wait before retrying a failed or
      lost connection; defaults to `5000`.
  """
  use GenServer
  require Logger

  @default_reconnect_interval 5_000

  defstruct [:name, :socket, :parser, :stats, reconnect_interval: @default_reconnect_interval]

  @doc """
  Starts the ingestor and registers it through `FinFeed.Registry`.

  Raises `KeyError` if `opts` has no `:name`.
  """
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: via(name))
  end

  # Registry-based name so several ingestors (one per feed name) can coexist.
  defp via(name), do: {:via, Registry, {FinFeed.Registry, {:ingestor, name}}}

  @impl true
  def init(opts) do
    state = %__MODULE__{
      name: opts[:name],
      parser: opts[:parser] || FinFeed.Parser.Default,
      reconnect_interval: opts[:reconnect_interval] || @default_reconnect_interval,
      stats: %{messages: 0, errors: 0, started_at: System.system_time()}
    }

    # Connect from a message instead of inside init/1 so the supervisor is not
    # blocked while the connection is being established.
    send(self(), :connect)

    {:ok, state}
  end

  # (Re)connection attempt; on failure another attempt is scheduled.
  @impl true
  def handle_info(:connect, state) do
    case connect_to_feed(state.name) do
      {:ok, socket} ->
        Logger.info("[#{state.name}] Connected to feed")
        {:noreply, %{state | socket: socket}}

      {:error, reason} ->
        # inspect/1: error reasons are arbitrary terms (e.g. tuples) that do not
        # necessarily implement String.Chars, so plain interpolation could raise.
        Logger.error("[#{state.name}] Connection failed: #{inspect(reason)}")
        schedule_reconnect(state)
        {:noreply, state}
    end
  end

  # Raw data from the feed socket: parse it and publish every resulting message.
  def handle_info({:tcp, _socket, data}, state) do
    case state.parser.parse(data) do
      {:ok, messages} ->
        Enum.each(messages, &publish_message/1)
        {:noreply, bump_stat(state, :messages, length(messages))}

      {:error, reason} ->
        Logger.warning("[#{state.name}] Parse error: #{inspect(reason)}")
        {:noreply, bump_stat(state, :errors, 1)}
    end
  end

  # The current feed socket was closed: forget it and schedule a reconnect.
  # Matching on state.socket ignores notifications coming from stale sockets.
  def handle_info({:tcp_closed, socket}, %__MODULE__{socket: socket} = state) do
    Logger.warning("[#{state.name}] Feed connection closed")
    schedule_reconnect(state)
    {:noreply, %{state | socket: nil}}
  end

  # Error on the current feed socket: close it and schedule a reconnect.
  def handle_info({:tcp_error, socket, reason}, %__MODULE__{socket: socket} = state) do
    Logger.error("[#{state.name}] Feed socket error: #{inspect(reason)}")
    if socket, do: :gen_tcp.close(socket)
    schedule_reconnect(state)
    {:noreply, %{state | socket: nil}}
  end

  # Any other message (including events from stale sockets) is logged and
  # ignored instead of crashing the ingestor with a FunctionClauseError.
  def handle_info(msg, state) do
    Logger.warning("[#{state.name}] Unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Sends {:tick, msg} to every member of the :pg group for msg.symbol.
  # :pg.get_members/2 returns members from all connected nodes, not only local ones.
  defp publish_message(msg) do
    :pg.get_members(FinFeed.PG, {:symbol, msg.symbol})
    |> Enum.each(fn pid -> send(pid, {:tick, msg}) end)
  end

  # Schedules the next :connect attempt after the configured interval.
  defp schedule_reconnect(state) do
    Process.send_after(self(), :connect, state.reconnect_interval)
  end

  # Adds `amount` to the `key` counter in state.stats.
  defp bump_stat(state, key, amount) do
    %{state | stats: Map.update!(state.stats, key, &(&1 + amount))}
  end

  # Placeholder: the real feed connection is not implemented yet and always
  # reports success with a nil socket.
  defp connect_to_feed(_name) do
    {:ok, nil}
  end
end

2. SubscriberConnection

Gestiona la conexión de cada cliente (suscripciones, heartbeats y envío de ticks):

defmodule FinFeed.SubscriberConnection do
  @moduledoc """
  One process per connected client socket.

  Decodes client messages (subscribe, unsubscribe, heartbeat), keeps the set of
  subscribed symbols and forwards `{:tick, msg}` messages delivered through the
  `FinFeed.PG` scope to the client when it is subscribed to `msg.symbol`.
  """

  # restart: :temporary - a connection must not be restarted by its supervisor:
  # the restarted process would re-run init/1 with the same, already closed
  # socket and then sit idle forever. Clients reconnect instead.
  use GenServer, restart: :temporary
  require Logger

  defstruct [:socket, :subscriptions, :client_id, :stats]

  @doc "Starts a connection process for an accepted client `socket`."
  def start_link(socket) do
    GenServer.start_link(__MODULE__, socket)
  end

  @impl true
  def init(socket) do
    # Low-latency socket setup: binaries, one delivered packet at a time
    # (active: :once), Nagle's algorithm disabled (nodelay) and every packet
    # framed with a 4-byte length prefix (packet: 4).
    # NOTE(review): {:tcp, ...} messages only reach this process once it is the
    # socket's controlling process; presumably the caller transfers ownership
    # with :gen_tcp.controlling_process/2 - confirm.
    :inet.setopts(socket, [
      :binary,
      active: :once,
      nodelay: true,
      packet: 4
    ])

    client_id = Base.encode16(:crypto.strong_rand_bytes(8))
    Logger.info("[#{client_id}] Client connected")

    state = %__MODULE__{
      socket: socket,
      client_id: client_id,
      subscriptions: MapSet.new(),
      stats: %{sent: 0, received: 0}
    }

    {:ok, state}
  end

  # Packet from the client: subscribe / unsubscribe / heartbeat.
  @impl true
  def handle_info({:tcp, socket, data}, state) do
    # Re-arm active: :once so the next client packet is delivered as a message.
    :inet.setopts(socket, active: :once)
    state = bump_stat(state, :received)

    case decode_client_message(data) do
      {:subscribe, symbol} ->
        handle_subscribe(symbol, state)

      {:unsubscribe, symbol} ->
        handle_unsubscribe(symbol, state)

      :heartbeat ->
        send_to_client(state.socket, :heartbeat_ack)
        {:noreply, state}

      _ ->
        Logger.warning("[#{state.client_id}] Unknown message")
        {:noreply, state}
    end
  end

  # Data tick published to a :pg group this process joined; only ticks for
  # currently subscribed symbols are forwarded to the client.
  def handle_info({:tick, msg}, state) do
    if MapSet.member?(state.subscriptions, msg.symbol) do
      send_to_client(state.socket, {:tick, msg})
      {:noreply, bump_stat(state, :sent)}
    else
      {:noreply, state}
    end
  end

  def handle_info({:tcp_closed, _}, state) do
    Logger.info("[#{state.client_id}] Disconnected")
    {:stop, :normal, state}
  end

  # Socket failure: stop with a :shutdown reason (clean exit, no crash report).
  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.warning("[#{state.client_id}] Socket error: #{inspect(reason)}")
    {:stop, {:shutdown, {:tcp_error, reason}}, state}
  end

  # Joins the symbol's :pg group only on the first subscription: :pg lets a
  # process join the same group several times and get_members/2 then lists it
  # once per join, which would deliver every tick more than once.
  defp handle_subscribe(symbol, state) do
    if not MapSet.member?(state.subscriptions, symbol) do
      :pg.join(FinFeed.PG, {:symbol, symbol}, self())
    end

    new_subs = MapSet.put(state.subscriptions, symbol)
    Logger.debug("[#{state.client_id}] Subscribed to #{symbol}")

    send_to_client(state.socket, {:subscribed, symbol})
    {:noreply, %{state | subscriptions: new_subs}}
  end

  # Leaves the symbol's :pg group only if this process had joined it.
  defp handle_unsubscribe(symbol, state) do
    if MapSet.member?(state.subscriptions, symbol) do
      :pg.leave(FinFeed.PG, {:symbol, symbol}, self())
    end

    {:noreply, %{state | subscriptions: MapSet.delete(state.subscriptions, symbol)}}
  end

  # Increments the `key` counter in state.stats by one.
  defp bump_stat(state, key) do
    %{state | stats: Map.update!(state.stats, key, &(&1 + 1))}
  end

  # Encodes `msg` and writes it to the socket. The result of :gen_tcp.send/2 is
  # not checked here; the process relies on :tcp_closed / :tcp_error to stop.
  defp send_to_client(socket, msg) do
    data = encode_message(msg)
    :gen_tcp.send(socket, data)
  end

  # Placeholder codecs: every client packet decodes as :heartbeat and every
  # outgoing message encodes to an empty binary.
  defp decode_client_message(_data), do: :heartbeat
  defp encode_message(_msg), do: <<>>
end

3. Árbol de supervisión

defmodule FinFeed.Application do
  @moduledoc """
  OTP application entry point: starts the FinFeed supervision tree.
  """
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Registry used to name processes (e.g. feed ingestors).
      {Registry, keys: :unique, name: FinFeed.Registry},

      # :pg scope used for pub/sub. The Erlang :pg module does not define
      # child_spec/1, so the `{:pg, FinFeed.PG}` shorthand cannot be used; the
      # scope process is started explicitly with :pg.start_link/1.
      %{id: FinFeed.PG, start: {:pg, :start_link, [FinFeed.PG]}},

      # Supervisor for feed ingestors.
      {DynamicSupervisor, name: FinFeed.IngestorSupervisor, strategy: :one_for_one},

      # Supervisor for client connections.
      {DynamicSupervisor, name: FinFeed.ConnectionSupervisor, strategy: :one_for_one},

      # TCP listener (Ranch).
      tcp_listener_spec(),

      # Cluster formation (libcluster).
      {Cluster.Supervisor, [topologies(), [name: FinFeed.ClusterSupervisor]]},

      # Metrics and telemetry.
      FinFeed.Telemetry
    ]

    # :rest_for_one because later children depend on earlier ones: if the
    # Registry or the :pg scope restarts, the names and group memberships held by
    # ingestors and connections are lost, so those processes are restarted as well
    # instead of running on with stale registrations.
    opts = [strategy: :rest_for_one, name: FinFeed.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Ranch listener on the port from `config :finfeed, :port` (default 4000),
  # handing accepted connections to FinFeed.TCPHandler.
  defp tcp_listener_spec do
    port = Application.get_env(:finfeed, :port, 4000)

    :ranch.child_spec(
      :finfeed_tcp,
      :ranch_tcp,
      %{socket_opts: [port: port]},
      FinFeed.TCPHandler,
      []
    )
  end

  # libcluster topologies read at runtime from `config :libcluster, :topologies`.
  defp topologies do
    Application.get_env(:libcluster, :topologies, [])
  end
end

Estructura del proyecto

finfeed/
├── lib/
│   ├── finfeed/
│   │   ├── application.ex          # Entry point
│   │   ├── feed_ingestor.ex        # Conexión a feeds
│   │   ├── subscriber_connection.ex # Manejo de clientes
│   │   ├── tcp_handler.ex          # Ranch handler
│   │   ├── protocol/
│   │   │   ├── encoder.ex          # Serialización de mensajes
│   │   │   ├── decoder.ex          # Parsing de mensajes
│   │   │   └── messages.ex         # Definición de tipos
│   │   ├── parsers/
│   │   │   ├── default.ex
│   │   │   ├── fix.ex              # Parser FIX
│   │   │   └── json.ex             # Parser JSON
│   │   └── telemetry.ex            # Métricas
│   └── finfeed.ex
├── config/
│   ├── config.exs
│   ├── dev.exs
│   ├── prod.exs
│   └── runtime.exs
├── test/
└── mix.exs

Requisitos no funcionales

| Requisito | Objetivo | Cómo lo logramos |
|---|---|---|
| Latencia | < 1 ms (p99) | Protocolo binario, `nodelay`, `pg` local |
| Throughput | 100k msgs/s por nodo | Un proceso por conexión, caché en ETS |
| Conexiones | 10k por nodo | Procesos ligeros de la BEAM |
| Disponibilidad | 99.99% | Clúster multinodo, supervisores |
| Recuperación | < 5 s | Reinicios del supervisor, reincorporación al clúster |

Ejercicio 21.1 — Inicializar el proyecto (nivel: básico)

Crea el proyecto FinFeed con Mix y configura las dependencias básicas:

mix new finfeed --sup
cd finfeed

Añade estas dependencias a la función `deps` de `mix.exs`:

  • {:ranch, "~> 2.1"}
  • {:libcluster, "~> 3.3"}
  • {:telemetry, "~> 1.2"}
Ejercicio 21.2 — Definir el protocolo binario (nivel: intermedio)

Diseña el protocolo binario para mensajes:

  • Header: tipo (1 byte) + longitud (4 bytes)
  • TICK: symbol (8 bytes) + price (8 bytes float) + timestamp (8 bytes)
  • SUBSCRIBE: symbol (8 bytes)
  • HEARTBEAT: timestamp (8 bytes)

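Implementa FinFeed.Protocol.Encoder y FinFeed.Protocol.Decoder.

Ten en cuenta que `SubscriberConnection` configura el socket con `packet: 4`, por lo que el runtime ya antepone (al enviar) y elimina (al recibir) un prefijo de longitud de 4 bytes en cada mensaje. Decide si el campo de longitud del header es necesario o redundante con ese framing.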