]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests.py
update sources to 12.2.7
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7
8 try:
9 from itertools import izip_longest as zip_longest
10 except ImportError:
11 from itertools import zip_longest
12 from itertools import combinations
13 from cStringIO import StringIO
14
15 import boto
16 import boto.s3.connection
17 from boto.s3.website import WebsiteConfiguration
18 from boto.s3.cors import CORSConfiguration
19
20 from nose.tools import eq_ as eq
21 from nose.plugins.attrib import attr
22 from nose.plugins.skip import SkipTest
23
24 from .multisite import Zone
25
26 from .conn import get_gateway_connection
27
class Config:
    """ tunable settings for the multisite tests

    Recognized keyword arguments (anything else is ignored):
      checkpoint_retries -- attempts before giving up on a sync checkpoint
      checkpoint_delay   -- seconds to sleep between checkpoint attempts
      reconfigure_delay  -- seconds to sleep after changing the master zone
    """
    def __init__(self, **kwargs):
        # the checkpoint defaults add up to 60 * 5s = 5 minutes of waiting
        option_defaults = (
            ('checkpoint_retries', 60),
            ('checkpoint_delay', 5),
            ('reconfigure_delay', 5),
        )
        for option, default in option_defaults:
            setattr(self, option, kwargs.get(option, default))
36
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
#
# shared test state: all three names stay None until init_multi() assigns them
realm = user = config = None
def init_multi(_realm, _user, _config=None):
    """ install the realm, user and configuration shared by all tests

    A default Config() is used when _config is omitted (or falsy).  Once the
    globals are set, a metadata checkpoint is run across the whole realm.
    """
    global realm, user, config
    realm, user = _realm, _user
    config = _config if _config else Config()
    realm_meta_checkpoint(realm)
51
def get_realm():
    """ return the module-level realm (None until init_multi() has run) """
    current_realm = realm
    return current_realm
54
# module logger, named after this module
log = logging.getLogger(__name__)

# number of bucket names handed out so far by gen_bucket_name()
num_buckets = 0
# six random lowercase letters, picked once at import, prefixing bucket names
run_prefix = ''.join([random.choice(string.ascii_lowercase) for _ in range(6)])
59
def get_gateway_connection(gateway, credentials):
    """ return the gateway's cached S3 connection, opening it on first use

    The new connection is stored on gateway.connection so that later calls
    reuse it regardless of the credentials they pass.
    """
    if gateway.connection is not None:
        return gateway.connection

    # non-secure connection using boto's OrdinaryCallingFormat
    connect_args = dict(
        aws_access_key_id=credentials.access_key,
        aws_secret_access_key=credentials.secret,
        host=gateway.host,
        port=gateway.port,
        is_secure=False,
        calling_format=boto.s3.connection.OrdinaryCallingFormat(),
    )
    gateway.connection = boto.connect_s3(**connect_args)
    return gateway.connection
71
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway

    When credentials is a list, only its first element is used.
    """
    chosen = credentials[0] if isinstance(credentials, list) else credentials
    first_gateway = zone.gateways[0]
    return get_gateway_connection(first_gateway, chosen)
77
def mdlog_list(zone, period = None):
    """ return the parsed JSON output of 'mdlog list' on the zone's cluster

    A truthy period is passed along as '--period <period>'.
    """
    period_args = ['--period', period] if period else []
    output, _ = zone.cluster.admin(['mdlog', 'list'] + period_args, read_only=True)
    return json.loads(output.decode('utf-8'))
85
def meta_sync_status(zone):
    """ block until 'metadata sync status' succeeds for the zone

    The command is re-run every 5 seconds for as long as it exits with
    ENOENT (2); any other nonzero exit code fails an assertion.  The command
    output is discarded and nothing is returned.

    NOTE(review): a second meta_sync_status() defined later in this module
    rebinds the name, so this version is replaced at import time -- confirm
    whether it is still needed.
    """
    while True:
        _, retcode = zone.cluster.admin(['metadata', 'sync', 'status'] + zone.zone_args(),
                                        check_retcode=False, read_only=True)
        if retcode != 0:
            assert(retcode == 2) # ENOENT
            time.sleep(5)
            continue
        return
94
def mdlog_autotrim(zone):
    """ run 'mdlog autotrim' on the zone's cluster """
    autotrim_cmd = ['mdlog', 'autotrim']
    zone.cluster.admin(autotrim_cmd)
