]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Thread.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / Thread.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "include/compat.h"
16 #include "common/Thread.h"
17 #include "common/code_environment.h"
18 #include "common/debug.h"
19 #include "common/signal.h"
20 #include "common/io_priority.h"
21
22 #include <dirent.h>
23 #include <errno.h>
24 #include <iostream>
25 #include <pthread.h>
26
27 #include <signal.h>
28 #include <sstream>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <sys/types.h>
32 #ifdef HAVE_SCHED
33 #include <sched.h>
34 #endif
35
/*
 * Pin the calling thread to a single CPU.
 *
 * id: index of the CPU to bind to.  Values outside [0, CPU_SETSIZE) are
 *     ignored and reported as success.
 *
 * Returns 0 on success (or when nothing was done, including builds without
 * HAVE_SCHED), or -errno if sched_setaffinity() fails.
 */
static int _set_affinity(int id)
{
#ifdef HAVE_SCHED
  // Nothing to do for ids that cannot be represented in a cpu_set_t.
  if (id < 0 || id >= CPU_SETSIZE)
    return 0;

  cpu_set_t mask;
  CPU_ZERO(&mask);
  CPU_SET(id, &mask);

  // A pid argument of 0 applies the mask to the caller.
  const int rc = sched_setaffinity(0, sizeof(mask), &mask);
  if (rc < 0)
    return -errno;

  /* guaranteed to take effect immediately */
  sched_yield();
#endif
  return 0;
}
53
54 Thread::Thread()
55 : thread_id(0),
56 pid(0),
57 ioprio_class(-1),
58 ioprio_priority(-1),
59 cpuid(-1),
60 thread_name(NULL)
61 {
62 }
63
64 Thread::~Thread()
65 {
66 }
67
68 void *Thread::_entry_func(void *arg) {
69 void *r = ((Thread*)arg)->entry_wrapper();
70 return r;
71 }
72
73 void *Thread::entry_wrapper()
74 {
75 int p = ceph_gettid(); // may return -ENOSYS on other platforms
76 if (p > 0)
77 pid = p;
78 if (pid &&
79 ioprio_class >= 0 &&
80 ioprio_priority >= 0) {
81 ceph_ioprio_set(IOPRIO_WHO_PROCESS,
82 pid,
83 IOPRIO_PRIO_VALUE(ioprio_class, ioprio_priority));
84 }
85 if (pid && cpuid >= 0)
86 _set_affinity(cpuid);
87
88 ceph_pthread_setname(pthread_self(), thread_name);
89 return entry();
90 }
91
92 const pthread_t &Thread::get_thread_id() const
93 {
94 return thread_id;
95 }
96
97 bool Thread::is_started() const
98 {
99 return thread_id != 0;
100 }
101
102 bool Thread::am_self() const
103 {
104 return (pthread_self() == thread_id);
105 }
106
107 int Thread::kill(int signal)
108 {
109 if (thread_id)
110 return pthread_kill(thread_id, signal);
111 else
112 return -EINVAL;
113 }
114
115 int Thread::try_create(size_t stacksize)
116 {
117 pthread_attr_t *thread_attr = NULL;
118 pthread_attr_t thread_attr_loc;
119
120 stacksize &= CEPH_PAGE_MASK; // must be multiple of page
121 if (stacksize) {
122 thread_attr = &thread_attr_loc;
123 pthread_attr_init(thread_attr);
124 pthread_attr_setstacksize(thread_attr, stacksize);
125 }
126
127 int r;
128
129 // The child thread will inherit our signal mask. Set our signal mask to
130 // the set of signals we want to block. (It's ok to block signals more
131 // signals than usual for a little while-- they will just be delivered to
132 // another thread or delieverd to this thread later.)
133 sigset_t old_sigset;
134 if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
135 block_signals(NULL, &old_sigset);
136 }
137 else {
138 int to_block[] = { SIGPIPE , 0 };
139 block_signals(to_block, &old_sigset);
140 }
141 r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
142 restore_sigset(&old_sigset);
143
144 if (thread_attr) {
145 pthread_attr_destroy(thread_attr);
146 }
147
148 return r;
149 }
150
151 void Thread::create(const char *name, size_t stacksize)
152 {
153 assert(strlen(name) < 16);
154 thread_name = name;
155
156 int ret = try_create(stacksize);
157 if (ret != 0) {
158 char buf[256];
159 snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
160 "failed with error %d", ret);
161 dout_emergency(buf);
162 assert(ret == 0);
163 }
164 }
165
166 int Thread::join(void **prval)
167 {
168 if (thread_id == 0) {
169 assert("join on thread that was never started" == 0);
170 return -EINVAL;
171 }
172
173 int status = pthread_join(thread_id, prval);
174 if (status != 0) {
175 char buf[256];
176 snprintf(buf, sizeof(buf), "Thread::join(): pthread_join "
177 "failed with error %d\n", status);
178 dout_emergency(buf);
179 assert(status == 0);
180 }
181
182 thread_id = 0;
183 return status;
184 }
185
186 int Thread::detach()
187 {
188 return pthread_detach(thread_id);
189 }
190
191 int Thread::set_ioprio(int cls, int prio)
192 {
193 // fixme, maybe: this can race with create()
194 ioprio_class = cls;
195 ioprio_priority = prio;
196 if (pid && cls >= 0 && prio >= 0)
197 return ceph_ioprio_set(IOPRIO_WHO_PROCESS,
198 pid,
199 IOPRIO_PRIO_VALUE(cls, prio));
200 return 0;
201 }
202
203 int Thread::set_affinity(int id)
204 {
205 int r = 0;
206 cpuid = id;
207 if (pid && ceph_gettid() == pid)
208 r = _set_affinity(id);
209 return r;
210 }