2 * Copyright (c) 2009-2010 Red Hat, Inc.
6 * Author: Jan Friesse (jfriesse@redhat.com)
8 * This software licensed under BSD license, the text of which follows:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the Red Hat, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
46 #include <sys/types.h>
47 #include <sys/socket.h>
50 #include <corosync/corotypes.h>
51 #include <corosync/coroipc_types.h>
52 #include <corosync/coroipcc.h>
53 #include <corosync/corodefs.h>
54 #include <corosync/confdb.h>
55 #include <corosync/hdb.h>
56 #include <corosync/quorum.h>
58 #include <corosync/sam.h>
/*
 * State strings passed as the value of SAM_CONFDB_KEY_STATE to
 * sam_confdb_update_key ().
 */
#define SAM_CONFDB_S_FAILED		"failed"
#define SAM_CONFDB_S_REGISTERED		"registered"
#define SAM_CONFDB_S_STARTED		"started"
#define SAM_CONFDB_S_Q_WAIT		"waiting for quorum"

/*
 * Recovery policy helpers. Each macro clears flag bits from pol:
 *   SAM_RP_MASK_Q - clears SAM_RECOVERY_POLICY_QUORUM
 *   SAM_RP_MASK_C - clears SAM_RECOVERY_POLICY_CONFDB
 *   SAM_RP_MASK   - clears both flags
 *
 * The argument is parenthesized so that an expression such as
 * SAM_RP_MASK (a | b) masks the whole value; without the parentheses `&`
 * would bind tighter than `|` and only `b` would be masked.
 */
