####
#
# CPU schedule simulation by Brandon Waite
#
# developed in python 2.7.2 (runs with python 2.7.3 as well);
# this version avoids Python-2-only constructs so it also runs on Python 3
#
# command line example:
#
#   python cpu_sim_waiteb3 [-PART2|-delay_creation=True] -cores=3 -processes=40 -time_slice=200
#       -priority_max=5 -burst_min=100 -burst_max=1000
#
# no extra arguments given assumes (these are the defaults coded in parse_args):
#
#   python cpu_sim_waiteb3 -delay_creation=True -cores=1 -processes=20 -time_slice=100
#       -priority_max=4 -burst_min=50 -burst_max=400
#
# there is a strange corner case where Preemptive Priority hangs in a loop
#   since I updated expo_rand() to not allow negative values, I have not been able to re-encounter the issue
#   everything is randomly generated, so targeting this corner case is almost impossible
#   sorry if it breaks on you - B
#   it could also just be the stdout buffer filling up, no clue, rough life
#
# NOTE(review): one plausible cause of that hang was the time-slice bookkeeping:
#   when a slice expired with nobody waiting, the next "time left in slice"
#   could become negative and move simulated time backwards. The time-sliced
#   core loop below now bounds every step by the slice, so that cannot happen.
###

# imports
import random as rand
import sys
import math

# import redirects (point to functions from imports)
log = math.log


def write(text):
    """Write *text* to whatever ``sys.stdout`` currently is.

    Looked up on every call (instead of binding ``sys.stdout.write`` once at
    import time) so that redirecting or capturing stdout keeps working.
    """
    sys.stdout.write(text)


###############################
#
# Helper Functions
#
###############################
def toAlpha(n):
    """Return the spreadsheet-style label for a 1-based number.

    1 -> 'A', 26 -> 'Z', 27 -> 'AA', 52 -> 'AZ', ...

    Raises:
        ValueError: if *n* is smaller than 1 (there is no label for it).
    """
    if n < 1:
        raise ValueError("toAlpha() needs a number >= 1, got %r" % (n,))
    letters = []
    while n > 0:
        # bijective base 26: shift to 0-based before peeling off a digit
        n, digit = divmod(n - 1, 26)
        letters.append(chr(digit + 65))
    return ''.join(reversed(letters))


###############################
#
# OS Emulation Classes
#
###############################
class CPU(object):
    """One core of the simulated computer; runs at most one Process at a time."""

    # class var/methods
    # class variable to make sure that no 2 cpu's are the same if default call is only call used
    _last_cpu = 1
    # 8 + 7 time units to remove then load a process to the cpu
    _swap_overhead = 15

    @staticmethod
    def reset():
        """Restart the automatic CPU numbering at 1."""
        CPU._last_cpu = 1

    # instance methods
    def __init__(self, number=0):
        """Create an idle core.

        Args:
            number: explicit positive core number; 0 (default) takes the next
                automatic number from the class counter.
        """
        self._busy = False
        # creates CPU number id automatically based on _last_cpu
        if number:
            assert(number > 0)
            self._number = number
        else:
            self._number = CPU._last_cpu
            CPU._last_cpu += 1
        # get string representation of CPU number
        self._string = toAlpha(self._number)
        self._process = None
        self._last_swapped = 0

    @property
    def busy(self):
        """True while a process is loaded on this core."""
        return self._busy

    @property
    def name(self):
        """Letter label of this core ('A', 'B', ...)."""
        return self._string

    @property
    def priority(self):
        """Priority of the loaded process (the core must be busy)."""
        return self._process.priority

    def time_passed(self, time):
        """Charge the loaded process for running up to absolute time *time*."""
        self._process.sub_burst(time)

    def free(self):
        """Unload the current process, mark the core idle and return the process."""
        old_process = self._process
        self._busy = False
        self._process = None
        return old_process

    def use(self, time, process):
        """Load *process* at absolute time *time*; everything before counts as waiting."""
        self._busy = True
        self._process = process
        self._process.waited(time)
        self._last_swapped = time

    def swap(self, time, process):
        """Context-switch to *process* at *time* and return the previous process.

        Both processes are charged the swap overhead as waiting time, and the
        new process (and the core's slice timer) start at time + overhead.
        """
        # question#4
        assert(time >= 0)
        ready_at = time + CPU._swap_overhead
        # add the overhead as waiting to the outgoing process
        self._process.waited(ready_at)
        # free the core, add next process (use() also resets last swapped)
        old_process = self.free()
        self.use(ready_at, process)
        return old_process

    def __str__(self):
        return self._string

    @property
    def time_left(self):
        """Remaining burst of the loaded process (the core must be busy)."""
        return self._process.burst

    @property
    def last_swapped(self):
        """Absolute time at which the current time slice started."""
        return self._last_swapped

    def reset_swap_timer(self, time):
        """Start a fresh time slice at *time* without changing the process."""
        self._last_swapped = time


