Heap & Priority Queue: Dijkstra's Algorithm
Dijkstra's algorithm finds the shortest paths from a source vertex to all other vertices in a weighted graph with non-negative edge weights. It is a standard example of a priority queue driving a greedy graph search.
The algorithm is used in GPS navigation, network routing, and other single-source shortest-path problems where edge weights are non-negative.
Common Interview Questions
Network Delay Time (LC 743)
- Problem: Given a network of nodes and travel times, find the time it takes for a signal to reach all nodes from a given source.
- Heap Signal: Classic single-source shortest path problem requiring Dijkstra's algorithm.
Cheapest Flights Within K Stops (LC 787)
- Problem: Find the cheapest price to fly from source to destination with at most K stops.
- Heap Signal: Modified Dijkstra with additional state (number of stops).
Path with Maximum Probability (LC 1514)
- Problem: Find the path from start to end with maximum probability of success.
- Heap Signal: Dijkstra variant maximizing probability instead of minimizing distance.
Minimum Cost to Make at Least One Valid Path (LC 1368)
- Problem: Given a grid with directional arrows, find minimum cost to reach bottom-right from top-left.
- Heap Signal: Dijkstra on grid with edge weights representing direction changes.
Core Logic & Approach
Dijkstra's algorithm uses a greedy approach with a priority queue:
- Initialize: Set distance to source as 0, all others as infinity
- Priority Queue: Always process the unvisited vertex with minimum distance
- Relaxation: Update distances to neighbors if a shorter path is found
- Termination: Continue until all vertices are processed or target is reached
Key insights:
- Greedy choice: Always expanding the closest unvisited vertex is optimal
- No negative weights: Ensures that once a vertex is visited, its shortest distance is final
- Priority queue: Efficiently selects the next vertex to process
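To make the "no negative weights" insight concrete, the sketch below runs a visited-set Dijkstra on two small graphs that differ only in the weight of one edge. The function name `dijkstra_sketch` and both graphs are hypothetical, chosen only for illustration.

```python
import heapq

def dijkstra_sketch(graph, start):
    """Visited-set Dijkstra; a node is treated as final the first time it is popped."""
    dist = {node: float("inf") for node in graph}
    dist[start] = 0
    visited = set()
    pq = [(0, start)]
    while pq:
        d, node = heapq.heappop(pq)
        if node in visited:
            continue
        visited.add(node)  # never expanded again after this point
        for neighbor, weight in graph[node]:
            if d + weight < dist[neighbor]:
                dist[neighbor] = d + weight
                heapq.heappush(pq, (d + weight, neighbor))
    return dist

# Hypothetical graphs, identical except for the weight of B -> C.
non_negative = {"A": [("B", 5), ("C", 2)], "B": [("C", 1)], "C": [("D", 1)], "D": []}
with_negative = {"A": [("B", 5), ("C", 2)], "B": [("C", -4)], "C": [("D", 1)], "D": []}

print(dijkstra_sketch(non_negative, "A"))   # {'A': 0, 'B': 5, 'C': 2, 'D': 3}
print(dijkstra_sketch(with_negative, "A"))  # {'A': 0, 'B': 5, 'C': 1, 'D': 3}
```

In the second graph the true shortest distance to D is 5 - 4 + 1 = 2 (A -> B -> C -> D), but C was finalized at distance 2 before the negative edge was seen, so D keeps the stale value 3.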
Basic Dijkstra Template
This is the standard implementation for finding the shortest distance from a single source to all other nodes in a graph.
Logic: The algorithm maintains a set of visited nodes and a priority queue of unvisited nodes, prioritized by their current shortest distance from the source.
- Initialization:
  - distances: A map or array storing the shortest distance found so far for each node. Initialize the start node's distance to 0 and all others to infinity.
  - pq (Priority Queue): A min-heap of (distance, node) pairs, initialized with (0, start). This lets us always process the closest node next.
  - visited: A set of nodes whose shortest distance has already been finalized.
- Main Loop: While the priority queue is not empty:
  - Pop the node u with the smallest distance from pq.
  - If u has already been visited, skip it. This handles outdated, longer paths that might still be in the queue.
  - Mark u as visited. Its current distance is now finalized as the shortest path.
- Edge Relaxation: For each neighbor v of the current node u:
  - Calculate the new potential distance to v through u: distance(u) + weight(u, v).
  - If this new distance is shorter than the currently known distance(v), update distances[v] and push the new (distance, v) pair into the priority queue.
Core Template:
import heapq
from collections import defaultdict

def dijkstra(graph, start):
    """
    Find shortest distances from start to all vertices
    graph: adjacency list {node: [(neighbor, weight), ...]}
    """
    # Distance tracking
    distances = defaultdict(lambda: float('inf'))
    distances[start] = 0
    # Priority queue: (distance, node)
    pq = [(0, start)]
    visited = set()
    while pq:
        current_dist, node = heapq.heappop(pq)
        # Skip if already visited (handles duplicates)
        if node in visited:
            continue
        visited.add(node)
        # Relax neighbors
        for neighbor, weight in graph[node]:
            distance = current_dist + weight
            # If shorter path found
            if distance < distances[neighbor]:
                distances[neighbor] = distance
                heapq.heappush(pq, (distance, neighbor))
    return dict(distances)

# Example Usage:
# graph = {0: [(1, 4), (2, 1)], 1: [(3, 1)], 2: [(1, 2), (3, 5)], 3: []}
# start = 0
# dijkstra(graph, start) -> {0: 0, 1: 3, 2: 1, 3: 4}
#
# Trace:
# 1. pq=[(0,0)], dist={0:0}
# 2. Pop (0,0). visited={0}. Push neighbors: pq=[(1,2), (4,1)]
# 3. Pop (1,2). visited={0,2}. Relax 2->1 (1+2=3 < dist[1]=4). dist[1]=3. Push (3,1).
# pq=[(3,1), (4,1)]
# 4. Pop (3,1). visited={0,2,1}. Relax 1->3 (3+1=4 < inf). dist[3]=4. Push (4,3).
# pq=[(4,1), (4,3)]. Note: Old (4,1) is still there but obsolete.
# 5. Pop (4,1). 1 is visited, skip.
# 6. Pop (4,3). visited={0,2,1,3}. No unvisited neighbors.
# 7. pq is empty. End.
# Time: O((V + E) log V)
# Space: O(V + E), since the heap can hold one stale entry per relaxed edge
- The Dijkstra Guarantee: With non-negative edge weights, the first time the algorithm pops a node u from the priority queue, the distance popped with it is the shortest distance from the start node to u.
Dijkstra with Path Reconstruction
This variation extends the basic algorithm to not only find the shortest distance but also the sequence of nodes that form that path.
Logic: The core Dijkstra algorithm remains the same. The only addition is a previous map (or dictionary) that acts as a "parent pointer" for each node in the shortest path tree.
- Initialization: In addition to distances and pq, initialize a previous = {} map.
- Tracking the Path: During the edge relaxation step, whenever you find a shorter path to a neighbor v through the current node u (i.e., if new_distance < distances[v]), record this connection by setting previous[v] = u.
- Reconstruction: After the main loop finishes, reconstruct the path to any target node by starting from the target and repeatedly following the pointers in the previous map back to the start node. The resulting path is in reverse, so it must be reversed to get the correct start -> target order.
Core Template:
import heapq
from collections import defaultdict

def dijkstra_with_path(graph, start, end=None):
    """Dijkstra with path reconstruction"""
    distances = defaultdict(lambda: float('inf'))
    distances[start] = 0
    previous = {}  # For path reconstruction
    pq = [(0, start)]
    visited = set()
    while pq:
        current_dist, node = heapq.heappop(pq)
        if node in visited:
            continue
        visited.add(node)
        # Early termination if target found
        # (compare against None so that a target labeled 0 still works)
        if end is not None and node == end:
            break
        for neighbor, weight in graph[node]:
            distance = current_dist + weight
            if distance < distances[neighbor]:
                distances[neighbor] = distance
                previous[neighbor] = node
                heapq.heappush(pq, (distance, neighbor))

    def reconstruct_path(target):
        """Reconstruct shortest path to target"""
        if target not in previous and target != start:
            return []  # No path exists
        path = []
        current = target
        while current is not None:
            path.append(current)
            current = previous.get(current)
        return path[::-1]  # Reverse to get start->target

    return dict(distances), reconstruct_path

# Usage:
# distances, get_path = dijkstra_with_path(graph, start)
# shortest_path_to_node = get_path(target_node)
# Example Usage:
# Using the same graph as before:
# graph = {0: [(1, 4), (2, 1)], 1: [(3, 1)], 2: [(1, 2), (3, 5)], 3: []}
# start = 0, end = 3
#
# Trace of `previous` map:
# 1. Pop (0,0). Relax 0->1. `previous[1] = 0` (dist 4). Relax 0->2. `previous[2] = 0` (dist 1).
# 2. Pop (1,2). Relax 2->1. `previous[1] = 2` (new dist 3). Relax 2->3. `previous[3] = 2` (dist 5).
# 3. Pop (3,1). Relax 1->3. `previous[3] = 1` (new dist 4).
# Final `previous` for the shortest path tree: {1: 2, 2: 0, 3: 1}
#
# Reconstruct path for target=3:
# - Start at 3.
# - previous[3] is 1. Path: [3, 1]
# - previous[1] is 2. Path: [3, 1, 2]
# - previous[2] is 0. Path: [3, 1, 2, 0]
# - Reverse: [0, 2, 1, 3]. Correct shortest path.
Network Delay Time (Classic Application)
This problem is a direct application of Dijkstra's algorithm. The goal is to find the time it takes for a signal starting at a source node k to reach every other node.
Logic:
- Model as a Graph: The times array represents the edges of a weighted, directed graph where nodes are servers and weights are travel times.
- Find Shortest Paths: Run Dijkstra's algorithm starting from the source node k. This calculates the shortest time (path) for the signal to reach every other node.
- Determine Final Time: The time it takes for the signal to reach all nodes is determined by the node that receives it last. Therefore, the answer is the maximum value in the distances map after Dijkstra's has finished.
- Handle Unreachability: If any node is unreachable from the source, its distance will remain infinity. If the maximum distance is infinity, not all nodes could be reached, so we return -1.
Core Template:
import heapq
from collections import defaultdict

def networkDelayTime(times, n, k):
    """Find time for signal to reach all nodes"""
    # Build adjacency list
    graph = defaultdict(list)
    for u, v, w in times:
        graph[u].append((v, w))
    # Dijkstra from source k
    distances = {i: float('inf') for i in range(1, n + 1)}
    distances[k] = 0
    pq = [(0, k)]
    while pq:
        dist, node = heapq.heappop(pq)
        # Skip if we've found a better path already
        if dist > distances[node]:
            continue
        for neighbor, weight in graph[node]:
            new_dist = dist + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                heapq.heappush(pq, (new_dist, neighbor))
    # Find maximum distance (time for all to receive signal)
    max_time = max(distances.values())
    return max_time if max_time != float('inf') else -1

# Example Usage:
# times = [[2,1,1],[2,3,1],[3,4,1]], n = 4, k = 2
# networkDelayTime(times, n, k) -> 2
#
# The Example Walkthrough section traces this same input step by step.
# The final distances are {1: 1, 2: 0, 3: 1, 4: 2}. The maximum of these
# values is 2, which is the time it takes for the signal to reach the last node (node 4).
Modified Dijkstra: Maximum Probability Path
This problem requires a modification to Dijkstra's to find the path with the highest product of probabilities, rather than the lowest sum of weights.
Logic: We can adapt Dijkstra's greedy approach. Instead of tracking the minimum distance, we track the maximum probability found so far to reach each node.
- Adaptation for Maximization:
  - Standard Dijkstra finds the minimum sum (d1 + d2 + ...). We need the maximum product (p1 * p2 * ...).
  - Dijkstra's greedy argument only requires that a path's score never improves as the path is extended. Each probability lies in [0, 1], so multiplying by another edge can never raise the product, just as adding a non-negative weight can never lower a sum.
  - We use a max-heap to always explore the path with the current highest probability. In Python, this is simulated by pushing negative probabilities onto a min-heap.
- Initialization:
  - probabilities: An array storing the maximum probability to reach each node. Initialize the start node's probability to 1.0 and all others to 0.0.
  - pq: A max-heap storing (-probability, node). Initialize with (-1.0, start).
- Modified Relaxation: When exploring from node u to neighbor v:
  - The new probability to reach v is prob(u) * prob(u, v).
  - The update condition becomes: if new_prob > probabilities[v], we've found a more probable path. Update probabilities[v] and push the new (-new_prob, v) to the heap.
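One way to check that the max-product problem really is a shortest-path problem in disguise: maximizing p1 * p2 * ... is the same as minimizing (-log p1) + (-log p2) + ..., and each -log p is non-negative when p is in (0, 1]. The sketch below runs an ordinary min-heap Dijkstra on those transformed weights. The function name `max_prob_via_log` is a hypothetical helper, and skipping zero-probability edges is an assumption made to avoid log(0).

```python
import heapq
import math

def max_prob_via_log(n, edges, succ_prob, start, end):
    """Max-probability path computed as a shortest path over -log(p) weights."""
    graph = [[] for _ in range(n)]
    for (u, v), p in zip(edges, succ_prob):
        if p > 0:  # assumption: an edge that always fails can be dropped
            w = -math.log(p)  # non-negative because 0 < p <= 1
            graph[u].append((v, w))
            graph[v].append((u, w))
    dist = [math.inf] * n
    dist[start] = 0.0
    pq = [(0.0, start)]
    while pq:
        d, u = heapq.heappop(pq)
        if d > dist[u]:
            continue  # stale heap entry
        if u == end:
            return math.exp(-d)  # convert the summed logs back to a product
        for v, w in graph[u]:
            if d + w < dist[v]:
                dist[v] = d + w
                heapq.heappush(pq, (d + w, v))
    return 0.0  # end is unreachable

result = max_prob_via_log(3, [[0, 1], [1, 2], [0, 2]], [0.5, 0.5, 0.2], 0, 2)
print(math.isclose(result, 0.25))  # True, up to floating-point rounding
```

Working directly with products and a negated heap avoids the log/exp round trip and its rounding error, which is a reasonable argument for the max-heap formulation in practice.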
Core Template:
import heapq
from collections import defaultdict

def maxProbPath(n, edges, succProb, start, end):
    """Find path with maximum probability"""
    # Build graph with probabilities
    graph = defaultdict(list)
    for i, (u, v) in enumerate(edges):
        prob = succProb[i]
        graph[u].append((v, prob))
        graph[v].append((u, prob))
    # Max-heap for probabilities (negate for max-heap behavior)
    probabilities = [0.0] * n
    probabilities[start] = 1.0
    pq = [(-1.0, start)]  # Negative probability for max-heap
    while pq:
        neg_prob, node = heapq.heappop(pq)
        prob = -neg_prob
        if node == end:
            return prob
        # Skip if we've found better probability
        if prob < probabilities[node]:
            continue
        for neighbor, edge_prob in graph[node]:
            new_prob = prob * edge_prob
            if new_prob > probabilities[neighbor]:
                probabilities[neighbor] = new_prob
                heapq.heappush(pq, (-new_prob, neighbor))
    return 0.0

# Example Usage:
# n = 3, edges = [[0,1],[1,2],[0,2]], succProb = [0.5,0.5,0.2], start = 0, end = 2
# maxProbPath(...) -> 0.25
#
# Trace:
# 1. pq = [(-1.0, 0)], probs = [1.0, 0, 0]
# 2. Pop (-1.0, 0). Explore neighbors of 0:
# - To 1: new_prob = 1.0 * 0.5 = 0.5. Push (-0.5, 1). probs=[1.0, 0.5, 0]
# - To 2: new_prob = 1.0 * 0.2 = 0.2. Push (-0.2, 2). probs=[1.0, 0.5, 0.2]
# - pq = [(-0.5, 1), (-0.2, 2)]
# 3. Pop (-0.5, 1). Explore neighbors of 1:
# - To 2: new_prob = 0.5 * 0.5 = 0.25. This is > probs[2].
# - Update probs[2]=0.25. Push (-0.25, 2).
# - pq = [(-0.25, 2), (-0.2, 2)]
# 4. Pop (-0.25, 2). Node 2 is the end. Return prob 0.25.
Dijkstra with Additional State: K Stops
This is a variation where the shortest path is constrained by the number of edges (stops) it can have.
Logic: Standard Dijkstra fails here because it finalizes each city on the cheapest route to it, even when that route uses more stops than a costlier alternative. The costlier route with fewer stops may be the only one that can still reach the destination within the K-stop limit. For example, A -> B (cost 10, 0 stops) might be preferred over A -> C (cost 1, 0 stops) if the only path from C adds too many stops.
To solve this, we must include the number of stops in the state we track.
- Expanded State: The state in our priority queue becomes (cost, city, stops). In the template, the stops field counts flights (edges) taken so far.
- Modified Priority Queue: The pq still pops the lowest-cost state first; city and stops only break ties between equal costs.
- Expanded visited Tracking: We can no longer just mark a city as visited. We need to track the best cost to reach a city given a specific number of stops. A dictionary visited[(city, stops)] = cost is a good way to do this. This prevents cycles and redundant explorations of the same state.
- Constraint Check: During exploration, we only add a neighbor to the pq if the number of flights taken (stops + 1) is within the allowed limit (k + 1, since k stops means k + 1 edges). When popping from the pq, we also check this constraint.
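The failure mode described above can be reproduced on a four-city example. The sketch below is a hypothetical helper (`cheapest`, with a `per_city_visited` switch) that runs the same heap loop twice: once marking cities as visited regardless of flights taken, and once keying the visited set on (city, flights_taken). The flight list is made up for illustration.

```python
import heapq
from collections import defaultdict

def cheapest(flights, src, dst, k, per_city_visited):
    """Heap search with at most k stops (k + 1 flights); the visited key is switchable."""
    graph = defaultdict(list)
    for u, v, price in flights:
        graph[u].append((v, price))
    pq = [(0, src, 0)]  # (cost, city, flights_taken)
    seen = set()
    while pq:
        cost, city, taken = heapq.heappop(pq)
        if city == dst:
            return cost
        key = city if per_city_visited else (city, taken)
        if key in seen:
            continue
        seen.add(key)
        if taken == k + 1:
            continue  # flight budget spent, cannot extend this route
        for nxt, price in graph[city]:
            heapq.heappush(pq, (cost + price, nxt, taken + 1))
    return -1

# Hypothetical flights: the cheap chain 0 -> 1 -> 2 -> 3 needs 2 stops,
# while 0 -> 2 -> 3 costs more but needs only 1 stop.
flights = [[0, 1, 1], [1, 2, 1], [2, 3, 1], [0, 2, 5]]

print(cheapest(flights, 0, 3, 1, per_city_visited=True))   # -1
print(cheapest(flights, 0, 3, 1, per_city_visited=False))  # 6
```

With per-city marking, city 2 is first reached at cost 2 after two flights and is finalized there, so the costlier arrival at cost 5 after one flight is discarded and the valid route 0 -> 2 -> 3 is never found.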
Core Template:
import heapq
from collections import defaultdict

def findCheapestPrice(n, flights, src, dst, k):
    """Find cheapest flight with at most k stops"""
    # Build graph
    graph = defaultdict(list)
    for u, v, price in flights:
        graph[u].append((v, price))
    # State: (cost, city, stops), where stops counts flights taken so far
    # Key insight: track both city and number of flights
    pq = [(0, src, 0)]
    # Track best cost to reach each (city, stops) combination
    visited = {}
    while pq:
        cost, city, stops = heapq.heappop(pq)
        # Found destination (only states within the limit are ever pushed)
        if city == dst:
            return cost
        # Skip if the flight budget is spent or this state was already expanded
        if stops > k or (city, stops) in visited:
            continue
        visited[(city, stops)] = cost
        # Explore neighbors
        for neighbor, price in graph[city]:
            new_cost = cost + price
            new_stops = stops + 1
            # k stops allow at most k + 1 flights
            if new_stops <= k + 1:
                heapq.heappush(pq, (new_cost, neighbor, new_stops))
    return -1

# Example Usage:
# n = 3, flights = [[0,1,100],[1,2,100],[0,2,500]], src = 0, dst = 2, k = 1
# findCheapestPrice(...) -> 200
#
# Trace (the third field counts flights taken; k = 1 stop allows k + 1 = 2 flights):
# 1. pq = [(0, 0, 0)] # (cost, city, stops)
# 2. Pop (0, 0, 0). Explore neighbors of 0:
# - To 1: Push (100, 1, 1).
# - To 2: Push (500, 2, 1).
# - pq = [(100, 1, 1), (500, 2, 1)]
# 3. Pop (100, 1, 1). stops(1) <= k(1), so this state may be expanded.
# - To 2: new_stops(2) <= k+1(2). Push (100+100, 2, 2).
# - pq = [(200, 2, 2), (500, 2, 1)]
# 4. Pop (200, 2, 2). city == dst. Return 200.
# This is the route 0->1->2 (two flights, one stop). The direct route 0->2 (cost 500) is
# also valid but more expensive. Because the heap pops states in cost order and only
# states within the flight limit are pushed, the first time the destination is popped
# we have found the cheapest valid route.
Dijkstra on Grid
This pattern applies Dijkstra's algorithm to a 2D grid where movement costs are involved.
Logic: The grid is treated as an implicit graph where each cell (row, col) is a node.
- Graph Representation: The neighbors of a node (row, col) are the adjacent cells (e.g., up, down, left, right). The weight of an edge moving into a cell is the value stored in that cell.
- State and Data Structures:
  - distances: A 2D matrix of the same size as the grid, initialized to infinity, storing the minimum cost to reach each cell.
  - pq: A priority queue storing (cost, row, col).
- Algorithm:
  - Initialize distances[0][0] to the cost of the starting cell and push (cost, 0, 0) to the pq.
  - While the pq is not empty, pop the cell with the lowest cost.
  - For each valid neighbor, perform the relaxation step: calculate new_dist = current_dist + cost_of_neighbor_cell. If it's an improvement, update the distances matrix and push the new state to the pq.
Core Template:
import heapq
from collections import defaultdict

def dijkstra_grid(grid):
    """Dijkstra on 2D grid with weighted cells"""
    rows, cols = len(grid), len(grid[0])
    # Distance matrix
    distances = [[float('inf')] * cols for _ in range(rows)]
    distances[0][0] = grid[0][0]  # Starting cost
    # Priority queue: (distance, row, col)
    pq = [(grid[0][0], 0, 0)]
    directions = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    while pq:
        dist, row, col = heapq.heappop(pq)
        # Skip if better path already found
        if dist > distances[row][col]:
            continue
        # Explore neighbors
        for dr, dc in directions:
            new_row, new_col = row + dr, col + dc
            if 0 <= new_row < rows and 0 <= new_col < cols:
                new_dist = dist + grid[new_row][new_col]
                if new_dist < distances[new_row][new_col]:
                    distances[new_row][new_col] = new_dist
                    heapq.heappush(pq, (new_dist, new_row, new_col))
    return distances[-1][-1]  # Distance to bottom-right

# Example Usage:
# grid = [[1, 3, 1],
# [1, 5, 1],
# [4, 2, 1]]
# dijkstra_grid(grid) -> 7
#
# Trace:
# 1. pq = [(1, 0, 0)], dist[0][0]=1
# 2. Pop (1,0,0). Neighbors are (0,1) and (1,0).
# - Push (1+3, 0, 1)=(4,0,1). dist[0][1]=4.
# - Push (1+1, 1, 0)=(2,1,0). dist[1][0]=2.
# - pq = [(2,1,0), (4,0,1)]
# 3. Pop (2,1,0). In-bounds neighbors are (1,1), (2,0), and (0,0).
# - Push (2+5, 1, 1)=(7,1,1). dist[1][1]=7.
# - Push (2+4, 2, 0)=(6,2,0). dist[2][0]=6.
# - (0,0): 2+1=3 is not < dist[0][0]=1, so nothing is pushed.
# - pq = [(4,0,1), (6,2,0), (7,1,1)]
# 4. Pop (4,0,1). In-bounds neighbors are (0,2), (0,0), and (1,1).
# - Push (4+1, 0, 2)=(5,0,2). dist[0][2]=5.
# - (1,1): 4+5=9 is not < dist[1][1]=7, so nothing is pushed.
# - pq = [(5,0,2), (6,2,0), (7,1,1)]
# ...and so on, until the shortest cost of 7 is found for (2,2).
# Shortest path: (0,0)->(0,1)->(0,2)->(1,2)->(2,2). Cost: 1+3+1+1+1 = 7.
Example Walkthrough
Let's trace through Network Delay Time with:
times = [[2,1,1],[2,3,1],[3,4,1]], n = 4, k = 2