Protocolos Binarios

Diseña e implementa protocolos binarios eficientes para datos de mercado. ⏱️ 2.5 horas

¿Por qué protocolos binarios?

Los sistemas financieros de baja latencia evitan formatos de texto como JSON o XML. Los protocolos binarios ofrecen:

📊 Protocolos de mercado reales

Los exchanges usan protocolos binarios como FIX/FAST, ITCH, OUCH, o propietarios. Un tick de precio en JSON puede ser 200+ bytes; en binario, 20-30 bytes. A millones de mensajes por segundo, la diferencia es enorme.

Binary pattern matching en Elixir

Elixir hereda de Erlang un poderoso sistema de pattern matching binario:

# Sintaxis básica
<<byte1, byte2, rest::binary>> = <<1, 2, 3, 4, 5>>
# byte1 = 1, byte2 = 2, rest = <<3, 4, 5>>

# Especificar tamaños
<<header::binary-size(4), payload::binary>> = data

# Enteros con tamaño específico
<<value::16>> = <<0x01, 0x00>>
# value = 256 (big-endian por defecto)

# Little-endian
<<value::little-16>> = <<0x01, 0x00>>
# value = 1

Modificadores de tipo

Modificador Descripción Ejemplo
integer Entero (default) x::integer
float IEEE 754 float x::float-64
binary Secuencia de bytes x::binary-size(10)
bits/bitstring Bits arbitrarios x::bits-size(3)
utf8/utf16/utf32 Codepoints Unicode x::utf8

Modificadores de endianness

# Big-endian (network byte order, default)
<<value::big-32>> = <<0, 0, 1, 0>>
# value = 256

# Little-endian (x86/ARM)
<<value::little-32>> = <<0, 1, 0, 0>>
# value = 256

# Native (usa el endianness de la máquina)
<<value::native-32>> = data

Diseñando un protocolo de market data

Diseñemos un protocolo simple para transmitir ticks de precio:

Estructura del mensaje

# Header (8 bytes)
# ┌─────────┬──────────┬──────────┬──────────┐
# │ Magic   │ Version  │ Type     │ Length   │
# │ 2 bytes │ 1 byte   │ 1 byte   │ 4 bytes  │
# └─────────┴──────────┴──────────┴──────────┘

# Tick Message (después del header)
# ┌──────────┬──────────┬──────────┬──────────┬──────────┐
# │ Symbol   │ Price    │ Volume   │ Timestamp│ Flags    │
# │ 8 bytes  │ 8 bytes  │ 8 bytes  │ 8 bytes  │ 1 byte   │
# └──────────┴──────────┴──────────┴──────────┴──────────┘

Implementación del codec

defmodule MarketProtocol do
  # Constantes del protocolo
  @magic 0xFD01
  @version 1

  # Tipos de mensaje
  @msg_tick 1
  @msg_heartbeat 2
  @msg_subscribe 3
  @msg_unsubscribe 4
  @msg_snapshot 5

  # Tamaños
  @header_size 8
  @tick_size 33
  @symbol_size 8

  ## Encoding

  def encode_tick(symbol, price, volume, timestamp, flags \\ 0) do
    symbol_padded = pad_symbol(symbol)

    payload = <<
      symbol_padded::binary-size(@symbol_size),
      price::float-64,
      volume::unsigned-64,
      timestamp::unsigned-64,
      flags::8
    >>

    encode_message(@msg_tick, payload)
  end

  def encode_heartbeat(timestamp) do
    payload = <<timestamp::unsigned-64>>
    encode_message(@msg_heartbeat, payload)
  end

  def encode_subscribe(symbol) do
    symbol_padded = pad_symbol(symbol)
    encode_message(@msg_subscribe, symbol_padded)
  end

  defp encode_message(type, payload) do
    length = byte_size(payload)
    <<
      @magic::16,
      @version::8,
      type::8,
      length::32,
      payload::binary
    >>
  end

  defp pad_symbol(symbol) when is_binary(symbol) do
    String.pad_trailing(symbol, @symbol_size, <<0>>)
    |> String.slice(0, @symbol_size)
  end

  ## Decoding

  def decode(data) when byte_size(data) < @header_size do
    {:incomplete, data}
  end

  def decode(<<@magic::16, @version::8, type::8, length::32, rest::binary>>) do
    if byte_size(rest) >= length do
      <<payload::binary-size(length), remaining::binary>> = rest
      message = decode_payload(type, payload)
      {:ok, message, remaining}
    else
      {:incomplete, <<@magic::16, @version::8, type::8, length::32, rest::binary>>}
    end
  end

  def decode(<<_::8, rest::binary>>) do
    # Magic inválido, intentar resincronizar
    decode(rest)
  end

  def decode(<<>>) do
    {:incomplete, <<>>}
  end

  defp decode_payload(@msg_tick, payload) do
    <<
      symbol_raw::binary-size(@symbol_size),
      price::float-64,
      volume::unsigned-64,
      timestamp::unsigned-64,
      flags::8
    >> = payload

    symbol = trim_symbol(symbol_raw)

    %{
      type: :tick,
      symbol: symbol,
      price: price,
      volume: volume,
      timestamp: timestamp,
      flags: flags
    }
  end

  defp decode_payload(@msg_heartbeat, <<timestamp::unsigned-64>>) do
    %{type: :heartbeat, timestamp: timestamp}
  end

  defp decode_payload(@msg_subscribe, symbol_raw) do
    %{type: :subscribe, symbol: trim_symbol(symbol_raw)}
  end

  defp decode_payload(@msg_unsubscribe, symbol_raw) do
    %{type: :unsubscribe, symbol: trim_symbol(symbol_raw)}
  end

  defp trim_symbol(symbol_raw) do
    symbol_raw
    |> String.trim_trailing(<<0>>)
    |> String.trim()
  end
