]> git.proxmox.com Git - mirror_corosync.git/blob - lib/sam.c
Merge pull request #2 from dfcluster/master
[mirror_corosync.git] / lib / sam.c
1 /*
2 * Copyright (c) 2009-2011 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Jan Friesse (jfriesse@redhat.com)
7 *
8 * This software licensed under BSD license, the text of which follows:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
12 *
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the Red Hat, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35 /*
36 * Provides a SAM API
37 */
38
39 #include <config.h>
40
41 #include <limits.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <unistd.h>
45 #include <sys/time.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <errno.h>
49
50 #include <corosync/corotypes.h>
51 #include <qb/qbipcc.h>
52 #include <corosync/corodefs.h>
53 #include <corosync/cmap.h>
54 #include <corosync/hdb.h>
55 #include <corosync/quorum.h>
56
57 #include <corosync/sam.h>
58
59 #include "util.h"
60
61 #include <stdio.h>
62 #include <sys/wait.h>
63 #include <signal.h>
64
/*
 * Strings written to the "state" key below this process's cmap path.
 */
#define SAM_CMAP_S_FAILED		"failed"
#define SAM_CMAP_S_REGISTERED		"stopped"
#define SAM_CMAP_S_STARTED		"running"
#define SAM_CMAP_S_Q_WAIT		"waiting for quorum"

/*
 * Helpers clearing flag bits from a recovery policy value:
 *   SAM_RP_MASK_Q - clears SAM_RECOVERY_POLICY_QUORUM
 *   SAM_RP_MASK_C - clears SAM_RECOVERY_POLICY_CMAP
 *   SAM_RP_MASK   - clears both flags
 * The argument is parenthesized so that a compound expression such as
 * (a | b) is masked as a whole instead of only its last operand.
 */
#define SAM_RP_MASK_Q(pol)	((pol) & (~SAM_RECOVERY_POLICY_QUORUM))
#define SAM_RP_MASK_C(pol)	((pol) & (~SAM_RECOVERY_POLICY_CMAP))
#define SAM_RP_MASK(pol)	((pol) & (~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CMAP)))
73
/*
 * Life-cycle state of the library, kept in sam_internal_data.internal_status.
 */
enum sam_internal_status_t {
	SAM_INTERNAL_STATUS_NOT_INITIALIZED = 0,
	SAM_INTERNAL_STATUS_INITIALIZED = 1,
	SAM_INTERNAL_STATUS_REGISTERED = 2,
	SAM_INTERNAL_STATUS_STARTED = 3,
	SAM_INTERNAL_STATUS_FINALIZED = 4
};

/*
 * One byte commands written by the child into the pipe read by the parent.
 */
enum sam_command_t {
	SAM_COMMAND_START = 0,
	SAM_COMMAND_STOP = 1,
	SAM_COMMAND_HB = 2,
	SAM_COMMAND_DATA_STORE = 3,
	SAM_COMMAND_WARN_SIGNAL_SET = 4,
	SAM_COMMAND_MARK_FAILED = 5,
};

/*
 * One byte replies written by the parent. SAM_REPLY_ERROR is followed by
 * a cs_error_t value.
 */
enum sam_reply_t {
	SAM_REPLY_OK = 0,
	SAM_REPLY_ERROR = 1,
};

/*
 * Result of the parent handler loop.
 */
enum sam_parent_action_t {
	SAM_PARENT_ACTION_ERROR = 0,
	SAM_PARENT_ACTION_RECOVERY = 1,
	SAM_PARENT_ACTION_QUIT = 2,
	SAM_PARENT_ACTION_CONTINUE = 3
};

/*
 * Selects which cmap key sam_cmap_update_key writes.
 */
enum sam_cmap_key_t {
	SAM_CMAP_KEY_RECOVERY = 0,
	SAM_CMAP_KEY_HC_PERIOD = 1,
	SAM_CMAP_KEY_LAST_HC = 2,
	SAM_CMAP_KEY_STATE = 3,
};
109
110 static struct {
111 int time_interval;
112 sam_recovery_policy_t recovery_policy;
113 enum sam_internal_status_t internal_status;
114 unsigned int instance_id;
115 int child_fd_out;
116 int child_fd_in;
117 int term_send;
118 int warn_signal;
119 int am_i_child;
120
121 sam_hc_callback_t hc_callback;
122 pthread_t cb_thread;
123 int cb_rpipe_fd, cb_wpipe_fd;
124 int cb_registered;
125
126 void *user_data;
127 size_t user_data_size;
128 size_t user_data_allocated;
129
130 pthread_mutex_t lock;
131
132 quorum_handle_t quorum_handle;
133 uint32_t quorate;
134 int quorum_fd;
135
136 cmap_handle_t cmap_handle;
137 char cmap_pid_path[CMAP_KEYNAME_MAXLEN];
138 } sam_internal_data;
139
140 extern const char *__progname;
141
142 static cs_error_t sam_cmap_update_key (enum sam_cmap_key_t key, const char *value)
143 {
144 cs_error_t err;
145 const char *svalue;
146 uint64_t hc_period, last_hc;
147 const char *ssvalue[] = { [SAM_RECOVERY_POLICY_QUIT] = "quit", [SAM_RECOVERY_POLICY_RESTART] = "restart" };
148 char key_name[CMAP_KEYNAME_MAXLEN];
149
150 switch (key) {
151 case SAM_CMAP_KEY_RECOVERY:
152 svalue = ssvalue[SAM_RP_MASK (sam_internal_data.recovery_policy)];
153
154 snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path,
155 "recovery");
156 if ((err = cmap_set_string(sam_internal_data.cmap_handle, key_name, svalue)) != CS_OK) {
157 goto exit_error;
158 }
159 break;
160 case SAM_CMAP_KEY_HC_PERIOD:
161 hc_period = sam_internal_data.time_interval;
162
163 snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path,
164 "poll_period");
165 if ((err = cmap_set_uint64(sam_internal_data.cmap_handle, key_name, hc_period)) != CS_OK) {
166 goto exit_error;
167 }
168 break;
169 case SAM_CMAP_KEY_LAST_HC:
170 last_hc = cs_timestamp_get();
171
172 snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path,
173 "last_updated");
174 if ((err = cmap_set_uint64(sam_internal_data.cmap_handle, key_name, last_hc)) != CS_OK) {
175 goto exit_error;
176 }
177 break;
178 case SAM_CMAP_KEY_STATE:
179 svalue = value;
180 snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path,
181 "state");
182 if ((err = cmap_set_string(sam_internal_data.cmap_handle, key_name, svalue)) != CS_OK) {
183 goto exit_error;
184 }
185 break;
186 }
187
188 return (CS_OK);
189
190 exit_error:
191 return (err);
192 }
193
194 static cs_error_t sam_cmap_destroy_pid_path (void)
195 {
196 cmap_iter_handle_t iter;
197 cs_error_t err;
198 char key_name[CMAP_KEYNAME_MAXLEN];
199
200 err = cmap_iter_init(sam_internal_data.cmap_handle, sam_internal_data.cmap_pid_path, &iter);
201 if (err != CS_OK) {
202 goto error_exit;
203 }
204
205 while ((err = cmap_iter_next(sam_internal_data.cmap_handle, iter, key_name, NULL, NULL)) == CS_OK) {
206 cmap_delete(sam_internal_data.cmap_handle, key_name);
207 }
208
209 err = cmap_iter_finalize(sam_internal_data.cmap_handle, iter);
210
211 error_exit:
212 return (err);
213 }
214
215 static cs_error_t sam_cmap_register (void)
216 {
217 cs_error_t err;
218 cmap_handle_t cmap_handle;
219
220 if ((err = cmap_initialize (&cmap_handle)) != CS_OK) {
221 return (err);
222 }
223
224 snprintf(sam_internal_data.cmap_pid_path, CMAP_KEYNAME_MAXLEN, "resources.process.%d.", getpid());
225
226 sam_internal_data.cmap_handle = cmap_handle;
227
228 if ((err = sam_cmap_update_key (SAM_CMAP_KEY_RECOVERY, NULL)) != CS_OK) {
229 goto destroy_finalize_error;
230 }
231
232 if ((err = sam_cmap_update_key (SAM_CMAP_KEY_HC_PERIOD, NULL)) != CS_OK) {
233 goto destroy_finalize_error;
234 }
235
236 return (CS_OK);
237
238 destroy_finalize_error:
239 sam_cmap_destroy_pid_path ();
240 cmap_finalize (cmap_handle);
241 return (err);
242 }
243
244 static void quorum_notification_fn (
245 quorum_handle_t handle,
246 uint32_t quorate,
247 uint64_t ring_id,
248 uint32_t view_list_entries,
249 uint32_t *view_list)
250 {
251 sam_internal_data.quorate = quorate;
252 }
253
254 cs_error_t sam_initialize (
255 int time_interval,
256 sam_recovery_policy_t recovery_policy)
257 {
258 quorum_callbacks_t quorum_callbacks;
259 uint32_t quorum_type;
260 cs_error_t err;
261
262 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_NOT_INITIALIZED) {
263 return (CS_ERR_BAD_HANDLE);
264 }
265
266 if (SAM_RP_MASK (recovery_policy) != SAM_RECOVERY_POLICY_QUIT &&
267 SAM_RP_MASK (recovery_policy) != SAM_RECOVERY_POLICY_RESTART) {
268 return (CS_ERR_INVALID_PARAM);
269 }
270
271 if (recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
272 /*
273 * Initialize quorum
274 */
275 quorum_callbacks.quorum_notify_fn = quorum_notification_fn;
276 if ((err = quorum_initialize (&sam_internal_data.quorum_handle, &quorum_callbacks, &quorum_type)) != CS_OK) {
277 goto exit_error;
278 }
279
280 if ((err = quorum_trackstart (sam_internal_data.quorum_handle, CS_TRACK_CHANGES)) != CS_OK) {
281 goto exit_error_quorum;
282 }
283
284 if ((err = quorum_fd_get (sam_internal_data.quorum_handle, &sam_internal_data.quorum_fd)) != CS_OK) {
285 goto exit_error_quorum;
286 }
287
288 /*
289 * Dispatch initial quorate state
290 */
291 if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) {
292 goto exit_error_quorum;
293 }
294 }
295 sam_internal_data.recovery_policy = recovery_policy;
296
297 sam_internal_data.time_interval = time_interval;
298
299 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_INITIALIZED;
300
301 sam_internal_data.warn_signal = SIGTERM;
302
303 sam_internal_data.am_i_child = 0;
304
305 sam_internal_data.user_data = NULL;
306 sam_internal_data.user_data_size = 0;
307 sam_internal_data.user_data_allocated = 0;
308
309 pthread_mutex_init (&sam_internal_data.lock, NULL);
310
311 return (CS_OK);
312
313 exit_error_quorum:
314 quorum_finalize (sam_internal_data.quorum_handle);
315 exit_error:
316 return (err);
317 }
318
/*
 * write(2) wrapper. Keeps calling write until all nbyte bytes of buf were
 * written to descriptor d. EAGAIN and EINTR are retried. Any other error
 * ends the call with (size_t)-1, otherwise nbyte is returned.
 */
static size_t sam_safe_write (
	int d,
	const void *buf,
	size_t nbyte)
{
	const char *src = buf;
	ssize_t written = 0;
	ssize_t res;
	size_t chunk;

	for (;;) {
		/*
		 * A single write can't report more than SSIZE_MAX bytes
		 */
		chunk = nbyte - (size_t)written;
		if (chunk > SSIZE_MAX) {
			chunk = SSIZE_MAX;
		}

		res = write (d, src + written, chunk);

		if (res != -1) {
			written += res;
		} else if (errno != EAGAIN && errno != EINTR) {
			return ((size_t)-1);
		}

		if ((size_t)written == nbyte) {
			break;
		}
	}

	return ((size_t)written);
}
346
/*
 * read(2) wrapper. Keeps calling read on descriptor d until nbyte bytes
 * were stored into buf or read reported end of file. EAGAIN and EINTR
 * are retried. Any other error ends the call with (size_t)-1, otherwise
 * the number of bytes read is returned (less than nbyte only on EOF).
 */
static size_t sam_safe_read (
	int d,
	void *buf,
	size_t nbyte)
{
	char *dst = buf;
	ssize_t got = 0;
	ssize_t res;
	size_t chunk;

	for (;;) {
		/*
		 * A single read can't report more than SSIZE_MAX bytes
		 */
		chunk = nbyte - (size_t)got;
		if (chunk > SSIZE_MAX) {
			chunk = SSIZE_MAX;
		}

		res = read (d, dst + got, chunk);

		if (res != -1) {
			got += res;
		} else if (errno != EAGAIN && errno != EINTR) {
			return ((size_t)-1);
		}

		/*
		 * Done when the buffer is full or EOF was reached
		 */
		if ((size_t)got == nbyte || res == 0) {
			break;
		}
	}

	return ((size_t)got);
}
375
376 static cs_error_t sam_read_reply (
377 int child_fd_in)
378 {
379 char reply;
380 cs_error_t err;
381
382 if (sam_safe_read (sam_internal_data.child_fd_in, &reply, sizeof (reply)) != sizeof (reply)) {
383 return (CS_ERR_LIBRARY);
384 }
385
386 switch (reply) {
387 case SAM_REPLY_ERROR:
388 /*
389 * Read error and return that
390 */
391 if (sam_safe_read (sam_internal_data.child_fd_in, &err, sizeof (err)) != sizeof (err)) {
392 return (CS_ERR_LIBRARY);
393 }
394
395 return (err);
396 break;
397 case SAM_REPLY_OK:
398 /*
399 * Everything correct
400 */
401 break;
402 default:
403 return (CS_ERR_LIBRARY);
404 break;
405 }
406
407 return (CS_OK);
408 }
409
410 cs_error_t sam_data_getsize (size_t *size)
411 {
412 if (size == NULL) {
413 return (CS_ERR_INVALID_PARAM);
414 }
415
416 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
417 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
418 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
419
420 return (CS_ERR_BAD_HANDLE);
421 }
422
423 pthread_mutex_lock (&sam_internal_data.lock);
424
425 *size = sam_internal_data.user_data_size;
426
427 pthread_mutex_unlock (&sam_internal_data.lock);
428
429 return (CS_OK);
430 }
431
432 cs_error_t sam_data_restore (
433 void *data,
434 size_t size)
435 {
436 cs_error_t err;
437
438 err = CS_OK;
439
440 if (data == NULL) {
441 return (CS_ERR_INVALID_PARAM);
442 }
443
444 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
445 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
446 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
447
448 return (CS_ERR_BAD_HANDLE);
449 }
450
451 pthread_mutex_lock (&sam_internal_data.lock);
452
453 if (sam_internal_data.user_data_size == 0) {
454 err = CS_OK;
455
456 goto error_unlock;
457 }
458
459 if (size < sam_internal_data.user_data_size) {
460 err = CS_ERR_INVALID_PARAM;
461
462 goto error_unlock;
463 }
464
465 memcpy (data, sam_internal_data.user_data, sam_internal_data.user_data_size);
466
467 pthread_mutex_unlock (&sam_internal_data.lock);
468
469 return (CS_OK);
470
471 error_unlock:
472 pthread_mutex_unlock (&sam_internal_data.lock);
473
474 return (err);
475 }
476
477 cs_error_t sam_data_store (
478 const void *data,
479 size_t size)
480 {
481 cs_error_t err;
482 char command;
483 char *new_data;
484
485 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
486 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
487 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
488
489 return (CS_ERR_BAD_HANDLE);
490 }
491
492
493 if (data == NULL) {
494 size = 0;
495 }
496
497 pthread_mutex_lock (&sam_internal_data.lock);
498
499 if (sam_internal_data.am_i_child) {
500 /*
501 * We are child so we must send data to parent
502 */
503 command = SAM_COMMAND_DATA_STORE;
504 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
505 err = CS_ERR_LIBRARY;
506
507 goto error_unlock;
508 }
509
510 if (sam_safe_write (sam_internal_data.child_fd_out, &size, sizeof (size)) != sizeof (size)) {
511 err = CS_ERR_LIBRARY;
512
513 goto error_unlock;
514 }
515
516 if (data != NULL && sam_safe_write (sam_internal_data.child_fd_out, data, size) != size) {
517 err = CS_ERR_LIBRARY;
518
519 goto error_unlock;
520 }
521
522 /*
523 * And wait for reply
524 */
525 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
526 goto error_unlock;
527 }
528 }
529
530 /*
531 * We are parent or we received OK reply from parent -> do required action
532 */
533 if (data == NULL) {
534 free (sam_internal_data.user_data);
535 sam_internal_data.user_data = NULL;
536 sam_internal_data.user_data_allocated = 0;
537 sam_internal_data.user_data_size = 0;
538 } else {
539 if (sam_internal_data.user_data_allocated < size) {
540 if ((new_data = realloc (sam_internal_data.user_data, size)) == NULL) {
541 err = CS_ERR_NO_MEMORY;
542
543 goto error_unlock;
544 }
545
546 sam_internal_data.user_data_allocated = size;
547 } else {
548 new_data = sam_internal_data.user_data;
549 }
550 sam_internal_data.user_data = new_data;
551 sam_internal_data.user_data_size = size;
552
553 memcpy (sam_internal_data.user_data, data, size);
554 }
555
556 pthread_mutex_unlock (&sam_internal_data.lock);
557
558 return (CS_OK);
559
560 error_unlock:
561 pthread_mutex_unlock (&sam_internal_data.lock);
562
563 return (err);
564 }
565
566 cs_error_t sam_start (void)
567 {
568 char command;
569 cs_error_t err;
570 sam_recovery_policy_t recpol;
571
572 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
573 return (CS_ERR_BAD_HANDLE);
574 }
575
576 recpol = sam_internal_data.recovery_policy;
577
578 if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CMAP) {
579 pthread_mutex_lock (&sam_internal_data.lock);
580 }
581
582 command = SAM_COMMAND_START;
583
584 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
585 if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CMAP) {
586 pthread_mutex_unlock (&sam_internal_data.lock);
587 }
588
589 return (CS_ERR_LIBRARY);
590 }
591
592 if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CMAP) {
593 /*
594 * Wait for parent reply
595 */
596 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
597 pthread_mutex_unlock (&sam_internal_data.lock);
598
599 return (err);
600 }
601
602 pthread_mutex_unlock (&sam_internal_data.lock);
603 }
604
605 if (sam_internal_data.hc_callback)
606 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command))
607 return (CS_ERR_LIBRARY);
608
609 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_STARTED;
610
611 return (CS_OK);
612 }
613
614 cs_error_t sam_stop (void)
615 {
616 char command;
617 cs_error_t err;
618
619 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
620 return (CS_ERR_BAD_HANDLE);
621 }
622
623 command = SAM_COMMAND_STOP;
624
625 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) {
626 pthread_mutex_lock (&sam_internal_data.lock);
627 }
628
629 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
630 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) {
631 pthread_mutex_unlock (&sam_internal_data.lock);
632 }
633
634 return (CS_ERR_LIBRARY);
635 }
636
637 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) {
638 /*
639 * Wait for parent reply
640 */
641 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
642 pthread_mutex_unlock (&sam_internal_data.lock);
643
644 return (err);
645 }
646
647 pthread_mutex_unlock (&sam_internal_data.lock);
648 }
649
650 if (sam_internal_data.hc_callback)
651 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command))
652 return (CS_ERR_LIBRARY);
653
654 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_REGISTERED;
655
656 return (CS_OK);
657 }
658
659 cs_error_t sam_hc_send (void)
660 {
661 char command;
662
663 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
664 return (CS_ERR_BAD_HANDLE);
665 }
666
667 command = SAM_COMMAND_HB;
668
669 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command))
670 return (CS_ERR_LIBRARY);
671
672 return (CS_OK);
673 }
674
675 cs_error_t sam_finalize (void)
676 {
677 cs_error_t error;
678
679 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
680 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
681 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
682 return (CS_ERR_BAD_HANDLE);
683 }
684
685 if (sam_internal_data.internal_status == SAM_INTERNAL_STATUS_STARTED) {
686 error = sam_stop ();
687 if (error != CS_OK)
688 goto exit_error;
689 }
690
691 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_FINALIZED;
692
693 free (sam_internal_data.user_data);
694
695 exit_error:
696 return (CS_OK);
697 }
698
699 cs_error_t sam_mark_failed (void)
700 {
701 char command;
702
703 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED &&
704 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
705 return (CS_ERR_BAD_HANDLE);
706 }
707
708 if (!(sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP)) {
709 return (CS_ERR_INVALID_PARAM);
710 }
711
712 command = SAM_COMMAND_MARK_FAILED;
713
714 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command))
715 return (CS_ERR_LIBRARY);
716
717 return (CS_OK);
718 }
719
720 cs_error_t sam_warn_signal_set (int warn_signal)
721 {
722 char command;
723 cs_error_t err;
724
725 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
726 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
727 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
728 return (CS_ERR_BAD_HANDLE);
729 }
730
731 pthread_mutex_lock (&sam_internal_data.lock);
732
733 if (sam_internal_data.am_i_child) {
734 /*
735 * We are child so we must send data to parent
736 */
737 command = SAM_COMMAND_WARN_SIGNAL_SET;
738 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
739 err = CS_ERR_LIBRARY;
740
741 goto error_unlock;
742 }
743
744 if (sam_safe_write (sam_internal_data.child_fd_out, &warn_signal, sizeof (warn_signal)) !=
745 sizeof (warn_signal)) {
746 err = CS_ERR_LIBRARY;
747
748 goto error_unlock;
749 }
750
751 /*
752 * And wait for reply
753 */
754 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
755 goto error_unlock;
756 }
757 }
758
759 /*
760 * We are parent or we received OK reply from parent -> do required action
761 */
762 sam_internal_data.warn_signal = warn_signal;
763
764 pthread_mutex_unlock (&sam_internal_data.lock);
765
766 return (CS_OK);
767
768 error_unlock:
769 pthread_mutex_unlock (&sam_internal_data.lock);
770
771 return (err);
772 }
773
774 static cs_error_t sam_parent_reply_send (
775 cs_error_t err,
776 int parent_fd_in,
777 int parent_fd_out)
778 {
779 char reply;
780
781 if (err == CS_OK) {
782 reply = SAM_REPLY_OK;
783
784 if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) {
785 err = CS_ERR_LIBRARY;
786 goto error_reply;
787 }
788
789 return (CS_OK);
790 }
791
792 error_reply:
793 reply = SAM_REPLY_ERROR;
794 if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) {
795 return (CS_ERR_LIBRARY);
796 }
797 if (sam_safe_write (parent_fd_out, &err, sizeof (err)) != sizeof (err)) {
798 return (CS_ERR_LIBRARY);
799 }
800
801 return (err);
802 }
803
804
805 static cs_error_t sam_parent_warn_signal_set (
806 int parent_fd_in,
807 int parent_fd_out)
808 {
809 int warn_signal;
810 cs_error_t err;
811
812 err = CS_OK;
813
814 if (sam_safe_read (parent_fd_in, &warn_signal, sizeof (warn_signal)) != sizeof (warn_signal)) {
815 err = CS_ERR_LIBRARY;
816 goto error_reply;
817 }
818
819 err = sam_warn_signal_set (warn_signal);
820 if (err != CS_OK) {
821 goto error_reply;
822 }
823
824
825 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
826
827 error_reply:
828 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
829 }
830
831 static cs_error_t sam_parent_wait_for_quorum (
832 int parent_fd_in,
833 int parent_fd_out)
834 {
835 cs_error_t err;
836 struct pollfd pfds[2];
837 int poll_err;
838
839 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) {
840 if ((err = sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_Q_WAIT)) != CS_OK) {
841 goto error_reply;
842 }
843 }
844
845 /*
846 * Update current quorum
847 */
848 if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL)) != CS_OK) {
849 goto error_reply;
850 }
851
852 /*
853 * Wait for quorum
854 */
855 while (!sam_internal_data.quorate) {
856 pfds[0].fd = parent_fd_in;
857 pfds[0].events = 0;
858 pfds[0].revents = 0;
859
860 pfds[1].fd = sam_internal_data.quorum_fd;
861 pfds[1].events = POLLIN;
862 pfds[1].revents = 0;
863
864 poll_err = poll (pfds, 2, -1);
865
866 if (poll_err == -1) {
867 /*
868 * Error in poll
869 * If it is EINTR, continue, otherwise QUIT
870 */
871 if (errno != EINTR) {
872 err = CS_ERR_LIBRARY;
873 goto error_reply;
874 }
875 }
876
877 if (pfds[0].revents != 0) {
878 if (pfds[0].revents == POLLERR || pfds[0].revents == POLLHUP ||pfds[0].revents == POLLNVAL) {
879 /*
880 * Child has exited
881 */
882 return (CS_OK);
883 }
884 }
885
886 if (pfds[1].revents != 0) {
887 if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) {
888 goto error_reply;
889 }
890 }
891 }
892
893 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) {
894 if ((err = sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_STARTED)) != CS_OK) {
895 goto error_reply;
896 }
897 }
898
899 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
900
901 error_reply:
902 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) {
903 sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_REGISTERED);
904 }
905
906 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
907 }
908
909 static cs_error_t sam_parent_cmap_state_set (
910 int parent_fd_in,
911 int parent_fd_out,
912 int state)
913 {
914 cs_error_t err;
915 const char *state_s;
916
917 if (state == 1) {
918 state_s = SAM_CMAP_S_STARTED;
919 } else {
920 state_s = SAM_CMAP_S_REGISTERED;
921 }
922
923 if ((err = sam_cmap_update_key (SAM_CMAP_KEY_STATE, state_s)) != CS_OK) {
924 goto error_reply;
925 }
926
927 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
928
929 error_reply:
930 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
931 }
932
933 static cs_error_t sam_parent_kill_child (
934 int *action,
935 pid_t child_pid)
936 {
937 /*
938 * Kill child process
939 */
940 if (!sam_internal_data.term_send) {
941 /*
942 * We didn't send warn_signal yet.
943 */
944 kill (child_pid, sam_internal_data.warn_signal);
945
946 sam_internal_data.term_send = 1;
947 } else {
948 /*
949 * We sent child warning. Now, we will not be so nice
950 */
951 kill (child_pid, SIGKILL);
952 *action = SAM_PARENT_ACTION_RECOVERY;
953 }
954
955 return (CS_OK);
956 }
957
958 static cs_error_t sam_parent_mark_child_failed (
959 int *action,
960 pid_t child_pid)
961 {
962 sam_recovery_policy_t recpol;
963
964 recpol = sam_internal_data.recovery_policy;
965
966 sam_internal_data.term_send = 1;
967 sam_internal_data.recovery_policy = SAM_RECOVERY_POLICY_QUIT |
968 (SAM_RP_MASK_C (recpol) ? SAM_RECOVERY_POLICY_CMAP : 0) |
969 (SAM_RP_MASK_Q (recpol) ? SAM_RECOVERY_POLICY_QUORUM : 0);
970
971 return (sam_parent_kill_child (action, child_pid));
972 }
973
974 static cs_error_t sam_parent_data_store (
975 int parent_fd_in,
976 int parent_fd_out)
977 {
978 char *user_data;
979 ssize_t size;
980 cs_error_t err;
981
982 err = CS_OK;
983 user_data = NULL;
984
985 if (sam_safe_read (parent_fd_in, &size, sizeof (size)) != sizeof (size)) {
986 err = CS_ERR_LIBRARY;
987 goto error_reply;
988 }
989
990 if (size > 0) {
991 user_data = malloc (size);
992 if (user_data == NULL) {
993 err = CS_ERR_NO_MEMORY;
994 goto error_reply;
995 }
996
997 if (sam_safe_read (parent_fd_in, user_data, size) != size) {
998 err = CS_ERR_LIBRARY;
999 goto free_error_reply;
1000 }
1001 }
1002
1003 err = sam_data_store (user_data, size);
1004 if (err != CS_OK) {
1005 goto free_error_reply;
1006 }
1007
1008 free (user_data);
1009
1010 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
1011
1012 free_error_reply:
1013 free (user_data);
1014 error_reply:
1015 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
1016 }
1017
1018 static enum sam_parent_action_t sam_parent_handler (
1019 int parent_fd_in,
1020 int parent_fd_out,
1021 pid_t child_pid)
1022 {
1023 int poll_error;
1024 int action;
1025 int status;
1026 ssize_t bytes_read;
1027 char command;
1028 int time_interval;
1029 struct pollfd pfds[2];
1030 nfds_t nfds;
1031 cs_error_t err;
1032 sam_recovery_policy_t recpol;
1033
1034 status = 0;
1035
1036 action = SAM_PARENT_ACTION_CONTINUE;
1037 recpol = sam_internal_data.recovery_policy;
1038
1039 while (action == SAM_PARENT_ACTION_CONTINUE) {
1040 pfds[0].fd = parent_fd_in;
1041 pfds[0].events = POLLIN;
1042 pfds[0].revents = 0;
1043 nfds = 1;
1044
1045 if (status == 1 && sam_internal_data.time_interval != 0) {
1046 time_interval = sam_internal_data.time_interval;
1047 } else {
1048 time_interval = -1;
1049 }
1050
1051 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1052 pfds[nfds].fd = sam_internal_data.quorum_fd;
1053 pfds[nfds].events = POLLIN;
1054 pfds[nfds].revents = 0;
1055 nfds++;
1056 }
1057
1058 poll_error = poll (pfds, nfds, time_interval);
1059
1060 if (poll_error == -1) {
1061 /*
1062 * Error in poll
1063 * If it is EINTR, continue, otherwise QUIT
1064 */
1065 if (errno != EINTR) {
1066 action = SAM_PARENT_ACTION_ERROR;
1067 }
1068 }
1069
1070 if (poll_error == 0) {
1071 /*
1072 * Time limit expires
1073 */
1074 if (status == 0) {
1075 action = SAM_PARENT_ACTION_QUIT;
1076 } else {
1077 sam_parent_kill_child (&action, child_pid);
1078 }
1079 }
1080
1081 if (poll_error > 0) {
1082 if (pfds[0].revents != 0) {
1083 /*
1084 * We have EOF or command in pipe
1085 */
1086 bytes_read = sam_safe_read (parent_fd_in, &command, 1);
1087
1088 if (bytes_read == 0) {
1089 /*
1090 * Handle EOF -> Take recovery action or quit if sam_start wasn't called
1091 */
1092 if (status == 0)
1093 action = SAM_PARENT_ACTION_QUIT;
1094 else
1095 action = SAM_PARENT_ACTION_RECOVERY;
1096
1097 continue;
1098 }
1099
1100 if (bytes_read == -1) {
1101 action = SAM_PARENT_ACTION_ERROR;
1102 goto action_exit;
1103 }
1104
1105 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1106 sam_cmap_update_key (SAM_CMAP_KEY_LAST_HC, NULL);
1107 }
1108
1109 /*
1110 * We have read command
1111 */
1112 switch (command) {
1113 case SAM_COMMAND_START:
1114 if (status == 0) {
1115 /*
1116 * Not started yet
1117 */
1118 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1119 if (sam_parent_wait_for_quorum (parent_fd_in,
1120 parent_fd_out) != CS_OK) {
1121 continue;
1122 }
1123 }
1124
1125 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1126 if (sam_parent_cmap_state_set (parent_fd_in,
1127 parent_fd_out, 1) != CS_OK) {
1128 continue;
1129 }
1130 }
1131
1132 status = 1;
1133 }
1134 break;
1135 case SAM_COMMAND_STOP:
1136 if (status == 1) {
1137 /*
1138 * Started
1139 */
1140 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1141 if (sam_parent_cmap_state_set (parent_fd_in,
1142 parent_fd_out, 0) != CS_OK) {
1143 continue;
1144 }
1145 }
1146
1147 status = 0;
1148 }
1149 break;
1150 case SAM_COMMAND_DATA_STORE:
1151 sam_parent_data_store (parent_fd_in, parent_fd_out);
1152 break;
1153 case SAM_COMMAND_WARN_SIGNAL_SET:
1154 sam_parent_warn_signal_set (parent_fd_in, parent_fd_out);
1155 break;
1156 case SAM_COMMAND_MARK_FAILED:
1157 status = 1;
1158 sam_parent_mark_child_failed (&action, child_pid);
1159 break;
1160 }
1161 } /* if (pfds[0].revents != 0) */
1162
1163 if ((sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) &&
1164 pfds[1].revents != 0) {
1165 /*
1166 * Handle quorum change
1167 */
1168 err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL);
1169
1170 if (status == 1 &&
1171 (!sam_internal_data.quorate || (err != CS_ERR_TRY_AGAIN && err != CS_OK))) {
1172 sam_parent_kill_child (&action, child_pid);
1173 }
1174 }
1175 } /* select_error > 0 */
1176 } /* action == SAM_PARENT_ACTION_CONTINUE */
1177
1178 action_exit:
1179 return action;
1180 }
1181
1182 cs_error_t sam_register (
1183 unsigned int *instance_id)
1184 {
1185 cs_error_t error;
1186 pid_t pid;
1187 int pipe_error;
1188 int pipe_fd_out[2], pipe_fd_in[2];
1189 enum sam_parent_action_t action, old_action;
1190 int child_status;
1191 sam_recovery_policy_t recpol;
1192
1193 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED) {
1194 return (CS_ERR_BAD_HANDLE);
1195 }
1196
1197 recpol = sam_internal_data.recovery_policy;
1198
1199 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1200 /*
1201 * Register to cmap
1202 */
1203 if ((error = sam_cmap_register ()) != CS_OK) {
1204 goto error_exit;
1205 }
1206 }
1207
1208 error = CS_OK;
1209
1210 while (1) {
1211 if ((pipe_error = pipe (pipe_fd_out)) != 0) {
1212 error = CS_ERR_LIBRARY;
1213 goto error_exit;
1214 }
1215
1216 if ((pipe_error = pipe (pipe_fd_in)) != 0) {
1217 close (pipe_fd_out[0]);
1218 close (pipe_fd_out[1]);
1219
1220 error = CS_ERR_LIBRARY;
1221 goto error_exit;
1222 }
1223
1224 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1225 if ((error = sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_REGISTERED)) != CS_OK) {
1226 goto error_exit;
1227 }
1228 }
1229
1230 sam_internal_data.instance_id++;
1231
1232 sam_internal_data.term_send = 0;
1233
1234 pid = fork ();
1235
1236 if (pid == -1) {
1237 /*
1238 * Fork error
1239 */
1240 sam_internal_data.instance_id--;
1241
1242 error = CS_ERR_LIBRARY;
1243 goto error_exit;
1244 }
1245
1246 if (pid == 0) {
1247 /*
1248 * Child process
1249 */
1250 close (pipe_fd_out[0]);
1251 close (pipe_fd_in[1]);
1252
1253 sam_internal_data.child_fd_out = pipe_fd_out[1];
1254 sam_internal_data.child_fd_in = pipe_fd_in[0];
1255
1256 if (instance_id)
1257 *instance_id = sam_internal_data.instance_id;
1258
1259 sam_internal_data.am_i_child = 1;
1260 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_REGISTERED;
1261
1262 pthread_mutex_init (&sam_internal_data.lock, NULL);
1263
1264 goto error_exit;
1265 } else {
1266 /*
1267 * Parent process
1268 */
1269 close (pipe_fd_out[1]);
1270 close (pipe_fd_in[0]);
1271
1272 action = sam_parent_handler (pipe_fd_out[0], pipe_fd_in[1], pid);
1273
1274 close (pipe_fd_out[0]);
1275 close (pipe_fd_in[1]);
1276
1277 if (action == SAM_PARENT_ACTION_ERROR) {
1278 error = CS_ERR_LIBRARY;
1279 goto error_exit;
1280 }
1281
1282 /*
1283 * We really don't like zombies
1284 */
1285 while (waitpid (pid, &child_status, 0) == -1 && errno == EINTR)
1286 ;
1287
1288 old_action = action;
1289
1290 if (action == SAM_PARENT_ACTION_RECOVERY) {
1291 if (SAM_RP_MASK (sam_internal_data.recovery_policy) == SAM_RECOVERY_POLICY_QUIT)
1292 action = SAM_PARENT_ACTION_QUIT;
1293 }
1294
1295
1296 if (action == SAM_PARENT_ACTION_QUIT) {
1297 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1298 quorum_finalize (sam_internal_data.quorum_handle);
1299 }
1300
1301 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1302 if (old_action == SAM_PARENT_ACTION_RECOVERY) {
1303 /*
1304 * Mark as failed
1305 */
1306 sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_FAILED);
1307 } else {
1308 sam_cmap_destroy_pid_path ();
1309 }
1310 }
1311
1312 exit (WEXITSTATUS (child_status));
1313 }
1314
1315
1316 }
1317 }
1318
1319 error_exit:
1320 return (error);
1321 }
1322
1323 static void *hc_callback_thread (void *unused_param)
1324 {
1325 int poll_error;
1326 int status;
1327 ssize_t bytes_readed;
1328 char command;
1329 int time_interval, tmp_time_interval;
1330 int counter;
1331 struct pollfd pfds;
1332
1333 status = 0;
1334 counter = 0;
1335
1336 time_interval = sam_internal_data.time_interval >> 2;
1337
1338 while (1) {
1339 pfds.fd = sam_internal_data.cb_rpipe_fd;
1340 pfds.events = POLLIN;
1341 pfds.revents = 0;
1342
1343 if (status == 1) {
1344 tmp_time_interval = time_interval;
1345 } else {
1346 tmp_time_interval = -1;
1347 }
1348
1349 poll_error = poll (&pfds, 1, tmp_time_interval);
1350
1351 if (poll_error == 0) {
1352 if (sam_hc_send () == CS_OK) {
1353 counter++;
1354 }
1355
1356 if (counter >= 4) {
1357 if (sam_internal_data.hc_callback () != 0) {
1358 status = 3;
1359 }
1360
1361 counter = 0;
1362 }
1363 }
1364
1365 if (poll_error > 0) {
1366 bytes_readed = sam_safe_read (sam_internal_data.cb_rpipe_fd, &command, 1);
1367
1368 if (bytes_readed > 0) {
1369 if (status == 0 && command == SAM_COMMAND_START)
1370 status = 1;
1371
1372 if (status == 1 && command == SAM_COMMAND_STOP)
1373 status = 0;
1374
1375 }
1376 }
1377 }
1378
1379 /*
1380 * This makes compiler happy, it's same as return (NULL);
1381 */
1382 return (unused_param);
1383 }
1384
1385 cs_error_t sam_hc_callback_register (sam_hc_callback_t cb)
1386 {
1387 cs_error_t error = CS_OK;
1388 pthread_attr_t thread_attr;
1389 int pipe_error;
1390 int pipe_fd[2];
1391
1392 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
1393 return (CS_ERR_BAD_HANDLE);
1394 }
1395
1396 if (sam_internal_data.time_interval == 0) {
1397 return (CS_ERR_INVALID_PARAM);
1398 }
1399
1400 if (sam_internal_data.cb_registered) {
1401 sam_internal_data.hc_callback = cb;
1402
1403 return (CS_OK);
1404 }
1405
1406 /*
1407 * We know, this is first registration
1408 */
1409
1410 if (cb == NULL) {
1411 return (CS_ERR_INVALID_PARAM);
1412 }
1413
1414 pipe_error = pipe (pipe_fd);
1415
1416 if (pipe_error != 0) {
1417 /*
1418 * Pipe creation error
1419 */
1420 error = CS_ERR_LIBRARY;
1421 goto error_exit;
1422 }
1423
1424 sam_internal_data.cb_rpipe_fd = pipe_fd[0];
1425 sam_internal_data.cb_wpipe_fd = pipe_fd[1];
1426
1427 /*
1428 * Create thread attributes
1429 */
1430 error = pthread_attr_init (&thread_attr);
1431 if (error != 0) {
1432 error = CS_ERR_LIBRARY;
1433 goto error_close_fd_exit;
1434 }
1435
1436
1437 pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
1438 pthread_attr_setstacksize (&thread_attr, 32768);
1439
1440 /*
1441 * Create thread
1442 */
1443 error = pthread_create (&sam_internal_data.cb_thread, &thread_attr, hc_callback_thread, NULL);
1444
1445 if (error != 0) {
1446 error = CS_ERR_LIBRARY;
1447 goto error_attr_destroy_exit;
1448 }
1449
1450 /*
1451 * Cleanup
1452 */
1453 pthread_attr_destroy(&thread_attr);
1454
1455 sam_internal_data.cb_registered = 1;
1456 sam_internal_data.hc_callback = cb;
1457
1458 return (CS_OK);
1459
1460 error_attr_destroy_exit:
1461 pthread_attr_destroy(&thread_attr);
1462 error_close_fd_exit:
1463 sam_internal_data.cb_rpipe_fd = sam_internal_data.cb_wpipe_fd = 0;
1464 close (pipe_fd[0]);
1465 close (pipe_fd[1]);
1466 error_exit:
1467 return (error);
1468 }