Mnesia Distribuido

Base de datos distribuida integrada en OTP para estado compartido. ⏱️ 2.5 horas

¿Qué es Mnesia?

Mnesia es la base de datos distribuida de Erlang/OTP. A diferencia de bases de datos externas, Mnesia vive dentro de la VM:

⚠️ Limitaciones de Mnesia

Mnesia no está diseñado para big data. Funciona bien para configuración, sesiones, y estado compartido pequeño-mediano. Para datos masivos, usa PostgreSQL, Cassandra, etc.

Configuración inicial

# Crear directorio de datos (antes de iniciar Mnesia)
Application.put_env(:mnesia, :dir, '/tmp/mnesia_data')

# Crear schema (solo una vez por cluster)
:mnesia.create_schema([node()])

# Iniciar Mnesia
:mnesia.start()

# Ver estado
:mnesia.info()

Crear tablas

# Tabla simple
:mnesia.create_table(:users, [
  attributes: [:id, :name, :email, :created_at]
])

# Tabla con opciones
:mnesia.create_table(:orders, [
  attributes: [:id, :user_id, :symbol, :side, :price, :qty, :status],
  type: :ordered_set,          # :set, :bag, :ordered_set
  disc_copies: [node()],       # Persistir a disco
  index: [:user_id, :symbol]  # Índices secundarios
])

# Tabla solo en RAM (más rápida)
:mnesia.create_table(:price_cache, [
  attributes: [:symbol, :price, :timestamp],
  ram_copies: [node()]
])

Tipos de copia

Tipo RAM Disco Uso
ram_copies No Cache, datos volátiles
disc_copies Datos persistentes frecuentes
disc_only_copies No Datos grandes poco accedidos

Operaciones CRUD

# Escribir (requiere transacción)
:mnesia.transaction(fn ->
  :mnesia.write({:users, 1, "Alice", "[email protected]", DateTime.utc_now()})
end)

# Leer por key primaria
:mnesia.transaction(fn ->
  :mnesia.read({:users, 1})
end)
# => {:atomic, [{:users, 1, "Alice", "[email protected]", ~U[...]}]}

# Leer sin transacción (dirty, más rápido)
:mnesia.dirty_read({:users, 1})

# Eliminar
:mnesia.transaction(fn ->
  :mnesia.delete({:users, 1})
end)

# Leer por índice secundario
:mnesia.transaction(fn ->
  :mnesia.index_read(:orders, "BTCUSD", :symbol)
end)

Transacciones vs Dirty

# Transacción: ACID, más lento, distribuido
:mnesia.transaction(fn ->
  user = :mnesia.read({:users, 1})
  # ... modificar ...
  :mnesia.write(new_user)
end)

# Dirty: sin garantías, más rápido
:mnesia.dirty_write({:price_cache, "BTCUSD", 67500.0, System.system_time()})
:mnesia.dirty_read({:price_cache, "BTCUSD"})

# activity: alternativa flexible
:mnesia.activity(:sync_dirty, fn ->
  :mnesia.write({:price_cache, "BTCUSD", 67500.0, System.system_time()})
end)

Queries con QLC

import Record
require Qlc

# Query básica
:mnesia.transaction(fn ->
  query = :qlc.q([
    order ||
    order <- :mnesia.table(:orders),
    elem(order, 3) == "BTCUSD"  # symbol
  ])
  :qlc.eval(query)
end)

# Match spec (más eficiente para queries simples)
:mnesia.transaction(fn ->
  :mnesia.select(:orders, [
    {
      {:orders, :_, :_, "BTCUSD", :_, '$1', '$2', :_},
      [],
      [{'$1', '$2'}]  # Retornar {price, qty}
    }
  ])
end)

# Dirty select
:mnesia.dirty_select(:orders, match_spec)

Replicación distribuida

# En node1: crear schema
:mnesia.create_schema([:node1@host, :node2@host])

# Iniciar en ambos nodos
:mnesia.start()

# Crear tabla replicada
:mnesia.create_table(:config, [
  attributes: [:key, :value],
  disc_copies: [:node1@host, :node2@host]  # Copia en ambos
])

# Añadir copia de tabla existente a otro nodo
:mnesia.add_table_copy(:orders, :node2@host, :disc_copies)

# Mover copia entre tipos
:mnesia.change_table_copy_type(:orders, :node1@host, :ram_copies)

Wrapper Elixir idiomático

defmodule OrderStore do
  require Logger

  @table :orders

  defmodule Order do
    defstruct [:id, :user_id, :symbol, :side, :price, :qty, :status]
  end

  def setup do
    case :mnesia.create_table(@table, [
      attributes: [:id, :user_id, :symbol, :side, :price, :qty, :status],
      type: :ordered_set,
      disc_copies: [node()],
      index: [:user_id, :symbol]
    ]) do
      {:atomic, :ok} -> :ok
      {:aborted, {:already_exists, @table}} -> :ok
      error -> error
    end
  end

  def insert(%Order{} = order) do
    record = order_to_record(order)
    case :mnesia.transaction(fn -> :mnesia.write(record) end) do
      {:atomic, :ok} -> {:ok, order}
      {:aborted, reason} -> {:error, reason}
    end
  end

  def get(id) do
    case :mnesia.dirty_read({@table, id}) do
      [record] -> {:ok, record_to_order(record)}
      [] -> {:error, :not_found}
    end
  end

  def get_by_symbol(symbol) do
    :mnesia.dirty_index_read(@table, symbol, :symbol)
    |> Enum.map(&record_to_order/1)
  end

  def get_by_user(user_id) do
    :mnesia.dirty_index_read(@table, user_id, :user_id)
    |> Enum.map(&record_to_order/1)
  end

  def update_status(id, new_status) do
    :mnesia.transaction(fn ->
      case :mnesia.read({@table, id}) do
        [record] ->
          updated = put_elem(record, 7, new_status)
          :mnesia.write(updated)
        [] ->
          :mnesia.abort(:not_found)
      end
    end)
    |> handle_result()
  end

  def delete(id) do
    :mnesia.transaction(fn -> :mnesia.delete({@table, id}) end)
    |> handle_result()
  end

  defp order_to_record(%Order{} = o) do
    {@table, o.id, o.user_id, o.symbol, o.side, o.price, o.qty, o.status}
  end

  defp record_to_order({@table, id, user_id, symbol, side, price, qty, status}) do
    %Order{
      id: id,
      user_id: user_id,
      symbol: symbol,
      side: side,
      price: price,
      qty: qty,
      status: status
    }
  end

  defp handle_result({:atomic, result}), do: {:ok, result}
  defp handle_result({:aborted, reason}), do: {:error, reason}
end

Suscripción a cambios

defmodule OrderWatcher do
  use GenServer
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    :mnesia.subscribe({:table, :orders, :detailed})
    {:ok, %{}}
  end

  @impl true
  def handle_info({:mnesia_table_event, {:write, :orders, record, _, _}}, state) do
    Logger.info("Order written: #{inspect(record)}")
    {:noreply, state}
  end

  @impl true
  def handle_info({:mnesia_table_event, {:delete, :orders, key, _, _}}, state) do
    Logger.info("Order deleted: #{inspect(key)}")
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    Logger.debug("Mnesia event: #{inspect(msg)}")
    {:noreply, state}
  end
end
Ejercicio 15.1 Session Store con Mnesia Intermedio

Implementa un almacén de sesiones distribuido:

  • Tabla :sessions con token, user_id, expires_at
  • Replicada en RAM en todos los nodos
  • Proceso que limpia sesiones expiradas periódicamente
  • Funciones: create_session, get_session, invalidate_session
Ejercicio 15.2 Config Store distribuido Avanzado

Crea un sistema de configuración distribuido:

  • Tabla de configuración con versionado
  • API: get_config, set_config, get_history
  • Notificaciones a procesos interesados cuando cambia
  • Rollback a versiones anteriores

Conexión con el proyecto final

Mnesia puede almacenar metadata del sistema de distribución: