2 * Copyright (c) 2010-2017 Red Hat, Inc.
6 * Author: Angus Salkeld <asalkeld@redhat.com>
8 * This software licensed under BSD license, the text of which follows:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of Red Hat, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
44 #include <qb/qbdefs.h>
45 #include <qb/qblist.h>
46 #include <qb/qbutil.h>
47 #include <qb/qbloop.h>
48 #include <qb/qbipcs.h>
50 #include <corosync/swab.h>
51 #include <corosync/corotypes.h>
52 #include <corosync/corodefs.h>
53 #include <corosync/totem/totempg.h>
54 #include <corosync/logsys.h>
55 #include <corosync/icmap.h>
63 #include "ipcs_stats.h"
/*
 * Logging subsystem tag and file-scope state for the IPC glue:
 *  - api: corosync API table; cs_ipcs_init() calls through it to register
 *    the quorum callback.
 *  - ipc_not_enough_fds_left: written by cs_ipcs_low_fds_event().
 *  - ipc_fc_*: the three inputs read by cs_ipcs_check_for_flow_control().
 *  - ipc_allow_connections: written by cs_ipc_allow_connections() and tested
 *    by cs_ipcs_connection_accept().
 */
66 LOGSYS_DECLARE_SUBSYS ("MAIN");
68 static struct corosync_api_v1
*api
= NULL
;
69 static int32_t ipc_not_enough_fds_left
= 0;
70 static int32_t ipc_fc_is_quorate
; /* boolean */
71 static int32_t ipc_fc_totem_queue_level
; /* enum totem_q_level value, compared against TOTEM_Q_LEVEL_* */
72 static int32_t ipc_fc_sync_in_process
; /* boolean */
73 static int32_t ipc_allow_connections
= 0; /* boolean */
/*
 * Per-service IPC bookkeeping: the libqb server instance and the service's
 * short name (at most CS_IPCS_MAPPER_SERV_NAME - 1 characters plus NUL, as
 * enforced in cs_ipcs_service_init()).  The ipcs_mapper table is indexed by
 * service id.
 *
 * NOTE(review): other code in this file also uses an `id` member of this
 * struct, and uses `list` as a member of struct outq_item; the `list` line
 * below presumably belongs to that other struct, whose opening lines are not
 * visible here -- confirm against the full file.
 */
75 #define CS_IPCS_MAPPER_SERV_NAME 256
77 struct cs_ipcs_mapper
{
79 qb_ipcs_service_t
*inst
;
80 char name
[CS_IPCS_MAPPER_SERV_NAME
];
86 struct qb_list_head list
;
89 static struct cs_ipcs_mapper ipcs_mapper
[SERVICES_COUNT_MAX
];
/*
 * Forward declarations: the four main-loop adapters that are collected in
 * corosync_poll_funcs, and outq_flush(), the job that drains a connection's
 * queued outgoing events.
 */
91 static int32_t cs_ipcs_job_add(enum qb_loop_priority p
, void *data
, qb_loop_job_dispatch_fn fn
);
92 static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p
, int32_t fd
, int32_t events
,
93 void *data
, qb_ipcs_dispatch_fn_t fn
);
94 static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p
, int32_t fd
, int32_t events
,
95 void *data
, qb_ipcs_dispatch_fn_t fn
);
96 static int32_t cs_ipcs_dispatch_del(int32_t fd
);
97 static void outq_flush (void *data
);
/*
 * Poll/job callback table handed to libqb for every service instance via
 * qb_ipcs_poll_handlers_set() in cs_ipcs_service_init().  Each entry forwards
 * to the corosync main loop returned by cs_poll_handle_get().
 */
100 static struct qb_ipcs_poll_handlers corosync_poll_funcs
= {
101 .job_add
= cs_ipcs_job_add
,
102 .dispatch_add
= cs_ipcs_dispatch_add
,
103 .dispatch_mod
= cs_ipcs_dispatch_mod
,
104 .dispatch_del
= cs_ipcs_dispatch_del
,
/*
 * Forward declarations of the connection life-cycle and message handlers
 * that are collected in corosync_service_funcs.
 */
107 static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t
*c
, uid_t euid
, gid_t egid
);
108 static void cs_ipcs_connection_created(qb_ipcs_connection_t
*c
);
109 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t
*c
,
110 void *data
, size_t size
);
111 static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t
*c
);
112 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t
*c
);
/*
 * Service handler table passed to qb_ipcs_create() in
 * cs_ipcs_service_init(); the same table is shared by every service.
 */
