ETS y Almacenamiento
Almacenamiento en memoria de alto rendimiento para datos de mercado. ⏱️ 2.5 horas
¿Por qué ETS?
ETS (Erlang Term Storage) es una base de datos en memoria incluida en OTP. A diferencia del estado de un GenServer, ETS permite:
- Acceso concurrente: múltiples procesos leen/escriben simultáneamente
- Sin cuello de botella: no hay proceso único serializando acceso
- Latencia constante: O(1) para lookups por key
- Gran capacidad: millones de entradas sin degradación
ETS es ideal para cachés de precios, order books, y tablas de símbolos. Un GenServer puede ser el "dueño" de la tabla pero múltiples procesos (subscribers, APIs) pueden leer directamente sin pasar por él.
Crear una tabla ETS
# Crear tabla básica
table = :ets.new(:precios, [])
# Crear tabla con nombre (accesible globalmente)
:ets.new(:precios, [:named_table])
# Ahora podemos usar el nombre directamente
:ets.insert(:precios, {"BTCUSD", 67500.0})
:ets.lookup(:precios, "BTCUSD")
# => [{"BTCUSD", 67500.0}]
Tipos de tablas
| Tipo | Keys duplicadas | Orden | Uso típico |
|---|---|---|---|
:set |
No | Hash (sin orden) | Cache key-value, precios |
:ordered_set |
No | Ordenado por key | Order books, time series |
:bag |
Sí (valores diferentes) | Hash | Índices secundarios |
:duplicate_bag |
Sí (cualquier) | Hash | Event logs |
# Set: solo una entrada por key
:ets.new(:price_cache, [:set, :named_table])
# Ordered set: keys ordenadas (ideal para order books)
:ets.new(:order_book, [:ordered_set, :named_table])
# Bag: múltiples valores por key
:ets.new(:symbol_index, [:bag, :named_table])
Opciones de acceso
:ets.new(:tabla, [
:set, # tipo de tabla
:named_table, # accesible por nombre
:public, # cualquier proceso puede leer/escribir
read_concurrency: true, # optimizar para lecturas concurrentes
write_concurrency: true # optimizar para escrituras concurrentes
])
Niveles de acceso
:private- solo el proceso dueño puede acceder:protected- cualquiera lee, solo dueño escribe (default):public- cualquiera lee y escribe
Usa :protected con read_concurrency: true.
Un solo proceso (GenServer) escribe, pero múltiples leen sin
contención. Ideal para cachés de precios donde las actualizaciones
vienen de un feed pero muchos subscribers leen.
Operaciones CRUD
Insertar
# Insertar una tupla (primer elemento es la key)
:ets.insert(:precios, {"BTCUSD", 67500.0, 1699900000})
# Insertar múltiples
:ets.insert(:precios, [
{"BTCUSD", 67500.0, 1699900000},
{"ETHUSD", 3800.0, 1699900000},
{"SOLUSD", 180.0, 1699900000}
])
# insert_new: solo si la key no existe
:ets.insert_new(:precios, {"BTCUSD", 0.0, 0})
# => false (ya existe)
Leer
# Lookup por key exacta
:ets.lookup(:precios, "BTCUSD")
# => [{"BTCUSD", 67500.0, 1699900000}]
# Si no existe, retorna lista vacía
:ets.lookup(:precios, "DOGEUSD")
# => []
# lookup_element: obtener elemento específico de la tupla
:ets.lookup_element(:precios, "BTCUSD", 2)
# => 67500.0 (segundo elemento)
# member: verificar si existe
:ets.member(:precios, "BTCUSD")
# => true
Actualizar
# update_element: actualizar campo específico
:ets.update_element(:precios, "BTCUSD", {2, 68000.0})
# Actualiza posición 2 (precio)
# update_counter: incrementar atómicamente
:ets.new(:stats, [:set, :named_table, :public])
:ets.insert(:stats, {"BTCUSD", 0}) # {symbol, tick_count}
# Incrementar contador atómicamente
:ets.update_counter(:stats, "BTCUSD", {2, 1})
# => 1 (nuevo valor)
# Incrementar por cantidad específica
:ets.update_counter(:stats, "BTCUSD", {2, 10})
# => 11
Eliminar
# Eliminar por key
:ets.delete(:precios, "BTCUSD")
# Eliminar toda la tabla
:ets.delete(:precios)
# Eliminar todas las entradas (mantener tabla)
:ets.delete_all_objects(:precios)
Iteración y búsquedas
Recorrer toda la tabla
# first/next para iterar
key = :ets.first(:precios)
# => "BTCUSD"
next_key = :ets.next(:precios, key)
# => "ETHUSD" o :"$end_of_table"
# tab2list: obtener todo como lista (¡cuidado con tablas grandes!)
:ets.tab2list(:precios)
# => [{"BTCUSD", 67500.0, ...}, {"ETHUSD", ...}, ...]
# foldl para reducir
:ets.foldl(fn {_symbol, price, _ts}, acc -> acc + price end, 0, :precios)
Match specifications
# match: pattern matching simple
:ets.match(:precios, {:_, '$1', :_})
# => [[67500.0], [3800.0], ...] (todos los precios)
# match_object: retorna tuplas completas
:ets.match_object(:precios, {"BTC" <> :_, :_, :_})
# No funciona así - necesitamos select para esto
Select con match specs
# select: el más poderoso pero complejo
# Match spec: [{pattern, guards, result}]
# Encontrar precios mayores a 1000
match_spec = [
{
{:_, '$1', :_}, # pattern: {symbol, price, timestamp}
[{:>, '$1', 1000}], # guard: price > 1000
['$_'] # result: toda la tupla
}
]
:ets.select(:precios, match_spec)
Las match specs son difíciles de escribir a mano. Usa la macro
:ets.fun2ms/1 para generarlas desde funciones anónimas:
require Ex2ms # Librería que facilita esto en Elixir
# O usando :ets.fun2ms en el shell de Erlang:
# :ets.fun2ms(fn {_, price, _} when price > 1000 -> price end)
# En Elixir, puedes construirlo manualmente o usar Ex2ms:
import Ex2ms
match_spec = fun do
{symbol, price, ts} when price > 1000 -> {symbol, price}
end
:ets.select(:precios, match_spec)
ETS con GenServer (patrón común)
defmodule PriceCache do
use GenServer
@table :price_cache
## API Cliente
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
# Lectura directa de ETS (no pasa por GenServer)
def get(symbol) do
case :ets.lookup(@table, symbol) do
[{^symbol, price, timestamp}] -> {:ok, price, timestamp}
[] -> :not_found
end
end
def get_all do
:ets.tab2list(@table)
end
# Escritura a través del GenServer (serializada)
def put(symbol, price) do
GenServer.cast(__MODULE__, {:put, symbol, price})
end
def delete(symbol) do
GenServer.cast(__MODULE__, {:delete, symbol})
end
## Callbacks
@impl true
def init(_opts) do
table = :ets.new(@table, [
:set,
:named_table,
:protected, # solo este proceso escribe
read_concurrency: true # optimizar lecturas concurrentes
])
{:ok, %{table: table}}
end
@impl true
def handle_cast({:put, symbol, price}, state) do
timestamp = System.system_time(:millisecond)
:ets.insert(@table, {symbol, price, timestamp})
{:noreply, state}
end
@impl true
def handle_cast({:delete, symbol}, state) do
:ets.delete(@table, symbol)
{:noreply, state}
end
end
Order Book con ordered_set
defmodule OrderBook do
use GenServer
defstruct [:symbol, :bids_table, :asks_table]
def start_link(symbol) do
GenServer.start_link(__MODULE__, symbol, name: via(symbol))
end
def add_bid(symbol, price, qty) do
GenServer.cast(via(symbol), {:add_bid, price, qty})
end
def add_ask(symbol, price, qty) do
GenServer.cast(via(symbol), {:add_ask, price, qty})
end
# Lectura directa de ETS
def best_bid(symbol) do
table = bids_table_name(symbol)
case :ets.last(table) do
:"$end_of_table" -> nil
price ->
[{^price, qty}] = :ets.lookup(table, price)
{price, qty}
end
end
def best_ask(symbol) do
table = asks_table_name(symbol)
case :ets.first(table) do
:"$end_of_table" -> nil
price ->
[{^price, qty}] = :ets.lookup(table, price)
{price, qty}
end
end
def top_of_book(symbol) do
%{
bid: best_bid(symbol),
ask: best_ask(symbol)
}
end
# Obtener N niveles de profundidad
def depth(symbol, levels) do
%{
bids: get_levels(bids_table_name(symbol), levels, :desc),
asks: get_levels(asks_table_name(symbol), levels, :asc)
}
end
defp get_levels(table, levels, direction) do
start = if direction == :asc, do: :ets.first(table), else: :ets.last(table)
collect_levels(table, start, levels, direction, [])
end
defp collect_levels(_table, :"$end_of_table", _remaining, _dir, acc), do: Enum.reverse(acc)
defp collect_levels(_table, _key, 0, _dir, acc), do: Enum.reverse(acc)
defp collect_levels(table, key, remaining, direction, acc) do
[{price, qty}] = :ets.lookup(table, key)
next = if direction == :asc, do: :ets.next(table, key), else: :ets.prev(table, key)
collect_levels(table, next, remaining - 1, direction, [{price, qty} | acc])
end
## Internals
defp via(symbol), do: {:via, Registry, {OrderBook.Registry, symbol}}
defp bids_table_name(symbol), do: String.to_atom("orderbook_bids_#{symbol}")
defp asks_table_name(symbol), do: String.to_atom("orderbook_asks_#{symbol}")
@impl true
def init(symbol) do
bids = :ets.new(bids_table_name(symbol), [
:ordered_set, :named_table, :protected, read_concurrency: true
])
asks = :ets.new(asks_table_name(symbol), [
:ordered_set, :named_table, :protected, read_concurrency: true
])
state = %__MODULE__{symbol: symbol, bids_table: bids, asks_table: asks}
{:ok, state}
end
@impl true
def handle_cast({:add_bid, price, qty}, state) do
:ets.insert(state.bids_table, {price, qty})
{:noreply, state}
end
@impl true
def handle_cast({:add_ask, price, qty}, state) do
:ets.insert(state.asks_table, {price, qty})
{:noreply, state}
end
end
Persistencia: DETS
DETS (Disk ETS) persiste datos a disco. Útil para cachés que sobrevivan reinicios, pero no para datos críticos de producción.
# Abrir archivo DETS
{:ok, table} = :dets.open_file(:price_history, [
file: 'price_history.dets',
type: :set
])
# Misma API que ETS
:dets.insert(:price_history, {"BTCUSD", 67500.0, 1699900000})
:dets.lookup(:price_history, "BTCUSD")
# Sincronizar a disco
:dets.sync(:price_history)
# Cerrar
:dets.close(:price_history)
- Máximo 2GB por archivo
- Más lento que ETS (I/O a disco)
- No tiene
:ordered_set - Para datos persistentes reales, usa una base de datos apropiada
Monitoreo y estadísticas
# Información de una tabla
:ets.info(:precios)
# => [read_concurrency: true, write_concurrency: false,
# type: :set, size: 1000, memory: 12345, ...]
# Solo el tamaño
:ets.info(:precios, :size)
# => 1000
# Memoria en palabras (multiplicar por word size)
:ets.info(:precios, :memory)
# Todas las tablas
:ets.all()
# Tablas de un proceso específico
:ets.all() |> Enum.filter(fn t ->
:ets.info(t, :owner) == pid
end)
Implementa un cache ETS con expiración automática:
- Cada entrada tiene un TTL configurable
- Un proceso periódico limpia entradas expiradas
get/1retorna:expiredsi la entrada expiró- Usa
update_counterpara estadísticas de hits/misses
Extiende el OrderBook para soportar:
- Múltiples órdenes al mismo precio (aggregated depth)
remove_order/2que decrementa cantidadmatch_order/3que ejecuta contra el book- Snapshot del book completo para recovery
Conexión con el proyecto final
ETS será fundamental en nuestro sistema de distribución:
- Price Cache: últimos precios accesibles por cualquier subscriber
- Subscriber Registry: tracking de conexiones activas
- Rate Limiting: contadores atómicos por cliente
- Statistics: métricas de ticks procesados, latencia, etc.