Heap & Priority Queue: Merge K Lists
Merge K Lists is a classic problem that shows how a heap can merge multiple sorted sequences efficiently. The pattern underlies external sorting, database operations, and merging in distributed systems.
The key insight is using a heap to always extract the smallest element across all lists efficiently.
Common Interview Questions
Merge k Sorted Lists (LC 23)
- Problem: You are given an array lists of k linked lists, each sorted in ascending order. Merge all the linked lists into one sorted linked list and return it.
- Heap Signal: This is a classic K-way merge problem where a heap provides efficient selection of the next smallest element.
Merge k Sorted Arrays
- Problem: Given k sorted arrays, merge them into one sorted array.
- Heap Signal: Similar to linked lists but with arrays: the heap tracks the current position in each array.
Smallest Range Covering Elements from K Lists (LC 632)
- Problem: You have k lists of sorted integers. Find the smallest range that includes at least one number from each of the k lists.
- Heap Signal: Uses a heap to maintain one element from each list while tracking the current range.
Find K Pairs with Smallest Sums (LC 373)
- Problem: Given two integer arrays sorted in ascending order and an integer k, find the k pairs with the smallest sums.
- Heap Signal: Uses a heap to explore pairs in increasing order of sum.
Core Logic & Approach
The fundamental challenge is efficiently selecting the next smallest element from among K different sources.
- Naive approach: O(k) scan for each element → O(nk) total
- Heap approach: O(log k) selection for each element → O(n log k) total
Why Not a Simple Pointer Approach?
When merging two sorted arrays, we can indeed use two pointers and achieve O(n) time. At each step, we only need to compare two elements. This is a constant-time O(1) comparison for each of the n elements we place.
However, when we scale this to k sorted arrays, the logic changes:
- The K-Pointer Method: You would have k pointers, one for each array.
- The Problem: To find the overall smallest element to add to the result, you must compare the k elements currently pointed to.
- The Cost: Finding the minimum of k numbers requires a linear scan, which takes k-1 comparisons. This is an O(k) operation.
- Total Complexity: Since you must perform this O(k) scan for each of the n total elements, the final time complexity becomes O(n * k).
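To make the O(n * k) cost concrete, here is a minimal sketch of the K-pointer method on plain Python lists. The function name merge_k_arrays_naive is illustrative and not part of any library; the inner loop is the O(k) scan described above.

```python
def merge_k_arrays_naive(arrays):
    """Merge k sorted arrays by scanning all k pointers for every output element."""
    pointers = [0] * len(arrays)              # one read position per array
    total = sum(len(arr) for arr in arrays)   # n = total number of elements
    result = []
    for _ in range(total):
        best = -1                             # index of the array holding the current minimum
        for i, arr in enumerate(arrays):      # O(k) scan over the frontier
            if pointers[i] < len(arr):
                if best == -1 or arr[pointers[i]] < arrays[best][pointers[best]]:
                    best = i
        result.append(arrays[best][pointers[best]])
        pointers[best] += 1                   # advance only the winning pointer
    return result
```

The outer loop runs n times and the inner loop touches all k arrays each time, which is where the n * k product comes from.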
This is where the heap optimizes the process. By maintaining the k "frontier" elements in a min-heap:
- The O(k) linear scan is replaced by an O(log k) heap operation (to extract the min and add the next element).
- This improves the overall complexity from O(n * k) to O(n * log k), which is significantly faster for a large number of lists (k).
Pattern:
- Initialize heap with first element from each list
- Extract minimum from heap
- Add next element from the same list to heap
- Repeat until all elements processed
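For plain Python iterables, the standard library's heapq.merge applies this pattern and returns a lazy iterator over the merged values. A short sketch, with sample input chosen here for illustration:

```python
import heapq

# Three already-sorted inputs (illustrative data)
arrays = [[1, 4, 5], [1, 3, 4], [2, 6]]

# heapq.merge yields values lazily; list() materializes the merged sequence
merged = list(heapq.merge(*arrays))
print(merged)  # [1, 1, 2, 3, 4, 4, 5, 6]
```

This is convenient for arrays and generators. The linked-list variants still need the explicit heap loop, because the nodes have to be relinked.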
Approach 1: Heap with Linked Lists
For the classic merge k sorted linked lists problem:
Core Template:
import heapq

class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

    def __lt__(self, other):
        # Lets heapq order nodes by their values
        return self.val < other.val

def merge_k_lists_heap(lists):
    """Merge k sorted linked lists using heap"""
    if not lists:
        return None

    # Initialize heap with head of each non-empty list
    heap = []
    for head in lists:
        if head:
            heapq.heappush(heap, head)

    # Dummy head for result
    dummy = ListNode(0)
    current = dummy

    while heap:
        # Get smallest node
        node = heapq.heappop(heap)
        current.next = node
        current = current.next

        # Add next node from same list if exists
        if node.next:
            heapq.heappush(heap, node.next)

    return dummy.next

# Time: O(n log k) where n = total nodes, k = number of lists
# Space: O(k) for heap

Alternative without modifying ListNode class:
def merge_k_lists_tuples(lists):
    """Merge using tuples to avoid modifying ListNode"""
    import heapq

    if not lists:
        return None

    heap = []
    # Use (value, index, node): the list index breaks ties between equal
    # values, so the nodes themselves are never compared
    for i, head in enumerate(lists):
        if head:
            heapq.heappush(heap, (head.val, i, head))

    dummy = ListNode(0)
    current = dummy

    while heap:
        val, i, node = heapq.heappop(heap)
        current.next = node
        current = current.next

        if node.next:
            heapq.heappush(heap, (node.next.val, i, node.next))

    return dummy.next

Approach 2: Heap with Arrays
For merging sorted arrays:
Core Template:
def merge_k_arrays(arrays):
    """Merge k sorted arrays using heap"""
    import heapq

    if not arrays:
        return []

    # Initialize heap with (value, array_index, element_index)
    heap = []
    for i, arr in enumerate(arrays):
        if arr:
            heapq.heappush(heap, (arr[0], i, 0))

    result = []

    while heap:
        val, arr_idx, elem_idx = heapq.heappop(heap)
        result.append(val)

        # Add next element from same array if exists
        if elem_idx + 1 < len(arrays[arr_idx]):
            next_val = arrays[arr_idx][elem_idx + 1]
            heapq.heappush(heap, (next_val, arr_idx, elem_idx + 1))

    return result

# Time: O(n log k)
# Space: O(k + n) where n is for result array

Approach 3: Divide and Conquer
An alternative approach that recursively merges pairs:
Core Template:
def merge_k_lists_divide_conquer(lists):
    """Merge using divide and conquer"""
    if not lists:
        return None
    if len(lists) == 1:
        return lists[0]

    def merge_two_lists(l1, l2):
        """Standard merge two sorted lists"""
        dummy = ListNode(0)
        current = dummy

        while l1 and l2:
            if l1.val <= l2.val:
                current.next = l1
                l1 = l1.next
            else:
                current.next = l2
                l2 = l2.next
            current = current.next

        # Attach whichever list still has nodes left
        current.next = l1 or l2
        return dummy.next

    def merge_lists(lists):
        if len(lists) == 1:
            return lists[0]
        if len(lists) == 2:
            return merge_two_lists(lists[0], lists[1])

        mid = len(lists) // 2
        left = merge_lists(lists[:mid])
        right = merge_lists(lists[mid:])
        return merge_two_lists(left, right)

    return merge_lists(lists)

# Time: O(n log k)
# Space: O(log k) recursion stack, plus O(k) for the slices lists[:mid] and lists[mid:]

Advanced: Smallest Range Covering K Lists
A more complex application of the merge pattern. The core idea is to maintain a sliding window that always contains one element from each of the k lists. A min-heap is used to efficiently track the smallest element (the left boundary of the window), while a separate variable tracks the maximum element seen so far (the right boundary).
Algorithm:
- Initialize: Create a min-heap and populate it with the first element from each of the k lists. While doing this, keep track of max_val, the maximum value among these first elements.
- Initial Range: The first window is from heap[0] (the minimum) to max_val. This is our initial best range.
- Slide the Window:
  - Extract the minimum element from the heap. This element comes from list i.
  - Get the next element from that same list i.
  - Push this new element into the heap.
  - Update max_val if this new element is larger than the current max_val.
- Update Result: After each slide, the new window is defined by the new heap minimum and the current max_val. Compare the width of this new range, max_val - heap[0], with the best range found so far and update if it's smaller.
- Termination: The process stops when any of the k lists runs out of elements, as we can no longer maintain a window that covers all lists.
Core Template:
def smallest_range(nums):
    """Find smallest range covering elements from all k lists"""
    import heapq

    # Initialize heap with first element from each list
    heap = []
    max_val = float('-inf')

    for i, arr in enumerate(nums):
        if arr:
            heapq.heappush(heap, (arr[0], i, 0))
            max_val = max(max_val, arr[0])

    range_start, range_end = 0, float('inf')

    while len(heap) == len(nums):  # All lists represented
        min_val, list_idx, elem_idx = heapq.heappop(heap)

        # Update range if current is smaller
        if max_val - min_val < range_end - range_start:
            range_start, range_end = min_val, max_val

        # Add next element from same list; if that list is exhausted,
        # the heap shrinks and the loop ends
        if elem_idx + 1 < len(nums[list_idx]):
            next_val = nums[list_idx][elem_idx + 1]
            heapq.heappush(heap, (next_val, list_idx, elem_idx + 1))
            max_val = max(max_val, next_val)

    return [range_start, range_end]

Advanced: K Smallest Pairs
Another variation focusing on pair generation. This problem can be visualized as finding the k smallest values in a conceptual sorted matrix where matrix[i][j] = nums1[i] + nums2[j]. The matrix is sorted both row-wise and column-wise. A min-heap is the perfect tool to efficiently explore this matrix without building it explicitly.
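To make the conceptual matrix concrete, the sketch below builds it explicitly for a small input and derives the answer by brute force. This is only a reference for checking results on small inputs, not the heap technique itself; the function name k_smallest_pairs_bruteforce and the sample arrays are illustrative choices.

```python
def k_smallest_pairs_bruteforce(nums1, nums2, k):
    """Reference answer: enumerate every pair, sort by sum, keep the first k."""
    pairs = [[a, b] for a in nums1 for b in nums2]
    pairs.sort(key=lambda p: p[0] + p[1])   # stable sort keeps generation order on ties
    return pairs[:max(k, 0)]

nums1, nums2 = [1, 7, 11], [2, 4, 6]

# matrix[i][j] = nums1[i] + nums2[j]; every row and column is ascending
matrix = [[a + b for b in nums2] for a in nums1]
print(matrix)                                     # [[3, 5, 7], [9, 11, 13], [13, 15, 17]]
print(k_smallest_pairs_bruteforce(nums1, nums2, 3))  # [[1, 2], [1, 4], [1, 6]]
```

The brute force costs O(len(nums1) * len(nums2)) time and memory, which is what the heap avoids. When several pairs share the same sum, more than one answer can be valid, so compare sums rather than exact pairs when checking against it.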
Algorithm:
- Initialize: The smallest possible sums come from pairs involving the first elements of the arrays. A common efficient strategy is to push the pairs (nums1[i], nums2[0]) for i from 0 up to min(k, len(nums1)) - 1 into the heap. This gives us a starting frontier of candidates.
- Iterative Search:
  - Repeatedly extract the pair with the smallest sum from the heap. Let this be (nums1[i], nums2[j]). Add this pair to the result list.
  - Explore Next Candidate: Since the conceptual matrix is sorted, the next smallest sum that can be formed from nums1[i] is (nums1[i], nums2[j+1]). Push this new pair onto the heap if j+1 is within bounds.
- Termination: Repeat this process until k pairs have been collected or the heap is empty. The heap ensures that at each step, we are always processing the pair with the globally smallest sum among all available candidates.
Core Template:
def k_smallest_pairs(nums1, nums2, k):
    """Find k pairs with smallest sums"""
    import heapq

    if not nums1 or not nums2:
        return []

    # Initialize heap with (nums1[i], nums2[0]) for the first min(k, len(nums1)) rows
    heap = []
    for i in range(min(len(nums1), k)):
        heapq.heappush(heap, (nums1[i] + nums2[0], i, 0))

    result = []

    while heap and len(result) < k:
        sum_val, i, j = heapq.heappop(heap)
        result.append([nums1[i], nums2[j]])

        # Advance along the same row only: each (i, j) is pushed at most once,
        # so no visited set is needed and no pair is emitted twice
        if j + 1 < len(nums2):
            heapq.heappush(heap, (nums1[i] + nums2[j + 1], i, j + 1))

    return result

Example Walkthrough
Let's trace through Merge k Sorted Lists with:
lists = [[1,4,5],[1,3,4],[2,6]]

Heap Approach:
Initialize heap:
- Add heads: heap = [(1, 0, head1), (1, 1, head2), (2, 2, head3)]
- (Using tuple format for clarity)
First extraction:
- Pop (1, 0, head1): result starts with 1
- Add next from list 0: heap = [(1, 1, head2), (2, 2, head3), (4, 0, node4)]
Second extraction:
- Pop (1, 1, head2): append 1 to result
- Add next from list 1: heap = [(2, 2, head3), (3, 1, node3), (4, 0, node4)]
Third extraction:
- Pop (2, 2, head3): append 2 to result
- Add next from list 2: heap = [(3, 1, node3), (4, 0, node4), (6, 2, node6)]
Continue until heap empty:
- Final result: [1,1,2,3,4,4,5,6]
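The walkthrough above can be reproduced with a short script. This sketch uses plain arrays instead of linked lists so it stays self-contained; trace_merge is a hypothetical helper name, and it records each popped (value, list_index) pair in order.

```python
import heapq

def trace_merge(arrays):
    """Return the merged output and the order in which (value, list_index) pairs are popped."""
    # Seed the heap with (value, list_index, element_index) for each non-empty array
    heap = [(arr[0], i, 0) for i, arr in enumerate(arrays) if arr]
    heapq.heapify(heap)
    merged, pops = [], []
    while heap:
        val, i, j = heapq.heappop(heap)
        merged.append(val)
        pops.append((val, i))
        if j + 1 < len(arrays[i]):            # push the successor from the same array
            heapq.heappush(heap, (arrays[i][j + 1], i, j + 1))
    return merged, pops

merged, pops = trace_merge([[1, 4, 5], [1, 3, 4], [2, 6]])
print(pops[:3])  # [(1, 0), (1, 1), (2, 2)]
print(merged)    # [1, 1, 2, 3, 4, 4, 5, 6]
```

The first three pops match the three extractions traced by hand: list 0, then list 1, then list 2.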
Why this works:
- Heap always gives us the globally smallest unprocessed element
- Each pop removes one element, each push adds at most one element
- We maintain the "frontier" of available choices efficiently
Approach Comparison
| Approach | Time Complexity | Space Complexity | Best Use Case |
|---|---|---|---|
| Heap | O(n log k) | O(k) | General case, good balance |
| Divide & Conquer | O(n log k) | O(log k) | Memory-constrained, recursive preference |
| Priority Queue | O(n log k) | O(k) | When custom comparisons needed |
| Two-at-a-time | O(nk) | O(1) | Only for small k |
Where n = total elements, k = number of lists.
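The "Two-at-a-time" row has no template of its own, so here is a minimal sketch of one way to read it: fold the lists into a single accumulated result, one merge at a time. The names merge_two, merge_k_sequential, from_values, and to_values are illustrative helpers, and ListNode is redefined so the block stands alone.

```python
class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def merge_two(l1, l2):
    """Merge two sorted linked lists by relinking nodes (no new nodes besides the dummy)."""
    dummy = ListNode()
    tail = dummy
    while l1 and l2:
        if l1.val <= l2.val:
            tail.next = l1
            l1 = l1.next
        else:
            tail.next = l2
            l2 = l2.next
        tail = tail.next
    tail.next = l1 or l2                  # attach the remaining nodes
    return dummy.next

def merge_k_sequential(lists):
    """Merge the lists one after another into a growing result."""
    merged = None
    for head in lists:
        merged = merge_two(merged, head)  # re-walks the accumulated result each time
    return merged

def from_values(values):
    """Build a linked list from a Python list (test helper)."""
    dummy = ListNode()
    tail = dummy
    for v in values:
        tail.next = ListNode(v)
        tail = tail.next
    return dummy.next

def to_values(head):
    """Collect linked-list values into a Python list (test helper)."""
    out = []
    while head:
        out.append(head.val)
        head = head.next
    return out

heads = [from_values(v) for v in ([1, 4, 5], [1, 3, 4], [2, 6])]
print(to_values(merge_k_sequential(heads)))  # [1, 1, 2, 3, 4, 4, 5, 6]
```

Because the accumulated list is traversed again on every merge, the worst case is O(nk) time, while extra space stays O(1) since nodes are only relinked.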
Optimizations & Variations
1. Skip empty lists:
# Filter out empty lists at start
lists = [lst for lst in lists if lst]
if not lists:
    return None

2. Handle single list:
if len(lists) == 1:
    return lists[0]

3. Early termination: