---
title: Parallelisation
execute:
eval: false
jupyter: python3
---
> "Parallel computing is a type of computation in which many calculations or processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time."
>
> [Wikipedia](https://en.wikipedia.org/wiki/Parallel_computing)
:::{.callout-note}
In the past, a central processing unit (CPU) would have had a single **core**.
However, in more recent years, having multiple **cores** has become the norm. The benefit of this is that multiple tasks can be handled simultaneously.
By default, our Python code will not make use of multiple cores. Everything will be run sequentially on a single core.
However, for some kinds of code, running it across multiple cores at once can be a great way to speed things up.
:::
SimPy models are a good candidate for running in parallel! By running our SimPy code in parallel, we can potentially cut down dramatically the time it takes to complete multiple simulation runs.
:::{.callout-warning}
You may not be able to use parallelisation when deploying your code to the web - it will vary depending on your deployment/hosting choices.
:::
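Before parallelising anything, it can be useful to check how many cores your machine actually has, as this caps the speedup you can expect. A minimal sketch using Python's built-in `os` module (not part of the model code):

```python
import os

# Total number of CPU cores visible to Python on this machine.
# joblib can spread work across up to this many cores.
print(os.cpu_count())
```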
## A simple joblib example
First, it may be helpful to see a simpler example of joblib.
Let's start by looking at a for loop to square the numbers 1 to 10.
```{python}
#| eval: true
squared_numbers = []
for i in range(1, 11, 1):
    squared_numbers.append(i * i)
print(squared_numbers)
```
We can simplify the code above into a list comprehension.
```{python}
#| eval: true
[i*i for i in range(1, 11, 1)]
```
Why is this important? Well, to use joblib, it's easiest to write our loop as a list comprehension.
Instead of doing i * i to square our number, we have made a new **function** that does the same thing.
```{python}
#| eval: true
from joblib import Parallel, delayed
def multiply_by_self(input_number):
    return input_number * input_number

Parallel(n_jobs=2)(delayed(multiply_by_self)(i) for i in range(1, 11, 1))
```
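One reassuring property of joblib, worth confirming for yourself, is that `Parallel` returns the results in the same order as the inputs were submitted, so the parallel version is a drop-in replacement for the list comprehension. A quick sketch:

```python
from joblib import Parallel, delayed

def multiply_by_self(input_number):
    return input_number * input_number

# The serial version - a plain list comprehension
serial_results = [multiply_by_self(i) for i in range(1, 11)]

# The parallel version - joblib preserves submission order
parallel_results = Parallel(n_jobs=2)(
    delayed(multiply_by_self)(i) for i in range(1, 11)
)

assert serial_results == parallel_results
print(parallel_results)
```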
## A code example
::: {.callout-note}
Thanks go to [Michael Allen](https://orcid.org/0000-0002-8746-9957) for providing an example of how this can be achieved in SimPy. His repository can be found [here](https://github.com/MichaelAllen1966/2004_simple_simpy_parallel).
:::
We will make use of the [Joblib](https://joblib.readthedocs.io/en/latest/parallel.html) package to easily split our SimPy code to run across multiple processor cores.
We will take the model created in the Reproducibility chapter (@sec-reproducibility) and add parallelisation to it.
### Library imports
We will need to import `Parallel` and `delayed` from the joblib library.
You will need to run `!pip install joblib` if you have not previously made use of this library.
```{python}
from joblib import Parallel, delayed
```
### The g, Patient and Model classes
Our `g`, `Patient` and `Model` classes are unchanged.
### The trial class
In the trial class, we need to change a number of functions, tweak our attributes, and make use of the joblib library.
:::{.callout-warning}
Because joblib's default backend runs each task in a separate process, we cannot keep track of our results in the way we have so far - setting up a dummy dataframe and then using the `.loc` accessor to write each run's results to the correct row. Any such writes happen in the worker process's own copy of the dataframe, so we would end up with empty results.
Instead, we will create an empty list. Into this list we will place a **dictionary** of results returned from each run.
:::
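To see why the `.loc` approach fails, here is a small standalone sketch (not part of the model) showing that a list mutated inside joblib workers stays untouched in the parent process, while **returned** values come back fine:

```python
from joblib import Parallel, delayed

shared_list = []  # lives in the parent process

def record(run):
    # With joblib's default process-based backend, each worker mutates
    # its own copy of shared_list, not the parent's
    shared_list.append(run)
    return {"Run Number": run}

returned = Parallel(n_jobs=2)(delayed(record)(run) for run in range(4))

print(shared_list)  # still empty in the parent
print(returned)     # the returned dictionaries all arrive safely
```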
#### The `__init__` method
Let's start by adjusting our `__init__` method for our new way of carrying out the results collection.
```{python}
def __init__(self):
    self.df_trial_results = []
```
#### The `process_trial_results` method
Next we create a new method that turns our list of dictionaries into a pandas DataFrame.
All we need to do is call `pd.DataFrame` on the list; here, we overwrite the original `df_trial_results` object.
We then set the index of the DataFrame to the run number, matching how it was set up in the original code.
```{python}
def process_trial_results(self):
    self.df_trial_results = pd.DataFrame(self.df_trial_results)
    self.df_trial_results.set_index("Run Number", inplace=True)
```
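If building a DataFrame from a list of dictionaries is new to you, this standalone sketch (with made-up numbers) shows exactly what `process_trial_results` is doing:

```python
import pandas as pd

# Each run returns one dictionary; the trial collects them in a list
trial_results = [
    {"Run Number": 0, "Arrivals": 118, "Mean Q Time Nurse": 9.35},
    {"Run Number": 1, "Arrivals": 131, "Mean Q Time Nurse": 11.72},
]

# pd.DataFrame turns each dictionary into a row, with the keys
# becoming the column names
df = pd.DataFrame(trial_results)
df.set_index("Run Number", inplace=True)
print(df)
```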
#### The `print_trial_results` method
Because we went to the effort of setting the index in the step above, this method can remain unchanged.
#### The `run_single` method
First, let's look back at how our `run_trial` function was written before.
```{python}
def run_trial(self):
    print(f"{g.number_of_receptionists} receptionists, {g.number_of_nurses} nurses, {g.number_of_doctors} doctors")
    print("") # Print a blank line

    # Run the simulation for the number of runs specified in g class.
    # For each run, we create a new instance of the Model class and call its
    # run method, which sets everything else in motion. Once the run has
    # completed, we grab out the stored run results (just mean queuing time
    # here) and store it against the run number in the trial results
    # dataframe.
    for run in range(g.number_of_runs):
        random.seed(run)

        my_model = Model(run)
        patient_level_results = my_model.run()

        self.df_trial_results.loc[run] = [
            len(patient_level_results),
            my_model.mean_q_time_recep,
            my_model.mean_q_time_nurse,
            my_model.mean_q_time_doctor
        ]

    # Once the trial (ie all runs) has completed, print the final results
    self.print_trial_results()
```
To use parallelisation, we now split this out into two separate functions. The first is the `run_single` method.
**Note that it's very similar to the indented part of the `for` loop from the code above.**
The main change is how the results are stored - they are now put into a dictionary. Remember, dictionaries use the format {"key":value} - here we have made our column names the 'keys' and our results the 'values'.
Finally, it's important to **return** the results object from the function.
```{python}
def run_single(self, run):
    # For each run, we create a new instance of the Model class and call its
    # run method, which sets everything else in motion. Once the run has
    # completed, we grab out the stored run results (just mean queuing time
    # here) and store them in a dictionary.
    random.seed(run)

    my_model = Model(run)
    patient_level_results = my_model.run()

    results = {"Run Number": run,
               "Arrivals": len(patient_level_results),
               "Mean Q Time Recep": my_model.mean_q_time_recep,
               "Mean Q Time Nurse": my_model.mean_q_time_nurse,
               "Mean Q Time Doctor": my_model.mean_q_time_doctor
               }

    return results
```
#### The `run_trial` method
Finally, we need to do a few things.
The key one is making our trial now use the Parallel class and delayed function.
We set up an instance of the Parallel class and set the number of jobs to `-1`.
:::{.callout-tip}
`-1` just means that the joblib library will use every available core to run the code.
You can instead specify a particular number of cores to use as a positive integer value.
:::
We then pass the `self.run_single` method to the `delayed` function.
Finally, we pass in the argument required by `self.run_single`, which is just the run number.
The syntax can look a little strange - take a close look at the full line below and try to understand it.
```{python}
self.df_trial_results = Parallel(n_jobs=-1)(delayed(self.run_single)(run) for run in range(g.number_of_runs))
```
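If the `delayed` syntax looks mysterious, it helps to know that `delayed(func)(args)` does not call the function - it just packages the call up as a `(function, args, kwargs)` tuple for `Parallel` to execute later. A quick sketch (the `multiply_by_self` function is just for illustration):

```python
from joblib import delayed

def multiply_by_self(input_number):
    return input_number * input_number

# Nothing is executed here - the call is captured for later
task = delayed(multiply_by_self)(4)

# Unpack the captured call and run it by hand
func, args, kwargs = task
print(args)                   # the arguments we passed in
print(func(*args, **kwargs))  # running the captured call gives 16
```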
We assign all of this to the self.df_trial_results attribute of our class.
Now the only additional step is to run our new `process_trial_results()` function before we run `print_trial_results()`.
```{python}
def run_trial(self):
    print(f"{g.number_of_receptionists} receptionists, {g.number_of_nurses} nurses, {g.number_of_doctors} doctors")
    print("") # Print a blank line

    # Run the simulation for the number of runs specified in g class.
    self.df_trial_results = Parallel(n_jobs=-1)(delayed(self.run_single)(run) for run in range(g.number_of_runs))

    # Once the trial (ie all runs) has completed, print the final results
    self.process_trial_results()
    self.print_trial_results()
```
Voila! Our model is now set up to use parallelisation. Let's take a look at how much faster this can make things.
## Evaluating the code outputs
First, let's run this the original way and time how long it takes.
```{python}
#| eval: true
#| echo: false
import simpy
import random
import pandas as pd
from sim_tools.distributions import Exponential
from joblib import Parallel, delayed ##NEW

# Class to store global parameter values. We don't create an instance of this
# class - we just refer to the class blueprint itself to access the numbers
# inside.
class g:
    patient_inter = 5
    mean_reception_time = 2
    mean_n_consult_time = 6
    mean_d_consult_time = 20
    number_of_receptionists = 1
    number_of_nurses = 1
    number_of_doctors = 2
    prob_seeing_doctor = 0.6
    sim_duration = 600
    number_of_runs = 1000

# Class representing patients coming in to the clinic.
class Patient:
    def __init__(self, p_id):
        self.id = p_id
        self.q_time_recep = 0
        self.q_time_nurse = 0
        self.q_time_doctor = 0

# Class representing our model of the clinic.
class Model:
    # Constructor to set up the model for a run. We pass in a run number when
    # we create a new model.
    def __init__(self, run_number):
        # Create a SimPy environment in which everything will live
        self.env = simpy.Environment()

        # Create a patient counter (which we'll use as a patient ID)
        self.patient_counter = 0

        # Create our resources
        self.receptionist = simpy.Resource(
            self.env, capacity=g.number_of_receptionists
        )
        self.nurse = simpy.Resource(self.env, capacity=g.number_of_nurses)
        self.doctor = simpy.Resource(
            self.env, capacity=g.number_of_doctors)

        # Store the passed in run number
        self.run_number = run_number

        # Create a new Pandas DataFrame that will store some results against
        # the patient ID (which we'll use as the index).
        self.results_df = pd.DataFrame()
        self.results_df["Patient ID"] = [1]
        self.results_df["Q Time Recep"] = [0.0]
        self.results_df["Time with Recep"] = [0.0]
        self.results_df["Q Time Nurse"] = [0.0]
        self.results_df["Time with Nurse"] = [0.0]
        self.results_df["Q Time Doctor"] = [0.0]
        self.results_df["Time with Doctor"] = [0.0]
        self.results_df.set_index("Patient ID", inplace=True)

        # Create an attribute to store the mean queuing times across this run
        # of the model
        self.mean_q_time_recep = 0
        self.mean_q_time_nurse = 0
        self.mean_q_time_doctor = 0

        self.patient_inter_arrival_dist = Exponential(mean = g.patient_inter, random_seed = self.run_number*2)
        self.patient_reception_time_dist = Exponential(mean = g.mean_reception_time, random_seed = self.run_number*3)
        self.nurse_consult_time_dist = Exponential(mean = g.mean_n_consult_time, random_seed = self.run_number*4)
        self.doctor_consult_time_dist = Exponential(mean = g.mean_d_consult_time, random_seed = self.run_number*5)

    # A generator function that represents the DES generator for patient
    # arrivals
    def generator_patient_arrivals(self):
        # We use an infinite loop here to keep doing this indefinitely whilst
        # the simulation runs
        while True:
            # Increment the patient counter by 1 (this means our first patient
            # will have an ID of 1)
            self.patient_counter += 1

            # Create a new patient - an instance of the Patient Class we
            # defined above. Remember, we pass in the ID when creating a
            # patient - so here we pass the patient counter to use as the ID.
            p = Patient(self.patient_counter)

            # Tell SimPy to start up the attend_clinic generator function with
            # this patient (the generator function that will model the
            # patient's journey through the system)
            self.env.process(self.attend_clinic(p))

            # Randomly sample the time to the next patient arriving. Here, we
            # sample from an exponential distribution (common for inter-arrival
            # times), and pass in a lambda value of 1 / mean. The mean
            # inter-arrival time is stored in the g class.
            sampled_inter = self.patient_inter_arrival_dist.sample()

            # Freeze this instance of this function in place until the
            # inter-arrival time we sampled above has elapsed. Note - time in
            # SimPy progresses in "Time Units", which can represent anything
            # you like (just make sure you're consistent within the model)
            yield self.env.timeout(sampled_inter)

    # A generator function that represents the pathway for a patient going
    # through the clinic.
    # The patient object is passed in to the generator function so we can
    # extract information from / record information to it
    def attend_clinic(self, patient):
        start_q_recep = self.env.now

        with self.receptionist.request() as req:
            yield req

            end_q_recep = self.env.now

            patient.q_time_recep = end_q_recep - start_q_recep

            sampled_recep_act_time = self.patient_reception_time_dist.sample()

            self.results_df.at[patient.id, "Q Time Recep"] = (
                patient.q_time_recep
            )
            self.results_df.at[patient.id, "Time with Recep"] = (
                sampled_recep_act_time
            )

            yield self.env.timeout(sampled_recep_act_time)

        # Here's where the patient finishes with the receptionist, and starts
        # queuing for the nurse

        # Record the time the patient started queuing for a nurse
        start_q_nurse = self.env.now

        # This code says request a nurse resource, and do all of the following
        # block of code with that nurse resource held in place (and therefore
        # not usable by another patient)
        with self.nurse.request() as req:
            # Freeze the function until the request for a nurse can be met.
            # The patient is currently queuing.
            yield req

            # When we get to this bit of code, control has been passed back to
            # the generator function, and therefore the request for a nurse has
            # been met. We now have the nurse, and have stopped queuing, so we
            # can record the current time as the time we finished queuing.
            end_q_nurse = self.env.now

            # Calculate the time this patient was queuing for the nurse, and
            # record it in the patient's attribute for this.
            patient.q_time_nurse = end_q_nurse - start_q_nurse

            # Now we'll randomly sample the time this patient spends with the
            # nurse. Here, we use an Exponential distribution for simplicity,
            # but you would typically use a Log Normal distribution for a real
            # model (we'll come back to that). As with sampling the
            # inter-arrival times, we grab the mean from the g class, and pass
            # in 1 / mean as the lambda value.
            sampled_nurse_act_time = self.nurse_consult_time_dist.sample()

            # Here we'll store the queuing time for the nurse and the sampled
            # time to spend with the nurse in the results DataFrame against the
            # ID for this patient. In real world models, you may not want to
            # bother storing the sampled activity times - but as this is a
            # simple model, we'll do it here.
            # We use a handy property of pandas called .at, which works a bit
            # like .loc. .at allows us to access (and therefore change) a
            # particular cell in our DataFrame by providing the row and column.
            # Here, we specify the row as the patient ID (the index), and the
            # column for the value we want to update for that patient.
            self.results_df.at[patient.id, "Q Time Nurse"] = (
                patient.q_time_nurse)
            self.results_df.at[patient.id, "Time with Nurse"] = (
                sampled_nurse_act_time)

            # Freeze this function in place for the activity time we sampled
            # above. This is the patient spending time with the nurse.
            yield self.env.timeout(sampled_nurse_act_time)

        # When the time above elapses, the generator function will return
        # here. As there's nothing more that we've written, the function
        # will simply end. This is a sink. We could choose to add
        # something here if we wanted to record something - e.g. a counter
        # for number of patients that left, recording something about the
        # patients that left at a particular sink etc.

        # Conditional logic to see if patient goes on to see doctor
        # We sample from the uniform distribution between 0 and 1. If the value
        # is less than the probability of seeing a doctor (stored in g Class)
        # then we say the patient sees a doctor.
        # If not, this block of code won't be run and the patient will just
        # leave the system (we could add in an else if we wanted a branching
        # path to another activity instead)
        if random.uniform(0,1) < g.prob_seeing_doctor:
            start_q_doctor = self.env.now

            with self.doctor.request() as req:
                yield req

                end_q_doctor = self.env.now

                patient.q_time_doctor = end_q_doctor - start_q_doctor

                sampled_doctor_act_time = self.doctor_consult_time_dist.sample()

                self.results_df.at[patient.id, "Q Time Doctor"] = (
                    patient.q_time_doctor
                )
                self.results_df.at[patient.id, "Time with Doctor"] = (
                    sampled_doctor_act_time
                )

                yield self.env.timeout(sampled_doctor_act_time)

    # This method calculates results over a single run. Here we just calculate
    # a mean, but in real world models you'd probably want to calculate more.
    def calculate_run_results(self):
        # Take the mean of the queuing times across patients in this run of
        # the model.
        self.mean_q_time_recep = self.results_df["Q Time Recep"].mean()
        self.mean_q_time_nurse = self.results_df["Q Time Nurse"].mean()
        self.mean_q_time_doctor = self.results_df["Q Time Doctor"].mean()

    # The run method starts up the DES entity generators, runs the simulation,
    # and in turn calls anything we need to generate results for the run
    def run(self):
        # Start up our DES entity generators that create new patients. We've
        # only got one in this model, but we'd need to do this for each one if
        # we had multiple generators.
        self.env.process(self.generator_patient_arrivals())

        # Run the model for the duration specified in g class
        self.env.run(until=g.sim_duration)

        # Now the simulation run has finished, call the method that calculates
        # run results
        self.calculate_run_results()

        # Return the patient-level results from this run of the model
        return (self.results_df)

# Class representing a Trial for our simulation - a batch of simulation runs.
class Trial:
    # The constructor sets up a pandas dataframe that will store the key
    # results from each run against run number, with run number as the index.
    def __init__(self):
        self.df_trial_results = pd.DataFrame()
        self.df_trial_results["Run Number"] = [0]
        self.df_trial_results["Arrivals"] = [0]
        self.df_trial_results["Mean Q Time Recep"] = [0.0]
        self.df_trial_results["Mean Q Time Nurse"] = [0.0]
        self.df_trial_results["Mean Q Time Doctor"] = [0.0]
        self.df_trial_results.set_index("Run Number", inplace=True)

    # Method to print out the results from the trial. In real world models,
    # you'd likely save them as well as (or instead of) printing them
    def print_trial_results(self):
        print("Trial Results")
        print(self.df_trial_results.round(2))
        print(self.df_trial_results.mean().round(2))

    # Method to run a trial
    def run_trial(self):
        print(f"{g.number_of_receptionists} receptionists, {g.number_of_nurses} nurses, {g.number_of_doctors} doctors")
        print("") # Print a blank line

        # Run the simulation for the number of runs specified in g class.
        # For each run, we create a new instance of the Model class and call
        # its run method, which sets everything else in motion. Once the run
        # has completed, we grab out the stored run results (just mean queuing
        # time here) and store it against the run number in the trial results
        # dataframe.
        for run in range(g.number_of_runs):
            random.seed(run)

            my_model = Model(run)
            patient_level_results = my_model.run()

            self.df_trial_results.loc[run] = [
                len(patient_level_results),
                my_model.mean_q_time_recep,
                my_model.mean_q_time_nurse,
                my_model.mean_q_time_doctor
            ]

        # Once the trial (ie all runs) has completed, print the final results
        self.print_trial_results()
```
```{python}
#| eval: true
#| echo: false
import time
start_time = time.time()
# Create an instance of the Trial class
my_trial = Trial()
# Call the run_trial method of our Trial object
my_trial.run_trial()
print("")
print(f"It took {(time.time() - start_time):.4f} seconds to do {g.number_of_runs} runs without parallelisation")
```
Now let's run it again with parallelisation.
:::{.callout-note collapse="true"}
### Click here to view the full code
```{python}
#| eval: true
#| echo: false
import simpy
import random
import pandas as pd
from sim_tools.distributions import Exponential
from joblib import Parallel, delayed ##NEW
# Class to store global parameter values. We don't create an instance of this
# class - we just refer to the class blueprint itself to access the numbers
# inside.
class g:
patient_inter = 5
mean_reception_time = 2
mean_n_consult_time = 6
mean_d_consult_time = 20
number_of_receptionists = 1
number_of_nurses = 1
number_of_doctors = 2
prob_seeing_doctor = 0.6
sim_duration = 600
number_of_runs = 1000
# Class representing patients coming in to the clinic.
class Patient:
def __init__(self, p_id):
self.id = p_id
self.q_time_recep = 0
self.q_time_nurse = 0
self.q_time_doctor = 0
# Class representing our model of the clinic.
class Model:
# Constructor to set up the model for a run. We pass in a run number when
# we create a new model.
def __init__(self, run_number):
# Create a SimPy environment in which everything will live
self.env = simpy.Environment()
# Create a patient counter (which we'll use as a patient ID)
self.patient_counter = 0
# Create our resources
self.receptionist = simpy.Resource(
self.env, capacity=g.number_of_receptionists
)
self.nurse = simpy.Resource(self.env, capacity=g.number_of_nurses)
self.doctor = simpy.Resource(
self.env, capacity=g.number_of_doctors)
# Store the passed in run number
self.run_number = run_number
# Create a new Pandas DataFrame that will store some results against
# the patient ID (which we'll use as the index).
self.results_df = pd.DataFrame()
self.results_df["Patient ID"] = [1]
self.results_df["Q Time Recep"] = [0.0]
self.results_df["Time with Recep"] = [0.0]
self.results_df["Q Time Nurse"] = [0.0]
self.results_df["Time with Nurse"] = [0.0]
self.results_df["Q Time Doctor"] = [0.0]
self.results_df["Time with Doctor"] = [0.0]
self.results_df.set_index("Patient ID", inplace=True)
# Create an attribute to store the mean queuing times across this run of
# the model
self.mean_q_time_recep = 0
self.mean_q_time_nurse = 0
self.mean_q_time_doctor = 0
self.patient_inter_arrival_dist = Exponential(mean = g.patient_inter, random_seed = self.run_number*2)
self.patient_reception_time_dist = Exponential(mean = g.mean_reception_time, random_seed = self.run_number*3)
self.nurse_consult_time_dist = Exponential(mean = g.mean_n_consult_time, random_seed = self.run_number*4)
self.doctor_consult_time_dist = Exponential(mean = g.mean_d_consult_time, random_seed = self.run_number*5)
# A generator function that represents the DES generator for patient
# arrivals
def generator_patient_arrivals(self):
# We use an infinite loop here to keep doing this indefinitely whilst
# the simulation runs
while True:
# Increment the patient counter by 1 (this means our first patient
# will have an ID of 1)
self.patient_counter += 1
# Create a new patient - an instance of the Patient Class we
# defined above. Remember, we pass in the ID when creating a
# patient - so here we pass the patient counter to use as the ID.
p = Patient(self.patient_counter)
# Tell SimPy to start up the attend_clinic generator function with
# this patient (the generator function that will model the
# patient's journey through the system)
self.env.process(self.attend_clinic(p))
# Randomly sample the time to the next patient arriving. Here, we
# sample from an exponential distribution (common for inter-arrival
# times), and pass in a lambda value of 1 / mean. The mean
# inter-arrival time is stored in the g class.
sampled_inter = self.patient_inter_arrival_dist.sample()
# Freeze this instance of this function in place until the
# inter-arrival time we sampled above has elapsed. Note - time in
# SimPy progresses in "Time Units", which can represent anything
# you like (just make sure you're consistent within the model)
yield self.env.timeout(sampled_inter)
# A generator function that represents the pathway for a patient going
# through the clinic.
# The patient object is passed in to the generator function so we can
# extract information from / record information to it
def attend_clinic(self, patient):
start_q_recep = self.env.now
with self.receptionist.request() as req:
yield req
end_q_recep = self.env.now
patient.q_time_recep = end_q_recep - start_q_recep
sampled_recep_act_time = self.patient_reception_time_dist.sample()
self.results_df.at[patient.id, "Q Time Recep"] = (
patient.q_time_recep
)
self.results_df.at[patient.id, "Time with Recep"] = (
sampled_recep_act_time
)
yield self.env.timeout(sampled_recep_act_time)
# Here's where the patient finishes with the receptionist, and starts
# queuing for the nurse
# Record the time the patient started queuing for a nurse
start_q_nurse = self.env.now
# This code says request a nurse resource, and do all of the following
# block of code with that nurse resource held in place (and therefore
# not usable by another patient)
with self.nurse.request() as req:
# Freeze the function until the request for a nurse can be met.
# The patient is currently queuing.
yield req
# When we get to this bit of code, control has been passed back to
# the generator function, and therefore the request for a nurse has
# been met. We now have the nurse, and have stopped queuing, so we
# can record the current time as the time we finished queuing.
end_q_nurse = self.env.now
# Calculate the time this patient was queuing for the nurse, and
# record it in the patient's attribute for this.
patient.q_time_nurse = end_q_nurse - start_q_nurse
# Now we'll randomly sample the time this patient with the nurse.
# Here, we use an Exponential distribution for simplicity, but you
# would typically use a Log Normal distribution for a real model
# (we'll come back to that). As with sampling the inter-arrival
# times, we grab the mean from the g class, and pass in 1 / mean
# as the lambda value.
sampled_nurse_act_time = self.nurse_consult_time_dist.sample()
# Here we'll store the queuing time for the nurse and the sampled
# time to spend with the nurse in the results DataFrame against the
# ID for this patient. In real world models, you may not want to
# bother storing the sampled activity times - but as this is a
# simple model, we'll do it here.
# We use a handy property of pandas called .at, which works a bit
# like .loc. .at allows us to access (and therefore change) a
# particular cell in our DataFrame by providing the row and column.
# Here, we specify the row as the patient ID (the index), and the
# column for the value we want to update for that patient.
self.results_df.at[patient.id, "Q Time Nurse"] = (
patient.q_time_nurse)
self.results_df.at[patient.id, "Time with Nurse"] = (
sampled_nurse_act_time)
# Freeze this function in place for the activity time we sampled
# above. This is the patient spending time with the nurse.
yield self.env.timeout(sampled_nurse_act_time)
# When the time above elapses, the generator function will return
# here. As there's nothing more that we've written, the function
# will simply end. This is a sink. We could choose to add
# something here if we wanted to record something - e.g. a counter
# for number of patients that left, recording something about the
# patients that left at a particular sink etc.
# Conditional logic to see if patient goes on to see doctor
# We sample from the uniform distribution between 0 and 1. If the value
# is less than the probability of seeing a doctor (stored in g Class)
# then we say the patient sees a doctor.
# If not, this block of code won't be run and the patient will just
# leave the system (we could add in an else if we wanted a branching
# path to another activity instead)
if random.uniform(0,1) < g.prob_seeing_doctor:
start_q_doctor = self.env.now
with self.doctor.request() as req:
yield req
end_q_doctor = self.env.now
patient.q_time_doctor = end_q_doctor - start_q_doctor
sampled_doctor_act_time = self.nurse_consult_time_dist.sample()
self.results_df.at[patient.id, "Q Time Doctor"] = (
patient.q_time_doctor
)
self.results_df.at[patient.id, "Time with Doctor"] = (
sampled_doctor_act_time
)
yield self.env.timeout(sampled_doctor_act_time)
    # This method calculates results over a single run. Here we just
    # calculate a mean, but in real world models you'd probably want to
    # calculate more.
    def calculate_run_results(self):
        # Take the mean of the queuing times across patients in this run
        # of the model.
        self.mean_q_time_recep = self.results_df["Q Time Recep"].mean()
        self.mean_q_time_nurse = self.results_df["Q Time Nurse"].mean()
        self.mean_q_time_doctor = self.results_df["Q Time Doctor"].mean()

    # The run method starts up the DES entity generators, runs the
    # simulation, and in turn calls anything we need to generate results
    # for the run
    def run(self):
        # Start up our DES entity generators that create new patients.
        # We've only got one in this model, but we'd need to do this for
        # each one if we had multiple generators.
        self.env.process(self.generator_patient_arrivals())

        # Run the model for the duration specified in the g class
        self.env.run(until=g.sim_duration)

        # Now the simulation run has finished, call the method that
        # calculates run results
        self.calculate_run_results()

        # Return the patient-level results from this run of the model
        return self.results_df
# Class representing a Trial for our simulation - a batch of simulation runs.
class Trial:
    # The constructor sets up an empty list, which we will place
    # dictionaries into. Each dictionary will be the results from a single
    # run.
    def __init__(self):
        self.df_trial_results = [] ## NEW

    ## NEW
    def process_trial_results(self):
        self.df_trial_results = pd.DataFrame(self.df_trial_results)
        self.df_trial_results.set_index("Run Number", inplace=True)

    # Method to print out the results from the trial. In real world
    # models, you'd likely save them as well as (or instead of) printing
    # them
    def print_trial_results(self):
        print("Trial Results")
        print(self.df_trial_results.round(2))
        print(self.df_trial_results.mean().round(2))

    def run_single(self, run):
        # For each run, we create a new instance of the Model class and
        # call its run method, which sets everything else in motion. Once
        # the run has completed, we grab out the stored run results (just
        # mean queuing time here) and store it against the run number in
        # the trial results dataframe.
        random.seed(run)

        my_model = Model(run)
        patient_level_results = my_model.run()

        results = {"Run Number": run,
                   "Arrivals": len(patient_level_results),
                   "Mean Q Time Recep": my_model.mean_q_time_recep,
                   "Mean Q Time Nurse": my_model.mean_q_time_nurse,
                   "Mean Q Time Doctor": my_model.mean_q_time_doctor
                   }

        return results

    def run_trial(self):
        print(f"{g.number_of_receptionists} receptionists, "
              f"{g.number_of_nurses} nurses, "
              f"{g.number_of_doctors} doctors")
        print("")  # Print a blank line

        # Run the simulation for the number of runs specified in the g
        # class.
        self.df_trial_results = Parallel(n_jobs=-1)(
            delayed(self.run_single)(run)
            for run in range(g.number_of_runs))

        # Once the trial (i.e. all runs) has completed, process and print
        # the final results
        self.process_trial_results()
        self.print_trial_results()
```
:::
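The `Parallel`/`delayed` pattern used in `run_trial` can be seen in isolation in the minimal sketch below. The `square` function and the values are made up for illustration; `n_jobs=2` simply caps the worker count at two.

```python
from joblib import Parallel, delayed

def square(x):
    # A stand-in for run_single: any self-contained function works.
    return x * x

# delayed(square)(i) packages up the call without running it; Parallel
# then farms the calls out across worker processes and returns the
# results in the same order as the inputs.
results = Parallel(n_jobs=2)(delayed(square)(i) for i in range(5))
print(results)  # [0, 1, 4, 9, 16]
```

Note that `Parallel` preserves input order, which is why we can safely store the run number inside each result dictionary and index on it afterwards.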
```{python}
#| eval: true
#| echo: false
import time
start_time = time.time()
# Create an instance of the Trial class
my_trial = Trial()
# Call the run_trial method of our Trial object
my_trial.run_trial()
print("")
print(f"It took {(time.time() - start_time):.4f} seconds to do {g.number_of_runs} runs **with** parallelisation")
```
## Evaluating speed gains
Let's run the model a few times, specifying a different number of cores to run it on each time.
This book is being compiled on a machine with a 14 core processor.
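To check the figure for your own machine, the standard library can report the number of logical cores (which may be higher than the number of physical cores if the processor supports hyper-threading):

```python
import os

# Number of logical CPUs visible to the operating system.
print(os.cpu_count())
```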
```{python}
#| eval: true
#| echo: false
class Trial:
    # The constructor sets up an empty list, which we will place
    # dictionaries into. Each dictionary will be the results from a single
    # run.
    def __init__(self):
        self.df_trial_results = [] ## NEW

    ## NEW
    def process_trial_results(self):
        self.df_trial_results = pd.DataFrame(self.df_trial_results)
        self.df_trial_results.set_index("Run Number", inplace=True)

    # Method to print out the results from the trial. In real world
    # models, you'd likely save them as well as (or instead of) printing
    # them
    def print_trial_results(self):
        print("Trial Results")
        print(self.df_trial_results.round(2))
        print(self.df_trial_results.mean().round(2))

    def run_single(self, run):
        # For each run, we create a new instance of the Model class and
        # call its run method, which sets everything else in motion. Once
        # the run has completed, we grab out the stored run results (just
        # mean queuing time here) and store it against the run number in
        # the trial results dataframe.
        random.seed(run)

        my_model = Model(run)
        patient_level_results = my_model.run()

        results = {"Run Number": run,
                   "Arrivals": len(patient_level_results),
                   "Mean Q Time Recep": my_model.mean_q_time_recep,
                   "Mean Q Time Nurse": my_model.mean_q_time_nurse,
                   "Mean Q Time Doctor": my_model.mean_q_time_doctor
                   }

        return results

    def run_trial(self, cores):
        # Run the simulation for the number of runs specified in the g
        # class, using the requested number of cores.
        self.df_trial_results = Parallel(n_jobs=cores)(
            delayed(self.run_single)(run)
            for run in range(g.number_of_runs))
```
An argument has been added to the `run_trial` method to allow us to pass in the number of cores to use.
The results below all relate to 100 runs of the simulation.
```{python}
#| eval: true
speed = []

g.number_of_runs = 100

for i in range(1, 15, 1):
    start_time = time.time()
    # Create an instance of the Trial class
    my_trial = Trial()
    # Call the run_trial method of our Trial object
    my_trial.run_trial(cores=i)
    run_time = round((time.time() - start_time), 3)
    speed.append({"Cores": i, "Run Time (seconds)": run_time})

timing_results = pd.DataFrame(speed)
print(timing_results)
```
```{python}
#| eval: true
#| echo: false
import plotly.express as px
fig = px.line(timing_results, x="Cores", y="Run Time (seconds)")
fig.show()
```
Notice that doubling the number of cores doesn't halve the run time - there is a fixed overhead (setting up the worker processes, distributing the work and collecting the results) that takes a certain amount of time regardless of the core count. This can be even more noticeable with a smaller number of runs.

We make big gains at the beginning, but the fixed overhead means that higher numbers of cores have less and less of an effect.

Let's run it again and look at the speed gains when doing 1000 runs of the simulation.
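One rough way to picture this diminishing return (a sketch with made-up numbers, not measurements from this model) is a cost model where each trial pays a fixed overhead plus a slice of work that divides across the cores:

```python
# Illustrative cost model: total time ~= fixed overhead (seconds)
# + parallelisable work (core-seconds) / number of cores.
# The overhead and work values here are invented for illustration.
def estimated_run_time(cores, overhead=2.0, work=60.0):
    return overhead + work / cores

for cores in (1, 2, 4, 8, 14):
    print(f"{cores:>2} cores: ~{estimated_run_time(cores):.1f} seconds")
```

Under these made-up numbers, going from 1 to 2 cores saves almost half the time, but going from 8 to 14 saves only a few seconds, because the overhead term never shrinks.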
```{python}
#| eval: true
#| echo: false
speed = []

g.number_of_runs = 1000

for i in range(1, 15, 1):
    start_time = time.time()
    # Create an instance of the Trial class
    my_trial = Trial()
    # Call the run_trial method of our Trial object
    my_trial.run_trial(cores=i)