Certificate Transparency Monitoring: Real-Time Subdomain Discovery in Elixir
Implementing Certificate Transparency log monitoring in Elixir for continuous subdomain discovery, phishing detection from SAN fields, and real-time certificate stream processing with GenServer.
Tomas Korcak (korczis)
Prismatic Platform
Certificate Transparency as an Intelligence Source
Certificate Transparency (CT) logs are append-only, publicly auditable records of every TLS certificate issued by participating Certificate Authorities. Every time a CA issues a certificate for login.example.com or vpn-internal.corp.net, that fact becomes public record. For intelligence platforms, CT logs are a goldmine: they reveal subdomain structures, infrastructure changes, and potential phishing domains in real time.
The challenge is volume. Major CT logs process millions of certificates daily. An effective monitoring system must stream these entries continuously, extract actionable intelligence from Subject Alternative Name (SAN) fields, and alert on patterns that indicate phishing or unauthorized certificate issuance.
CT Log Stream Architecture
Our CT monitor uses a GenServer that polls CT log servers via their RFC 6962 API. Each log maintains a Merkle tree of certificate entries, and we track our position using the tree size:
defmodule Prismatic.OSINT.CT.Monitor do
  @moduledoc """
  Continuous Certificate Transparency log monitor.
  Streams certificate entries, extracts domains from SAN fields,
  and detects phishing patterns in real time.
  """

  use GenServer
  require Logger

  @poll_interval_ms 30_000
  @batch_size 256

  @ct_logs [
    %{name: "Google Argon", url: "https://ct.googleapis.com/logs/argon2025h1"},
    %{name: "Cloudflare Nimbus", url: "https://ct.cloudflare.com/logs/nimbus2025"},
    %{name: "DigiCert Yeti", url: "https://yeti2025.ct.digicert.com/log"}
  ]

  defstruct [
    :watched_domains,
    log_positions: %{},
    stats: %{processed: 0, matches: 0, phishing_alerts: 0}
  ]

  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    watched = Keyword.get(opts, :watched_domains, [])
    GenServer.start_link(__MODULE__, watched, name: __MODULE__)
  end

  @impl true
  def init(watched_domains) do
    state = %__MODULE__{
      watched_domains: MapSet.new(watched_domains)
    }

    Enum.each(@ct_logs, fn log ->
      Process.send_after(self(), {:poll_log, log}, 1_000)
    end)

    {:ok, state}
  end

  @impl true
  def handle_info({:poll_log, log}, state) do
    new_state =
      case fetch_entries(log, state) do
        {:ok, entries, new_position} ->
          domains = extract_all_domains(entries)
          matches = filter_watched(domains, state.watched_domains)
          phishing = detect_phishing(domains, state.watched_domains)

          broadcast_matches(matches)
          broadcast_phishing_alerts(phishing)

          positions = Map.put(state.log_positions, log.name, new_position)

          stats = %{
            state.stats
            | processed: state.stats.processed + length(entries),
              matches: state.stats.matches + length(matches),
              phishing_alerts: state.stats.phishing_alerts + length(phishing)
          }

          %{state | log_positions: positions, stats: stats}

        {:error, reason} ->
          Logger.warning("CT poll failed for #{log.name}: #{inspect(reason)}")
          state
      end

    Process.send_after(self(), {:poll_log, log}, @poll_interval_ms)
    {:noreply, new_state}
  end
end
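The monitor calls `fetch_entries/2`, which is not shown here. As a rough sketch of the position arithmetic it needs, the helper below works out the next request range from the current position and the log's tree size. RFC 6962 exposes the tree size through `get-sth` and entries through `get-entries`, whose `start` and `end` parameters are 0-based and inclusive. The module name `CTRangeSketch` and its function names are hypothetical, and the HTTP calls are left out.

```elixir
defmodule CTRangeSketch do
  # Hypothetical helper, not part of the monitor shown above.
  @batch_size 256

  # `position` is the index of the next unread entry;
  # `tree_size` is the value reported by the log's get-sth endpoint.
  def next_range(position, tree_size, batch_size \\ @batch_size)

  def next_range(position, tree_size, _batch_size) when position >= tree_size do
    :up_to_date
  end

  def next_range(position, tree_size, batch_size) do
    # Entries are 0-indexed and `end` is inclusive,
    # so the last valid index is tree_size - 1.
    last = min(position + batch_size, tree_size) - 1
    {:ok, position, last}
  end

  # Builds the RFC 6962 get-entries URL for a log base URL and an inclusive range.
  def entries_url(base_url, first, last) do
    String.trim_trailing(base_url, "/") <>
      "/ct/v1/get-entries?start=#{first}&end=#{last}"
  end
end
```

A log may return fewer entries than requested, so the new position is probably best advanced by the number of entries actually received rather than by the batch size.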
SAN Field Extraction
Each certificate can contain dozens of domain names in the Subject Alternative Name extension. Parsing these requires handling both the leaf certificate and any pre-certificates:
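defp extract_all_domains(entries) do
  entries
  |> Enum.flat_map(&extract_san_domains/1)
  |> Enum.uniq()
end

# Assumes each entry has already been decoded into a map with a "leaf_cert"
# key holding "all_domains" and "subject". Raw RFC 6962 get-entries responses
# carry base64-encoded leaf_input and extra_data and must be parsed first.
defp extract_san_domains(%{"leaf_cert" => cert}) do
  all_domains = Map.get(cert, "all_domains", [])
  subject_cn = get_in(cert, ["subject", "CN"]) || ""

  [subject_cn | all_domains]
  |> Enum.map(&clean_domain/1)
  |> Enum.reject(&(&1 in ["", "*"]))
  |> Enum.uniq()
end

# Entries without a decoded leaf certificate contribute no domains.
defp extract_san_domains(_), do: []

# Trim whitespace first so a leading "*." wildcard prefix is reliably stripped.
defp clean_domain(domain) do
  domain
  |> String.trim()
  |> String.downcase()
  |> String.trim_leading("*.")
end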
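The `filter_watched/2` function called from the monitor is not shown either. One plausible version, assuming a match means "equal to a watched domain or a subdomain of it", could look like the sketch below. The module name and the `:watched` key in the result are hypothetical; the `:domain` key is kept because the broadcasting code reads `match.domain`.

```elixir
defmodule WatchFilterSketch do
  # Returns one map per (domain, watched) pair where the discovered domain
  # equals the watched domain or is a subdomain of it.
  def filter_watched(domains, watched_domains) do
    for domain <- domains,
        watched <- watched_domains,
        covers?(watched, domain) do
      %{domain: domain, watched: watched}
    end
  end

  # Requiring the "." separator prevents "notexample.com"
  # from matching "example.com".
  defp covers?(watched, domain) do
    domain == watched or String.ends_with?(domain, "." <> watched)
  end
end
```

Matching on the suffix with a leading dot also rejects names such as example.com.evil.net, where the watched domain appears only as a prefix.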
Phishing Detection Patterns
Phishing domains often mimic legitimate brands through typosquatting, homoglyph substitution, or strategic subdomain placement. Our detector scores each discovered domain against the watched list using multiple similarity algorithms:
|-----------------|-------------|---------------|
examp1e.com vs example.comexΠ°mple.com (Cyrillic 'Π°')example-login.attacker.comexample.xyz vs example.comexample-secure.com
defp detect_phishing(domains, watched_domains) do
  watched_list = MapSet.to_list(watched_domains)

  Enum.flat_map(domains, fn domain ->
    Enum.flat_map(watched_list, fn watched ->
      scores = [
        {:levenshtein, levenshtein_score(domain, watched)},
        {:keyword_embed, keyword_embed_score(domain, watched)},
        {:tld_variation, tld_variation_score(domain, watched)},
        {:combosquat, combosquat_score(domain, watched)}
      ]

      {method, max_score} = Enum.max_by(scores, &elem(&1, 1))

      if max_score > 0.7 and domain != watched do
        [
          %{
            domain: domain,
            target: watched,
            method: method,
            score: max_score,
            detected_at: DateTime.utc_now()
          }
        ]
      else
        []
      end
    end)
  end)
end
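# Despite the name, this scorer uses Jaro similarity (String.jaro_distance/2),
# which returns a value between 0.0 and 1.0 where 1.0 means identical strings.
defp levenshtein_score(domain, watched) do
  base_a = extract_base_domain(domain)
  base_b = extract_base_domain(watched)
  similarity = String.jaro_distance(base_a, base_b)

  if similarity > 0.85 and base_a != base_b, do: similarity, else: 0.0
end

# Flags domains that contain the watched brand name but belong to a
# different base domain, e.g. "example-login.attacker.com".
defp keyword_embed_score(domain, watched) do
  brand = extract_base_domain(watched) |> String.split(".") |> List.first()

  if String.contains?(domain, brand) and
       extract_base_domain(domain) != extract_base_domain(watched) do
    0.85
  else
    0.0
  end
end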
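The detector also calls `extract_base_domain/1`, `tld_variation_score/2`, and `combosquat_score/2`, which are not shown. The sketch below is one way they might be written, not the platform's actual implementation. The module name is hypothetical, the score values 0.9 and 0.8 are arbitrary choices that sit above the 0.7 alert threshold, and the base-domain logic is deliberately naive.

```elixir
defmodule ScorerSketch do
  # Naive registrable-domain guess: keep the last two labels.
  # This is wrong for suffixes such as "co.uk"; a Public Suffix List
  # lookup would be needed for real use.
  def extract_base_domain(domain) do
    domain |> String.split(".") |> Enum.take(-2) |> Enum.join(".")
  end

  # Same second-level label as the watched domain, but a different TLD.
  def tld_variation_score(domain, watched) do
    with [label, tld_a] <- String.split(extract_base_domain(domain), "."),
         [^label, tld_b] <- String.split(extract_base_domain(watched), "."),
         true <- tld_a != tld_b do
      0.9
    else
      _ -> 0.0
    end
  end

  # Brand joined to extra words with hyphens, e.g. "example-secure.com".
  def combosquat_score(domain, watched) do
    [brand | _] = String.split(extract_base_domain(watched), ".")
    [label | _] = String.split(extract_base_domain(domain), ".")

    if label != brand and brand in String.split(label, "-"), do: 0.8, else: 0.0
  end
end
```

Because `keyword_embed_score/2` also fires on any domain containing the brand, a combosquatted name would usually trigger both scorers, and the detector reports whichever method scores higher.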
Real-Time Broadcasting
When the monitor detects a match or phishing attempt, it broadcasts through Phoenix PubSub for real-time dashboard updates:
defp broadcast_matches(matches) do
  Enum.each(matches, fn match ->
    Phoenix.PubSub.broadcast(
      Prismatic.PubSub,
      "ct:matches",
      {:ct_match, match}
    )

    :telemetry.execute(
      [:prismatic, :ct, :match],
      %{count: 1},
      %{domain: match.domain}
    )
  end)
end

defp broadcast_phishing_alerts(alerts) do
  Enum.each(alerts, fn alert ->
    Phoenix.PubSub.broadcast(
      Prismatic.PubSub,
      "ct:phishing",
      {:phishing_alert, alert}
    )

    Logger.warning(
      "Phishing detected: #{alert.domain} targeting #{alert.target} " <>
        "(method=#{alert.method}, score=#{alert.score})"
    )
  end)
end
Production Deployment Considerations
Running CT monitoring in production raises several scaling concerns. CT log servers can be slow or unreliable, so each log is polled independently with its own error backoff. Log positions are kept in ETS for fast access and periodically flushed to disk. Because ETS is an in-memory store, the on-disk copy is what lets the monitor resume after a node restart without reprocessing millions of entries.
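A minimal sketch of both ideas, assuming exponential backoff capped at 15 minutes and a named ETS table snapshotted with `:ets.tab2file/2`. The module names, table name, and cap are hypothetical, and the monitor code shown earlier reschedules at a fixed interval rather than using this logic.

```elixir
defmodule BackoffSketch do
  @base_ms 30_000
  @max_ms 15 * 60_000

  # Delay before the next poll after `failures` consecutive failed polls.
  # Doubles with every failure and is capped at @max_ms.
  def next_delay(failures) when is_integer(failures) and failures >= 0 do
    min(@base_ms * Integer.pow(2, failures), @max_ms)
  end
end

defmodule PositionStoreSketch do
  @table :ct_positions_sketch

  # Creates the in-memory table; the calling process becomes its owner.
  def init, do: :ets.new(@table, [:set, :public, :named_table])

  def put(log_name, position), do: :ets.insert(@table, {log_name, position})

  # Unknown logs start from position 0.
  def get(log_name) do
    case :ets.lookup(@table, log_name) do
      [{^log_name, position}] -> position
      [] -> 0
    end
  end

  # Writes the whole table to disk; :ets expects a charlist file name.
  def snapshot(path), do: :ets.tab2file(@table, String.to_charlist(path))

  # Recreates the named table from a snapshot (the table must not exist yet).
  def restore(path), do: :ets.file2tab(String.to_charlist(path))
end
```

On startup the monitor could call `restore/1` if a snapshot exists and fall back to `init/0` otherwise, then schedule `snapshot/1` on a timer.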
The watched domain list is managed dynamically: investigation workflows automatically add target domains, and completed cases remove them. This keeps the phishing detection focused and reduces false positives.
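As an illustration of dynamic management, the sketch below keeps the watch list in a small standalone GenServer. In the monitor itself the same `handle_cast/2` clauses would more likely update the `watched_domains` field of its own state. The module name and its API are hypothetical.

```elixir
defmodule WatchListSketch do
  use GenServer

  # Client API
  def start_link(domains \\ []) do
    GenServer.start_link(__MODULE__, domains, name: __MODULE__)
  end

  def add(domain), do: GenServer.cast(__MODULE__, {:add, domain})
  def remove(domain), do: GenServer.cast(__MODULE__, {:remove, domain})
  def list, do: GenServer.call(__MODULE__, :list)

  # Server callbacks: the state is a MapSet of normalized domain names.
  @impl true
  def init(domains), do: {:ok, MapSet.new(domains, &normalize/1)}

  @impl true
  def handle_cast({:add, domain}, set) do
    {:noreply, MapSet.put(set, normalize(domain))}
  end

  def handle_cast({:remove, domain}, set) do
    {:noreply, MapSet.delete(set, normalize(domain))}
  end

  @impl true
  def handle_call(:list, _from, set), do: {:reply, Enum.sort(set), set}

  # Normalize so "Example.com" and " example.com " refer to the same entry.
  defp normalize(domain), do: domain |> String.trim() |> String.downcase()
end
```

An investigation workflow would call `WatchListSketch.add("corp.net")` when a case opens and `WatchListSketch.remove("corp.net")` when it closes.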
The CT monitor integrates with the broader OSINT mesh by publishing discovered subdomains to the entity enrichment pipeline. When a new subdomain appears for a watched domain, it automatically triggers DNS resolution, port scanning, and web technology fingerprinting, building a complete picture of infrastructure changes as they happen.