2 * Copyright (c) 2009-2012 Red Hat, Inc.
6 * Author: Steven Dake (sdake@redhat.com)
8 * This software licensed under BSD license, the text of which follows:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the MontaVista Software, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
36 #include <sys/types.h>
37 #include <sys/socket.h>
39 #include <sys/ioctl.h>
40 #include <netinet/in.h>
48 #include <arpa/inet.h>
50 #include <corosync/corotypes.h>
51 #include <corosync/swab.h>
52 #include <corosync/totem/totempg.h>
53 #include <corosync/totem/totem.h>
54 #include <corosync/logsys.h>
55 #include <qb/qbipc_common.h>
/* Register this file's log messages under the "SYNC" logsys subsystem. */
61 LOGSYS_DECLARE_SUBSYS ("SYNC");
/*
 * Message ids carried in qb_ipc_request_header.id and dispatched by
 * sync_deliver_fn() below.
 */
63 #define MESSAGE_REQ_SYNC_BARRIER 0
64 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
/*
 * Synchronization state machine states.
 * NOTE(review): extraction gap — the enumerators before
 * SYNC_SERVICELIST_BUILD (original lines 67-71) are not visible here;
 * code below also uses PROCESS, ACTIVATE, SYNC_BARRIER and SYNC_PROCESS,
 * and my_state is declared as "enum sync_state", so this span may in
 * fact contain two enum definitions whose boundary was dropped — confirm
 * against the full file.
 */
