Protocolos Binarios
Diseña e implementa protocolos binarios eficientes para datos de mercado. ⏱️ 2.5 horas
¿Por qué protocolos binarios?
Los sistemas financieros de baja latencia evitan formatos de texto como JSON o XML. Los protocolos binarios ofrecen:
- Tamaño reducido: menos bytes = menor latencia de red
- Parsing rápido: sin overhead de parsing de texto
- Tipos nativos: floats y enteros directamente en binario
- Estructura fija: posiciones conocidas, acceso O(1)
Los exchanges usan protocolos binarios como FIX/FAST, ITCH, OUCH, o propietarios. Un tick de precio en JSON puede ser 200+ bytes; en binario, 20-30 bytes. A millones de mensajes por segundo, la diferencia es enorme.
Binary pattern matching en Elixir
Elixir hereda de Erlang un poderoso sistema de pattern matching binario:
# Sintaxis básica
<<byte1, byte2, rest::binary>> = <<1, 2, 3, 4, 5>>
# byte1 = 1, byte2 = 2, rest = <<3, 4, 5>>
# Especificar tamaños
<<header::binary-size(4), payload::binary>> = data
# Enteros con tamaño específico
<<value::16>> = <<0x01, 0x00>>
# value = 256 (big-endian por defecto)
# Little-endian
<<value::little-16>> = <<0x01, 0x00>>
# value = 1
Modificadores de tipo
| Modificador | Descripción | Ejemplo |
|---|---|---|
integer |
Entero (default) | x::integer |
float |
IEEE 754 float | x::float-64 |
binary |
Secuencia de bytes | x::binary-size(10) |
bits/bitstring |
Bits arbitrarios | x::bits-size(3) |
utf8/utf16/utf32 |
Codepoints Unicode | x::utf8 |
Modificadores de endianness
# Big-endian (network byte order, default)
<<value::big-32>> = <<0, 0, 1, 0>>
# value = 256
# Little-endian (x86/ARM)
<<value::little-32>> = <<0, 1, 0, 0>>
# value = 256
# Native (usa el endianness de la máquina)
<<value::native-32>> = data
Diseñando un protocolo de market data
Diseñemos un protocolo simple para transmitir ticks de precio:
Estructura del mensaje
# Header (8 bytes)
# ┌─────────┬──────────┬──────────┬──────────┐
# │ Magic │ Version │ Type │ Length │
# │ 2 bytes │ 1 byte │ 1 byte │ 4 bytes │
# └─────────┴──────────┴──────────┴──────────┘
# Tick Message (después del header)
# ┌──────────┬──────────┬──────────┬──────────┬──────────┐
# │ Symbol │ Price │ Volume │ Timestamp│ Flags │
# │ 8 bytes │ 8 bytes │ 8 bytes │ 8 bytes │ 1 byte │
# └──────────┴──────────┴──────────┴──────────┴──────────┘
Implementación del codec
defmodule MarketProtocol do
# Constantes del protocolo
@magic 0xFD01
@version 1
# Tipos de mensaje
@msg_tick 1
@msg_heartbeat 2
@msg_subscribe 3
@msg_unsubscribe 4
@msg_snapshot 5
# Tamaños
@header_size 8
@tick_size 33
@symbol_size 8
## Encoding
def encode_tick(symbol, price, volume, timestamp, flags \\ 0) do
symbol_padded = pad_symbol(symbol)
payload = <<
symbol_padded::binary-size(@symbol_size),
price::float-64,
volume::unsigned-64,
timestamp::unsigned-64,
flags::8
>>
encode_message(@msg_tick, payload)
end
def encode_heartbeat(timestamp) do
payload = <<timestamp::unsigned-64>>
encode_message(@msg_heartbeat, payload)
end
def encode_subscribe(symbol) do
symbol_padded = pad_symbol(symbol)
encode_message(@msg_subscribe, symbol_padded)
end
defp encode_message(type, payload) do
length = byte_size(payload)
<<
@magic::16,
@version::8,
type::8,
length::32,
payload::binary
>>
end
defp pad_symbol(symbol) when is_binary(symbol) do
String.pad_trailing(symbol, @symbol_size, <<0>>)
|> String.slice(0, @symbol_size)
end
## Decoding
def decode(data) when byte_size(data) < @header_size do
{:incomplete, data}
end
def decode(<<@magic::16, @version::8, type::8, length::32, rest::binary>>) do
if byte_size(rest) >= length do
<<payload::binary-size(length), remaining::binary>> = rest
message = decode_payload(type, payload)
{:ok, message, remaining}
else
{:incomplete, <<@magic::16, @version::8, type::8, length::32, rest::binary>>}
end
end
def decode(<<_::8, rest::binary>>) do
# Magic inválido, intentar resincronizar
decode(rest)
end
def decode(<<>>) do
{:incomplete, <<>>}
end
defp decode_payload(@msg_tick, payload) do
<<
symbol_raw::binary-size(@symbol_size),
price::float-64,
volume::unsigned-64,
timestamp::unsigned-64,
flags::8
>> = payload
symbol = trim_symbol(symbol_raw)
%{
type: :tick,
symbol: symbol,
price: price,
volume: volume,
timestamp: timestamp,
flags: flags
}
end
defp decode_payload(@msg_heartbeat, <<timestamp::unsigned-64>>) do
%{type: :heartbeat, timestamp: timestamp}
end
defp decode_payload(@msg_subscribe, symbol_raw) do
%{type: :subscribe, symbol: trim_symbol(symbol_raw)}
end
defp decode_payload(@msg_unsubscribe, symbol_raw) do
%{type: :unsubscribe, symbol: trim_symbol(symbol_raw)}
end
defp trim_symbol(symbol_raw) do
symbol_raw
|> String.trim_trailing(<<0>>)
|> String.trim()
end
end
Streaming parser (manejo de fragmentación)
TCP no garantiza que recibirás mensajes completos. Debes manejar:
- Fragmentación: un mensaje puede llegar en múltiples paquetes
- Coalescing: múltiples mensajes pueden llegar en un paquete
defmodule MarketProtocol.Parser do
defstruct buffer: <<>>
def new, do: %__MODULE__{}
def feed(%__MODULE__{buffer: buffer} = parser, data) do
parse_all(%{parser | buffer: buffer <> data}, [])
end
defp parse_all(parser, messages) do
case MarketProtocol.decode(parser.buffer) do
{:ok, message, remaining} ->
parse_all(%{parser | buffer: remaining}, [message | messages])
{:incomplete, buffer} ->
{Enum.reverse(messages), %{parser | buffer: buffer}}
end
end
end
# Uso en un handler TCP
def handle_info({:tcp, socket, data}, %{parser: parser} = state) do
{messages, new_parser} = MarketProtocol.Parser.feed(parser, data)
Enum.each(messages, &process_message/1)
:inet.setopts(socket, [active: :once])
{:noreply, %{state | parser: new_parser}}
end
Optimizaciones de rendimiento
Sub-binarios (zero-copy)
# Elixir crea sub-binarios cuando es posible (zero-copy)
data = <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10>>
<<_::binary-size(2), rest::binary>> = data
# 'rest' es un sub-binario que apunta al mismo heap
# No hay copia de los bytes 3-10
# Forzar copia (cuando el original es muy grande)
rest_copy = :binary.copy(rest)
IOLists para construir mensajes
# Menos eficiente: concatenación crea nuevos binarios
def build_messages_slow(ticks) do
Enum.reduce(ticks, <<>>, fn tick, acc ->
acc <> encode_tick(tick) # Copia todo el acumulador cada vez!
end)
end
# Más eficiente: IOList, se concatena al enviar
def build_messages_fast(ticks) do
Enum.map(ticks, &encode_tick/1)
# Retorna lista de binarios, :gen_tcp.send acepta IOLists
end
# También puedes mezclar binarios y listas
iolist = [<<1, 2>>, [<<3>>, <<4, 5>>], 6]
:gen_tcp.send(socket, iolist) # Se concatena en kernel
Precalcular campos estáticos
defmodule FastEncoder do
# Precalcular header en compile-time
@tick_header <<0xFD01::16, 1::8, 1::8, 33::32>>
def encode_tick(symbol, price, volume, timestamp) do
# Solo concatenar header precalculado + payload
[@tick_header, encode_tick_payload(symbol, price, volume, timestamp)]
end
defp encode_tick_payload(symbol, price, volume, timestamp) do
symbol_padded = String.pad_trailing(symbol, 8, <<0>>)
<<
symbol_padded::binary,
price::float-64,
volume::unsigned-64,
timestamp::unsigned-64,
0::8
>>
end
end
Floats de precisión fija
Los precios financieros a menudo usan precisión fija para evitar errores de punto flotante:
defmodule FixedPoint do
# Precio con 8 decimales de precisión
@decimals 8
@multiplier trunc(:math.pow(10, @decimals))
def encode(price) when is_float(price) do
trunc(price * @multiplier)
end
def decode(encoded) do
encoded / @multiplier
end
# En el protocolo binario
def encode_price_binary(price) do
encoded = encode(price)
<<encoded::signed-64>>
end
def decode_price_binary(<<encoded::signed-64>>) do
decode(encoded)
end
end
# Ejemplo
FixedPoint.encode(67543.12345678)
# => 6754312345678
FixedPoint.decode(6754312345678)
# => 67543.12345678
Compresión de símbolos
defmodule SymbolTable do
use GenServer
# Mapear símbolos a IDs cortos
# "BTCUSD" (6 bytes) -> ID 1 (2 bytes)
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def get_id(symbol) do
case :ets.lookup(:symbol_to_id, symbol) do
[{^symbol, id}] -> id
[] -> GenServer.call(__MODULE__, {:register, symbol})
end
end
def get_symbol(id) do
case :ets.lookup(:id_to_symbol, id) do
[{^id, symbol}] -> symbol
[] -> nil
end
end
@impl true
def init(_opts) do
:ets.new(:symbol_to_id, [:set, :named_table, :protected, read_concurrency: true])
:ets.new(:id_to_symbol, [:set, :named_table, :protected, read_concurrency: true])
{:ok, %{next_id: 1}}
end
@impl true
def handle_call({:register, symbol}, _from, state) do
# Double-check después de obtener el lock
case :ets.lookup(:symbol_to_id, symbol) do
[{^symbol, id}] ->
{:reply, id, state}
[] ->
id = state.next_id
:ets.insert(:symbol_to_id, {symbol, id})
:ets.insert(:id_to_symbol, {id, symbol})
{:reply, id, %{state | next_id: id + 1}}
end
end
end
Diseña e implementa un protocolo binario para actualizaciones de order book:
- Mensajes:
add_order,modify_order,delete_order - Incluye order_id, side (bid/ask), price, quantity
- Implementa encoder y decoder con tests
Compara el rendimiento de tu protocolo binario contra JSON:
- Usa Benchee para medir encode/decode de 1M de ticks
- Mide tamaño de mensaje promedio
- Calcula throughput teórico en Gbps de red
Conexión con el proyecto final
Nuestro sistema de distribución usará un protocolo binario custom:
- Ticks compactos: máxima eficiencia de red
- Symbol IDs: tabla compartida publisher-subscriber
- Streaming parser: manejo robusto de fragmentación TCP
- Heartbeats: detección de conexiones muertas