class Process(object):
    """A simulated process with a CPU burst, a priority and wait statistics."""

    # class vars/methods
    _last_p = 1
    _burst_minimum = 50
    _burst_maximum = 400

    @staticmethod
    def reset():
        """Restart the automatic process numbering at 1."""
        Process._last_p = 1

    # instance methods
    def __init__(self, priority=0, Id=0, burst=0, create_time=0):
        """Create a process.

        Args:
            priority: non-negative priority number (0 is the highest priority).
            Id: explicit positive id; 0 (default) takes the next automatic id.
            burst: CPU time required; 0 (default) picks a random value between
                the class-level burst minimum and maximum (inclusive).
            create_time: absolute time at which the process enters the system.
        """
        self._turn_around = 0
        self._initial_wait = 0
        self._total_wait = 0
        # True once the process has been put on a core for the first time;
        # a 0 initial wait is a legitimate value, so it cannot be the sentinel
        self._has_run = False
        self._create_time = create_time
        # statistics are measured from the moment the process is created
        self._last_called = create_time
        assert(priority > -1)
        self._priority = priority
        if Id:
            assert(Id > 0)
            self._id = Id
        else:
            self._id = Process._last_p
            Process._last_p += 1
        # randomly assign the time it takes to burst
        if burst:
            assert(burst > 0)
            self._burst = burst
        else:
            self._burst = rand.randint(Process._burst_minimum, Process._burst_maximum)

    def __str__(self):
        return str(self._id)

    def __repr__(self):
        return str(self._id)

    @property
    def ID(self):
        return self._id

    @property
    def priority(self):
        return self._priority

    @property
    def burst(self):
        """CPU time still required."""
        return self._burst

    def sub_burst(self, time):
        """Account for running on a core up to absolute time *time*.

        The elapsed time since the last accounting point is taken off the
        burst and added to the turnaround time. The elapsed time may be
        negative right after a context switch (the accounting point is then
        in the future), which folds the remaining overhead into the burst.

        Raises:
            AssertionError: if more time elapsed than the process had left.
        """
        assert(time >= 0)
        elapsed = time - self._last_called
        if self._burst < elapsed:
            write("AssertionError in process '%s'.\nburst: %d\ttime: %d\tlast_called: %d\n"
                  % (self, self.burst, time, self._last_called))
            raise AssertionError("process %s ran past the end of its burst" % self)
        self._burst -= elapsed
        self._turn_around += elapsed
        self._last_called = time

    @property
    def turn_around(self):
        return self._turn_around

    @property
    def initial_wait(self):
        return self._initial_wait

    def waited(self, time):
        """Account for waiting (not running) up to absolute time *time*.

        The first call records the initial wait, i.e. the time between
        creation and first being put on a core.
        """
        # make sure time is not negative
        assert(time > -1)
        elapsed = time - self._last_called
        if not self._has_run:
            self._initial_wait = elapsed
            self._has_run = True
        self._total_wait += elapsed
        self._turn_around += elapsed
        self._last_called = time

    @property
    def total_wait(self):
        return self._total_wait

    @property
    def create_time(self):
        return self._create_time

    #cmpovrlt# comparisons override
    def __lt__(self, other):
        """Return True when this process should be scheduled ahead of *other*.

        A lower priority number is a higher priority (0 = highest); equal
        priorities are resolved by the lower id.
        """
        if self.priority == other.priority:
            return self.ID < other.ID
        return self.priority < other.priority


