Arquitectura del Sistema
Diseño completo de un sistema de distribución de datos financieros de baja latencia. ⏱️ 3 horas
El proyecto: FinFeed
Vamos a construir FinFeed, un sistema de distribución de datos financieros en tiempo real. El sistema:
- Recibe datos de múltiples fuentes (exchanges, APIs)
- Normaliza y valida los datos
- Distribuye a miles de subscribers conectados
- Opera en cluster para alta disponibilidad
- Optimizado para latencia sub-milisegundo
Diagrama de arquitectura
┌─────────────────────────────────────────────────────────────────┐
│ FINFEED CLUSTER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Feed Node │ │ Feed Node │ │ Feed Node │ │
│ │ (Ingress) │ │ (Ingress) │ │ (Ingress) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ PG (Process │ │
│ │ Groups) │ │
│ └───────┬───────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Client Node │ │ Client Node │ │ Client Node │ │
│ │ (Egress) │ │ (Egress) │ │ (Egress) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
└─────────┼──────────────────┼──────────────────┼─────────────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Subscribers│ │Subscribers│ │Subscribers│
└──────────┘ └──────────┘ └──────────┘
Componentes principales
1. FeedIngestor
Conecta a fuentes de datos externas y normaliza mensajes:
defmodule FinFeed.FeedIngestor do
use GenServer
require Logger
defstruct [:name, :socket, :parser, :stats]
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
GenServer.start_link(__MODULE__, opts, name: via(name))
end
defp via(name), do: {:via, Registry, {FinFeed.Registry, {:ingestor, name}}}
@impl true
def init(opts) do
state = %__MODULE__{
name: opts[:name],
parser: opts[:parser] || FinFeed.Parser.Default,
stats: %{messages: 0, errors: 0, started_at: System.system_time()}
}
# Conectar en init asíncrono para no bloquear supervisor
send(self(), :connect)
{:ok, state}
end
@impl true
def handle_info(:connect, state) do
case connect_to_feed(state.name) do
{:ok, socket} ->
Logger.info("[#{state.name}] Connected to feed")
{:noreply, %{state | socket: socket}}
{:error, reason} ->
Logger.error("[#{state.name}] Connection failed: #{reason}")
# Reintentar en 5 segundos
Process.send_after(self(), :connect, 5000)
{:noreply, state}
end
end
# Datos entrantes del socket
@impl true
def handle_info({:tcp, _socket, data}, state) do
case state.parser.parse(data) do
{:ok, messages} ->
Enum.each(messages, &publish_message/1)
stats = %{state.stats | messages: state.stats.messages + length(messages)}
{:noreply, %{state | stats: stats}}
{:error, reason} ->
Logger.warn("[#{state.name}] Parse error: #{reason}")
stats = %{state.stats | errors: state.stats.errors + 1}
{:noreply, %{state | stats: stats}}
end
end
defp publish_message(msg) do
# Publicar a process group por símbolo
:pg.get_members(FinFeed.PG, {:symbol, msg.symbol})
|> Enum.each(fn pid -> send(pid, {:tick, msg}) end)
end
defp connect_to_feed(_name) do
# Implementar conexión real
{:ok, nil}
end
end
2. SubscriberConnection
Maneja cada cliente conectado:
defmodule FinFeed.SubscriberConnection do
use GenServer
require Logger
defstruct [:socket, :subscriptions, :client_id, :stats]
def start_link(socket) do
GenServer.start_link(__MODULE__, socket)
end
@impl true
def init(socket) do
# Optimizar socket para baja latencia
:inet.setopts(socket, [
:binary,
active: :once,
nodelay: true,
packet: 4
])
client_id = Base.encode16(:crypto.strong_rand_bytes(8))
Logger.info("[#{client_id}] Client connected")
state = %__MODULE__{
socket: socket,
client_id: client_id,
subscriptions: MapSet.new(),
stats: %{sent: 0, received: 0}
}
{:ok, state}
end
# Mensaje del cliente (subscribe/unsubscribe)
@impl true
def handle_info({:tcp, socket, data}, state) do
:inet.setopts(socket, [active: :once])
case decode_client_message(data) do
{:subscribe, symbol} ->
handle_subscribe(symbol, state)
{:unsubscribe, symbol} ->
handle_unsubscribe(symbol, state)
:heartbeat ->
send_to_client(state.socket, :heartbeat_ack)
{:noreply, state}
_ ->
Logger.warn("[#{state.client_id}] Unknown message")
{:noreply, state}
end
end
# Tick de datos para enviar al cliente
@impl true
def handle_info({:tick, msg}, state) do
if MapSet.member?(state.subscriptions, msg.symbol) do
send_to_client(state.socket, {:tick, msg})
stats = %{state.stats | sent: state.stats.sent + 1}
{:noreply, %{state | stats: stats}}
else
{:noreply, state}
end
end
@impl true
def handle_info({:tcp_closed, _}, state) do
Logger.info("[#{state.client_id}] Disconnected")
{:stop, :normal, state}
end
defp handle_subscribe(symbol, state) do
# Unirse al process group para este símbolo
:pg.join(FinFeed.PG, {:symbol, symbol}, self())
new_subs = MapSet.put(state.subscriptions, symbol)
Logger.debug("[#{state.client_id}] Subscribed to #{symbol}")
send_to_client(state.socket, {:subscribed, symbol})
{:noreply, %{state | subscriptions: new_subs}}
end
defp handle_unsubscribe(symbol, state) do
:pg.leave(FinFeed.PG, {:symbol, symbol}, self())
new_subs = MapSet.delete(state.subscriptions, symbol)
{:noreply, %{state | subscriptions: new_subs}}
end
defp send_to_client(socket, msg) do
data = encode_message(msg)
:gen_tcp.send(socket, data)
end
defp decode_client_message(_data), do: :heartbeat
defp encode_message(_msg), do: <<>>
end
3. Árbol de supervisión
defmodule FinFeed.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# Registry para nombrar procesos
{Registry, keys: :unique, name: FinFeed.Registry},
# Process Groups para pub/sub
{:pg, FinFeed.PG},
# Supervisor de Feed Ingestors
{DynamicSupervisor, name: FinFeed.IngestorSupervisor, strategy: :one_for_one},
# Supervisor de conexiones de cliente
{DynamicSupervisor, name: FinFeed.ConnectionSupervisor, strategy: :one_for_one},
# TCP Listener (Ranch)
tcp_listener_spec(),
# Cluster manager
{Cluster.Supervisor, [topologies(), [name: FinFeed.ClusterSupervisor]]},
# Métricas y telemetría
FinFeed.Telemetry
]
opts = [strategy: :one_for_one, name: FinFeed.Supervisor]
Supervisor.start_link(children, opts)
end
defp tcp_listener_spec do
port = Application.get_env(:finfeed, :port, 4000)
:ranch.child_spec(
:finfeed_tcp,
:ranch_tcp,
%{socket_opts: [port: port]},
FinFeed.TCPHandler,
[]
)
end
defp topologies do
Application.get_env(:libcluster, :topologies, [])
end
end
Estructura del proyecto
finfeed/
├── lib/
│ ├── finfeed/
│ │ ├── application.ex # Entry point
│ │ ├── feed_ingestor.ex # Conexión a feeds
│ │ ├── subscriber_connection.ex # Manejo de clientes
│ │ ├── tcp_handler.ex # Ranch handler
│ │ ├── protocol/
│ │ │ ├── encoder.ex # Serialización de mensajes
│ │ │ ├── decoder.ex # Parsing de mensajes
│ │ │ └── messages.ex # Definición de tipos
│ │ ├── parsers/
│ │ │ ├── default.ex
│ │ │ ├── fix.ex # Parser FIX
│ │ │ └── json.ex # Parser JSON
│ │ └── telemetry.ex # Métricas
│ └── finfeed.ex
├── config/
│ ├── config.exs
│ ├── dev.exs
│ ├── prod.exs
│ └── runtime.exs
├── test/
└── mix.exs
Requisitos no funcionales
| Requisito | Objetivo | Cómo lo logramos |
|---|---|---|
| Latencia | < 1ms p99 | Binary protocol, nodelay, pg local |
| Throughput | 100k msgs/s por nodo | Process per connection, ETS cache |
| Conexiones | 10k por nodo | Procesos ligeros BEAM |
| Disponibilidad | 99.99% | Cluster multi-nodo, supervisores |
| Recuperación | < 5s | Supervisor restarts, cluster rejoin |
Ejercicio 21.1
Inicializar proyecto
Básico
Crea el proyecto FinFeed con Mix y configura las dependencias básicas:
mix new finfeed --sup
cd finfeed
Añade al mix.exs:
{:ranch, "~> 2.1"}{:libcluster, "~> 3.3"}{:telemetry, "~> 1.2"}
Ejercicio 21.2
Definir protocolo binario
Intermedio
Diseña el protocolo binario para mensajes:
- Header: tipo (1 byte) + longitud (4 bytes)
- TICK: symbol (8 bytes) + price (8 bytes float) + timestamp (8 bytes)
- SUBSCRIBE: symbol (8 bytes)
- HEARTBEAT: timestamp (8 bytes)
Implementa FinFeed.Protocol.Encoder y
FinFeed.Protocol.Decoder.