-
Notifications
You must be signed in to change notification settings - Fork 5
/
scheduler.go
213 lines (184 loc) · 6.03 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package block_stm
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
type TaskKind int
const (
TaskKindExecution TaskKind = iota
TaskKindValidation
)
type TxDependency struct {
sync.Mutex
dependents []TxnIndex
}
func (t *TxDependency) Swap(new []TxnIndex) []TxnIndex {
t.Lock()
old := t.dependents
t.dependents = new
t.Unlock()
return old
}
// Scheduler implements the scheduler for the block-stm
// ref: `Algorithm 4 The Scheduler module, variables, utility APIs and next task logic`
type Scheduler struct {
block_size int
// An index that tracks the next transaction to try and execute.
execution_idx atomic.Uint64
// A similar index for tracking validation.
validation_idx atomic.Uint64
// Number of times validation_idx or execution_idx was decreased
decrease_cnt atomic.Uint64
// Number of ongoing validation and execution tasks
num_active_tasks atomic.Uint64
// Marker for completion
done_marker atomic.Bool
// txn_idx to a mutex-protected set of dependent transaction indices
txn_dependency []TxDependency
// txn_idx to a mutex-protected pair (incarnation_number, status), where status ∈ {READY_TO_EXECUTE, EXECUTING, EXECUTED, ABORTING}.
txn_status []StatusEntry
// metrics
executedTxns atomic.Int64
validatedTxns atomic.Int64
}
func NewScheduler(block_size int) *Scheduler {
return &Scheduler{
block_size: block_size,
txn_dependency: make([]TxDependency, block_size),
txn_status: make([]StatusEntry, block_size),
}
}
func (s *Scheduler) Done() bool {
return s.done_marker.Load()
}
func (s *Scheduler) DecreaseValidationIdx(target TxnIndex) {
StoreMin(&s.validation_idx, uint64(target))
s.decrease_cnt.Add(1)
}
func (s *Scheduler) CheckDone() {
observed_cnt := s.decrease_cnt.Load()
if s.execution_idx.Load() >= uint64(s.block_size) &&
s.validation_idx.Load() >= uint64(s.block_size) &&
s.num_active_tasks.Load() == 0 {
if observed_cnt == s.decrease_cnt.Load() {
s.done_marker.Store(true)
}
}
// avoid busy waiting
runtime.Gosched()
}
// TryIncarnate tries to incarnate a transaction index to execute.
// Returns the transaction version if successful, otherwise returns invalid version.
//
// Invariant `num_active_tasks`: decreased if an invalid task is returned.
func (s *Scheduler) TryIncarnate(idx TxnIndex) TxnVersion {
if int(idx) < s.block_size {
if incarnation, ok := s.txn_status[idx].TrySetExecuting(); ok {
return TxnVersion{idx, incarnation}
}
}
DecrAtomic(&s.num_active_tasks)
return InvalidTxnVersion
}
// NextVersionToExecute get the next transaction index to execute,
// returns invalid version if no task is available
//
// Invariant `num_active_tasks`: increased if a valid task is returned.
func (s *Scheduler) NextVersionToExecute() TxnVersion {
if s.execution_idx.Load() >= uint64(s.block_size) {
s.CheckDone()
return InvalidTxnVersion
}
IncrAtomic(&s.num_active_tasks)
idx_to_execute := s.execution_idx.Add(1) - 1
return s.TryIncarnate(TxnIndex(idx_to_execute))
}
// NextVersionToValidate get the next transaction index to validate,
// returns invalid version if no task is available.
//
// Invariant `num_active_tasks`: increased if a valid task is returned.
func (s *Scheduler) NextVersionToValidate() TxnVersion {
if s.validation_idx.Load() >= uint64(s.block_size) {
s.CheckDone()
return InvalidTxnVersion
}
IncrAtomic(&s.num_active_tasks)
idx_to_validate := FetchIncr(&s.validation_idx)
if idx_to_validate < uint64(s.block_size) {
if ok, incarnation := s.txn_status[idx_to_validate].IsExecuted(); ok {
return TxnVersion{TxnIndex(idx_to_validate), incarnation}
}
}
DecrAtomic(&s.num_active_tasks)
return InvalidTxnVersion
}
// NextTask returns the transaction index and task kind for the next task to execute or validate,
// returns invalid version if no task is available.
//
// Invariant `num_active_tasks`: increased if a valid task is returned.
func (s *Scheduler) NextTask() (TxnVersion, TaskKind) {
validation_idx := s.validation_idx.Load()
execution_idx := s.execution_idx.Load()
if validation_idx < execution_idx {
return s.NextVersionToValidate(), TaskKindValidation
} else {
return s.NextVersionToExecute(), TaskKindExecution
}
}
func (s *Scheduler) WaitForDependency(txn TxnIndex, blocking_txn TxnIndex) *Condvar {
cond := NewCondvar()
entry := &s.txn_dependency[blocking_txn]
entry.Lock()
// thread holds 2 locks
if ok, _ := s.txn_status[blocking_txn].IsExecuted(); ok {
// dependency resolved before locking in Line 148
entry.Unlock()
return nil
}
s.txn_status[txn].Suspend(cond)
entry.dependents = append(entry.dependents, txn)
entry.Unlock()
return cond
}
func (s *Scheduler) ResumeDependencies(txns []TxnIndex) {
for _, txn := range txns {
s.txn_status[txn].Resume()
}
}
// Invariant `num_active_tasks`: decreased if an invalid task is returned.
func (s *Scheduler) FinishExecution(version TxnVersion, wroteNewPath bool) (TxnVersion, TaskKind) {
s.txn_status[version.Index].SetExecuted()
deps := s.txn_dependency[version.Index].Swap(nil)
s.ResumeDependencies(deps)
if s.validation_idx.Load() > uint64(version.Index) { // otherwise index already small enough
if !wroteNewPath {
// schedule validation for current tx only, don't decrease num_active_tasks
return version, TaskKindValidation
}
// schedule validation for txn_idx and higher txns
s.DecreaseValidationIdx(version.Index)
}
DecrAtomic(&s.num_active_tasks)
return InvalidTxnVersion, 0
}
func (s *Scheduler) TryValidationAbort(version TxnVersion) bool {
return s.txn_status[version.Index].TryValidationAbort(version.Incarnation)
}
// Invariant `num_active_tasks`: decreased if an invalid task is returned.
func (s *Scheduler) FinishValidation(txn TxnIndex, aborted bool) (TxnVersion, TaskKind) {
if aborted {
s.txn_status[txn].SetReadyStatus()
s.DecreaseValidationIdx(txn + 1)
if s.execution_idx.Load() > uint64(txn) {
return s.TryIncarnate(txn), TaskKindExecution
}
}
DecrAtomic(&s.num_active_tasks)
return InvalidTxnVersion, 0
}
func (s *Scheduler) Stats() string {
return fmt.Sprintf("executed: %d, validated: %d",
s.executedTxns.Load(), s.validatedTxns.Load())
}