#define SAM_RP_MASK_Q(pol)	((pol) & (~SAM_RECOVERY_POLICY_QUORUM))
#define SAM_RP_MASK_C(pol)	((pol) & (~SAM_RECOVERY_POLICY_CONFDB))
#define SAM_RP_MASK(pol)	((pol) & (~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CONFDB)))
75 enum sam_internal_status_t
{
76 SAM_INTERNAL_STATUS_NOT_INITIALIZED
= 0,
77 SAM_INTERNAL_STATUS_INITIALIZED
,
78 SAM_INTERNAL_STATUS_REGISTERED
,
79 SAM_INTERNAL_STATUS_STARTED
,
80 SAM_INTERNAL_STATUS_FINALIZED
87 SAM_COMMAND_DATA_STORE
,
88 SAM_COMMAND_WARN_SIGNAL_SET
,
89 SAM_COMMAND_MARK_FAILED
,
97 enum sam_parent_action_t
{
98 SAM_PARENT_ACTION_ERROR
,
99 SAM_PARENT_ACTION_RECOVERY
,
100 SAM_PARENT_ACTION_QUIT
,
101 SAM_PARENT_ACTION_CONTINUE
104 enum sam_confdb_key_t
{
105 SAM_CONFDB_KEY_RECOVERY
,
106 SAM_CONFDB_KEY_HC_PERIOD
,
107 SAM_CONFDB_KEY_LAST_HC
,
108 SAM_CONFDB_KEY_STATE
,
113 sam_recovery_policy_t recovery_policy
;
114 enum sam_internal_status_t internal_status
;
115 unsigned int instance_id
;
122 sam_hc_callback_t hc_callback
;
124 int cb_rpipe_fd
, cb_wpipe_fd
;
128 size_t user_data_size
;
129 size_t user_data_allocated
;
131 pthread_mutex_t lock
;
133 quorum_handle_t quorum_handle
;
137 confdb_handle_t confdb_handle
;
138 hdb_handle_t confdb_pid_handle
;
141 extern const char *__progname
;
143 static cs_error_t
sam_confdb_update_key (enum sam_confdb_key_t key
, const char *value
)
147 uint64_t hc_period
, last_hc
;
149 const char *ssvalue
[] = { [SAM_RECOVERY_POLICY_QUIT
] = "quit", [SAM_RECOVERY_POLICY_RESTART
] = "restart" };
152 case SAM_CONFDB_KEY_RECOVERY
:
153 svalue
= ssvalue
[SAM_RP_MASK (sam_internal_data
.recovery_policy
)];
155 if ((err
= confdb_key_create_typed (sam_internal_data
.confdb_handle
, sam_internal_data
.confdb_pid_handle
,
156 "recovery", svalue
, strlen ((const char *)svalue
), CONFDB_VALUETYPE_STRING
)) != CS_OK
) {
160 case SAM_CONFDB_KEY_HC_PERIOD
:
161 hc_period
= sam_internal_data
.time_interval
;
163 if ((err
= confdb_key_create_typed (sam_internal_data
.confdb_handle
, sam_internal_data
.confdb_pid_handle
,
164 "hc_period", &hc_period
, sizeof (uint64_t), CONFDB_VALUETYPE_UINT64
)) != CS_OK
) {
168 case SAM_CONFDB_KEY_LAST_HC
:
169 if (gettimeofday (&tv
, NULL
) == -1) {
172 last_hc
= ((uint64_t)tv
.tv_sec
* 1000) + ((uint64_t)tv
.tv_usec
/ 1000);
175 if ((err
= confdb_key_create_typed (sam_internal_data
.confdb_handle
, sam_internal_data
.confdb_pid_handle
,
176 "hc_last", &last_hc
, sizeof (uint64_t), CONFDB_VALUETYPE_UINT64
)) != CS_OK
) {
180 case SAM_CONFDB_KEY_STATE
:
182 if ((err
= confdb_key_create_typed (sam_internal_data
.confdb_handle
, sam_internal_data
.confdb_pid_handle
,
183 "state", svalue
, strlen ((const char *)svalue
), CONFDB_VALUETYPE_STRING
)) != CS_OK
) {
/*
 * Destroy the confdb object referenced by sam_internal_data.confdb_pid_handle
 * using the connection stored in sam_internal_data.confdb_handle.
 *
 * Returns whatever confdb_object_destroy () returns.
 */
195 static cs_error_t
sam_confdb_destroy_pid_obj (void)
197 return (confdb_object_destroy (sam_internal_data
.confdb_handle
, sam_internal_data
.confdb_pid_handle
));
200 static cs_error_t
sam_confdb_register (void)
202 const char *obj_name
;
204 confdb_handle_t confdb_handle
;
205 hdb_handle_t resource_handle
, process_handle
, pid_handle
, obj_handle
;
206 hdb_handle_t
*res_handle
;
207 char tmp_obj
[PATH_MAX
];
210 if ((err
= confdb_initialize (&confdb_handle
, NULL
)) != CS_OK
) {
214 for (i
= 0; i
< 3; i
++) {
217 obj_name
= "resources";
218 obj_handle
= OBJECT_PARENT_HANDLE
;
219 res_handle
= &resource_handle
;
222 obj_name
= "process";
223 obj_handle
= resource_handle
;
224 res_handle
= &process_handle
;
227 if (snprintf (tmp_obj
, sizeof (tmp_obj
), "%s:%d", __progname
, getpid ()) >= sizeof (tmp_obj
)) {
228 snprintf (tmp_obj
, sizeof (tmp_obj
), "%d", getpid ());
232 obj_handle
= process_handle
;
233 res_handle
= &pid_handle
;
237 if ((err
= confdb_object_find_start (confdb_handle
, obj_handle
)) != CS_OK
) {
241 if ((err
= confdb_object_find (confdb_handle
, obj_handle
, obj_name
, strlen (obj_name
),
242 res_handle
)) != CS_OK
) {
243 if (err
== CONFDB_ERR_ACCESS
) {
245 * Try to create object
247 if ((err
= confdb_object_create (confdb_handle
, obj_handle
, obj_name
,
248 strlen (obj_name
), res_handle
)) != CS_OK
) {
255 if ((err
= confdb_object_find_destroy (confdb_handle
, obj_handle
)) != CS_OK
) {
261 sam_internal_data
.confdb_pid_handle
= pid_handle
;
262 sam_internal_data
.confdb_handle
= confdb_handle
;
264 if ((err
= sam_confdb_update_key (SAM_CONFDB_KEY_RECOVERY
, NULL
)) != CS_OK
) {
265 goto destroy_finalize_error
;
268 if ((err
= sam_confdb_update_key (SAM_CONFDB_KEY_HC_PERIOD
, NULL
)) != CS_OK
) {
269 goto destroy_finalize_error
;
274 destroy_finalize_error
:
275 sam_confdb_destroy_pid_obj ();
277 confdb_finalize (confdb_handle
);
281 static void quorum_notification_fn (
282 quorum_handle_t handle
,
285 uint32_t view_list_entries
,
288 sam_internal_data
.quorate
= quorate
;
291 cs_error_t
sam_initialize (
293 sam_recovery_policy_t recovery_policy
)
295 quorum_callbacks_t quorum_callbacks
;
298 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_NOT_INITIALIZED
) {
299 return (CS_ERR_BAD_HANDLE
);
302 if (SAM_RP_MASK (recovery_policy
) != SAM_RECOVERY_POLICY_QUIT
&&
303 SAM_RP_MASK (recovery_policy
) != SAM_RECOVERY_POLICY_RESTART
) {
304 return (CS_ERR_INVALID_PARAM
);
307 if (recovery_policy
& SAM_RECOVERY_POLICY_QUORUM
) {
311 quorum_callbacks
.quorum_notify_fn
= quorum_notification_fn
;
312 if ((err
= quorum_initialize (&sam_internal_data
.quorum_handle
, &quorum_callbacks
)) != CS_OK
) {
316 if ((err
= quorum_trackstart (sam_internal_data
.quorum_handle
, CS_TRACK_CHANGES
)) != CS_OK
) {
317 goto exit_error_quorum
;
320 if ((err
= quorum_fd_get (sam_internal_data
.quorum_handle
, &sam_internal_data
.quorum_fd
)) != CS_OK
) {
321 goto exit_error_quorum
;
325 * Dispatch initial quorate state
327 if ((err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ONE
)) != CS_OK
) {
328 goto exit_error_quorum
;
331 sam_internal_data
.recovery_policy
= recovery_policy
;
333 sam_internal_data
.time_interval
= time_interval
;
335 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_INITIALIZED
;
337 sam_internal_data
.warn_signal
= SIGTERM
;
339 sam_internal_data
.am_i_child
= 0;
341 sam_internal_data
.user_data
= NULL
;
342 sam_internal_data
.user_data_size
= 0;
343 sam_internal_data
.user_data_allocated
= 0;
345 pthread_mutex_init (&sam_internal_data
.lock
, NULL
);
350 quorum_finalize (sam_internal_data
.quorum_handle
);
356 * Wrapper on top of write(2) function. It handles EAGAIN and EINTR states and sends whole buffer if possible.
358 static size_t sam_safe_write (
364 ssize_t tmp_bytes_write
;
369 tmp_bytes_write
= write (d
, (const char *)buf
+ bytes_write
,
370 (nbyte
- bytes_write
> SSIZE_MAX
) ? SSIZE_MAX
: nbyte
- bytes_write
);
372 if (tmp_bytes_write
== -1) {
373 if (!(errno
== EAGAIN
|| errno
== EINTR
))
376 bytes_write
+= tmp_bytes_write
;
378 } while (bytes_write
!= nbyte
);
380 return (bytes_write
);
384 * Wrapper on top of read(2) function. It handles EAGAIN and EINTR states and reads whole buffer if possible.
386 static size_t sam_safe_read (
392 ssize_t tmp_bytes_read
;
397 tmp_bytes_read
= read (d
, (char *)buf
+ bytes_read
,
398 (nbyte
- bytes_read
> SSIZE_MAX
) ? SSIZE_MAX
: nbyte
- bytes_read
);
400 if (tmp_bytes_read
== -1) {
401 if (!(errno
== EAGAIN
|| errno
== EINTR
))
404 bytes_read
+= tmp_bytes_read
;
407 } while (bytes_read
!= nbyte
&& tmp_bytes_read
!= 0);
412 static cs_error_t
sam_read_reply (
418 if (sam_safe_read (sam_internal_data
.child_fd_in
, &reply
, sizeof (reply
)) != sizeof (reply
)) {
419 return (CS_ERR_LIBRARY
);
423 case SAM_REPLY_ERROR
:
425 * Read error and return that
427 if (sam_safe_read (sam_internal_data
.child_fd_in
, &err
, sizeof (err
)) != sizeof (err
)) {
428 return (CS_ERR_LIBRARY
);
439 return (CS_ERR_LIBRARY
);
446 cs_error_t
sam_data_getsize (size_t *size
)
449 return (CS_ERR_INVALID_PARAM
);
452 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
453 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
454 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
456 return (CS_ERR_BAD_HANDLE
);
459 pthread_mutex_lock (&sam_internal_data
.lock
);
461 *size
= sam_internal_data
.user_data_size
;
463 pthread_mutex_unlock (&sam_internal_data
.lock
);
468 cs_error_t
sam_data_restore (
477 return (CS_ERR_INVALID_PARAM
);
480 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
481 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
482 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
484 return (CS_ERR_BAD_HANDLE
);
487 pthread_mutex_lock (&sam_internal_data
.lock
);
489 if (sam_internal_data
.user_data_size
== 0) {
495 if (size
< sam_internal_data
.user_data_size
) {
496 err
= CS_ERR_INVALID_PARAM
;
501 memcpy (data
, sam_internal_data
.user_data
, sam_internal_data
.user_data_size
);
503 pthread_mutex_unlock (&sam_internal_data
.lock
);
508 pthread_mutex_unlock (&sam_internal_data
.lock
);
513 cs_error_t
sam_data_store (
521 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
522 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
523 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
525 return (CS_ERR_BAD_HANDLE
);
533 pthread_mutex_lock (&sam_internal_data
.lock
);
535 if (sam_internal_data
.am_i_child
) {
537 * We are child so we must send data to parent
539 command
= SAM_COMMAND_DATA_STORE
;
540 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
541 err
= CS_ERR_LIBRARY
;
546 if (sam_safe_write (sam_internal_data
.child_fd_out
, &size
, sizeof (size
)) != sizeof (size
)) {
547 err
= CS_ERR_LIBRARY
;
552 if (data
!= NULL
&& sam_safe_write (sam_internal_data
.child_fd_out
, data
, size
) != size
) {
553 err
= CS_ERR_LIBRARY
;
561 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
567 * We are parent or we received OK reply from parent -> do required action
570 free (sam_internal_data
.user_data
);
571 sam_internal_data
.user_data
= NULL
;
572 sam_internal_data
.user_data_allocated
= 0;
573 sam_internal_data
.user_data_size
= 0;
575 if (sam_internal_data
.user_data_allocated
< size
) {
576 if ((new_data
= realloc (sam_internal_data
.user_data
, size
)) == NULL
) {
577 err
= CS_ERR_NO_MEMORY
;
582 sam_internal_data
.user_data_allocated
= size
;
584 new_data
= sam_internal_data
.user_data
;
586 sam_internal_data
.user_data
= new_data
;
587 sam_internal_data
.user_data_size
= size
;
589 memcpy (sam_internal_data
.user_data
, data
, size
);
592 pthread_mutex_unlock (&sam_internal_data
.lock
);
597 pthread_mutex_unlock (&sam_internal_data
.lock
);
602 cs_error_t
sam_start (void)
606 sam_recovery_policy_t recpol
;
608 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
) {
609 return (CS_ERR_BAD_HANDLE
);
612 recpol
= sam_internal_data
.recovery_policy
;
614 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
|| recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
615 pthread_mutex_lock (&sam_internal_data
.lock
);
618 command
= SAM_COMMAND_START
;
620 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
621 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
|| recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
622 pthread_mutex_unlock (&sam_internal_data
.lock
);
625 return (CS_ERR_LIBRARY
);
628 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
|| recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
630 * Wait for parent reply
632 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
633 pthread_mutex_unlock (&sam_internal_data
.lock
);
638 pthread_mutex_unlock (&sam_internal_data
.lock
);
641 if (sam_internal_data
.hc_callback
)
642 if (sam_safe_write (sam_internal_data
.cb_wpipe_fd
, &command
, sizeof (command
)) != sizeof (command
))
643 return (CS_ERR_LIBRARY
);
645 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_STARTED
;
650 cs_error_t
sam_stop (void)
655 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
656 return (CS_ERR_BAD_HANDLE
);
659 command
= SAM_COMMAND_STOP
;
661 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
) {
662 pthread_mutex_lock (&sam_internal_data
.lock
);
665 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
666 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
) {
667 pthread_mutex_unlock (&sam_internal_data
.lock
);
670 return (CS_ERR_LIBRARY
);
673 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
) {
675 * Wait for parent reply
677 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
678 pthread_mutex_unlock (&sam_internal_data
.lock
);
683 pthread_mutex_unlock (&sam_internal_data
.lock
);
686 if (sam_internal_data
.hc_callback
)
687 if (sam_safe_write (sam_internal_data
.cb_wpipe_fd
, &command
, sizeof (command
)) != sizeof (command
))
688 return (CS_ERR_LIBRARY
);
690 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_REGISTERED
;
/*
 * Write the SAM_COMMAND_HB command to sam_internal_data.child_fd_out.
 *
 * Returns CS_ERR_BAD_HANDLE unless internal_status is
 * SAM_INTERNAL_STATUS_STARTED, and CS_ERR_LIBRARY when sam_safe_write ()
 * does not report the whole command as written.
 *
 * NOTE(review): the declaration of `command` and the success return are not
 * visible in this listing - confirm against the complete file.
 */
695 cs_error_t
sam_hc_send (void)
699 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
700 return (CS_ERR_BAD_HANDLE
);
703 command
= SAM_COMMAND_HB
;
705 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
))
706 return (CS_ERR_LIBRARY
);
711 cs_error_t
sam_finalize (void)
715 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
716 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
717 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
718 return (CS_ERR_BAD_HANDLE
);
721 if (sam_internal_data
.internal_status
== SAM_INTERNAL_STATUS_STARTED
) {
727 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_FINALIZED
;
729 free (sam_internal_data
.user_data
);
/*
 * Write the SAM_COMMAND_MARK_FAILED command to sam_internal_data.child_fd_out.
 *
 * Returns:
 *   CS_ERR_BAD_HANDLE    - internal_status is neither STARTED nor REGISTERED
 *   CS_ERR_INVALID_PARAM - recovery_policy does not have the
 *                          SAM_RECOVERY_POLICY_CONFDB flag set
 *   CS_ERR_LIBRARY       - sam_safe_write () did not write the whole command
 *
 * NOTE(review): the declaration of `command` and the success return are not
 * visible in this listing - confirm against the complete file.
 */
735 cs_error_t
sam_mark_failed (void)
739 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
&&
740 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
) {
741 return (CS_ERR_BAD_HANDLE
);
744 if (!(sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
)) {
745 return (CS_ERR_INVALID_PARAM
);
748 command
= SAM_COMMAND_MARK_FAILED
;
750 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
))
751 return (CS_ERR_LIBRARY
);
756 cs_error_t
sam_warn_signal_set (int warn_signal
)
761 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
&&
762 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
&&
763 sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_STARTED
) {
764 return (CS_ERR_BAD_HANDLE
);
767 pthread_mutex_lock (&sam_internal_data
.lock
);
769 if (sam_internal_data
.am_i_child
) {
771 * We are child so we must send data to parent
773 command
= SAM_COMMAND_WARN_SIGNAL_SET
;
774 if (sam_safe_write (sam_internal_data
.child_fd_out
, &command
, sizeof (command
)) != sizeof (command
)) {
775 err
= CS_ERR_LIBRARY
;
780 if (sam_safe_write (sam_internal_data
.child_fd_out
, &warn_signal
, sizeof (warn_signal
)) !=
781 sizeof (warn_signal
)) {
782 err
= CS_ERR_LIBRARY
;
790 if ((err
= sam_read_reply (sam_internal_data
.child_fd_in
)) != CS_OK
) {
796 * We are parent or we received OK reply from parent -> do required action
798 sam_internal_data
.warn_signal
= warn_signal
;
800 pthread_mutex_unlock (&sam_internal_data
.lock
);
805 pthread_mutex_unlock (&sam_internal_data
.lock
);
810 static cs_error_t
sam_parent_reply_send (
818 reply
= SAM_REPLY_OK
;
820 if (sam_safe_write (parent_fd_out
, &reply
, sizeof (reply
)) != sizeof (reply
)) {
821 err
= CS_ERR_LIBRARY
;
829 reply
= SAM_REPLY_ERROR
;
830 if (sam_safe_write (parent_fd_out
, &reply
, sizeof (reply
)) != sizeof (reply
)) {
831 return (CS_ERR_LIBRARY
);
833 if (sam_safe_write (parent_fd_out
, &err
, sizeof (err
)) != sizeof (err
)) {
834 return (CS_ERR_LIBRARY
);
841 static cs_error_t
sam_parent_warn_signal_set (
852 if (sam_safe_read (parent_fd_in
, &warn_signal
, sizeof (warn_signal
)) != sizeof (warn_signal
)) {
853 err
= CS_ERR_LIBRARY
;
857 err
= sam_warn_signal_set (warn_signal
);
863 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
866 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
869 static cs_error_t
sam_parent_wait_for_quorum (
874 struct pollfd pfds
[2];
877 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
) {
878 if ((err
= sam_confdb_update_key (SAM_CONFDB_KEY_STATE
, SAM_CONFDB_S_Q_WAIT
)) != CS_OK
) {
884 * Update current quorum
886 if ((err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ALL
)) != CS_OK
) {
893 while (!sam_internal_data
.quorate
) {
894 pfds
[0].fd
= parent_fd_in
;
898 pfds
[1].fd
= sam_internal_data
.quorum_fd
;
899 pfds
[1].events
= POLLIN
;
902 poll_err
= poll (pfds
, 2, -1);
904 if (poll_err
== -1) {
907 * If it is EINTR, continue, otherwise QUIT
909 if (errno
!= EINTR
) {
910 err
= CS_ERR_LIBRARY
;
915 if (pfds
[0].revents
!= 0) {
916 if (pfds
[0].revents
== POLLERR
|| pfds
[0].revents
== POLLHUP
||pfds
[0].revents
== POLLNVAL
) {
924 if (pfds
[1].revents
!= 0) {
925 if ((err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ONE
)) != CS_OK
) {
931 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
) {
932 if ((err
= sam_confdb_update_key (SAM_CONFDB_KEY_STATE
, SAM_CONFDB_S_STARTED
)) != CS_OK
) {
937 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
940 if (sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_CONFDB
) {
941 sam_confdb_update_key (SAM_CONFDB_KEY_STATE
, SAM_CONFDB_S_REGISTERED
);
944 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
947 static cs_error_t
sam_parent_confdb_state_set (
956 state_s
= SAM_CONFDB_S_STARTED
;
958 state_s
= SAM_CONFDB_S_REGISTERED
;
961 if ((err
= sam_confdb_update_key (SAM_CONFDB_KEY_STATE
, state_s
)) != CS_OK
) {
965 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
968 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
971 static cs_error_t
sam_parent_kill_child (
978 if (!sam_internal_data
.term_send
) {
980 * We didn't send warn_signal yet.
982 kill (child_pid
, sam_internal_data
.warn_signal
);
984 sam_internal_data
.term_send
= 1;
987 * We sent child warning. Now, we will not be so nice
989 kill (child_pid
, SIGKILL
);
990 *action
= SAM_PARENT_ACTION_RECOVERY
;
996 static cs_error_t
sam_parent_mark_child_failed (
1000 sam_recovery_policy_t recpol
;
1002 recpol
= sam_internal_data
.recovery_policy
;
1004 sam_internal_data
.term_send
= 1;
1005 sam_internal_data
.recovery_policy
= SAM_RECOVERY_POLICY_QUIT
|
1006 (SAM_RP_MASK_C (recpol
) ? SAM_RECOVERY_POLICY_CONFDB
: 0) |
1007 (SAM_RP_MASK_Q (recpol
) ? SAM_RECOVERY_POLICY_QUORUM
: 0);
1009 return (sam_parent_kill_child (action
, child_pid
));
1012 static cs_error_t
sam_parent_data_store (
1023 if (sam_safe_read (parent_fd_in
, &size
, sizeof (size
)) != sizeof (size
)) {
1024 err
= CS_ERR_LIBRARY
;
1029 user_data
= malloc (size
);
1030 if (user_data
== NULL
) {
1031 err
= CS_ERR_NO_MEMORY
;
1035 if (sam_safe_read (parent_fd_in
, user_data
, size
) != size
) {
1036 err
= CS_ERR_LIBRARY
;
1037 goto free_error_reply
;
1041 err
= sam_data_store (user_data
, size
);
1043 goto free_error_reply
;
1048 return (sam_parent_reply_send (CS_OK
, parent_fd_in
, parent_fd_out
));
1053 return (sam_parent_reply_send (err
, parent_fd_in
, parent_fd_out
));
1056 static enum sam_parent_action_t
sam_parent_handler (
1067 struct pollfd pfds
[2];
1070 sam_recovery_policy_t recpol
;
1074 action
= SAM_PARENT_ACTION_CONTINUE
;
1075 recpol
= sam_internal_data
.recovery_policy
;
1077 while (action
== SAM_PARENT_ACTION_CONTINUE
) {
1078 pfds
[0].fd
= parent_fd_in
;
1079 pfds
[0].events
= POLLIN
;
1080 pfds
[0].revents
= 0;
1083 if (status
== 1 && sam_internal_data
.time_interval
!= 0) {
1084 time_interval
= sam_internal_data
.time_interval
;
1089 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
) {
1090 pfds
[nfds
].fd
= sam_internal_data
.quorum_fd
;
1091 pfds
[nfds
].events
= POLLIN
;
1092 pfds
[nfds
].revents
= 0;
1096 poll_error
= poll (pfds
, nfds
, time_interval
);
1098 if (poll_error
== -1) {
1101 * If it is EINTR, continue, otherwise QUIT
1103 if (errno
!= EINTR
) {
1104 action
= SAM_PARENT_ACTION_ERROR
;
1108 if (poll_error
== 0) {
1110 * Time limit expires
1113 action
= SAM_PARENT_ACTION_QUIT
;
1115 sam_parent_kill_child (&action
, child_pid
);
1119 if (poll_error
> 0) {
1120 if (pfds
[0].revents
!= 0) {
1122 * We have EOF or command in pipe
1124 bytes_read
= sam_safe_read (parent_fd_in
, &command
, 1);
1126 if (bytes_read
== 0) {
1128 * Handle EOF -> Take recovery action or quit if sam_start wasn't called
1131 action
= SAM_PARENT_ACTION_QUIT
;
1133 action
= SAM_PARENT_ACTION_RECOVERY
;
1138 if (bytes_read
== -1) {
1139 action
= SAM_PARENT_ACTION_ERROR
;
1143 if (recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
1144 sam_confdb_update_key (SAM_CONFDB_KEY_LAST_HC
, NULL
);
1148 * We have read command
1151 case SAM_COMMAND_START
:
1156 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
) {
1157 if (sam_parent_wait_for_quorum (parent_fd_in
,
1158 parent_fd_out
) != CS_OK
) {
1163 if (recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
1164 if (sam_parent_confdb_state_set (parent_fd_in
,
1165 parent_fd_out
, 1) != CS_OK
) {
1173 case SAM_COMMAND_STOP
:
1178 if (recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
1179 if (sam_parent_confdb_state_set (parent_fd_in
,
1180 parent_fd_out
, 0) != CS_OK
) {
1188 case SAM_COMMAND_DATA_STORE
:
1189 sam_parent_data_store (parent_fd_in
, parent_fd_out
);
1191 case SAM_COMMAND_WARN_SIGNAL_SET
:
1192 sam_parent_warn_signal_set (parent_fd_in
, parent_fd_out
);
1194 case SAM_COMMAND_MARK_FAILED
:
1196 sam_parent_mark_child_failed (&action
, child_pid
);
1199 } /* if (pfds[0].revents != 0) */
1201 if ((sam_internal_data
.recovery_policy
& SAM_RECOVERY_POLICY_QUORUM
) &&
1202 pfds
[1].revents
!= 0) {
1204 * Handle quorum change
1206 err
= quorum_dispatch (sam_internal_data
.quorum_handle
, CS_DISPATCH_ALL
);
1209 (!sam_internal_data
.quorate
|| (err
!= CS_ERR_TRY_AGAIN
&& err
!= CS_OK
))) {
1210 sam_parent_kill_child (&action
, child_pid
);
1213 } /* select_error > 0 */
1214 } /* action == SAM_PARENT_ACTION_CONTINUE */
1220 cs_error_t
sam_register (
1221 unsigned int *instance_id
)
1226 int pipe_fd_out
[2], pipe_fd_in
[2];
1227 enum sam_parent_action_t action
, old_action
;
1229 sam_recovery_policy_t recpol
;
1231 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_INITIALIZED
) {
1232 return (CS_ERR_BAD_HANDLE
);
1235 recpol
= sam_internal_data
.recovery_policy
;
1237 if (recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
1241 if ((error
= sam_confdb_register ()) != CS_OK
) {
1249 if ((pipe_error
= pipe (pipe_fd_out
)) != 0) {
1250 error
= CS_ERR_LIBRARY
;
1254 if ((pipe_error
= pipe (pipe_fd_in
)) != 0) {
1255 close (pipe_fd_out
[0]);
1256 close (pipe_fd_out
[1]);
1258 error
= CS_ERR_LIBRARY
;
1262 if (recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
1263 if ((error
= sam_confdb_update_key (SAM_CONFDB_KEY_STATE
, SAM_CONFDB_S_REGISTERED
)) != CS_OK
) {
1268 sam_internal_data
.instance_id
++;
1270 sam_internal_data
.term_send
= 0;
1278 sam_internal_data
.instance_id
--;
1280 error
= CS_ERR_LIBRARY
;
1288 close (pipe_fd_out
[0]);
1289 close (pipe_fd_in
[1]);
1291 sam_internal_data
.child_fd_out
= pipe_fd_out
[1];
1292 sam_internal_data
.child_fd_in
= pipe_fd_in
[0];
1295 *instance_id
= sam_internal_data
.instance_id
;
1297 sam_internal_data
.am_i_child
= 1;
1298 sam_internal_data
.internal_status
= SAM_INTERNAL_STATUS_REGISTERED
;
1300 pthread_mutex_init (&sam_internal_data
.lock
, NULL
);
1307 close (pipe_fd_out
[1]);
1308 close (pipe_fd_in
[0]);
1310 action
= sam_parent_handler (pipe_fd_out
[0], pipe_fd_in
[1], pid
);
1312 close (pipe_fd_out
[0]);
1313 close (pipe_fd_in
[1]);
1315 if (action
== SAM_PARENT_ACTION_ERROR
) {
1316 error
= CS_ERR_LIBRARY
;
1321 * We really don't like zombies
1323 while (waitpid (pid
, &child_status
, 0) == -1 && errno
== EINTR
)
1326 old_action
= action
;
1328 if (action
== SAM_PARENT_ACTION_RECOVERY
) {
1329 if (SAM_RP_MASK (sam_internal_data
.recovery_policy
) == SAM_RECOVERY_POLICY_QUIT
)
1330 action
= SAM_PARENT_ACTION_QUIT
;
1334 if (action
== SAM_PARENT_ACTION_QUIT
) {
1335 if (recpol
& SAM_RECOVERY_POLICY_QUORUM
) {
1336 quorum_finalize (sam_internal_data
.quorum_handle
);
1339 if (recpol
& SAM_RECOVERY_POLICY_CONFDB
) {
1340 if (old_action
== SAM_PARENT_ACTION_RECOVERY
) {
1344 sam_confdb_update_key (SAM_CONFDB_KEY_STATE
, SAM_CONFDB_S_FAILED
);
1346 sam_confdb_destroy_pid_obj ();
1350 exit (WEXITSTATUS (child_status
));
1361 static void *hc_callback_thread (void *unused_param
)
1365 ssize_t bytes_readed
;
1367 int time_interval
, tmp_time_interval
;
1374 time_interval
= sam_internal_data
.time_interval
>> 2;
1377 pfds
.fd
= sam_internal_data
.cb_rpipe_fd
;
1378 pfds
.events
= POLLIN
;
1382 tmp_time_interval
= time_interval
;
1384 tmp_time_interval
= -1;
1387 poll_error
= poll (&pfds
, 1, tmp_time_interval
);
1389 if (poll_error
== 0) {
1390 if (sam_hc_send () == CS_OK
) {
1395 if (sam_internal_data
.hc_callback () != 0) {
1403 if (poll_error
> 0) {
1404 bytes_readed
= sam_safe_read (sam_internal_data
.cb_rpipe_fd
, &command
, 1);
1406 if (bytes_readed
> 0) {
1407 if (status
== 0 && command
== SAM_COMMAND_START
)
1410 if (status
== 1 && command
== SAM_COMMAND_STOP
)
1418 * This makes compiler happy, it's same as return (NULL);
1420 return (unused_param
);
1423 cs_error_t
sam_hc_callback_register (sam_hc_callback_t cb
)
1425 cs_error_t error
= CS_OK
;
1426 pthread_attr_t thread_attr
;
1430 if (sam_internal_data
.internal_status
!= SAM_INTERNAL_STATUS_REGISTERED
) {
1431 return (CS_ERR_BAD_HANDLE
);
1434 if (sam_internal_data
.time_interval
== 0) {
1435 return (CS_ERR_INVALID_PARAM
);
1438 if (sam_internal_data
.cb_registered
) {
1439 sam_internal_data
.hc_callback
= cb
;
1445 * We know, this is first registration
1449 return (CS_ERR_INVALID_PARAM
);
1452 pipe_error
= pipe (pipe_fd
);
1454 if (pipe_error
!= 0) {
1456 * Pipe creation error
1458 error
= CS_ERR_LIBRARY
;
1462 sam_internal_data
.cb_rpipe_fd
= pipe_fd
[0];
1463 sam_internal_data
.cb_wpipe_fd
= pipe_fd
[1];
1466 * Create thread attributes
1468 error
= pthread_attr_init (&thread_attr
);
1470 error
= CS_ERR_LIBRARY
;
1471 goto error_close_fd_exit
;
1475 pthread_attr_setdetachstate (&thread_attr
, PTHREAD_CREATE_DETACHED
);
1476 pthread_attr_setstacksize (&thread_attr
, 32768);
1481 error
= pthread_create (&sam_internal_data
.cb_thread
, &thread_attr
, hc_callback_thread
, NULL
);
1484 error
= CS_ERR_LIBRARY
;
1485 goto error_attr_destroy_exit
;
1491 pthread_attr_destroy(&thread_attr
);
1493 sam_internal_data
.cb_registered
= 1;
1494 sam_internal_data
.hc_callback
= cb
;
1498 error_attr_destroy_exit
:
1499 pthread_attr_destroy(&thread_attr
);
1500 error_close_fd_exit
:
1501 sam_internal_data
.cb_rpipe_fd
= sam_internal_data
.cb_wpipe_fd
= 0;