]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/QueueStrategy.cc
0ce279b31e9f1055abd6b79545a53df6d76a840e
[ceph.git] / ceph / src / msg / QueueStrategy.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 CohortFS, LLC
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include <string>
15 #include "QueueStrategy.h"
16 #define dout_subsys ceph_subsys_ms
17 #include "common/debug.h"
18
19 QueueStrategy::QueueStrategy(int _n_threads)
20 : lock("QueueStrategy::lock"),
21 n_threads(_n_threads),
22 stop(false),
23 mqueue(),
24 disp_threads()
25 {
26 }
27
28 void QueueStrategy::ds_dispatch(Message *m) {
29 msgr->ms_fast_preprocess(m);
30 if (msgr->ms_can_fast_dispatch(m)) {
31 msgr->ms_fast_dispatch(m);
32 return;
33 }
34 lock.Lock();
35 mqueue.push_back(*m);
36 if (disp_threads.size()) {
37 if (! disp_threads.empty()) {
38 QSThread *thrd = &disp_threads.front();
39 disp_threads.pop_front();
40 thrd->cond.Signal();
41 }
42 }
43 lock.Unlock();
44 }
45
46 void QueueStrategy::entry(QSThread *thrd)
47 {
48 Message *m = NULL;
49 for (;;) {
50 lock.Lock();
51 for (;;) {
52 if (! mqueue.empty()) {
53 m = &(mqueue.front());
54 mqueue.pop_front();
55 break;
56 }
57 m = NULL;
58 if (stop)
59 break;
60 disp_threads.push_front(*thrd);
61 thrd->cond.Wait(lock);
62 }
63 lock.Unlock();
64 if (stop) {
65 if (!m) break;
66 m->put();
67 continue;
68 }
69 get_messenger()->ms_deliver_dispatch(m);
70 }
71 }
72
73 void QueueStrategy::shutdown()
74 {
75 QSThread *thrd;
76 lock.Lock();
77 stop = true;
78 while (disp_threads.size()) {
79 thrd = &(disp_threads.front());
80 disp_threads.pop_front();
81 thrd->cond.Signal();
82 }
83 lock.Unlock();
84 }
85
86 void QueueStrategy::wait()
87 {
88 QSThread *thrd;
89 lock.Lock();
90 assert(stop);
91 while (disp_threads.size()) {
92 thrd = &(disp_threads.front());
93 disp_threads.pop_front();
94 lock.Unlock();
95
96 // join outside of lock
97 thrd->join();
98
99 lock.Lock();
100 }
101 lock.Unlock();
102 }
103
104 void QueueStrategy::start()
105 {
106 QSThread *thrd;
107 assert(!stop);
108 lock.Lock();
109 for (int ix = 0; ix < n_threads; ++ix) {
110 string thread_name = "ms_xio_qs_";
111 thread_name.append(std::to_string(ix));
112 thrd = new QSThread(this);
113 thrd->create(thread_name.c_str());
114 }
115 lock.Unlock();
116 }