97
def bilog_list(zone, bucket, args = None):
    """ return the parsed JSON output of 'bilog list' for the given bucket

    args, when given, is a list of extra command-line arguments to append.
    """
    extra_args = args or []
    output, _ = zone.cluster.admin(['bilog', 'list', '--bucket', bucket] + extra_args,
                                   read_only=True)
    return json.loads(output.decode('utf-8'))
103
def bilog_autotrim(zone, args = None):
    """ run 'bilog autotrim' on the zone's cluster, appending any extra args """
    autotrim_cmd = ['bilog', 'autotrim']
    autotrim_cmd = autotrim_cmd + (args or [])
    zone.cluster.admin(autotrim_cmd)
106
def parse_meta_sync_status(meta_sync_status_json):
    """ decode 'metadata sync status' output

    Takes the utf-8 encoded JSON output and returns the tuple
    (period, realm_epoch, num_shards, markers).  markers maps each shard
    index to that shard's marker, or to '' when the shard's realm epoch is
    behind the reported one or its state is 0.
    """
    text = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', text)
    status = json.loads(text)['sync_status']

    info = status['info']
    global_sync_status = info['status']  # not used further; lookup kept as-is
    num_shards = info['num_shards']
    period = info['period']
    realm_epoch = info['realm_epoch']

    shard_markers = status['markers']
    log.debug('sync_markers=%s', shard_markers)
    assert(num_shards == len(shard_markers))

    def shard_marker(shard):
        # get marker, only if it's an incremental marker for the same realm epoch
        val = shard_markers[shard]['val']
        if realm_epoch > val['realm_epoch'] or val['state'] == 0:
            return ''
        return val['marker']

    markers = dict((shard, shard_marker(shard)) for shard in range(num_shards))
    return period, realm_epoch, num_shards, markers
131
def meta_sync_status(zone):
    """ read and parse the zone's metadata sync status

    Runs 'metadata sync status' up to config.checkpoint_retries times,
    sleeping config.checkpoint_delay seconds after each attempt that exits
    with ENOENT (2).  Any other nonzero exit code, or running out of
    attempts, fails an assertion.

    Returns whatever parse_meta_sync_status() produces for the output.
    """
    for _ in range(config.checkpoint_retries):
        status_cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        output, retcode = zone.cluster.admin(status_cmd, check_retcode=False, read_only=True)
        if retcode != 0:
            assert(retcode == 2) # ENOENT
            time.sleep(config.checkpoint_delay)
            continue
        return parse_meta_sync_status(output)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name
142
def meta_master_log_status(master_zone):
    """ return the master zone's mdlog markers as {shard index: marker}

    The shard index is the position of each entry in the 'mdlog status'
    JSON output.
    """
    status_cmd = ['mdlog', 'status'] + master_zone.zone_args()
    output, _ = master_zone.cluster.admin(status_cmd, read_only=True)
    shards = json.loads(output.decode('utf-8'))

    markers = dict((index, shard['marker']) for index, shard in enumerate(shards))
    log.debug('master meta markers=%s', markers)
    return markers
151
def compare_meta_status(zone, log_status, sync_status):
    """ return True when the zone's metadata sync has caught up with the master

    log_status and sync_status are dicts of markers; their values are paired
    up in iteration order and the zone counts as behind wherever a master
    marker compares greater than the zone's.  A size mismatch is logged as an
    error and also yields False.
    """
    log_len, sync_len = len(log_status), len(sync_status)
    if log_len != sync_len:
        log.error('len(log_status)=%d, len(sync_status)=%d', log_len, sync_len)
        return False

    lagging = [
        'shard=' + str(shard) + ' master=' + master + ' target=' + target
        for shard, master, target in zip(log_status, log_status.values(), sync_status.values())
        if master > target
    ]
    if lagging:
        log.warning('zone %s behind master: %s', zone.name, ', '.join(lagging))
        return False

    return True
169
def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    """ wait until the zone's metadata sync catches up with the master

    meta_master_zone defaults to the zone's realm metadata master, and
    master_status to that master's current mdlog markers.  The zone's sync
    status is polled up to config.checkpoint_retries times with
    config.checkpoint_delay seconds in between; the checkpoint passes once
    the zone syncs the current realm epoch and compare_meta_status() agrees.
    An assertion fails if that never happens.
    """
    meta_master_zone = meta_master_zone or zone.realm().meta_master_zone()
    master_status = master_status or meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        _period, realm_epoch, _num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch >= current_realm_epoch:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return
        else:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)

        time.sleep(config.checkpoint_delay)

    assert False, 'failed meta checkpoint for zone=%s' % zone.name
194
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    """ run a metadata checkpoint on every zone of the zonegroup but the master

    meta_master_zone defaults to the zonegroup's realm metadata master, and
    master_status to that master's current mdlog markers; the same status is
    used for all zones.
    """
    meta_master_zone = meta_master_zone or zonegroup.realm().meta_master_zone()
    master_status = master_status or meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        # the metadata master has nothing to sync from itself
        if not zone == meta_master_zone:
            zone_meta_checkpoint(zone, meta_master_zone, master_status)
205
def realm_meta_checkpoint(realm):
    """ run a metadata checkpoint on every zonegroup of the realm's current period

    The master's mdlog markers are read once up front and shared by all
    zonegroups.
    """
    log.info('meta checkpoint')

    master = realm.meta_master_zone()
    status = meta_master_log_status(master)

    for group in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(group, master, status)
214
def parse_data_sync_status(data_sync_status_json):
    """ decode 'data sync status' output

    Takes the utf-8 encoded JSON output and returns (num_shards, markers),
    where markers maps each shard index to that shard's marker.
    """
    text = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', text)
    status = json.loads(text)['sync_status']

    info = status['info']
    global_sync_status = info['status']  # not used further; lookup kept as-is
    num_shards = info['num_shards']

    shard_markers = status['markers']
    log.debug('sync_markers=%s', shard_markers)
    assert(num_shards == len(shard_markers))

    markers = dict((shard, shard_markers[shard]['val']['marker'])
                   for shard in range(num_shards))
    return (num_shards, markers)
232
def data_sync_status(target_zone, source_zone):
    """ read and parse the target zone's data sync status for a source zone

    Returns None when both zones are the same.  Otherwise 'data sync status'
    is run up to config.checkpoint_retries times, sleeping
    config.checkpoint_delay seconds after each attempt that exits with
    ENOENT (2); any other nonzero exit code, or running out of attempts,
    fails an assertion.  On success the result of parse_data_sync_status()
    is returned.
    """
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        status_cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        status_cmd += ['--source-zone', source_zone.name]
        output, retcode = target_zone.cluster.admin(status_cmd, check_retcode=False, read_only=True)
        if retcode != 0:
            assert(retcode == 2) # ENOENT
            time.sleep(config.checkpoint_delay)
            continue
        return parse_data_sync_status(output)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
249
def bucket_sync_status(target_zone, source_zone, bucket_name):
    """ return the target zone's per-shard sync positions for a bucket

    Runs 'bucket sync markers' on the target zone's cluster for the given
    source zone and bucket.  While the command exits with ENOENT (2) it is
    retried up to config.checkpoint_retries times, sleeping
    config.checkpoint_delay seconds in between; any other nonzero exit code,
    or running out of retries, fails an assertion.

    Returns None when target and source are the same zone.  Otherwise returns
    a dict mapping each entry's 'key' to its incremental-sync position with
    the shard-id prefix removed, or to '' for entries whose status is not
    'incremental-sync'.
    """
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]

    for _ in range(config.checkpoint_retries):
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT
        # back off between attempts instead of re-running the command in a
        # tight, unbounded loop
        time.sleep(config.checkpoint_delay)
    else:
        assert False, 'failed to read bucket sync markers for target_zone=%s source_zone=%s bucket=%s' % \
                (target_zone.name, source_zone.name, bucket_name)

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync markers=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
            pos = val['inc_marker']['position'].split('#')[-1]
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers
278
def data_source_log_status(source_zone):
    """ return the source zone's datalog markers as {shard index: marker}

    The shard index is the position of each entry in the 'datalog status'
    JSON output.
    """
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    # use cluster.admin() like every other command in this module; this call
    # previously went through a differently named 'rgw_admin' method
    datalog_status_json, _ = source_zone.cluster.admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers
