Phoenix PubSub at Scale - Prismatic Platform

Phoenix PubSub at Scale: Topic Design and Fan-Out Patterns

Deep dive into Phoenix PubSub architecture for real-time event distribution at scale, covering topic design, fan-out patterns, message batching, and production conventions.

Mar 02, 2026 · 10 min read · Tomas Korcak (korczis)

Phoenix PubSub is deceptively simple on the surface: subscribe to a topic, broadcast a message, receive it in your process. But building a platform with hundreds of concurrent subscribers, dozens of topic hierarchies, and strict latency requirements demands a disciplined approach to topic design, fan-out control, and message structure.

This post documents the patterns we use in the Prismatic Platform to handle real-time events across 94 umbrella applications, thousands of LiveView processes, and multiple distributed nodes.

#Topic Architecture

The first decision in any PubSub system is topic naming. A flat namespace quickly becomes unmanageable. We use a hierarchical convention with colon-separated segments:

domain:entity:action
domain:entity:id:action

Concrete examples from the platform:

| Topic pattern | Example | Purpose |
|---|---|---|
| system_events | system_events | Platform-wide broadcasts (health, config changes) |
| error_patterns | error_patterns | Error intelligence feed for pattern detection |
| dd:pipeline | dd:pipeline | Due diligence pipeline status updates |
| dd:case:{id} | dd:case:abc123 | Per-case DD updates for subscribers |
| osint:adapter:{name} | osint:adapter:shodan | Per-adapter OSINT result streaming |
| investigation:{id} | investigation:inv_42 | Investigation progress for a specific run |

The key principle: generic topics for system-wide events, specific topics for entity-scoped updates. A LiveView showing a single DD case subscribes to dd:case:#{case_id}, not to a firehose of all DD events.
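To keep every topic on this convention, segments can be assembled by one helper instead of by hand. The sketch below is a hypothetical TopicSketch module (not part of the platform) that joins segments with colons and rejects empty segments or segments that already contain a colon, since an id such as "a:b" would silently add a level to the hierarchy.

```elixir
defmodule TopicSketch do
  @moduledoc """
  Hypothetical helper that builds hierarchical PubSub topic strings
  following the domain:entity[:id][:action] convention.
  """

  @separator ":"

  @doc "Joins non-empty, colon-free segments into a topic string."
  @spec build([String.t() | atom()]) :: String.t()
  def build(segments) when is_list(segments) and segments != [] do
    Enum.map_join(segments, @separator, &to_segment/1)
  end

  # Atoms (e.g. :dd) are converted to strings first.
  defp to_segment(segment) when is_atom(segment), do: to_segment(Atom.to_string(segment))

  defp to_segment(segment) when is_binary(segment) do
    cond do
      segment == "" ->
        raise ArgumentError, "topic segments must not be empty"

      String.contains?(segment, @separator) ->
        # A colon inside an id would silently add a level to the hierarchy.
        raise ArgumentError, "topic segment #{inspect(segment)} contains #{@separator}"

      true ->
        segment
    end
  end
end
```

For example, TopicSketch.build([:dd, :case, "abc123"]) yields "dd:case:abc123".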

#Subscribing and Broadcasting

The standard pattern wraps PubSub operations in domain-specific modules rather than calling Phoenix.PubSub directly throughout the codebase:

defmodule Prismatic.Events.DDPipeline do
  @moduledoc """
  PubSub interface for DD pipeline events.
  Centralizes topic naming and message structure.
  """

  @pubsub Prismatic.PubSub
  @topic "dd:pipeline"

  @spec subscribe() :: :ok | {:error, term()}
  def subscribe do
    Phoenix.PubSub.subscribe(@pubsub, @topic)
  end

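  @spec subscribe_case(String.t()) :: :ok | {:error, term()}
  def subscribe_case(case_id) when is_binary(case_id) do
    Phoenix.PubSub.subscribe(@pubsub, case_topic(case_id))
  end

  @spec broadcast_stage_complete(String.t(), atom(), map()) :: :ok | {:error, term()}
  def broadcast_stage_complete(case_id, stage, metadata) when is_binary(case_id) do
    message = %{
      event: :stage_complete,
      case_id: case_id,
      stage: stage,
      metadata: metadata,
      # Wall-clock time: monotonic time is not comparable across nodes,
      # and subscribers may live on another node.
      timestamp: System.system_time(:millisecond)
    }

    pipeline = Phoenix.PubSub.broadcast(@pubsub, @topic, {:dd_pipeline, message})
    per_case = Phoenix.PubSub.broadcast(@pubsub, case_topic(case_id), {:dd_case, message})

    # Report the first failure rather than only the result of the last call.
    if pipeline == :ok, do: per_case, else: pipeline
  end

  # Single place where the per-case topic string is built.
  defp case_topic(case_id), do: "dd:case:" <> case_id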
end

This gives us three guarantees: topic strings are built in one module instead of being interpolated at call sites throughout the codebase, message shapes are consistent, and we can add telemetry or filtering at a single point.
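The wrapper pattern does not depend on Phoenix specifically. As a dependency-free sketch, the hypothetical LocalDDEvents module below swaps Phoenix.PubSub for Elixir's built-in Registry with duplicate keys, the local pub/sub recipe from the Registry documentation. It works on a single node only; just the topic strings and message tuples mirror the module above.

```elixir
defmodule LocalDDEvents do
  @moduledoc """
  Hypothetical stand-in for a DD pipeline events module, backed by
  Registry (duplicate keys) instead of Phoenix.PubSub. Single node only.
  """

  @registry LocalDDEvents.Registry
  @topic "dd:pipeline"

  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Each subscription registers the calling process under the topic key.
  def subscribe, do: register(@topic)
  def subscribe_case(case_id) when is_binary(case_id), do: register("dd:case:" <> case_id)

  def broadcast_stage_complete(case_id, stage, metadata) do
    message = %{
      event: :stage_complete,
      case_id: case_id,
      stage: stage,
      metadata: metadata,
      timestamp: System.system_time(:millisecond)
    }

    # Same message on two topics, tagged so handlers can tell them apart.
    :ok = dispatch(@topic, {:dd_pipeline, message})
    dispatch("dd:case:" <> case_id, {:dd_case, message})
  end

  defp register(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  defp dispatch(topic, payload) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, payload)
    end)
  end
end
```

A process that calls both subscribe/0 and subscribe_case/1 receives the same map twice, once tagged :dd_pipeline and once :dd_case, which is why handlers match on the tag.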

#Fan-Out Control

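The most common performance problem with PubSub is uncontrolled fan-out. If 500 LiveView processes subscribe to system_events and you broadcast a large payload, the message is copied into each of the 500 subscribers' heaps (only binaries larger than 64 bytes are reference-counted and shared; maps, lists, and structs are copied). Two strategies mitigate this.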

#Strategy 1: Payload Minimization

Never broadcast full entities. Send references and let subscribers fetch what they need:

# Bad: broadcasting the entire case struct to all subscribers
Phoenix.PubSub.broadcast(pubsub, "dd:pipeline", {:case_updated, full_case_struct})

# Good: broadcast a reference, let subscribers decide what to fetch
# (`case` is a reserved word in Elixir, so the variable is named dd_case)
Phoenix.PubSub.broadcast(pubsub, "dd:pipeline", {:case_updated, %{
  case_id: dd_case.id,
  field: :status,
  new_value: :completed,
  timestamp: System.system_time(:millisecond)
}})
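To put numbers on the difference, :erlang.external_size/1 reports how many bytes a term occupies in the external term format, which is roughly what a cross-node broadcast serializes. The sketch compares a hypothetical full case (300 findings with 200-byte summaries; the shape is made up for illustration) with a reference payload like the one above.

```elixir
defmodule PayloadCost do
  @moduledoc """
  Hypothetical comparison of a full-entity payload and a reference payload,
  measured with :erlang.external_size/1 (bytes in external term format).
  """

  # Illustrative "full case": 300 findings with 200-byte summaries.
  def full_case do
    findings =
      for n <- 1..300 do
        %{id: n, source: "adapter_#{rem(n, 5)}", summary: String.duplicate("x", 200)}
      end

    %{id: "abc123", status: :completed, findings: findings}
  end

  # Only what subscribers need to decide whether to refetch.
  def reference_payload(dd_case) do
    %{
      case_id: dd_case.id,
      field: :status,
      new_value: dd_case.status,
      timestamp: System.system_time(:millisecond)
    }
  end

  def bytes(term), do: :erlang.external_size(term)
end
```

Multiplying each figure by the subscriber count gives a rough upper bound for local fan-out: maps and lists are copied into every subscriber's heap, while binaries over 64 bytes are reference-counted and shared.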

#Strategy 2: Topic Granularity

Instead of one topic with filtering, use granular topics so processes only receive what they care about:

# Instead of one topic with all events:
Phoenix.PubSub.broadcast(pubsub, "osint:results", {:result, adapter, data})

# Use per-adapter topics:
Phoenix.PubSub.broadcast(pubsub, "osint:adapter:shodan", {:result, data})
Phoenix.PubSub.broadcast(pubsub, "osint:adapter:virustotal", {:result, data})

A LiveView monitoring Shodan results subscribes only to osint:adapter:shodan and never sees VirusTotal traffic.
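The effect is easy to count. The sketch below (a hypothetical GranularityDemo module, again using Registry as a local stand-in for Phoenix.PubSub, with "censys" as an illustrative third adapter) subscribes the current process as a Shodan-only consumer and counts what lands in its mailbox under each topic layout.

```elixir
defmodule GranularityDemo do
  @registry GranularityDemo.Registry
  @adapters ["shodan", "virustotal", "censys"]

  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  # One shared topic: every result reaches the subscriber, which must filter.
  def shared_topic(results_per_adapter) do
    measure("osint:results", fn ->
      for adapter <- @adapters, n <- 1..results_per_adapter do
        publish("osint:results", {:result, adapter, n})
      end
    end)
  end

  # Per-adapter topics: only Shodan results reach a Shodan subscriber.
  def per_adapter_topics(results_per_adapter) do
    measure("osint:adapter:shodan", fn ->
      for adapter <- @adapters, n <- 1..results_per_adapter do
        publish("osint:adapter:" <> adapter, {:result, n})
      end
    end)
  end

  # Subscribes the calling process, runs the publisher, unsubscribes,
  # then drains and counts the messages that arrived.
  defp measure(topic, publish_fun) do
    {:ok, _} = Registry.register(@registry, topic, nil)
    publish_fun.()
    :ok = Registry.unregister(@registry, topic)
    drain(0)
  end

  defp publish(topic, payload) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _} <- entries, do: send(pid, payload)
    end)
  end

  defp drain(count) do
    receive do
      {:result, _adapter, _data} -> drain(count + 1)
      {:result, _data} -> drain(count + 1)
    after
      0 -> count
    end
  end
end
```

With three adapters producing 10 results each, the shared topic delivers 30 messages to the Shodan subscriber (20 of which it must discard), while the per-adapter topic delivers 10.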

#Message Batching

For high-throughput scenarios like OSINT adapter results or error pattern streams, individual messages per event can overwhelm subscribers. We batch using a GenServer accumulator:

defmodule Prismatic.Events.BatchBroadcaster do
  @moduledoc """
  Accumulates events and broadcasts them in batches
  at configurable intervals.
  """
  use GenServer

  @flush_interval_ms 100
  @max_batch_size 50

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @spec enqueue(GenServer.server(), String.t(), term()) :: :ok
  def enqueue(server, topic, message) do
    GenServer.cast(server, {:enqueue, topic, message})
  end

  @impl true
  def init(opts) do
    schedule_flush()
    {:ok, %{buffer: %{}, pubsub: opts[:pubsub]}}
  end

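  @impl true
  def handle_cast({:enqueue, topic, message}, state) do
    # Prepend in O(1); flush_topic/3 reverses into arrival order.
    messages = [message | Map.get(state.buffer, topic, [])]

    buffer =
      if length(messages) >= @max_batch_size do
        # Batch is full: send exactly @max_batch_size messages and reset the topic.
        flush_topic(state.pubsub, topic, messages)
        Map.delete(state.buffer, topic)
      else
        Map.put(state.buffer, topic, messages)
      end

    {:noreply, %{state | buffer: buffer}}
  end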

  @impl true
  def handle_info(:flush, state) do
    Enum.each(state.buffer, fn {topic, messages} ->
      if messages != [] do
        flush_topic(state.pubsub, topic, messages)
      end
    end)

    schedule_flush()
    {:noreply, %{state | buffer: %{}}}
  end

  defp flush_topic(pubsub, topic, messages) do
    batch = %{
      event: :batch,
      messages: Enum.reverse(messages),
      count: length(messages),
      flushed_at: System.monotonic_time(:millisecond)
    }

    Phoenix.PubSub.broadcast(pubsub, topic, {:batch, batch})
  end

  defp schedule_flush do
    Process.send_after(self(), :flush, @flush_interval_ms)
  end
end

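Subscribers receive batches of up to 50 messages instead of individual messages. A topic's buffer is flushed when it reaches 50 messages or when the shared 100 ms timer fires, whichever comes first. Under sustained load, for example 1,000 events per second on one topic, each subscriber handles roughly 20 to 30 batch messages per second instead of 1,000. At low rates the gain disappears and an event can wait up to 100 ms, so batching pays off only for genuinely high-throughput streams.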
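The size cap is the part most worth testing. A hypothetical pure core of the broadcaster (no processes, no Phoenix.PubSub) can return the batches that must be flushed, which makes the cap easy to check in isolation:

```elixir
defmodule BatchBuffer do
  @moduledoc """
  Hypothetical pure core of a batch broadcaster: per-topic buffers with a
  size cap. Returns the batches to flush instead of broadcasting them.
  """

  @type t :: %{optional(String.t()) => [term()]}

  @spec push(t(), String.t(), term(), pos_integer()) :: {[{String.t(), [term()]}], t()}
  def push(buffer, topic, message, max_batch_size) do
    msgs = [message | Map.get(buffer, topic, [])]

    if length(msgs) >= max_batch_size do
      # Batch is full: hand it out in arrival order and clear the topic.
      {[{topic, Enum.reverse(msgs)}], Map.delete(buffer, topic)}
    else
      {[], Map.put(buffer, topic, msgs)}
    end
  end

  # Called on the timer tick: everything still buffered, in arrival order.
  @spec drain(t()) :: [{String.t(), [term()]}]
  def drain(buffer) do
    for {topic, msgs} <- buffer, msgs != [], do: {topic, Enum.reverse(msgs)}
  end
end
```

Pushing 120 messages for one topic with a cap of 50 yields two full batches (1..50 and 51..100) and leaves 101..120 for the next timer flush.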

#LiveView Integration

On the LiveView side, handling PubSub messages follows a consistent pattern:

defmodule PrismaticWeb.DDCaseLive do
  use PrismaticWeb, :live_view

  @impl true
  def mount(%{"id" => case_id}, _session, socket) do
    if connected?(socket) do
      Prismatic.Events.DDPipeline.subscribe_case(case_id)
    end

    {:ok, assign(socket, case_id: case_id, events: [])}
  end

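  @impl true
  def handle_info({:dd_case, %{event: :stage_complete} = event}, socket) do
    # Newest first, capped at 100 entries.
    events = [event | socket.assigns.events] |> Enum.take(100)
    {:noreply, assign(socket, events: events)}
  end

  def handle_info({:batch, %{messages: messages}}, socket) do
    # Batches arrive oldest-first; reverse them to keep the list newest-first.
    events = (Enum.reverse(messages) ++ socket.assigns.events) |> Enum.take(100)
    {:noreply, assign(socket, events: events)}
  end

  # Ignore other events on the topic instead of crashing the LiveView.
  def handle_info(_message, socket), do: {:noreply, socket}
end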

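mount/3 runs twice: once for the initial static HTTP render and again when the WebSocket connects. The connected?(socket) guard subscribes only in the second, long-lived LiveView process; subscribing during the static render would register a process that exits as soon as the HTML is sent.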
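The ordering rule in these handlers, newest first with a cap, can be pulled into a small helper so it is tested once. EventLog is a hypothetical name; it assumes batches arrive oldest-first, as BatchBroadcaster produces them.

```elixir
defmodule EventLog do
  @moduledoc """
  Hypothetical helper that keeps a newest-first, bounded event list for
  LiveView assigns, whether events arrive singly or in batches.
  """

  @limit 100

  # Single event: newest goes to the front.
  def add(events, event, limit \\ @limit), do: Enum.take([event | events], limit)

  # Batch: messages arrive oldest-first, so reverse before prepending.
  def add_batch(events, messages, limit \\ @limit) do
    Enum.take(Enum.reverse(messages) ++ events, limit)
  end
end
```

Adding :a and then a batch [:b, :c, :d] gives [:d, :c, :b, :a].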

#Distributed PubSub

Phoenix PubSub uses Phoenix.PubSub.PG2 by default, which leverages Erlang’s :pg module for distributed process groups. Once the nodes are connected into a cluster, messages broadcast on one node automatically reach subscribers on all nodes.

The configuration is straightforward:

# In application.ex, inside start/2
children = [
  # PG2 is the default adapter; it is spelled out here for clarity.
  {Phoenix.PubSub, name: Prismatic.PubSub, adapter: Phoenix.PubSub.PG2}
]

For cross-node scenarios, keep payloads small. Every message that crosses to another node is serialized through the Erlang distribution protocol, so large structs add encoding time and latency. The reference-based approach described above becomes even more critical in multi-node deployments.
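The underlying primitive is simple enough to try in isolation. The sketch below uses :pg directly (OTP 23 or later); PgSketch is a hypothetical name and this is not the PG2 adapter's code. On connected nodes running the same :pg scope, get_members/1 also returns remote members. Unlike this sketch, which sends to every member directly, Phoenix.PubSub makes the per-node PubSub servers the group members, so each remote node receives one copy and fans it out locally.

```elixir
defmodule PgSketch do
  @moduledoc """
  Minimal use of Erlang's :pg process groups (OTP 23+).
  Illustrative only; not Phoenix.PubSub's implementation.
  """

  # Starts the default :pg scope unless it is already running.
  def ensure_started do
    case :pg.start_link() do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
    end
  end

  def join(group), do: :pg.join(group, self())

  # Sends to every member, local or on a connected node.
  def broadcast(group, message) do
    for pid <- :pg.get_members(group), do: send(pid, message)
    :ok
  end
end
```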

#Telemetry Integration

Every broadcast in the platform emits telemetry events for the OTEL doctrine pillar:

defp broadcast_with_telemetry(pubsub, topic, message) do
  start_time = System.monotonic_time()

  result = Phoenix.PubSub.broadcast(pubsub, topic, message)

  # Duration is in :native time units; convert with System.convert_time_unit/3 when reporting.
  :telemetry.execute(
    [:prismatic, :pubsub, :broadcast],
    %{duration: System.monotonic_time() - start_time},
    %{topic: topic, result: result}
  )

  result
end

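This feeds dashboards showing broadcast frequency and duration percentiles per topic. Local delivery runs in the calling process, so the measured duration grows with the number of local subscribers, which makes it an early signal of fan-out problems before they affect users. Subscriber counts, also shown on those dashboards, are not part of this event and have to be collected separately.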
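On the consuming side, a handler only needs the four-argument shape that :telemetry.attach/4 expects. The sketch below is a hypothetical BroadcastStats aggregator that keeps count, total, and maximum duration per topic in an Agent; it is called directly here so it runs without the :telemetry dependency.

```elixir
defmodule BroadcastStats do
  @moduledoc """
  Hypothetical aggregator for [:prismatic, :pubsub, :broadcast] events.
  handle_event/4 matches the telemetry handler signature.
  """
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Arguments: event name, measurements, metadata, handler config.
  def handle_event([:prismatic, :pubsub, :broadcast], %{duration: native}, %{topic: topic}, _config) do
    micros = System.convert_time_unit(native, :native, :microsecond)

    Agent.update(__MODULE__, fn stats ->
      Map.update(stats, topic, %{count: 1, total_us: micros, max_us: micros}, fn s ->
        %{s | count: s.count + 1, total_us: s.total_us + micros, max_us: max(s.max_us, micros)}
      end)
    end)
  end

  def get(topic), do: Agent.get(__MODULE__, &Map.get(&1, topic))
end
```

In an application it would be attached with :telemetry.attach("broadcast-stats", [:prismatic, :pubsub, :broadcast], &BroadcastStats.handle_event/4, nil). Keying on the raw topic gives every dd:case:{id} topic its own entry; collapsing ids into the pattern before aggregating keeps the number of series bounded.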

#Summary

| Pattern | When to use | Benefit |
|---|---|---|
| Domain-specific PubSub modules | Always | Centralized topic naming, consistent message shapes |
| Granular topics | Entity-scoped updates | Reduced fan-out, less filtering |
| Payload minimization | Large entity updates | Lower memory pressure |
| Batch broadcasting | High-throughput streams | Order of magnitude fewer messages |
| Telemetry wrapping | Production systems | Observability into PubSub performance |

PubSub is the nervous system of any Phoenix application. Treating topic design with the same rigor as database schema design pays dividends as the system scales.
