]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Thread.cc
update sources to v12.1.0
[ceph.git] / ceph / src / common / Thread.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "include/compat.h"
16 #include "common/Thread.h"
17 #include "common/code_environment.h"
18 #include "common/debug.h"
19 #include "common/signal.h"
20 #include "common/io_priority.h"
21
22 #ifdef HAVE_SCHED
23 #include <sched.h>
24 #endif
25
26 static int _set_affinity(int id)
27 {
28 #ifdef HAVE_SCHED
29 if (id >= 0 && id < CPU_SETSIZE) {
30 cpu_set_t cpuset;
31 CPU_ZERO(&cpuset);
32
33 CPU_SET(id, &cpuset);
34
35 if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0)
36 return -errno;
37 /* guaranteed to take effect immediately */
38 sched_yield();
39 }
40 #endif
41 return 0;
42 }
43
44 Thread::Thread()
45 : thread_id(0),
46 pid(0),
47 ioprio_class(-1),
48 ioprio_priority(-1),
49 cpuid(-1),
50 thread_name(NULL)
51 {
52 }
53
54 Thread::~Thread()
55 {
56 }
57
58 void *Thread::_entry_func(void *arg) {
59 void *r = ((Thread*)arg)->entry_wrapper();
60 return r;
61 }
62
63 void *Thread::entry_wrapper()
64 {
65 int p = ceph_gettid(); // may return -ENOSYS on other platforms
66 if (p > 0)
67 pid = p;
68 if (pid &&
69 ioprio_class >= 0 &&
70 ioprio_priority >= 0) {
71 ceph_ioprio_set(IOPRIO_WHO_PROCESS,
72 pid,
73 IOPRIO_PRIO_VALUE(ioprio_class, ioprio_priority));
74 }
75 if (pid && cpuid >= 0)
76 _set_affinity(cpuid);
77
78 ceph_pthread_setname(pthread_self(), thread_name);
79 return entry();
80 }
81
82 const pthread_t &Thread::get_thread_id() const
83 {
84 return thread_id;
85 }
86
87 bool Thread::is_started() const
88 {
89 return thread_id != 0;
90 }
91
92 bool Thread::am_self() const
93 {
94 return (pthread_self() == thread_id);
95 }
96
97 int Thread::kill(int signal)
98 {
99 if (thread_id)
100 return pthread_kill(thread_id, signal);
101 else
102 return -EINVAL;
103 }
104
105 int Thread::try_create(size_t stacksize)
106 {
107 pthread_attr_t *thread_attr = NULL;
108 pthread_attr_t thread_attr_loc;
109
110 stacksize &= CEPH_PAGE_MASK; // must be multiple of page
111 if (stacksize) {
112 thread_attr = &thread_attr_loc;
113 pthread_attr_init(thread_attr);
114 pthread_attr_setstacksize(thread_attr, stacksize);
115 }
116
117 int r;
118
119 // The child thread will inherit our signal mask. Set our signal mask to
120 // the set of signals we want to block. (It's ok to block signals more
121 // signals than usual for a little while-- they will just be delivered to
122 // another thread or delieverd to this thread later.)
123 sigset_t old_sigset;
124 if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
125 block_signals(NULL, &old_sigset);
126 }
127 else {
128 int to_block[] = { SIGPIPE , 0 };
129 block_signals(to_block, &old_sigset);
130 }
131 r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
132 restore_sigset(&old_sigset);
133
134 if (thread_attr) {
135 pthread_attr_destroy(thread_attr);
136 }
137
138 return r;
139 }
140
141 void Thread::create(const char *name, size_t stacksize)
142 {
143 assert(strlen(name) < 16);
144 thread_name = name;
145
146 int ret = try_create(stacksize);
147 if (ret != 0) {
148 char buf[256];
149 snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
150 "failed with error %d", ret);
151 dout_emergency(buf);
152 assert(ret == 0);
153 }
154 }
155
156 int Thread::join(void **prval)
157 {
158 if (thread_id == 0) {
159 assert("join on thread that was never started" == 0);
160 return -EINVAL;
161 }
162
163 int status = pthread_join(thread_id, prval);
164 if (status != 0) {
165 char buf[256];
166 snprintf(buf, sizeof(buf), "Thread::join(): pthread_join "
167 "failed with error %d\n", status);
168 dout_emergency(buf);
169 assert(status == 0);
170 }
171
172 thread_id = 0;
173 return status;
174 }
175
176 int Thread::detach()
177 {
178 return pthread_detach(thread_id);
179 }
180
181 int Thread::set_ioprio(int cls, int prio)
182 {
183 // fixme, maybe: this can race with create()
184 ioprio_class = cls;
185 ioprio_priority = prio;
186 if (pid && cls >= 0 && prio >= 0)
187 return ceph_ioprio_set(IOPRIO_WHO_PROCESS,
188 pid,
189 IOPRIO_PRIO_VALUE(cls, prio));
190 return 0;
191 }
192
193 int Thread::set_affinity(int id)
194 {
195 int r = 0;
196 cpuid = id;
197 if (pid && ceph_gettid() == pid)
198 r = _set_affinity(id);
199 return r;
200 }