288
def bucket_source_log_status(source_zone, bucket_name):
    """ return the source zone's bilog markers for a bucket as {key: val}

    Runs 'bilog status' for the bucket and maps each entry's 'key' to its
    'val' from the 'markers' list of the JSON output.  An empty dict is
    returned when the output has no 'markers' member.
    """
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    bilog_status_json, _ = source_zone.cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    try:
        entries = bilog_status['markers']
    except (KeyError, TypeError):
        # no 'markers' member (or the output is not a JSON object): treat it
        # as having no markers rather than swallowing every possible error
        entries = {}

    markers = {}
    for s in entries:
        markers[s['key']] = s['val']

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers
310
def compare_data_status(target_zone, source_zone, log_status, sync_status):
    """ return True when the target zone's data sync has caught up with the source

    log_status and sync_status are dicts of markers; their values are paired
    up in iteration order and the target counts as behind wherever a source
    marker compares greater than the target's.  A size mismatch is logged as
    an error and also yields False.
    """
    log_len, sync_len = len(log_status), len(sync_status)
    if log_len != sync_len:
        log.error('len(log_status)=%d len(sync_status)=%d', log_len, sync_len)
        return False

    lagging = [
        'shard=' + str(shard) + ' master=' + source + ' target=' + target
        for shard, source, target in zip(log_status, log_status.values(), sync_status.values())
        if source > target
    ]
    if lagging:
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, ', '.join(lagging))
        return False

    return True
328
def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    """ return True when the target zone's bucket sync has caught up with the source

    log_status and sync_status are dicts of markers; their values are paired
    up in iteration order and the target counts as behind wherever a source
    marker compares greater than the target's.  A size mismatch is logged as
    an error and also yields False.
    """
    log_len, sync_len = len(log_status), len(sync_status)
    if log_len != sync_len:
        log.error('len(log_status)=%d len(sync_status)=%d', log_len, sync_len)
        return False

    lagging = [
        'shard=' + str(shard) + ' master=' + source + ' target=' + target
        for shard, source, target in zip(log_status, log_status.values(), sync_status.values())
        if source > target
    ]
    if lagging:
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, ', '.join(lagging))
        return False

    return True
346
def zone_data_checkpoint(target_zone, source_zone_conn):
    """ wait until the target zone's data sync catches up with the source zone

    The source's datalog markers are read once, then the target's data sync
    status is polled up to config.checkpoint_retries times with
    config.checkpoint_delay seconds in between until compare_data_status()
    agrees.  An assertion fails if that never happens.  Returns immediately
    when target and source are the same zone.
    """
    # the body used to refer to an undefined 'source_zone'; derive it from the
    # parameter.  NOTE(review): the parameter name suggests a connection object
    # exposing .zone, but a plain zone is accepted too -- confirm with callers
    source_zone = getattr(source_zone_conn, 'zone', source_zone_conn)

    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
368
369
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    """ wait until the target zone's sync of a bucket catches up with the source

    The source's bilog markers are read once, then the target's bucket sync
    markers are polled up to config.checkpoint_retries times with
    config.checkpoint_delay seconds in between until compare_bucket_status()
    agrees.  An assertion fails if that never happens.  Returns immediately
    when target and source are the same zone.
    """
    if target_zone == source_zone:
        return

    names = (target_zone.name, source_zone.name, bucket_name)
    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', *names)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        caught_up = compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status)
        if caught_up:
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', *names)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
            (target_zone.name, source_zone.name, bucket_name)
391
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    """ sync a bucket between all zones of a zonegroup, then compare its contents

    A bucket checkpoint is run for every ordered (source, target) pair of
    distinct zones; afterwards check_bucket_eq() is called once for each
    unordered pair of zone connections.
    """
    conns = zonegroup_conns.zones
    for source_conn in conns:
        for target_conn in conns:
            if not source_conn.zone == target_conn.zone:
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)

    for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
        target_conn.check_bucket_eq(source_conn, bucket_name)
