]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #include <seastar/core/smp.hh> | |
f67539c2 | 23 | #include <seastar/core/loop.hh> |
9f95a23c TL |
24 | #include <seastar/core/semaphore.hh> |
25 | #include <seastar/core/print.hh> | |
1e59de90 | 26 | #include <seastar/core/on_internal_error.hh> |
9f95a23c TL |
27 | #include <boost/range/algorithm/find_if.hpp> |
28 | #include <vector> | |
29 | ||
30 | namespace seastar { | |
31 | ||
1e59de90 TL |
32 | extern logger seastar_logger; |
33 | ||
9f95a23c TL |
34 | void smp_message_queue::work_item::process() { |
35 | schedule(this); | |
36 | } | |
37 | ||
38 | struct smp_service_group_impl { | |
39 | std::vector<smp_service_group_semaphore> clients; // one client per server shard | |
1e59de90 TL |
40 | #ifdef SEASTAR_DEBUG |
41 | unsigned version = 0; | |
42 | #endif | |
9f95a23c TL |
43 | }; |
44 | ||
45 | static smp_service_group_semaphore smp_service_group_management_sem{1, named_semaphore_exception_factory{"smp_service_group_management_sem"}}; | |
46 | static thread_local std::vector<smp_service_group_impl> smp_service_groups; | |
47 | ||
f67539c2 TL |
48 | static named_semaphore_exception_factory make_service_group_semaphore_exception_factory(unsigned id, shard_id client_cpu, shard_id this_cpu, std::optional<sstring> smp_group_name) { |
49 | if (smp_group_name) { | |
50 | return named_semaphore_exception_factory{format("smp_service_group:'{}' (#{}) {}->{} semaphore", *smp_group_name, id, client_cpu, this_cpu)}; | |
51 | } else { | |
52 | return named_semaphore_exception_factory{format("smp_service_group#{} {}->{} semaphore", id, client_cpu, this_cpu)}; | |
53 | } | |
54 | ||
9f95a23c TL |
55 | } |
56 | ||
f67539c2 TL |
57 | static_assert(std::is_nothrow_copy_constructible_v<smp_service_group>); |
58 | static_assert(std::is_nothrow_move_constructible_v<smp_service_group>); | |
59 | ||
60 | static_assert(std::is_nothrow_default_constructible_v<smp_submit_to_options>); | |
61 | static_assert(std::is_nothrow_copy_constructible_v<smp_submit_to_options>); | |
62 | static_assert(std::is_nothrow_move_constructible_v<smp_submit_to_options>); | |
63 | ||
64 | future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) noexcept { | |
9f95a23c TL |
65 | ssgc.max_nonlocal_requests = std::max(ssgc.max_nonlocal_requests, smp::count - 1); |
66 | return smp::submit_to(0, [ssgc] { | |
67 | return with_semaphore(smp_service_group_management_sem, 1, [ssgc] { | |
68 | auto it = boost::range::find_if(smp_service_groups, [&] (smp_service_group_impl& ssgi) { return ssgi.clients.empty(); }); | |
69 | size_t id = it - smp_service_groups.begin(); | |
70 | return parallel_for_each(smp::all_cpus(), [ssgc, id] (unsigned cpu) { | |
71 | return smp::submit_to(cpu, [ssgc, id, cpu] { | |
72 | if (id >= smp_service_groups.size()) { | |
73 | smp_service_groups.resize(id + 1); // may throw | |
74 | } | |
75 | smp_service_groups[id].clients.reserve(smp::count); // may throw | |
76 | auto per_client = smp::count > 1 ? ssgc.max_nonlocal_requests / (smp::count - 1) : 0u; | |
77 | for (unsigned i = 0; i != smp::count; ++i) { | |
f67539c2 | 78 | smp_service_groups[id].clients.emplace_back(per_client, make_service_group_semaphore_exception_factory(id, i, cpu, ssgc.group_name)); |
9f95a23c TL |
79 | } |
80 | }); | |
81 | }).handle_exception([id] (std::exception_ptr e) { | |
82 | // rollback | |
83 | return smp::invoke_on_all([id] { | |
84 | if (smp_service_groups.size() > id) { | |
85 | smp_service_groups[id].clients.clear(); | |
86 | } | |
87 | }).then([e = std::move(e)] () mutable { | |
88 | std::rethrow_exception(std::move(e)); | |
89 | }); | |
90 | }).then([id] { | |
1e59de90 TL |
91 | auto ret = smp_service_group(id); |
92 | #ifdef SEASTAR_DEBUG | |
93 | ret._version = smp_service_groups[id].version; | |
94 | #endif | |
95 | return ret; | |
9f95a23c TL |
96 | }); |
97 | }); | |
98 | }); | |
99 | } | |
100 | ||
f67539c2 | 101 | future<> destroy_smp_service_group(smp_service_group ssg) noexcept { |
9f95a23c TL |
102 | return smp::submit_to(0, [ssg] { |
103 | return with_semaphore(smp_service_group_management_sem, 1, [ssg] { | |
104 | auto id = internal::smp_service_group_id(ssg); | |
1e59de90 TL |
105 | if (id >= smp_service_groups.size()) { |
106 | on_fatal_internal_error(seastar_logger, format("destroy_smp_service_group id={}: out of range", id)); | |
107 | } | |
108 | #ifdef SEASTAR_DEBUG | |
109 | if (ssg._version != smp_service_groups[id].version) { | |
110 | on_fatal_internal_error(seastar_logger, format("destroy_smp_service_group id={}: stale version={}: current_version={}", id, ssg._version, smp_service_groups[id].version)); | |
111 | } | |
112 | #endif | |
9f95a23c TL |
113 | return smp::invoke_on_all([id] { |
114 | smp_service_groups[id].clients.clear(); | |
1e59de90 TL |
115 | #ifdef SEASTAR_DEBUG |
116 | ++smp_service_groups[id].version; | |
117 | #endif | |
9f95a23c TL |
118 | }); |
119 | }); | |
120 | }); | |
121 | } | |
122 | ||
123 | void init_default_smp_service_group(shard_id cpu) { | |
124 | smp_service_groups.emplace_back(); | |
125 | auto& ssg0 = smp_service_groups.back(); | |
126 | ssg0.clients.reserve(smp::count); | |
127 | for (unsigned i = 0; i != smp::count; ++i) { | |
f67539c2 | 128 | ssg0.clients.emplace_back(smp_service_group_semaphore::max_counter(), make_service_group_semaphore_exception_factory(0, i, cpu, {"default"})); |
9f95a23c TL |
129 | } |
130 | } | |
131 | ||
f67539c2 | 132 | smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept { |
9f95a23c TL |
133 | return smp_service_groups[ssg_id].clients[t]; |
134 | } | |
135 | ||
9f95a23c | 136 | } |