114 static struct qb_ipcs_service_handlers corosync_service_funcs
= {
115 .connection_accept
= cs_ipcs_connection_accept
,
116 .connection_created
= cs_ipcs_connection_created
,
117 .msg_process
= cs_ipcs_msg_process
,
118 .connection_closed
= cs_ipcs_connection_closed
,
119 .connection_destroyed
= cs_ipcs_connection_destroyed
,
/*
 * Process-wide IPC counters.  `active` is incremented in
 * cs_ipcs_connection_created() and decremented in cs_ipcs_connection_closed(),
 * which also increments `closed`; cs_ipcs_clear_stats() zeroes the whole
 * structure.
 */
122 static struct ipcs_global_stats global_stats
;
/*
 * Translate a service id into a short name string by switching on
 * service_id.  cs_ipcs_service_init() uses the result as the libqb IPC
 * service name.
 *
 * NOTE(review): only the VOTEQUORUM_SERVICE case label is visible in this
 * view; the other cases and the returned strings are not shown.
 */
124 static const char* cs_ipcs_serv_short_name(int32_t service_id
)
127 switch (service_id
) {
140 case VOTEQUORUM_SERVICE
:
159 void cs_ipc_allow_connections(int32_t allow
)
161 ipc_allow_connections
= allow
;
/*
 * Tear down the libqb IPC server of one service, if it has one: the instance
 * is passed to qb_ipcs_destroy() and the ipcs_mapper slot is reset to NULL so
 * later lookups see the service as absent.
 *
 * service_id: index into ipcs_mapper; no range check is visible here, so the
 *             caller must pass a valid id.
 *
 * NOTE(review): the return statement is not visible in this view.
 */
164 int32_t cs_ipcs_service_destroy(int32_t service_id
)
166 if (ipcs_mapper
[service_id
].inst
) {
167 qb_ipcs_destroy(ipcs_mapper
[service_id
].inst
);
168 ipcs_mapper
[service_id
].inst
= NULL
;
/*
 * libqb connection_accept handler: decides whether a client with the given
 * effective uid/gid may connect to the service the connection belongs to.
 *
 * Checks visible here, in order:
 *  1. ipc_allow_connections gate (logs "corosync is not ready" when closed);
 *  2. the service engine and its IPC instance must both exist;
 *  3. the ipc_not_enough_fds_left flag;
 *  4. euid or egid equal to 0;
 *  5. icmap uint8 keys, each of which must exist and hold the value 1:
 *     uidgid.uid.<euid>, uidgid.config.uid.<euid>,
 *     uidgid.gid.<egid>, uidgid.config.gid.<egid>.
 * When none of the checks in 4-5 matches, a "Denied connection attempt"
 * error is logged.
 *
 * NOTE(review): the return statements (and the declaration of u8) are not
 * visible in this view, so the exact result of each branch must be confirmed
 * against the full file.
 */
173 static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t
*c
, uid_t euid
, gid_t egid
)
175 int32_t service
= qb_ipcs_service_id_get(c
);
177 char key_name
[ICMAP_KEYNAME_MAXLEN
];
179 if (!ipc_allow_connections
) {
180 log_printf(LOGSYS_LEVEL_DEBUG
, "Denied connection, corosync is not ready");
184 if (corosync_service
[service
] == NULL
||
185 ipcs_mapper
[service
].inst
== NULL
) {
189 if (ipc_not_enough_fds_left
) {
193 if (euid
== 0 || egid
== 0) {
197 snprintf(key_name
, ICMAP_KEYNAME_MAXLEN
, "uidgid.uid.%u", euid
);
198 if (icmap_get_uint8(key_name
, &u8
) == CS_OK
&& u8
== 1)
201 snprintf(key_name
, ICMAP_KEYNAME_MAXLEN
, "uidgid.config.uid.%u", euid
);
202 if (icmap_get_uint8(key_name
, &u8
) == CS_OK
&& u8
== 1)
205 snprintf(key_name
, ICMAP_KEYNAME_MAXLEN
, "uidgid.gid.%u", egid
);
206 if (icmap_get_uint8(key_name
, &u8
) == CS_OK
&& u8
== 1)
209 snprintf(key_name
, ICMAP_KEYNAME_MAXLEN
, "uidgid.config.gid.%u", egid
);
210 if (icmap_get_uint8(key_name
, &u8
) == CS_OK
&& u8
== 1)
213 log_printf(LOGSYS_LEVEL_ERROR
, "Denied connection attempt from %d:%d", euid
, egid
);
/*
 * Look up the command name of a process from /proc/<pid>/stat.
 *
 * The stat line has the form "<pid> (<comm>) <state> ...".  <comm> may itself
 * contain parentheses, so the name is delimited by the FIRST '(' and the LAST
 * ')' on the line; searching for the last '(' would cut a name such as
 * "(sd-pam)" short.
 *
 * pid:      process to look up
 * out_name: destination buffer; NUL-terminated on success
 * name_len: size of out_name in bytes; the name is truncated to fit
 *
 * Returns out_name on success.  Returns NULL when out_name is NULL, name_len
 * is 0, the stat file cannot be opened or read, or the line does not have
 * the expected "(name) " shape.
 */
static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
{
	char fname[32];
	char buf[256];
	char *name;
	char *rest;
	FILE *fp;

	/* name_len - 1 would wrap around for an empty buffer */
	if (out_name == NULL || name_len == 0) {
		return (NULL);
	}

	snprintf (fname, sizeof (fname), "/proc/%d/stat", (int)pid);
	fp = fopen (fname, "r");
	if (fp == NULL) {
		return (NULL);
	}

	if (fgets (buf, sizeof (buf), fp) == NULL) {
		fclose (fp);
		return (NULL);
	}
	fclose (fp);

	name = strchr (buf, '(');
	if (name == NULL) {
		return (NULL);
	}
	/* move past the bracket */
	name++;

	rest = strrchr (buf, ')');
	if (rest == NULL || rest < name || rest[1] != ' ') {
		return (NULL);
	}
	/* cut the line at the closing bracket so name is a standalone string */
	*rest = '\0';

	/* bounded copy that always NUL-terminates */
	snprintf (out_name, name_len, "%s", name);
	return (out_name);
}
/*
 * libqb connection_created handler: sets up per-connection state.
 *
 * Steps visible here:
 *  - allocate one zeroed block sized for struct cs_ipcs_conn_context plus the
 *    service's private_data_size, and disconnect the client if that fails;
 *  - initialise the outgoing event queue and mark the connection as not
 *    queuing;
 *  - attach the context to the connection, then run the service's
 *    lib_init_fn, disconnecting the client when it returns non-zero;
 *  - read the connection stats to learn the client pid, record the client's
 *    process name (empty string if pid_to_name() fails), register the
 *    connection with stats_ipcs_add_connection() and bump
 *    global_stats.active.
 *
 * NOTE(review): the early-return statements after the two disconnect calls
 * are not visible in this view.
 */
262 static void cs_ipcs_connection_created(qb_ipcs_connection_t
*c
)
265 struct cs_ipcs_conn_context
*context
;
266 struct qb_ipcs_connection_stats stats
;
267 size_t size
= sizeof(struct cs_ipcs_conn_context
);
269 log_printf(LOG_DEBUG
, "connection created");
271 service
= qb_ipcs_service_id_get(c
);
273 size
+= corosync_service
[service
]->private_data_size
;
274 context
= calloc(1, size
);
275 if (context
== NULL
) {
276 qb_ipcs_disconnect(c
);
280 qb_list_init(&context
->outq_head
);
281 context
->queuing
= QB_FALSE
;
285 qb_ipcs_context_set(c
, context
);
287 if (corosync_service
[service
]->lib_init_fn(c
) != 0) {
288 log_printf(LOG_ERR
, "lib_init_fn failed, disconnecting");
289 qb_ipcs_disconnect(c
);
293 qb_ipcs_connection_stats_get(c
, &stats
, QB_FALSE
);
295 if (!pid_to_name (stats
.client_pid
, context
->proc_name
, sizeof(context
->proc_name
))) {
296 context
->proc_name
[0] = '\0';
298 stats_ipcs_add_connection(service
, stats
.client_pid
, c
);
299 global_stats
.active
++;
302 void cs_ipc_refcnt_inc(void *conn
)
304 qb_ipcs_connection_ref(conn
);
307 void cs_ipc_refcnt_dec(void *conn
)
309 qb_ipcs_connection_unref(conn
);
312 void *cs_ipcs_private_data_get(void *conn
)
314 struct cs_ipcs_conn_context
*cnx
;
315 cnx
= qb_ipcs_context_get(conn
);
316 return &cnx
->data
[0];
/*
 * libqb connection_destroyed handler: releases what is still queued for the
 * connection.  It fetches the connection's context, walks the outgoing event
 * queue with the removal-safe iterator and frees the message buffer of every
 * queued item.
 *
 * NOTE(review): the lines that unlink/free each item and free the context
 * itself, and any NULL check on the context, are not visible in this view.
 */
319 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t
*c
)
321 struct cs_ipcs_conn_context
*context
;
322 struct qb_list_head
*list
, *tmp_iter
;
323 struct outq_item
*outq_item
;
325 log_printf(LOG_DEBUG
, "%s() ", __func__
);
327 context
= qb_ipcs_context_get(c
);
329 qb_list_for_each_safe(list
, tmp_iter
, &(context
->outq_head
)) {
330 outq_item
= qb_list_entry (list
, struct outq_item
, list
);
333 free (outq_item
->msg
);
/*
 * libqb connection_closed handler.
 *
 * Visible steps: run the service's lib_exit_fn for the connection, remove
 * any pending outq_flush job for it from the main loop, unregister the
 * connection from the stats code using the client pid from the connection
 * stats, then decrement global_stats.active and increment
 * global_stats.closed.
 *
 * NOTE(review): how the lib_exit_fn result (res) influences the remaining
 * steps and the return value is not visible in this view.
 */
340 static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t
*c
)
343 int32_t service
= qb_ipcs_service_id_get(c
);
344 struct qb_ipcs_connection_stats stats
;
346 log_printf(LOG_DEBUG
, "%s() ", __func__
);
347 res
= corosync_service
[service
]->lib_exit_fn(c
);
352 qb_loop_job_del(cs_poll_handle_get(), QB_LOOP_HIGH
, c
, outq_flush
);
354 qb_ipcs_connection_stats_get(c
, &stats
, QB_FALSE
);
356 stats_ipcs_del_connection(service
, stats
.client_pid
, c
);
358 global_stats
.active
--;
359 global_stats
.closed
++;
/*
 * Send a scatter/gather response to a client through
 * qb_ipcs_response_sendv().
 *
 * conn:    opaque connection handle
 * iov:     array of buffers making up the response
 * iov_len: number of entries in iov
 *
 * NOTE(review): the mapping from the libqb result (rc) to this function's
 * return value is not visible in this view.
 */
363 int cs_ipcs_response_iov_send (void *conn
,
364 const struct iovec
*iov
,
365 unsigned int iov_len
)
367 int32_t rc
= qb_ipcs_response_sendv(conn
, iov
, iov_len
);
/*
 * Send a single-buffer response to a client through
 * qb_ipcs_response_send().
 *
 * conn: opaque connection handle
 * msg:  response bytes
 * mlen: number of bytes in msg
 *
 * NOTE(review): the mapping from the libqb result (rc) to this function's
 * return value is not visible in this view.
 */
374 int cs_ipcs_response_send(void *conn
, const void *msg
, size_t mlen
)
376 int32_t rc
= qb_ipcs_response_send(conn
, msg
, mlen
);
/*
 * Main-loop job that drains a connection's queue of deferred events.
 *
 * data is the connection.  Each queued item is offered to
 * qb_ipcs_event_send(); three outcomes are distinguished: an error other
 * than -EAGAIN (reported with qb_perror), -EAGAIN, and success, where the
 * number of bytes sent is asserted to equal the item length and the message
 * buffer is freed.  Once the queue is empty the connection leaves queuing
 * mode and the queued/sent counters are logged.  The job is also re-added to
 * the main loop at QB_LOOP_HIGH priority.
 *
 * NOTE(review): the statements that leave the loop, update the counters,
 * unlink items, and decide when the job is re-added are not visible in this
 * view.
 */
383 static void outq_flush (void *data
)
385 qb_ipcs_connection_t
*conn
= data
;
386 struct qb_list_head
*list
, *tmp_iter
;
387 struct outq_item
*outq_item
;
389 struct cs_ipcs_conn_context
*context
= qb_ipcs_context_get(conn
);
391 qb_list_for_each_safe(list
, tmp_iter
, &(context
->outq_head
)) {
392 outq_item
= qb_list_entry (list
, struct outq_item
, list
);
394 rc
= qb_ipcs_event_send(conn
, outq_item
->msg
, outq_item
->mlen
);
395 if (rc
< 0 && rc
!= -EAGAIN
) {
397 qb_perror(LOG_ERR
, "qb_ipcs_event_send");
399 } else if (rc
== -EAGAIN
) {
402 assert(rc
== outq_item
->mlen
);
407 free (outq_item
->msg
);
410 if (qb_list_empty (&context
->outq_head
)) {
411 context
->queuing
= QB_FALSE
;
412 log_printf(LOGSYS_LEVEL_INFO
, "Q empty, queued:%d sent:%d.",
413 context
->queued
, context
->sent
);
417 qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH
, conn
, outq_flush
);
/*
 * Deliver an event to a client now if possible, otherwise queue a copy.
 *
 * conn:    connection to deliver to
 * iov:     buffers making up the event
 * iov_len: number of entries in iov
 *
 * The total size is the sum of all iov_len fields.  When the connection is
 * not already queuing, the queue is asserted to be empty and the event is
 * sent with qb_ipcs_event_sendv(); a result equal to the total size is the
 * success case.  Otherwise the connection may be switched to queuing mode
 * with an outq_flush job scheduled, or an error is logged.  For queuing, a
 * new outq_item is allocated, the iov buffers are copied back to back into
 * one heap buffer of the total size, and the item is appended to the tail
 * of the connection's queue.  If either allocation fails the client is
 * disconnected.
 *
 * NOTE(review): the conditions selecting between "start queuing" and the
 * error log, the returns after success/disconnect, and the release of
 * outq_item when the message allocation fails are not visible in this view.
 */
421 static void msg_send_or_queue(qb_ipcs_connection_t
*conn
, const struct iovec
*iov
, uint32_t iov_len
)
425 int32_t bytes_msg
= 0;
426 struct outq_item
*outq_item
;
428 struct cs_ipcs_conn_context
*context
= qb_ipcs_context_get(conn
);
430 for (i
= 0; i
< iov_len
; i
++) {
431 bytes_msg
+= iov
[i
].iov_len
;
434 if (!context
->queuing
) {
435 assert(qb_list_empty (&context
->outq_head
));
436 rc
= qb_ipcs_event_sendv(conn
, iov
, iov_len
);
437 if (rc
== bytes_msg
) {
444 context
->queuing
= QB_TRUE
;
445 qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH
, conn
, outq_flush
);
447 log_printf(LOGSYS_LEVEL_ERROR
, "event_send retuned %d, expected %d!", rc
, bytes_msg
);
451 outq_item
= malloc (sizeof (struct outq_item
));
452 if (outq_item
== NULL
) {
453 qb_ipcs_disconnect(conn
);
456 outq_item
->msg
= malloc (bytes_msg
);
457 if (outq_item
->msg
== NULL
) {
459 qb_ipcs_disconnect(conn
);
463 write_buf
= outq_item
->msg
;
464 for (i
= 0; i
< iov_len
; i
++) {
465 memcpy (write_buf
, iov
[i
].iov_base
, iov
[i
].iov_len
);
466 write_buf
+= iov
[i
].iov_len
;
468 outq_item
->mlen
= bytes_msg
;
469 qb_list_init (&outq_item
->list
);
470 qb_list_add_tail (&outq_item
->list
, &context
->outq_head
);
/*
 * Send (or queue) a single-buffer event to a client: wraps msg in a
 * one-element iovec and hands it to msg_send_or_queue().
 *
 * conn: opaque connection handle
 * msg:  event bytes; the const qualifier is cast away only to fit iov_base
 * mlen: number of bytes in msg
 *
 * NOTE(review): the iov declaration, the iov_len assignment and the return
 * statement are not visible in this view.
 */
474 int cs_ipcs_dispatch_send(void *conn
, const void *msg
, size_t mlen
)
477 iov
.iov_base
= (void *)msg
;
479 msg_send_or_queue (conn
, &iov
, 1);
/*
 * Send (or queue) a scatter/gather event to a client by forwarding the
 * arguments to msg_send_or_queue().
 *
 * conn:    opaque connection handle
 * iov:     buffers making up the event
 * iov_len: number of entries in iov
 *
 * NOTE(review): the return statement is not visible in this view.
 */
483 int cs_ipcs_dispatch_iov_send (void *conn
,
484 const struct iovec
*iov
,
485 unsigned int iov_len
)
487 msg_send_or_queue(conn
, iov
, iov_len
);
/*
 * libqb msg_process handler: dispatches one client request to the owning
 * service, subject to corosync_sending_allowed().
 *
 * c:    connection the request arrived on
 * data: request buffer, interpreted as struct qb_ipc_request_header
 * size: size of the request buffer
 *
 * Outcomes visible here, keyed on the corosync_sending_allowed() result:
 *  - -EINVAL: the connection's invalid_request counter is incremented, the
 *    event is logged and a response carrying CS_ERR_INVALID_PARAM is sent;
 *  - any other negative value: for calls that are not asynchronous a
 *    response carrying CS_ERR_TRY_AGAIN is sent, and a warning with the
 *    strerror() text is logged;
 *  - otherwise the service's lib_handler_fn for request_pt->id is invoked.
 * corosync_sending_allowed_release() is called afterwards.  A request counts
 * as asynchronous when the service is CPG_SERVICE and the request id is 2.
 *
 * NOTE(review): the remaining arguments of corosync_sending_allowed() and of
 * the response sends, any counter updates in the overload branch, and the
 * return statement are not visible in this view.
 */
491 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t
*c
,
492 void *data
, size_t size
)
494 struct qb_ipc_response_header response
;
495 struct qb_ipc_request_header
*request_pt
= (struct qb_ipc_request_header
*)data
;
496 int32_t service
= qb_ipcs_service_id_get(c
);
498 int32_t is_async_call
= QB_FALSE
;
500 int sending_allowed_private_data
;
501 struct cs_ipcs_conn_context
*cnx
;
503 send_ok
= corosync_sending_allowed (service
,
506 &sending_allowed_private_data
);
508 is_async_call
= (service
== CPG_SERVICE
&& request_pt
->id
== 2);
511 * This happens when the message contains some kind of invalid
512 * parameter, such as an invalid size
514 if (send_ok
== -EINVAL
) {
515 response
.size
= sizeof (response
);
517 response
.error
= CS_ERR_INVALID_PARAM
;
519 cnx
= qb_ipcs_context_get(c
);
521 cnx
->invalid_request
++;
525 log_printf(LOGSYS_LEVEL_INFO
, "*** %s() invalid message! size:%d error:%d",
526 __func__
, response
.size
, response
.error
);
528 qb_ipcs_response_send (c
,
533 } else if (send_ok
< 0) {
534 cnx
= qb_ipcs_context_get(c
);
538 if (!is_async_call
) {
540 * Overload, tell library to retry
542 response
.size
= sizeof (response
);
544 response
.error
= CS_ERR_TRY_AGAIN
;
545 qb_ipcs_response_send (c
,
549 log_printf(LOGSYS_LEVEL_WARNING
,
550 "*** %s() (%d:%d - %d) %s!",
551 __func__
, service
, request_pt
->id
,
552 is_async_call
, strerror(-send_ok
));
558 corosync_service
[service
]->lib_engine
[request_pt
->id
].lib_handler_fn(c
, request_pt
);
561 corosync_sending_allowed_release (&sending_allowed_private_data
);
566 static int32_t cs_ipcs_job_add(enum qb_loop_priority p
, void *data
, qb_loop_job_dispatch_fn fn
)
568 return qb_loop_job_add(cs_poll_handle_get(), p
, data
, fn
);
571 static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p
, int32_t fd
, int32_t events
,
572 void *data
, qb_ipcs_dispatch_fn_t fn
)
574 return qb_loop_poll_add(cs_poll_handle_get(), p
, fd
, events
, data
, fn
);
577 static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p
, int32_t fd
, int32_t events
,
578 void *data
, qb_ipcs_dispatch_fn_t fn
)
580 return qb_loop_poll_mod(cs_poll_handle_get(), p
, fd
, events
, data
, fn
);
/*
 * libqb dispatch_del callback: stop polling a file descriptor on the
 * corosync main loop.
 *
 * fd: descriptor to remove
 *
 * Returns whatever qb_loop_poll_del() returns.
 */
static int32_t cs_ipcs_dispatch_del(int32_t fd)
{
	int32_t rc;

	rc = qb_loop_poll_del(cs_poll_handle_get(), fd);
	return rc;
}
/*
 * Low-file-descriptor callback, registered with the main loop in
 * cs_ipcs_init().  Records not_enough in ipc_not_enough_fds_left, which
 * cs_ipcs_connection_accept() tests, and logs either a "refusing" warning or
 * an "allowing" notice with the number of available descriptors.
 *
 * NOTE(review): the condition choosing between the two log messages and the
 * tail of both log_printf calls are not visible in this view.
 */
588 static void cs_ipcs_low_fds_event(int32_t not_enough
, int32_t fds_available
)
590 ipc_not_enough_fds_left
= not_enough
;
592 log_printf(LOGSYS_LEVEL_WARNING
, "refusing new connections (fds_available:%d)",
595 log_printf(LOGSYS_LEVEL_NOTICE
, "allowing new connections (fds_available:%d)",
601 int32_t cs_ipcs_q_level_get(void)
603 return ipc_fc_totem_queue_level
;
/*
 * Recompute the libqb request rate limit of every service that has both a
 * service engine and an IPC instance.  Called whenever one of the three
 * inputs (quorum, totem queue level, sync in progress) changes.
 *
 * Per service, the limit starts as QB_IPCS_RATE_OFF.  If the cluster is
 * quorate or the service allows inquorate operation, flow control is
 * evaluated:
 *  - queue level not critical and no sync in progress: fc_enabled becomes
 *    QB_FALSE;
 *  - queue level not critical and the service is VOTEQUORUM_SERVICE:
 *    fc_enabled also becomes QB_FALSE, so votequorum keeps processing
 *    messages;
 *  - otherwise: QB_IPCS_RATE_OFF_2.
 * The resulting value is applied with qb_ipcs_request_rate_limit(); the
 * same chain can also arm a 1 ms timer that calls
 * corosync_recheck_the_q_level, or map the queue level LOW/GOOD/HIGH to
 * FAST/NORMAL/SLOW.
 *
 * NOTE(review): the exact nesting of these branches (in particular which
 * condition guards the rate-limit/timer pair) is not visible in this view;
 * confirm against the full file.
 */
606 static qb_loop_timer_handle ipcs_check_for_flow_control_timer
;
607 static void cs_ipcs_check_for_flow_control(void)
612 for (i
= 0; i
< SERVICES_COUNT_MAX
; i
++) {
613 if (corosync_service
[i
] == NULL
|| ipcs_mapper
[i
].inst
== NULL
) {
616 fc_enabled
= QB_IPCS_RATE_OFF
;
617 if (ipc_fc_is_quorate
== 1 ||
618 corosync_service
[i
]->allow_inquorate
== CS_LIB_ALLOW_INQUORATE
) {
621 * now check flow control
623 if (ipc_fc_totem_queue_level
!= TOTEM_Q_LEVEL_CRITICAL
&&
624 ipc_fc_sync_in_process
== 0) {
625 fc_enabled
= QB_FALSE
;
626 } else if (ipc_fc_totem_queue_level
!= TOTEM_Q_LEVEL_CRITICAL
&&
627 i
== VOTEQUORUM_SERVICE
) {
629 * Allow message processing for votequorum service even
632 fc_enabled
= QB_FALSE
;
634 fc_enabled
= QB_IPCS_RATE_OFF_2
;
638 qb_ipcs_request_rate_limit(ipcs_mapper
[i
].inst
, fc_enabled
);
640 qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED
, 1*QB_TIME_NS_IN_MSEC
,
641 NULL
, corosync_recheck_the_q_level
, &ipcs_check_for_flow_control_timer
);
642 } else if (ipc_fc_totem_queue_level
== TOTEM_Q_LEVEL_LOW
) {
643 qb_ipcs_request_rate_limit(ipcs_mapper
[i
].inst
, QB_IPCS_RATE_FAST
);
644 } else if (ipc_fc_totem_queue_level
== TOTEM_Q_LEVEL_GOOD
) {
645 qb_ipcs_request_rate_limit(ipcs_mapper
[i
].inst
, QB_IPCS_RATE_NORMAL
);
646 } else if (ipc_fc_totem_queue_level
== TOTEM_Q_LEVEL_HIGH
) {
647 qb_ipcs_request_rate_limit(ipcs_mapper
[i
].inst
, QB_IPCS_RATE_SLOW
);
652 static void cs_ipcs_fc_quorum_changed(int quorate
, void *context
)
654 ipc_fc_is_quorate
= quorate
;
655 cs_ipcs_check_for_flow_control();
658 static void cs_ipcs_totem_queue_level_changed(enum totem_q_level level
)
660 ipc_fc_totem_queue_level
= level
;
661 cs_ipcs_check_for_flow_control();
664 void cs_ipcs_sync_state_changed(int32_t sync_in_process
)
666 ipc_fc_sync_in_process
= sync_in_process
;
667 cs_ipcs_check_for_flow_control();
670 void cs_ipcs_get_global_stats(struct ipcs_global_stats
*ipcs_stats
)
672 memcpy(ipcs_stats
, &global_stats
, sizeof(global_stats
));
/*
 * Collect statistics for one specific connection of one service.
 *
 * service_id: service whose connections are searched
 * pid:        expected client pid of the connection
 * conn_ptr:   connection being looked for (compared by pointer)
 * ipcs_stats: output; receives the service stats, the connection stats and
 *             a copy of the connection's cs_ipcs_conn_context
 *
 * Returns CS_ERR_NOT_EXIST when the service has no engine or IPC instance,
 * and also when the loop finishes without a match.  The loop walks the
 * connections with qb_ipcs_connection_first_get()/next_get(), dropping the
 * reference on the previous connection at each step, and skips connections
 * with no context or a different pointer.
 *
 * NOTE(review): the body of the pid-mismatch branch, the success return and
 * the loop's termination test are not visible in this view.
 */
675 cs_error_t
cs_ipcs_get_conn_stats(int service_id
, uint32_t pid
, void *conn_ptr
, struct ipcs_conn_stats
*ipcs_stats
)
677 struct cs_ipcs_conn_context
*cnx
;
678 qb_ipcs_connection_t
*c
, *prev
;
681 if (corosync_service
[service_id
] == NULL
|| ipcs_mapper
[service_id
].inst
== NULL
) {
682 return CS_ERR_NOT_EXIST
;
685 qb_ipcs_stats_get(ipcs_mapper
[service_id
].inst
, &ipcs_stats
->srv
, QB_FALSE
);
687 for (c
= qb_ipcs_connection_first_get(ipcs_mapper
[service_id
].inst
);
689 prev
= c
, c
= qb_ipcs_connection_next_get(ipcs_mapper
[service_id
].inst
, prev
), qb_ipcs_connection_unref(prev
)) {
691 cnx
= qb_ipcs_context_get(c
);
692 if (cnx
== NULL
) continue;
693 if (c
!= conn_ptr
) continue;
695 qb_ipcs_connection_stats_get(c
, &ipcs_stats
->conn
, QB_FALSE
);
696 if (ipcs_stats
->conn
.client_pid
!= pid
) {
700 memcpy(&ipcs_stats
->cnx
, cnx
, sizeof(struct cs_ipcs_conn_context
));
703 return CS_ERR_NOT_EXIST
;
/*
 * Reset IPC statistics: zero the global counters, then, for every service
 * with an IPC instance, walk its connections, read each connection's libqb
 * stats with the clear-after-read flag set (the values read are discarded)
 * and zero the connection's invalid_request counter.  Connections without a
 * context are skipped for the counter reset only.
 *
 * NOTE(review): the loop's termination test and any further per-connection
 * counters that are reset are not visible in this view.
 */
709 void cs_ipcs_clear_stats()
711 struct cs_ipcs_conn_context
*cnx
;
712 struct ipcs_conn_stats ipcs_stats
;
713 qb_ipcs_connection_t
*c
, *prev
;
716 /* Global stats are easy */
717 memset(&global_stats
, 0, sizeof(global_stats
));
719 for (service_id
= 0; service_id
< SERVICES_COUNT_MAX
; service_id
++) {
720 if (!ipcs_mapper
[service_id
].inst
) {
724 for (c
= qb_ipcs_connection_first_get(ipcs_mapper
[service_id
].inst
);
726 prev
= c
, c
= qb_ipcs_connection_next_get(ipcs_mapper
[service_id
].inst
, prev
), qb_ipcs_connection_unref(prev
)) {
727 /* Get stats with 'clear_after_read' set */
728 qb_ipcs_connection_stats_get(c
, &ipcs_stats
.conn
, QB_TRUE
);
731 cnx
= qb_ipcs_context_get(c
);
732 if (cnx
== NULL
) continue;
733 cnx
->invalid_request
= 0;
/*
 * Choose the libqb IPC transport from the icmap key "system.qb_ipc_type".
 *
 * Returns QB_IPC_NATIVE when the key is not configured.  Otherwise the
 * string is compared against "native", "shm" and "socket"; the chosen type
 * is logged at debug level, and an unrecognised value is logged as unknown.
 *
 * NOTE(review): the assignments made in each strcmp branch, the release of
 * the string returned by icmap_get_string(), and the final return are not
 * visible in this view.
 */
741 static enum qb_ipc_type
cs_get_ipc_type (void)
745 enum qb_ipc_type ret
= QB_IPC_NATIVE
;
747 if (icmap_get_string("system.qb_ipc_type", &str
) != CS_OK
) {
748 log_printf(LOGSYS_LEVEL_DEBUG
, "No configured system.qb_ipc_type. Using native ipc");
749 return QB_IPC_NATIVE
;
752 if (strcmp(str
, "native") == 0) {
757 if (strcmp(str
, "shm") == 0) {
762 if (strcmp(str
, "socket") == 0) {
768 log_printf(LOGSYS_LEVEL_DEBUG
, "Using %s ipc", str
);
770 log_printf(LOGSYS_LEVEL_DEBUG
, "Unknown ipc type %s", str
);
/*
 * Create and start the libqb IPC server for one service engine.
 *
 * service: engine description; service->id selects the ipcs_mapper slot and
 *          the short name used as the IPC service name.
 *
 * A service with lib_engine_count == 0 gets no IPC server (logged at debug
 * level).  Otherwise the slot's id and name are filled in, the server is
 * created with qb_ipcs_create() using corosync_service_funcs (creation
 * failure trips an assert), corosync_poll_funcs is installed, and the
 * server is started with qb_ipcs_run().
 *
 * Returns the string "qb_ipcs_run error" when the short name does not fit
 * in CS_IPCS_MAPPER_SERV_NAME bytes or when qb_ipcs_run() fails.
 * NOTE(review): the too-long-name path reuses the "qb_ipcs_run error" text
 * although qb_ipcs_run() was never called.  The success return value and the
 * remaining qb_ipcs_create() arguments are not visible in this view.
 */
778 const char *cs_ipcs_service_init(struct corosync_service_engine
*service
)
780 const char *serv_short_name
;
782 serv_short_name
= cs_ipcs_serv_short_name(service
->id
);
784 if (service
->lib_engine_count
== 0) {
785 log_printf (LOGSYS_LEVEL_DEBUG
,
786 "NOT Initializing IPC on %s [%d]",
792 if (strlen(serv_short_name
) >= CS_IPCS_MAPPER_SERV_NAME
) {
793 log_printf (LOGSYS_LEVEL_ERROR
, "service name %s is too long", serv_short_name
);
794 return "qb_ipcs_run error";
797 ipcs_mapper
[service
->id
].id
= service
->id
;
798 strcpy(ipcs_mapper
[service
->id
].name
, serv_short_name
);
799 log_printf (LOGSYS_LEVEL_DEBUG
,
800 "Initializing IPC on %s [%d]",
801 ipcs_mapper
[service
->id
].name
,
802 ipcs_mapper
[service
->id
].id
);
803 ipcs_mapper
[service
->id
].inst
= qb_ipcs_create(ipcs_mapper
[service
->id
].name
,
804 ipcs_mapper
[service
->id
].id
,
806 &corosync_service_funcs
);
807 assert(ipcs_mapper
[service
->id
].inst
);
808 qb_ipcs_poll_handlers_set(ipcs_mapper
[service
->id
].inst
,
809 &corosync_poll_funcs
);
810 if (qb_ipcs_run(ipcs_mapper
[service
->id
].inst
) != 0) {
811 log_printf (LOGSYS_LEVEL_ERROR
, "Can't initialize IPC");
812 return "qb_ipcs_run error";
/*
 * One-time setup of the IPC glue: registers the low-file-descriptor
 * callback with the main loop, registers the quorum and totem queue-level
 * callbacks that drive flow control, and zeroes the active/closed counters.
 *
 * NOTE(review): `api` is dereferenced here; the statement that assigns it is
 * not visible in this view, and the function may continue past the last
 * visible line.
 */
818 void cs_ipcs_init(void)
822 qb_loop_poll_low_fds_event_set(cs_poll_handle_get(), cs_ipcs_low_fds_event
);
824 api
->quorum_register_callback (cs_ipcs_fc_quorum_changed
, NULL
);
825 totempg_queue_level_register_callback (cs_ipcs_totem_queue_level_changed
);
827 global_stats
.active
= 0;
828 global_stats
.closed
= 0;