400
def set_master_zone(zone):
    """ make the given zone the master of its zonegroup

    Marks the zone as master, commits a period update, records the new master
    on the zonegroup object and then sleeps config.reconfigure_delay seconds
    to give the realm time to reconfigure.
    """
    zone.modify(zone.cluster, ['--master'])
    owning_zonegroup = zone.zonegroup
    owning_zonegroup.period.update(zone, commit=True)
    owning_zonegroup.master_zone = zone

    delay = config.reconfigure_delay
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, delay)
    time.sleep(delay)
408
def enable_bucket_sync(zone, bucket_name):
    """ run 'bucket sync enable' for the bucket on the zone's cluster """
    enable_cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name]
    zone.cluster.admin(enable_cmd + zone.zone_args())
412
def disable_bucket_sync(zone, bucket_name):
    """ run 'bucket sync disable' for the bucket on the zone's cluster """
    disable_cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name]
    zone.cluster.admin(disable_cmd + zone.zone_args())
416
def check_buckets_sync_status_obj_not_exist(zone, buckets):
    """ wait until none of the buckets appears in the zone's 'log list' output

    A bucket counts as present when the output contains ':<bucket>:'.  The
    command is re-run up to config.checkpoint_retries times with
    config.checkpoint_delay seconds in between; an assertion fails if some
    bucket is still listed after the last attempt.
    """
    for _ in range(config.checkpoint_retries):
        log_list, _retcode = zone.cluster.admin(['log', 'list'] + zone.zone_arg(),
                                                check_retcode=False, read_only=True)
        still_listed = any(log_list.find(':'+bucket+":") >= 0 for bucket in buckets)
        if not still_listed:
            return
        time.sleep(config.checkpoint_delay)
    assert False
428
def gen_bucket_name():
    """ return the next bucket name of this run: '<run_prefix>-<counter>'

    Each call increments the module-level num_buckets counter.
    """
    global num_buckets

    num_buckets = num_buckets + 1
    return '-'.join((run_prefix, str(num_buckets)))
434
class ZonegroupConns:
    """ connections, made with the test user's credentials, to each zone of a zonegroup

    Attributes:
      zonegroup   -- the zonegroup the connections belong to
      zones       -- one connection per zone, in zonegroup order
      ro_zones    -- the connections to read-only zones
      rw_zones    -- the connections to all other zones
      master_zone -- the connection to the zonegroup's master zone, if any
    """
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones, self.ro_zones, self.rw_zones = [], [], []
        self.master_zone = None

        for zone in zonegroup.zones:
            conn = zone.get_conn(user.credentials)
            self.zones.append(conn)

            same_kind = self.ro_zones if zone.is_read_only() else self.rw_zones
            same_kind.append(conn)

            if zone == zonegroup.master_zone:
                self.master_zone = conn
452
def check_all_buckets_exist(zone_conn, buckets):
    """ return True when every bucket can be fetched through the zone connection

    Zones whose has_buckets() is false are skipped and reported as True.
    The first bucket that cannot be fetched is logged and yields False.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # log the connection's zone name; the message used to reference an
            # undefined 'zone' variable and raised NameError instead
            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
            return False

    return True
465
def check_all_buckets_dont_exist(zone_conn, buckets):
    """ return True when none of the buckets can be fetched through the zone connection

    Zones whose has_buckets() is false are skipped and reported as True.
    The first bucket that can still be fetched is logged and yields False.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # a failed lookup is the expected outcome for a removed bucket
            continue

        # log the connection's zone name; the message used to reference an
        # undefined 'zone' variable and raised NameError instead
        log.critical('zone %s contains bucket %s', zone_conn.name, b)
        return False

    return True
480
def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    """ create buckets_per_zone new buckets on every writable zone connection

    Returns (buckets, zone_bucket): the list of created bucket names, and a
    list of (zone connection, bucket object) pairs in creation order.
    """
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        # range() instead of xrange() so this also runs on python 3
        for _ in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket
493
def create_bucket_per_zone_in_realm():
    """ create one bucket on every writable zone of the realm's current period

    Returns (buckets, zone_bucket) accumulated over all zonegroups, in the
    same form as create_bucket_per_zone().
    """
    all_names, all_pairs = [], []
    for zonegroup in realm.current_period.zonegroups:
        names, pairs = create_bucket_per_zone(ZonegroupConns(zonegroup))
        all_names += names
        all_pairs += pairs
    return all_names, all_pairs
503
def test_bucket_create():
    """ buckets created on each writable zone exist on all zones after metadata sync """
    zonegroup = realm.master_zonegroup()
    conns = ZonegroupConns(zonegroup)
    bucket_names = create_bucket_per_zone(conns)[0]
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn in conns.zones:
        assert check_all_buckets_exist(zone_conn, bucket_names)
512
def test_bucket_recreate():
    """ re-creating existing buckets on every writable zone does not remove them """
    zonegroup = realm.master_zonegroup()
    conns = ZonegroupConns(zonegroup)
    bucket_names = create_bucket_per_zone(conns)[0]

    def assert_buckets_on_all_zones():
        for zone_conn in conns.zones:
            assert check_all_buckets_exist(zone_conn, bucket_names)

    zonegroup_meta_checkpoint(zonegroup)
    assert_buckets_on_all_zones()

    # recreate buckets on all zones, make sure they weren't removed
    for zone_conn in conns.rw_zones:
        for name in bucket_names:
            zone_conn.create_bucket(name)

    # check right away, and once more after metadata has synced
    assert_buckets_on_all_zones()
    zonegroup_meta_checkpoint(zonegroup)
    assert_buckets_on_all_zones()
535
def test_bucket_remove():
    """ buckets deleted on their own zone disappear from all zones after metadata sync """
    zonegroup = realm.master_zonegroup()
    conns = ZonegroupConns(zonegroup)
    bucket_names, zone_bucket = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn in conns.zones:
        assert check_all_buckets_exist(zone_conn, bucket_names)

    # delete each bucket through the zone it was created on
    for owner_conn, bucket in zone_bucket:
        owner_conn.conn.delete_bucket(bucket)

    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn in conns.zones:
        assert check_all_buckets_dont_exist(zone_conn, bucket_names)
552
def get_bucket(zone, bucket_name):
    """ look up a bucket through the zone connection's S3 connection """
    s3_conn = zone.conn
    return s3_conn.get_bucket(bucket_name)
555
def get_key(zone, bucket_name, obj_name):
    """ fetch the named object's key from the bucket on the given zone """
    return get_bucket(zone, bucket_name).get_key(obj_name)
559
def new_key(zone, bucket_name, obj_name):
    """ create a new key for the named object in the bucket on the given zone """
    return get_bucket(zone, bucket_name).new_key(obj_name)
563
def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    """ have zone_conn2 compare its copy of the bucket against zone_conn1's """
    bucket_name = bucket.name
    return zone_conn2.check_bucket_eq(zone_conn1, bucket_name)
566
def test_object_sync():
    """ objects written to each zone's bucket are synced to every other zone """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in ('myobj', '_myobj', ':', '&'):
            new_key(zone, bucket_name, objname).set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if not source_conn.zone == target_conn.zone:
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
                check_bucket_eq(source_conn, target_conn, bucket)
