]> git.proxmox.com Git - mirror_corosync.git/blob - exec/sync.c
config: Don't free pointers used by transports
[mirror_corosync.git] / exec / sync.c
1 /*
2 * Copyright (c) 2009-2012 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Steven Dake (sdake@redhat.com)
7 *
8 * This software licensed under BSD license, the text of which follows:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
12 *
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the MontaVista Software, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
33 */
#include <config.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <arpa/inet.h>
#include <assert.h>
#include <string.h>

#include <corosync/corotypes.h>
#include <corosync/swab.h>
#include <corosync/totem/totempg.h>
#include <corosync/totem/totem.h>
#include <corosync/logsys.h>
#include <qb/qbipc_common.h>
#include "schedwrk.h"
#include "quorum.h"
#include "sync.h"
#include "main.h"
60
LOGSYS_DECLARE_SUBSYS ("SYNC");

/*
 * Message ids stored in qb_ipc_request_header.id of the messages exchanged
 * in the "sync" totem group and dispatched by sync_deliver_fn.
 */
#define MESSAGE_REQ_SYNC_BARRIER 0
#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1

/*
 * Progress of one service within a synchronization round:
 * PROCESS  - schedwrk_processor still drives its sync_process callback;
 * ACTIVATE - its barrier was reached and sync_activate was invoked.
 */
enum sync_process_state {
	PROCESS,
	ACTIVATE
};

/*
 * Phase of the sync engine:
 * SYNC_SERVICELIST_BUILD - service lists are being exchanged
 *                          (sync_servicelist_build_enter);
 * SYNC_PROCESS           - the current service's sync_process is running
 *                          (sync_process_enter);
 * SYNC_BARRIER           - a barrier message was sent and the engine waits
 *                          for the other members (sync_barrier_enter).
 */
enum sync_state {
	SYNC_SERVICELIST_BUILD,
	SYNC_PROCESS,
	SYNC_BARRIER
};
76
/*
 * One service taking part in synchronization. Entries come either from the
 * locally registered sync callbacks (sync_servicelist_build_enter) or from
 * service ids announced by other nodes (sync_service_build_handler), which
 * get no-op callbacks and a NULL sync_init.
 */
struct service_entry {
	int service_id;
	void (*sync_init) (
		const unsigned int *trans_list,
		size_t trans_list_entries,
		const unsigned int *member_list,
		size_t member_list_entries,
		const struct memb_ring_id *ring_id);
	void (*sync_abort) (void);
	/* 0 means processing is finished (see schedwrk_processor) */
	int (*sync_process) (void);
	void (*sync_activate) (void);
	enum sync_process_state state;
	/* human-readable name used in log messages */
	char name[128];
};

/* Barrier bookkeeping for one member of the current ring. */
struct processor_entry {
	int nodeid;
	/* set once this node's service-build or barrier message has arrived */
	int received;
};

/*
 * Payload of MESSAGE_REQ_SYNC_SERVICE_BUILD: the ring it belongs to and the
 * ids of the services known to the sender. Receivers cast the delivered
 * buffer to this type, so the layout (including the aligned(8)
 * attributes) is part of the wire protocol and must not change.
 */
struct req_exec_service_build_message {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	struct memb_ring_id ring_id __attribute__((aligned(8)));
	int service_list_entries __attribute__((aligned(8)));
	int service_list[128] __attribute__((aligned(8)));
};

/*
 * Payload of MESSAGE_REQ_SYNC_BARRIER: only the ring id, used to discard
 * barriers that belong to an older ring. Layout is part of the wire
 * protocol.
 */
struct req_exec_barrier_message {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	struct memb_ring_id ring_id __attribute__((aligned(8)));
};
108
/* Current phase of the sync engine. */
static enum sync_state my_state = SYNC_BARRIER;

/*
 * Ring being synchronized. Messages carrying a different ring id are
 * discarded by the handlers; sync_abort zeroes it.
 */
static struct memb_ring_id my_ring_id;

/* Index in my_service_list of the service currently being synchronized. */
static int my_processing_idx = 0;

/* schedwrk job that runs schedwrk_processor while in SYNC_PROCESS. */
static hdb_handle_t my_schedwrk_handle;

/* Members of the current ring together with their "received" flags. */
static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX];

/* Membership of the current ring as passed to sync_start. */
static unsigned int my_member_list[PROCESSOR_COUNT_MAX];