class Computer(object):
    """Simulated machine: a set of cores, a ready queue and a clock."""

    def __init__(self):
        self._cores = []
        self._processes = []
        self._time = 0
        self._time_slice = 0
        self._priority_max = 0
        self._num_processes = 0
        self._finished_processes = []

    def start(self, algorithm, delay_creation=False, cores=1, processes=20, time_slice=100, priority_max=4):
        """Reset the machine and run *algorithm* until every process finished.

        Args:
            algorithm: scheduler callable taking (computer, delay_creation).
            delay_creation: create most processes later instead of at time 0.
            cores: number of cores (> 0).
            processes: total number of processes to run (> 0).
            time_slice: slice length for the time-sliced schedulers (> 0).
            priority_max: largest priority number (>= 0).
        """
        self._time = 0
        CPU.reset()
        Process.reset()
        self._time_slice = time_slice
        self._priority_max = priority_max
        assert(cores > 0)
        assert(processes > 0)
        assert(time_slice > 0)
        # a priority from 0 to 0 would just be a regular RoundRobin effectively
        assert(priority_max > -1)
        self._cores = [CPU() for _ in range(cores)]
        self._processes = []
        self._finished_processes = []
        self._num_processes = processes
        algorithm(self, delay_creation)

    # methods
    @property
    def time(self):
        """Current simulated time (read-only; use add_time to advance)."""
        return self._time

    @property
    def num_p(self):
        return self._num_processes

    @property
    def time_slice(self):
        return self._time_slice

    def cores(self):
        return self._cores

    def core(self, core):
        """Return the core with 1-based number *core*."""
        return self._cores[core - 1]

    def append_process(self, process):
        """Add *process* to the end of the ready queue."""
        self._processes.append(process)

    def process(self, process, pop=False):
        """Look up a queued process by index or by object.

        Returns None when the queue is empty (or *process* is neither an int
        nor a Process). With pop=True the process is also removed.
        """
        if len(self._processes) == 0:
            return None
        if isinstance(process, Process):
            index = self._processes.index(process)
        elif isinstance(process, int):
            index = process
        else:
            return None
        if pop:
            return self._processes.pop(index)
        return self._processes[index]

    def processes(self):
        return self._processes

    def process_complete(self, process, pop=False):
        """Record *process* as finished; pop=True also removes it from the queue."""
        self._finished_processes.append(process)
        if pop:
            self._processes.remove(process)

    def add_time(self, milliseconds):
        """Advance the simulated clock."""
        self._time += milliseconds

    def finished(self):
        """True once as many processes finished as were requested."""
        return self._num_processes <= len(self._finished_processes)

    # returns either a free core or None
    def free_core(self):
        for core in self._cores:
            if not core.busy:
                return core
        return None

    @property
    def pmax(self):
        return self._priority_max


###############################
#
# Scheduling algorithms
#
###############################

# algorithm variables
timeCS = 15  # time it takes to switch contexts in milliseconds
milliRatio = 1000  # 1 second = 1000 milliseconds


# print functions
def print_time(time):
    write("[time %dms] " % time)


def print_process_created(time, process, priority=False):
    """Print the creation line; priority=True adds the priority (PP testing)."""
    print_time(time)
    if priority:
        write("Process %d (pr=%d) created (requires %dms CPU time)\n"
              % (process.ID, process.priority, process.burst))
        return
    write("Process %d created (requires %dms CPU time)\n" % (process.ID, process.burst))


def print_process_swap(time, prev, next, CPU, priorities=False):
    """Print a context switch: *prev* leaves core *CPU*, *next* takes over."""
    print_time(time)
    if priorities:
        write("Context switch (swapping out process %s (pr=%d) for process %s (pr=%d) in CPU %s)\n"
              % (prev, prev.priority, next, next.priority, CPU))
        return
    write("Context switch (swapping out process %s for process %s in CPU %s)\n" % (prev, next, CPU))


def print_process_completed(time, process, priority=False):
    """Print the completion line with the statistics of *process*."""
    print_time(time)
    if priority:
        write("Process %d (pr=%d) completed its CPU burst (turnaround time %dms, initial wait time %dms, total wait time %dms)\n"
              % (process.ID, process.priority, process.turn_around, process.initial_wait, process.total_wait))
        return
    write("Process %d completed its CPU burst (turnaround time %dms, initial wait time %dms, total wait time %dms)\n"
          % (process.ID, process.turn_around, process.initial_wait, process.total_wait))


