Procesos y mensajes
Domina las primitivas fundamentales de concurrencia en Elixir: spawn, send, y receive. ⏱️ 2.5 horas
Creando procesos con spawn
La forma más básica de crear un proceso es con spawn/1,
que recibe una función anónima:
# Crear un proceso que imprime algo
pid = spawn(fn ->
IO.puts("¡Hola desde proceso #{inspect(self())}!")
end)
# pid es un Process ID, algo como #PID<0.123.0>
IO.puts("Proceso creado: #{inspect(pid)}")
El proceso se ejecuta de forma asíncrona. El código después de
spawn continúa inmediatamente sin esperar a que
el proceso termine.
spawn con módulo, función, argumentos
También puedes usar spawn/3 para especificar un módulo,
función, y lista de argumentos:
defmodule MiWorker do
def trabajar(dato) do
IO.puts("Procesando: #{dato}")
end
end
# Estas dos formas son equivalentes:
spawn(fn -> MiWorker.trabajar("datos") end)
spawn(MiWorker, :trabajar, ["datos"])
Enviando mensajes con send
Los procesos se comunican enviando mensajes. Un mensaje puede ser cualquier término de Elixir: átomos, tuplas, listas, maps, binarios, etc.
# send/2 envía un mensaje a un proceso
send(self(), :hola)
send(self(), {:precio, "BTCUSD", 67543.21})
send(self(), %{tipo: :tick, bid: 1.0842, ask: 1.0845})
send es asíncrono: retorna inmediatamente sin esperar
a que el mensaje sea procesado. Los mensajes se encolan en el
mailbox del proceso destino.
self() retorna el PID del proceso actual. Puedes
enviarte mensajes a ti mismo, lo cual es útil para auto-programar
tareas.
Recibiendo mensajes con receive
receive bloquea el proceso hasta que llega un mensaje
que coincida con algún patrón:
# Enviamos mensajes al proceso actual
send(self(), {:precio, 100.5})
send(self(), {:volumen, 50000})
# Recibimos con pattern matching
receive do
{:precio, valor} ->
IO.puts("Precio recibido: #{valor}")
{:volumen, valor} ->
IO.puts("Volumen recibido: #{valor}")
otro ->
IO.puts("Mensaje desconocido: #{inspect(otro)}")
end
receive procesa un solo mensaje y luego
continúa. Los mensajes se procesan en orden FIFO, pero el pattern
matching puede hacer que un mensaje posterior se procese primero si
coincide con un patrón anterior.
Timeout en receive
Por defecto, receive bloquea indefinidamente. Puedes
añadir un timeout con after:
receive do
{:precio, valor} ->
IO.puts("Precio: #{valor}")
after
5000 -> # 5 segundos
IO.puts("Timeout: no llegó precio en 5s")
end
Si un mensaje no coincide con ningún patrón en receive,
permanece en el mailbox. Esto puede causar que el mailbox crezca
indefinidamente si no manejas todos los tipos de mensajes posibles.
El loop de recepción
Un proceso que debe procesar múltiples mensajes necesita un loop recursivo:
defmodule PriceReceiver do
def start do
spawn(fn -> loop() end)
end
defp loop do
receive do
{:precio, symbol, precio} ->
IO.puts("[#{symbol}] $#{precio}")
loop() # Llamada recursiva para seguir recibiendo
:stop ->
IO.puts("Deteniendo receptor")
# No llamamos a loop(), el proceso termina
_ ->
loop() # Ignorar mensajes desconocidos
end
end
end
# Uso:
pid = PriceReceiver.start()
send(pid, {:precio, "BTCUSD", 67543.21})
send(pid, {:precio, "ETHUSD", 3421.05})
send(pid, :stop)
La llamada recursiva a loop() al final de cada rama
es optimizada por la BEAM (tail call optimization). No consume
stack, así que puede ejecutarse indefinidamente sin problemas.
Estado en procesos
Los procesos pueden mantener estado pasándolo como argumento en la recursión:
defmodule PriceAggregator do
def start do
spawn(fn -> loop(%{}) end)
end
defp loop(precios) do
receive do
{:precio, symbol, precio} ->
nuevos_precios = Map.put(precios, symbol, precio)
loop(nuevos_precios) # Pasar nuevo estado
{:get, symbol, caller} ->
precio = Map.get(precios, symbol, :no_disponible)
send(caller, {:precio_actual, symbol, precio})
loop(precios)
{:get_all, caller} ->
send(caller, {:todos_precios, precios})
loop(precios)
end
end
end
# Uso:
pid = PriceAggregator.start()
# Enviar precios
send(pid, {:precio, "BTCUSD", 67543.21})
send(pid, {:precio, "ETHUSD", 3421.05})
# Consultar precio
send(pid, {:get, "BTCUSD", self()})
receive do
{:precio_actual, symbol, precio} ->
IO.puts("#{symbol}: #{precio}")
end
Este patrón de proceso con estado es tan común que Elixir proporciona una abstracción: GenServer, que veremos en el capítulo 5.
Linking y monitoring de procesos
¿Qué pasa cuando un proceso falla? Por defecto, muere silenciosamente. Para detectar fallos, usamos links y monitors.
Links: destino compartido
Un link conecta dos procesos de forma bidireccional. Si uno muere, el otro también:
# spawn_link crea proceso y lo enlaza
pid = spawn_link(fn ->
:timer.sleep(1000)
raise "¡Error!"
end)
# Si ejecutas esto en IEx, también morirás (aunque IEx se reinicia)
Puedes atrapar exits para que el proceso no muera:
Process.flag(:trap_exit, true)
pid = spawn_link(fn ->
:timer.sleep(100)
raise "¡Error!"
end)
receive do
{:EXIT, ^pid, razon} ->
IO.puts("Proceso #{inspect(pid)} murió: #{inspect(razon)}")
end
Monitors: observación unidireccional
Un monitor observa un proceso sin afectarlo si el observador muere. Es más flexible que links:
pid = spawn(fn ->
:timer.sleep(100)
raise "¡Error!"
end)
# Crear monitor
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, ^pid, razon} ->
IO.puts("Proceso monitoreado murió: #{inspect(razon)}")
end
| Links | Bidireccionales, propagan fallos. Usar para procesos que deben vivir o morir juntos. |
| Monitors | Unidireccionales, solo notifican. Usar para observar sin dependencia. |
Ejemplo: Publisher/Subscriber simple
Vamos a construir un sistema básico de pub/sub para datos de precios. Este será el embrión del proyecto final:
defmodule PricePubSub do
def start do
spawn(fn -> loop(MapSet.new()) end)
end
# API pública
def subscribe(pubsub, subscriber_pid) do
send(pubsub, {:subscribe, subscriber_pid})
end
def unsubscribe(pubsub, subscriber_pid) do
send(pubsub, {:unsubscribe, subscriber_pid})
end
def publish(pubsub, mensaje) do
send(pubsub, {:publish, mensaje})
end
# Loop interno
defp loop(subscribers) do
receive do
{:subscribe, pid} ->
Process.monitor(pid) # Detectar si el subscriber muere
loop(MapSet.put(subscribers, pid))
{:unsubscribe, pid} ->
loop(MapSet.delete(subscribers, pid))
{:publish, mensaje} ->
Enum.each(subscribers, fn pid ->
send(pid, {:price_update, mensaje})
end)
loop(subscribers)
{:DOWN, _ref, :process, pid, _reason} ->
# Subscriber murió, eliminarlo
loop(MapSet.delete(subscribers, pid))
end
end
end
# Crear subscriber
defmodule PriceLogger do
def start(nombre) do
spawn(fn -> loop(nombre) end)
end
defp loop(nombre) do
receive do
{:price_update, {symbol, precio}} ->
ts = DateTime.utc_now() |> DateTime.to_string()
IO.puts("[#{nombre}] #{ts} - #{symbol}: $#{precio}")
loop(nombre)
end
end
end
Probándolo:
# Iniciar el pubsub
pubsub = PricePubSub.start()
# Crear varios subscribers
logger1 = PriceLogger.start("Logger-1")
logger2 = PriceLogger.start("Logger-2")
# Suscribirlos
PricePubSub.subscribe(pubsub, logger1)
PricePubSub.subscribe(pubsub, logger2)
# Publicar precios
PricePubSub.publish(pubsub, {"BTCUSD", 67543.21})
PricePubSub.publish(pubsub, {"ETHUSD", 3421.05})
# Output:
# [Logger-1] 2024-01-15 10:30:45Z - BTCUSD: $67543.21
# [Logger-2] 2024-01-15 10:30:45Z - BTCUSD: $67543.21
# [Logger-1] 2024-01-15 10:30:45Z - ETHUSD: $3421.05
# [Logger-2] 2024-01-15 10:30:45Z - ETHUSD: $3421.05
Crea dos procesos que jueguen ping-pong enviándose mensajes alternadamente. El proceso debe:
- Al recibir
:ping, imprimir "Ping!" y responder:pong - Al recibir
:pong, imprimir "Pong!" y responder:ping - Después de 10 intercambios, ambos procesos deben terminar
Implementa un proceso contador con las siguientes operaciones:
:increment- incrementa el contador:decrement- decrementa el contador{:add, n}- suma n al contador{:get, caller}- envía el valor actual al caller:reset- reinicia a 0
Bonus: haz que múltiples procesos puedan incrementar el contador concurrentemente y verifica que el valor final es correcto.
Modifica el PricePubSub para soportar:
- Suscripción por símbolo:
{:subscribe, pid, "BTCUSD"} - Solo enviar actualizaciones del símbolo suscrito
- Un subscriber puede suscribirse a múltiples símbolos
- Añade
{:get_subscribers, caller}para obtener el conteo
Crea un proceso que actúe como rate limiter para publicación:
- Máximo N mensajes por segundo (configurable al iniciar)
- Los mensajes que excedan el límite se descartan
- Debe poder reportar cuántos mensajes se descartaron
Pista: usa :erlang.monotonic_time(:millisecond) para
tracking de tiempo y considera una ventana deslizante.
Conexión con el proyecto final
Lo que hemos construido es un prototipo del sistema de distribución:
- PricePubSub será evolucionado a un GenServer con process groups para distribución entre nodos.
- Monitors se usarán para detectar subscribers desconectados y limpiar recursos.
-
El patrón send/receive será abstraído por GenServer
con
callycast.