590
def test_object_delete():
    """ an object's creation and its later removal are both synced to every other zone """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    def sync_and_compare(source_conn, bucket):
        # checkpoint the bucket on every other zone and compare it to the source
        for target_conn in zonegroup_conns.zones:
            if not source_conn.zone == target_conn.zone:
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
                check_bucket_eq(source_conn, target_conn, bucket)

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        new_key(zone, bucket, objname).set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        sync_and_compare(source_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        get_key(source_conn, bucket, objname).delete()
        sync_and_compare(source_conn, bucket)
625
def get_latest_object_version(key):
    """ return the key's version flagged is_latest, or None if there is none """
    versions = key.bucket.list_versions(key.name)
    return next((version for version in versions if version.is_latest), None)
631
def test_versioned_object_incremental_sync():
    """ creating and deleting object versions from every zone syncs consistently """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if not source_conn.zone == target_conn.zone:
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    # (content, debug message, delete right after creating)
    # version1 is kept - this tests that the initial version doesn't get
    # squashed into later versions. version2 and version3 are created and
    # deleted to test that the operations don't race with each other during sync
    version_plan = (
        ('version1', 'version1 id=%s', False),
        ('version2', 'version2 id=%s', True),
        ('version3', 'version3 id=%s', True),
    )

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            for content, debug_message, delete_after in version_plan:
                k.set_contents_from_string(content)
                v = get_latest_object_version(k)
                log.debug(debug_message, v.version_id)
                if delete_after:
                    k.bucket.delete_key(obj, version_id=v.version_id)

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
678
def test_bucket_versioning():
    """ enabling versioning on a bucket is reflected in its versioning status """
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        status = bucket.get_versioning_status()
        assert('Versioning' in status and status['Versioning'] == 'Enabled')
686
def test_bucket_acl():
    """ setting the 'public-read' canned acl adds a second grant to a new bucket """
    _, zone_bucket = create_bucket_per_zone_in_realm()

    def grant_count(bucket):
        return len(bucket.get_acl().acl.grants)

    for _, bucket in zone_bucket:
        assert(grant_count(bucket) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(grant_count(bucket) == 2) # new grant on AllUsers
693
def test_bucket_cors():
    """ a cors configuration set on a bucket reads back with identical xml """
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        expected = CORSConfiguration()
        expected.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(expected)
        actual = bucket.get_cors()
        assert(actual.to_xml() == expected.to_xml())
701
def test_bucket_delete_notempty():
    """ deleting a bucket that holds an object fails with BucketNotEmpty """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        conn.get_bucket(bucket_name).new_key('foo').set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
        else:
            assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    master_conn = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert master_conn.get_bucket(bucket_name)
726
def test_multi_period_incremental_sync():
    """ a stopped zone catches up on metadata written across several periods

    The third zone is stopped, the master is moved to the second zone and
    back (creating two more periods) with buckets created in each, and the
    third zone is restarted.  All zones must then agree on the buckets, the
    mdlogs of every period must be non-empty and equally long on each zone,
    and autotrim must leave them empty.
    """
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    def create_buckets_except_on(skipped_zone):
        # one more bucket on each zone that owns a bucket, leaving one zone out
        for zone_conn, _ in zone_bucket:
            if zone_conn.zone == skipped_zone:
                continue
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
            zone_conn.conn.create_bucket(bucket_name)
            buckets.append(bucket_name)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods.append(realm.current_period.id)
    create_buckets_except_on(z3)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods.append(realm.current_period.id)
    create_buckets_except_on(z3)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if not source_conn.zone == target_conn.zone:
                    target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            peer_mdlog = mdlog_list(zone, period)
            assert len(peer_mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if not zone == z1:
            mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            assert len(mdlog_list(zone, period)) == 0
811
def test_zonegroup_remove():
    """ a zone can be removed from its zonegroup once; a second removal fails with ENOENT """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    first_zone, second_zone = zonegroup.zones[0:2]
    first_cluster = first_zone.cluster
    second_cluster = second_zone.cluster

    # create a new zone in zonegroup on c2 and commit
    new_zone = Zone('remove', zonegroup, second_cluster)
    new_zone.create(second_cluster)
    zonegroup.zones.append(new_zone)
    zonegroup.period.update(new_zone, commit=True)

    zonegroup.remove(first_cluster, new_zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(first_cluster, new_zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    new_zone.delete(second_cluster)

    # validate the resulting period
    zonegroup.period.update(first_zone, commit=True)
839
def test_set_bucket_website():
    """ a website configuration set on a bucket reads back with identical xml

    The test is skipped when the gateway answers MethodNotAllowed.  Any other
    S3 error from setting the configuration is re-raised.
    """
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
            # surface the real failure instead of silently carrying on to the
            # read-back assertion below
            raise
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
850
def test_set_bucket_policy():
    """ a policy document set on a bucket reads back unchanged """
    policy = '''{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*"
}]
}'''
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.set_policy(policy)
        stored_policy = bucket.get_policy()
        assert(stored_policy == policy)
863
def test_bucket_sync_disable():
    """ after disabling bucket sync, no zone lists the buckets in 'log list' """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    bucket_names = create_bucket_per_zone(zonegroup_conns)[0]

    for name in bucket_names:
        disable_bucket_sync(realm.meta_master_zone(), name)

    for zone in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(zone, bucket_names)
874
def test_bucket_sync_enable_right_after_disable():
    """ buckets keep syncing when sync is disabled and immediately re-enabled """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    def upload_to_every_bucket(objnames):
        for zone, bucket in zone_bucket:
            for objname in objnames:
                new_key(zone, bucket.name, objname).set_contents_from_string(content)

    def checkpoint_every_bucket():
        for bucket_name in buckets:
            zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    upload_to_every_bucket(['obj1', 'obj2', 'obj3', 'obj4'])
    checkpoint_every_bucket()

    # toggle sync off and straight back on for each bucket
    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    upload_to_every_bucket(['obj5', 'obj6', 'obj7', 'obj8'])
    checkpoint_every_bucket()
904
def test_bucket_sync_disable_enable():
    """ objects written while bucket sync was disabled are synced once it is re-enabled """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    def upload_to_every_bucket(objnames):
        for zone, bucket in zone_bucket:
            for objname in objnames:
                new_key(zone, bucket.name, objname).set_contents_from_string(content)

    def checkpoint_every_bucket():
        for bucket_name in buckets:
            zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    upload_to_every_bucket([ 'obj1', 'obj2', 'obj3', 'obj4' ])

    zonegroup_meta_checkpoint(zonegroup)
    checkpoint_every_bucket()

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    # these uploads happen while sync is disabled
    upload_to_every_bucket([ 'obj5', 'obj6', 'obj7', 'obj8' ])

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    checkpoint_every_bucket()
940
def test_multipart_object_sync():
    """ complete a four-part multipart upload on the first (zone, bucket)
    pair and run the zonegroup bucket checkpoint on that bucket """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    bucket = zone_bucket[0][1]

    # initiate a multipart upload
    upload = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = upload.key_name
    mp.id = upload.id

    part_size = 5 * 1024 * 1024 # 5M min part size
    # parts 1..4 are filled with 'a', 'b', 'c' and 'd' respectively
    for part_num, fill in enumerate('abcd', 1):
        mp.upload_part_from_file(StringIO(fill * part_size), part_num)
    mp.complete_upload()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
961
def test_encrypted_object_sync():
    """ upload SSE-C and SSE-KMS encrypted objects to one zone, then read them
    back from a second zone once the bucket checkpoint has passed

    raises SkipTest when fewer than two read-write zones are available """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    # a source and a destination zone are both required; without this guard
    # the tuple unpacking below fails with ValueError instead of skipping
    if len(zonegroup_conns.rw_zones) < 2:
        raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more "
                       "read-write zones in master zonegroup.")

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone; the sse-c object needs
    # the customer key headers on both the HEAD and the GET
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())
1003
def test_bucket_index_log_trim():
    """ run bilog autotrim with different bucket limits against one 'cold'
    and one 'active' bucket, checking each bucket's bilog length """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    max_buckets_opt = '--rgw-sync-log-trim-max-buckets'
    min_cold_opt = '--rgw-sync-log-trim-min-cold-buckets'

    def make_test_bucket():
        # create a test bucket, upload some objects, and wait for sync
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        new_bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            new_key(zone, name, objname).set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return new_bucket

    def bilog_len(bucket):
        # number of entries bilog_list() reports for the bucket on this zone
        return len(bilog_list(zone.zone, bucket.name))

    # create a 'cold' bucket
    cold_bucket = make_test_bucket()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    bilog_autotrim(zone.zone, [max_buckets_opt, '0'])

    # create an 'active' bucket
    active_bucket = make_test_bucket()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    bilog_autotrim(zone.zone, [max_buckets_opt, '1', min_cold_opt, '0'])

    # verify active bucket has empty bilog
    assert bilog_len(active_bucket) == 0

    # verify cold bucket has nonempty bilog
    assert bilog_len(cold_bucket) > 0

    # trim with min-cold-buckets=999 to trim all buckets
    bilog_autotrim(zone.zone, [max_buckets_opt, '999', min_cold_opt, '999'])

    # verify cold bucket has empty bilog
    assert bilog_len(cold_bucket) == 0