def print_completed_data(computer):
    """Print min / average / max statistics over all finished processes."""
    write("Number of CPUs: %d\n" % len(computer.cores()))
    finished = computer._finished_processes
    if not finished:
        # nothing to aggregate; min()/max() of nothing would raise
        write("No processes completed.\n")
        return
    count = len(finished)
    summaries = (
        ("Turnaround time", [p.turn_around for p in finished]),
        ("Initial wait time", [p.initial_wait for p in finished]),
        ("Total wait time", [p.total_wait for p in finished]),
    )
    for label, values in summaries:
        # integer average, as in the original python 2 arithmetic
        write("%s: min %d ms; avg %d ms; max %d ms\n"
              % (label, min(values), sum(values) // count, max(values)))


def expo_rand(max):
    """Return an exponentially distributed integer in [0, max] (mean about 1000)."""
    l = 0.001
    x = max + 1.0
    # loop until x is no larger than max
    while x > max:
        # 1.0 - random() lies in (0.0, 1.0], so log() never sees 0
        x = -log(1.0 - rand.random()) / l
    return int(x)


def populate_processes(computer, delay_creation, print_priority=False):
    """Create the processes of a run and return the ones to be created later.

    With delay_creation, 10% (at least one) are queued immediately and the
    rest get a random creation time; otherwise all are queued immediately.
    """
    create_after_delay = []
    if delay_creation:
        num = computer.num_p // 10
        if num < 1:
            num = 1
    else:
        num = computer.num_p
    # create the initial batch
    for i in range(num):
        computer.append_process(Process(priority=rand.randint(0, computer.pmax)))
        print_process_created(computer.time, computer.process(i), print_priority)
    if delay_creation:
        # set creation times later on for the remaining processes
        for _ in range(computer.num_p - num):
            create_after_delay.append(
                Process(create_time=expo_rand(8000), priority=rand.randint(0, computer.pmax)))
    return create_after_delay


# shared scheduler building blocks
def _admit_due_processes(computer, pending, time_to_advance, on_create):
    """Hand every pending process whose creation time has come to *on_create*.

    Arrived processes are removed from *pending* (in place). Returns
    *time_to_advance* lowered to the delay until the earliest future arrival.
    """
    still_pending = []
    for process in pending:
        if process.create_time > computer.time:
            time_to_advance = min(time_to_advance, process.create_time - computer.time)
            still_pending.append(process)
        else:
            on_create(process)
    pending[:] = still_pending
    return time_to_advance


def _retire(computer, core, print_priority=False):
    """Free *core* and record/print its process as completed."""
    finished = core.free()
    computer.process_complete(finished)
    print_process_completed(computer.time, finished, print_priority)


def _pop_first(computer):
    """Remove and return the head of the ready queue (None if empty)."""
    return computer.process(0, True)


def _pop_shortest_job(computer):
    """Remove and return the queued process with the smallest burst (None if empty)."""
    queue = computer.processes()
    if not queue:
        return None
    # min() keeps the first of equally short jobs, i.e. the one queued longest
    return computer.process(min(queue, key=lambda p: p.burst), True)


def _pop_highest_priority(computer):
    """Remove and return the queued process with the highest priority (None if empty)."""
    queue = computer.processes()
    if not queue:
        return None
    # 0 is the highest priority; ties go to the lower id
    return computer.process(min(queue, key=lambda p: (p.priority, p.ID)), True)


def _service_cores_run_to_completion(computer, time_to_advance, take_next):
    """One pass over the cores for schedulers without time slices.

    Charges running processes, retires completed ones and fills idle cores
    via take_next(computer). Returns *time_to_advance* lowered to the
    earliest upcoming completion.
    """
    now = computer.time
    for core in computer.cores():
        if core.busy:
            # core is in use, update time passage
            core.time_passed(now)
            if core.time_left > 0:
                time_to_advance = min(time_to_advance, core.time_left)
                continue
            _retire(computer, core)
        # the core is free (or was just freed), try to put a process onto it
        next_process = take_next(computer)
        if next_process is not None:
            core.use(now, next_process)
            time_to_advance = min(time_to_advance, core.time_left)
    return time_to_advance


def _service_cores_time_sliced(computer, time_to_advance, take_next, print_priority=False):
    """One pass over the cores for the time-sliced schedulers (RR, PP).

    Like the run-to-completion pass, but a process that used up its slice is
    swapped for take_next(computer) (if anyone is waiting). Every step is
    bounded by the slice so the slice check is never skipped over.
    """
    now = computer.time
    time_slice = computer.time_slice
    for core in computer.cores():
        if core.busy:
            core.time_passed(now)
            if core.time_left > 0:
                used = now - core.last_swapped
                if used < time_slice:
                    # not finished and slice not used up: wake at whichever comes first
                    time_to_advance = min(time_to_advance, core.time_left, time_slice - used)
                    continue
                # process has used up its time_slice
                next_process = take_next(computer)
                if next_process is None:
                    # nobody waiting: keep running, but start a fresh slice
                    core.reset_swap_timer(now)
                else:
                    prev_process = core.swap(now, next_process)
                    computer.append_process(prev_process)
                    print_process_swap(now, prev_process, next_process, core, print_priority)
                time_to_advance = min(time_to_advance, core.time_left, time_slice)
                continue
            _retire(computer, core, print_priority)
        # the core is free (or was just freed), try to put a process onto it
        next_process = take_next(computer)
        if next_process is not None:
            core.use(now, next_process)
            time_to_advance = min(time_to_advance, core.time_left, time_slice)
    return time_to_advance


# Notes:
#   all algorithms are given to a computer and told what parameters to enter

# A First Come First Serve algorithm
def FCFS_scheduler(computer, delay_creation):
    """Run processes in arrival order, each until completion."""
    # set delayed creation times (if needed) and populate computer.processes() list
    create_after_delay = populate_processes(computer, delay_creation)

    def on_create(process):
        computer.append_process(process)
        print_process_created(process.create_time, process)

    time_to_advance = 0
    while not computer.finished():
        computer.add_time(time_to_advance)
        time_to_advance = _admit_due_processes(computer, create_after_delay, sys.maxsize, on_create)
        time_to_advance = _service_cores_run_to_completion(computer, time_to_advance, _pop_first)


# First come, shortest job first algorithm
def SJF_scheduler(computer, delay_creation):
    """Run the shortest queued job next, each until completion."""
    create_after_delay = populate_processes(computer, delay_creation)

    def on_create(process):
        computer.append_process(process)
        print_process_created(process.create_time, process)

    time_to_advance = 0
    while not computer.finished():
        computer.add_time(time_to_advance)
        time_to_advance = _admit_due_processes(computer, create_after_delay, sys.maxsize, on_create)
        time_to_advance = _service_cores_run_to_completion(computer, time_to_advance, _pop_shortest_job)


# A shortest job first, interrupts processes if quicker process created
def SJF_pre_scheduler(computer, delay_creation):
    """Shortest job first; a newly created shorter job preempts a running one."""
    create_after_delay = populate_processes(computer, delay_creation)

    def on_create(new):
        print_process_created(new.create_time, new)
        idle = computer.free_core()
        if idle is not None:
            idle.use(computer.time, new)
            return
        # find if it can be swapped in for a process that will take longer -- preemption
        for core in computer.cores():
            # bring the remaining time up to date before comparing
            core.time_passed(computer.time)
            if core.time_left > new.burst:
                old = core.swap(computer.time, new)
                print_process_swap(computer.time, old, new, core)
                # put the preempted process back onto the queue
                computer.append_process(old)
                return
        computer.append_process(new)

    time_to_advance = 0
    while not computer.finished():
        computer.add_time(time_to_advance)
        time_to_advance = _admit_due_processes(computer, create_after_delay, sys.maxsize, on_create)
        time_to_advance = _service_cores_run_to_completion(computer, time_to_advance, _pop_shortest_job)


# round robin scheduling without preemption on creation
# (any late created processes are just added to the end of the list)
def RR_scheduler(computer, delay_creation):
    """Round robin: processes take turns of at most one time slice."""
    create_after_delay = populate_processes(computer, delay_creation)

    def on_create(new):
        print_process_created(new.create_time, new)
        computer.append_process(new)

    time_to_advance = 0
    while not computer.finished():
        computer.add_time(time_to_advance)
        time_to_advance = _admit_due_processes(computer, create_after_delay, sys.maxsize, on_create)
        time_to_advance = _service_cores_time_sliced(computer, time_to_advance, _pop_first)


# preemptive priority (0=high, priority_max=low)
def PP_scheduler(computer, delay_creation):
    """Time-sliced priority scheduling; a new higher-priority process preempts."""
    #change post testing#
    print_priority = False
    create_after_delay = populate_processes(computer, delay_creation, print_priority)

    def on_create(new):
        print_process_created(new.create_time, new, print_priority)
        idle = computer.free_core()
        if idle is not None:
            idle.use(computer.time, new)
            return
        # find a strictly lower priority (higher pnum) to swap out, don't swap out equals
        for core in computer.cores():
            if new.priority < core.priority:
                core.time_passed(computer.time)
                if core.time_left > 0:
                    prev_process = core.swap(computer.time, new)
                    print_process_swap(computer.time, prev_process, new, core, print_priority)
                    computer.append_process(prev_process)
                    return
                # that process just finished; the core loop retires it and
                # then picks the best queued process (possibly this one)
        computer.append_process(new)

    time_to_advance = 0
    while not computer.finished():
        computer.add_time(time_to_advance)
        time_to_advance = _admit_due_processes(computer, create_after_delay, sys.maxsize, on_create)
        time_to_advance = _service_cores_time_sliced(
            computer, time_to_advance, _pop_highest_priority, print_priority)


###############################
#
# Main
#
###############################
def parse_args(argv):
    """Turn command line arguments into a settings dict.

    Recognised arguments: -PART2 (same as -delay_creation=True),
    -delay_creation=True|False, -cores=N, -processes=N, -time_slice=N,
    -priority_max=N, -burst_min=N, -burst_max=N. Invalid arguments are
    reported on stdout and ignored.

    Returns:
        dict with keys delay_creation, cores, processes, time_slice,
        priority_max, burst_min and burst_max.
    """
    options = {
        'delay_creation': True,
        'cores': 1,
        'processes': 20,
        'time_slice': 100,
        'priority_max': 4,
        'burst_min': Process._burst_minimum,
        'burst_max': Process._burst_maximum,
    }
    burst_defaults = (options['burst_min'], options['burst_max'])
    # smallest accepted value per numeric argument
    minimums = {
        '-cores': 1,
        '-processes': 1,
        '-time_slice': 1,
        '-priority_max': 0,
        '-burst_min': 1,
        '-burst_max': 1,
    }
    for arg in argv:
        if arg.upper() == '-PART2':
            options['delay_creation'] = True
            continue
        if arg in ('', 'python') or arg.endswith('.py'):
            continue
        try:
            cmd, val = arg.split('=', 1)  # ValueError when there is no '='
            if cmd == '-delay_creation':
                if val == 'True':
                    options['delay_creation'] = True
                elif val == 'False':
                    options['delay_creation'] = False
                else:
                    options['delay_creation'] = bool(int(val))
            elif cmd in minimums:
                number = int(val)  # ValueError if it cannot convert
                if number < minimums[cmd]:
                    write("Argument '%s' creates a negative value. Ignoring argument.\n" % arg)
                    continue
                options[cmd[1:]] = number
            else:
                raise ValueError(arg)
        except ValueError:
            write("'%s' is not a valid argument. Ignoring argument.\n" % arg)
    # validate the burst range as a pair so the argument order does not matter
    if options['burst_min'] > options['burst_max']:
        write("burst_min (%d) is greater than burst_max (%d). Ignoring both arguments.\n"
              % (options['burst_min'], options['burst_max']))
        options['burst_min'], options['burst_max'] = burst_defaults
    return options


def main(argv=None):
    """Run every scheduling algorithm once with the command line settings.

    Args:
        argv: argument list without the program name; defaults to sys.argv[1:].
    """
    # eg: python cpu_sim_waiteb3 [-PART2|-delay_creation=True] -cores=3 -processes=40 -time_slice=200 -priority_max=5
    if argv is None:
        argv = sys.argv[1:]
    options = parse_args(argv)
    Process._burst_minimum = options['burst_min']
    Process._burst_maximum = options['burst_max']

    # algorithm calls and completed then reset
    simulations = (
        ("\n\n --- Starting First Come First Serve (no preemtion) --- \n\n", FCFS_scheduler),
        ("\n\n --- Starting Shortest Job First (no preemption) --- \n\n", SJF_scheduler),
        ("\n\n --- Starting Shortest Job First (with preemtion) --- \n\n", SJF_pre_scheduler),
        ("\n\n --- Starting Round Robin (with preemtion, no priority) --- \n\n", RR_scheduler),
        ("\n\n --- Starting Preemptive Priority (tiered Round Robin scheduling) --- \n\n", PP_scheduler),
    )
    for banner, scheduler in simulations:
        computer = Computer()
        write(banner)
        # start(algo, T/F, core#, proc#, time, prior)
        computer.start(scheduler, options['delay_creation'], options['cores'],
                       options['processes'], options['time_slice'], options['priority_max'])
        print_completed_data(computer)


if __name__ == "__main__":
    main()
Run
Reset
Share
Import
Link
Embed
Language▼
English
中文
Python Fiddle
Python Cloud IDE
Follow @python_fiddle
Browser Version Not Supported
Due to Python Fiddle's reliance on advanced JavaScript techniques, older browsers might have problems running it correctly. Please download the latest version of your favourite browser.
Chrome 10+
Firefox 4+
Safari 5+
IE 10+
Let me try anyway!
url:
Go
Python Snippet
Stackoverflow Question