ETS y Almacenamiento

Almacenamiento en memoria de alto rendimiento para datos de mercado. ⏱️ 2.5 horas

¿Por qué ETS?

ETS (Erlang Term Storage) es una base de datos en memoria incluida en OTP. A diferencia del estado de un GenServer, ETS permite:

📊 ETS en sistemas financieros

ETS es ideal para cachés de precios, order books, y tablas de símbolos. Un GenServer puede ser el "dueño" de la tabla pero múltiples procesos (subscribers, APIs) pueden leer directamente sin pasar por él.

Crear una tabla ETS

# Crear tabla básica
table = :ets.new(:precios, [])

# Crear tabla con nombre (accesible globalmente)
:ets.new(:precios, [:named_table])

# Ahora podemos usar el nombre directamente
:ets.insert(:precios, {"BTCUSD", 67500.0})
:ets.lookup(:precios, "BTCUSD")
# => [{"BTCUSD", 67500.0}]

Tipos de tablas

Tipo Keys duplicadas Orden Uso típico
:set No Hash (sin orden) Cache key-value, precios
:ordered_set No Ordenado por key Order books, time series
:bag Sí (valores diferentes) Hash Índices secundarios
:duplicate_bag Sí (cualquier) Hash Event logs
# Set: solo una entrada por key
:ets.new(:price_cache, [:set, :named_table])

# Ordered set: keys ordenadas (ideal para order books)
:ets.new(:order_book, [:ordered_set, :named_table])

# Bag: múltiples valores por key
:ets.new(:symbol_index, [:bag, :named_table])

Opciones de acceso

:ets.new(:tabla, [
  :set,                  # tipo de tabla
  :named_table,          # accesible por nombre
  :public,               # cualquier proceso puede leer/escribir
  read_concurrency: true,   # optimizar para lecturas concurrentes
  write_concurrency: true   # optimizar para escrituras concurrentes
])

Niveles de acceso

💡 Para máximo rendimiento en lectura

Usa :protected con read_concurrency: true. Un solo proceso (GenServer) escribe, pero múltiples leen sin contención. Ideal para cachés de precios donde las actualizaciones vienen de un feed pero muchos subscribers leen.

Operaciones CRUD

Insertar

# Insertar una tupla (primer elemento es la key)
:ets.insert(:precios, {"BTCUSD", 67500.0, 1699900000})

# Insertar múltiples
:ets.insert(:precios, [
  {"BTCUSD", 67500.0, 1699900000},
  {"ETHUSD", 3800.0, 1699900000},
  {"SOLUSD", 180.0, 1699900000}
])

# insert_new: solo si la key no existe
:ets.insert_new(:precios, {"BTCUSD", 0.0, 0})
# => false (ya existe)

Leer

# Lookup por key exacta
:ets.lookup(:precios, "BTCUSD")
# => [{"BTCUSD", 67500.0, 1699900000}]

# Si no existe, retorna lista vacía
:ets.lookup(:precios, "DOGEUSD")
# => []

# lookup_element: obtener elemento específico de la tupla
:ets.lookup_element(:precios, "BTCUSD", 2)
# => 67500.0 (segundo elemento)

# member: verificar si existe
:ets.member(:precios, "BTCUSD")
# => true

Actualizar

# update_element: actualizar campo específico
:ets.update_element(:precios, "BTCUSD", {2, 68000.0})
# Actualiza posición 2 (precio)

# update_counter: incrementar atómicamente
:ets.new(:stats, [:set, :named_table, :public])
:ets.insert(:stats, {"BTCUSD", 0})  # {symbol, tick_count}

# Incrementar contador atómicamente
:ets.update_counter(:stats, "BTCUSD", {2, 1})
# => 1 (nuevo valor)

# Incrementar por cantidad específica
:ets.update_counter(:stats, "BTCUSD", {2, 10})
# => 11

Eliminar

# Eliminar por key
:ets.delete(:precios, "BTCUSD")

# Eliminar toda la tabla
:ets.delete(:precios)

# Eliminar todas las entradas (mantener tabla)
:ets.delete_all_objects(:precios)

Iteración y búsquedas

Recorrer toda la tabla

