Spinlock

A spinlock is a busy-wait lock: at any time only one execution context can hold the lock and enter the critical section, while every other context spins in a loop waiting for it to be released. Because it never sleeps, a spinlock can be used in interrupt context.

spinlock functions

void spin_lock(spinlock_t *lock). Synchronization between process contexts.

void spin_lock_bh(spinlock_t *lock). Also synchronizes against local softirqs (bottom halves) by disabling them on the current CPU.

void spin_lock_irq(spinlock_t *lock). Also synchronizes against local hard interrupts by disabling them on the current CPU.

spin_lock_irqsave(lock, flags). Like spin_lock_irq(), synchronizes against local hard interrupts, but additionally saves the previous local interrupt state into flags so it can be restored by spin_unlock_irqrestore() (it is a macro, so flags is passed by name).

int spin_trylock(spinlock_t *lock). Tries to acquire the lock without spinning; returns non-zero on success and zero if the lock is already held.

Usage

spinlock_t lock;

// initialize the lock
spin_lock_init(&lock);
// acquire the lock
spin_lock(&lock);
/* ... critical section ... */
// release the lock
spin_unlock(&lock);
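
If the protected data is also touched from interrupt or softirq context, the matching variant must be used on the process-context side, otherwise a deadlock can occur when the interrupt arrives on the same CPU while the lock is held. A small sketch (the lock, counter and function names are made up for illustration):

#include <linux/spinlock.h>

static spinlock_t dev_lock;   /* hypothetical lock, set up elsewhere with spin_lock_init() */
static int dev_count;

/* process context, data also touched by a hard-IRQ handler */
void count_inc_irqsafe(void)
{
	unsigned long flags;

	/* disables local interrupts and saves the previous state in flags */
	spin_lock_irqsave(&dev_lock, flags);
	dev_count++;
	spin_unlock_irqrestore(&dev_lock, flags);
}

/* process context, data also touched by a softirq/tasklet */
void count_dec_bhsafe(void)
{
	spin_lock_bh(&dev_lock);
	dev_count--;
	spin_unlock_bh(&dev_lock);
}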

spinlock implementation

spinlock definition

include/linux/spinlock_types.h:

Because a spinlock holder must not sleep, a non-RT kernel makes the critical section non-preemptible: spin_lock() disables preemption.

With CONFIG_PREEMPT_RT (the real-time preemption model) enabled, however, spinlock is re-implemented on top of rt_mutex, so the critical section remains preemptible and the lock owner may even sleep.

/* struct rt_mutex_base comes from include/linux/rtmutex.h */
struct rt_mutex_base {
	raw_spinlock_t		wait_lock;
	struct rb_root_cached	waiters;
	struct task_struct	*owner;
};

#ifndef CONFIG_PREEMPT_RT

/* Non PREEMPT_RT kernels map spinlock to raw_spinlock */
typedef struct spinlock {
	union {
		struct raw_spinlock rlock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
		struct {
			u8 __padding[LOCK_PADSIZE];
			struct lockdep_map dep_map;
		};
#endif
	};
} spinlock_t;

#else /* !CONFIG_PREEMPT_RT */

/* PREEMPT_RT kernels map spinlock to rt_mutex */
typedef struct spinlock {
	struct rt_mutex_base	lock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	struct lockdep_map	dep_map;
#endif
} spinlock_t;

#endif /* CONFIG_PREEMPT_RT */

raw_spinlock is defined as follows:

/* UP without CONFIG_DEBUG_SPINLOCK: the arch lock is an empty struct (spinlock_types_up.h) */
typedef struct { } arch_spinlock_t;

typedef struct raw_spinlock {
	arch_spinlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCK
	unsigned int magic, owner_cpu;
	void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

The definition of arch_spinlock_t differs from one architecture to another.
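
For reference, SMP x86 uses the generic queued spinlock, so arch_spinlock_t is struct qspinlock. A sketch of its little-endian layout as declared in include/asm-generic/qspinlock_types.h (comments added here; the big-endian variant swaps the byte order) — this is the lock word that the queued_spin_lock() code later in this article manipulates:

typedef struct qspinlock {
	union {
		atomic_t val;              /* the whole 32-bit lock word */
		struct {
			u8 locked;         /* 0 or _Q_LOCKED_VAL */
			u8 pending;        /* pending bit lives in the 2nd byte */
		};
		struct {
			u16 locked_pending;
			u16 tail;          /* encodes (cpu, idx) of the queue tail */
		};
	};
} arch_spinlock_t;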

Initialization

Non-RT

#define __ARCH_SPIN_LOCK_UNLOCKED { }

#define ___SPIN_LOCK_INITIALIZER(lockname)	\
	{					\
	.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,	\
	SPIN_DEBUG_INIT(lockname)		\
	SPIN_DEP_MAP_INIT(lockname) }

#define __SPIN_LOCK_INITIALIZER(lockname) \
	{ { .rlock = ___SPIN_LOCK_INITIALIZER(lockname) } }

#define __SPIN_LOCK_UNLOCKED(lockname) \
	(spinlock_t) __SPIN_LOCK_INITIALIZER(lockname)

# define spin_lock_init(_lock)			\
do {						\
	spinlock_check(_lock);			\
	*(_lock) = __SPIN_LOCK_UNLOCKED(_lock);	\
} while (0)
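
For completeness, a statically allocated lock can also be defined and initialized at compile time with DEFINE_SPINLOCK(), which expands to __SPIN_LOCK_UNLOCKED(); spin_lock_init() is the runtime counterpart for locks embedded in dynamically allocated objects. A small sketch (struct my_device and the names are hypothetical):

#include <linux/spinlock.h>

/* static initialization at compile time */
static DEFINE_SPINLOCK(my_static_lock);

/* dynamic initialization for a lock embedded in a runtime object */
struct my_device {
	spinlock_t lock;
	int state;
};

static void my_device_setup(struct my_device *dev)
{
	spin_lock_init(&dev->lock);
	dev->state = 0;
}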

RT

#define __ARCH_SPIN_LOCK_UNLOCKED { }

#define __RAW_SPIN_LOCK_INITIALIZER(lockname)	\
	{					\
	.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,	\
	SPIN_DEBUG_INIT(lockname)		\
	RAW_SPIN_DEP_MAP_INIT(lockname) }

#define __RAW_SPIN_LOCK_UNLOCKED(lockname)	\
	(raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)

# define raw_spin_lock_init(lock)					\
	do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)

#define RB_ROOT_CACHED (struct rb_root_cached) { {NULL, }, NULL }

static inline void __rt_mutex_base_init(struct rt_mutex_base *lock)
{
	// initialize the underlying raw spinlock
	raw_spin_lock_init(&lock->wait_lock);
	// initialize the rb-tree of waiters
	lock->waiters = RB_ROOT_CACHED;
	// the task currently owning the lock (none yet)
	lock->owner = NULL;
}

void rt_mutex_base_init(struct rt_mutex_base *rtb)
{
	__rt_mutex_base_init(rtb);
}
EXPORT_SYMBOL(rt_mutex_base_init);

/* empty when CONFIG_DEBUG_LOCK_ALLOC (lockdep) is disabled */
static inline void __rt_spin_lock_init(spinlock_t *lock, const char *name,
				       struct lock_class_key *key, bool percpu)
{
}

#define spin_lock_init(slock)					\
do {								\
	static struct lock_class_key __key;			\
								\
	rt_mutex_base_init(&(slock)->lock);			\
	__rt_spin_lock_init(slock, #slock, &__key, false);	\
} while (0)

spin_lock() implementation

Non-RT

#define raw_spin_lock(lock)	_raw_spin_lock(lock)

static __always_inline void spin_lock(spinlock_t *lock)
{
	raw_spin_lock(&lock->rlock);
}

UP (single-core) implementation

The uniprocessor implementation is simple: taking the lock only disables preemption (and with kernel preemption disabled, preempt_disable() is merely a compiler barrier, as the code below shows).

# define __acquire(x)	(void)0

#define ___LOCK(lock) \
  do { __acquire(lock); (void)(lock); } while (0)

#define preempt_disable()	barrier()

#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)

#define _raw_spin_lock(lock)	__LOCK(lock)

SMP (multi-core) implementation


#define lock_acquire_exclusive(l, s, t, n, i)	lock_acquire(l, s, t, 0, 1, n, i)

#define spin_acquire(l, s, t, i)	lock_acquire_exclusive(l, s, t, NULL, i)

/* without CONFIG_LOCK_STAT this simply calls lock(_lock) */
#define LOCK_CONTENDED(_lock, try, lock) \
	lock(_lock)

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
	// disable preemption
	preempt_disable();
	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	// expands to do_raw_spin_lock(lock)
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

#define _raw_spin_lock(lock)	__raw_spin_lock(lock)

do_raw_spin_lock implementation:

/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
	int val = 0;

	// Atomic compare-and-exchange with acquire semantics: if the lock
	// value is 0, set the locked field to 1.
	if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
		// The old value was 0, i.e. the lock was free, so we now own it.
		return;

	// The old value was not 0: the lock is not free, so fall back to the
	// slow path.
	queued_spin_lock_slowpath(lock, val);
}
// x86 uses the asm-generic implementation
#define arch_spin_lock(l)	queued_spin_lock(l)

static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
	__acquire(lock);
	arch_spin_lock(&lock->raw_lock);
	mmiowb_spin_lock();
}
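
For reference, the queued_spin_trylock() helper that the slow path falls back on is roughly the following (a sketch of the asm-generic version):

static __always_inline int queued_spin_trylock(struct qspinlock *lock)
{
	int val = atomic_read(&lock->val);

	/* give up immediately if the lock word is not completely free */
	if (unlikely(val))
		return 0;

	/* 0 -> _Q_LOCKED_VAL with acquire semantics, like the fast path */
	return likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL));
}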

queued_spin_lock_slowpath, the spinlock slow path (kernel/locking/qspinlock.c):

/**
 * queued_spin_lock_slowpath - acquire the queued spinlock
 * @lock: Pointer to queued spinlock structure
 * @val: Current value of the queued spinlock 32-bit word
 *
 * (queue tail, pending bit, lock value)
 *
 *              fast     :    slow                                  :    unlock
 *                       :                                          :
 * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
 *                       :       | ^--------.------.             /  :
 *                       :       v           \      \            |  :
 * pending               :    (0,1,1) +--> (0,1,0)   \           |  :
 *                       :       | ^--'              |           |  :
 *                       :       v                   |           |  :
 * uncontended           :    (n,x,y) +--> (n,0,0) --'            |  :
 *   queue               :       | ^--'                           |  :
 *                       :       v                                |  :
 * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
 *   queue               :         ^--'                             :
 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	struct mcs_spinlock *prev, *next, *node;
	u32 old, tail;
	int idx;

	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

	if (pv_enabled())
		goto pv_queue;

	if (virt_spin_lock(lock))
		return;

	/*
	 * Wait for in-progress pending->locked hand-overs with a bounded
	 * number of spins so that we guarantee forward progress.
	 *
	 * 0,1,0 -> 0,0,1
	 */
	// If the lock is in the pending state, i.e. {tail=0, pending=1, locked=0},
	// wait until it becomes locked, i.e. {tail=0, pending=0, locked=1}.
	if (val == _Q_PENDING_VAL) {
		int cnt = _Q_PENDING_LOOPS;
		val = atomic_cond_read_relaxed(&lock->val,
					       (VAL != _Q_PENDING_VAL) || !cnt--);
	}

	/*
	 * If we observe any contention; queue.
	 */
	// If the tail field is non-zero or the pending bit is set, other CPUs
	// are already waiting for this lock: jump to the queue label and join
	// the wait queue.
	if (val & ~_Q_LOCKED_MASK)
		goto queue;

	/*
	 * trylock || pending
	 *
	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
	 */
	val = queued_fetch_set_pending_acquire(lock);

	/*
	 * If we observe contention, there is a concurrent locker.
	 *
	 * Undo and queue; our setting of PENDING might have made the
	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
	 * on @next to become !NULL.
	 */
	if (unlikely(val & ~_Q_LOCKED_MASK)) {

		/* Undo PENDING if we set it. */
		if (!(val & _Q_PENDING_MASK))
			clear_pending(lock);

		goto queue;
	}

	/*
	 * We're pending, wait for the owner to go away.
	 *
	 * 0,1,1 -> 0,1,0
	 *
	 * this wait loop must be a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because not all
	 * clear_pending_set_locked() implementations imply full
	 * barriers.
	 */
	if (val & _Q_LOCKED_MASK)
		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

	/*
	 * take ownership and clear the pending bit.
	 *
	 * 0,1,0 -> 0,0,1
	 */
	clear_pending_set_locked(lock);
	lockevent_inc(lock_pending);
	return;

	/*
	 * End of pending bit optimistic spinning and beginning of MCS
	 * queuing.
	 */
queue:
	lockevent_inc(lock_slowpath);
pv_queue:
	node = this_cpu_ptr(&qnodes[0].mcs);
	idx = node->count++;
	tail = encode_tail(smp_processor_id(), idx);

	/*
	 * 4 nodes are allocated based on the assumption that there will
	 * not be nested NMIs taking spinlocks. That may not be true in
	 * some architectures even though the chance of needing more than
	 * 4 nodes will still be extremely unlikely. When that happens,
	 * we fall back to spinning on the lock directly without using
	 * any MCS node. This is not the most elegant solution, but is
	 * simple enough.
	 */
	if (unlikely(idx >= MAX_NODES)) {
		lockevent_inc(lock_no_node);
		while (!queued_spin_trylock(lock))
			cpu_relax();
		goto release;
	}

	node = grab_mcs_node(node, idx);

	/*
	 * Keep counts of non-zero index values:
	 */
	lockevent_cond_inc(lock_use_node2 + idx - 1, idx);

	/*
	 * Ensure that we increment the head node->count before initialising
	 * the actual node. If the compiler is kind enough to reorder these
	 * stores, then an IRQ could overwrite our assignments.
	 */
	barrier();

	node->locked = 0;
	node->next = NULL;
	pv_init_node(node);

	/*
	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
	 * attempt the trylock once more in the hope someone let go while we
	 * weren't watching.
	 */
	if (queued_spin_trylock(lock))
		goto release;

	/*
	 * Ensure that the initialisation of @node is complete before we
	 * publish the updated tail via xchg_tail() and potentially link
	 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
	 */
	smp_wmb();

	/*
	 * Publish the updated tail.
	 * We have already touched the queueing cacheline; don't bother with
	 * pending stuff.
	 *
	 * p,*,* -> n,*,*
	 */
	old = xchg_tail(lock, tail);
	next = NULL;

	/*
	 * if there was a previous node; link it and wait until reaching the
	 * head of the waitqueue.
	 */
	if (old & _Q_TAIL_MASK) {
		prev = decode_tail(old);

		/* Link @node into the waitqueue. */
		WRITE_ONCE(prev->next, node);

		pv_wait_node(node, prev);
		arch_mcs_spin_lock_contended(&node->locked);

		/*
		 * While waiting for the MCS lock, the next pointer may have
		 * been set by another lock waiter. We optimistically load
		 * the next pointer & prefetch the cacheline for writing
		 * to reduce latency in the upcoming MCS unlock operation.
		 */
		next = READ_ONCE(node->next);
		if (next)
			prefetchw(next);
	}

	/*
	 * we're at the head of the waitqueue, wait for the owner & pending to
	 * go away.
	 *
	 * *,x,y -> *,0,0
	 *
	 * this wait loop must use a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because the set_locked() function below
	 * does not imply a full barrier.
	 *
	 * The PV pv_wait_head_or_lock function, if active, will acquire
	 * the lock and return a non-zero value. So we have to skip the
	 * atomic_cond_read_acquire() call. As the next PV queue head hasn't
	 * been designated yet, there is no way for the locked value to become
	 * _Q_SLOW_VAL. So both the set_locked() and the
	 * atomic_cmpxchg_relaxed() calls will be safe.
	 *
	 * If PV isn't active, 0 will be returned instead.
	 *
	 */
	if ((val = pv_wait_head_or_lock(lock, node)))
		goto locked;

	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

locked:
	/*
	 * claim the lock:
	 *
	 * n,0,0 -> 0,0,1 : lock, uncontended
	 * *,*,0 -> *,*,1 : lock, contended
	 *
	 * If the queue head is the only one in the queue (lock value == tail)
	 * and nobody is pending, clear the tail code and grab the lock.
	 * Otherwise, we only need to grab the lock.
	 */

	/*
	 * In the PV case we might already have _Q_LOCKED_VAL set, because
	 * of lock stealing; therefore we must also allow:
	 *
	 * n,0,1 -> 0,0,1
	 *
	 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
	 * above wait condition, therefore any concurrent setting of
	 * PENDING will make the uncontended transition fail.
	 */
	if ((val & _Q_TAIL_MASK) == tail) {
		if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
			goto release; /* No contention */
	}

	/*
	 * Either somebody is queued behind us or _Q_PENDING_VAL got set
	 * which will then detect the remaining tail and queue behind us
	 * ensuring we'll see a @next.
	 */
	set_locked(lock);

	/*
	 * contended path; wait for next if not observed yet, release.
	 */
	if (!next)
		next = smp_cond_load_relaxed(&node->next, (VAL));

	arch_mcs_spin_unlock_contended(&next->locked);
	pv_kick_node(lock, next);

release:
	/*
	 * release the node
	 */
	__this_cpu_dec(qnodes[0].mcs.count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);
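
The central idea of the MCS queue used by the slow path above is that every waiting CPU spins on a flag inside its own per-CPU node rather than on the shared lock word, so waiters do not keep bouncing the lock's cache line between CPUs. A minimal user-space sketch of an MCS lock written with C11 atomics (illustration only, not kernel code):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct mcs_node {
	_Atomic(struct mcs_node *) next;
	atomic_bool locked;              /* true while this waiter must keep spinning */
};

struct mcs_lock {
	_Atomic(struct mcs_node *) tail; /* last node in the queue, NULL when free */
};

void mcs_acquire(struct mcs_lock *lock, struct mcs_node *self)
{
	struct mcs_node *prev;

	atomic_store_explicit(&self->next, NULL, memory_order_relaxed);
	atomic_store_explicit(&self->locked, true, memory_order_relaxed);

	/* append ourselves to the queue, analogous to xchg_tail() above */
	prev = atomic_exchange_explicit(&lock->tail, self, memory_order_acq_rel);
	if (prev) {
		/* link behind the previous waiter and spin on our OWN flag */
		atomic_store_explicit(&prev->next, self, memory_order_release);
		while (atomic_load_explicit(&self->locked, memory_order_acquire))
			;   /* a cpu_relax()-style pause would go here */
	}
	/* prev == NULL: the queue was empty, we own the lock immediately */
}

void mcs_release(struct mcs_lock *lock, struct mcs_node *self)
{
	struct mcs_node *next =
		atomic_load_explicit(&self->next, memory_order_acquire);

	if (!next) {
		/* nobody visible behind us: try to reset tail to NULL */
		struct mcs_node *expected = self;

		if (atomic_compare_exchange_strong_explicit(&lock->tail, &expected,
							    NULL,
							    memory_order_acq_rel,
							    memory_order_acquire))
			return;

		/* a new waiter won the race; wait until it links itself in */
		while (!(next = atomic_load_explicit(&self->next,
						     memory_order_acquire)))
			;
	}
	/* hand the lock over by clearing the next waiter's flag */
	atomic_store_explicit(&next->locked, false, memory_order_release);
}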

RT

static __always_inline void __rt_spin_lock(spinlock_t *lock)
{
	rtlock_might_resched();
	rtlock_lock(&lock->lock);
	rcu_read_lock();
	migrate_disable();
}

void __sched rt_spin_lock(spinlock_t *lock)
{
	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	__rt_spin_lock(lock);
}
EXPORT_SYMBOL(rt_spin_lock);

static __always_inline void spin_lock(spinlock_t *lock)
{
	rt_spin_lock(lock);
}

RW (read/write) spinlock

  1. If there is no thread in the critical section, any read thread or write thread may enter.
  2. If a read thread is in the critical section, further read threads may enter freely, but write threads may not.
  3. If a write thread is in the critical section, neither read threads nor write threads may enter.
  4. If one or more read threads are in the critical section, a write thread cannot enter, and it also cannot stop later read threads from entering; it must wait until every read thread inside the critical section has finished before it can get in.

In other words, the rw spinlock gives readers higher priority than writers.
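
A minimal rwlock usage sketch (the two counters, the lock and the function names are hypothetical):

#include <linux/spinlock.h>

static DEFINE_RWLOCK(stats_lock);   /* hypothetical lock protecting the two counters below */
static unsigned long rx_packets, tx_packets;

/* readers: any number may hold the lock at the same time */
unsigned long stats_total(void)
{
	unsigned long total;

	read_lock(&stats_lock);
	total = rx_packets + tx_packets;
	read_unlock(&stats_lock);

	return total;
}

/* writer: excludes readers and other writers */
void stats_account_rx(void)
{
	write_lock(&stats_lock);
	rx_packets++;
	write_unlock(&stats_lock);
}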

seqlock (sequence lock)

  1. If there is no thread in the critical section, any read thread or write thread may enter.
  2. If a read thread is in the critical section, other read threads may enter freely.
  3. If a write thread is in the critical section, neither read threads nor write threads may enter.
  4. If only read threads are in the critical section, a write thread may start immediately; it never waits for readers.

In other words, the seqlock gives writers higher priority than readers.
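
A minimal seqlock usage sketch (time_lock, jiffies_copy and the two functions are hypothetical names): the writer just takes the lock, while the reader loops until it has read a snapshot that no write raced with:

#include <linux/seqlock.h>
#include <linux/types.h>

static DEFINE_SEQLOCK(time_lock);   /* hypothetical seqlock protecting jiffies_copy */
static u64 jiffies_copy;

/* writer: excludes other writers; readers retry if they raced with us */
void time_update(u64 now)
{
	write_seqlock(&time_lock);
	jiffies_copy = now;
	write_sequnlock(&time_lock);
}

/* reader: loop until a consistent snapshot has been read */
u64 time_read(void)
{
	unsigned int seq;
	u64 val;

	do {
		seq = read_seqbegin(&time_lock);
		val = jiffies_copy;
	} while (read_seqretry(&time_lock, seq));

	return val;
}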

Shortcomings of spinlock

rw spinlock, seqlock and spinlock are all built around a single shared variable in memory, accessed with atomic operations.

Whenever the cache line holding the lock is invalidated in a CPU's L1 cache, the CPU has to fetch the lock value again from the L2/L3 cache or from main memory; under contention this cache-line bouncing wastes a lot of performance.

RCU

RCU (Read-Copy-Update) overcomes the drawbacks of the locks above and scales very well, but its applicable range is narrow: it only suits read-mostly, rarely-written data, e.g. network routing table lookup and update, device state tables, deferred freeing of data structures, and maintenance of multipath I/O devices.
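
A minimal RCU usage sketch (struct config, cur_config and the helper names are hypothetical, not from any real driver): readers run lock-free inside rcu_read_lock()/rcu_read_unlock(), while the writer publishes a new copy with rcu_assign_pointer() and frees the old one only after a grace period:

#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct config {
	int threshold;
};

static struct config __rcu *cur_config;
static DEFINE_SPINLOCK(config_lock);   /* serializes writers only */

/* reader: no shared lock word is written, so readers scale perfectly */
int config_get_threshold(void)
{
	struct config *c;
	int val = 0;

	rcu_read_lock();
	c = rcu_dereference(cur_config);
	if (c)
		val = c->threshold;
	rcu_read_unlock();

	return val;
}

/* writer: copy, update, publish, then reclaim after a grace period */
int config_set_threshold(int threshold)
{
	struct config *newc, *oldc;

	newc = kmalloc(sizeof(*newc), GFP_KERNEL);
	if (!newc)
		return -ENOMEM;
	newc->threshold = threshold;

	spin_lock(&config_lock);
	oldc = rcu_dereference_protected(cur_config,
					 lockdep_is_held(&config_lock));
	rcu_assign_pointer(cur_config, newc);
	spin_unlock(&config_lock);

	synchronize_rcu();     /* wait for pre-existing readers to finish */
	kfree(oldc);
	return 0;
}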