outq_item = list_entry (list, struct outq_item, list);
rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen);
- if (rc != outq_item->mlen) {
+ if (rc < 0 && rc != -EAGAIN) {
+ errno = -rc;
+ qb_perror(LOG_ERR, "qb_ipcs_event_send");
+ qb_ipcs_connection_unref(conn);
+ return;
+ } else if (rc == -EAGAIN) {
break;
}
+ assert(rc == outq_item->mlen);
context->sent++;
context->queued--;
context->queued, context->sent);
context->queued = 0;
context->sent = 0;
- return;
- }
- qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
- if (rc < 0 && rc != -EAGAIN) {
- log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d!", rc);
+ qb_ipcs_connection_unref(conn);
+ } else {
+ qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
}
}
context->queued = 0;
context->sent = 0;
context->queuing = QB_TRUE;
+ qb_ipcs_connection_ref(conn);
qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
} else {
log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, expected %d!", rc, bytes_msg);
}
outq_item = malloc (sizeof (struct outq_item));
if (outq_item == NULL) {
+ qb_ipcs_connection_unref(conn);
qb_ipcs_disconnect(conn);
return;
}
outq_item->msg = malloc (bytes_msg);
if (outq_item->msg == NULL) {
free (outq_item);
+ qb_ipcs_connection_unref(conn);
qb_ipcs_disconnect(conn);
return;
}