X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frgw%2Flibrgw.cc;h=18c4140e60b31d6ada5113eb4e7f642b920eab80;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=f6b78c42df27ca33c760873237be3a43e50a366a;hpb=f64942e41c1f59e95cdc1205bbe5d32ed6dfd429;p=ceph.git diff --git a/ceph/src/rgw/librgw.cc b/ceph/src/rgw/librgw.cc index f6b78c42d..18c4140e6 100644 --- a/ceph/src/rgw/librgw.cc +++ b/ceph/src/rgw/librgw.cc @@ -1,5 +1,6 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// vim: ts=8 sw=2 smarttab ft=cpp + /* * Ceph - scalable distributed file system * @@ -11,6 +12,7 @@ * Foundation. See file COPYING. * */ + #include "include/compat.h" #include #include @@ -35,7 +37,6 @@ #include "common/common_init.h" #include "common/dout.h" -#include "rgw_rados.h" #include "rgw_resolve.h" #include "rgw_op.h" #include "rgw_rest.h" @@ -51,17 +52,25 @@ #include "rgw_lib_frontend.h" #include "rgw_http_client.h" #include "rgw_http_client_curl.h" +#include "rgw_perf_counters.h" +#ifdef WITH_RADOSGW_AMQP_ENDPOINT +#include "rgw_amqp.h" +#endif +#ifdef WITH_RADOSGW_KAFKA_ENDPOINT +#include "rgw_kafka.h" +#endif + +#include "services/svc_zone.h" #include -#include #include #include -#include #include - #define dout_subsys ceph_subsys_rgw +using namespace std; + bool global_stop = false; static void handle_sigterm(int signum) @@ -121,18 +130,19 @@ namespace rgw { RGWLibFS* fs = iter->first->ref(); uniq.unlock(); fs->gc(); - fs->update_user(); + const DoutPrefix dp(cct, dout_subsys, "librgw: "); + fs->update_user(&dp); fs->rele(); uniq.lock(); if (cur_gen != gen) goto restart; /* invalidated */ } + cv.wait_for(uniq, std::chrono::seconds(delay_s)); uniq.unlock(); - std::this_thread::sleep_for(std::chrono::seconds(delay_s)); } } - void RGWLibProcess::handle_request(RGWRequest* r) + void RGWLibProcess::handle_request(const DoutPrefixProvider *dpp, RGWRequest* r) { /* * invariant: valid requests are derived from RGWLibRequst @@ -203,7 +213,7 @@ namespace rgw { */ RGWOp *op = (req->op) ? req->op : dynamic_cast(req); if (! op) { - dout(1) << "failed to derive cognate RGWOp (invalid op?)" << dendl; + ldpp_dout(op, 1) << "failed to derive cognate RGWOp (invalid op?)" << dendl; return -EINVAL; } @@ -225,7 +235,7 @@ namespace rgw { rgw_env.set("HTTP_HOST", ""); /* XXX and -then- bloat up req_state with string copies from it */ - struct req_state rstate(req->cct, &rgw_env, req->get_user()); + struct req_state rstate(req->cct, &rgw_env, req->id); struct req_state *s = &rstate; // XXX fix this @@ -236,7 +246,7 @@ namespace rgw { /* XXX and -then- stash req_state pointers everywhere they are needed */ ret = req->init(rgw_env, &rados_ctx, io, s); if (ret < 0) { - dout(10) << "failed to initialize request" << dendl; + ldpp_dout(op, 10) << "failed to initialize request" << dendl; abort_req(s, op, ret); goto done; } @@ -254,67 +264,73 @@ namespace rgw { rgw_env.set("REQUEST_URI", s->info.request_uri); rgw_env.set("QUERY_STRING", ""); - /* XXX authorize does less here then in the REST path, e.g., - * the user's info is cached, but still incomplete */ - req->log(s, "authorizing"); - ret = req->authorize(); - if (ret < 0) { - dout(10) << "failed to authorize request" << dendl; - abort_req(s, op, ret); - goto done; - } + try { + /* XXX authorize does less here then in the REST path, e.g., + * the user's info is cached, but still incomplete */ + ldpp_dout(s, 2) << "authorizing" << dendl; + ret = req->authorize(op, null_yield); + if (ret < 0) { + dout(10) << "failed to authorize request" << dendl; + abort_req(s, op, ret); + goto done; + } - /* FIXME: remove this after switching all handlers to the new authentication - * infrastructure. */ - if (! s->auth.identity) { - s->auth.identity = rgw::auth::transform_old_authinfo(s); - } + /* FIXME: remove this after switching all handlers to the new + * authentication infrastructure. */ + if (! s->auth.identity) { + s->auth.identity = rgw::auth::transform_old_authinfo(s); + } - req->log(s, "reading op permissions"); - ret = req->read_permissions(op); - if (ret < 0) { - abort_req(s, op, ret); - goto done; - } + ldpp_dout(s, 2) << "reading op permissions" << dendl; + ret = req->read_permissions(op, null_yield); + if (ret < 0) { + abort_req(s, op, ret); + goto done; + } - req->log(s, "init op"); - ret = op->init_processing(); - if (ret < 0) { - abort_req(s, op, ret); - goto done; - } + ldpp_dout(s, 2) << "init op" << dendl; + ret = op->init_processing(null_yield); + if (ret < 0) { + abort_req(s, op, ret); + goto done; + } - req->log(s, "verifying op mask"); - ret = op->verify_op_mask(); - if (ret < 0) { - abort_req(s, op, ret); - goto done; - } + ldpp_dout(s, 2) << "verifying op mask" << dendl; + ret = op->verify_op_mask(); + if (ret < 0) { + abort_req(s, op, ret); + goto done; + } - req->log(s, "verifying op permissions"); - ret = op->verify_permission(); - if (ret < 0) { - if (s->system_request) { - dout(2) << "overriding permissions due to system operation" << dendl; - } else if (s->auth.identity->is_admin_of(s->user->user_id)) { - dout(2) << "overriding permissions due to admin operation" << dendl; - } else { + ldpp_dout(s, 2) << "verifying op permissions" << dendl; + ret = op->verify_permission(null_yield); + if (ret < 0) { + if (s->system_request) { + ldpp_dout(op, 2) << "overriding permissions due to system operation" << dendl; + } else if (s->auth.identity->is_admin_of(s->user->get_id())) { + ldpp_dout(op, 2) << "overriding permissions due to admin operation" << dendl; + } else { + abort_req(s, op, ret); + goto done; + } + } + + ldpp_dout(s, 2) << "verifying op params" << dendl; + ret = op->verify_params(); + if (ret < 0) { abort_req(s, op, ret); goto done; } - } - req->log(s, "verifying op params"); - ret = op->verify_params(); - if (ret < 0) { - abort_req(s, op, ret); - goto done; - } + ldpp_dout(s, 2) << "executing" << dendl; + op->pre_exec(); + op->execute(null_yield); + op->complete(); - req->log(s, "executing"); - op->pre_exec(); - op->execute(); - op->complete(); + } catch (const ceph::crypto::DigestException& e) { + dout(0) << "authentication failed" << e.what() << dendl; + abort_req(s, op, -ERR_INVALID_SECRET_KEY); + } done: try { @@ -324,15 +340,14 @@ namespace rgw { << e.what() << dendl; } if (should_log) { - rgw_log_op(store, nullptr /* !rest */, s, - (op ? op->name() : "unknown"), olog); + rgw_log_op(nullptr /* !rest */, s, (op ? op->name() : "unknown"), olog); } int http_ret = s->err.http_ret; - req->log_format(s, "http status=%d", http_ret); + ldpp_dout(s, 2) << "http status=" << http_ret << dendl; - dout(1) << "====== " << __func__ + ldpp_dout(op, 1) << "====== " << __func__ << " req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl; @@ -355,14 +370,26 @@ namespace rgw { */ RGWOp *op = (req->op) ? req->op : dynamic_cast(req); if (! op) { - dout(1) << "failed to derive cognate RGWOp (invalid op?)" << dendl; + ldpp_dout(op, 1) << "failed to derive cognate RGWOp (invalid op?)" << dendl; return -EINVAL; } struct req_state* s = req->get_state(); + RGWLibIO& io_ctx = req->get_io(); + RGWEnv& rgw_env = io_ctx.get_env(); + RGWObjectCtx& rados_ctx = req->get_octx(); + + rgw_env.set("HTTP_HOST", ""); + + int ret = req->init(rgw_env, &rados_ctx, &io_ctx, s); + if (ret < 0) { + ldpp_dout(op, 10) << "failed to initialize request" << dendl; + abort_req(s, op, ret); + goto done; + } /* req is-a RGWOp, currently initialized separately */ - int ret = req->op_init(); + ret = req->op_init(); if (ret < 0) { dout(10) << "failed to initialize RGWOp" << dendl; abort_req(s, op, ret); @@ -371,8 +398,8 @@ namespace rgw { /* XXX authorize does less here then in the REST path, e.g., * the user's info is cached, but still incomplete */ - req->log(s, "authorizing"); - ret = req->authorize(); + ldpp_dout(s, 2) << "authorizing" << dendl; + ret = req->authorize(op, null_yield); if (ret < 0) { dout(10) << "failed to authorize request" << dendl; abort_req(s, op, ret); @@ -385,41 +412,41 @@ namespace rgw { s->auth.identity = rgw::auth::transform_old_authinfo(s); } - req->log(s, "reading op permissions"); - ret = req->read_permissions(op); + ldpp_dout(s, 2) << "reading op permissions" << dendl; + ret = req->read_permissions(op, null_yield); if (ret < 0) { abort_req(s, op, ret); goto done; } - req->log(s, "init op"); - ret = op->init_processing(); + ldpp_dout(s, 2) << "init op" << dendl; + ret = op->init_processing(null_yield); if (ret < 0) { abort_req(s, op, ret); goto done; } - req->log(s, "verifying op mask"); + ldpp_dout(s, 2) << "verifying op mask" << dendl; ret = op->verify_op_mask(); if (ret < 0) { abort_req(s, op, ret); goto done; } - req->log(s, "verifying op permissions"); - ret = op->verify_permission(); + ldpp_dout(s, 2) << "verifying op permissions" << dendl; + ret = op->verify_permission(null_yield); if (ret < 0) { if (s->system_request) { - dout(2) << "overriding permissions due to system operation" << dendl; - } else if (s->auth.identity->is_admin_of(s->user->user_id)) { - dout(2) << "overriding permissions due to admin operation" << dendl; + ldpp_dout(op, 2) << "overriding permissions due to system operation" << dendl; + } else if (s->auth.identity->is_admin_of(s->user->get_id())) { + ldpp_dout(op, 2) << "overriding permissions due to admin operation" << dendl; } else { abort_req(s, op, ret); goto done; } } - req->log(s, "verifying op params"); + ldpp_dout(s, 2) << "verifying op params" << dendl; ret = op->verify_params(); if (ret < 0) { abort_req(s, op, ret); @@ -437,25 +464,27 @@ namespace rgw { { RGWOp *op = (req->op) ? req->op : dynamic_cast(req); if (! op) { - dout(1) << "failed to derive cognate RGWOp (invalid op?)" << dendl; + ldpp_dout(op, 1) << "failed to derive cognate RGWOp (invalid op?)" << dendl; return -EINVAL; } int ret = req->exec_finish(); int op_ret = op->get_ret(); - dout(1) << "====== " << __func__ + ldpp_dout(op, 1) << "====== " << __func__ << " finishing continued request req=" << hex << req << dec << " op status=" << op_ret << " ======" << dendl; + perfcounter->inc(l_rgw_req); + return ret; } int RGWLibFrontend::init() { pprocess = new RGWLibProcess(g_ceph_context, &env, - g_conf->rgw_thread_pool_size, conf); + g_conf()->rgw_thread_pool_size, conf); return 0; } @@ -470,42 +499,61 @@ namespace rgw { int r = 0; /* alternative default for module */ - vector def_args; - def_args.push_back("--debug-rgw=1/5"); - def_args.push_back("--keyring=$rgw_data/keyring"); - def_args.push_back("--log-file=/var/log/radosgw/$cluster-$name.log"); + map defaults = { + { "debug_rgw", "1/5" }, + { "keyring", "$rgw_data/keyring" }, + { "log_file", "/var/log/radosgw/$cluster-$name.log" } + }; - cct = global_init(&def_args, args, + cct = global_init(&defaults, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_DAEMON, CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); - Mutex mutex("main"); + ceph::mutex mutex = ceph::make_mutex("main"); SafeTimer init_timer(g_ceph_context, mutex); init_timer.init(); - mutex.Lock(); - init_timer.add_event_after(g_conf->rgw_init_timeout, new C_InitTimeout); - mutex.Unlock(); + mutex.lock(); + init_timer.add_event_after(g_conf()->rgw_init_timeout, new C_InitTimeout); + mutex.unlock(); common_init_finish(g_ceph_context); - rgw_tools_init(g_ceph_context); + rgw_tools_init(this, g_ceph_context); rgw_init_resolver(); rgw::curl::setup_curl(boost::none); + rgw_http_client_init(g_ceph_context); + + auto run_gc = + g_conf()->rgw_enable_gc_threads && + g_conf()->rgw_nfs_run_gc_threads; + + auto run_lc = + g_conf()->rgw_enable_lc_threads && + g_conf()->rgw_nfs_run_lc_threads; - store = RGWStoreManager::get_storage(g_ceph_context, - g_conf->rgw_enable_gc_threads, - g_conf->rgw_enable_lc_threads, - g_conf->rgw_enable_quota_threads, - g_conf->rgw_run_sync_thread, - g_conf->rgw_dynamic_resharding); + auto run_quota = + g_conf()->rgw_enable_quota_threads && + g_conf()->rgw_nfs_run_quota_threads; + + auto run_sync = + g_conf()->rgw_run_sync_thread && + g_conf()->rgw_nfs_run_sync_thread; + + store = StoreManager::get_storage(this, g_ceph_context, + "rados", + run_gc, + run_lc, + run_quota, + run_sync, + g_conf().get_val("rgw_dynamic_resharding")); if (!store) { - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); derr << "Couldn't init storage provider (RADOS)" << dendl; return -EIO; @@ -513,12 +561,12 @@ namespace rgw { r = rgw_perf_start(g_ceph_context); - rgw_rest_init(g_ceph_context, store, store->get_zonegroup()); + rgw_rest_init(g_ceph_context, store->get_zone()->get_zonegroup()); - mutex.Lock(); + mutex.lock(); init_timer.cancel_all_events(); init_timer.shutdown(); - mutex.Unlock(); + mutex.unlock(); if (r) return -EIO; @@ -536,16 +584,24 @@ namespace rgw { ldh->init(); ldh->bind(); - rgw_user_init(store); - rgw_bucket_init(store->meta_mgr); rgw_log_usage_init(g_ceph_context, store); // XXX ex-RGWRESTMgr_lib, mgr->set_logging(true) - if (!g_conf->rgw_ops_log_socket_path.empty()) { - olog = new OpsLogSocket(g_ceph_context, g_conf->rgw_ops_log_data_backlog); - olog->init(g_conf->rgw_ops_log_socket_path); + OpsLogManifold* olog_manifold = new OpsLogManifold(); + if (!g_conf()->rgw_ops_log_socket_path.empty()) { + OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog); + olog_socket->init(g_conf()->rgw_ops_log_socket_path); + olog_manifold->add_sink(olog_socket); + } + OpsLogFile* ops_log_file; + if (!g_conf()->rgw_ops_log_file_path.empty()) { + ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog); + ops_log_file->start(); + olog_manifold->add_sink(ops_log_file); } + olog_manifold->add_sink(new OpsLogRados(store)); + olog = olog_manifold; int port = 80; RGWProcessEnv env = { store, &rest, olog, port }; @@ -570,12 +626,23 @@ namespace rgw { fe->run(); - r = store->register_to_service_map("rgw-nfs", service_map_meta); + r = store->register_to_service_map(this, "rgw-nfs", service_map_meta); if (r < 0) { derr << "ERROR: failed to register to service map: " << cpp_strerror(-r) << dendl; /* ignore error */ } +#ifdef WITH_RADOSGW_AMQP_ENDPOINT + if (!rgw::amqp::init(cct.get())) { + derr << "ERROR: failed to initialize AMQP manager" << dendl; + } +#endif +#ifdef WITH_RADOSGW_KAFKA_ENDPOINT + if (!rgw::kafka::init(cct.get())) { + derr << "ERROR: failed to initialize Kafka manager" << dendl; + } +#endif + return 0; } /* RGWLib::init() */ @@ -595,14 +662,21 @@ namespace rgw { shutdown_async_signal_handler(); rgw_log_usage_finalize(); - + delete olog; - RGWStoreManager::close_storage(store); + StoreManager::close_storage(store); rgw_tools_cleanup(); rgw_shutdown_resolver(); + rgw_http_client_cleanup(); rgw::curl::cleanup_curl(); +#ifdef WITH_RADOSGW_AMQP_ENDPOINT + rgw::amqp::shutdown(); +#endif +#ifdef WITH_RADOSGW_KAFKA_ENDPOINT + rgw::kafka::shutdown(); +#endif rgw_perf_stop(g_ceph_context); @@ -612,22 +686,26 @@ namespace rgw { return 0; } /* RGWLib::stop() */ - int RGWLibIO::set_uid(RGWRados *store, const rgw_user& uid) + int RGWLibIO::set_uid(rgw::sal::Store* store, const rgw_user& uid) { - int ret = rgw_get_user_info_by_uid(store, uid, user_info, NULL); + const DoutPrefix dp(store->ctx(), dout_subsys, "librgw: "); + std::unique_ptr user = store->get_user(uid); + /* object exists, but policy is broken */ + int ret = user->load_user(&dp, null_yield); if (ret < 0) { derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl; } + user_info = user->get_info(); return ret; } - int RGWLibRequest::read_permissions(RGWOp* op) { + int RGWLibRequest::read_permissions(RGWOp* op, optional_yield y) { /* bucket and object ops */ int ret = - rgw_build_bucket_policies(rgwlib.get_store(), get_state()); + rgw_build_bucket_policies(op, rgwlib.get_store(), get_state(), y); if (ret < 0) { - ldout(get_state()->cct, 10) << "read_permissions (bucket policy) on " + ldpp_dout(op, 10) << "read_permissions (bucket policy) on " << get_state()->bucket << ":" << get_state()->object << " only_bucket=" << only_bucket() @@ -636,10 +714,10 @@ namespace rgw { ret = -EACCES; } else if (! only_bucket()) { /* object ops */ - ret = rgw_build_object_policies(rgwlib.get_store(), get_state(), - op->prefetch_data()); + ret = rgw_build_object_policies(op, rgwlib.get_store(), get_state(), + op->prefetch_data(), y); if (ret < 0) { - ldout(get_state()->cct, 10) << "read_permissions (object policy) on" + ldpp_dout(op, 10) << "read_permissions (object policy) on" << get_state()->bucket << ":" << get_state()->object << " ret=" << ret << dendl; @@ -650,7 +728,7 @@ namespace rgw { return ret; } /* RGWLibRequest::read_permissions */ - int RGWHandler_Lib::authorize() + int RGWHandler_Lib::authorize(const DoutPrefixProvider *dpp, optional_yield y) { /* TODO: handle * 1. subusers @@ -664,8 +742,8 @@ namespace rgw { s->perm_mask = RGW_PERM_FULL_CONTROL; // populate the owner info - s->owner.set_id(s->user->user_id); - s->owner.set_name(s->user->display_name); + s->owner.set_id(s->user->get_id()); + s->owner.set_name(s->user->get_display_name()); return 0; } /* RGWHandler_Lib::authorize */ @@ -683,19 +761,17 @@ int librgw_create(librgw_t* rgw, int argc, char **argv) if (! g_ceph_context) { std::lock_guard lg(librgw_mtx); if (! g_ceph_context) { - vector args; std::vector spl_args; // last non-0 argument will be split and consumed if (argc > 1) { const std::string spl_arg{argv[(--argc)]}; get_str_vec(spl_arg, " \t", spl_args); } - argv_to_vec(argc, const_cast(argv), args); + auto args = argv_to_vec(argc, argv); // append split args, if any for (const auto& elt : spl_args) { args.push_back(elt.c_str()); } - env_to_vec(args); rc = rgwlib.init(args); } }