1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <atomic>
#include <thread>
#include <unistd.h>
#ifndef __aarch64__
#error "This must be compiled for arm64!"
#endif
// LDXR: Load exclusive register wrapper.
//
// Read from ADDR and marked address for exclusive access (exclusive monitor).
//
// Return value read from memory.
//
// NOTE: No memory ordering semantics.
//
// https://developer.arm.com/documentation/ddi0596/2021-12/Base-Instructions/LDXR--Load-Exclusive-Register-?lang=en
uint64_t ldxr(uint64_t* addr) {
uint64_t ret;
asm volatile("ldxr %0, %1" : "=r"(ret) : "Q"(*addr) : "memory");
return ret;
}
// STXR: Store exclusive register wrapper.
//
// Conditionally write VAL to ADDR if ADDR is marked for exclusive access by a
// previous exclusive load (eg LDXR).
//
// Return 0 if the write was successful, 1 otherwise.
//
// NOTE: No memory ordering semantics.
//
// https://developer.arm.com/documentation/ddi0596/2021-12/Base-Instructions/STXR--Store-Exclusive-Register-?lang=en
bool stxr(uint64_t* addr, uint64_t val) {
uint32_t ret;
asm volatile("stxr %w0, %2, %1"
: "=&r"(ret), "=Q"(*addr)
: "r"(val)
: "memory");
return ret == 0;
}
inline void mem_barrier() {
asm volatile("dmb ish" : /* out */ : /* in */ : "memory");
}
// -- BLOCK STACK --------------------------------------------------------------
struct BlockStack {
struct Block {
Block* next{nullptr};
unsigned mem{0};
};
void push(Block* blk) {
printf("[%d] push blk->mem=%d\n", gettid(), blk->mem);
do {
blk->next = load_head_ex();
} while (!store_head_ex(blk));
}
Block* pop(bool delay = false) {
Block *blk, *next;
do {
blk = load_head_ex();
next = blk->next;
// Spin some time to give other thread a chance to replace head.
// Delay somewhat tuned for my ARM device.
for (int i = 0; delay && (i < (1 << 16)); ++i) {
asm volatile("nop");
}
delay = false;
} while (!store_head_ex(next));
printf("[%d] pop blk->mem=%d\n", gettid(), blk->mem);
return blk;
}
private:
Block* load_head_ex() const {
static_assert(sizeof(void*) == sizeof(uint64_t),
"Pointer size miss-match!");
Block* blk = (Block*)ldxr((uint64_t*)&m_head);
mem_barrier();
return blk;
}
bool store_head_ex(Block* blk) {
static_assert(sizeof(void*) == sizeof(uint64_t),
"Pointer size miss-match!");
const bool success = stxr((uint64_t*)&m_head, (uint64_t)blk);
mem_barrier();
return success;
}
Block* m_head{nullptr};
};
int main() {
BlockStack::Block B1, B2, B3;
B1.mem = 1;
B2.mem = 2;
B3.mem = 3;
// Create free memory block list.
// S: head -> 1 -> 2 -> 3.
BlockStack S;
S.push(&B3);
S.push(&B2);
S.push(&B1);
// Atomics to coordinate when threads are being released.
std::atomic<size_t> worker(0);
std::atomic<bool> release(false);
auto T1 = std::thread([&S, &worker, &release]() {
// Notify thread T1 ready, and wait for release.
worker.fetch_add(1);
while (!release.load()) {}
// Read head=1 & next=2, and add some artificial delay by executing NOPs.
// This gives T2 some time to pop 1 & 2 and push back 1, which triggered the
// ABA problem with the CAS instruction.
//
// However with the LL/SC atomics, our exclusive monitor for T1 after the
// exclusive load of the head is cleared after T2 updated the head.
// Therefore the exclusive store after the NOP delay will fail and we will
// load the new head pointer, circumventing the ABA problem.
//
// NOTE: This does not work under qemu user emulation (aarch64) as qemu does
// not implement the strong LL/SC semantics.
// https://elixir.bootlin.com/qemu/v8.1.2/source/target/arm/tcg/translate-a64.c#L2411
auto* B1 = S.pop(true /* nops delay */);
assert(B1->mem == 1);
// Pops 3, correct, no ABA problem.
auto* B2 = S.pop();
assert(B2->mem == 3);
});
auto T2 = std::thread([&S, &worker, &release]() {
// Notify thread T2 ready, and wait for release.
worker.fetch_add(1);
while (!release.load()) {}
// Artificial sleep, such that T1 entered pop and read head & next.
// Delay somewhat tuned for my ARM device.
for (int i = 0; i < (1<<8); ++i) {
asm volatile("nop");
}
// Pop 1 & 2.
auto* B1 = S.pop();
assert(B1->mem == 1);
auto* B2 = S.pop();
assert(B2->mem == 2);
// Re-insert 1.
S.push(B1);
});
// Wait T1 & T2 ready, then release both threads.
while (worker.load() != 2) {}
release.store(true);
T2.join();
T1.join();
}
|