/*
 * Transitional membership saved by sync_save_transitional; filtered against
 * my_member_list in sync_process_call_init.
 */
static unsigned int my_trans_list[PROCESSOR_COUNT_MAX];

static size_t my_member_list_entries = 0;

static size_t my_trans_list_entries = 0;

static int my_processor_list_entries = 0;

/*
 * Services to synchronize, processed in index order. Built in ascending
 * service_id order and re-sorted when external services are appended.
 */
static struct service_entry my_service_list[SERVICES_COUNT_MAX];

static int my_service_list_entries = 0;

/* Invoked once every service has been activated (or when there are none). */
static void (*sync_synchronization_completed) (void);

static void sync_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required);

static int schedwrk_processor (const void *context);

static void sync_process_enter (void);

static void sync_process_call_init (void);

/* Totem process group in which all sync messages are exchanged. */
static struct totempg_group sync_group = {
	.group = "sync",
	.group_len = 4
};

/* Handle returned by totempg_groups_initialize in sync_init. */
static void *sync_group_handle;

/*
 * Looks up the sync callbacks of a service; this file treats a return of
 * -1 as "service not available".
 * NOTE(review): this has external linkage although it appears to be used
 * only in this file - consider making it static after checking for other
 * users.
 */
int (*my_sync_callbacks_retrieve) (
	int service_id,
	struct sync_callbacks *callbacks);
157
158 int sync_init (
159 int (*sync_callbacks_retrieve) (
160 int service_id,
161 struct sync_callbacks *callbacks),
162 void (*synchronization_completed) (void))
163 {
164 unsigned int res;
165
166 res = totempg_groups_initialize (
167 &sync_group_handle,
168 sync_deliver_fn,
169 NULL);
170 if (res == -1) {
171 log_printf (LOGSYS_LEVEL_ERROR,
172 "Couldn't initialize groups interface.");
173 return (-1);
174 }
175
176 res = totempg_groups_join (
177 sync_group_handle,
178 &sync_group,
179 1);
180 if (res == -1) {
181 log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.");
182 return (-1);
183 }
184
185 sync_synchronization_completed = synchronization_completed;
186 my_sync_callbacks_retrieve = sync_callbacks_retrieve;
187
188 return (0);
189 }
190
191 static void sync_barrier_handler (unsigned int nodeid, const void *msg)
192 {
193 const struct req_exec_barrier_message *req_exec_barrier_message = msg;
194 int i;
195 int barrier_reached = 1;
196
197 if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
198 sizeof (struct memb_ring_id)) != 0) {
199
200 log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding");
201 return;
202 }
203 for (i = 0; i < my_processor_list_entries; i++) {
204 if (my_processor_list[i].nodeid == nodeid) {
205 my_processor_list[i].received = 1;
206 }
207 }
208 for (i = 0; i < my_processor_list_entries; i++) {
209 if (my_processor_list[i].received == 0) {
210 barrier_reached = 0;
211 }
212 }
213 if (barrier_reached) {
214 log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s",
215 my_service_list[my_processing_idx].name);
216 my_service_list[my_processing_idx].state = ACTIVATE;
217
218 if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
219 my_service_list[my_processing_idx].sync_activate ();
220 }
221
222 my_processing_idx += 1;
223 if (my_service_list_entries == my_processing_idx) {
224 sync_synchronization_completed ();
225 } else {
226 sync_process_enter ();
227 }
228 }
229 }
230
/*
 * No-op callbacks installed for services announced by other nodes that
 * this node does not provide (see sync_service_build_handler).
 */

/* Nothing to abort. */
static void dummy_sync_abort (void)
{
	/* intentionally empty */
}

/* Nothing to process: report completion straight away (0 == done). */
static int dummy_sync_process (void)
{
	int done = 0;

	return done;
}

/* Nothing to activate. */
static void dummy_sync_activate (void)
{
	/* intentionally empty */
}
243
244 static int service_entry_compare (const void *a, const void *b)
245 {
246 const struct service_entry *service_entry_a = a;
247 const struct service_entry *service_entry_b = b;
248
249 return (service_entry_a->service_id > service_entry_b->service_id);
250 }
251
252 static void sync_service_build_handler (unsigned int nodeid, const void *msg)
253 {
254 const struct req_exec_service_build_message *req_exec_service_build_message = msg;
255 int i, j;
256 int barrier_reached = 1;
257 int found;
258 int qsort_trigger = 0;
259
260 if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
261 sizeof (struct memb_ring_id)) != 0) {
262 log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding");
263 return;
264 }
265 for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
266
267 found = 0;
268 for (j = 0; j < my_service_list_entries; j++) {
269 if (req_exec_service_build_message->service_list[i] ==
270 my_service_list[j].service_id) {
271 found = 1;
272 break;
273 }
274 }
275 if (found == 0) {
276 my_service_list[my_service_list_entries].state = PROCESS;
277 my_service_list[my_service_list_entries].service_id =
278 req_exec_service_build_message->service_list[i];
279 sprintf (my_service_list[my_service_list_entries].name,
280 "Unknown External Service (id = %d)\n",
281 req_exec_service_build_message->service_list[i]);
282 my_service_list[my_service_list_entries].sync_init =
283 NULL;
284 my_service_list[my_service_list_entries].sync_abort =
285 dummy_sync_abort;
286 my_service_list[my_service_list_entries].sync_process =
287 dummy_sync_process;
288 my_service_list[my_service_list_entries].sync_activate =
289 dummy_sync_activate;
290 my_service_list_entries += 1;
291
292 qsort_trigger = 1;
293 }
294 }
295 if (qsort_trigger) {
296 qsort (my_service_list, my_service_list_entries,
297 sizeof (struct service_entry), service_entry_compare);
298 }
299 for (i = 0; i < my_processor_list_entries; i++) {
300 if (my_processor_list[i].nodeid == nodeid) {
301 my_processor_list[i].received = 1;
302 }
303 }
304 for (i = 0; i < my_processor_list_entries; i++) {
305 if (my_processor_list[i].received == 0) {
306 barrier_reached = 0;
307 }
308 }
309 if (barrier_reached) {
310 log_printf (LOGSYS_LEVEL_DEBUG, "enter sync process");
311 sync_process_enter ();
312 }
313 }
314
315 static void sync_deliver_fn (
316 unsigned int nodeid,
317 const void *msg,
318 unsigned int msg_len,
319 int endian_conversion_required)
320 {
321 struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
322
323 switch (header->id) {
324 case MESSAGE_REQ_SYNC_BARRIER:
325 sync_barrier_handler (nodeid, msg);
326 break;
327 case MESSAGE_REQ_SYNC_SERVICE_BUILD:
328 sync_service_build_handler (nodeid, msg);
329 break;
330 }
331 }
332
333 static void barrier_message_transmit (void)
334 {
335 struct iovec iovec;
336 struct req_exec_barrier_message req_exec_barrier_message;
337
338 memset(&req_exec_barrier_message, 0, sizeof(req_exec_barrier_message));
339
340 req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message);
341 req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER;
342
343 memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
344 sizeof (struct memb_ring_id));
345
346 iovec.iov_base = (char *)&req_exec_barrier_message;
347 iovec.iov_len = sizeof (req_exec_barrier_message);
348
349 (void)totempg_groups_mcast_joined (sync_group_handle,
350 &iovec, 1, TOTEMPG_AGREED);
351 }
352
353 static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message)
354 {
355 struct iovec iovec;
356
357 service_build_message->header.size = sizeof (struct req_exec_service_build_message);
358 service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;
359
360 memcpy (&service_build_message->ring_id, &my_ring_id,
361 sizeof (struct memb_ring_id));
362
363 iovec.iov_base = (void *)service_build_message;
364 iovec.iov_len = sizeof (struct req_exec_service_build_message);
365
366 (void)totempg_groups_mcast_joined (sync_group_handle,
367 &iovec, 1, TOTEMPG_AGREED);
368 }
369
370 static void sync_barrier_enter (void)
371 {
372 my_state = SYNC_BARRIER;
373 barrier_message_transmit ();
374 }
375
376 static void sync_process_call_init (void)
377 {
378 unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
379 size_t old_trans_list_entries = 0;
380 int o, m;
381 int i;
382
383 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
384 sizeof (unsigned int));
385 old_trans_list_entries = my_trans_list_entries;
386
387 my_trans_list_entries = 0;
388 for (o = 0; o < old_trans_list_entries; o++) {
389 for (m = 0; m < my_member_list_entries; m++) {
390 if (old_trans_list[o] == my_member_list[m]) {
391 my_trans_list[my_trans_list_entries] = my_member_list[m];
392 my_trans_list_entries++;
393 break;
394 }
395 }
396 }
397
398 for (i = 0; i < my_service_list_entries; i++) {
399 if (my_sync_callbacks_retrieve(my_service_list[i].service_id, NULL) != -1) {
400 my_service_list[i].sync_init (my_trans_list,
401 my_trans_list_entries, my_member_list,
402 my_member_list_entries,
403 &my_ring_id);
404 }
405 }
406 }
407
408 static void sync_process_enter (void)
409 {
410 int i;
411
412 my_state = SYNC_PROCESS;
413
414 /*
415 * No sync services
416 */
417 if (my_service_list_entries == 0) {
418 my_state = SYNC_SERVICELIST_BUILD;
419 sync_synchronization_completed ();
420 return;
421 }
422 for (i = 0; i < my_processor_list_entries; i++) {
423 my_processor_list[i].received = 0;
424 }
425
426 schedwrk_create (&my_schedwrk_handle,
427 schedwrk_processor,
428 NULL);
429 }
430
431 static void sync_servicelist_build_enter (
432 const unsigned int *member_list,
433 size_t member_list_entries,
434 const struct memb_ring_id *ring_id)
435 {
436 struct req_exec_service_build_message service_build;
437 int i;
438 int res;
439 struct sync_callbacks sync_callbacks;
440
441 memset(&service_build, 0, sizeof(service_build));
442
443 my_state = SYNC_SERVICELIST_BUILD;
444 for (i = 0; i < member_list_entries; i++) {
445 my_processor_list[i].nodeid = member_list[i];
446 my_processor_list[i].received = 0;
447 }
448 my_processor_list_entries = member_list_entries;
449
450 memcpy (my_member_list, member_list,
451 member_list_entries * sizeof (unsigned int));
452 my_member_list_entries = member_list_entries;
453
454 my_processing_idx = 0;
455
456 memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX);
457 my_service_list_entries = 0;
458
459 for (i = 0; i < SERVICES_COUNT_MAX; i++) {
460 res = my_sync_callbacks_retrieve (i, &sync_callbacks);
461 if (res == -1) {
462 continue;
463 }
464 if (sync_callbacks.sync_init == NULL) {
465 continue;
466 }
467 my_service_list[my_service_list_entries].state = PROCESS;
468 my_service_list[my_service_list_entries].service_id = i;
469
470 assert(strlen(sync_callbacks.name) < sizeof(my_service_list[my_service_list_entries].name));
471
472 strcpy (my_service_list[my_service_list_entries].name,
473 sync_callbacks.name);
474 my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init;
475 my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process;
476 my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort;
477 my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate;
478 my_service_list_entries += 1;
479 }
480
481 for (i = 0; i < my_service_list_entries; i++) {
482 service_build.service_list[i] =
483 my_service_list[i].service_id;
484 }
485 service_build.service_list_entries = my_service_list_entries;
486
487 service_build_message_transmit (&service_build);
488
489 log_printf (LOGSYS_LEVEL_DEBUG, "call init for locally known services");
490 sync_process_call_init ();
491 }
492
493 static int schedwrk_processor (const void *context)
494 {
495 int res = 0;
496
497 if (my_service_list[my_processing_idx].state == PROCESS) {
498 if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
499 res = my_service_list[my_processing_idx].sync_process ();
500 } else {
501 res = 0;
502 }
503 if (res == 0) {
504 sync_barrier_enter();
505 } else {
506 return (-1);
507 }
508 }
509 return (0);
510 }
511
512 void sync_start (
513 const unsigned int *member_list,
514 size_t member_list_entries,
515 const struct memb_ring_id *ring_id)
516 {
517 ENTER();
518 memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id));
519
520 sync_servicelist_build_enter (member_list, member_list_entries,
521 ring_id);
522 }
523
524 void sync_save_transitional (
525 const unsigned int *member_list,
526 size_t member_list_entries,
527 const struct memb_ring_id *ring_id)
528 {
529 ENTER();
530 memcpy (my_trans_list, member_list, member_list_entries *
531 sizeof (unsigned int));
532 my_trans_list_entries = member_list_entries;
533 }
534
535 void sync_abort (void)
536 {
537 ENTER();
538 if (my_state == SYNC_PROCESS) {
539 schedwrk_destroy (my_schedwrk_handle);
540 if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
541 my_service_list[my_processing_idx].sync_abort ();
542 }
543 }
544
545 /* this will cause any "old" barrier messages from causing
546 * problems.
547 */
548 memset (&my_ring_id, 0, sizeof (struct memb_ring_id));
549 }