]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/smp.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / smp.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2019 ScyllaDB
20 */
21
22 #include <seastar/core/smp.hh>
23 #include <seastar/core/loop.hh>
24 #include <seastar/core/semaphore.hh>
25 #include <seastar/core/print.hh>
26 #include <boost/range/algorithm/find_if.hpp>
27 #include <vector>
28
29 namespace seastar {
30
31 void smp_message_queue::work_item::process() {
32 schedule(this);
33 }
34
35 struct smp_service_group_impl {
36 std::vector<smp_service_group_semaphore> clients; // one client per server shard
37 };
38
39 static smp_service_group_semaphore smp_service_group_management_sem{1, named_semaphore_exception_factory{"smp_service_group_management_sem"}};
40 static thread_local std::vector<smp_service_group_impl> smp_service_groups;
41
42 static named_semaphore_exception_factory make_service_group_semaphore_exception_factory(unsigned id, shard_id client_cpu, shard_id this_cpu, std::optional<sstring> smp_group_name) {
43 if (smp_group_name) {
44 return named_semaphore_exception_factory{format("smp_service_group:'{}' (#{}) {}->{} semaphore", *smp_group_name, id, client_cpu, this_cpu)};
45 } else {
46 return named_semaphore_exception_factory{format("smp_service_group#{} {}->{} semaphore", id, client_cpu, this_cpu)};
47 }
48
49 }
50
51 static_assert(std::is_nothrow_copy_constructible_v<smp_service_group>);
52 static_assert(std::is_nothrow_move_constructible_v<smp_service_group>);
53
54 static_assert(std::is_nothrow_default_constructible_v<smp_submit_to_options>);
55 static_assert(std::is_nothrow_copy_constructible_v<smp_submit_to_options>);
56 static_assert(std::is_nothrow_move_constructible_v<smp_submit_to_options>);
57
58 future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) noexcept {
59 ssgc.max_nonlocal_requests = std::max(ssgc.max_nonlocal_requests, smp::count - 1);
60 return smp::submit_to(0, [ssgc] {
61 return with_semaphore(smp_service_group_management_sem, 1, [ssgc] {
62 auto it = boost::range::find_if(smp_service_groups, [&] (smp_service_group_impl& ssgi) { return ssgi.clients.empty(); });
63 size_t id = it - smp_service_groups.begin();
64 return parallel_for_each(smp::all_cpus(), [ssgc, id] (unsigned cpu) {
65 return smp::submit_to(cpu, [ssgc, id, cpu] {
66 if (id >= smp_service_groups.size()) {
67 smp_service_groups.resize(id + 1); // may throw
68 }
69 smp_service_groups[id].clients.reserve(smp::count); // may throw
70 auto per_client = smp::count > 1 ? ssgc.max_nonlocal_requests / (smp::count - 1) : 0u;
71 for (unsigned i = 0; i != smp::count; ++i) {
72 smp_service_groups[id].clients.emplace_back(per_client, make_service_group_semaphore_exception_factory(id, i, cpu, ssgc.group_name));
73 }
74 });
75 }).handle_exception([id] (std::exception_ptr e) {
76 // rollback
77 return smp::invoke_on_all([id] {
78 if (smp_service_groups.size() > id) {
79 smp_service_groups[id].clients.clear();
80 }
81 }).then([e = std::move(e)] () mutable {
82 std::rethrow_exception(std::move(e));
83 });
84 }).then([id] {
85 return smp_service_group(id);
86 });
87 });
88 });
89 }
90
91 future<> destroy_smp_service_group(smp_service_group ssg) noexcept {
92 return smp::submit_to(0, [ssg] {
93 return with_semaphore(smp_service_group_management_sem, 1, [ssg] {
94 auto id = internal::smp_service_group_id(ssg);
95 return smp::invoke_on_all([id] {
96 smp_service_groups[id].clients.clear();
97 });
98 });
99 });
100 }
101
102 void init_default_smp_service_group(shard_id cpu) {
103 smp_service_groups.emplace_back();
104 auto& ssg0 = smp_service_groups.back();
105 ssg0.clients.reserve(smp::count);
106 for (unsigned i = 0; i != smp::count; ++i) {
107 ssg0.clients.emplace_back(smp_service_group_semaphore::max_counter(), make_service_group_semaphore_exception_factory(0, i, cpu, {"default"}));
108 }
109 }
110
111 smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept {
112 return smp_service_groups[ssg_id].clients[t];
113 }
114
115 }