Heap & Priority Queue: Task Scheduling
Task Scheduling with heaps involves optimally ordering tasks based on priorities, deadlines, or other constraints. Heaps are essential for scheduling algorithms because they provide efficient access to the next highest-priority task.
This pattern is fundamental for operating systems, job schedulers, and real-time systems.
Common Interview Questions
Task Scheduler (LC 621)
- Problem: Given a char array representing tasks and an integer
nrepresenting the cooldown time, return the least number of units of times needed to finish all tasks. - Heap Signal: Uses heap to track task frequencies and schedule optimally with constraints.
- Problem: Given a char array representing tasks and an integer
Meeting Rooms II (LC 253)
- Problem: Given an array of meeting time intervals, find the minimum number of conference rooms required.
- Heap Signal: Uses heap to track ending times of ongoing meetings.
Course Schedule III (LC 630)
- Problem: Given courses with durations and deadlines, find the maximum number of courses that can be taken.
- Heap Signal: Uses heap for deadline-aware scheduling optimization.
Single-Threaded CPU (LC 1834)
- Problem: Given tasks with arrival times and processing times, simulate a single-threaded CPU scheduler.
- Heap Signal: Uses heap to manage available tasks and processing order.
Core Logic & Approach
Task scheduling typically involves:
- Priority determination: What makes one task more important than another?
- Constraint handling: Cooldowns, deadlines, resource limitations
- Optimal ordering: Using heap to always select the best next task
Common patterns:
- Frequency-based: Schedule most frequent tasks first
- Deadline-based: Schedule earliest deadline first
- Duration-based: Schedule shortest/longest tasks first
- Arrival time-based: Schedule based on when tasks become available
Approach 1: Task Scheduler with Cooldown
The problem requires finding the minimum time to schedule tasks with a cooldown period between identical tasks. This can be solved with a greedy simulation using a heap, or with a more direct mathematical formula.
Logic (Heap Simulation): The greedy strategy is to always schedule the most frequent task that is not on cooldown. A max-heap is perfect for tracking the remaining counts of the most frequent tasks.
- Count & Heapify: Count the frequency of each task and push these counts into a max-heap (using negative values in Python's min-heap).
- Process in Cycles: The simulation proceeds in time steps. In each "cycle," we try to execute
n+1tasks (one task for each time slot, including the cooldown period). - Schedule & Cool Down:
- In each time slot of a cycle, extract the most frequent task from the heap.
- Decrement its count.
- Add this processed task to a temporary
templist. It cannot be scheduled again until the cycle ofn+1slots is over, effectively putting it on "cooldown." - If the heap becomes empty during a cycle, we still need to account for the time for the scheduled tasks (
time += 1), but we may not fill alln+1slots.
- Restore Cooled-Down Tasks: After a cycle of
n+1slots, add all the tasks from thetemplist (those with remaining counts > 0) back into the heap. They are now available to be scheduled again. - Termination: The process continues until the main heap is empty, meaning all tasks have been completed.
Logic (Mathematical Formula): The total time is determined by the task with the highest frequency, as it creates the most "idle" time.
- Find the highest frequency (
max_freq) and how many tasks have this frequency (max_count). - The most frequent task creates
max_freq - 1intervals, each of which must be filled withnother tasks or idle slots. This gives(max_freq - 1) * (n + 1)time slots. - We then add
max_countfor the final occurrences of the most frequent tasks. - The result is
(max_freq - 1) * (n + 1) + max_count. This formula calculates the time required by the bottleneck task. If the total number of tasks is greater than this (meaning we can fill all idle slots with other tasks), then the answer is simply the total number of tasks.
Core Template:
def leastInterval(tasks, n):
"""Find minimum time to complete all tasks with cooldown n"""
from collections import Counter
import heapq
# Count task frequencies
task_counts = Counter(tasks)
# Max-heap of frequencies (negate for max-heap behavior)
heap = [-count for count in task_counts.values()]
heapq.heapify(heap)
time = 0
while heap:
# Store tasks that need to wait for cooldown
temp = []
# Process tasks in current cycle (up to n+1 slots)
for _ in range(n + 1):
if heap:
# Schedule most frequent task
count = heapq.heappop(heap)
if count < -1: # Still has remaining instances
temp.append(count + 1) # Decrease count
time += 1
# If no more tasks, we're done
if not heap and not temp:
break
# Put cooled-down tasks back in heap
for count in temp:
heapq.heappush(heap, count)
return time
# Alternative: Mathematical approach (more efficient)
def leastInterval_math(tasks, n):
from collections import Counter
task_counts = Counter(tasks)
max_freq = max(task_counts.values())
max_count = sum(1 for freq in task_counts.values() if freq == max_freq)
# Minimum time based on most frequent tasks
min_time = (max_freq - 1) * (n + 1) + max_count
# Can't be less than total number of tasks
return max(min_time, len(tasks))
# Example Usage:
# tasks = ["A","A","A","B","B","B"], n = 2
# leastInterval(tasks, 2) -> 8
# (Execution: A -> B -> idle -> A -> B -> idle -> A -> B)Approach 2: Meeting Rooms (Interval Scheduling)
The goal is to find the maximum number of meetings that are happening at the same time, as this determines the minimum number of rooms required.
Logic: The core idea is to process meetings in the order they start. We use a min-heap to keep track of the end times of meetings currently in progress.
- Sort: First, sort the
intervalsarray based on the start time. This is crucial because it allows us to process meetings chronologically. - Initialize Heap: Create a min-heap. This heap will store the end times of the meetings that are currently occupying rooms. The top of the heap will always have the end time of the meeting that is scheduled to finish earliest.
- Iterate and Schedule: Go through each meeting in the sorted list:
- Check for Free Rooms: Look at the meeting at the top of the heap (the one that finishes earliest). If its end time is less than or equal to the start time of the current meeting, it means a room has become free. We can pop this meeting from the heap. We do this in a
whileloop to free up all rooms that are available before the current meeting starts. - Allocate a Room: After freeing up any available rooms, we need a room for the current meeting. We add its end time to the heap. This signifies that a room is now occupied until this new end time.
- Check for Free Rooms: Look at the meeting at the top of the heap (the one that finishes earliest). If its end time is less than or equal to the start time of the current meeting, it means a room has become free. We can pop this meeting from the heap. We do this in a
- Find the Maximum: The number of rooms needed at any point in time is the number of meetings currently in progress, which is equal to the size of the heap at that moment. The overall minimum number of rooms required is therefore the maximum size the heap reaches at any point during the process. We must track this maximum value as we iterate through the meetings, as the final size of the heap only represents the number of overlapping meetings at the very end, not the peak concurrency.
Core Template:
def minMeetingRooms(intervals):
"""Find minimum conference rooms needed"""
import heapq
if not intervals:
return 0
# Sort by start time
intervals.sort(key=lambda x: x[0])
# Min-heap to track end times of ongoing meetings
heap = []
max_rooms = 0
for start, end in intervals:
# Remove meetings that have ended
while heap and heap[0] <= start:
heapq.heappop(heap)
# Add current meeting's end time
heapq.heappush(heap, end)
# The number of rooms needed is the current size of the heap
max_rooms = max(max_rooms, len(heap))
# The result is the maximum number of concurrent meetings seen
return max_rooms
# Time: O(n log n)
# Space: O(n)
# Example Usage:
# intervals = [[0, 30], [5, 10], [15, 20], [45, 60]] -> 2
# Trace:
# 1. Sort by start time: [[0, 30], [5, 10], [15, 20], [45, 60]]
# 2. Process [0, 30]: A room is used. heap = [30]
# 3. Process [5, 10]: Meeting starts at 5. No rooms are free (30 > 5). A new room is needed. heap = [10, 30]. Max rooms so far: 2.
# 4. Process [15, 20]: Meeting starts at 15. The meeting ending at 10 is over (10 <= 15). Pop 10. heap = [30]. Then add the new meeting. heap = [20, 30]. Max rooms so far: 2.
# 5. Process [45, 60]: Meeting starts at 45. Both meetings ending at 20 and 30 are over. Pop 20, pop 30. heap = []. Then add new meeting. heap = [60]. Max rooms so far: 2.
# The maximum size the heap reached was 2.Approach 3: Course Schedule with Deadlines
The goal is to take the maximum number of courses possible, given that each has a duration and a deadline.
Logic (Greedy with a Max-Heap): This problem uses a clever greedy strategy. It might seem intuitive to pick shorter courses first, but the actual bottleneck is the deadline. Therefore, we should process courses in order of their deadlines.
- Sort by Deadline: First, sort the
coursesarray based on the deadline. This lets us consider courses with the most urgent time constraints first. - Initialize: We'll use a
total_timevariable to track the cumulative duration of courses we've decided to take, and a max-heap to store the durations of these chosen courses. - Iterate and Schedule: Process the deadline-sorted courses one by one:
- Tentatively Add Course: For each course, we greedily try to take it. Add its
durationtototal_timeand push itsdurationinto the max-heap (using negation for Python's min-heap). - Check for Conflict: After adding the course, if our
total_timenow exceeds the current course'sdeadline, we have a problem. We can't finish all the courses we've selected so far in time. - Resolve Conflict (Greedy Choice): To resolve this, we must drop one of the courses we've already selected. To maximize our chances of taking more courses later, it's always optimal to drop the longest course taken so far. The max-heap is perfect for this: we simply pop the top element (which is the longest duration) and subtract its duration from
total_time.
- Tentatively Add Course: For each course, we greedily try to take it. Add its
- Final Count: After iterating through all courses, the number of courses we were able to take is simply the final size of the heap.
Core Template:
def scheduleCourse(courses):
"""Find maximum courses that can be taken within deadlines"""
import heapq
# Sort by deadline (earliest first)
courses.sort(key=lambda x: x[1])
# Max-heap to track durations of selected courses
selected = [] # Max-heap (negate durations)
total_time = 0
for duration, deadline in courses:
# Try to add current course
total_time += duration
heapq.heappush(selected, -duration) # Max-heap
# If we exceed deadline, remove longest course
if total_time > deadline:
# Remove the longest duration course
longest = -heapq.heappop(selected)
total_time -= longest
return len(selected)
# Time: O(n log n)
# Space: O(n)
# Example Usage:
# courses = [[100, 200], [200, 1300], [1000, 1250], [2000, 3200]] -> 3
# Trace:
# 1. Sort by deadline: [[100, 200], [1000, 1250], [200, 1300], [2000, 3200]]
# 2. Take [100, 200]: time=100, heap=[-100]
# 3. Take [1000, 1250]: time=1100, heap=[-1000, -100]
# 4. Take [200, 1300]: time=1300, heap=[-1000, -100, -200].
# - time (1300) <= deadline (1300). OK.
# 5. Take [2000, 3200]: time=3300, heap=[-2000, -100, -200, -1000]
# - time (3300) > deadline (3200). Conflict!
# - Remove longest course (2000): pop -2000. time=1300.
# Final heap size is 3.Approach 4: Single-Threaded CPU Simulation
This problem requires simulating a CPU's scheduling process. The CPU is idle until a task arrives. Once tasks are available, it picks the one with the shortest processing time (with the smaller index as a tie-breaker). This is a time-driven or event-driven simulation.
Logic (Event-Driven Simulation with Two Heaps/Data Structures): We manage two groups of tasks: those that haven't arrived yet, and those that are available to be processed.
- Preprocessing: Augment the input
taskswith their original indices, as the output needs to be the order of these indices. Then, sort this new list of tasks based on their arrival time. This lets us process tasks in chronological order of availability. - Initialize:
available_tasks_heap: A min-heap that will store the tasks that have arrived and are waiting for the CPU. It's ordered by(processing_time, original_index). This ensures we always pick the shortest task, and if there's a tie, the one with the smaller original index.current_time: A variable to track the simulation's clock. It starts at 0.task_idx: A pointer for the sortedindexed_taskslist.
- Main Simulation Loop: The loop continues as long as there are tasks to be processed (either in the original list or in the available heap).
- Add Arrived Tasks: Move all tasks from the sorted list into the
available_tasks_heapif theirarrival_time <= current_time. - Process a Task:
- If the
available_tasks_heapis not empty, it means the CPU can work. Pop the best task (shortest duration) from the heap. - Add its original index to the
result. - Advance
current_timeby the task'sduration.
- If the
- Handle Idle CPU:
- If the
available_tasks_heapis empty, it means the CPU is idle. We must jump time forward. The next event will be the arrival of the next task in our sorted list. So, we setcurrent_timeto be thearrival_timeof that next task.
- If the
- Add Arrived Tasks: Move all tasks from the sorted list into the
- Termination: The loop ends when both the sorted task list has been fully processed and the
available_tasks_heapis empty.
Core Template:
def getOrder(tasks):
"""Simulate CPU task scheduling"""
import heapq
# Add indices to tasks and sort by arrival time
indexed_tasks = [(tasks[i][0], tasks[i][1], i) for i in range(len(tasks))]
indexed_tasks.sort() # Sort by arrival time
# Available tasks heap: (processing_time, index)
available = []
result = []
current_time = 0
task_idx = 0
while task_idx < len(indexed_tasks) or available:
# Add all tasks that have arrived
while task_idx < len(indexed_tasks) and indexed_tasks[task_idx][0] <= current_time:
arrival, duration, orig_idx = indexed_tasks[task_idx]
heapq.heappush(available, (duration, orig_idx))
task_idx += 1
if available:
# Process shortest task (or earliest index if tie)
duration, orig_idx = heapq.heappop(available)
result.append(orig_idx)
current_time += duration
else:
# No tasks available, jump to next arrival
if task_idx < len(indexed_tasks):
current_time = indexed_tasks[task_idx][0]
return result
# Example Usage:
# tasks = [[1,2],[2,4],[3,2],[4,1]] -> [0,2,3,1]
# Trace:
# 1. time=0: CPU idle. Jump to time=1.
# 2. time=1: Task 0 ([1,2]) arrives. Add (2,0) to available_heap.
# 3. time=1: Process (2,0). result=[0]. time becomes 1+2=3.
# 4. time=3: Tasks 1 ([2,4]) and 2 ([3,2]) have arrived.
# - Add (4,1) and (2,2) to available_heap. heap = [(2,2), (4,1)].
# 5. time=3: Process (2,2). result=[0,2]. time becomes 3+2=5.
# 6. time=5: Task 3 ([4,1]) has arrived.
# - Add (1,3) to available_heap. heap = [(1,3), (4,1)].
# 7. time=5: Process (1,3). result=[0,2,3]. time becomes 5+1=6.
# 8. time=6: Process (4,1). result=[0,2,3,1]. time becomes 6+4=10.
# Final result: [0, 2, 3, 1]Advanced: Priority Queue with Multiple Criteria
Many real-world scheduling problems involve tasks with several attributes that determine their priority (e.g., priority level, deadline, duration). A heap can handle this complex ordering gracefully.
Logic (Custom Comparison): The key is to define a clear, hierarchical order of comparison for the tasks. Python's heapq uses the default comparison behavior of the objects it stores. For tuples, it's lexicographical (first element, then second, and so on). For custom classes, we can define this behavior explicitly.
- Define Priority Hierarchy: Determine the order of importance. For example:
- Highest
priorityvalue first. - If priorities are equal, earliest
deadlinefirst. - If deadlines are also equal, shortest
durationfirst. - If all else is equal, a deterministic tie-breaker like
task_id.
- Highest
- Implement Comparison:
- Option A (Tuples): Create tuples where the elements are in the desired order of priority. Use negation for attributes where a higher value is better (like
priority). For example:(-priority, deadline, duration, task_id). - Option B (Custom Class): Create a
Taskclass and implement the__lt__(self, other)method (less-than). This method should contain the cascadingif/elselogic for your priority hierarchy.heapqwill use this method to order theTaskobjects.
- Option A (Tuples): Create tuples where the elements are in the desired order of priority. Use negation for attributes where a higher value is better (like
- Use the Heap: Populate the heap with these custom objects or tuples. The heap will automatically maintain the correct order based on your defined logic, always giving you the highest-priority task when you pop from it.
Core Template:
class Task:
def __init__(self, priority, deadline, duration, task_id):
self.priority = priority
self.deadline = deadline
self.duration = duration
self.task_id = task_id
def __lt__(self, other):
# Primary: Higher priority first
if self.priority != other.priority:
return self.priority > other.priority
# Secondary: Earlier deadline first
if self.deadline != other.deadline:
return self.deadline < other.deadline
# Tertiary: Shorter duration first
if self.duration != other.duration:
return self.duration < other.duration
# Final: Lower ID first (for deterministic ordering)
return self.task_id < other.task_id
def scheduleTasksAdvanced(tasks):
"""Schedule tasks with multiple criteria"""
import heapq
# Convert to Task objects
task_objects = []
for i, (priority, deadline, duration) in enumerate(tasks):
task_objects.append(Task(priority, deadline, duration, i))
# Create priority queue
pq = task_objects[:]
heapq.heapify(pq)
schedule = []
current_time = 0
while pq:
task = heapq.heappop(pq)
# Check if task can be completed by deadline
if current_time + task.duration <= task.deadline:
schedule.append(task.task_id)
current_time += task.duration
# Otherwise, task is dropped or rescheduled
return schedule
# Example Usage:
# tasks = [(10, 50, 20), (5, 100, 30), (10, 40, 15)] # (priority, deadline, duration)
#
# Trace:
# 1. Heapify: PQ contains Task objects for each input.
# - Task A: (p=10, d=50, dur=20)
# - Task B: (p=5, d=100, dur=30)
# - Task C: (p=10, d=40, dur=15)
# 2. Pop: Task C is highest priority (p=10, d=40 is earlier than d=50).
# - Can we do it? time(0)+dur(15) <= deadline(40). Yes.
# - schedule=[C], time=15.
# 3. Pop: Task A is next highest (p=10).
# - Can we do it? time(15)+dur(20) <= deadline(50). Yes.
# - schedule=[C, A], time=35.
# 4. Pop: Task B is last.
# - Can we do it? time(35)+dur(30) <= deadline(100). Yes.
# - schedule=[C, A, B], time=65.
# Final schedule (by original index): [2, 0, 1]Example Walkthrough
Let's trace through Task Scheduler with tasks = ['A','A','A','B','B','B'], n = 2:
Heap Approach:
- Count frequencies: ['A': 3, 'B': 3]
- Initialize heap: [-3, -3]
- Cycle 1 (slots 0-2):
- Slot 0: Schedule A (count becomes 2), temp = [-2]
- Slot 1: Schedule B (count becomes 2), temp = [-2, -2]
- Slot 2: No tasks available (cooldown), temp = [-2, -2]
- Put back: heap = [-2, -2]
- Cycle 2 (slots 3-5):
- Slot 3: Schedule A (count becomes 1), temp = [-1]
- Slot 4: Schedule B (count becomes 1), temp = [-1, -1]
- Slot 5: No tasks available, temp = [-1, -1]
- Put back: heap = [-1, -1]
- Cycle 3 (slots 6-7):
- Slot 6: Schedule A (count becomes 0), temp = []
- Slot 7: Schedule B (count becomes 0), temp = []
- Done: heap = []
Result: Total time = 8, Schedule = [A, B, idle, A, B, idle, A, B]
Mathematical Approach:
- max_freq = 3, max_count = 2 (both A and B appear 3 times)
- min_time = (3-1) × (2+1) + 2 = 2 × 3 + 2 = 8
- Since 8 > 6 (total tasks), answer is 8
Approach Comparison
| Problem Type | Time Complexity | Space Complexity | Key Insight |
|---|---|---|---|
| Task Scheduler | O(n) or O(n log k) | O(k) | Math formula vs simulation |
| Meeting Rooms | O(n log n) | O(n) | Track ongoing meeting end times |
| Course Schedule | O(n log n) | O(n) | Greedy with heap for removal |
| CPU Simulation | O(n log n) | O(n) | Time-driven event simulation |
Tricky Parts & Decision Points
- Choosing between simulation and mathematical formula:
# For Task Scheduler:
# Simulation: More intuitive, handles complex constraints
# Mathematical: O(n) time, requires insight into pattern
if constraints_are_complex():
use_simulation_with_heap()
else:
use_mathematical_formula()- Handling ties in priority:
# Use tuple comparison for multiple criteria
heapq.heappush(heap, (priority, secondary_criteria, tertiary, task))
# Or implement custom __lt__ method in task class- Time progression in simulations:
# Option 1: Discrete time steps
for time_slot in range(max_time):
process_time_slot(time_slot)
# Option 2: Event-driven (more efficient)
while events:
current_time = next_event_time()
process_events_at(current_time)- Cooldown and constraint handling:
# Track when tasks become available again
cooldown_queue = [] # (available_time, task)
# Or use separate data structure for timing
last_used = {} # task -> last_used_time- Resource capacity limits:
# For meeting rooms, conference calls, etc.
if len(active_resources) >= capacity:
wait_or_reject_task()
else:
allocate_resource()- Preemption vs non-preemption:
# Non-preemptive: Task runs to completion
current_task_end_time = current_time + task.duration
# Preemptive: Higher priority can interrupt
if new_task.priority > current_task.priority:
preempt_current_task()
schedule_new_task()