]> git.proxmox.com Git - mirror_corosync.git/blob - lib/sam.c
SAM Confdb integration
[mirror_corosync.git] / lib / sam.c
1 /*
2 * Copyright (c) 2009-2010 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Jan Friesse (jfriesse@redhat.com)
7 *
8 * This software licensed under BSD license, the text of which follows:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
12 *
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the Red Hat, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35 /*
36 * Provides a SAM API
37 */
38
39 #include <config.h>
40
41 #include <limits.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <unistd.h>
45 #include <sys/time.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <errno.h>
49
50 #include <corosync/corotypes.h>
51 #include <corosync/coroipc_types.h>
52 #include <corosync/coroipcc.h>
53 #include <corosync/corodefs.h>
54 #include <corosync/confdb.h>
55 #include <corosync/hdb.h>
56 #include <corosync/quorum.h>
57
58 #include <corosync/sam.h>
59
60 #include "util.h"
61
62 #include <stdio.h>
63 #include <sys/wait.h>
64 #include <signal.h>
65
66 #define SAM_CONFDB_S_FAILED "failed"
67 #define SAM_CONFDB_S_REGISTERED "registered"
68 #define SAM_CONFDB_S_STARTED "started"
69 #define SAM_CONFDB_S_Q_WAIT "waiting for quorum"
70
71 #define SAM_RP_MASK_Q(pol) (pol & (~SAM_RECOVERY_POLICY_QUORUM))
72 #define SAM_RP_MASK_C(pol) (pol & (~SAM_RECOVERY_POLICY_CONFDB))
73 #define SAM_RP_MASK(pol) (pol & (~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CONFDB)))
74
75 enum sam_internal_status_t {
76 SAM_INTERNAL_STATUS_NOT_INITIALIZED = 0,
77 SAM_INTERNAL_STATUS_INITIALIZED,
78 SAM_INTERNAL_STATUS_REGISTERED,
79 SAM_INTERNAL_STATUS_STARTED,
80 SAM_INTERNAL_STATUS_FINALIZED
81 };
82
83 enum sam_command_t {
84 SAM_COMMAND_START,
85 SAM_COMMAND_STOP,
86 SAM_COMMAND_HB,
87 SAM_COMMAND_DATA_STORE,
88 SAM_COMMAND_WARN_SIGNAL_SET,
89 SAM_COMMAND_MARK_FAILED,
90 };
91
92 enum sam_reply_t {
93 SAM_REPLY_OK,
94 SAM_REPLY_ERROR,
95 };
96
97 enum sam_parent_action_t {
98 SAM_PARENT_ACTION_ERROR,
99 SAM_PARENT_ACTION_RECOVERY,
100 SAM_PARENT_ACTION_QUIT,
101 SAM_PARENT_ACTION_CONTINUE
102 };
103
104 enum sam_confdb_key_t {
105 SAM_CONFDB_KEY_RECOVERY,
106 SAM_CONFDB_KEY_HC_PERIOD,
107 SAM_CONFDB_KEY_LAST_HC,
108 SAM_CONFDB_KEY_STATE,
109 };
110
111 static struct {
112 int time_interval;
113 sam_recovery_policy_t recovery_policy;
114 enum sam_internal_status_t internal_status;
115 unsigned int instance_id;
116 int child_fd_out;
117 int child_fd_in;
118 int term_send;
119 int warn_signal;
120 int am_i_child;
121
122 sam_hc_callback_t hc_callback;
123 pthread_t cb_thread;
124 int cb_rpipe_fd, cb_wpipe_fd;
125 int cb_registered;
126
127 void *user_data;
128 size_t user_data_size;
129 size_t user_data_allocated;
130
131 pthread_mutex_t lock;
132
133 quorum_handle_t quorum_handle;
134 uint32_t quorate;
135 int quorum_fd;
136
137 confdb_handle_t confdb_handle;
138 hdb_handle_t confdb_pid_handle;
139 } sam_internal_data;
140
141 extern const char *__progname;
142
143 static cs_error_t sam_confdb_update_key (enum sam_confdb_key_t key, const char *value)
144 {
145 cs_error_t err;
146 const char *svalue;
147 uint64_t hc_period, last_hc;
148 struct timeval tv;
149 const char *ssvalue[] = { [SAM_RECOVERY_POLICY_QUIT] = "quit", [SAM_RECOVERY_POLICY_RESTART] = "restart" };
150
151 switch (key) {
152 case SAM_CONFDB_KEY_RECOVERY:
153 svalue = ssvalue[SAM_RP_MASK (sam_internal_data.recovery_policy)];
154
155 if ((err = confdb_key_create_typed (sam_internal_data.confdb_handle, sam_internal_data.confdb_pid_handle,
156 "recovery", svalue, strlen ((const char *)svalue), CONFDB_VALUETYPE_STRING)) != CS_OK) {
157 goto exit_error;
158 }
159 break;
160 case SAM_CONFDB_KEY_HC_PERIOD:
161 hc_period = sam_internal_data.time_interval;
162
163 if ((err = confdb_key_create_typed (sam_internal_data.confdb_handle, sam_internal_data.confdb_pid_handle,
164 "hc_period", &hc_period, sizeof (uint64_t), CONFDB_VALUETYPE_UINT64)) != CS_OK) {
165 goto exit_error;
166 }
167 break;
168 case SAM_CONFDB_KEY_LAST_HC:
169 if (gettimeofday (&tv, NULL) == -1) {
170 last_hc = 0;
171 } else {
172 last_hc = ((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000);
173 }
174
175 if ((err = confdb_key_create_typed (sam_internal_data.confdb_handle, sam_internal_data.confdb_pid_handle,
176 "hc_last", &last_hc, sizeof (uint64_t), CONFDB_VALUETYPE_UINT64)) != CS_OK) {
177 goto exit_error;
178 }
179 break;
180 case SAM_CONFDB_KEY_STATE:
181 svalue = value;
182 if ((err = confdb_key_create_typed (sam_internal_data.confdb_handle, sam_internal_data.confdb_pid_handle,
183 "state", svalue, strlen ((const char *)svalue), CONFDB_VALUETYPE_STRING)) != CS_OK) {
184 goto exit_error;
185 }
186 break;
187 }
188
189 return (CS_OK);
190
191 exit_error:
192 return (err);
193 }
194
195 static cs_error_t sam_confdb_destroy_pid_obj (void)
196 {
197 return (confdb_object_destroy (sam_internal_data.confdb_handle, sam_internal_data.confdb_pid_handle));
198 }
199
200 static cs_error_t sam_confdb_register (void)
201 {
202 const char *obj_name;
203 cs_error_t err;
204 confdb_handle_t confdb_handle;
205 hdb_handle_t resource_handle, process_handle, pid_handle, obj_handle;
206 hdb_handle_t *res_handle;
207 char tmp_obj[PATH_MAX];
208 int i;
209
210 if ((err = confdb_initialize (&confdb_handle, NULL)) != CS_OK) {
211 return (err);
212 }
213
214 for (i = 0; i < 3; i++) {
215 switch (i) {
216 case 0:
217 obj_name = "resources";
218 obj_handle = OBJECT_PARENT_HANDLE;
219 res_handle = &resource_handle;
220 break;
221 case 1:
222 obj_name = "process";
223 obj_handle = resource_handle;
224 res_handle = &process_handle;
225 break;
226 case 2:
227 if (snprintf (tmp_obj, sizeof (tmp_obj), "%s:%d", __progname, getpid ()) >= sizeof (tmp_obj)) {
228 snprintf (tmp_obj, sizeof (tmp_obj), "%d", getpid ());
229 }
230
231 obj_name = tmp_obj;
232 obj_handle = process_handle;
233 res_handle = &pid_handle;
234 break;
235 }
236
237 if ((err = confdb_object_find_start (confdb_handle, obj_handle)) != CS_OK) {
238 goto finalize_error;
239 }
240
241 if ((err = confdb_object_find (confdb_handle, obj_handle, obj_name, strlen (obj_name),
242 res_handle)) != CS_OK) {
243 if (err == CONFDB_ERR_ACCESS) {
244 /*
245 * Try to create object
246 */
247 if ((err = confdb_object_create (confdb_handle, obj_handle, obj_name,
248 strlen (obj_name), res_handle)) != CS_OK) {
249 goto finalize_error;
250 }
251 } else {
252 goto finalize_error;
253 }
254 } else {
255 if ((err = confdb_object_find_destroy (confdb_handle, obj_handle)) != CS_OK) {
256 goto finalize_error;
257 }
258 }
259 }
260
261 sam_internal_data.confdb_pid_handle = pid_handle;
262 sam_internal_data.confdb_handle = confdb_handle;
263
264 if ((err = sam_confdb_update_key (SAM_CONFDB_KEY_RECOVERY, NULL)) != CS_OK) {
265 goto destroy_finalize_error;
266 }
267
268 if ((err = sam_confdb_update_key (SAM_CONFDB_KEY_HC_PERIOD, NULL)) != CS_OK) {
269 goto destroy_finalize_error;
270 }
271
272 return (CS_OK);
273
274 destroy_finalize_error:
275 sam_confdb_destroy_pid_obj ();
276 finalize_error:
277 confdb_finalize (confdb_handle);
278 return (err);
279 }
280
281 static void quorum_notification_fn (
282 quorum_handle_t handle,
283 uint32_t quorate,
284 uint64_t ring_id,
285 uint32_t view_list_entries,
286 uint32_t *view_list)
287 {
288 sam_internal_data.quorate = quorate;
289 }
290
291 cs_error_t sam_initialize (
292 int time_interval,
293 sam_recovery_policy_t recovery_policy)
294 {
295 quorum_callbacks_t quorum_callbacks;
296 cs_error_t err;
297
298 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_NOT_INITIALIZED) {
299 return (CS_ERR_BAD_HANDLE);
300 }
301
302 if (SAM_RP_MASK (recovery_policy) != SAM_RECOVERY_POLICY_QUIT &&
303 SAM_RP_MASK (recovery_policy) != SAM_RECOVERY_POLICY_RESTART) {
304 return (CS_ERR_INVALID_PARAM);
305 }
306
307 if (recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
308 /*
309 * Initialize quorum
310 */
311 quorum_callbacks.quorum_notify_fn = quorum_notification_fn;
312 if ((err = quorum_initialize (&sam_internal_data.quorum_handle, &quorum_callbacks)) != CS_OK) {
313 goto exit_error;
314 }
315
316 if ((err = quorum_trackstart (sam_internal_data.quorum_handle, CS_TRACK_CHANGES)) != CS_OK) {
317 goto exit_error_quorum;
318 }
319
320 if ((err = quorum_fd_get (sam_internal_data.quorum_handle, &sam_internal_data.quorum_fd)) != CS_OK) {
321 goto exit_error_quorum;
322 }
323
324 /*
325 * Dispatch initial quorate state
326 */
327 if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) {
328 goto exit_error_quorum;
329 }
330 }
331 sam_internal_data.recovery_policy = recovery_policy;
332
333 sam_internal_data.time_interval = time_interval;
334
335 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_INITIALIZED;
336
337 sam_internal_data.warn_signal = SIGTERM;
338
339 sam_internal_data.am_i_child = 0;
340
341 sam_internal_data.user_data = NULL;
342 sam_internal_data.user_data_size = 0;
343 sam_internal_data.user_data_allocated = 0;
344
345 pthread_mutex_init (&sam_internal_data.lock, NULL);
346
347 return (CS_OK);
348
349 exit_error_quorum:
350 quorum_finalize (sam_internal_data.quorum_handle);
351 exit_error:
352 return (err);
353 }
354
355 /*
356 * Wrapper on top of write(2) function. It handles EAGAIN and EINTR states and sends whole buffer if possible.
357 */
358 static size_t sam_safe_write (
359 int d,
360 const void *buf,
361 size_t nbyte)
362 {
363 ssize_t bytes_write;
364 ssize_t tmp_bytes_write;
365
366 bytes_write = 0;
367
368 do {
369 tmp_bytes_write = write (d, (const char *)buf + bytes_write,
370 (nbyte - bytes_write > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_write);
371
372 if (tmp_bytes_write == -1) {
373 if (!(errno == EAGAIN || errno == EINTR))
374 return -1;
375 } else {
376 bytes_write += tmp_bytes_write;
377 }
378 } while (bytes_write != nbyte);
379
380 return (bytes_write);
381 }
382
383 /*
384 * Wrapper on top of read(2) function. It handles EAGAIN and EINTR states and reads whole buffer if possible.
385 */
386 static size_t sam_safe_read (
387 int d,
388 void *buf,
389 size_t nbyte)
390 {
391 ssize_t bytes_read;
392 ssize_t tmp_bytes_read;
393
394 bytes_read = 0;
395
396 do {
397 tmp_bytes_read = read (d, (char *)buf + bytes_read,
398 (nbyte - bytes_read > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_read);
399
400 if (tmp_bytes_read == -1) {
401 if (!(errno == EAGAIN || errno == EINTR))
402 return -1;
403 } else {
404 bytes_read += tmp_bytes_read;
405 }
406
407 } while (bytes_read != nbyte && tmp_bytes_read != 0);
408
409 return (bytes_read);
410 }
411
412 static cs_error_t sam_read_reply (
413 int child_fd_in)
414 {
415 char reply;
416 cs_error_t err;
417
418 if (sam_safe_read (sam_internal_data.child_fd_in, &reply, sizeof (reply)) != sizeof (reply)) {
419 return (CS_ERR_LIBRARY);
420 }
421
422 switch (reply) {
423 case SAM_REPLY_ERROR:
424 /*
425 * Read error and return that
426 */
427 if (sam_safe_read (sam_internal_data.child_fd_in, &err, sizeof (err)) != sizeof (err)) {
428 return (CS_ERR_LIBRARY);
429 }
430
431 return (err);
432 break;
433 case SAM_REPLY_OK:
434 /*
435 * Everything correct
436 */
437 break;
438 default:
439 return (CS_ERR_LIBRARY);
440 break;
441 }
442
443 return (CS_OK);
444 }
445
446 cs_error_t sam_data_getsize (size_t *size)
447 {
448 if (size == NULL) {
449 return (CS_ERR_INVALID_PARAM);
450 }
451
452 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
453 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
454 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
455
456 return (CS_ERR_BAD_HANDLE);
457 }
458
459 pthread_mutex_lock (&sam_internal_data.lock);
460
461 *size = sam_internal_data.user_data_size;
462
463 pthread_mutex_unlock (&sam_internal_data.lock);
464
465 return (CS_OK);
466 }
467
468 cs_error_t sam_data_restore (
469 void *data,
470 size_t size)
471 {
472 cs_error_t err;
473
474 err = CS_OK;
475
476 if (data == NULL) {
477 return (CS_ERR_INVALID_PARAM);
478 }
479
480 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
481 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
482 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
483
484 return (CS_ERR_BAD_HANDLE);
485 }
486
487 pthread_mutex_lock (&sam_internal_data.lock);
488
489 if (sam_internal_data.user_data_size == 0) {
490 err = CS_OK;
491
492 goto error_unlock;
493 }
494
495 if (size < sam_internal_data.user_data_size) {
496 err = CS_ERR_INVALID_PARAM;
497
498 goto error_unlock;
499 }
500
501 memcpy (data, sam_internal_data.user_data, sam_internal_data.user_data_size);
502
503 pthread_mutex_unlock (&sam_internal_data.lock);
504
505 return (CS_OK);
506
507 error_unlock:
508 pthread_mutex_unlock (&sam_internal_data.lock);
509
510 return (err);
511 }
512
513 cs_error_t sam_data_store (
514 const void *data,
515 size_t size)
516 {
517 cs_error_t err;
518 char command;
519 char *new_data;
520
521 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
522 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
523 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
524
525 return (CS_ERR_BAD_HANDLE);
526 }
527
528
529 if (data == NULL) {
530 size = 0;
531 }
532
533 pthread_mutex_lock (&sam_internal_data.lock);
534
535 if (sam_internal_data.am_i_child) {
536 /*
537 * We are child so we must send data to parent
538 */
539 command = SAM_COMMAND_DATA_STORE;
540 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
541 err = CS_ERR_LIBRARY;
542
543 goto error_unlock;
544 }
545
546 if (sam_safe_write (sam_internal_data.child_fd_out, &size, sizeof (size)) != sizeof (size)) {
547 err = CS_ERR_LIBRARY;
548
549 goto error_unlock;
550 }
551
552 if (data != NULL && sam_safe_write (sam_internal_data.child_fd_out, data, size) != size) {
553 err = CS_ERR_LIBRARY;
554
555 goto error_unlock;
556 }
557
558 /*
559 * And wait for reply
560 */
561 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
562 goto error_unlock;
563 }
564 }
565
566 /*
567 * We are parent or we received OK reply from parent -> do required action
568 */
569 if (data == NULL) {
570 free (sam_internal_data.user_data);
571 sam_internal_data.user_data = NULL;
572 sam_internal_data.user_data_allocated = 0;
573 sam_internal_data.user_data_size = 0;
574 } else {
575 if (sam_internal_data.user_data_allocated < size) {
576 if ((new_data = realloc (sam_internal_data.user_data, size)) == NULL) {
577 err = CS_ERR_NO_MEMORY;
578
579 goto error_unlock;
580 }
581
582 sam_internal_data.user_data_allocated = size;
583 } else {
584 new_data = sam_internal_data.user_data;
585 }
586 sam_internal_data.user_data = new_data;
587 sam_internal_data.user_data_size = size;
588
589 memcpy (sam_internal_data.user_data, data, size);
590 }
591
592 pthread_mutex_unlock (&sam_internal_data.lock);
593
594 return (CS_OK);
595
596 error_unlock:
597 pthread_mutex_unlock (&sam_internal_data.lock);
598
599 return (err);
600 }
601
602 cs_error_t sam_start (void)
603 {
604 char command;
605 cs_error_t err;
606 sam_recovery_policy_t recpol;
607
608 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
609 return (CS_ERR_BAD_HANDLE);
610 }
611
612 recpol = sam_internal_data.recovery_policy;
613
614 if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CONFDB) {
615 pthread_mutex_lock (&sam_internal_data.lock);
616 }
617
618 command = SAM_COMMAND_START;
619
620 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
621 if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CONFDB) {
622 pthread_mutex_unlock (&sam_internal_data.lock);
623 }
624
625 return (CS_ERR_LIBRARY);
626 }
627
628 if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CONFDB) {
629 /*
630 * Wait for parent reply
631 */
632 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
633 pthread_mutex_unlock (&sam_internal_data.lock);
634
635 return (err);
636 }
637
638 pthread_mutex_unlock (&sam_internal_data.lock);
639 }
640
641 if (sam_internal_data.hc_callback)
642 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command))
643 return (CS_ERR_LIBRARY);
644
645 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_STARTED;
646
647 return (CS_OK);
648 }
649
650 cs_error_t sam_stop (void)
651 {
652 char command;
653 cs_error_t err;
654
655 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
656 return (CS_ERR_BAD_HANDLE);
657 }
658
659 command = SAM_COMMAND_STOP;
660
661 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB) {
662 pthread_mutex_lock (&sam_internal_data.lock);
663 }
664
665 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
666 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB) {
667 pthread_mutex_unlock (&sam_internal_data.lock);
668 }
669
670 return (CS_ERR_LIBRARY);
671 }
672
673 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB) {
674 /*
675 * Wait for parent reply
676 */
677 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
678 pthread_mutex_unlock (&sam_internal_data.lock);
679
680 return (err);
681 }
682
683 pthread_mutex_unlock (&sam_internal_data.lock);
684 }
685
686 if (sam_internal_data.hc_callback)
687 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command))
688 return (CS_ERR_LIBRARY);
689
690 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_REGISTERED;
691
692 return (CS_OK);
693 }
694
695 cs_error_t sam_hc_send (void)
696 {
697 char command;
698
699 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
700 return (CS_ERR_BAD_HANDLE);
701 }
702
703 command = SAM_COMMAND_HB;
704
705 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command))
706 return (CS_ERR_LIBRARY);
707
708 return (CS_OK);
709 }
710
711 cs_error_t sam_finalize (void)
712 {
713 cs_error_t error;
714
715 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
716 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
717 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
718 return (CS_ERR_BAD_HANDLE);
719 }
720
721 if (sam_internal_data.internal_status == SAM_INTERNAL_STATUS_STARTED) {
722 error = sam_stop ();
723 if (error != CS_OK)
724 goto exit_error;
725 }
726
727 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_FINALIZED;
728
729 free (sam_internal_data.user_data);
730
731 exit_error:
732 return (CS_OK);
733 }
734
735 cs_error_t sam_mark_failed (void)
736 {
737 char command;
738
739 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED &&
740 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
741 return (CS_ERR_BAD_HANDLE);
742 }
743
744 if (!(sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB)) {
745 return (CS_ERR_INVALID_PARAM);
746 }
747
748 command = SAM_COMMAND_MARK_FAILED;
749
750 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command))
751 return (CS_ERR_LIBRARY);
752
753 return (CS_OK);
754 }
755
756 cs_error_t sam_warn_signal_set (int warn_signal)
757 {
758 char command;
759 cs_error_t err;
760
761 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED &&
762 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED &&
763 sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) {
764 return (CS_ERR_BAD_HANDLE);
765 }
766
767 pthread_mutex_lock (&sam_internal_data.lock);
768
769 if (sam_internal_data.am_i_child) {
770 /*
771 * We are child so we must send data to parent
772 */
773 command = SAM_COMMAND_WARN_SIGNAL_SET;
774 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) {
775 err = CS_ERR_LIBRARY;
776
777 goto error_unlock;
778 }
779
780 if (sam_safe_write (sam_internal_data.child_fd_out, &warn_signal, sizeof (warn_signal)) !=
781 sizeof (warn_signal)) {
782 err = CS_ERR_LIBRARY;
783
784 goto error_unlock;
785 }
786
787 /*
788 * And wait for reply
789 */
790 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
791 goto error_unlock;
792 }
793 }
794
795 /*
796 * We are parent or we received OK reply from parent -> do required action
797 */
798 sam_internal_data.warn_signal = warn_signal;
799
800 pthread_mutex_unlock (&sam_internal_data.lock);
801
802 return (CS_OK);
803
804 error_unlock:
805 pthread_mutex_unlock (&sam_internal_data.lock);
806
807 return (err);
808 }
809
810 static cs_error_t sam_parent_reply_send (
811 cs_error_t err,
812 int parent_fd_in,
813 int parent_fd_out)
814 {
815 char reply;
816
817 if (err == CS_OK) {
818 reply = SAM_REPLY_OK;
819
820 if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) {
821 err = CS_ERR_LIBRARY;
822 goto error_reply;
823 }
824
825 return (CS_OK);
826 }
827
828 error_reply:
829 reply = SAM_REPLY_ERROR;
830 if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) {
831 return (CS_ERR_LIBRARY);
832 }
833 if (sam_safe_write (parent_fd_out, &err, sizeof (err)) != sizeof (err)) {
834 return (CS_ERR_LIBRARY);
835 }
836
837 return (err);
838 }
839
840
841 static cs_error_t sam_parent_warn_signal_set (
842 int parent_fd_in,
843 int parent_fd_out)
844 {
845 char *user_data;
846 int warn_signal;
847 cs_error_t err;
848
849 err = CS_OK;
850 user_data = NULL;
851
852 if (sam_safe_read (parent_fd_in, &warn_signal, sizeof (warn_signal)) != sizeof (warn_signal)) {
853 err = CS_ERR_LIBRARY;
854 goto error_reply;
855 }
856
857 err = sam_warn_signal_set (warn_signal);
858 if (err != CS_OK) {
859 goto error_reply;
860 }
861
862
863 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
864
865 error_reply:
866 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
867 }
868
869 static cs_error_t sam_parent_wait_for_quorum (
870 int parent_fd_in,
871 int parent_fd_out)
872 {
873 cs_error_t err;
874 struct pollfd pfds[2];
875 int poll_err;
876
877 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB) {
878 if ((err = sam_confdb_update_key (SAM_CONFDB_KEY_STATE, SAM_CONFDB_S_Q_WAIT)) != CS_OK) {
879 goto error_reply;
880 }
881 }
882
883 /*
884 * Update current quorum
885 */
886 if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL)) != CS_OK) {
887 goto error_reply;
888 }
889
890 /*
891 * Wait for quorum
892 */
893 while (!sam_internal_data.quorate) {
894 pfds[0].fd = parent_fd_in;
895 pfds[0].events = 0;
896 pfds[0].revents = 0;
897
898 pfds[1].fd = sam_internal_data.quorum_fd;
899 pfds[1].events = POLLIN;
900 pfds[1].revents = 0;
901
902 poll_err = poll (pfds, 2, -1);
903
904 if (poll_err == -1) {
905 /*
906 * Error in poll
907 * If it is EINTR, continue, otherwise QUIT
908 */
909 if (errno != EINTR) {
910 err = CS_ERR_LIBRARY;
911 goto error_reply;
912 }
913 }
914
915 if (pfds[0].revents != 0) {
916 if (pfds[0].revents == POLLERR || pfds[0].revents == POLLHUP ||pfds[0].revents == POLLNVAL) {
917 /*
918 * Child has exited
919 */
920 return (CS_OK);
921 }
922 }
923
924 if (pfds[1].revents != 0) {
925 if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) {
926 goto error_reply;
927 }
928 }
929 }
930
931 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB) {
932 if ((err = sam_confdb_update_key (SAM_CONFDB_KEY_STATE, SAM_CONFDB_S_STARTED)) != CS_OK) {
933 goto error_reply;
934 }
935 }
936
937 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
938
939 error_reply:
940 if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CONFDB) {
941 sam_confdb_update_key (SAM_CONFDB_KEY_STATE, SAM_CONFDB_S_REGISTERED);
942 }
943
944 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
945 }
946
947 static cs_error_t sam_parent_confdb_state_set (
948 int parent_fd_in,
949 int parent_fd_out,
950 int state)
951 {
952 cs_error_t err;
953 const char *state_s;
954
955 if (state == 1) {
956 state_s = SAM_CONFDB_S_STARTED;
957 } else {
958 state_s = SAM_CONFDB_S_REGISTERED;
959 }
960
961 if ((err = sam_confdb_update_key (SAM_CONFDB_KEY_STATE, state_s)) != CS_OK) {
962 goto error_reply;
963 }
964
965 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
966
967 error_reply:
968 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
969 }
970
971 static cs_error_t sam_parent_kill_child (
972 int *action,
973 pid_t child_pid)
974 {
975 /*
976 * Kill child process
977 */
978 if (!sam_internal_data.term_send) {
979 /*
980 * We didn't send warn_signal yet.
981 */
982 kill (child_pid, sam_internal_data.warn_signal);
983
984 sam_internal_data.term_send = 1;
985 } else {
986 /*
987 * We sent child warning. Now, we will not be so nice
988 */
989 kill (child_pid, SIGKILL);
990 *action = SAM_PARENT_ACTION_RECOVERY;
991 }
992
993 return (CS_OK);
994 }
995
996 static cs_error_t sam_parent_mark_child_failed (
997 int *action,
998 pid_t child_pid)
999 {
1000 sam_recovery_policy_t recpol;
1001
1002 recpol = sam_internal_data.recovery_policy;
1003
1004 sam_internal_data.term_send = 1;
1005 sam_internal_data.recovery_policy = SAM_RECOVERY_POLICY_QUIT |
1006 (SAM_RP_MASK_C (recpol) ? SAM_RECOVERY_POLICY_CONFDB : 0) |
1007 (SAM_RP_MASK_Q (recpol) ? SAM_RECOVERY_POLICY_QUORUM : 0);
1008
1009 return (sam_parent_kill_child (action, child_pid));
1010 }
1011
1012 static cs_error_t sam_parent_data_store (
1013 int parent_fd_in,
1014 int parent_fd_out)
1015 {
1016 char *user_data;
1017 ssize_t size;
1018 cs_error_t err;
1019
1020 err = CS_OK;
1021 user_data = NULL;
1022
1023 if (sam_safe_read (parent_fd_in, &size, sizeof (size)) != sizeof (size)) {
1024 err = CS_ERR_LIBRARY;
1025 goto error_reply;
1026 }
1027
1028 if (size > 0) {
1029 user_data = malloc (size);
1030 if (user_data == NULL) {
1031 err = CS_ERR_NO_MEMORY;
1032 goto error_reply;
1033 }
1034
1035 if (sam_safe_read (parent_fd_in, user_data, size) != size) {
1036 err = CS_ERR_LIBRARY;
1037 goto free_error_reply;
1038 }
1039 }
1040
1041 err = sam_data_store (user_data, size);
1042 if (err != CS_OK) {
1043 goto free_error_reply;
1044 }
1045
1046 free (user_data);
1047
1048 return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out));
1049
1050 free_error_reply:
1051 free (user_data);
1052 error_reply:
1053 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
1054 }
1055
1056 static enum sam_parent_action_t sam_parent_handler (
1057 int parent_fd_in,
1058 int parent_fd_out,
1059 pid_t child_pid)
1060 {
1061 int poll_error;
1062 int action;
1063 int status;
1064 ssize_t bytes_read;
1065 char command;
1066 int time_interval;
1067 struct pollfd pfds[2];
1068 nfds_t nfds;
1069 cs_error_t err;
1070 sam_recovery_policy_t recpol;
1071
1072 status = 0;
1073
1074 action = SAM_PARENT_ACTION_CONTINUE;
1075 recpol = sam_internal_data.recovery_policy;
1076
1077 while (action == SAM_PARENT_ACTION_CONTINUE) {
1078 pfds[0].fd = parent_fd_in;
1079 pfds[0].events = POLLIN;
1080 pfds[0].revents = 0;
1081 nfds = 1;
1082
1083 if (status == 1 && sam_internal_data.time_interval != 0) {
1084 time_interval = sam_internal_data.time_interval;
1085 } else {
1086 time_interval = -1;
1087 }
1088
1089 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1090 pfds[nfds].fd = sam_internal_data.quorum_fd;
1091 pfds[nfds].events = POLLIN;
1092 pfds[nfds].revents = 0;
1093 nfds++;
1094 }
1095
1096 poll_error = poll (pfds, nfds, time_interval);
1097
1098 if (poll_error == -1) {
1099 /*
1100 * Error in poll
1101 * If it is EINTR, continue, otherwise QUIT
1102 */
1103 if (errno != EINTR) {
1104 action = SAM_PARENT_ACTION_ERROR;
1105 }
1106 }
1107
1108 if (poll_error == 0) {
1109 /*
1110 * Time limit expires
1111 */
1112 if (status == 0) {
1113 action = SAM_PARENT_ACTION_QUIT;
1114 } else {
1115 sam_parent_kill_child (&action, child_pid);
1116 }
1117 }
1118
1119 if (poll_error > 0) {
1120 if (pfds[0].revents != 0) {
1121 /*
1122 * We have EOF or command in pipe
1123 */
1124 bytes_read = sam_safe_read (parent_fd_in, &command, 1);
1125
1126 if (bytes_read == 0) {
1127 /*
1128 * Handle EOF -> Take recovery action or quit if sam_start wasn't called
1129 */
1130 if (status == 0)
1131 action = SAM_PARENT_ACTION_QUIT;
1132 else
1133 action = SAM_PARENT_ACTION_RECOVERY;
1134
1135 continue;
1136 }
1137
1138 if (bytes_read == -1) {
1139 action = SAM_PARENT_ACTION_ERROR;
1140 goto action_exit;
1141 }
1142
1143 if (recpol & SAM_RECOVERY_POLICY_CONFDB) {
1144 sam_confdb_update_key (SAM_CONFDB_KEY_LAST_HC, NULL);
1145 }
1146
1147 /*
1148 * We have read command
1149 */
1150 switch (command) {
1151 case SAM_COMMAND_START:
1152 if (status == 0) {
1153 /*
1154 * Not started yet
1155 */
1156 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1157 if (sam_parent_wait_for_quorum (parent_fd_in,
1158 parent_fd_out) != CS_OK) {
1159 continue;
1160 }
1161 }
1162
1163 if (recpol & SAM_RECOVERY_POLICY_CONFDB) {
1164 if (sam_parent_confdb_state_set (parent_fd_in,
1165 parent_fd_out, 1) != CS_OK) {
1166 continue;
1167 }
1168 }
1169
1170 status = 1;
1171 }
1172 break;
1173 case SAM_COMMAND_STOP:
1174 if (status == 1) {
1175 /*
1176 * Started
1177 */
1178 if (recpol & SAM_RECOVERY_POLICY_CONFDB) {
1179 if (sam_parent_confdb_state_set (parent_fd_in,
1180 parent_fd_out, 0) != CS_OK) {
1181 continue;
1182 }
1183 }
1184
1185 status = 0;
1186 }
1187 break;
1188 case SAM_COMMAND_DATA_STORE:
1189 sam_parent_data_store (parent_fd_in, parent_fd_out);
1190 break;
1191 case SAM_COMMAND_WARN_SIGNAL_SET:
1192 sam_parent_warn_signal_set (parent_fd_in, parent_fd_out);
1193 break;
1194 case SAM_COMMAND_MARK_FAILED:
1195 status = 1;
1196 sam_parent_mark_child_failed (&action, child_pid);
1197 break;
1198 }
1199 } /* if (pfds[0].revents != 0) */
1200
1201 if ((sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) &&
1202 pfds[1].revents != 0) {
1203 /*
1204 * Handle quorum change
1205 */
1206 err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL);
1207
1208 if (status == 1 &&
1209 (!sam_internal_data.quorate || (err != CS_ERR_TRY_AGAIN && err != CS_OK))) {
1210 sam_parent_kill_child (&action, child_pid);
1211 }
1212 }
1213 } /* select_error > 0 */
1214 } /* action == SAM_PARENT_ACTION_CONTINUE */
1215
1216 action_exit:
1217 return action;
1218 }
1219
1220 cs_error_t sam_register (
1221 unsigned int *instance_id)
1222 {
1223 cs_error_t error;
1224 pid_t pid;
1225 int pipe_error;
1226 int pipe_fd_out[2], pipe_fd_in[2];
1227 enum sam_parent_action_t action, old_action;
1228 int child_status;
1229 sam_recovery_policy_t recpol;
1230
1231 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED) {
1232 return (CS_ERR_BAD_HANDLE);
1233 }
1234
1235 recpol = sam_internal_data.recovery_policy;
1236
1237 if (recpol & SAM_RECOVERY_POLICY_CONFDB) {
1238 /*
1239 * Register to objdb
1240 */
1241 if ((error = sam_confdb_register ()) != CS_OK) {
1242 goto error_exit;
1243 }
1244 }
1245
1246 error = CS_OK;
1247
1248 while (1) {
1249 if ((pipe_error = pipe (pipe_fd_out)) != 0) {
1250 error = CS_ERR_LIBRARY;
1251 goto error_exit;
1252 }
1253
1254 if ((pipe_error = pipe (pipe_fd_in)) != 0) {
1255 close (pipe_fd_out[0]);
1256 close (pipe_fd_out[1]);
1257
1258 error = CS_ERR_LIBRARY;
1259 goto error_exit;
1260 }
1261
1262 if (recpol & SAM_RECOVERY_POLICY_CONFDB) {
1263 if ((error = sam_confdb_update_key (SAM_CONFDB_KEY_STATE, SAM_CONFDB_S_REGISTERED)) != CS_OK) {
1264 goto error_exit;
1265 }
1266 }
1267
1268 sam_internal_data.instance_id++;
1269
1270 sam_internal_data.term_send = 0;
1271
1272 pid = fork ();
1273
1274 if (pid == -1) {
1275 /*
1276 * Fork error
1277 */
1278 sam_internal_data.instance_id--;
1279
1280 error = CS_ERR_LIBRARY;
1281 goto error_exit;
1282 }
1283
1284 if (pid == 0) {
1285 /*
1286 * Child process
1287 */
1288 close (pipe_fd_out[0]);
1289 close (pipe_fd_in[1]);
1290
1291 sam_internal_data.child_fd_out = pipe_fd_out[1];
1292 sam_internal_data.child_fd_in = pipe_fd_in[0];
1293
1294 if (instance_id)
1295 *instance_id = sam_internal_data.instance_id;
1296
1297 sam_internal_data.am_i_child = 1;
1298 sam_internal_data.internal_status = SAM_INTERNAL_STATUS_REGISTERED;
1299
1300 pthread_mutex_init (&sam_internal_data.lock, NULL);
1301
1302 goto error_exit;
1303 } else {
1304 /*
1305 * Parent process
1306 */
1307 close (pipe_fd_out[1]);
1308 close (pipe_fd_in[0]);
1309
1310 action = sam_parent_handler (pipe_fd_out[0], pipe_fd_in[1], pid);
1311
1312 close (pipe_fd_out[0]);
1313 close (pipe_fd_in[1]);
1314
1315 if (action == SAM_PARENT_ACTION_ERROR) {
1316 error = CS_ERR_LIBRARY;
1317 goto error_exit;
1318 }
1319
1320 /*
1321 * We really don't like zombies
1322 */
1323 while (waitpid (pid, &child_status, 0) == -1 && errno == EINTR)
1324 ;
1325
1326 old_action = action;
1327
1328 if (action == SAM_PARENT_ACTION_RECOVERY) {
1329 if (SAM_RP_MASK (sam_internal_data.recovery_policy) == SAM_RECOVERY_POLICY_QUIT)
1330 action = SAM_PARENT_ACTION_QUIT;
1331 }
1332
1333
1334 if (action == SAM_PARENT_ACTION_QUIT) {
1335 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1336 quorum_finalize (sam_internal_data.quorum_handle);
1337 }
1338
1339 if (recpol & SAM_RECOVERY_POLICY_CONFDB) {
1340 if (old_action == SAM_PARENT_ACTION_RECOVERY) {
1341 /*
1342 * Mark as failed
1343 */
1344 sam_confdb_update_key (SAM_CONFDB_KEY_STATE, SAM_CONFDB_S_FAILED);
1345 } else {
1346 sam_confdb_destroy_pid_obj ();
1347 }
1348 }
1349
1350 exit (WEXITSTATUS (child_status));
1351 }
1352
1353
1354 }
1355 }
1356
1357 error_exit:
1358 return (error);
1359 }
1360
1361 static void *hc_callback_thread (void *unused_param)
1362 {
1363 int poll_error;
1364 int status;
1365 ssize_t bytes_readed;
1366 char command;
1367 int time_interval, tmp_time_interval;
1368 int counter;
1369 struct pollfd pfds;
1370
1371 status = 0;
1372 counter = 0;
1373
1374 time_interval = sam_internal_data.time_interval >> 2;
1375
1376 while (1) {
1377 pfds.fd = sam_internal_data.cb_rpipe_fd;
1378 pfds.events = POLLIN;
1379 pfds.revents = 0;
1380
1381 if (status == 1) {
1382 tmp_time_interval = time_interval;
1383 } else {
1384 tmp_time_interval = -1;
1385 }
1386
1387 poll_error = poll (&pfds, 1, tmp_time_interval);
1388
1389 if (poll_error == 0) {
1390 if (sam_hc_send () == CS_OK) {
1391 counter++;
1392 }
1393
1394 if (counter >= 4) {
1395 if (sam_internal_data.hc_callback () != 0) {
1396 status = 3;
1397 }
1398
1399 counter = 0;
1400 }
1401 }
1402
1403 if (poll_error > 0) {
1404 bytes_readed = sam_safe_read (sam_internal_data.cb_rpipe_fd, &command, 1);
1405
1406 if (bytes_readed > 0) {
1407 if (status == 0 && command == SAM_COMMAND_START)
1408 status = 1;
1409
1410 if (status == 1 && command == SAM_COMMAND_STOP)
1411 status = 0;
1412
1413 }
1414 }
1415 }
1416
1417 /*
1418 * This makes compiler happy, it's same as return (NULL);
1419 */
1420 return (unused_param);
1421 }
1422
1423 cs_error_t sam_hc_callback_register (sam_hc_callback_t cb)
1424 {
1425 cs_error_t error = CS_OK;
1426 pthread_attr_t thread_attr;
1427 int pipe_error;
1428 int pipe_fd[2];
1429
1430 if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
1431 return (CS_ERR_BAD_HANDLE);
1432 }
1433
1434 if (sam_internal_data.time_interval == 0) {
1435 return (CS_ERR_INVALID_PARAM);
1436 }
1437
1438 if (sam_internal_data.cb_registered) {
1439 sam_internal_data.hc_callback = cb;
1440
1441 return (CS_OK);
1442 }
1443
1444 /*
1445 * We know, this is first registration
1446 */
1447
1448 if (cb == NULL) {
1449 return (CS_ERR_INVALID_PARAM);
1450 }
1451
1452 pipe_error = pipe (pipe_fd);
1453
1454 if (pipe_error != 0) {
1455 /*
1456 * Pipe creation error
1457 */
1458 error = CS_ERR_LIBRARY;
1459 goto error_exit;
1460 }
1461
1462 sam_internal_data.cb_rpipe_fd = pipe_fd[0];
1463 sam_internal_data.cb_wpipe_fd = pipe_fd[1];
1464
1465 /*
1466 * Create thread attributes
1467 */
1468 error = pthread_attr_init (&thread_attr);
1469 if (error != 0) {
1470 error = CS_ERR_LIBRARY;
1471 goto error_close_fd_exit;
1472 }
1473
1474
1475 pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
1476 pthread_attr_setstacksize (&thread_attr, 32768);
1477
1478 /*
1479 * Create thread
1480 */
1481 error = pthread_create (&sam_internal_data.cb_thread, &thread_attr, hc_callback_thread, NULL);
1482
1483 if (error != 0) {
1484 error = CS_ERR_LIBRARY;
1485 goto error_attr_destroy_exit;
1486 }
1487
1488 /*
1489 * Cleanup
1490 */
1491 pthread_attr_destroy(&thread_attr);
1492
1493 sam_internal_data.cb_registered = 1;
1494 sam_internal_data.hc_callback = cb;
1495
1496 return (CS_OK);
1497
1498 error_attr_destroy_exit:
1499 pthread_attr_destroy(&thread_attr);
1500 error_close_fd_exit:
1501 sam_internal_data.cb_rpipe_fd = sam_internal_data.cb_wpipe_fd = 0;
1502 close (pipe_fd[0]);
1503 close (pipe_fd[1]);
1504 error_exit:
1505 return (error);
1506 }