# first/next para iterar
key = :ets.first(:precios)
# => "BTCUSD"

next_key = :ets.next(:precios, key)
# => "ETHUSD" o :"$end_of_table"

# tab2list: obtener todo como lista (¡cuidado con tablas grandes!)
:ets.tab2list(:precios)
# => [{"BTCUSD", 67500.0, ...}, {"ETHUSD", ...}, ...]

# foldl para reducir
:ets.foldl(fn {_symbol, price, _ts}, acc -> acc + price end, 0, :precios)

Match specifications

# match: pattern matching simple
:ets.match(:precios, {:_, '$1', :_})
# => [[67500.0], [3800.0], ...]  (todos los precios)

# match_object: retorna tuplas completas
:ets.match_object(:precios, {"BTC" <> :_, :_, :_})
# No funciona así - necesitamos select para esto

Select con match specs

# select: el más poderoso pero complejo
# Match spec: [{pattern, guards, result}]

# Encontrar precios mayores a 1000
match_spec = [
  {
    {:_, '$1', :_},          # pattern: {symbol, price, timestamp}
    [{:>, '$1', 1000}],        # guard: price > 1000
    ['$_']                     # result: toda la tupla
  }
]

:ets.select(:precios, match_spec)
💡 Usa :ets.fun2ms para match specs

Las match specs son difíciles de escribir a mano. Usa la macro :ets.fun2ms/1 para generarlas desde funciones anónimas:

require Ex2ms  # Librería que facilita esto en Elixir

# O usando :ets.fun2ms en el shell de Erlang:
# :ets.fun2ms(fn {_, price, _} when price > 1000 -> price end)

# En Elixir, puedes construirlo manualmente o usar Ex2ms:
import Ex2ms

match_spec = fun do
  {symbol, price, ts} when price > 1000 -> {symbol, price}
end

:ets.select(:precios, match_spec)

ETS con GenServer (patrón común)

defmodule PriceCache do
  use GenServer

  @table :price_cache

  ## API Cliente

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Lectura directa de ETS (no pasa por GenServer)
  def get(symbol) do
    case :ets.lookup(@table, symbol) do
      [{^symbol, price, timestamp}] -> {:ok, price, timestamp}
      [] -> :not_found
    end
  end

  def get_all do
    :ets.tab2list(@table)
  end

  # Escritura a través del GenServer (serializada)
  def put(symbol, price) do
    GenServer.cast(__MODULE__, {:put, symbol, price})
  end

  def delete(symbol) do
    GenServer.cast(__MODULE__, {:delete, symbol})
  end

  ## Callbacks

  @impl true
  def init(_opts) do
    table = :ets.new(@table, [
      :set,
      :named_table,
      :protected,              # solo este proceso escribe
      read_concurrency: true   # optimizar lecturas concurrentes
    ])

    {:ok, %{table: table}}
  end

  @impl true
  def handle_cast({:put, symbol, price}, state) do
    timestamp = System.system_time(:millisecond)
    :ets.insert(@table, {symbol, price, timestamp})
    {:noreply, state}
  end

  @impl true
  def handle_cast({:delete, symbol}, state) do
    :ets.delete(@table, symbol)
    {:noreply, state}
  end
end

Order Book con ordered_set