66 enum sync_process_state
{
72 SYNC_SERVICELIST_BUILD
,
/*
 * Per-service synchronization descriptor; populated from struct
 * sync_callbacks in sync_servicelist_build_enter().
 * NOTE(review): extraction gap — the members before trans_list
 * (original lines 78-79: apparently service_id, name, and the head of
 * the sync_init function-pointer declaration these parameters belong
 * to, all referenced by code below) are missing from this view.
 */
77 struct service_entry
{
/* Parameter list of the sync_init callback (declaration head not visible). */
80 const unsigned int *trans_list
,
81 size_t trans_list_entries
,
82 const unsigned int *member_list
,
83 size_t member_list_entries
,
84 const struct memb_ring_id
*ring_id
);
/* Abort an in-progress synchronization for this service. */
85 void (*sync_abort
) (void);
/* One step of sync work; return value is consumed by schedwrk_processor(). */
86 int (*sync_process
) (void);
/* Commit the synchronized state once the barrier is reached. */
87 void (*sync_activate
) (void);
/* Progress of this service through the sync state machine. */
88 enum sync_process_state state
;
/*
 * Per-node barrier bookkeeping.  Usage below shows .nodeid and
 * .received members; the struct body itself is not visible in this
 * extract (original lines 93-95 missing).
 */
92 struct processor_entry
{
/*
 * On-wire message (MESSAGE_REQ_SYNC_SERVICE_BUILD) broadcasting this
 * node's list of service ids that take part in synchronization.
 * Members are 8-byte aligned for a stable wire layout across nodes.
 */
97 struct req_exec_service_build_message
{
98 struct qb_ipc_request_header header
__attribute__((aligned(8)));
/* Ring the message belongs to; receivers discard stale-ring messages. */
99 struct memb_ring_id ring_id
__attribute__((aligned(8)));
/* Number of valid entries in service_list below. */
100 int service_list_entries
__attribute__((aligned(8)));
101 int service_list
[128] __attribute__((aligned(8)));
/*
 * On-wire barrier token (MESSAGE_REQ_SYNC_BARRIER); carries only the
 * ring id so receivers can discard barriers from an old ring.
 */
104 struct req_exec_barrier_message
{
105 struct qb_ipc_request_header header
__attribute__((aligned(8)));
106 struct memb_ring_id ring_id
__attribute__((aligned(8)));
/* Current phase of the overall sync state machine. */
109 static enum sync_state my_state
= SYNC_BARRIER
;
/* Ring id of the configuration currently being synchronized. */
111 static struct memb_ring_id my_ring_id
;
/* Index into my_service_list of the service currently being synced. */
113 static int my_processing_idx
= 0;
/* Handle of the schedwrk work item that drives schedwrk_processor(). */
115 static hdb_handle_t my_schedwrk_handle
;
/* Barrier bookkeeping: one entry per member of the current ring. */
117 static struct processor_entry my_processor_list
[PROCESSOR_COUNT_MAX
];
/* Member nodeids of the new ring. */
119 static unsigned int my_member_list
[PROCESSOR_COUNT_MAX
];
/* Transitional membership (old-ring members still present in the new ring). */
121 static unsigned int my_trans_list
[PROCESSOR_COUNT_MAX
];
123 static size_t my_member_list_entries
= 0;
125 static size_t my_trans_list_entries
= 0;
127 static int my_processor_list_entries
= 0;
/* Services participating in synchronization, kept sorted by service_id. */
129 static struct service_entry my_service_list
[SERVICES_COUNT_MAX
];
131 static int my_service_list_entries
= 0;
/* Invoked once every service has completed synchronization. */
133 static void (*sync_synchronization_completed
) (void);
/* Totempg delivery callback for sync messages (defined below; some
 * parameters of this prototype are missing from this extract). */
135 static void sync_deliver_fn (
138 unsigned int msg_len
,
139 int endian_conversion_required
)
140 ;
141 static int schedwrk_processor (const void *context
);
143 static void sync_process_enter (void);
145 static void sync_process_call_init (void);
/* Totempg group used for sync traffic.
 * NOTE(review): the initializer's fields (original lines 148-151) are
 * not visible in this extract. */
147 static struct totempg_group sync_group
= {
152 static void *sync_group_handle
;
/* Hook to look up a service's sync callbacks; code below treats a -1
 * return as "service id has no callbacks registered". */
154 int (*my_sync_callbacks_retrieve
) (
156 struct sync_callbacks
*callbacks
);
/*
 * Module initialization — presumably sync_init(); its opening line
 * (original line 158) is missing from this extract, as are the error
 * paths between the log_printf calls.  Stores the callback-retrieval
 * hook and the completion callback, and sets up the totempg groups
 * interface and group membership.
 */
159 int (*sync_callbacks_retrieve
) (
161 struct sync_callbacks
*callbacks
),
162 void (*synchronization_completed
) (void))
166 res
= totempg_groups_initialize (
/* Error path: groups interface could not be initialized. */
171 log_printf (LOGSYS_LEVEL_ERROR
,
172 "Couldn't initialize groups interface.");
176 res
= totempg_groups_join (
/* Error path: could not join the sync group. */
181 log_printf (LOGSYS_LEVEL_ERROR
, "Couldn't join group.");
/* Remember the caller-supplied hooks for the rest of this module. */
185 sync_synchronization_completed
= synchronization_completed
;
186 my_sync_callbacks_retrieve
= sync_callbacks_retrieve
;
/*
 * Handle an incoming MESSAGE_REQ_SYNC_BARRIER from 'nodeid'.  Marks
 * the sender as having reached the barrier; once every processor in
 * my_processor_list has reported in, activates the current service
 * and advances to the next one (or reports overall completion).
 * NOTE(review): several original lines (braces, the early return for
 * the old-ring case, the barrier_reached clearing, and the else arm)
 * are missing from this extract.
 */
191 static void sync_barrier_handler (unsigned int nodeid
, const void *msg
)
193 const struct req_exec_barrier_message
*req_exec_barrier_message
= msg
;
195 int barrier_reached
= 1;
/* Discard barrier tokens that belong to a previous ring. */
197 if (memcmp (&my_ring_id
, &req_exec_barrier_message
->ring_id
,
198 sizeof (struct memb_ring_id
)) != 0) {
200 log_printf (LOGSYS_LEVEL_DEBUG
, "barrier for old ring - discarding");
/* Record that the sending node reached the barrier. */
203 for (i
= 0; i
< my_processor_list_entries
; i
++) {
204 if (my_processor_list
[i
].nodeid
== nodeid
) {
205 my_processor_list
[i
].received
= 1;
/* The barrier holds only if every processor has reported in
 * (the clearing of barrier_reached is in a missing line). */
208 for (i
= 0; i
< my_processor_list_entries
; i
++) {
209 if (my_processor_list
[i
].received
== 0) {
213 if (barrier_reached
) {
214 log_printf (LOGSYS_LEVEL_DEBUG
, "Committing synchronization for %s",
215 my_service_list
[my_processing_idx
].name
);
216 my_service_list
[my_processing_idx
].state
= ACTIVATE
;
/* Only activate if the service still has registered sync callbacks. */
218 if (my_sync_callbacks_retrieve(my_service_list
[my_processing_idx
].service_id
, NULL
) != -1) {
219 my_service_list
[my_processing_idx
].sync_activate ();
/* Advance to the next service, or finish if this was the last one. */
222 my_processing_idx
+= 1;
223 if (my_service_list_entries
== my_processing_idx
) {
224 sync_synchronization_completed ();
226 sync_process_enter ();
/*
 * No-op placeholder callbacks installed for externally-known services
 * that supply no handlers of their own (see sync_service_build_handler,
 * which assigns dummies to newly discovered entries).  Bodies are not
 * visible in this extract.
 */
231 static void dummy_sync_abort (void)
235 static int dummy_sync_process (void)
240 static void dummy_sync_activate (void)
244 static int service_entry_compare (const void *a
, const void *b
)
246 const struct service_entry
*service_entry_a
= a
;
247 const struct service_entry
*service_entry_b
= b
;
249 return (service_entry_a
->service_id
> service_entry_b
->service_id
);
/*
 * Handle an incoming MESSAGE_REQ_SYNC_SERVICE_BUILD from 'nodeid'.
 * Merges any service ids we do not yet know about into
 * my_service_list (with dummy callbacks), re-sorts the list, then
 * performs the same per-node barrier accounting as
 * sync_barrier_handler; once all nodes have sent their service list,
 * enters the sync-process phase.
 * NOTE(review): braces, early return, break statements, the dummy_*
 * right-hand sides of the assignments, and the barrier_reached
 * clearing are in lines missing from this extract.
 */
252 static void sync_service_build_handler (unsigned int nodeid
, const void *msg
)
254 const struct req_exec_service_build_message
*req_exec_service_build_message
= msg
;
256 int barrier_reached
= 1;
/* Set when a new service entry is appended, to trigger the re-sort. */
258 int qsort_trigger
= 0;
/* Discard service-build messages that belong to a previous ring. */
260 if (memcmp (&my_ring_id
, &req_exec_service_build_message
->ring_id
,
261 sizeof (struct memb_ring_id
)) != 0) {
262 log_printf (LOGSYS_LEVEL_DEBUG
, "service build for old ring - discarding");
/* For each advertised service id, check whether we already know it. */
265 for (i
= 0; i
< req_exec_service_build_message
->service_list_entries
; i
++) {
268 for (j
= 0; j
< my_service_list_entries
; j
++) {
269 if (req_exec_service_build_message
->service_list
[i
] ==
270 my_service_list
[j
].service_id
) {
/* Unknown service: append a placeholder entry with dummy callbacks. */
276 my_service_list
[my_service_list_entries
].state
= PROCESS
;
277 my_service_list
[my_service_list_entries
].service_id
=
278 req_exec_service_build_message
->service_list
[i
];
/* NOTE(review): unbounded sprintf into .name — the name buffer size is
 * not visible here; the format expands to a short fixed string plus an
 * int, but consider snprintf.  TODO confirm buffer size. */
279 sprintf (my_service_list
[my_service_list_entries
].name
,
280 "Unknown External Service (id = %d)\n",
281 req_exec_service_build_message
->service_list
[i
]);
/* Right-hand sides (the dummy_* callbacks) are in missing lines. */
282 my_service_list
[my_service_list_entries
].sync_init
=
284 my_service_list
[my_service_list_entries
].sync_abort
=
286 my_service_list
[my_service_list_entries
].sync_process
=
288 my_service_list
[my_service_list_entries
].sync_activate
=
290 my_service_list_entries
+= 1;
/* Keep the merged list ordered by service_id. */
296 qsort (my_service_list
, my_service_list_entries
,
297 sizeof (struct service_entry
), service_entry_compare
);
/* Barrier accounting: record the sender, then test all processors. */
299 for (i
= 0; i
< my_processor_list_entries
; i
++) {
300 if (my_processor_list
[i
].nodeid
== nodeid
) {
301 my_processor_list
[i
].received
= 1;
304 for (i
= 0; i
< my_processor_list_entries
; i
++) {
305 if (my_processor_list
[i
].received
== 0) {
309 if (barrier_reached
) {
310 log_printf (LOGSYS_LEVEL_DEBUG
, "enter sync process");
311 sync_process_enter ();
/*
 * Totempg delivery callback: dispatch an incoming sync message on the
 * qb header id to the barrier or service-build handler.
 * NOTE(review): the nodeid/msg parameters, braces and break statements
 * are in lines missing from this extract.
 */
315 static void sync_deliver_fn (
318 unsigned int msg_len
,
319 int endian_conversion_required
)
321 struct qb_ipc_request_header
*header
= (struct qb_ipc_request_header
*)msg
;
323 switch (header
->id
) {
324 case MESSAGE_REQ_SYNC_BARRIER
:
325 sync_barrier_handler (nodeid
, msg
);
327 case MESSAGE_REQ_SYNC_SERVICE_BUILD
:
328 sync_service_build_handler (nodeid
, msg
);
/*
 * Multicast a barrier token carrying the current ring id to the sync
 * group.  Best effort: the mcast return value is deliberately ignored.
 * NOTE(review): the local 'struct iovec iovec' declaration is in a
 * line missing from this extract.
 */
333 static void barrier_message_transmit (void)
336 struct req_exec_barrier_message req_exec_barrier_message
;
338 memset(&req_exec_barrier_message
, 0, sizeof(req_exec_barrier_message
));
340 req_exec_barrier_message
.header
.size
= sizeof (struct req_exec_barrier_message
);
341 req_exec_barrier_message
.header
.id
= MESSAGE_REQ_SYNC_BARRIER
;
/* Stamp the token with the ring it belongs to. */
343 memcpy (&req_exec_barrier_message
.ring_id
, &my_ring_id
,
344 sizeof (struct memb_ring_id
));
346 iovec
.iov_base
= (char *)&req_exec_barrier_message
;
347 iovec
.iov_len
= sizeof (req_exec_barrier_message
);
349 (void)totempg_groups_mcast_joined (sync_group_handle
,
350 &iovec
, 1, TOTEMPG_AGREED
);
/*
 * Fill in the header and ring id of a caller-prepared service-build
 * message and multicast it to the sync group (best effort; mcast
 * return value intentionally ignored).
 * NOTE(review): the local 'struct iovec iovec' declaration is in a
 * line missing from this extract.
 */
353 static void service_build_message_transmit (struct req_exec_service_build_message
*service_build_message
)
357 service_build_message
->header
.size
= sizeof (struct req_exec_service_build_message
);
358 service_build_message
->header
.id
= MESSAGE_REQ_SYNC_SERVICE_BUILD
;
/* Stamp the message with the ring it belongs to. */
360 memcpy (&service_build_message
->ring_id
, &my_ring_id
,
361 sizeof (struct memb_ring_id
));
363 iovec
.iov_base
= (void *)service_build_message
;
364 iovec
.iov_len
= sizeof (struct req_exec_service_build_message
);
366 (void)totempg_groups_mcast_joined (sync_group_handle
,
367 &iovec
, 1, TOTEMPG_AGREED
);
/* Enter the barrier phase and announce our arrival to the group. */
370 static void sync_barrier_enter (void)
372 my_state
= SYNC_BARRIER
;
373 barrier_message_transmit ();
/*
 * Rebuild my_trans_list as the intersection of the previous
 * transitional list with the current membership, then call sync_init
 * on every service that still has registered callbacks.
 * NOTE(review): loop-variable declarations (i, o, m), braces, and the
 * trailing &my_ring_id argument of the sync_init call are in lines
 * missing from this extract.
 */
376 static void sync_process_call_init (void)
378 unsigned int old_trans_list
[PROCESSOR_COUNT_MAX
];
379 size_t old_trans_list_entries
= 0;
/* Snapshot the previous transitional membership. */
383 memcpy (old_trans_list
, my_trans_list
, my_trans_list_entries
*
384 sizeof (unsigned int));
385 old_trans_list_entries
= my_trans_list_entries
;
/* Keep only nodes that are still members of the new ring. */
387 my_trans_list_entries
= 0;
388 for (o
= 0; o
< old_trans_list_entries
; o
++) {
389 for (m
= 0; m
< my_member_list_entries
; m
++) {
390 if (old_trans_list
[o
] == my_member_list
[m
]) {
391 my_trans_list
[my_trans_list_entries
] = my_member_list
[m
];
392 my_trans_list_entries
++;
/* Initialize every service that still registers sync callbacks. */
398 for (i
= 0; i
< my_service_list_entries
; i
++) {
399 if (my_sync_callbacks_retrieve(my_service_list
[i
].service_id
, NULL
) != -1) {
400 my_service_list
[i
].sync_init (my_trans_list
,
401 my_trans_list_entries
, my_member_list
,
402 my_member_list_entries
,
/*
 * Enter the per-service processing phase: with no services to sync,
 * short-circuit straight to completion; otherwise reset the barrier
 * bookkeeping and schedule schedwrk_processor() to run.
 * NOTE(review): the early return after the completion callback and
 * the remaining schedwrk_create arguments are in lines missing from
 * this extract.
 */
408 static void sync_process_enter (void)
412 my_state
= SYNC_PROCESS
;
/* Nothing to synchronize: report completion immediately. */
417 if (my_service_list_entries
== 0) {
418 my_state
= SYNC_SERVICELIST_BUILD
;
419 sync_synchronization_completed ();
/* Clear per-node barrier state for the upcoming barrier round. */
422 for (i
= 0; i
< my_processor_list_entries
; i
++) {
423 my_processor_list
[i
].received
= 0;
/* Queue the work item that drives each service's sync_process(). */
426 schedwrk_create (&my_schedwrk_handle
,
/*
 * Enter the service-list-build phase for a new ring: record the
 * membership, rebuild my_service_list from the locally registered
 * sync callbacks, broadcast our list to the group, and run the local
 * sync_init calls.
 * NOTE(review): loop/res declarations, braces, and the continue paths
 * after the callbacks-retrieve / sync_init NULL checks are in lines
 * missing from this extract.
 */
431 static void sync_servicelist_build_enter (
432 const unsigned int *member_list
,
433 size_t member_list_entries
,
434 const struct memb_ring_id
*ring_id
)
436 struct req_exec_service_build_message service_build
;
439 struct sync_callbacks sync_callbacks
;
441 memset(&service_build
, 0, sizeof(service_build
));
443 my_state
= SYNC_SERVICELIST_BUILD
;
/* Reset per-node barrier bookkeeping for the new membership. */
444 for (i
= 0; i
< member_list_entries
; i
++) {
445 my_processor_list
[i
].nodeid
= member_list
[i
];
446 my_processor_list
[i
].received
= 0;
448 my_processor_list_entries
= member_list_entries
;
450 memcpy (my_member_list
, member_list
,
451 member_list_entries
* sizeof (unsigned int));
452 my_member_list_entries
= member_list_entries
;
/* Restart processing from the first service. */
454 my_processing_idx
= 0;
456 memset(my_service_list
, 0, sizeof (struct service_entry
) * SERVICES_COUNT_MAX
);
457 my_service_list_entries
= 0;
/* Collect every locally registered service that provides sync_init. */
459 for (i
= 0; i
< SERVICES_COUNT_MAX
; i
++) {
460 res
= my_sync_callbacks_retrieve (i
, &sync_callbacks
);
464 if (sync_callbacks
.sync_init
== NULL
) {
467 my_service_list
[my_service_list_entries
].state
= PROCESS
;
468 my_service_list
[my_service_list_entries
].service_id
= i
;
/* Guard the strcpy below against overflowing the name buffer. */
470 assert(strlen(sync_callbacks
.name
) < sizeof(my_service_list
[my_service_list_entries
].name
));
472 strcpy (my_service_list
[my_service_list_entries
].name
,
473 sync_callbacks
.name
);
474 my_service_list
[my_service_list_entries
].sync_init
= sync_callbacks
.sync_init
;
475 my_service_list
[my_service_list_entries
].sync_process
= sync_callbacks
.sync_process
;
476 my_service_list
[my_service_list_entries
].sync_abort
= sync_callbacks
.sync_abort
;
477 my_service_list
[my_service_list_entries
].sync_activate
= sync_callbacks
.sync_activate
;
478 my_service_list_entries
+= 1;
/* Advertise our service ids to the rest of the group. */
481 for (i
= 0; i
< my_service_list_entries
; i
++) {
482 service_build
.service_list
[i
] =
483 my_service_list
[i
].service_id
;
485 service_build
.service_list_entries
= my_service_list_entries
;
487 service_build_message_transmit (&service_build
);
489 log_printf (LOGSYS_LEVEL_DEBUG
, "call init for locally known services");
490 sync_process_call_init ();
/*
 * schedwrk callback driving the current service's sync_process().
 * When processing for the current service is done, advances to the
 * barrier phase.
 * NOTE(review): the res declaration, braces, the handling of the
 * sync_process() return value, and this function's own return
 * statements are in lines missing from this extract.
 */
493 static int schedwrk_processor (const void *context
)
497 if (my_service_list
[my_processing_idx
].state
== PROCESS
) {
/* Only run sync_process if the service still registers callbacks. */
498 if (my_sync_callbacks_retrieve(my_service_list
[my_processing_idx
].service_id
, NULL
) != -1) {
499 res
= my_service_list
[my_processing_idx
].sync_process ();
504 sync_barrier_enter();
/*
 * Entry point for a new ring configuration — the function's name line
 * (original ~line 512, presumably sync_start) is missing from this
 * extract; confirm against the full file.  Saves the new ring id and
 * begins the service-list-build phase.
 */
513 const unsigned int *member_list
,
514 size_t member_list_entries
,
515 const struct memb_ring_id
*ring_id
)
518 memcpy (&my_ring_id
, ring_id
, sizeof (struct memb_ring_id
));
/* Trailing ring_id argument is in a line missing from this extract. */
520 sync_servicelist_build_enter (member_list
, member_list_entries
,
/*
 * Record the transitional membership (called before sync starts so
 * sync_process_call_init can intersect it with the new ring's
 * members).  ring_id is accepted but not used in the visible body.
 */
524 void sync_save_transitional (
525 const unsigned int *member_list
,
526 size_t member_list_entries
,
527 const struct memb_ring_id
*ring_id
)
530 memcpy (my_trans_list
, member_list
, member_list_entries
*
531 sizeof (unsigned int));
532 my_trans_list_entries
= member_list_entries
;
/*
 * Abort an in-progress synchronization: cancel the scheduled work
 * item, tell the current service to abort, and clear my_ring_id so
 * late messages from the aborted ring are discarded by the handlers'
 * ring-id checks.
 */
535 void sync_abort (void)
538 if (my_state
== SYNC_PROCESS
) {
/* Stop schedwrk_processor from being called again. */
539 schedwrk_destroy (my_schedwrk_handle
);
540 if (my_sync_callbacks_retrieve(my_service_list
[my_processing_idx
].service_id
, NULL
) != -1) {
541 my_service_list
[my_processing_idx
].sync_abort ();
545 /* this will prevent any "old" barrier messages from causing
 * synchronization to continue */
548 memset (&my_ring_id
, 0, sizeof (struct memb_ring_id
));