end

Streaming parser (manejo de fragmentación)

TCP no garantiza que recibirás mensajes completos. Debes manejar:

defmodule MarketProtocol.Parser do
  defstruct buffer: <<>>

  def new, do: %__MODULE__{}

  def feed(%__MODULE__{buffer: buffer} = parser, data) do
    parse_all(%{parser | buffer: buffer <> data}, [])
  end

  defp parse_all(parser, messages) do
    case MarketProtocol.decode(parser.buffer) do
      {:ok, message, remaining} ->
        parse_all(%{parser | buffer: remaining}, [message | messages])

      {:incomplete, buffer} ->
        {Enum.reverse(messages), %{parser | buffer: buffer}}
    end
  end
end

# Uso en un handler TCP
def handle_info({:tcp, socket, data}, %{parser: parser} = state) do
  {messages, new_parser} = MarketProtocol.Parser.feed(parser, data)

  Enum.each(messages, &process_message/1)

  :inet.setopts(socket, [active: :once])
  {:noreply, %{state | parser: new_parser}}
end

Optimizaciones de rendimiento

Sub-binarios (zero-copy)

# Elixir crea sub-binarios cuando es posible (zero-copy)
data = <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10>>
<<_::binary-size(2), rest::binary>> = data
# 'rest' es un sub-binario que apunta al mismo heap
# No hay copia de los bytes 3-10

# Forzar copia (cuando el original es muy grande)
rest_copy = :binary.copy(rest)

IOLists para construir mensajes

# Menos eficiente: concatenación crea nuevos binarios
def build_messages_slow(ticks) do
  Enum.reduce(ticks, <<>>, fn tick, acc ->
    acc <> encode_tick(tick)  # Copia todo el acumulador cada vez!
  end)
end

# Más eficiente: IOList, se concatena al enviar
def build_messages_fast(ticks) do
  Enum.map(ticks, &encode_tick/1)
  # Retorna lista de binarios, :gen_tcp.send acepta IOLists
end

# También puedes mezclar binarios y listas
iolist = [<<1, 2>>, [<<3>>, <<4, 5>>], 6]
:gen_tcp.send(socket, iolist)  # Se concatena en kernel

Precalcular campos estáticos

defmodule FastEncoder do
  # Precalcular header en compile-time
  @tick_header <<0xFD01::16, 1::8, 1::8, 33::32>>

  def encode_tick(symbol, price, volume, timestamp) do
    # Solo concatenar header precalculado + payload
    [@tick_header, encode_tick_payload(symbol, price, volume, timestamp)]
  end

  defp encode_tick_payload(symbol, price, volume, timestamp) do
    symbol_padded = String.pad_trailing(symbol, 8, <<0>>)
    <<
      symbol_padded::binary,
      price::float-64,
      volume::unsigned-64,
      timestamp::unsigned-64,
      0::8
    >>
  end
end

Floats de precisión fija

Los precios financieros a menudo usan precisión fija para evitar errores de punto flotante:

defmodule FixedPoint do
  # Precio con 8 decimales de precisión
  @decimals 8
  @multiplier trunc(:math.pow(10, @decimals))

  def encode(price) when is_float(price) do
    trunc(price * @multiplier)
  end

  def decode(encoded) do
    encoded / @multiplier
  end

  # En el protocolo binario
  def encode_price_binary(price) do
    encoded = encode(price)
    <<encoded::signed-64>>
  end

  def decode_price_binary(<<encoded::signed-64>>) do
    decode(encoded)
  end
end

# Ejemplo
FixedPoint.encode(67543.12345678)
# => 6754312345678

FixedPoint.decode(6754312345678)
# => 67543.12345678

Compresión de símbolos

defmodule SymbolTable do
  use GenServer

  # Mapear símbolos a IDs cortos
  # "BTCUSD" (6 bytes) -> ID 1 (2 bytes)

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def get_id(symbol) do
    case :ets.lookup(:symbol_to_id, symbol) do
      [{^symbol, id}] -> id
      [] -> GenServer.call(__MODULE__, {:register, symbol})
    end
  end

  def get_symbol(id) do
    case :ets.lookup(:id_to_symbol, id) do
      [{^id, symbol}] -> symbol
      [] -> nil
    end
  end

  @impl true
  def init(_opts) do
    :ets.new(:symbol_to_id, [:set, :named_table, :protected, read_concurrency: true])
    :ets.new(:id_to_symbol, [:set, :named_table, :protected, read_concurrency: true])
    {:ok, %{next_id: 1}}
  end

  @impl true
  def handle_call({:register, symbol}, _from, state) do
    # Double-check después de obtener el lock
    case :ets.lookup(:symbol_to_id, symbol) do
      [{^symbol, id}] ->
        {:reply, id, state}

      [] ->
        id = state.next_id
        :ets.insert(:symbol_to_id, {symbol, id})
        :ets.insert(:id_to_symbol, {id, symbol})
        {:reply, id, %{state | next_id: id + 1}}
    end
  end
end
Ejercicio 10.1 Protocolo de Order Book Intermedio

Diseña e implementa un protocolo binario para actualizaciones de order book:

  • Mensajes: add_order, modify_order, delete_order
  • Incluye order_id, side (bid/ask), price, quantity
  • Implementa encoder y decoder con tests
Ejercicio 10.2 Benchmark: JSON vs Binario Avanzado

Compara el rendimiento de tu protocolo binario contra JSON:

  • Usa Benchee para medir encode/decode de 1M de ticks
  • Mide tamaño de mensaje promedio
  • Calcula throughput teórico en Gbps de red

Conexión con el proyecto final

Nuestro sistema de distribución usará un protocolo binario custom: