2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2019 ScyllaDB
22 #include <seastar/core/smp.hh>
23 #include <seastar/core/loop.hh>
24 #include <seastar/core/semaphore.hh>
25 #include <seastar/core/print.hh>
26 #include <seastar/core/on_internal_error.hh>
27 #include <boost/range/algorithm/find_if.hpp>
// Logger object declared here and defined elsewhere; the code below passes it
// to on_fatal_internal_error() when reporting fatal service-group errors.
32 extern logger seastar_logger
;
34 void smp_message_queue::work_item::process() {
// State kept for one SMP service group. An entry whose `clients` vector is
// empty is treated as a free slot by create_smp_service_group(), and
// destroy_smp_service_group() empties the vector again when a group goes away.
38 struct smp_service_group_impl
{
39 std::vector
<smp_service_group_semaphore
> clients
; // one client per server shard
// Semaphore initialised with a single unit. create_smp_service_group() and
// destroy_smp_service_group() each take that unit (via with_semaphore) around
// their work, so those management operations do not overlap.
45 static smp_service_group_semaphore smp_service_group_management_sem
{1, named_semaphore_exception_factory
{"smp_service_group_management_sem"}};
// Thread-local table of service groups. Entries are addressed by
// service-group id, i.e. smp_service_groups[id].
46 static thread_local
std::vector
<smp_service_group_impl
> smp_service_groups
;
// Builds the named_semaphore_exception_factory attached to a service-group
// semaphore. The generated name embeds the group id and the
// client_cpu -> this_cpu pair. One message format additionally embeds the
// group name (*smp_group_name); the other uses only the numeric id.
// NOTE(review): presumably the named format is used only when smp_group_name
// holds a value, since it is dereferenced there — confirm.
48 static named_semaphore_exception_factory
make_service_group_semaphore_exception_factory(unsigned id
, shard_id client_cpu
, shard_id this_cpu
, std::optional
<sstring
> smp_group_name
) {
50 return named_semaphore_exception_factory
{format("smp_service_group:'{}' (#{}) {}->{} semaphore", *smp_group_name
, id
, client_cpu
, this_cpu
)};
52 return named_semaphore_exception_factory
{format("smp_service_group#{} {}->{} semaphore", id
, client_cpu
, this_cpu
)};
// Compile-time checks: smp_service_group must be copy- and move-constructible
// without throwing, and smp_submit_to_options must additionally be
// default-constructible without throwing.
57 static_assert(std::is_nothrow_copy_constructible_v
<smp_service_group
>);
58 static_assert(std::is_nothrow_move_constructible_v
<smp_service_group
>);
60 static_assert(std::is_nothrow_default_constructible_v
<smp_submit_to_options
>);
61 static_assert(std::is_nothrow_copy_constructible_v
<smp_submit_to_options
>);
62 static_assert(std::is_nothrow_move_constructible_v
<smp_submit_to_options
>);
// Allocates a new SMP service group and returns a future for its handle.
//
// ssgc.max_nonlocal_requests is first raised to at least smp::count - 1.
// The remaining work is submitted to CPU 0 and runs while holding one unit of
// smp_service_group_management_sem:
//   - the id is the index of the first table entry that has no clients, or
//     the current table size when no such entry exists;
//   - every CPU in smp::all_cpus() grows its own table if needed and creates
//     smp::count semaphores for that id;
//   - if that step fails, the clients for that id are cleared on all CPUs and
//     the original exception is rethrown;
//   - the handle is built from the id and is stamped with the table entry's
//     current version.
64 future
<smp_service_group
> create_smp_service_group(smp_service_group_config ssgc
) noexcept
{
65 ssgc
.max_nonlocal_requests
= std::max(ssgc
.max_nonlocal_requests
, smp::count
- 1);
66 return smp::submit_to(0, [ssgc
] {
67 return with_semaphore(smp_service_group_management_sem
, 1, [ssgc
] {
// Look for a reusable slot: an entry whose clients vector is empty.
68 auto it
= boost::range::find_if(smp_service_groups
, [&] (smp_service_group_impl
& ssgi
) { return ssgi
.clients
.empty(); });
69 size_t id
= it
- smp_service_groups
.begin();
70 return parallel_for_each(smp::all_cpus(), [ssgc
, id
] (unsigned cpu
) {
71 return smp::submit_to(cpu
, [ssgc
, id
, cpu
] {
72 if (id
>= smp_service_groups
.size()) {
73 smp_service_groups
.resize(id
+ 1); // may throw
75 smp_service_groups
[id
].clients
.reserve(smp::count
); // may throw
// Units given to each client semaphore: the request budget divided by
// smp::count - 1, or 0 when smp::count is 1 (avoids dividing by zero).
76 auto per_client
= smp::count
> 1 ? ssgc
.max_nonlocal_requests
/ (smp::count
- 1) : 0u;
77 for (unsigned i
= 0; i
!= smp::count
; ++i
) {
78 smp_service_groups
[id
].clients
.emplace_back(per_client
, make_service_group_semaphore_exception_factory(id
, i
, cpu
, ssgc
.group_name
));
81 }).handle_exception([id
] (std::exception_ptr e
) {
// Roll back: drop any clients already created for this id on every CPU
// whose table is large enough to contain it, then re-raise the error.
83 return smp::invoke_on_all([id
] {
84 if (smp_service_groups
.size() > id
) {
85 smp_service_groups
[id
].clients
.clear();
87 }).then([e
= std::move(e
)] () mutable {
88 std::rethrow_exception(std::move(e
));
// Build the handle and record the entry's version in it;
// destroy_smp_service_group() compares this value against the table.
91 auto ret
= smp_service_group(id
);
93 ret
._version
= smp_service_groups
[id
].version
;
// Destroys the service group identified by ssg.
//
// The work is submitted to CPU 0 and runs while holding one unit of
// smp_service_group_management_sem. A fatal internal error is reported if the
// id lies outside the table, or if the handle's version differs from the
// table entry's version (reported as a stale version). Otherwise the group's
// clients are cleared on all CPUs via smp::invoke_on_all and the entry's
// version is incremented, so handles stamped with the old version no longer
// pass the version check.
101 future
<> destroy_smp_service_group(smp_service_group ssg
) noexcept
{
102 return smp::submit_to(0, [ssg
] {
103 return with_semaphore(smp_service_group_management_sem
, 1, [ssg
] {
104 auto id
= internal::smp_service_group_id(ssg
);
105 if (id
>= smp_service_groups
.size()) {
106 on_fatal_internal_error(seastar_logger
, format("destroy_smp_service_group id={}: out of range", id
));
109 if (ssg
._version
!= smp_service_groups
[id
].version
) {
110 on_fatal_internal_error(seastar_logger
, format("destroy_smp_service_group id={}: stale version={}: current_version={}", id
, ssg
._version
, smp_service_groups
[id
].version
));
113 return smp::invoke_on_all([id
] {
114 smp_service_groups
[id
].clients
.clear();
116 ++smp_service_groups
[id
].version
;
// Sets up the default service group in the calling thread's table: appends a
// new entry and fills it with smp::count semaphores. Each semaphore is created
// with smp_service_group_semaphore::max_counter() units and an exception
// factory labelled with id 0, the client index i, the given cpu and the group
// name "default".
// NOTE(review): the factory label hard-codes id 0, so this presumably runs
// while the table is still empty — confirm against the caller.
123 void init_default_smp_service_group(shard_id cpu
) {
124 smp_service_groups
.emplace_back();
125 auto& ssg0
= smp_service_groups
.back();
126 ssg0
.clients
.reserve(smp::count
);
127 for (unsigned i
= 0; i
!= smp::count
; ++i
) {
128 ssg0
.clients
.emplace_back(smp_service_group_semaphore::max_counter(), make_service_group_semaphore_exception_factory(0, i
, cpu
, {"default"}));
// Returns a reference to the semaphore stored at clients[t] of service group
// ssg_id in the calling thread's table. Both indices are applied with
// operator[] and are not range-checked here.
132 smp_service_group_semaphore
& get_smp_service_groups_semaphore(unsigned ssg_id
, shard_id t
) noexcept
{
133 return smp_service_groups
[ssg_id
].clients
[t
];