]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crush/CrushWrapper.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crush / CrushWrapper.cc
index 2b11ce9ec131ef0a36125fd916c470686ced546e..ed766719508880d0f42f23d532019e9671c35a1a 100644 (file)
 
 #define dout_subsys ceph_subsys_crush
 
+using std::cout;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::decode_nohead;
+using ceph::encode;
+using ceph::Formatter;
+
 bool CrushWrapper::has_legacy_rule_ids() const
 {
   for (unsigned i=0; i<crush->max_rules; i++) {
@@ -914,14 +931,26 @@ int CrushWrapper::verify_upmap(CephContext *cct,
                << dendl;
     return -ENOENT;
   }
+  int root_bucket = 0;
+  int cursor = 0;
+  std::map<int, int> type_stack;
   for (unsigned step = 0; step < rule->len; ++step) {
     auto curstep = &rule->steps[step];
     ldout(cct, 10) << __func__ << " step " << step << dendl;
     switch (curstep->op) {
+    case CRUSH_RULE_TAKE:
+      {
+        root_bucket = curstep->arg1;
+      }
+      break;
     case CRUSH_RULE_CHOOSELEAF_FIRSTN:
     case CRUSH_RULE_CHOOSELEAF_INDEP:
       {
+        int numrep = curstep->arg1;
         int type = curstep->arg2;
+        if (numrep <= 0)
+          numrep += pool_size;
+        type_stack.emplace(type, numrep);
         if (type == 0) // osd
           break;
         map<int, set<int>> osds_by_parent; // parent_of_desired_type -> osds
@@ -951,10 +980,11 @@ int CrushWrapper::verify_upmap(CephContext *cct,
       {
         int numrep = curstep->arg1;
         int type = curstep->arg2;
-        if (type == 0) // osd
-          break;
         if (numrep <= 0)
           numrep += pool_size;
+        type_stack.emplace(type, numrep);
+        if (type == 0) // osd
+          break;
         set<int> parents_of_type;
         for (auto osd : up) {
           auto parent = get_parent_of_type(osd, type, rule_id);
@@ -975,6 +1005,26 @@ int CrushWrapper::verify_upmap(CephContext *cct,
       }
       break;
 
+    case CRUSH_RULE_EMIT:
+      {
+        if (root_bucket < 0) {
+          int num_osds = 1;
+          for (auto &item : type_stack) {
+            num_osds *= item.second;
+          }
+          // validate the osd's in subtree
+          for (int c = 0; cursor < (int)up.size() && c < num_osds; ++cursor, ++c) {
+            int osd = up[cursor];
+            if (!subtree_contains(root_bucket, osd)) {
+              lderr(cct) << __func__ << " osd " << osd << " not in bucket " << root_bucket << dendl;
+              return -EINVAL;
+            }
+          }
+        }
+        type_stack.clear();
+        root_bucket = 0;
+      }
+      break;
     default:
       // ignore
       break;
@@ -1787,7 +1837,7 @@ int32_t CrushWrapper::_alloc_class_id() const {
     return class_id;
   }
   // wrapped, pick a random start and do exhaustive search
-  uint32_t upperlimit = numeric_limits<int32_t>::max();
+  uint32_t upperlimit = std::numeric_limits<int32_t>::max();
   upperlimit++;
   class_id = rand() % upperlimit;
   const auto start = class_id;
@@ -3060,7 +3110,7 @@ void CrushWrapper::decode(bufferlist::const_iterator& blp)
   __u32 magic;
   decode(magic, blp);
   if (magic != CRUSH_MAGIC)
-    throw buffer::malformed_input("bad magic number");
+    throw ceph::buffer::malformed_input("bad magic number");
 
   decode(crush->max_buckets, blp);
   decode(crush->max_rules, blp);
@@ -3212,7 +3262,7 @@ void CrushWrapper::decode_crush_bucket(crush_bucket** bptr, bufferlist::const_it
     {
       char str[128];
       snprintf(str, sizeof(str), "unsupported bucket algorithm: %d", alg);
-      throw buffer::malformed_input(str);
+      throw ceph::buffer::malformed_input(str);
     }
   }
   crush_bucket *bucket = reinterpret_cast<crush_bucket*>(calloc(1, size));