/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
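
/* A minimal caller-side sketch of the two stage-1 entry points described
   above, compiled out with #if 0 since it is illustrative only and not
   part of this file.  The example_ast/example_bast callbacks and the
   resource name "example" are hypothetical placeholders; error handling
   and waiting for the completion ast are elided. */
#if 0
static void example_ast(void *astarg)
{
	/* completion ast: lksb->sb_status now holds the result */
}

static void example_bast(void *astarg, int mode)
{
	/* blocking ast: another node wants this lock in "mode" */
}

static int example_usage(dlm_lockspace_t *lockspace)
{
	struct dlm_lksb lksb;
	int error;

	/* stage 1: request an exclusive lock on resource "example" */
	error = dlm_lock(lockspace, DLM_LOCK_EX, &lksb, 0, "example", 7, 0,
			 example_ast, &lksb, example_bast);
	if (error)
		return error;

	/* ... wait for example_ast to fire, use the lock ... */

	/* stage 1 again, this time the unlock path */
	return dlm_unlock(lockspace, lksb.sb_lkid, 0, &lksb, &lksb);
}
#endif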
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);

#define FAKE_USER_AST (void*)0xff00ff00

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
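
/* Worked example of the table above (a reading aid, not original text):
   converting from granted PW (row PW) down to NL (column NL) hits entry 0,
   so the caller's LVB is written back to the resource; a brand new request
   granted in PR (row UN, column PR) hits entry 1, so the resource's LVB is
   copied out to the caller; entries of -1 (e.g. any column UN) leave the
   LVB untouched.  set_lvb_lock() below indexes this table exactly as
   dlm_lvb_operations[grmode + 1][rqmode + 1]. */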

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
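
/* A quick sanity sketch of the matrix lookup, assuming nothing beyond the
   definitions above: two protected-read locks can coexist, protected-read
   and exclusive cannot.

	BUG_ON(!dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR));
	BUG_ON(dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX));
*/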

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_print_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup),);
	DLM_ASSERT(list_empty(&r->res_grantqueue),);
	DLM_ASSERT(list_empty(&r->res_convertqueue),);
	DLM_ASSERT(list_empty(&r->res_waitqueue),);
	DLM_ASSERT(list_empty(&r->res_root_list),);
	DLM_ASSERT(list_empty(&r->res_recover_list),);
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
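
/* Illustration of the lkid layout built above (a reading aid, not code
   from the original file): the low 16 bits carry the randomly chosen
   table bucket and the high 16 bits carry that bucket's counter, which
   is why __find_lkb() below can recover the bucket with "lkid & 0xFFFF".
   E.g. bucket 0x0007 with counter 0x0003 yields:

	lkid = 0x0007 | (0x0003 << 16) = 0x00030007
*/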

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = lkid & 0xFFFF;
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = lkid & 0xFFFF;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = lkb->lkb_id & 0xFFFF;

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_waiters_mutex);
	if (lkb->lkb_wait_type) {
		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
		goto out;
	}
	lkb->lkb_wait_type = mstype;
	kref_get(&lkb->lkb_ref);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	mutex_unlock(&ls->ls_waiters_mutex);
}

static int _remove_from_waiters(struct dlm_lkb *lkb)
{
	int error = 0;

	if (!lkb->lkb_wait_type) {
		log_print("remove_from_waiters error");
		error = -EINVAL;
		goto out;
	}
	lkb->lkb_wait_type = 0;
	list_del(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
 out:
	return error;
}

static int remove_from_waiters(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
}

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to this
 * function).
 *
 * After the resolution, the "grant pending" function needs to go back and try
 * to grant locks on the convert queue again since the first lock can now be
 * granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
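
/* A concrete reading of the ALTPR path above (illustrative only): a
   request for DLM_LOCK_PW with DLM_LKF_ALTPR set that collides with an
   existing PR holder fails _can_be_granted() at PW, is retried with
   rqmode rewritten to DLM_LOCK_PR, succeeds (PR is compatible with PR in
   __dlm_compat_matrix), and is granted with DLM_SBF_ALTMODE set in the
   status block so the caller can see it holds PR rather than PW. */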

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_print_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		goto out;

	if (args->flags & DLM_LKF_FORCEUNLOCK)
		goto out_ok;

	if (args->flags & DLM_LKF_CANCEL &&
	    lkb->lkb_status == DLM_LKSTS_GRANTED)
		goto out;

	if (!(args->flags & DLM_LKF_CANCEL) &&
	    lkb->lkb_status != DLM_LKSTS_GRANTED)
		goto out;

	rv = -EBUSY;
	if (lkb->lkb_wait_type)
		goto out;

 out_ok:
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;

	rv = 0;
 out:
	return rv;
}

/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	if (can_be_queued(lkb)) {
		if (is_demoted(lkb))
			grant_pending_locks(r);
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}

/* FIXME: if revert_lock() finds that the lkb is granted, we should
   skip the queue_cast(ECANCEL).  It indicates that the request/convert
   completed (and queued a normal ast) just before the cancel; we don't
   want to clobber the sb_result for the normal ast with ECANCEL. */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
	queue_cast(r, lkb, -DLM_ECANCEL);
	grant_pending_locks(r);
	return -DLM_ECANCEL;
}

/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	else
		error = do_request(r, lkb);
 out:
	return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	else
		error = do_convert(r, lkb);

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	else
		error = do_unlock(r, lkb);

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	else
		error = do_cancel(r, lkb);

	return error;
}

/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

/*
 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2019
2020/*
2021 * send/receive routines for remote operations and replies
2022 *
2023 * send_args
2024 * send_common
2025 * send_request receive_request
2026 * send_convert receive_convert
2027 * send_unlock receive_unlock
2028 * send_cancel receive_cancel
2029 * send_grant receive_grant
2030 * send_bast receive_bast
2031 * send_lookup receive_lookup
2032 * send_remove receive_remove
2033 *
2034 * send_common_reply
2035 * receive_request_reply send_request_reply
2036 * receive_convert_reply send_convert_reply
2037 * receive_unlock_reply send_unlock_reply
2038 * receive_cancel_reply send_cancel_reply
2039 * receive_lookup_reply send_lookup_reply
2040 */
2041
2042static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2043 int to_nodeid, int mstype,
2044 struct dlm_message **ms_ret,
2045 struct dlm_mhandle **mh_ret)
2046{
2047 struct dlm_message *ms;
2048 struct dlm_mhandle *mh;
2049 char *mb;
2050 int mb_len = sizeof(struct dlm_message);
2051
2052 switch (mstype) {
2053 case DLM_MSG_REQUEST:
2054 case DLM_MSG_LOOKUP:
2055 case DLM_MSG_REMOVE:
2056 mb_len += r->res_length;
2057 break;
2058 case DLM_MSG_CONVERT:
2059 case DLM_MSG_UNLOCK:
2060 case DLM_MSG_REQUEST_REPLY:
2061 case DLM_MSG_CONVERT_REPLY:
2062 case DLM_MSG_GRANT:
2063 if (lkb && lkb->lkb_lvbptr)
2064 mb_len += r->res_ls->ls_lvblen;
2065 break;
2066 }
2067
2068 /* get_buffer gives us a message handle (mh) that we need to
2069 pass into lowcomms_commit and a message buffer (mb) that we
2070 write our data into */
2071
2072 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2073 if (!mh)
2074 return -ENOBUFS;
2075
2076 memset(mb, 0, mb_len);
2077
2078 ms = (struct dlm_message *) mb;
2079
2080 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2081 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2082 ms->m_header.h_nodeid = dlm_our_nodeid();
2083 ms->m_header.h_length = mb_len;
2084 ms->m_header.h_cmd = DLM_MSG;
2085
2086 ms->m_type = mstype;
2087
2088 *mh_ret = mh;
2089 *ms_ret = ms;
2090 return 0;
2091}
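/* Sizing example for the switch above: a DLM_MSG_REQUEST for a resource
   with an 8-byte name is allocated as sizeof(struct dlm_message) + 8 and
   send_args() later copies the name into the m_extra flex area, while the
   LVB-bearing types instead reserve ls_lvblen bytes for the lock value
   block. */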
2092
2093/* further lowcomms enhancements or alternate implementations may make
2094 the return value from this function useful at some point */
2095
2096static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2097{
2098 dlm_message_out(ms);
2099 dlm_lowcomms_commit_buffer(mh);
2100 return 0;
2101}
2102
2103static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2104 struct dlm_message *ms)
2105{
2106 ms->m_nodeid = lkb->lkb_nodeid;
2107 ms->m_pid = lkb->lkb_ownpid;
2108 ms->m_lkid = lkb->lkb_id;
2109 ms->m_remid = lkb->lkb_remid;
2110 ms->m_exflags = lkb->lkb_exflags;
2111 ms->m_sbflags = lkb->lkb_sbflags;
2112 ms->m_flags = lkb->lkb_flags;
2113 ms->m_lvbseq = lkb->lkb_lvbseq;
2114 ms->m_status = lkb->lkb_status;
2115 ms->m_grmode = lkb->lkb_grmode;
2116 ms->m_rqmode = lkb->lkb_rqmode;
2117 ms->m_hash = r->res_hash;
2118
2119 /* m_result and m_bastmode are set from function args,
2120 not from lkb fields */
2121
2122 if (lkb->lkb_bastaddr)
2123 ms->m_asts |= AST_BAST;
2124 if (lkb->lkb_astaddr)
2125 ms->m_asts |= AST_COMP;
2126
2127 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2128 memcpy(ms->m_extra, r->res_name, r->res_length);
2129
2130 else if (lkb->lkb_lvbptr)
2131 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2132
2133}
2134
2135static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2136{
2137 struct dlm_message *ms;
2138 struct dlm_mhandle *mh;
2139 int to_nodeid, error;
2140
2141 add_to_waiters(lkb, mstype);
2142
2143 to_nodeid = r->res_nodeid;
2144
2145 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2146 if (error)
2147 goto fail;
2148
2149 send_args(r, lkb, ms);
2150
2151 error = send_message(mh, ms);
2152 if (error)
2153 goto fail;
2154 return 0;
2155
2156 fail:
2157 remove_from_waiters(lkb);
2158 return error;
2159}
2160
2161static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2162{
2163 return send_common(r, lkb, DLM_MSG_REQUEST);
2164}
2165
2166static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2167{
2168 int error;
2169
2170 error = send_common(r, lkb, DLM_MSG_CONVERT);
2171
2172 /* down conversions go without a reply from the master */
2173 if (!error && down_conversion(lkb)) {
2174 remove_from_waiters(lkb);
2175 r->res_ls->ls_stub_ms.m_result = 0;
2176 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2177 }
2178
2179 return error;
2180}
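/* Note on the special case above: a down-conversion can never block or
   fail on the master, so instead of waiting for its reply we feed a
   zero-result stub message straight into __receive_convert_reply() and
   grant the lower mode locally. */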
2181
2182/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2183 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2184 that the master is still correct. */
2185
2186static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2187{
2188 return send_common(r, lkb, DLM_MSG_UNLOCK);
2189}
2190
2191static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2192{
2193 return send_common(r, lkb, DLM_MSG_CANCEL);
2194}
2195
2196static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197{
2198 struct dlm_message *ms;
2199 struct dlm_mhandle *mh;
2200 int to_nodeid, error;
2201
2202 to_nodeid = lkb->lkb_nodeid;
2203
2204 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2205 if (error)
2206 goto out;
2207
2208 send_args(r, lkb, ms);
2209
2210 ms->m_result = 0;
2211
2212 error = send_message(mh, ms);
2213 out:
2214 return error;
2215}
2216
2217static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2218{
2219 struct dlm_message *ms;
2220 struct dlm_mhandle *mh;
2221 int to_nodeid, error;
2222
2223 to_nodeid = lkb->lkb_nodeid;
2224
2225 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2226 if (error)
2227 goto out;
2228
2229 send_args(r, lkb, ms);
2230
2231 ms->m_bastmode = mode;
2232
2233 error = send_message(mh, ms);
2234 out:
2235 return error;
2236}
2237
2238static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2239{
2240 struct dlm_message *ms;
2241 struct dlm_mhandle *mh;
2242 int to_nodeid, error;
2243
2244 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2245
2246 to_nodeid = dlm_dir_nodeid(r);
2247
2248 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2249 if (error)
2250 goto fail;
2251
2252 send_args(r, lkb, ms);
2253
2254 error = send_message(mh, ms);
2255 if (error)
2256 goto fail;
2257 return 0;
2258
2259 fail:
2260 remove_from_waiters(lkb);
2261 return error;
2262}
2263
2264static int send_remove(struct dlm_rsb *r)
2265{
2266 struct dlm_message *ms;
2267 struct dlm_mhandle *mh;
2268 int to_nodeid, error;
2269
2270 to_nodeid = dlm_dir_nodeid(r);
2271
2272 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2273 if (error)
2274 goto out;
2275
2276 memcpy(ms->m_extra, r->res_name, r->res_length);
2277 ms->m_hash = r->res_hash;
2278
2279 error = send_message(mh, ms);
2280 out:
2281 return error;
2282}
2283
2284static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2285 int mstype, int rv)
2286{
2287 struct dlm_message *ms;
2288 struct dlm_mhandle *mh;
2289 int to_nodeid, error;
2290
2291 to_nodeid = lkb->lkb_nodeid;
2292
2293 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2294 if (error)
2295 goto out;
2296
2297 send_args(r, lkb, ms);
2298
2299 ms->m_result = rv;
2300
2301 error = send_message(mh, ms);
2302 out:
2303 return error;
2304}
2305
2306static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2307{
2308 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2309}
2310
2311static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2312{
2313 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2314}
2315
2316static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2317{
2318 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2319}
2320
2321static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2322{
2323 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2324}
2325
2326static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2327 int ret_nodeid, int rv)
2328{
2329 struct dlm_rsb *r = &ls->ls_stub_rsb;
2330 struct dlm_message *ms;
2331 struct dlm_mhandle *mh;
2332 int error, nodeid = ms_in->m_header.h_nodeid;
2333
2334 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2335 if (error)
2336 goto out;
2337
2338 ms->m_lkid = ms_in->m_lkid;
2339 ms->m_result = rv;
2340 ms->m_nodeid = ret_nodeid;
2341
2342 error = send_message(mh, ms);
2343 out:
2344 return error;
2345}
2346
2347/* which args we save from a received message depends heavily on the type
2348 of message, unlike the send side where we can safely send everything about
2349 the lkb for any type of message */
2350
2351static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2352{
2353 lkb->lkb_exflags = ms->m_exflags;
2354 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2355 (ms->m_flags & 0x0000FFFF);
2356}
2357
2358static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2359{
2360 lkb->lkb_sbflags = ms->m_sbflags;
2361 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2362 (ms->m_flags & 0x0000FFFF);
2363}
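/* The masks above reflect the flag layout: only the low 16 bits of
   lkb_flags are wire flags carried in messages, while the upper 16 bits
   (internal flags such as DLM_IFL_MSTCPY) are node-local state that must
   survive a received message. */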
2364
2365static int receive_extralen(struct dlm_message *ms)
2366{
2367 return (ms->m_header.h_length - sizeof(struct dlm_message));
2368}
2369
2370static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2371 struct dlm_message *ms)
2372{
2373 int len;
2374
2375 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2376 if (!lkb->lkb_lvbptr)
2377 lkb->lkb_lvbptr = allocate_lvb(ls);
2378 if (!lkb->lkb_lvbptr)
2379 return -ENOMEM;
2380 len = receive_extralen(ms);
2381 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2382 }
2383 return 0;
2384}
2385
2386static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2387 struct dlm_message *ms)
2388{
2389 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2390 lkb->lkb_ownpid = ms->m_pid;
2391 lkb->lkb_remid = ms->m_lkid;
2392 lkb->lkb_grmode = DLM_LOCK_IV;
2393 lkb->lkb_rqmode = ms->m_rqmode;
2394 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2395 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2396
2397 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2398
2399 if (receive_lvb(ls, lkb, ms))
2400 return -ENOMEM;
2401
2402 return 0;
2403}
2404
2405static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2406 struct dlm_message *ms)
2407{
2408 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2409 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2410 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2411 lkb->lkb_id, lkb->lkb_remid);
2412 return -EINVAL;
2413 }
2414
2415 if (!is_master_copy(lkb))
2416 return -EINVAL;
2417
2418 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2419 return -EBUSY;
2420
2421 if (receive_lvb(ls, lkb, ms))
2422 return -ENOMEM;
2423
2424 lkb->lkb_rqmode = ms->m_rqmode;
2425 lkb->lkb_lvbseq = ms->m_lvbseq;
2426
2427 return 0;
2428}
2429
2430static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2431 struct dlm_message *ms)
2432{
2433 if (!is_master_copy(lkb))
2434 return -EINVAL;
2435 if (receive_lvb(ls, lkb, ms))
2436 return -ENOMEM;
2437 return 0;
2438}
2439
2440/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2441 uses to send a reply and that the remote end uses to process the reply. */
2442
2443static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2444{
2445 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2446 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2447 lkb->lkb_remid = ms->m_lkid;
2448}
2449
2450static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2451{
2452 struct dlm_lkb *lkb;
2453 struct dlm_rsb *r;
2454 int error, namelen;
2455
2456 error = create_lkb(ls, &lkb);
2457 if (error)
2458 goto fail;
2459
2460 receive_flags(lkb, ms);
2461 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2462 error = receive_request_args(ls, lkb, ms);
2463 if (error) {
 2464		__put_lkb(ls, lkb);
2465 goto fail;
2466 }
2467
2468 namelen = receive_extralen(ms);
2469
2470 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2471 if (error) {
 2472		__put_lkb(ls, lkb);
2473 goto fail;
2474 }
2475
2476 lock_rsb(r);
2477
2478 attach_lkb(r, lkb);
2479 error = do_request(r, lkb);
2480 send_request_reply(r, lkb, error);
2481
2482 unlock_rsb(r);
2483 put_rsb(r);
2484
2485 if (error == -EINPROGRESS)
2486 error = 0;
2487 if (error)
 2488		dlm_put_lkb(lkb);
2489 return;
2490
2491 fail:
2492 setup_stub_lkb(ls, ms);
2493 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2494}
2495
2496static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2497{
2498 struct dlm_lkb *lkb;
2499 struct dlm_rsb *r;
 2500	int error, reply = 1;
2501
2502 error = find_lkb(ls, ms->m_remid, &lkb);
2503 if (error)
2504 goto fail;
2505
2506 r = lkb->lkb_resource;
2507
2508 hold_rsb(r);
2509 lock_rsb(r);
2510
2511 receive_flags(lkb, ms);
2512 error = receive_convert_args(ls, lkb, ms);
2513 if (error)
2514 goto out;
2515 reply = !down_conversion(lkb);
2516
2517 error = do_convert(r, lkb);
2518 out:
2519 if (reply)
2520 send_convert_reply(r, lkb, error);
2521
2522 unlock_rsb(r);
2523 put_rsb(r);
 2524	dlm_put_lkb(lkb);
2525 return;
2526
2527 fail:
2528 setup_stub_lkb(ls, ms);
2529 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2530}
2531
2532static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2533{
2534 struct dlm_lkb *lkb;
2535 struct dlm_rsb *r;
2536 int error;
2537
2538 error = find_lkb(ls, ms->m_remid, &lkb);
2539 if (error)
2540 goto fail;
2541
2542 r = lkb->lkb_resource;
2543
2544 hold_rsb(r);
2545 lock_rsb(r);
2546
2547 receive_flags(lkb, ms);
2548 error = receive_unlock_args(ls, lkb, ms);
2549 if (error)
2550 goto out;
2551
2552 error = do_unlock(r, lkb);
2553 out:
2554 send_unlock_reply(r, lkb, error);
2555
2556 unlock_rsb(r);
2557 put_rsb(r);
 2558	dlm_put_lkb(lkb);
2559 return;
2560
2561 fail:
2562 setup_stub_lkb(ls, ms);
2563 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2564}
2565
2566static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2567{
2568 struct dlm_lkb *lkb;
2569 struct dlm_rsb *r;
2570 int error;
2571
2572 error = find_lkb(ls, ms->m_remid, &lkb);
2573 if (error)
2574 goto fail;
2575
2576 receive_flags(lkb, ms);
2577
2578 r = lkb->lkb_resource;
2579
2580 hold_rsb(r);
2581 lock_rsb(r);
2582
2583 error = do_cancel(r, lkb);
2584 send_cancel_reply(r, lkb, error);
2585
2586 unlock_rsb(r);
2587 put_rsb(r);
 2588	dlm_put_lkb(lkb);
2589 return;
2590
2591 fail:
2592 setup_stub_lkb(ls, ms);
2593 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2594}
2595
2596static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2597{
2598 struct dlm_lkb *lkb;
2599 struct dlm_rsb *r;
2600 int error;
2601
2602 error = find_lkb(ls, ms->m_remid, &lkb);
2603 if (error) {
2604 log_error(ls, "receive_grant no lkb");
2605 return;
2606 }
2607 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2608
2609 r = lkb->lkb_resource;
2610
2611 hold_rsb(r);
2612 lock_rsb(r);
2613
2614 receive_flags_reply(lkb, ms);
2615 grant_lock_pc(r, lkb, ms);
2616 queue_cast(r, lkb, 0);
2617
2618 unlock_rsb(r);
2619 put_rsb(r);
 2620	dlm_put_lkb(lkb);
2621}
2622
2623static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2624{
2625 struct dlm_lkb *lkb;
2626 struct dlm_rsb *r;
2627 int error;
2628
2629 error = find_lkb(ls, ms->m_remid, &lkb);
2630 if (error) {
2631 log_error(ls, "receive_bast no lkb");
2632 return;
2633 }
2634 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2635
2636 r = lkb->lkb_resource;
2637
2638 hold_rsb(r);
2639 lock_rsb(r);
2640
2641 queue_bast(r, lkb, ms->m_bastmode);
2642
2643 unlock_rsb(r);
2644 put_rsb(r);
 2645	dlm_put_lkb(lkb);
2646}
2647
2648static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2649{
2650 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2651
2652 from_nodeid = ms->m_header.h_nodeid;
2653 our_nodeid = dlm_our_nodeid();
2654
2655 len = receive_extralen(ms);
2656
2657 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2658 if (dir_nodeid != our_nodeid) {
2659 log_error(ls, "lookup dir_nodeid %d from %d",
2660 dir_nodeid, from_nodeid);
2661 error = -EINVAL;
2662 ret_nodeid = -1;
2663 goto out;
2664 }
2665
2666 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2667
2668 /* Optimization: we're master so treat lookup as a request */
2669 if (!error && ret_nodeid == our_nodeid) {
2670 receive_request(ls, ms);
2671 return;
2672 }
2673 out:
2674 send_lookup_reply(ls, ms, ret_nodeid, error);
2675}
2676
2677static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2678{
2679 int len, dir_nodeid, from_nodeid;
2680
2681 from_nodeid = ms->m_header.h_nodeid;
2682
2683 len = receive_extralen(ms);
2684
2685 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2686 if (dir_nodeid != dlm_our_nodeid()) {
2687 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2688 dir_nodeid, from_nodeid);
2689 return;
2690 }
2691
2692 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2693}
2694
2695static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2696{
2697 struct dlm_lkb *lkb;
2698 struct dlm_rsb *r;
2699 int error, mstype;
2700
2701 error = find_lkb(ls, ms->m_remid, &lkb);
2702 if (error) {
2703 log_error(ls, "receive_request_reply no lkb");
2704 return;
2705 }
2706 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2707
2708 mstype = lkb->lkb_wait_type;
2709 error = remove_from_waiters(lkb);
2710 if (error) {
2711 log_error(ls, "receive_request_reply not on waiters");
2712 goto out;
2713 }
2714
2715 /* this is the value returned from do_request() on the master */
2716 error = ms->m_result;
2717
2718 r = lkb->lkb_resource;
2719 hold_rsb(r);
2720 lock_rsb(r);
2721
2722 /* Optimization: the dir node was also the master, so it took our
2723 lookup as a request and sent request reply instead of lookup reply */
2724 if (mstype == DLM_MSG_LOOKUP) {
2725 r->res_nodeid = ms->m_header.h_nodeid;
2726 lkb->lkb_nodeid = r->res_nodeid;
2727 }
2728
2729 switch (error) {
2730 case -EAGAIN:
2731 /* request would block (be queued) on remote master;
2732 the unhold undoes the original ref from create_lkb()
2733 so it leads to the lkb being freed */
2734 queue_cast(r, lkb, -EAGAIN);
2735 confirm_master(r, -EAGAIN);
2736 unhold_lkb(lkb);
2737 break;
2738
2739 case -EINPROGRESS:
2740 case 0:
2741 /* request was queued or granted on remote master */
2742 receive_flags_reply(lkb, ms);
2743 lkb->lkb_remid = ms->m_lkid;
2744 if (error)
2745 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2746 else {
2747 grant_lock_pc(r, lkb, ms);
2748 queue_cast(r, lkb, 0);
2749 }
2750 confirm_master(r, error);
2751 break;
2752
 2753	case -EBADR:
2754 case -ENOTBLK:
2755 /* find_rsb failed to find rsb or rsb wasn't master */
2756 r->res_nodeid = -1;
2757 lkb->lkb_nodeid = -1;
2758 _request_lock(r, lkb);
2759 break;
2760
2761 default:
2762 log_error(ls, "receive_request_reply error %d", error);
2763 }
2764
2765 unlock_rsb(r);
2766 put_rsb(r);
2767 out:
 2768	dlm_put_lkb(lkb);
2769}
2770
2771static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2772 struct dlm_message *ms)
2773{
2774 int error = ms->m_result;
2775
2776 /* this is the value returned from do_convert() on the master */
2777
2778 switch (error) {
2779 case -EAGAIN:
2780 /* convert would block (be queued) on remote master */
2781 queue_cast(r, lkb, -EAGAIN);
2782 break;
2783
2784 case -EINPROGRESS:
2785 /* convert was queued on remote master */
2786 del_lkb(r, lkb);
2787 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2788 break;
2789
2790 case 0:
2791 /* convert was granted on remote master */
2792 receive_flags_reply(lkb, ms);
2793 grant_lock_pc(r, lkb, ms);
2794 queue_cast(r, lkb, 0);
2795 break;
2796
2797 default:
2798 log_error(r->res_ls, "receive_convert_reply error %d", error);
2799 }
2800}
2801
2802static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2803{
2804 struct dlm_rsb *r = lkb->lkb_resource;
2805
2806 hold_rsb(r);
2807 lock_rsb(r);
2808
2809 __receive_convert_reply(r, lkb, ms);
2810
2811 unlock_rsb(r);
2812 put_rsb(r);
2813}
2814
2815static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2816{
2817 struct dlm_lkb *lkb;
2818 int error;
2819
2820 error = find_lkb(ls, ms->m_remid, &lkb);
2821 if (error) {
2822 log_error(ls, "receive_convert_reply no lkb");
2823 return;
2824 }
2825 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2826
2827 error = remove_from_waiters(lkb);
2828 if (error) {
2829 log_error(ls, "receive_convert_reply not on waiters");
2830 goto out;
2831 }
2832
2833 _receive_convert_reply(lkb, ms);
2834 out:
 2835	dlm_put_lkb(lkb);
2836}
2837
2838static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2839{
2840 struct dlm_rsb *r = lkb->lkb_resource;
2841 int error = ms->m_result;
2842
2843 hold_rsb(r);
2844 lock_rsb(r);
2845
2846 /* this is the value returned from do_unlock() on the master */
2847
2848 switch (error) {
2849 case -DLM_EUNLOCK:
2850 receive_flags_reply(lkb, ms);
2851 remove_lock_pc(r, lkb);
2852 queue_cast(r, lkb, -DLM_EUNLOCK);
2853 break;
2854 default:
2855 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2856 }
2857
2858 unlock_rsb(r);
2859 put_rsb(r);
2860}
2861
2862static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2863{
2864 struct dlm_lkb *lkb;
2865 int error;
2866
2867 error = find_lkb(ls, ms->m_remid, &lkb);
2868 if (error) {
2869 log_error(ls, "receive_unlock_reply no lkb");
2870 return;
2871 }
2872 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2873
2874 error = remove_from_waiters(lkb);
2875 if (error) {
2876 log_error(ls, "receive_unlock_reply not on waiters");
2877 goto out;
2878 }
2879
2880 _receive_unlock_reply(lkb, ms);
2881 out:
 2882	dlm_put_lkb(lkb);
2883}
2884
2885static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2886{
2887 struct dlm_rsb *r = lkb->lkb_resource;
2888 int error = ms->m_result;
2889
2890 hold_rsb(r);
2891 lock_rsb(r);
2892
2893 /* this is the value returned from do_cancel() on the master */
2894
2895 switch (error) {
2896 case -DLM_ECANCEL:
2897 receive_flags_reply(lkb, ms);
2898 revert_lock_pc(r, lkb);
2899 queue_cast(r, lkb, -DLM_ECANCEL);
2900 break;
2901 default:
2902 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2903 }
2904
2905 unlock_rsb(r);
2906 put_rsb(r);
2907}
2908
2909static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2910{
2911 struct dlm_lkb *lkb;
2912 int error;
2913
2914 error = find_lkb(ls, ms->m_remid, &lkb);
2915 if (error) {
2916 log_error(ls, "receive_cancel_reply no lkb");
2917 return;
2918 }
2919 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2920
2921 error = remove_from_waiters(lkb);
2922 if (error) {
2923 log_error(ls, "receive_cancel_reply not on waiters");
2924 goto out;
2925 }
2926
2927 _receive_cancel_reply(lkb, ms);
2928 out:
 2929	dlm_put_lkb(lkb);
2930}
2931
2932static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2933{
2934 struct dlm_lkb *lkb;
2935 struct dlm_rsb *r;
2936 int error, ret_nodeid;
2937
2938 error = find_lkb(ls, ms->m_lkid, &lkb);
2939 if (error) {
2940 log_error(ls, "receive_lookup_reply no lkb");
2941 return;
2942 }
2943
2944 error = remove_from_waiters(lkb);
2945 if (error) {
2946 log_error(ls, "receive_lookup_reply not on waiters");
2947 goto out;
2948 }
2949
 2950	/* this is the value returned by dlm_dir_lookup on the dir node
 2951	   FIXME: will a non-zero error ever be returned? */
2952 error = ms->m_result;
2953
2954 r = lkb->lkb_resource;
2955 hold_rsb(r);
2956 lock_rsb(r);
2957
2958 ret_nodeid = ms->m_nodeid;
2959 if (ret_nodeid == dlm_our_nodeid()) {
2960 r->res_nodeid = 0;
2961 ret_nodeid = 0;
2962 r->res_first_lkid = 0;
2963 } else {
2964 /* set_master() will copy res_nodeid to lkb_nodeid */
2965 r->res_nodeid = ret_nodeid;
2966 }
2967
2968 _request_lock(r, lkb);
2969
2970 if (!ret_nodeid)
2971 process_lookup_list(r);
2972
2973 unlock_rsb(r);
2974 put_rsb(r);
2975 out:
 2976	dlm_put_lkb(lkb);
2977}
2978
2979int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2980{
2981 struct dlm_message *ms = (struct dlm_message *) hd;
2982 struct dlm_ls *ls;
2983 int error;
2984
2985 if (!recovery)
2986 dlm_message_in(ms);
2987
2988 ls = dlm_find_lockspace_global(hd->h_lockspace);
2989 if (!ls) {
2990 log_print("drop message %d from %d for unknown lockspace %d",
2991 ms->m_type, nodeid, hd->h_lockspace);
2992 return -EINVAL;
2993 }
2994
2995 /* recovery may have just ended leaving a bunch of backed-up requests
2996 in the requestqueue; wait while dlm_recoverd clears them */
2997
2998 if (!recovery)
2999 dlm_wait_requestqueue(ls);
3000
3001 /* recovery may have just started while there were a bunch of
3002 in-flight requests -- save them in requestqueue to be processed
3003 after recovery. we can't let dlm_recvd block on the recovery
3004 lock. if dlm_recoverd is calling this function to clear the
3005 requestqueue, it needs to be interrupted (-EINTR) if another
3006 recovery operation is starting. */
3007
3008 while (1) {
3009 if (dlm_locking_stopped(ls)) {
3010 if (!recovery)
3011 dlm_add_requestqueue(ls, nodeid, hd);
3012 error = -EINTR;
3013 goto out;
3014 }
3015
3016 if (lock_recovery_try(ls))
3017 break;
3018 schedule();
3019 }
3020
3021 switch (ms->m_type) {
3022
3023 /* messages sent to a master node */
3024
3025 case DLM_MSG_REQUEST:
3026 receive_request(ls, ms);
3027 break;
3028
3029 case DLM_MSG_CONVERT:
3030 receive_convert(ls, ms);
3031 break;
3032
3033 case DLM_MSG_UNLOCK:
3034 receive_unlock(ls, ms);
3035 break;
3036
3037 case DLM_MSG_CANCEL:
3038 receive_cancel(ls, ms);
3039 break;
3040
3041 /* messages sent from a master node (replies to above) */
3042
3043 case DLM_MSG_REQUEST_REPLY:
3044 receive_request_reply(ls, ms);
3045 break;
3046
3047 case DLM_MSG_CONVERT_REPLY:
3048 receive_convert_reply(ls, ms);
3049 break;
3050
3051 case DLM_MSG_UNLOCK_REPLY:
3052 receive_unlock_reply(ls, ms);
3053 break;
3054
3055 case DLM_MSG_CANCEL_REPLY:
3056 receive_cancel_reply(ls, ms);
3057 break;
3058
3059 /* messages sent from a master node (only two types of async msg) */
3060
3061 case DLM_MSG_GRANT:
3062 receive_grant(ls, ms);
3063 break;
3064
3065 case DLM_MSG_BAST:
3066 receive_bast(ls, ms);
3067 break;
3068
3069 /* messages sent to a dir node */
3070
3071 case DLM_MSG_LOOKUP:
3072 receive_lookup(ls, ms);
3073 break;
3074
3075 case DLM_MSG_REMOVE:
3076 receive_remove(ls, ms);
3077 break;
3078
3079 /* messages sent from a dir node (remove has no reply) */
3080
3081 case DLM_MSG_LOOKUP_REPLY:
3082 receive_lookup_reply(ls, ms);
3083 break;
3084
3085 default:
3086 log_error(ls, "unknown message type %d", ms->m_type);
3087 }
3088
3089 unlock_recovery(ls);
3090 out:
3091 dlm_put_lockspace(ls);
3092 dlm_astd_wake();
3093 return 0;
3094}
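/* Note: every message above is dispatched between lock_recovery_try()
   and unlock_recovery(); a message that arrives while locking is stopped
   is parked on the requestqueue instead and replayed by dlm_recoverd
   once recovery completes. */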
3095
3096
3097/*
3098 * Recovery related
3099 */
3100
3101static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3102{
3103 if (middle_conversion(lkb)) {
3104 hold_lkb(lkb);
3105 ls->ls_stub_ms.m_result = -EINPROGRESS;
3106 _remove_from_waiters(lkb);
3107 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3108
3109 /* Same special case as in receive_rcom_lock_args() */
3110 lkb->lkb_grmode = DLM_LOCK_IV;
3111 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3112 unhold_lkb(lkb);
3113
3114 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3115 lkb->lkb_flags |= DLM_IFL_RESEND;
3116 }
3117
3118 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3119 conversions are async; there's no reply from the remote master */
3120}
3121
3122/* A waiting lkb needs recovery if the master node has failed, or
3123 the master node is changing (only when no directory is used) */
3124
3125static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3126{
3127 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3128 return 1;
3129
3130 if (!dlm_no_directory(ls))
3131 return 0;
3132
3133 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3134 return 1;
3135
3136 return 0;
3137}
3138
3139/* Recovery for locks that are waiting for replies from nodes that are now
3140 gone. We can just complete unlocks and cancels by faking a reply from the
3141 dead node. Requests and up-conversions we flag to be resent after
3142 recovery. Down-conversions can just be completed with a fake reply like
3143 unlocks. Conversions between PR and CW need special attention. */
3144
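/* Summary of the strategy implemented below: lookups are always flagged
   DLM_IFL_RESEND; for the other waiters needing recovery (master dead or
   changing), by lkb_wait_type:

   DLM_MSG_REQUEST   flagged DLM_IFL_RESEND, resent post-recovery
   DLM_MSG_CONVERT   PR<->CW: fake -EINPROGRESS reply + RSB_RECOVER_CONVERT;
                     up-conversion: flagged DLM_IFL_RESEND
   DLM_MSG_UNLOCK    completed with a fake -DLM_EUNLOCK reply
   DLM_MSG_CANCEL    completed with a fake -DLM_ECANCEL reply */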
3145void dlm_recover_waiters_pre(struct dlm_ls *ls)
3146{
3147 struct dlm_lkb *lkb, *safe;
3148
 3149	mutex_lock(&ls->ls_waiters_mutex);
3150
3151 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3152 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3153 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3154
3155 /* all outstanding lookups, regardless of destination will be
3156 resent after recovery is done */
3157
3158 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3159 lkb->lkb_flags |= DLM_IFL_RESEND;
3160 continue;
3161 }
3162
3163 if (!waiter_needs_recovery(ls, lkb))
3164 continue;
3165
3166 switch (lkb->lkb_wait_type) {
3167
3168 case DLM_MSG_REQUEST:
3169 lkb->lkb_flags |= DLM_IFL_RESEND;
3170 break;
3171
3172 case DLM_MSG_CONVERT:
3173 recover_convert_waiter(ls, lkb);
3174 break;
3175
3176 case DLM_MSG_UNLOCK:
3177 hold_lkb(lkb);
3178 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3179 _remove_from_waiters(lkb);
3180 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
 3181			dlm_put_lkb(lkb);
3182 break;
3183
3184 case DLM_MSG_CANCEL:
3185 hold_lkb(lkb);
3186 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3187 _remove_from_waiters(lkb);
3188 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
 3189			dlm_put_lkb(lkb);
3190 break;
3191
3192 default:
3193 log_error(ls, "invalid lkb wait_type %d",
3194 lkb->lkb_wait_type);
3195 }
3196 }
 3197	mutex_unlock(&ls->ls_waiters_mutex);
3198}
3199
3200static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3201{
3202 struct dlm_lkb *lkb;
3203 int rv = 0;
3204
 3205	mutex_lock(&ls->ls_waiters_mutex);
3206 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3207 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3208 rv = lkb->lkb_wait_type;
3209 _remove_from_waiters(lkb);
3210 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3211 break;
3212 }
3213 }
 3214	mutex_unlock(&ls->ls_waiters_mutex);
3215
3216 if (!rv)
3217 lkb = NULL;
3218 *lkb_ret = lkb;
3219 return rv;
3220}
3221
3222/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3223 master or dir-node for r. Processing the lkb may result in it being placed
3224 back on waiters. */
3225
3226int dlm_recover_waiters_post(struct dlm_ls *ls)
3227{
3228 struct dlm_lkb *lkb;
3229 struct dlm_rsb *r;
3230 int error = 0, mstype;
3231
3232 while (1) {
3233 if (dlm_locking_stopped(ls)) {
3234 log_debug(ls, "recover_waiters_post aborted");
3235 error = -EINTR;
3236 break;
3237 }
3238
3239 mstype = remove_resend_waiter(ls, &lkb);
3240 if (!mstype)
3241 break;
3242
3243 r = lkb->lkb_resource;
3244
3245 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3246 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3247
3248 switch (mstype) {
3249
3250 case DLM_MSG_LOOKUP:
3251 hold_rsb(r);
3252 lock_rsb(r);
3253 _request_lock(r, lkb);
3254 if (is_master(r))
3255 confirm_master(r, 0);
3256 unlock_rsb(r);
3257 put_rsb(r);
3258 break;
3259
3260 case DLM_MSG_REQUEST:
3261 hold_rsb(r);
3262 lock_rsb(r);
3263 _request_lock(r, lkb);
3264 unlock_rsb(r);
3265 put_rsb(r);
3266 break;
3267
3268 case DLM_MSG_CONVERT:
3269 hold_rsb(r);
3270 lock_rsb(r);
3271 _convert_lock(r, lkb);
3272 unlock_rsb(r);
3273 put_rsb(r);
3274 break;
3275
3276 default:
3277 log_error(ls, "recover_waiters_post type %d", mstype);
3278 }
3279 }
3280
3281 return error;
3282}
3283
3284static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3285 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3286{
3287 struct dlm_ls *ls = r->res_ls;
3288 struct dlm_lkb *lkb, *safe;
3289
3290 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3291 if (test(ls, lkb)) {
 3292			rsb_set_flag(r, RSB_LOCKS_PURGED);
3293 del_lkb(r, lkb);
3294 /* this put should free the lkb */
 3295			if (!dlm_put_lkb(lkb))
3296 log_error(ls, "purged lkb not released");
3297 }
3298 }
3299}
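/* Note: RSB_LOCKS_PURGED set above marks an rsb that just lost locks so
   that dlm_grant_after_purge() can later revisit only the flagged rsbs
   and grant whatever the purged locks had been blocking. */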
3300
3301static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3302{
3303 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3304}
3305
3306static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3307{
3308 return is_master_copy(lkb);
3309}
3310
3311static void purge_dead_locks(struct dlm_rsb *r)
3312{
3313 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3314 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3315 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3316}
3317
3318void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3319{
3320 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3321 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3322 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3323}
3324
3325/* Get rid of locks held by nodes that are gone. */
3326
3327int dlm_purge_locks(struct dlm_ls *ls)
3328{
3329 struct dlm_rsb *r;
3330
3331 log_debug(ls, "dlm_purge_locks");
3332
3333 down_write(&ls->ls_root_sem);
3334 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3335 hold_rsb(r);
3336 lock_rsb(r);
3337 if (is_master(r))
3338 purge_dead_locks(r);
3339 unlock_rsb(r);
3340 unhold_rsb(r);
3341
3342 schedule();
3343 }
3344 up_write(&ls->ls_root_sem);
3345
3346 return 0;
3347}
3348
3349static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3350{
3351 struct dlm_rsb *r, *r_ret = NULL;
3352
3353 read_lock(&ls->ls_rsbtbl[bucket].lock);
3354 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3355 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3356 continue;
3357 hold_rsb(r);
3358 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3359 r_ret = r;
3360 break;
3361 }
3362 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3363 return r_ret;
3364}
3365
3366void dlm_grant_after_purge(struct dlm_ls *ls)
3367{
3368 struct dlm_rsb *r;
3369 int i;
3370
3371 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3372 r = find_purged_rsb(ls, i);
3373 if (!r)
3374 continue;
3375 lock_rsb(r);
3376 if (is_master(r)) {
3377 grant_pending_locks(r);
3378 confirm_master(r, 0);
 3379		}
3380 unlock_rsb(r);
3381 put_rsb(r);
 3382	}
3383}
3384
3385static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3386 uint32_t remid)
3387{
3388 struct dlm_lkb *lkb;
3389
3390 list_for_each_entry(lkb, head, lkb_statequeue) {
3391 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3392 return lkb;
3393 }
3394 return NULL;
3395}
3396
3397static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3398 uint32_t remid)
3399{
3400 struct dlm_lkb *lkb;
3401
3402 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3403 if (lkb)
3404 return lkb;
3405 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3406 if (lkb)
3407 return lkb;
3408 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3409 if (lkb)
3410 return lkb;
3411 return NULL;
3412}
3413
3414static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3415 struct dlm_rsb *r, struct dlm_rcom *rc)
3416{
3417 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3418 int lvblen;
3419
3420 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3421 lkb->lkb_ownpid = rl->rl_ownpid;
3422 lkb->lkb_remid = rl->rl_lkid;
3423 lkb->lkb_exflags = rl->rl_exflags;
3424 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3425 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3426 lkb->lkb_lvbseq = rl->rl_lvbseq;
3427 lkb->lkb_rqmode = rl->rl_rqmode;
3428 lkb->lkb_grmode = rl->rl_grmode;
 3429	/* don't set lkb_status because add_lkb wants to set it itself */
3430
3431 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3432 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3433
3434 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3435 lkb->lkb_lvbptr = allocate_lvb(ls);
3436 if (!lkb->lkb_lvbptr)
3437 return -ENOMEM;
3438 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3439 sizeof(struct rcom_lock);
3440 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3441 }
3442
3443 /* Conversions between PR and CW (middle modes) need special handling.
3444 The real granted mode of these converting locks cannot be determined
3445 until all locks have been rebuilt on the rsb (recover_conversion) */
3446
3447 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3448 rl->rl_status = DLM_LKSTS_CONVERT;
3449 lkb->lkb_grmode = DLM_LOCK_IV;
3450 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3451 }
3452
3453 return 0;
3454}
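/* Note: the lvb length above is recovered from the packet size,
   lvblen = h_length - sizeof(struct dlm_rcom) - sizeof(struct rcom_lock),
   mirroring how receive_extralen() sizes m_extra for normal messages. */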
3455
3456/* This lkb may have been recovered in a previous aborted recovery so we need
3457 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3458 If so we just send back a standard reply. If not, we create a new lkb with
3459 the given values and send back our lkid. We send back our lkid by sending
3460 back the rcom_lock struct we got but with the remid field filled in. */
3461
3462int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3463{
3464 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3465 struct dlm_rsb *r;
3466 struct dlm_lkb *lkb;
3467 int error;
3468
3469 if (rl->rl_parent_lkid) {
3470 error = -EOPNOTSUPP;
3471 goto out;
3472 }
3473
3474 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3475 if (error)
3476 goto out;
3477
3478 lock_rsb(r);
3479
3480 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3481 if (lkb) {
3482 error = -EEXIST;
3483 goto out_remid;
3484 }
3485
3486 error = create_lkb(ls, &lkb);
3487 if (error)
3488 goto out_unlock;
3489
3490 error = receive_rcom_lock_args(ls, lkb, r, rc);
3491 if (error) {
 3492		__put_lkb(ls, lkb);
3493 goto out_unlock;
3494 }
3495
3496 attach_lkb(r, lkb);
3497 add_lkb(r, lkb, rl->rl_status);
3498 error = 0;
3499
3500 out_remid:
3501 /* this is the new value returned to the lock holder for
3502 saving in its process-copy lkb */
3503 rl->rl_remid = lkb->lkb_id;
3504
3505 out_unlock:
3506 unlock_rsb(r);
3507 put_rsb(r);
3508 out:
3509 if (error)
3510 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3511 rl->rl_result = error;
3512 return error;
3513}
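/* Note: the rcom_lock struct doubles as the reply -- the new master
   writes its own lkid into rl_remid and the status into rl_result, and
   dlm_recover_process_copy() below saves rl_remid in the waiting
   process-copy lkb. */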
3514
3515int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3516{
3517 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3518 struct dlm_rsb *r;
3519 struct dlm_lkb *lkb;
3520 int error;
3521
3522 error = find_lkb(ls, rl->rl_lkid, &lkb);
3523 if (error) {
3524 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3525 return error;
3526 }
3527
3528 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3529
3530 error = rl->rl_result;
3531
3532 r = lkb->lkb_resource;
3533 hold_rsb(r);
3534 lock_rsb(r);
3535
3536 switch (error) {
3537 case -EEXIST:
3538 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3539 /* fall through */
3540 case 0:
3541 lkb->lkb_remid = rl->rl_remid;
3542 break;
3543 default:
3544 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3545 error, lkb->lkb_id);
3546 }
3547
3548 /* an ack for dlm_recover_locks() which waits for replies from
3549 all the locks it sends to new masters */
3550 dlm_recovered_lock(r);
3551
3552 unlock_rsb(r);
3553 put_rsb(r);
 3554	dlm_put_lkb(lkb);
3555
3556 return 0;
3557}
3558
3559int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3560 int mode, uint32_t flags, void *name, unsigned int namelen,
3561 uint32_t parent_lkid)
3562{
3563 struct dlm_lkb *lkb;
3564 struct dlm_args args;
3565 int error;
3566
3567 lock_recovery(ls);
3568
3569 error = create_lkb(ls, &lkb);
3570 if (error) {
3571 kfree(ua);
3572 goto out;
3573 }
3574
3575 if (flags & DLM_LKF_VALBLK) {
3576 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3577 if (!ua->lksb.sb_lvbptr) {
3578 kfree(ua);
3579 __put_lkb(ls, lkb);
3580 error = -ENOMEM;
3581 goto out;
3582 }
3583 }
3584
 3585	/* Once ua is attached to the lkb, ua will be freed by free_lkb().
3586 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3587 lock and that lkb_astparam is the dlm_user_args structure. */
3588
3589 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3590 FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3591 lkb->lkb_flags |= DLM_IFL_USER;
3592 ua->old_mode = DLM_LOCK_IV;
3593
3594 if (error) {
3595 __put_lkb(ls, lkb);
3596 goto out;
3597 }
3598
3599 error = request_lock(ls, lkb, name, namelen, &args);
3600
3601 switch (error) {
3602 case 0:
3603 break;
3604 case -EINPROGRESS:
3605 error = 0;
3606 break;
3607 case -EAGAIN:
3608 error = 0;
3609 /* fall through */
3610 default:
3611 __put_lkb(ls, lkb);
3612 goto out;
3613 }
3614
3615 /* add this new lkb to the per-process list of locks */
3616 spin_lock(&ua->proc->locks_spin);
3617 kref_get(&lkb->lkb_ref);
3618 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3619 spin_unlock(&ua->proc->locks_spin);
3620 out:
3621 unlock_recovery(ls);
3622 return error;
3623}
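/* Note: the kref_get() above gives the proc->locks list its own
   reference on the lkb; it is dropped again by the unhold_lkb() in
   dlm_user_unlock()/dlm_user_cancel() or by the final dlm_put_lkb() in
   dlm_clear_proc_locks(). */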
3624
3625int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3626 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3627{
3628 struct dlm_lkb *lkb;
3629 struct dlm_args args;
3630 struct dlm_user_args *ua;
3631 int error;
3632
3633 lock_recovery(ls);
3634
3635 error = find_lkb(ls, lkid, &lkb);
3636 if (error)
3637 goto out;
3638
3639 /* user can change the params on its lock when it converts it, or
3640 add an lvb that didn't exist before */
3641
3642 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3643
3644 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3645 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3646 if (!ua->lksb.sb_lvbptr) {
3647 error = -ENOMEM;
3648 goto out_put;
3649 }
3650 }
3651 if (lvb_in && ua->lksb.sb_lvbptr)
3652 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3653
3654 ua->castparam = ua_tmp->castparam;
3655 ua->castaddr = ua_tmp->castaddr;
3656 ua->bastparam = ua_tmp->bastparam;
3657 ua->bastaddr = ua_tmp->bastaddr;
3658 ua->old_mode = lkb->lkb_grmode;
3659
3660 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3661 FAKE_USER_AST, &args);
3662 if (error)
3663 goto out_put;
3664
3665 error = convert_lock(ls, lkb, &args);
3666
3667 if (error == -EINPROGRESS || error == -EAGAIN)
3668 error = 0;
3669 out_put:
3670 dlm_put_lkb(lkb);
3671 out:
3672 unlock_recovery(ls);
3673 kfree(ua_tmp);
3674 return error;
3675}
3676
3677int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3678 uint32_t flags, uint32_t lkid, char *lvb_in)
3679{
3680 struct dlm_lkb *lkb;
3681 struct dlm_args args;
3682 struct dlm_user_args *ua;
3683 int error;
3684
3685 lock_recovery(ls);
3686
3687 error = find_lkb(ls, lkid, &lkb);
3688 if (error)
3689 goto out;
3690
3691 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3692
3693 if (lvb_in && ua->lksb.sb_lvbptr)
3694 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3695 ua->castparam = ua_tmp->castparam;
3696
3697 error = set_unlock_args(flags, ua, &args);
3698 if (error)
3699 goto out_put;
3700
3701 error = unlock_lock(ls, lkb, &args);
3702
3703 if (error == -DLM_EUNLOCK)
3704 error = 0;
3705 if (error)
3706 goto out_put;
3707
3708 spin_lock(&ua->proc->locks_spin);
 3709	list_del_init(&lkb->lkb_ownqueue);
3710 spin_unlock(&ua->proc->locks_spin);
3711
3712 /* this removes the reference for the proc->locks list added by
3713 dlm_user_request */
3714 unhold_lkb(lkb);
3715 out_put:
3716 dlm_put_lkb(lkb);
3717 out:
3718 unlock_recovery(ls);
3719 return error;
3720}
3721
3722int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3723 uint32_t flags, uint32_t lkid)
3724{
3725 struct dlm_lkb *lkb;
3726 struct dlm_args args;
3727 struct dlm_user_args *ua;
3728 int error;
3729
3730 lock_recovery(ls);
3731
3732 error = find_lkb(ls, lkid, &lkb);
3733 if (error)
3734 goto out;
3735
3736 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3737 ua->castparam = ua_tmp->castparam;
3738
3739 error = set_unlock_args(flags, ua, &args);
3740 if (error)
3741 goto out_put;
3742
3743 error = cancel_lock(ls, lkb, &args);
3744
3745 if (error == -DLM_ECANCEL)
3746 error = 0;
3747 if (error)
3748 goto out_put;
3749
3750 /* this lkb was removed from the WAITING queue */
3751 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3752 spin_lock(&ua->proc->locks_spin);
 3753		list_del_init(&lkb->lkb_ownqueue);
3754 spin_unlock(&ua->proc->locks_spin);
3755 unhold_lkb(lkb);
3756 }
3757 out_put:
3758 dlm_put_lkb(lkb);
3759 out:
3760 unlock_recovery(ls);
3761 return error;
3762}
3763
3764static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765{
3766 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3767
3768 if (ua->lksb.sb_lvbptr)
3769 kfree(ua->lksb.sb_lvbptr);
3770 kfree(ua);
3771 lkb->lkb_astparam = (long)NULL;
3772
 3773	/* TODO: propagate to master if needed */
3774 return 0;
3775}
3776
3777/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3778 Regardless of what rsb queue the lock is on, it's removed and freed. */
3779
3780static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3781{
3782 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3783 struct dlm_args args;
3784 int error;
3785
3786 /* FIXME: we need to handle the case where the lkb is in limbo
3787 while the rsb is being looked up, currently we assert in
3788 _unlock_lock/is_remote because rsb nodeid is -1. */
3789
3790 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3791
3792 error = unlock_lock(ls, lkb, &args);
3793 if (error == -DLM_EUNLOCK)
3794 error = 0;
3795 return error;
3796}
3797
3798/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3799 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3800 which we clear here. */
3801
3802/* proc CLOSING flag is set so no more device_reads should look at proc->asts
3803 list, and no more device_writes should add lkb's to proc->locks list; so we
3804 shouldn't need to take asts_spin or locks_spin here. this assumes that
3805 device reads/writes/closes are serialized -- FIXME: we may need to serialize
 3806	   them ourselves. */
3807
3808void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3809{
3810 struct dlm_lkb *lkb, *safe;
3811
3812 lock_recovery(ls);
3813 mutex_lock(&ls->ls_clear_proc_locks);
3814
3815 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3816 if (lkb->lkb_ast_type) {
3817 list_del(&lkb->lkb_astqueue);
3818 unhold_lkb(lkb);
3819 }
3820
 3821		list_del_init(&lkb->lkb_ownqueue);
3822
3823 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3824 lkb->lkb_flags |= DLM_IFL_ORPHAN;
3825 orphan_proc_lock(ls, lkb);
3826 } else {
3827 lkb->lkb_flags |= DLM_IFL_DEAD;
3828 unlock_proc_lock(ls, lkb);
3829 }
3830
3831 /* this removes the reference for the proc->locks list
3832 added by dlm_user_request, it may result in the lkb
3833 being freed */
3834
3835 dlm_put_lkb(lkb);
3836 }
3837 mutex_unlock(&ls->ls_clear_proc_locks);
3838 unlock_recovery(ls);
3839}