#include "XioConnection.h"
#include "XioMsg.h"
-#include "include/assert.h"
+#include "include/spinlock.h"
+
+#include "include/ceph_assert.h"
#include "common/dout.h"
#ifndef CACHE_LINE_SIZE
{
uint32_t size;
XioSubmit::Queue q;
- pthread_spinlock_t sp;
+ ceph::spinlock sp;
CACHE_PAD(0);
};
for (ix = 0; ix < nlanes; ++ix) {
lane = &qlane[ix];
- pthread_spin_init(&lane->sp, PTHREAD_PROCESS_PRIVATE);
lane->size = 0;
}
}
void enq(XioConnection *xcon, XioSubmit* xs)
{
Lane* lane = get_lane(xcon);
- pthread_spin_lock(&lane->sp);
+ std::lock_guard<decltype(lane->sp)> lg(lane->sp);
lane->q.push_back(*xs);
++(lane->size);
- pthread_spin_unlock(&lane->sp);
}
void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
{
int size = requeue_q.size();
Lane* lane = get_lane(xcon);
- pthread_spin_lock(&lane->sp);
+ std::lock_guard<decltype(lane->sp)> lg(lane->sp);
XioSubmit::Queue::const_iterator i1 = lane->q.end();
lane->q.splice(i1, requeue_q);
lane->size += size;
- pthread_spin_unlock(&lane->sp);
}
void deq(XioSubmit::Queue& send_q)
{
Lane* lane;
int cnt;
+
for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
+ std::lock_guard<decltype(lane->sp)> lg(lane->sp);
lane = &qlane[ix];
- pthread_spin_lock(&lane->sp);
if (lane->size > 0) {
XioSubmit::Queue::const_iterator i1 = send_q.end();
send_q.splice(i1, lane->q);
lane->size = 0;
++ix, ix = ix % nlanes;
- pthread_spin_unlock(&lane->sp);
break;
}
- pthread_spin_unlock(&lane->sp);
}
}
struct xio_context *ctx;
struct xio_server *server;
SubmitQueue submit_q;
- pthread_spinlock_t sp;
+ ceph::spinlock sp;
void *ev_loop;
string xio_uri;
char *portal_id;
magic(0),
special_handling(0)
{
- pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
-
struct xio_context_params ctx_params;
memset(&ctx_params, 0, sizeof(ctx_params));
ctx_params.user_context = this;
/* a portal is an xio_context and event loop */
ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
- assert(ctx && "Whoops, failed to create portal/ctx");
+ ceph_assert(ctx && "Whoops, failed to create portal/ctx");
}
int bind(struct xio_session_ops *ops, const string &base_uri,
q_iter = send_q.erase(q_iter);
requeue_q.push_back(*xs);
}
- pthread_spin_lock(&xcon->sp);
+ std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
xcon->outgoing.requeue.splice(i1, requeue_q);
xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
- pthread_spin_unlock(&xcon->sp);
}
void *entry()
submit_q.deq(send_q);
/* shutdown() barrier */
- pthread_spin_lock(&sp);
+ std::lock_guard<decltype(sp)> lg(sp);
restart:
size = send_q.size();
} /* while */
} /* size > 0 */
- pthread_spin_unlock(&sp);
xio_context_run_loop(ctx, 300);
} while ((!_shutdown) || (!drained));
void shutdown()
{
- pthread_spin_lock(&sp);
+ std::lock_guard<decltype(sp)> lg(sp);
_shutdown = true;
- pthread_spin_unlock(&sp);
}
};
for (int i = 0; i < n; i++) {
if (!portals[i]) {
portals[i] = new XioPortal(msgr, nconns);
- assert(portals[i] != nullptr);
+ ceph_assert(portals[i] != nullptr);
}
}
}