defmodule OrderBook do
  use GenServer

  defstruct [:symbol, :bids_table, :asks_table]

  def start_link(symbol) do
    GenServer.start_link(__MODULE__, symbol, name: via(symbol))
  end

  def add_bid(symbol, price, qty) do
    GenServer.cast(via(symbol), {:add_bid, price, qty})
  end

  def add_ask(symbol, price, qty) do
    GenServer.cast(via(symbol), {:add_ask, price, qty})
  end

  # Lectura directa de ETS
  def best_bid(symbol) do
    table = bids_table_name(symbol)
    case :ets.last(table) do
      :"$end_of_table" -> nil
      price ->
        [{^price, qty}] = :ets.lookup(table, price)
        {price, qty}
    end
  end

  def best_ask(symbol) do
    table = asks_table_name(symbol)
    case :ets.first(table) do
      :"$end_of_table" -> nil
      price ->
        [{^price, qty}] = :ets.lookup(table, price)
        {price, qty}
    end
  end

  def top_of_book(symbol) do
    %{
      bid: best_bid(symbol),
      ask: best_ask(symbol)
    }
  end

  # Obtener N niveles de profundidad
  def depth(symbol, levels) do
    %{
      bids: get_levels(bids_table_name(symbol), levels, :desc),
      asks: get_levels(asks_table_name(symbol), levels, :asc)
    }
  end

  defp get_levels(table, levels, direction) do
    start = if direction == :asc, do: :ets.first(table), else: :ets.last(table)
    collect_levels(table, start, levels, direction, [])
  end

  defp collect_levels(_table, :"$end_of_table", _remaining, _dir, acc), do: Enum.reverse(acc)
  defp collect_levels(_table, _key, 0, _dir, acc), do: Enum.reverse(acc)

  defp collect_levels(table, key, remaining, direction, acc) do
    [{price, qty}] = :ets.lookup(table, key)
    next = if direction == :asc, do: :ets.next(table, key), else: :ets.prev(table, key)
    collect_levels(table, next, remaining - 1, direction, [{price, qty} | acc])
  end

  ## Internals

  defp via(symbol), do: {:via, Registry, {OrderBook.Registry, symbol}}
  defp bids_table_name(symbol), do: String.to_atom("orderbook_bids_#{symbol}")
  defp asks_table_name(symbol), do: String.to_atom("orderbook_asks_#{symbol}")

  @impl true
  def init(symbol) do
    bids = :ets.new(bids_table_name(symbol), [
      :ordered_set, :named_table, :protected, read_concurrency: true
    ])
    asks = :ets.new(asks_table_name(symbol), [
      :ordered_set, :named_table, :protected, read_concurrency: true
    ])

    state = %__MODULE__{symbol: symbol, bids_table: bids, asks_table: asks}
    {:ok, state}
  end

  @impl true
  def handle_cast({:add_bid, price, qty}, state) do
    :ets.insert(state.bids_table, {price, qty})
    {:noreply, state}
  end

  @impl true
  def handle_cast({:add_ask, price, qty}, state) do
    :ets.insert(state.asks_table, {price, qty})
    {:noreply, state}
  end
end

Persistencia: DETS

DETS (Disk ETS) persiste datos a disco. Útil para cachés que sobrevivan reinicios, pero no para datos críticos de producción.

# Abrir archivo DETS
{:ok, table} = :dets.open_file(:price_history, [
  file: 'price_history.dets',
  type: :set
])

# Misma API que ETS
:dets.insert(:price_history, {"BTCUSD", 67500.0, 1699900000})
:dets.lookup(:price_history, "BTCUSD")

# Sincronizar a disco
:dets.sync(:price_history)

# Cerrar
:dets.close(:price_history)
⚠️ Limitaciones de DETS
  • Máximo 2GB por archivo
  • Más lento que ETS (I/O a disco)
  • No tiene :ordered_set
  • Para datos persistentes reales, usa una base de datos apropiada

Monitoreo y estadísticas

# Información de una tabla
:ets.info(:precios)
# => [read_concurrency: true, write_concurrency: false,
#     type: :set, size: 1000, memory: 12345, ...]

# Solo el tamaño
:ets.info(:precios, :size)
# => 1000

# Memoria en palabras (multiplicar por word size)
:ets.info(:precios, :memory)

# Todas las tablas
:ets.all()

# Tablas de un proceso específico
:ets.all() |> Enum.filter(fn t ->
  :ets.info(t, :owner) == pid
end)
Ejercicio 8.1 Cache con TTL Intermedio

Implementa un cache ETS con expiración automática:

  • Cada entrada tiene un TTL configurable
  • Un proceso periódico limpia entradas expiradas
  • get/1 retorna :expired si la entrada expiró
  • Usa update_counter para estadísticas de hits/misses
Ejercicio 8.2 Order Book completo Avanzado

Extiende el OrderBook para soportar:

  • Múltiples órdenes al mismo precio (aggregated depth)
  • remove_order/2 que decrementa cantidad
  • match_order/3 que ejecuta contra el book
  • Snapshot del book completo para recovery

Conexión con el proyecto final

ETS será fundamental en nuestro sistema de distribución: