Procesos y mensajes

Domina las primitivas fundamentales de concurrencia en Elixir: spawn, send, y receive. ⏱️ 2.5 horas

Creando procesos con spawn

La forma más básica de crear un proceso es con spawn/1, que recibe una función anónima:

# Crear un proceso que imprime algo
pid = spawn(fn ->
  IO.puts("¡Hola desde proceso #{inspect(self())}!")
end)

# pid es un Process ID, algo como #PID<0.123.0>
IO.puts("Proceso creado: #{inspect(pid)}")

El proceso se ejecuta de forma asíncrona. El código después de spawn continúa inmediatamente sin esperar a que el proceso termine.

spawn con módulo, función, argumentos

También puedes usar spawn/3 para especificar un módulo, función, y lista de argumentos:

defmodule MiWorker do
  def trabajar(dato) do
    IO.puts("Procesando: #{dato}")
  end
end

# Estas dos formas son equivalentes:
spawn(fn -> MiWorker.trabajar("datos") end)
spawn(MiWorker, :trabajar, ["datos"])

Enviando mensajes con send

Los procesos se comunican enviando mensajes. Un mensaje puede ser cualquier término de Elixir: átomos, tuplas, listas, maps, binarios, etc.

# send/2 envía un mensaje a un proceso
send(self(), :hola)
send(self(), {:precio, "BTCUSD", 67543.21})
send(self(), %{tipo: :tick, bid: 1.0842, ask: 1.0845})

send es asíncrono: retorna inmediatamente sin esperar a que el mensaje sea procesado. Los mensajes se encolan en el mailbox del proceso destino.

💡 self() devuelve tu propio PID

self() retorna el PID del proceso actual. Puedes enviarte mensajes a ti mismo, lo cual es útil para auto-programar tareas.

Recibiendo mensajes con receive

receive bloquea el proceso hasta que llega un mensaje que coincida con algún patrón:

# Enviamos mensajes al proceso actual
send(self(), {:precio, 100.5})
send(self(), {:volumen, 50000})

# Recibimos con pattern matching
receive do
  {:precio, valor} ->
    IO.puts("Precio recibido: #{valor}")
  
  {:volumen, valor} ->
    IO.puts("Volumen recibido: #{valor}")
    
  otro ->
    IO.puts("Mensaje desconocido: #{inspect(otro)}")
end

receive procesa un solo mensaje y luego continúa. Los mensajes se procesan en orden FIFO, pero el pattern matching puede hacer que un mensaje posterior se procese primero si coincide con un patrón anterior.

Timeout en receive

Por defecto, receive bloquea indefinidamente. Puedes añadir un timeout con after:

receive do
  {:precio, valor} ->
    IO.puts("Precio: #{valor}")
after
  5000 ->  # 5 segundos
    IO.puts("Timeout: no llegó precio en 5s")
end
⚠️ Mensajes no consumidos

Si un mensaje no coincide con ningún patrón en receive, permanece en el mailbox. Esto puede causar que el mailbox crezca indefinidamente si no manejas todos los tipos de mensajes posibles.

El loop de recepción

Un proceso que debe procesar múltiples mensajes necesita un loop recursivo:

defmodule PriceReceiver do
  def start do
    spawn(fn -> loop() end)
  end
  
  defp loop do
    receive do
      {:precio, symbol, precio} ->
        IO.puts("[#{symbol}] $#{precio}")
        loop()  # Llamada recursiva para seguir recibiendo
      
      :stop ->
        IO.puts("Deteniendo receptor")
        # No llamamos a loop(), el proceso termina
      
      _ ->
        loop()  # Ignorar mensajes desconocidos
    end
  end
end

# Uso:
pid = PriceReceiver.start()
send(pid, {:precio, "BTCUSD", 67543.21})
send(pid, {:precio, "ETHUSD", 3421.05})
send(pid, :stop)
💡 Tail call optimization

La llamada recursiva a loop() al final de cada rama es optimizada por la BEAM (tail call optimization). No consume stack, así que puede ejecutarse indefinidamente sin problemas.

Estado en procesos

Los procesos pueden mantener estado pasándolo como argumento en la recursión:

defmodule PriceAggregator do
  def start do
    spawn(fn -> loop(%{}) end)
  end
  
  defp loop(precios) do
    receive do
      {:precio, symbol, precio} ->
        nuevos_precios = Map.put(precios, symbol, precio)
        loop(nuevos_precios)  # Pasar nuevo estado
      
      {:get, symbol, caller} ->
        precio = Map.get(precios, symbol, :no_disponible)
        send(caller, {:precio_actual, symbol, precio})
        loop(precios)
      
      {:get_all, caller} ->
        send(caller, {:todos_precios, precios})
        loop(precios)
    end
  end
end

# Uso:
pid = PriceAggregator.start()

# Enviar precios
send(pid, {:precio, "BTCUSD", 67543.21})
send(pid, {:precio, "ETHUSD", 3421.05})

# Consultar precio
send(pid, {:get, "BTCUSD", self()})
receive do
  {:precio_actual, symbol, precio} ->
    IO.puts("#{symbol}: #{precio}")
end

Este patrón de proceso con estado es tan común que Elixir proporciona una abstracción: GenServer, que veremos en el capítulo 5.

Linking y monitoring de procesos

¿Qué pasa cuando un proceso falla? Por defecto, muere silenciosamente. Para detectar fallos, usamos links y monitors.

Links: destino compartido

Un link conecta dos procesos de forma bidireccional. Si uno muere, el otro también:

# spawn_link crea proceso y lo enlaza
pid = spawn_link(fn ->
  :timer.sleep(1000)
  raise "¡Error!"
end)

# Si ejecutas esto en IEx, también morirás (aunque IEx se reinicia)

Puedes atrapar exits para que el proceso no muera:

Process.flag(:trap_exit, true)

pid = spawn_link(fn ->
  :timer.sleep(100)
  raise "¡Error!"
end)

receive do
  {:EXIT, ^pid, razon} ->
    IO.puts("Proceso #{inspect(pid)} murió: #{inspect(razon)}")
end

Monitors: observación unidireccional

Un monitor observa un proceso sin afectarlo si el observador muere. Es más flexible que links:

pid = spawn(fn ->
  :timer.sleep(100)
  raise "¡Error!"
end)

# Crear monitor
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, razon} ->
    IO.puts("Proceso monitoreado murió: #{inspect(razon)}")
end
📊 Links vs Monitors
Links Bidireccionales, propagan fallos. Usar para procesos que deben vivir o morir juntos.
Monitors Unidireccionales, solo notifican. Usar para observar sin dependencia.

Ejemplo: Publisher/Subscriber simple

Vamos a construir un sistema básico de pub/sub para datos de precios. Este será el embrión del proyecto final:

defmodule PricePubSub do
  def start do
    spawn(fn -> loop(MapSet.new()) end)
  end
  
  # API pública
  def subscribe(pubsub, subscriber_pid) do
    send(pubsub, {:subscribe, subscriber_pid})
  end
  
  def unsubscribe(pubsub, subscriber_pid) do
    send(pubsub, {:unsubscribe, subscriber_pid})
  end
  
  def publish(pubsub, mensaje) do
    send(pubsub, {:publish, mensaje})
  end
  
  # Loop interno
  defp loop(subscribers) do
    receive do
      {:subscribe, pid} ->
        Process.monitor(pid)  # Detectar si el subscriber muere
        loop(MapSet.put(subscribers, pid))
      
      {:unsubscribe, pid} ->
        loop(MapSet.delete(subscribers, pid))
      
      {:publish, mensaje} ->
        Enum.each(subscribers, fn pid ->
          send(pid, {:price_update, mensaje})
        end)
        loop(subscribers)
      
      {:DOWN, _ref, :process, pid, _reason} ->
        # Subscriber murió, eliminarlo
        loop(MapSet.delete(subscribers, pid))
    end
  end
end

# Crear subscriber
defmodule PriceLogger do
  def start(nombre) do
    spawn(fn -> loop(nombre) end)
  end
  
  defp loop(nombre) do
    receive do
      {:price_update, {symbol, precio}} ->
        ts = DateTime.utc_now() |> DateTime.to_string()
        IO.puts("[#{nombre}] #{ts} - #{symbol}: $#{precio}")
        loop(nombre)
    end
  end
end

Probándolo:

# Iniciar el pubsub
pubsub = PricePubSub.start()

# Crear varios subscribers
logger1 = PriceLogger.start("Logger-1")
logger2 = PriceLogger.start("Logger-2")

# Suscribirlos
PricePubSub.subscribe(pubsub, logger1)
PricePubSub.subscribe(pubsub, logger2)

# Publicar precios
PricePubSub.publish(pubsub, {"BTCUSD", 67543.21})
PricePubSub.publish(pubsub, {"ETHUSD", 3421.05})

# Output:
# [Logger-1] 2024-01-15 10:30:45Z - BTCUSD: $67543.21
# [Logger-2] 2024-01-15 10:30:45Z - BTCUSD: $67543.21
# [Logger-1] 2024-01-15 10:30:45Z - ETHUSD: $3421.05
# [Logger-2] 2024-01-15 10:30:45Z - ETHUSD: $3421.05
Ejercicio 2.1 Ping-Pong Básico

Crea dos procesos que jueguen ping-pong enviándose mensajes alternadamente. El proceso debe:

  1. Al recibir :ping, imprimir "Ping!" y responder :pong
  2. Al recibir :pong, imprimir "Pong!" y responder :ping
  3. Después de 10 intercambios, ambos procesos deben terminar
Ejercicio 2.2 Contador distribuido Intermedio

Implementa un proceso contador con las siguientes operaciones:

  • :increment - incrementa el contador
  • :decrement - decrementa el contador
  • {:add, n} - suma n al contador
  • {:get, caller} - envía el valor actual al caller
  • :reset - reinicia a 0

Bonus: haz que múltiples procesos puedan incrementar el contador concurrentemente y verifica que el valor final es correcto.

Ejercicio 2.3 Extender PricePubSub Intermedio

Modifica el PricePubSub para soportar:

  1. Suscripción por símbolo: {:subscribe, pid, "BTCUSD"}
  2. Solo enviar actualizaciones del símbolo suscrito
  3. Un subscriber puede suscribirse a múltiples símbolos
  4. Añade {:get_subscribers, caller} para obtener el conteo
Ejercicio 2.4 Rate limiter Avanzado

Crea un proceso que actúe como rate limiter para publicación:

  • Máximo N mensajes por segundo (configurable al iniciar)
  • Los mensajes que excedan el límite se descartan
  • Debe poder reportar cuántos mensajes se descartaron

Pista: usa :erlang.monotonic_time(:millisecond) para tracking de tiempo y considera una ventana deslizante.

Conexión con el proyecto final

Lo que hemos construido es un prototipo del sistema de distribución: