2 * Copyright (c) 2009-2011 Red Hat, Inc.
6 * Author: Jan Friesse (jfriesse@redhat.com)
8 * This software licensed under BSD license, the text of which follows:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the Red Hat, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
46 #include <sys/types.h>
47 #include <sys/socket.h>
50 #include <corosync/corotypes.h>
51 #include <qb/qbipcc.h>
52 #include <corosync/corodefs.h>
53 #include <corosync/cmap.h>
54 #include <corosync/hdb.h>
55 #include <corosync/quorum.h>
57 #include <corosync/sam.h>
/*
 * State strings handed to sam_cmap_update_key() together with
 * SAM_CMAP_KEY_STATE.
 */
#define SAM_CMAP_S_FAILED		"failed"
#define SAM_CMAP_S_REGISTERED		"stopped"
#define SAM_CMAP_S_STARTED		"running"
#define SAM_CMAP_S_Q_WAIT		"waiting for quorum"

/*
 * Recovery policy helpers. Each macro returns pol with some flag bits cleared:
 *   SAM_RP_MASK_Q - clears SAM_RECOVERY_POLICY_QUORUM
 *   SAM_RP_MASK_C - clears SAM_RECOVERY_POLICY_CMAP
 *   SAM_RP_MASK   - clears both flags
 *
 * The argument is parenthesized so that an expression argument (for example
 * a | b) is masked as a whole; without the parentheses only the last operand
 * would be masked, because & binds tighter than |.
 */
#define SAM_RP_MASK_Q(pol)	((pol) & (~SAM_RECOVERY_POLICY_QUORUM))
#define SAM_RP_MASK_C(pol)	((pol) & (~SAM_RECOVERY_POLICY_CMAP))
#define SAM_RP_MASK(pol)	((pol) & (~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CMAP)))
74 enum sam_internal_status_t
{
75 SAM_INTERNAL_STATUS_NOT_INITIALIZED
= 0,
76 SAM_INTERNAL_STATUS_INITIALIZED
,
77 SAM_INTERNAL_STATUS_REGISTERED
,
78 SAM_INTERNAL_STATUS_STARTED
,
79 SAM_INTERNAL_STATUS_FINALIZED
86 SAM_COMMAND_DATA_STORE
,
87 SAM_COMMAND_WARN_SIGNAL_SET
,
88 SAM_COMMAND_MARK_FAILED
,
96 enum sam_parent_action_t
{
97 SAM_PARENT_ACTION_ERROR
,
98 SAM_PARENT_ACTION_RECOVERY
,
99 SAM_PARENT_ACTION_QUIT
,
100 SAM_PARENT_ACTION_CONTINUE
103 enum sam_cmap_key_t
{
104 SAM_CMAP_KEY_RECOVERY
,
105 SAM_CMAP_KEY_HC_PERIOD
,
106 SAM_CMAP_KEY_LAST_HC
,
112 sam_recovery_policy_t recovery_policy
;
113 enum sam_internal_status_t internal_status
;
114 unsigned int instance_id
;
121 sam_hc_callback_t hc_callback
;
123 int cb_rpipe_fd
, cb_wpipe_fd
;
127 size_t user_data_size
;
128 size_t user_data_allocated
;
130 pthread_mutex_t lock
;
132 quorum_handle_t quorum_handle
;
136 cmap_handle_t cmap_handle
;
137 char cmap_pid_path
[CMAP_KEYNAME_MAXLEN
];
140 extern const char *__progname
;
142 static cs_error_t
sam_cmap_update_key (enum sam_cmap_key_t key
, const char *value
)
146 uint64_t hc_period
, last_hc
;
147 const char *ssvalue
[] = { [SAM_RECOVERY_POLICY_QUIT
] = "quit", [SAM_RECOVERY_POLICY_RESTART
] = "restart" };
148 char key_name
[CMAP_KEYNAME_MAXLEN
];
151 case SAM_CMAP_KEY_RECOVERY
:
152 svalue
= ssvalue
[SAM_RP_MASK (sam_internal_data
.recovery_policy
)];
154 snprintf(key_name
, CMAP_KEYNAME_MAXLEN
, "%s%s", sam_internal_data
.cmap_pid_path
,
156 if ((err
= cmap_set_string(sam_internal_data
.cmap_handle
, key_name
, svalue
)) != CS_OK
) {
160 case SAM_CMAP_KEY_HC_PERIOD
:
161 hc_period
= sam_internal_data
.time_interval
;
163 snprintf(key_name
, CMAP_KEYNAME_MAXLEN
, "%s%s", sam_internal_data
.cmap_pid_path
,
165 if ((err
= cmap_set_uint64(sam_internal_data
.cmap_handle
, key_name
, hc_period
)) != CS_OK
) {
169 case SAM_CMAP_KEY_LAST_HC
:
170 last_hc
= cs_timestamp_get();
172 snprintf(key_name
, CMAP_KEYNAME_MAXLEN
, "%s%s", sam_internal_data
.cmap_pid_path
,
174 if ((err
= cmap_set_uint64(sam_internal_data
.cmap_handle
, key_name
, last_hc
)) != CS_OK
) {
178 case SAM_CMAP_KEY_STATE
:
180 snprintf(key_name
, CMAP_KEYNAME_MAXLEN
, "%s%s", sam_internal_data
.cmap_pid_path
,
182 if ((err
= cmap_set_string(sam_internal_data
.cmap_handle
, key_name
, svalue
)) != CS_OK
) {
194 static cs_error_t
sam_cmap_destroy_pid_path (void)
196 cmap_iter_handle_t iter
;
198 char key_name
[CMAP_KEYNAME_MAXLEN
];
200 err
= cmap_iter_init(sam_internal_data
.cmap_handle
, sam_internal_data
.cmap_pid_path
, &iter
);
205 while ((err
= cmap_iter_next(sam_internal_data
.cmap_handle
, iter
, key_name
, NULL
, NULL
)) == CS_OK
) {
206 cmap_delete(sam_internal_data
.cmap_handle
, key_name
);
209 err
= cmap_iter_finalize(sam_internal_data
.cmap_handle
, iter
);
215 static cs_error_t
sam_cmap_register (void)
218 cmap_handle_t cmap_handle
;
220 if ((err
= cmap_initialize (&cmap_handle
)) != CS_OK
) {
224 snprintf(sam_internal_data
.cmap_pid_path
, CMAP_KEYNAME_MAXLEN
, "resources.process.%d.", getpid());
226 sam_internal_data
.cmap_handle
= cmap_handle
;
228 if ((err
= sam_cmap_update_key (SAM_CMAP_KEY_RECOVERY
, NULL
)) != CS_OK
) {
229 goto destroy_finalize_error
;
232 if ((err
= sam_cmap_update_key (SAM_CMAP_KEY_HC_PERIOD
, NULL
)) != CS_OK
) {
233 goto destroy_finalize_error
;
238 destroy_finalize_error
:
239 sam_cmap_destroy_pid_path ();
240 cmap_finalize (cmap_handle
);
244 static void quorum_notification_fn (
245 quorum_handle_t handle
,
248 uint32_t view_list_entries
,
251 sam_internal_data
.quorate
= quorate
;
254 cs_error_t
sam_initialize (
256 sam_recovery_policy_t recovery_policy
)
258 quorum_callbacks_t quorum_callbacks
;
259 uint32_t quorum_type
;
262 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_NOT_INITIALIZED
) {
263 return (CS_ERR_BAD_HANDLE
);
266 if (SAM_RP_MASK (recovery_policy
) != SAM_RECOVERY_POLICY_QUIT
&&
267 SAM_RP_MASK (recovery_policy
) != SAM_RECOVERY_POLICY_RESTART
) {
268 return (CS_ERR_INVALID_PARAM
);
271 if (recovery_policy
& SAM_RECOVERY_POLICY_QUORUM
) {
275 quorum_callbacks
.quorum_notify_fn
= quorum_notification_fn
;
276 if ((err
= quorum_initialize (&sam_internal_data
.quorum_handle
, &quorum_callbacks
, &quorum_type
)) != CS_OK
) {
280 if ((err
= quorum_trackstart (sam_internal_data
.quorum_handle
, CS_TRACK_CHANGES
)) != CS_OK
) {
281 goto exit_error_quorum
;
284 if ((err
= quorum_fd_get (sam_internal_data
.quorum_handle
, &sam_internal_data
.quorum_fd
)) != CS_OK
) {
285 goto exit_error_quorum
;
289 * Dispatch initial quorate state
291 if ((err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ONE
)) != CS_OK
) {
292 goto exit_error_quorum
;
295 sam_internal_data
.recovery_policy
= recovery_policy
;
297 sam_internal_data
.time_interval
= time_interval
;
299 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_INITIALIZED
;
301 sam_internal_data
.warn_signal
= SIGTERM
;
303 sam_internal_data
.am_i_child
= 0;
305 sam_internal_data
.user_data
= NULL
;
306 sam_internal_data
.user_data_size
= 0;
307 sam_internal_data
.user_data_allocated
= 0;
309 pthread_mutex_init (&sam_internal_data
.lock
, NULL
);
314 quorum_finalize (sam_internal_data
.quorum_handle
);
320 * Wrapper on top of write(2) function. It handles EAGAIN and EINTR states and sends whole buffer if possible.
322 static size_t sam_safe_write (
328 ssize_t tmp_bytes_write
;
333 tmp_bytes_write
= write (d
, (const char *)buf
+ bytes_write
,
334 (nbyte
- bytes_write
> SSIZE_MAX
) ? SSIZE_MAX
: nbyte
- bytes_write
);
336 if (tmp_bytes_write
== -1) {
337 if (!(errno
== EAGAIN
|| errno
== EINTR
))
340 bytes_write
+= tmp_bytes_write
;
342 } while (bytes_write
!= nbyte
);
344 return (bytes_write
);
348 * Wrapper on top of read(2) function. It handles EAGAIN and EINTR states and reads whole buffer if possible.
350 static size_t sam_safe_read (
356 ssize_t tmp_bytes_read
;
361 tmp_bytes_read
= read (d
, (char *)buf
+ bytes_read
,
362 (nbyte
- bytes_read
> SSIZE_MAX
) ? SSIZE_MAX
: nbyte
- bytes_read
);
364 if (tmp_bytes_read
== -1) {
365 if (!(errno
== EAGAIN
|| errno
== EINTR
))
368 bytes_read
+= tmp_bytes_read
;
371 } while (bytes_read
!= nbyte
&& tmp_bytes_read
!= 0);
376 static cs_error_t
sam_read_reply (
382 if (sam_safe_read (sam_internal_data
.child_fd_in
, &reply
, sizeof (reply
)) != sizeof (reply
)) {
383 return (CS_ERR_LIBRARY
);
387 case SAM_REPLY_ERROR
:
389 * Read error and return that
391 if (sam_safe_read (sam_internal_data
.child_fd_in
, &err
, sizeof (err
)) != sizeof (err
)) {
392 return (CS_ERR_LIBRARY
);
403 return (CS_ERR_LIBRARY
);
410 cs_error_t
sam_data_getsize (size_t *size
)
413 return (CS_ERR_INVALID_PARAM
);
416 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
417 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
418 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
420 return (CS_ERR_BAD_HANDLE
);
423 pthread_mutex_lock (&sam_internal_data
.lock
);
425 *size
= sam_internal_data
.user_data_size
;
427 pthread_mutex_unlock (&sam_internal_data
.lock
);
432 cs_error_t
sam_data_restore (
441 return (CS_ERR_INVALID_PARAM
);
444 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
445 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
446 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
448 return (CS_ERR_BAD_HANDLE
);
451 pthread_mutex_lock (&sam_internal_data
.lock
);
453 if (sam_internal_data
.user_data_size
== 0) {
459 if (size
< sam_internal_data
.user_data_size
) {
460 err
= CS_ERR_INVALID_PARAM
;
465 memcpy (data
, sam_internal_data
.user_data
, sam_internal_data
.user_data_size
);
467 pthread_mutex_unlock (&sam_internal_data
.lock
);
472 pthread_mutex_unlock (&sam_internal_data
.lock
);
477 cs_error_t
sam_data_store (
485 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
486 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
487 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
489 return (CS_ERR_BAD_HANDLE
);
497 pthread_mutex_lock (&sam_internal_data
.lock
);
499 if (sam_internal_data
.am_i_child
) {
501 * We are child so we must send data to parent
503 command
= SAM_COMMAND_DATA_STORE
;
504 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
505 err
= CS_ERR_LIBRARY
;
510 if (sam_safe_write (sam_internal_data
.child_fd_out
, &size
, sizeof (size
)) != sizeof (size
)) {
511 err
= CS_ERR_LIBRARY
;
516 if (data
!= NULL
&& sam_safe_write (sam_internal_data
.child_fd_out
, data
, size
) != size
) {
517 err
= CS_ERR_LIBRARY
;
525 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
531 * We are parent or we received OK reply from parent -> do required action
534 free (sam_internal_data
.user_data
);
535 sam_internal_data
.user_data
= NULL
;
536 sam_internal_data
.user_data_allocated
= 0;
537 sam_internal_data
.user_data_size
= 0;
539 if (sam_internal_data
.user_data_allocated
< size
) {
540 if ((new_data
= realloc (sam_internal_data
.user_data
, size
)) == NULL
) {
541 err
= CS_ERR_NO_MEMORY
;
546 sam_internal_data
.user_data_allocated
= size
;
548 new_data
= sam_internal_data
.user_data
;
550 sam_internal_data
.user_data
= new_data
;
551 sam_internal_data
.user_data_size
= size
;
553 memcpy (sam_internal_data
.user_data
, data
, size
);
556 pthread_mutex_unlock (&sam_internal_data
.lock
);
561 pthread_mutex_unlock (&sam_internal_data
.lock
);
566 cs_error_t
sam_start (void)
570 sam_recovery_policy_t recpol
;
572 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
) {
573 return (CS_ERR_BAD_HANDLE
);
576 recpol
= sam_internal_data
.recovery_policy
;
578 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
|| recpol
& SAM_RECOVERY_POLICY_CMAP
) {
579 pthread_mutex_lock (&sam_internal_data
.lock
);
582 command
= SAM_COMMAND_START
;
584 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
585 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
|| recpol
& SAM_RECOVERY_POLICY_CMAP
) {
586 pthread_mutex_unlock (&sam_internal_data
.lock
);
589 return (CS_ERR_LIBRARY
);
592 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
|| recpol
& SAM_RECOVERY_POLICY_CMAP
) {
594 * Wait for parent reply
596 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
597 pthread_mutex_unlock (&sam_internal_data
.lock
);
602 pthread_mutex_unlock (&sam_internal_data
.lock
);
605 if (sam_internal_data
.hc_callback
)
606 if (sam_safe_write (sam_internal_data
.cb_wpipe_fd
, &command
, sizeof (command
)) != sizeof (command
))
607 return (CS_ERR_LIBRARY
);
609 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_STARTED
;
614 cs_error_t
sam_stop (void)
619 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
620 return (CS_ERR_BAD_HANDLE
);
623 command
= SAM_COMMAND_STOP
;
625 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
) {
626 pthread_mutex_lock (&sam_internal_data
.lock
);
629 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
630 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
) {
631 pthread_mutex_unlock (&sam_internal_data
.lock
);
634 return (CS_ERR_LIBRARY
);
637 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
) {
639 * Wait for parent reply
641 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
642 pthread_mutex_unlock (&sam_internal_data
.lock
);
647 pthread_mutex_unlock (&sam_internal_data
.lock
);
650 if (sam_internal_data
.hc_callback
)
651 if (sam_safe_write (sam_internal_data
.cb_wpipe_fd
, &command
, sizeof (command
)) != sizeof (command
))
652 return (CS_ERR_LIBRARY
);
654 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_REGISTERED
;
659 cs_error_t
sam_hc_send (void)
663 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
664 return (CS_ERR_BAD_HANDLE
);
667 command
= SAM_COMMAND_HB
;
669 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
))
670 return (CS_ERR_LIBRARY
);
675 cs_error_t
sam_finalize (void)
679 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
680 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
681 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
682 return (CS_ERR_BAD_HANDLE
);
685 if (sam_internal_data
.internal_status
== SAM_INTERNAL_STATUS_STARTED
) {
691 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_FINALIZED
;
693 free (sam_internal_data
.user_data
);
699 cs_error_t
sam_mark_failed (void)
703 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
&&
704 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
) {
705 return (CS_ERR_BAD_HANDLE
);
708 if (!(sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
)) {
709 return (CS_ERR_INVALID_PARAM
);
712 command
= SAM_COMMAND_MARK_FAILED
;
714 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
))
715 return (CS_ERR_LIBRARY
);
720 cs_error_t
sam_warn_signal_set (int warn_signal
)
725 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
726 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
727 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
728 return (CS_ERR_BAD_HANDLE
);
731 pthread_mutex_lock (&sam_internal_data
.lock
);
733 if (sam_internal_data
.am_i_child
) {
735 * We are child so we must send data to parent
737 command
= SAM_COMMAND_WARN_SIGNAL_SET
;
738 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
739 err
= CS_ERR_LIBRARY
;
744 if (sam_safe_write (sam_internal_data
.child_fd_out
, &warn_signal
, sizeof (warn_signal
)) !=
745 sizeof (warn_signal
)) {
746 err
= CS_ERR_LIBRARY
;
754 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
760 * We are parent or we received OK reply from parent -> do required action
762 sam_internal_data
.warn_signal
= warn_signal
;
764 pthread_mutex_unlock (&sam_internal_data
.lock
);
769 pthread_mutex_unlock (&sam_internal_data
.lock
);
774 static cs_error_t
sam_parent_reply_send (
782 reply
= SAM_REPLY_OK
;
784 if (sam_safe_write (parent_fd_out
, &reply
, sizeof (reply
)) != sizeof (reply
)) {
785 err
= CS_ERR_LIBRARY
;
793 reply
= SAM_REPLY_ERROR
;
794 if (sam_safe_write (parent_fd_out
, &reply
, sizeof (reply
)) != sizeof (reply
)) {
795 return (CS_ERR_LIBRARY
);
797 if (sam_safe_write (parent_fd_out
, &err
, sizeof (err
)) != sizeof (err
)) {
798 return (CS_ERR_LIBRARY
);
805 static cs_error_t
sam_parent_warn_signal_set (
814 if (sam_safe_read (parent_fd_in
, &warn_signal
, sizeof (warn_signal
)) != sizeof (warn_signal
)) {
815 err
= CS_ERR_LIBRARY
;
819 err
= sam_warn_signal_set (warn_signal
);
825 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
828 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
831 static cs_error_t
sam_parent_wait_for_quorum (
836 struct pollfd pfds
[2];
839 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
) {
840 if ((err
= sam_cmap_update_key (SAM_CMAP_KEY_STATE
, SAM_CMAP_S_Q_WAIT
)) != CS_OK
) {
846 * Update current quorum
848 if ((err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ALL
)) != CS_OK
) {
855 while (!sam_internal_data
.quorate
) {
856 pfds
[0].fd
= parent_fd_in
;
860 pfds
[1].fd
= sam_internal_data
.quorum_fd
;
861 pfds
[1].events
= POLLIN
;
864 poll_err
= poll (pfds
, 2, -1);
866 if (poll_err
== -1) {
869 * If it is EINTR, continue, otherwise QUIT
871 if (errno
!= EINTR
) {
872 err
= CS_ERR_LIBRARY
;
877 if (pfds
[0].revents
!= 0) {
878 if (pfds
[0].revents
== POLLERR
|| pfds
[0].revents
== POLLHUP
||pfds
[0].revents
== POLLNVAL
) {
886 if (pfds
[1].revents
!= 0) {
887 if ((err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ONE
)) != CS_OK
) {
893 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
) {
894 if ((err
= sam_cmap_update_key (SAM_CMAP_KEY_STATE
, SAM_CMAP_S_STARTED
)) != CS_OK
) {
899 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
902 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CMAP
) {
903 sam_cmap_update_key (SAM_CMAP_KEY_STATE
, SAM_CMAP_S_REGISTERED
);
906 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
909 static cs_error_t
sam_parent_cmap_state_set (
918 state_s
= SAM_CMAP_S_STARTED
;
920 state_s
= SAM_CMAP_S_REGISTERED
;
923 if ((err
= sam_cmap_update_key (SAM_CMAP_KEY_STATE
, state_s
)) != CS_OK
) {
927 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
930 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
933 static cs_error_t
sam_parent_kill_child (
940 if (!sam_internal_data
.term_send
) {
942 * We didn't send warn_signal yet.
944 kill (child_pid
, sam_internal_data
.warn_signal
);
946 sam_internal_data
.term_send
= 1;
949 * We sent child warning. Now, we will not be so nice
951 kill (child_pid
, SIGKILL
);
952 *action
= SAM_PARENT_ACTION_RECOVERY
;
958 static cs_error_t
sam_parent_mark_child_failed (
962 sam_recovery_policy_t recpol
;
964 recpol
= sam_internal_data
.recovery_policy
;
966 sam_internal_data
.term_send
= 1;
967 sam_internal_data
.recovery_policy
= SAM_RECOVERY_POLICY_QUIT
|
968 (SAM_RP_MASK_C (recpol
) ? SAM_RECOVERY_POLICY_CMAP
: 0) |
969 (SAM_RP_MASK_Q (recpol
) ? SAM_RECOVERY_POLICY_QUORUM
: 0);
971 return (sam_parent_kill_child (action
, child_pid
));
974 static cs_error_t
sam_parent_data_store (
985 if (sam_safe_read (parent_fd_in
, &size
, sizeof (size
)) != sizeof (size
)) {
986 err
= CS_ERR_LIBRARY
;
991 user_data
= malloc (size
);
992 if (user_data
== NULL
) {
993 err
= CS_ERR_NO_MEMORY
;
997 if (sam_safe_read (parent_fd_in
, user_data
, size
) != size
) {
998 err
= CS_ERR_LIBRARY
;
999 goto free_error_reply
;
1003 err
= sam_data_store (user_data
, size
);
1005 goto free_error_reply
;
1010 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
1015 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
1018 static enum sam_parent_action_t
sam_parent_handler (
1029 struct pollfd pfds
[2];
1032 sam_recovery_policy_t recpol
;
1036 action
= SAM_PARENT_ACTION_CONTINUE
;
1037 recpol
= sam_internal_data
.recovery_policy
;
1039 while (action
== SAM_PARENT_ACTION_CONTINUE
) {
1040 pfds
[0].fd
= parent_fd_in
;
1041 pfds
[0].events
= POLLIN
;
1042 pfds
[0].revents
= 0;
1045 if (status
== 1 && sam_internal_data
.time_interval
!= 0) {
1046 time_interval
= sam_internal_data
.time_interval
;
1051 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
) {
1052 pfds
[nfds
].fd
= sam_internal_data
.quorum_fd
;
1053 pfds
[nfds
].events
= POLLIN
;
1054 pfds
[nfds
].revents
= 0;
1058 poll_error
= poll (pfds
, nfds
, time_interval
);
1060 if (poll_error
== -1) {
1063 * If it is EINTR, continue, otherwise QUIT
1065 if (errno
!= EINTR
) {
1066 action
= SAM_PARENT_ACTION_ERROR
;
1070 if (poll_error
== 0) {
1072 * Time limit expires
1075 action
= SAM_PARENT_ACTION_QUIT
;
1077 sam_parent_kill_child (&action
, child_pid
);
1081 if (poll_error
> 0) {
1082 if (pfds
[0].revents
!= 0) {
1084 * We have EOF or command in pipe
1086 bytes_read
= sam_safe_read (parent_fd_in
, &command
, 1);
1088 if (bytes_read
== 0) {
1090 * Handle EOF -> Take recovery action or quit if sam_start wasn't called
1093 action
= SAM_PARENT_ACTION_QUIT
;
1095 action
= SAM_PARENT_ACTION_RECOVERY
;
1100 if (bytes_read
== -1) {
1101 action
= SAM_PARENT_ACTION_ERROR
;
1105 if (recpol
& SAM_RECOVERY_POLICY_CMAP
) {
1106 sam_cmap_update_key (SAM_CMAP_KEY_LAST_HC
, NULL
);
1110 * We have read command
1113 case SAM_COMMAND_START
:
1118 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
) {
1119 if (sam_parent_wait_for_quorum (parent_fd_in
,
1120 parent_fd_out
) != CS_OK
) {
1125 if (recpol
& SAM_RECOVERY_POLICY_CMAP
) {
1126 if (sam_parent_cmap_state_set (parent_fd_in
,
1127 parent_fd_out
, 1) != CS_OK
) {
1135 case SAM_COMMAND_STOP
:
1140 if (recpol
& SAM_RECOVERY_POLICY_CMAP
) {
1141 if (sam_parent_cmap_state_set (parent_fd_in
,
1142 parent_fd_out
, 0) != CS_OK
) {
1150 case SAM_COMMAND_DATA_STORE
:
1151 sam_parent_data_store (parent_fd_in
, parent_fd_out
);
1153 case SAM_COMMAND_WARN_SIGNAL_SET
:
1154 sam_parent_warn_signal_set (parent_fd_in
, parent_fd_out
);
1156 case SAM_COMMAND_MARK_FAILED
:
1158 sam_parent_mark_child_failed (&action
, child_pid
);
1161 } /* if (pfds[0].revents != 0) */
1163 if ((sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_QUORUM
) &&
1164 pfds
[1].revents
!= 0) {
1166 * Handle quorum change
1168 err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ALL
);
1171 (!sam_internal_data
.quorate
|| (err
!= CS_ERR_TRY_AGAIN
&& err
!= CS_OK
))) {
1172 sam_parent_kill_child (&action
, child_pid
);
1175 } /* poll_error > 0 */
1176 } /* action == SAM_PARENT_ACTION_CONTINUE */
1182 cs_error_t
sam_register (
1183 unsigned int *instance_id
)
1188 int pipe_fd_out
[2], pipe_fd_in
[2];
1189 enum sam_parent_action_t action
, old_action
;
1191 sam_recovery_policy_t recpol
;
1193 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
) {
1194 return (CS_ERR_BAD_HANDLE
);
1197 recpol
= sam_internal_data
.recovery_policy
;
1199 if (recpol
& SAM_RECOVERY_POLICY_CMAP
) {
1203 if ((error
= sam_cmap_register ()) != CS_OK
) {
1211 if ((pipe_error
= pipe (pipe_fd_out
)) != 0) {
1212 error
= CS_ERR_LIBRARY
;
1216 if ((pipe_error
= pipe (pipe_fd_in
)) != 0) {
1217 close (pipe_fd_out
[0]);
1218 close (pipe_fd_out
[1]);
1220 error
= CS_ERR_LIBRARY
;
1224 if (recpol
& SAM_RECOVERY_POLICY_CMAP
) {
1225 if ((error
= sam_cmap_update_key (SAM_CMAP_KEY_STATE
, SAM_CMAP_S_REGISTERED
)) != CS_OK
) {
1230 sam_internal_data
.instance_id
++;
1232 sam_internal_data
.term_send
= 0;
1240 sam_internal_data
.instance_id
--;
1242 error
= CS_ERR_LIBRARY
;
1250 close (pipe_fd_out
[0]);
1251 close (pipe_fd_in
[1]);
1253 sam_internal_data
.child_fd_out
= pipe_fd_out
[1];
1254 sam_internal_data
.child_fd_in
= pipe_fd_in
[0];
1257 *instance_id
= sam_internal_data
.instance_id
;
1259 sam_internal_data
.am_i_child
= 1;
1260 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_REGISTERED
;
1262 pthread_mutex_init (&sam_internal_data
.lock
, NULL
);
1269 close (pipe_fd_out
[1]);
1270 close (pipe_fd_in
[0]);
1272 action
= sam_parent_handler (pipe_fd_out
[0], pipe_fd_in
[1], pid
);
1274 close (pipe_fd_out
[0]);
1275 close (pipe_fd_in
[1]);
1277 if (action
== SAM_PARENT_ACTION_ERROR
) {
1278 error
= CS_ERR_LIBRARY
;
1283 * We really don't like zombies
1285 while (waitpid (pid
, &child_status
, 0) == -1 && errno
== EINTR
)
1288 old_action
= action
;
1290 if (action
== SAM_PARENT_ACTION_RECOVERY
) {
1291 if (SAM_RP_MASK (sam_internal_data
.recovery_policy
) == SAM_RECOVERY_POLICY_QUIT
)
1292 action
= SAM_PARENT_ACTION_QUIT
;
1296 if (action
== SAM_PARENT_ACTION_QUIT
) {
1297 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
) {
1298 quorum_finalize (sam_internal_data
.quorum_handle
);
1301 if (recpol
& SAM_RECOVERY_POLICY_CMAP
) {
1302 if (old_action
== SAM_PARENT_ACTION_RECOVERY
) {
1306 sam_cmap_update_key (SAM_CMAP_KEY_STATE
, SAM_CMAP_S_FAILED
);
1308 sam_cmap_destroy_pid_path ();
1312 exit (WEXITSTATUS (child_status
));
1323 static void *hc_callback_thread (void *unused_param
)
1327 ssize_t bytes_readed
;
1329 int time_interval
, tmp_time_interval
;
1336 time_interval
= sam_internal_data
.time_interval
>> 2;
1339 pfds
.fd
= sam_internal_data
.cb_rpipe_fd
;
1340 pfds
.events
= POLLIN
;
1344 tmp_time_interval
= time_interval
;
1346 tmp_time_interval
= -1;
1349 poll_error
= poll (&pfds
, 1, tmp_time_interval
);
1351 if (poll_error
== 0) {
1352 if (sam_hc_send () == CS_OK
) {
1357 if (sam_internal_data
.hc_callback () != 0) {
1365 if (poll_error
> 0) {
1366 bytes_readed
= sam_safe_read (sam_internal_data
.cb_rpipe_fd
, &command
, 1);
1368 if (bytes_readed
> 0) {
1369 if (status
== 0 && command
== SAM_COMMAND_START
)
1372 if (status
== 1 && command
== SAM_COMMAND_STOP
)
1380 * This keeps the compiler happy; it is the same as return (NULL);
1382 return (unused_param
);
1385 cs_error_t
sam_hc_callback_register (sam_hc_callback_t cb
)
1387 cs_error_t error
= CS_OK
;
1388 pthread_attr_t thread_attr
;
1392 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
) {
1393 return (CS_ERR_BAD_HANDLE
);
1396 if (sam_internal_data
.time_interval
== 0) {
1397 return (CS_ERR_INVALID_PARAM
);
1400 if (sam_internal_data
.cb_registered
) {
1401 sam_internal_data
.hc_callback
= cb
;
1407 * We know this is the first registration
1411 return (CS_ERR_INVALID_PARAM
);
1414 pipe_error
= pipe (pipe_fd
);
1416 if (pipe_error
!= 0) {
1418 * Pipe creation error
1420 error
= CS_ERR_LIBRARY
;
1424 sam_internal_data
.cb_rpipe_fd
= pipe_fd
[0];
1425 sam_internal_data
.cb_wpipe_fd
= pipe_fd
[1];
1428 * Create thread attributes
1430 error
= pthread_attr_init (&thread_attr
);
1432 error
= CS_ERR_LIBRARY
;
1433 goto error_close_fd_exit
;
1437 pthread_attr_setdetachstate (&thread_attr
, PTHREAD_CREATE_DETACHED
);
1438 pthread_attr_setstacksize (&thread_attr
, 32768);
1443 error
= pthread_create (&sam_internal_data
.cb_thread
, &thread_attr
, hc_callback_thread
, NULL
);
1446 error
= CS_ERR_LIBRARY
;
1447 goto error_attr_destroy_exit
;
1453 pthread_attr_destroy(&thread_attr
);
1455 sam_internal_data
.cb_registered
= 1;
1456 sam_internal_data
.hc_callback
= cb
;
1460 error_attr_destroy_exit
:
1461 pthread_attr_destroy(&thread_attr
);
1462 error_close_fd_exit
:
1463 sam_internal_data
.cb_rpipe_fd
= sam_internal_data
.cb_wpipe_fd
= 0;