]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests.py
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7 import errno
8 import dateutil.parser
9
10 from itertools import combinations
11 from itertools import zip_longest
12 from io import StringIO
13
14 import boto
15 import boto.s3.connection
16 from boto.s3.website import WebsiteConfiguration
17 from boto.s3.cors import CORSConfiguration
18
19 from nose.tools import eq_ as eq
20 from nose.tools import assert_not_equal, assert_equal
21 from nose.plugins.attrib import attr
22 from nose.plugins.skip import SkipTest
23
24 from .multisite import Zone, ZoneGroup, Credentials
25
26 from .conn import get_gateway_connection
27 from .tools import assert_raises
28
class Config:
    """ test configuration

    every setting is optional and is read from the keyword arguments:
      checkpoint_retries -- polls attempted before a sync checkpoint gives up
      checkpoint_delay   -- seconds to sleep between two polls
      reconfigure_delay  -- seconds to sleep after a realm reconfiguration
      tenant             -- tenant name, '' when not given
    """
    def __init__(self, **kwargs):
        defaults = (
            # by default, wait up to 5 minutes (60 * 5s) before giving up on
            # a sync checkpoint
            ('checkpoint_retries', 60),
            ('checkpoint_delay', 5),
            # allow some time for realm reconfiguration after changing master zone
            ('reconfigure_delay', 5),
            ('tenant', ''),
        )
        for option, fallback in defaults:
            setattr(self, option, kwargs.get(option, fallback))
38
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
#
# module-level state shared by every helper and test below; each name starts
# out as None and is assigned by init_multi()
realm = None
user = None
config = None
def init_multi(_realm, _user, _config=None):
    """ install the realm, user and configuration used by the tests

    a default Config() is substituted when _config is falsy. once the module
    globals are assigned, runs realm_meta_checkpoint() on the new realm.
    """
    global realm, user, config
    realm, user = _realm, _user
    config = _config if _config else Config()
    realm_meta_checkpoint(realm)
53
def get_user():
    """ return the id of the module-level user, or '' when no user is set """
    if user is None:
        return ''
    return user.id
56
def get_tenant():
    """ return the configured tenant, or '' when config or its tenant is None """
    if config is None or config.tenant is None:
        return ''
    return config.tenant
59
def get_realm():
    """ return the module-level realm (None until init_multi() assigns it) """
    return realm
62
# logger used by the helpers and tests in this module
log = logging.getLogger('rgw_multi.tests')

# counter incremented by gen_bucket_name() for every name it hands out
num_buckets = 0
# six random lowercase letters, chosen once when the module is loaded and
# embedded in every generated bucket/role name
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))

# counter incremented by gen_role_name() for every name it hands out
num_roles = 0
69
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway

    when credentials is a list, only its first entry is used
    """
    creds = credentials[0] if isinstance(credentials, list) else credentials
    first_gateway = zone.gateways[0]
    return get_gateway_connection(first_gateway, creds)
75
def mdlog_list(zone, period = None):
    """ run 'mdlog list' through the zone's cluster and return the decoded json

    a truthy period is forwarded to the command as '--period'
    """
    period_args = ['--period', period] if period else []
    output, _ = zone.cluster.admin(['mdlog', 'list'] + period_args, read_only=True)
    return json.loads(output)
82
def mdlog_autotrim(zone):
    """ run 'mdlog autotrim' through the zone's cluster """
    cmd = ['mdlog', 'autotrim']
    zone.cluster.admin(cmd)
85
def datalog_list(zone, args = None):
    """ run 'datalog list' (plus any extra args) and return the decoded json """
    extra = args or []
    output, _ = zone.cluster.admin(['datalog', 'list'] + extra, read_only=True)
    return json.loads(output)
90
def datalog_status(zone):
    """ run 'datalog status' through the zone's cluster and return the decoded json """
    output, _ = zone.cluster.admin(['datalog', 'status'], read_only=True)
    return json.loads(output)
95
def datalog_autotrim(zone):
    """ run 'datalog autotrim' through the zone's cluster """
    cmd = ['datalog', 'autotrim']
    zone.cluster.admin(cmd)
98
def bilog_list(zone, bucket, args = None):
    """ run 'bilog list' for the bucket and return the decoded json

    extra args are appended as given; when a tenant is configured the
    command also carries '--tenant' and '--uid'
    """
    cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
    if config.tenant:
        cmd += ['--tenant', config.tenant, '--uid', user.name]
    output, _ = zone.cluster.admin(cmd, read_only=True)
    return json.loads(output)
104
def bilog_autotrim(zone, args = None):
    """ run 'bilog autotrim' (plus any extra args) through the zone's cluster """
    extra = args or []
    zone.cluster.admin(['bilog', 'autotrim'] + extra)
107
def bucket_layout(zone, bucket, args = None):
    """ run 'bucket layout' for the bucket and return the decoded json """
    cmd = ['bucket', 'layout', '--bucket', bucket]
    cmd = cmd + (args or [])
    output, _ = zone.cluster.admin(cmd)
    return json.loads(output)
111
def parse_meta_sync_status(meta_sync_status_json):
    """ decode the json output of 'metadata sync status'

    returns (period, realm_epoch, num_shards, markers) where markers maps
    each shard index to its marker string, or to '' when the shard has no
    usable incremental marker for the reported realm epoch
    """
    log.debug('current meta sync status=%s', meta_sync_status_json)
    status = json.loads(meta_sync_status_json)

    info = status['sync_status']['info']
    global_sync_status = info['status']
    num_shards = info['num_shards']
    period = info['period']
    realm_epoch = info['realm_epoch']

    shard_entries = status['sync_status']['markers']
    log.debug('sync_markers=%s', shard_entries)
    assert(num_shards == len(shard_entries))

    markers = {}
    for shard, entry in enumerate(shard_entries):
        val = entry['val']
        # get marker, only if it's an incremental marker for the same realm epoch
        outdated = realm_epoch > val['realm_epoch'] or val['state'] == 0
        markers[shard] = '' if outdated else val['marker']

    return period, realm_epoch, num_shards, markers
135
def meta_sync_status(zone):
    """ poll 'metadata sync status' for the zone until it succeeds

    returns the tuple built by parse_meta_sync_status(). a return code of 2
    (ENOENT) is retried after config.checkpoint_delay seconds, any other
    non-zero code fails an assertion, and so does running out of
    config.checkpoint_retries attempts.
    """
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status']
        cmd += zone.zone_args()
        output, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode != 0:
            assert(retcode == 2) # ENOENT
            time.sleep(config.checkpoint_delay)
            continue
        return parse_meta_sync_status(output)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name
146
def meta_master_log_status(master_zone):
    """ run 'mdlog status' on the master zone

    returns a dict mapping the position of each status entry to its 'marker'
    """
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    output, _ = master_zone.cluster.admin(cmd, read_only=True)

    markers = {}
    for shard, status in enumerate(json.loads(output)):
        markers[shard] = status['marker']
    log.debug('master meta markers=%s', markers)
    return markers
155
def compare_meta_status(zone, log_status, sync_status):
    """ return True when no sync marker is behind its master log marker

    both arguments are dicts of markers; they are paired by iteration order
    and compared with '>'. a size mismatch or any lagging shard is logged
    and yields False.
    """
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    lagging = []
    for shard, master_marker, target_marker in zip(log_status, log_status.values(), sync_status.values()):
        if master_marker > target_marker:
            lagging.append('shard=' + str(shard) + ' master=' + master_marker + ' target=' + target_marker)

    if lagging:
        log.warning('zone %s behind master: %s', zone.name, ', '.join(lagging))
        return False

    return True
173
def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    """ wait until the zone's metadata sync has caught up with the master

    meta_master_zone and master_status are looked up when not supplied.
    polls up to config.checkpoint_retries times, sleeping
    config.checkpoint_delay seconds in between, and fails an assertion if
    the zone never catches up.
    """
    meta_master_zone = meta_master_zone or zone.realm().meta_master_zone()
    master_status = master_status or meta_master_log_status(meta_master_zone)

    wanted_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch >= wanted_epoch:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return
        else:
            # the zone is still syncing an older realm epoch
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, wanted_epoch)

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name
198
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    """ run zone_meta_checkpoint() on every zone of the zonegroup except the
    metadata master itself; master zone and status are looked up when not given """
    meta_master_zone = meta_master_zone or zonegroup.realm().meta_master_zone()
    master_status = master_status or meta_master_log_status(meta_master_zone)

    for member in zonegroup.zones:
        if member == meta_master_zone:
            continue
        zone_meta_checkpoint(member, meta_master_zone, master_status)
209
def realm_meta_checkpoint(realm):
    """ run a metadata checkpoint on every zonegroup of the realm's current
    period, against a single snapshot of the master's mdlog status """
    log.info('meta checkpoint')

    master = realm.meta_master_zone()
    snapshot = meta_master_log_status(master)

    for group in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(group, master, snapshot)
218
def parse_data_sync_status(data_sync_status_json):
    """ decode the json output of 'data sync status'

    returns (num_shards, markers) where markers maps each shard index to
    that shard's marker
    """
    log.debug('current data sync status=%s', data_sync_status_json)
    status = json.loads(data_sync_status_json)

    info = status['sync_status']['info']
    global_sync_status = info['status']
    num_shards = info['num_shards']

    shard_entries = status['sync_status']['markers']
    log.debug('sync_markers=%s', shard_entries)
    assert(num_shards == len(shard_entries))

    markers = {shard: entry['val']['marker'] for shard, entry in enumerate(shard_entries)}

    return (num_shards, markers)
235
def data_sync_status(target_zone, source_zone):
    """ poll 'data sync status' on target_zone for the given source zone

    returns None when both zones are the same, otherwise the tuple built by
    parse_data_sync_status(). a return code of 2 (ENOENT) is retried after
    config.checkpoint_delay seconds; any other failure, or running out of
    config.checkpoint_retries attempts, fails an assertion.
    """
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status']
        cmd += target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        output, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode != 0:
            assert(retcode == 2) # ENOENT
            time.sleep(config.checkpoint_delay)
            continue
        return parse_data_sync_status(output)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
252
def bucket_sync_status(target_zone, source_zone, bucket_name):
    """ read the incremental bucket sync markers of target_zone for source_zone

    returns None when both zones are the same, otherwise a dict mapping each
    entry's 'key' to its incremental marker position with the shard id
    prefix removed.

    a return code of 2 (ENOENT) is retried after config.checkpoint_delay
    seconds, for at most config.checkpoint_retries attempts, in the same way
    as data_sync_status(). any other non-zero code, or running out of
    attempts, fails an assertion.
    """
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []

    # bounded polling with a delay: the previous 'while True' loop re-ran the
    # admin command back to back and never terminated on a persistent ENOENT
    for _ in range(config.checkpoint_retries):
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)
    else:
        assert False, 'failed to read bucket sync status for target_zone=%s source_zone=%s bucket=%s' % \
                (target_zone.name, source_zone.name, bucket_name)

    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        markers[entry['key']] = val['inc_marker']['position'].split('#')[-1]

    return markers
277
def data_source_log_status(source_zone):
    """ run 'datalog status' on the source zone

    returns a dict mapping the position of each status entry to its 'marker'
    """
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    output, _ = source_zone.cluster.admin(cmd, read_only=True)

    markers = {}
    for shard, status in enumerate(json.loads(output)):
        markers[shard] = status['marker']
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers
287
def bucket_source_log_status(source_zone, bucket_name):
    """ run 'bilog status' for the bucket on the source zone

    returns a dict mapping the 'key' of every entry under 'markers' to its
    'val'; the dict is empty when the status carries no 'markers' section.
    when a tenant is configured the command also carries '--tenant' and
    '--uid'.
    """
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
    bilog_status_json, _ = source_zone.cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json)

    try:
        shard_markers = bilog_status['markers']
    except (KeyError, TypeError):
        # no 'markers' section (or the status is not a json object): report
        # no markers. only lookup failures are tolerated here, so that
        # KeyboardInterrupt and real bugs are no longer swallowed
        shard_markers = []

    markers = {s['key']: s['val'] for s in shard_markers}

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers
310
def compare_data_status(target_zone, source_zone, log_status, sync_status):
    """ return True when no data sync marker is behind its source log marker

    both marker dicts are paired by iteration order and compared with '>'.
    a size mismatch or any lagging shard is logged and yields False.
    """
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    lagging = []
    for shard, source_marker, target_marker in zip(log_status, log_status.values(), sync_status.values()):
        if source_marker > target_marker:
            lagging.append('shard=' + str(shard) + ' master=' + source_marker + ' target=' + target_marker)

    if lagging:
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, ', '.join(lagging))
        return False

    return True
328
def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    """ return True when no bucket sync marker is behind its source log marker

    both marker dicts are paired by iteration order and compared with '>'.
    a size mismatch or any lagging shard is logged and yields False.
    """
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    lagging = []
    for shard, source_marker, target_marker in zip(log_status, log_status.values(), sync_status.values()):
        if source_marker > target_marker:
            lagging.append('shard=' + str(shard) + ' master=' + source_marker + ' target=' + target_marker)

    if lagging:
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, ', '.join(lagging))
        return False

    return True
346
def zone_data_checkpoint(target_zone, source_zone):
    """ wait until target_zone's data sync has caught up with source_zone

    does nothing when target_zone does not sync from source_zone. the source
    datalog status is sampled once, then the target is polled up to
    config.checkpoint_retries times with config.checkpoint_delay seconds in
    between; an assertion fails if it never catches up.
    """
    if not target_zone.syncs_from(source_zone.name):
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        caught_up = compare_data_status(target_zone, source_zone, log_status, sync_status)
        if caught_up:
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
368
def zonegroup_data_checkpoint(zonegroup_conns):
    """ run a data checkpoint from every writable zone to every other zone """
    for src in zonegroup_conns.rw_zones:
        for dst in zonegroup_conns.zones:
            if src.zone == dst.zone:
                # a zone does not sync from itself
                continue
            log.debug('data checkpoint: source=%s target=%s', src.zone.name, dst.zone.name)
            zone_data_checkpoint(dst.zone, src.zone)
376
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    """ run 'bucket sync checkpoint' on target_zone for the bucket

    does nothing when target_zone does not sync from source_zone. the retry
    delay and overall timeout passed to the command are derived from
    config.checkpoint_delay and config.checkpoint_retries.
    """
    if not target_zone.syncs_from(source_zone.name):
        return

    delay_ms = config.checkpoint_delay * 1000
    timeout = config.checkpoint_retries * config.checkpoint_delay
    cmd = [
        'bucket', 'sync', 'checkpoint',
        '--bucket', bucket_name,
        '--source-zone', source_zone.name,
        '--retry-delay-ms', str(delay_ms),
        '--timeout-sec', str(timeout),
    ]
    cmd += target_zone.zone_args()
    target_zone.cluster.admin(cmd, debug_rgw=1)
388
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    """ checkpoint the bucket between all zones, then compare its contents

    first every zone is checkpointed against every writable source zone;
    afterwards each pair of zones is compared with check_bucket_eq(),
    skipping targets that do not hold buckets.
    """
    for src in zonegroup_conns.rw_zones:
        for dst in zonegroup_conns.zones:
            if src.zone == dst.zone:
                continue
            log.debug('bucket checkpoint: source=%s target=%s bucket=%s', src.zone.name, dst.zone.name, bucket_name)
            zone_bucket_checkpoint(dst.zone, src.zone, bucket_name)

    for src, dst in combinations(zonegroup_conns.zones, 2):
        if not dst.zone.has_buckets():
            continue
        dst.check_bucket_eq(src, bucket_name)
399
def set_master_zone(zone):
    """ make the given zone the master of its zonegroup

    modifies the zone with '--master', commits a period update, records the
    zone as zonegroup.master_zone and then sleeps config.reconfigure_delay
    seconds.
    """
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    # give the gateways time to pick up the new period
    time.sleep(config.reconfigure_delay)
407
def set_sync_from_all(zone, flag):
    """ set the zone's sync-from-all option to 'true' or 'false'

    commits a period update afterwards and sleeps config.reconfigure_delay
    seconds.
    """
    value = 'true' if flag else 'false'
    option = '--sync-from-all=' + value
    zone.modify(zone.cluster, [option])
    zone.zonegroup.period.update(zone, commit=True)
    log.info('Set sync_from_all flag on zone %s to %s', zone.name, value)
    time.sleep(config.reconfigure_delay)
415
def set_redirect_zone(zone, redirect_zone):
    """ set (or clear) the zone's redirect zone

    a falsy redirect_zone passes an empty id. commits a period update
    afterwards and sleeps config.reconfigure_delay seconds.
    """
    id_str = redirect_zone.id if redirect_zone else ''
    zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str)
    time.sleep(config.reconfigure_delay)
423
def enable_bucket_sync(zone, bucket_name):
    """ run 'bucket sync enable' for the bucket on the zone """
    base = ['bucket', 'sync', 'enable', '--bucket', bucket_name]
    zone.cluster.admin(base + zone.zone_args())
427
def disable_bucket_sync(zone, bucket_name):
    """ run 'bucket sync disable' for the bucket on the zone """
    base = ['bucket', 'sync', 'disable', '--bucket', bucket_name]
    zone.cluster.admin(base + zone.zone_args())
431
def check_buckets_sync_status_obj_not_exist(zone, buckets):
    """ wait until 'log list' on the zone mentions none of the buckets

    a bucket counts as mentioned when ':<bucket>:' occurs in the command
    output. polls up to config.checkpoint_retries times, sleeping
    config.checkpoint_delay seconds in between; fails an assertion if a
    bucket is still listed after the last attempt.
    """
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        still_listed = any(log_list.find(':'+bucket+":") >= 0 for bucket in buckets)
        if not still_listed:
            return
        time.sleep(config.checkpoint_delay)
    assert False
443
def gen_bucket_name():
    """ return the next bucket name: '<run_prefix>-<counter>' """
    global num_buckets

    num_buckets = num_buckets + 1
    return '-'.join((run_prefix, str(num_buckets)))
449
def gen_role_name():
    """ return the next role name: 'roles-<run_prefix>-<counter>' """
    global num_roles

    num_roles = num_roles + 1
    return '-'.join(("roles", run_prefix, str(num_roles)))
455
class ZonegroupConns:
    """ one connection per zone of a zonegroup, opened with the test user's
    credentials

    attributes:
      zonegroup   -- the zonegroup the connections belong to
      zones       -- connections to all zones, in zonegroup order
      ro_zones    -- the subset whose zone reports is_read_only()
      rw_zones    -- the remaining (writable) subset
      master_zone -- connection to the zonegroup's master zone, else None
    """
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None

        for member in zonegroup.zones:
            conn = member.get_conn(user.credentials)
            self.zones.append(conn)

            group = self.ro_zones if member.is_read_only() else self.rw_zones
            group.append(conn)

            if member == zonegroup.master_zone:
                self.master_zone = conn
474
def check_all_buckets_exist(zone_conn, buckets):
    """ return True when every bucket can be fetched through zone_conn

    zones that do not hold buckets trivially pass. the first bucket whose
    lookup raises is logged and makes the check return False.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # any lookup failure counts as "missing"; unlike a bare except
            # this lets KeyboardInterrupt/SystemExit propagate
            log.critical('zone %s does not contain bucket %s', zone_conn.zone.name, b)
            return False

    return True
487
def check_all_buckets_dont_exist(zone_conn, buckets):
    """ return True when none of the buckets can be fetched through zone_conn

    zones that do not hold buckets trivially pass. the first bucket whose
    lookup succeeds is logged and makes the check return False.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # lookup failed, so the bucket is gone as expected; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate
            continue

        # the original logged 'zone.zone', but no 'zone' exists in this
        # scope, so the failure path raised NameError instead of returning
        log.critical('zone %s contains bucket %s', zone_conn.zone.name, b)
        return False

    return True
502
def create_role_per_zone(zonegroup_conns, roles_per_zone = 1):
    """ create roles_per_zone roles on every writable zone

    returns (roles, zone_role): the list of generated role names and a list
    of (zone connection, create_role() result) pairs
    """
    policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/testuser\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
    roles = []
    zone_role = []
    for zone_conn in zonegroup_conns.rw_zones:
        for _ in range(roles_per_zone):
            name = gen_role_name()
            log.info('create role zone=%s name=%s', zone_conn.name, name)
            created = zone_conn.create_role("", name, policy_document, "")
            roles.append(name)
            zone_role.append((zone_conn, created))

    return roles, zone_role
516
def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    """ create buckets_per_zone buckets on every writable zone

    returns (buckets, zone_bucket): the list of generated bucket names and a
    list of (zone connection, create_bucket() result) pairs
    """
    buckets = []
    zone_bucket = []
    for zone_conn in zonegroup_conns.rw_zones:
        for _ in range(buckets_per_zone):
            name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone_conn.name, name)
            created = zone_conn.create_bucket(name)
            buckets.append(name)
            zone_bucket.append((zone_conn, created))

    return buckets, zone_bucket
529
def create_bucket_per_zone_in_realm():
    """ create one bucket per writable zone in every zonegroup of the realm's
    current period; returns the combined (buckets, zone_bucket) lists """
    all_names = []
    all_pairs = []
    for group in realm.current_period.zonegroups:
        names, pairs = create_bucket_per_zone(ZonegroupConns(group))
        all_names += names
        all_pairs += pairs
    return all_names, all_pairs
539
def test_bucket_create():
    """ buckets created on each writable zone show up on every zone after a
    metadata checkpoint """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    bucket_names, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone_conn, bucket_names)
548
def test_bucket_recreate():
    """ re-creating existing buckets on every writable zone must not remove
    them, neither right away nor after another metadata checkpoint """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    bucket_names, _ = create_bucket_per_zone(zonegroup_conns)

    def assert_present_everywhere():
        for zone_conn in zonegroup_conns.zones:
            assert check_all_buckets_exist(zone_conn, bucket_names)

    zonegroup_meta_checkpoint(zonegroup)
    assert_present_everywhere()

    # recreate buckets on all zones, make sure they weren't removed
    for zone_conn in zonegroup_conns.rw_zones:
        for name in bucket_names:
            zone_conn.create_bucket(name)

    assert_present_everywhere()

    zonegroup_meta_checkpoint(zonegroup)
    assert_present_everywhere()
571
def test_bucket_remove():
    """ buckets deleted on their own zone disappear from every zone after a
    metadata checkpoint """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    bucket_names, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone_conn, bucket_names)

    for owner_conn, bucket in zone_bucket:
        owner_conn.conn.delete_bucket(bucket)

    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone_conn, bucket_names)
588
def get_bucket(zone, bucket_name):
    """ fetch the bucket through the zone connection's underlying 'conn' """
    return zone.conn.get_bucket(bucket_name)
591
def get_key(zone, bucket_name, obj_name):
    """ look up the bucket on the zone and return its get_key(obj_name) """
    return get_bucket(zone, bucket_name).get_key(obj_name)
595
def new_key(zone, bucket_name, obj_name):
    """ look up the bucket on the zone and return its new_key(obj_name) """
    return get_bucket(zone, bucket_name).new_key(obj_name)
599
def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    """ compare the bucket on zone_conn2 against zone_conn1; skipped when
    zone_conn2's zone does not hold buckets """
    if not zone_conn2.zone.has_buckets():
        return
    zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
603
def check_role_eq(zone_conn1, zone_conn2, role):
    """ compare the role on zone_conn2 against zone_conn1; skipped when
    zone_conn2's zone does not hold roles

    role is a create-role response; the role name is read from it
    """
    if not zone_conn2.zone.has_roles():
        return
    created = role['create_role_response']['create_role_result']['role']
    zone_conn2.check_role_eq(zone_conn1, created['role_name'])
607
def test_object_sync():
    """ objects written to each zone's bucket are synced to all other zones """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    # don't wait for meta sync just yet
    for owner_conn, bucket in zone_bucket:
        for objname in ('myobj', '_myobj', ':', '&'):
            new_key(owner_conn, bucket, objname).set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        others = [c for c in zonegroup_conns.zones if not (source_conn.zone == c.zone)]
        for target_conn in others:
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
631
def test_object_delete():
    """ an object's creation and its later removal are both synced to all
    other zones """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    def sync_and_compare(source_conn, bucket):
        # checkpoint the bucket on every other zone and compare its contents
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # don't wait for meta sync just yet
    for owner_conn, bucket in zone_bucket:
        new_key(owner_conn, bucket, objname).set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        sync_and_compare(source_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        get_key(source_conn, bucket, objname).delete()
        sync_and_compare(source_conn, bucket)
666
def get_latest_object_version(key):
    """ return the first entry of key.bucket.list_versions(key.name) that is
    flagged is_latest, or None when there is none """
    versions = key.bucket.list_versions(key.name)
    return next((version for version in versions if version.is_latest), None)
672
def test_versioned_object_incremental_sync():
    """ versioned object operations issued during incremental sync converge

    each bucket is first pushed past full sync with a dummy object; then
    every writable zone creates and deletes object versions in every bucket
    and finally rewrites an acl, with a zonegroup bucket checkpoint after
    each phase.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            log.debug('version1 id=%s', k.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            log.debug('version2 id=%s', k.version_id)
            k.bucket.delete_key(obj, version_id=k.version_id)

            k.set_contents_from_string('version3')
            log.debug('version3 id=%s', k.version_id)
            k.bucket.delete_key(obj, version_id=k.version_id)

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    for _, bucket in zone_bucket:
        # overwrite the acls to test that metadata-only entries are applied
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket.name, obj)
            v = get_latest_object_version(k)
            v.make_public()

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
727
def test_concurrent_versioned_object_incremental_sync():
    """ versions of one object written concurrently on all writable zones
    converge after bucket and data checkpoints """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    first_zone = zonegroup_conns.rw_zones[0]

    # create a versioned bucket
    bucket = first_zone.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object and wait for sync. this forces each zone to finish
    # a full sync and switch to incremental
    dummy = new_key(first_zone, bucket, 'dummy')
    dummy.set_contents_from_string('')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # create several concurrent versions on each zone and let them race to sync
    obj = 'obj'
    for _ in range(10):
        for zone_conn in zonegroup_conns.rw_zones:
            version = new_key(zone_conn, bucket, obj)
            version.set_contents_from_string('version1')
            log.debug('zone=%s version=%s', zone_conn.zone.name, version.version_id)

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
    zonegroup_data_checkpoint(zonegroup_conns)
755
def test_version_suspended_incremental_sync():
    """ the same object uploaded before versioning, with versioning enabled
    and with versioning suspended syncs correctly at every step

    all writes go through the first writable zone; a metadata checkpoint
    follows each versioning change and a bucket checkpoint each upload.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a non-versioned bucket
    bucket = zone.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # upload an initial object
    key1 = new_key(zone, bucket, 'obj')
    key1.set_contents_from_string('')
    log.debug('created initial version id=%s', key1.version_id)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # enable versioning
    bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(zonegroup)

    # re-upload the object as a new version
    key2 = new_key(zone, bucket, 'obj')
    key2.set_contents_from_string('')
    log.debug('created new version id=%s', key2.version_id)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # suspend versioning
    bucket.configure_versioning(False)
    zonegroup_meta_checkpoint(zonegroup)

    # re-upload the object as a 'null' version
    key3 = new_key(zone, bucket, 'obj')
    key3.set_contents_from_string('')
    log.debug('created null version id=%s', key3.version_id)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
792
def test_delete_marker_full_sync():
    """ an object and its delete marker, written to versioned buckets before
    the first sync, are synced to all zones """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(zonegroup)

    for owner_conn, bucket in zone_bucket:
        # upload an initial object
        new_key(owner_conn, bucket, 'obj').set_contents_from_string('')

        # create a delete marker
        new_key(owner_conn, bucket, 'obj').delete()

    # wait for full sync
    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
815
def test_suspended_delete_marker_full_sync():
    """ an object and its delete marker, written to version-suspended buckets
    before the first sync, are synced to all zones """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable/suspend versioning
    for _, bucket in zone_bucket:
        for enabled in (True, False):
            bucket.configure_versioning(enabled)
    zonegroup_meta_checkpoint(zonegroup)

    for owner_conn, bucket in zone_bucket:
        # upload an initial object
        new_key(owner_conn, bucket, 'obj').set_contents_from_string('')

        # create a delete marker
        new_key(owner_conn, bucket, 'obj').delete()

    # wait for full sync
    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
839
def test_bucket_versioning():
    """ enabling versioning on each bucket is reflected in its versioning status """
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        status = bucket.get_versioning_status()
        assert('Versioning' in status and status['Versioning'] == 'Enabled')
847
def test_bucket_acl():
    """ a fresh bucket has one acl grant; setting 'public-read' adds a second """
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        grants_before = bucket.get_acl().acl.grants
        assert(len(grants_before) == 1) # single grant on owner
        bucket.set_acl('public-read')
        grants_after = bucket.get_acl().acl.grants
        assert(len(grants_after) == 2) # new grant on AllUsers
854
def test_bucket_cors():
    """ a cors configuration set on each bucket is read back unchanged """
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        expected = CORSConfiguration()
        expected.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(expected)
        actual_xml = bucket.get_cors().to_xml()
        assert(actual_xml == expected.to_xml())
862
def test_bucket_delete_notempty():
    """ deleting a bucket that still holds an object must fail with
    BucketNotEmpty, and the bucket must remain on the master zone """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        conn.get_bucket(bucket_name).new_key('foo').set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
        else:
            assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    master_conn = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert master_conn.get_bucket(bucket_name)
887
def test_multi_period_incremental_sync():
    """
    Exercise incremental metadata sync across several periods.

    Zone 3 is stopped, the master zone is moved to zone 2 and then back to
    zone 1 (each move adds a period id to the comparison list), and buckets
    are created in the running zones after each move. Zone 3 is then
    restarted and has to catch up. Finally the mdlogs of every recorded
    period are compared between zones, autotrimmed, and checked to be empty.

    Raises:
        SkipTest: the master zonegroup has fewer than 3 zones.
    """
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    def create_bucket_in_running_zones():
        # create one new bucket through every zone connection except zone 3
        for zone_conn, _ in zone_bucket:
            if zone_conn.zone == z3:
                continue
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
            zone_conn.conn.create_bucket(bucket_name)
            buckets.append(bucket_name)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # use try-finally to restart zone 3 even if something fails
    try:
        # change master to zone 2 -> period 2
        set_master_zone(z2)
        mdlog_periods += [realm.current_period.id]

        create_bucket_in_running_zones()

        # wait for zone 1 to sync
        zone_meta_checkpoint(z1)

        # change master back to zone 1 -> period 3
        set_master_zone(z1)
        mdlog_periods += [realm.current_period.id]

        create_bucket_in_running_zones()
    finally:
        # restart zone 3 gateway
        z3.start()

    # wait for sync
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                if target_conn.zone.has_buckets():
                    target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0
973
def test_datalog_autotrim():
    """
    Check that datalog autotrim leaves only entries newer than the trim point.

    An object is written through every zone, metadata and data sync are
    allowed to catch up, and each zone's datalog is autotrimmed. For every
    shard whose status carries a parsable 'last_update' timestamp, the first
    remaining datalog entry (if any) must be newer than that timestamp.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # upload an object to each zone to generate a datalog entry
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket.name, 'key')
        k.set_contents_from_string('body')

    # wait for metadata and data sync to catch up
    zonegroup_meta_checkpoint(zonegroup)
    zonegroup_data_checkpoint(zonegroup_conns)

    # trim each datalog
    for zone, _ in zone_bucket:
        # read max markers for each shard (captured before trimming)
        status = datalog_status(zone.zone)

        datalog_autotrim(zone.zone)

        for shard_id, shard_status in enumerate(status):
            try:
                before_trim = dateutil.parser.isoparse(shard_status['last_update'])
            except (KeyError, ValueError):
                # empty timestamps look like "0.000000" and fail to parse;
                # such shards have nothing to compare against
                continue
            entries = datalog_list(zone.zone, ['--shard-id', str(shard_id), '--max-entries', '1'])
            if not entries:
                continue
            after_trim = dateutil.parser.isoparse(entries[0]['timestamp'])
            assert before_trim < after_trim, "any datalog entries must be newer than trim"
1005
def test_multi_zone_redirect():
    """
    Reads on a non-syncing zone must be served via its redirect zone.

    Sync is turned off for the second rw zone, an object is written to the
    first, and a read from the second is expected to fail. After pointing
    the second zone's redirect at the first, the same read (and reads after
    further overwrites on the first zone) must return the current data.

    The sync and redirect settings are restored in a finally block so a
    failing assertion does not leave the second zone reconfigured.

    Raises:
        SkipTest: the master zonegroup has fewer than 2 rw zones.
    """
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.rw_zones) < 2:
        raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more rw zones in master zonegroup.")

    zonegroup_conns = ZonegroupConns(zonegroup)
    (zc1, zc2) = zonegroup_conns.rw_zones[0:2]

    z1, z2 = (zc1.zone, zc2.zone)

    set_sync_from_all(z2, False)

    try:
        # create a bucket on the first zone
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
        bucket = zc1.conn.create_bucket(bucket_name)
        obj = 'testredirect'

        key = bucket.new_key(obj)
        data = 'A'*512
        key.set_contents_from_string(data)

        zonegroup_meta_checkpoint(zonegroup)

        # try to read object from second zone (should fail)
        bucket2 = get_bucket(zc2, bucket_name)
        assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)

        set_redirect_zone(z2, z1)

        key2 = bucket2.get_key(obj)

        eq(data, key2.get_contents_as_string(encoding='ascii'))

        key = bucket.new_key(obj)

        # every overwrite on zone 1 must be visible through zone 2
        for x in ['a', 'b', 'c', 'd']:
            data = x*512
            key.set_contents_from_string(data)
            eq(data, key2.get_contents_as_string(encoding='ascii'))
    finally:
        # revert config changes
        set_sync_from_all(z2, True)
        set_redirect_zone(z2, None)
1050
def test_zonegroup_remove():
    """
    Removing a zone from a zonegroup twice must fail with ENOENT the second time.

    A zone named 'remove' is created on the second zone's cluster and
    committed, removed from the zonegroup via the first zone's cluster, and
    removed again with return-code checking disabled. The zone is then
    deleted and a final period commit validates the result.

    Raises:
        SkipTest: the master zonegroup has fewer than 2 zones.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # get admin credentials out of existing zone
    system_key = z1.data['system_key']
    admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2, admin_creds.credential_args())
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert retcode == errno.ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)
1082
1083
def test_zg_master_zone_delete():
    """
    A period update must fail with EINVAL after a zonegroup's master zone is
    deleted, and must succeed again once that zonegroup is deleted as well.
    """
    zg = realm.master_zonegroup()
    zg_master = zg.master_zone

    assert len(zg.zones) >= 1
    cluster = zg.zones[0].cluster

    # a throwaway zonegroup with a single zone, committed into the period
    doomed_zg = ZoneGroup('remove_zg')
    doomed_zg.create(cluster)

    doomed_zone = Zone('remove', doomed_zg, cluster)
    doomed_zone.create(cluster)
    zg.period.update(zg_master, commit=True)

    doomed_zone.delete(cluster)
    # Period update: this should now fail, as the deleted zone will be the
    # master zone in that zonegroup
    _, rc = zg.period.update(zg_master, check_retcode=False)
    assert rc == errno.EINVAL

    # Proceed to delete the zonegroup as well; the period then no longer
    # contains a dangling master_zone, so this commit must succeed
    doomed_zg.delete(cluster)
    zg.period.update(zg_master, commit=True)
1110
def test_set_bucket_website():
    """
    Set a website configuration on each per-zone bucket and read it back.

    Raises:
        SkipTest: the gateway answers MethodNotAllowed, i.e. static website
            support is not enabled.
        boto.exception.S3ResponseError: any other failure while setting the
            configuration is propagated instead of being silently ignored.
    """
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
            # an unexpected error is a real failure; surface it here rather
            # than letting the read-back below fail for an unrelated reason
            raise
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
1121
def test_set_bucket_policy():
    """Set a bucket policy on each per-zone bucket and read it back verbatim."""
    policy_doc = '''{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*"
}]
}'''
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _zone, bkt in zone_bucket:
        bkt.set_policy(policy_doc)
        # get_policy() hands back bytes; compare against the text we sent
        stored = bkt.get_policy().decode('ascii')
        assert stored == policy_doc
1134
@attr('bucket_sync_disable')
def test_bucket_sync_disable():
    """
    Disable sync on every per-zone bucket and check, on each zone, that the
    bucket sync status objects do not exist.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    bucket_names, _ = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(master_zg)

    for name in bucket_names:
        disable_bucket_sync(realm.meta_master_zone(), name)

    for member in master_zg.zones:
        check_buckets_sync_status_obj_not_exist(member, bucket_names)

    zonegroup_data_checkpoint(conns)
1149
@attr('bucket_sync_disable')
def test_bucket_sync_enable_right_after_disable():
    """
    Disable and immediately re-enable sync on every bucket, then check that
    objects written afterwards still reach the bucket checkpoint.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    bucket_names, zone_bucket = create_bucket_per_zone(conns)

    body = 'asdasd'

    def upload_everywhere(names):
        # write each named object into every zone's own bucket
        for zconn, bkt in zone_bucket:
            for objname in names:
                new_key(zconn, bkt.name, objname).set_contents_from_string(body)

    def checkpoint_all_buckets():
        for name in bucket_names:
            zonegroup_bucket_checkpoint(conns, name)

    upload_everywhere(['obj1', 'obj2', 'obj3', 'obj4'])

    zonegroup_meta_checkpoint(master_zg)
    checkpoint_all_buckets()

    # toggle sync off and straight back on for each bucket
    for name in bucket_names:
        disable_bucket_sync(realm.meta_master_zone(), name)
        enable_bucket_sync(realm.meta_master_zone(), name)

    upload_everywhere(['obj5', 'obj6', 'obj7', 'obj8'])
    checkpoint_all_buckets()

    zonegroup_data_checkpoint(conns)
1184
@attr('bucket_sync_disable')
def test_bucket_sync_disable_enable():
    """
    Write objects while bucket sync is disabled, re-enable it, and check
    that every bucket then reaches its checkpoint.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    bucket_names, zone_bucket = create_bucket_per_zone(conns)

    body = 'asdasd'

    def upload_everywhere(names):
        # write each named object into every zone's own bucket
        for zconn, bkt in zone_bucket:
            for objname in names:
                new_key(zconn, bkt.name, objname).set_contents_from_string(body)

    def checkpoint_all_buckets():
        for name in bucket_names:
            zonegroup_bucket_checkpoint(conns, name)

    # first batch is written and synced while sync is still enabled
    upload_everywhere(['obj1', 'obj2', 'obj3', 'obj4'])

    zonegroup_meta_checkpoint(master_zg)
    checkpoint_all_buckets()

    for name in bucket_names:
        disable_bucket_sync(realm.meta_master_zone(), name)

    zonegroup_meta_checkpoint(master_zg)

    # second batch is written while sync is disabled
    upload_everywhere(['obj5', 'obj6', 'obj7', 'obj8'])

    for name in bucket_names:
        enable_bucket_sync(realm.meta_master_zone(), name)

    checkpoint_all_buckets()

    zonegroup_data_checkpoint(conns)
1223
def test_multipart_object_sync():
    """Complete a four-part multipart upload and wait for the bucket to sync."""
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _, zone_bucket = create_bucket_per_zone(conns)

    bucket = zone_bucket[0][1]

    # initiate a multipart upload
    started = bucket.initiate_multipart_upload('MULTIPART')
    upload = boto.s3.multipart.MultiPartUpload(bucket)
    upload.key_name = started.key_name
    upload.id = started.id

    part_size = 5 * 1024 * 1024  # 5M min part size
    # parts 1..4 are filled with 'a', 'b', 'c' and 'd' respectively
    for part_num, fill in enumerate('abcd', start=1):
        upload.upload_part_from_file(StringIO(fill * part_size), part_num)
    upload.complete_upload()

    zonegroup_meta_checkpoint(master_zg)
    zonegroup_bucket_checkpoint(conns, bucket.name)
1245
def test_encrypted_object_sync():
    """
    Objects uploaded with SSE-C and SSE-KMS must be readable after sync.

    Both objects are written to a bucket on the first rw zone, the bucket is
    synced to the second rw zone, and each object is read back from the
    second zone and compared with the uploaded data.

    Raises:
        SkipTest: the master zonegroup has fewer than 2 rw zones.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    if len(zonegroup.rw_zones) < 2:
        raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more rw zones in master zonegroup.")

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone; the sse-c object needs
    # the customer key headers on both the lookup and the read
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii'))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string(encoding='ascii'))
1290
def test_bucket_index_log_trim():
    """
    Check that bilog autotrim trims the 'active' bucket first and only
    reaches the 'cold' bucket when asked to cover cold buckets too.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)

    zconn = conns.rw_zones[0]

    def synced_bucket_with_objects():
        # create a test bucket, upload some objects, and wait for sync
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zconn.name, bucket_name)
        created = zconn.conn.create_bucket(bucket_name)
        for objname in ('a', 'b', 'c', 'd'):
            new_key(zconn, bucket_name, objname).set_contents_from_string('foo')
        zonegroup_meta_checkpoint(master_zg)
        zonegroup_bucket_checkpoint(conns, bucket_name)
        return created

    def bilog_len(bkt):
        return len(bilog_list(zconn.zone, bkt.name))

    # create a 'cold' bucket
    cold = synced_bucket_with_objects()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    bilog_autotrim(zconn.zone, ['--rgw-sync-log-trim-max-buckets', '0'])

    # create an 'active' bucket
    active = synced_bucket_with_objects()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    bilog_autotrim(zconn.zone, [
        '--rgw-sync-log-trim-max-buckets', '1',
        '--rgw-sync-log-trim-min-cold-buckets', '0',
    ])

    # verify active bucket has empty bilog
    assert bilog_len(active) == 0

    # verify cold bucket has nonempty bilog
    assert bilog_len(cold) > 0

    # trim with min-cold-buckets=999 to trim all buckets
    bilog_autotrim(zconn.zone, [
        '--rgw-sync-log-trim-max-buckets', '999',
        '--rgw-sync-log-trim-min-cold-buckets', '999',
    ])

    # verify cold bucket has empty bilog
    assert bilog_len(cold) == 0
1344
def test_bucket_reshard_index_log_trim():
    """
    Check how bilog autotrim interacts with bucket resharding.

    The test reshards one bucket twice and asserts that the number of
    entries in layout.logs grows from 1 to 3, then asserts that each of the
    next two autotrim runs reduces that number by one. After a third
    autotrim and another batch of uploads the bilog must be non-empty.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a test bucket, upload some objects, and wait for sync
    def make_test_bucket():
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, name, objname)
            k.set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return bucket

    # create a 'test' bucket
    test_bucket = make_test_bucket()

    # checking bucket layout before resharding
    json_obj_1 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_1['layout']['logs']) == 1)

    first_gen = json_obj_1['layout']['current_index']['gen']

    # the four uploads above are expected to show up as four bilog entries
    before_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(first_gen)])
    assert(len(before_reshard_bilog) == 4)

    # Resharding the bucket
    zone.zone.cluster.admin(['bucket', 'reshard',
                             '--bucket', test_bucket.name,
                             '--num-shards', '3',
                             '--yes-i-really-mean-it'])

    # checking bucket layout after 1st resharding
    json_obj_2 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_2['layout']['logs']) == 2)

    second_gen = json_obj_2['layout']['current_index']['gen']

    # nothing has been written since the reshard, so the new generation's
    # bilog is expected to be empty
    after_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(second_gen)])
    assert(len(after_reshard_bilog) == 0)

    # upload more objects
    for objname in ('e', 'f', 'g', 'h'):
        k = new_key(zone, test_bucket.name, objname)
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)

    # Resharding the bucket again
    zone.zone.cluster.admin(['bucket', 'reshard',
                             '--bucket', test_bucket.name,
                             '--num-shards', '3',
                             '--yes-i-really-mean-it'])

    # checking bucket layout after 2nd resharding
    json_obj_3 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_3['layout']['logs']) == 3)

    zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)

    bilog_autotrim(zone.zone)

    # checking bucket layout after 1st bilog autotrim
    json_obj_4 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_4['layout']['logs']) == 2)

    bilog_autotrim(zone.zone)

    # checking bucket layout after 2nd bilog autotrim
    json_obj_5 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_5['layout']['logs']) == 1)

    # third autotrim; its effect on the layout is not asserted
    bilog_autotrim(zone.zone)

    # upload more objects
    for objname in ('i', 'j', 'k', 'l'):
        k = new_key(zone, test_bucket.name, objname)
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)

    # verify the bucket has non-empty bilog
    test_bilog = bilog_list(zone.zone, test_bucket.name)
    assert(len(test_bilog) > 0)
1431
@attr('bucket_reshard')
def test_bucket_reshard_incremental():
    """
    Reshard a synced bucket in every rw zone, then check that objects
    written afterwards still sync.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    first = conns.rw_zones[0]

    def upload(names):
        for objname in names:
            new_key(first, bucket.name, objname).set_contents_from_string('foo')

    # create a bucket
    bucket = first.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    # upload some objects
    upload(('a', 'b', 'c', 'd'))
    zonegroup_bucket_checkpoint(conns, bucket.name)

    # reshard in each zone
    for zconn in conns.rw_zones:
        reshard_cmd = [
            'bucket', 'reshard',
            '--bucket', bucket.name,
            '--num-shards', '3',
            '--yes-i-really-mean-it',
        ]
        zconn.zone.cluster.admin(reshard_cmd)

    # upload more objects
    upload(('e', 'f', 'g', 'h'))
    zonegroup_bucket_checkpoint(conns, bucket.name)
1461
@attr('bucket_reshard')
def test_bucket_reshard_full():
    """
    Write to and reshard a bucket on the first zone while the other zones
    are stopped, then restart them and wait for the bucket to sync.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    first = conns.rw_zones[0]

    def upload(names):
        for objname in names:
            new_key(first, bucket.name, objname).set_contents_from_string('foo')

    # create a bucket
    bucket = first.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    # stop gateways in other zones so we can force the bucket to full sync
    for other in conns.rw_zones[1:]:
        other.zone.stop()

    # use try-finally to restart gateways even if something fails
    try:
        # upload some objects
        upload(('a', 'b', 'c', 'd'))

        # reshard on first zone
        reshard_cmd = [
            'bucket', 'reshard',
            '--bucket', bucket.name,
            '--num-shards', '3',
            '--yes-i-really-mean-it',
        ]
        first.zone.cluster.admin(reshard_cmd)

        # upload more objects
        upload(('e', 'f', 'g', 'h'))
    finally:
        for other in conns.rw_zones[1:]:
            other.zone.start()

    zonegroup_bucket_checkpoint(conns, bucket.name)
1499
def test_bucket_creation_time():
    """
    Bucket listings from every pair of rw zones must agree, position by
    position, on bucket name and creation date.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)

    zonegroup_meta_checkpoint(master_zg)

    listings = []
    for zconn in conns.rw_zones:
        listings.append(zconn.get_connection().get_all_buckets())

    for left, right in combinations(listings, 2):
        # zip() stops at the shorter listing, so only the common prefix of
        # the two listings is compared
        for lhs, rhs in zip(left, right):
            eq(lhs.name, rhs.name)
            eq(lhs.creation_date, rhs.creation_date)
1511
def get_bucket_shard_objects(zone, num_shards):
    """
    Get one object for each shard of the bucket index log.

    Runs the read-only admin command 'bucket shard objects' for the given
    zone and shard count, asserts that it exits with 0, and returns the
    'objs' entry of its JSON output.
    """
    args = [
        'bucket', 'shard', 'objects',
        *zone.zone_args(),
        '--num-shards', str(num_shards),
    ]
    output, retcode = zone.cluster.admin(args, read_only=True)
    assert retcode == 0
    return json.loads(output)['objs']
1522
def write_most_shards(zone, bucket_name, num_shards):
    """
    Write one object to most (but not all) bucket index shards.

    The object names returned by get_bucket_shard_objects() are shuffled and
    the last tenth of them (rounded down) is skipped; every remaining name
    is written to the bucket with the body 'foo'. With fewer than 10 names
    the rounded-down tenth is zero, so every name is written.

    Args:
        zone: zone connection; its .zone attribute is used for the admin
            command and the connection itself for the uploads.
        bucket_name: name of the bucket to write to.
        num_shards: shard count passed on to get_bucket_shard_objects().
    """
    objs = get_bucket_shard_objects(zone.zone, num_shards)
    random.shuffle(objs)
    # Compute the end index explicitly: a negative-index slice such as
    # objs[-skipped:] selects the *whole* list when skipped is 0.
    skipped = len(objs) // 10
    for obj in objs[:len(objs) - skipped]:
        k = new_key(zone, bucket_name, obj)
        k.set_contents_from_string('foo')
1533
def reshard_bucket(zone, bucket_name, num_shards):
    """
    Reshard a bucket to num_shards shards by running the admin command
    'bucket reshard' on the zone's cluster.
    """
    args = [
        'bucket', 'reshard',
        *zone.zone_args(),
        '--bucket', bucket_name,
        '--num-shards', str(num_shards),
        '--yes-i-really-mean-it',
    ]
    zone.cluster.admin(args)
1543
def get_obj_names(zone, bucket_name, maxobjs):
    """
    Get names of objects in a bucket.

    Runs the read-only admin command 'bucket list' with --max-entries set to
    maxobjs and returns the 'name' field of every entry in its JSON output.
    The command's return code is not checked.
    """
    args = [
        'bucket', 'list',
        *zone.zone_args(),
        '--bucket', bucket_name,
        '--max-entries', str(maxobjs),
    ]
    output, _ = zone.cluster.admin(args, read_only=True)
    names = []
    for entry in json.loads(output):
        names.append(entry['name'])
    return names
1554
def bucket_keys_eq(zone1, zone2, bucket_name):
    """
    Ensure that two buckets have the same keys, but get the lists through
    radosgw-admin rather than S3 so it can be used when radosgw isn't running.
    Only works for buckets of 10,000 objects since the tests calling it don't
    need more, and the output from bucket list doesn't have an obvious marker
    with which to continue.

    The two listings are walked in lockstep; as soon as one of them runs out
    before the other, the unmatched key is logged and the assertion fails.

    NOTE(review): pairs where both keys are present are not compared, so
    only a difference in the number of keys is detected -- confirm whether
    a name-by-name comparison is wanted.

    Raises:
        AssertionError: one zone lists more keys than the other.
    """
    keys1 = get_obj_names(zone1, bucket_name, 10000)
    keys2 = get_obj_names(zone2, bucket_name, 10000)
    for key1, key2 in zip_longest(keys1, keys2):
        if key1 is None:
            # zone1's listing ran out first: key2 has no counterpart there.
            # keys are plain name strings, so log them directly.
            log.critical('key=%s is missing from zone=%s', key2,
                         zone1.name)
            assert False
        if key2 is None:
            # zone2's listing ran out first: key1 has no counterpart there
            log.critical('key=%s is missing from zone=%s', key1,
                         zone2.name)
            assert False
1574
@attr('bucket_reshard')
def test_bucket_sync_run_basic_incremental():
    """
    Create several generations of objects on the primary while the other
    zones are stopped, then run 'bucket sync run' on each of them and check
    that they end up with the primary's keys.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    primary = conns.rw_zones[0]

    # create a bucket, write objects to it and wait for them to sync,
    # ensuring we are in incremental.
    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)
    write_most_shards(primary, bucket.name, 11)
    zonegroup_bucket_checkpoint(conns, bucket.name)

    try:
        # stop gateways in other zones so we can rely on bucket sync run
        for peer in conns.rw_zones[1:]:
            peer.zone.stop()

        # build up multiple generations each with some objects written to
        # them.
        for shard_count in [17, 19, 23, 29, 31, 37]:
            reshard_bucket(primary.zone, bucket.name, shard_count)
            write_most_shards(primary, bucket.name, shard_count)

        # bucket sync run on every secondary
        for peer in conns.rw_zones[1:]:
            sync_run = [
                'bucket', 'sync', 'run',
                *peer.zone.zone_args(),
                '--bucket', bucket.name,
                '--source-zone', primary.name,
            ]
            peer.zone.cluster.admin(sync_run)

            bucket_keys_eq(primary.zone, peer.zone, bucket.name)
    finally:
        # Restart so bucket_checkpoint can actually fetch things from the
        # secondaries. Put this in a finally block so they restart even on
        # error.
        for peer in conns.rw_zones[1:]:
            peer.zone.start()

    zonegroup_bucket_checkpoint(conns, bucket.name)
1621
def trash_bucket(zone, bucket_name):
    """
    Remove objects through radosgw-admin, zapping bilog to prevent the deletes
    from replicating.

    Up to 10000 object names are listed and removed one by one with
    'object rm'; afterwards 'bilog trim' is run for the bucket.
    """
    run = zone.cluster.admin

    # Delete the objects
    for name in get_obj_names(zone, bucket_name, 10000):
        run([
            'object', 'rm',
            *zone.zone_args(),
            '--bucket', bucket_name,
            '--object', name,
        ])

    # Zap the bilog
    run(['bilog', 'trim', *zone.zone_args(), '--bucket', bucket_name])
1639
@attr('bucket_reshard')
def test_zap_init_bucket_sync_run():
    """
    Create several generations of objects, trash them, then run bucket sync init
    and bucket sync run.

    For every rw zone after the first: the zone is stopped, its copy of the
    bucket is emptied with trash_bucket(), 'bucket sync init' and
    'bucket sync run' are issued against the primary, and the resulting keys
    are compared with the primary's. The zone is restarted afterwards even
    if one of those steps fails.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    primary = zonegroup_conns.rw_zones[0]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # Write zeroth generation
    for obj in range(1, 6):
        k = new_key(primary, bucket.name, f'obj{obj * 11}')
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # Write several more generations
    generations = [17, 19, 23, 29, 31, 37]
    for num_shards in generations:
        # each reshard is followed by five writes named after multiples of
        # the new shard count
        reshard_bucket(primary.zone, bucket.name, num_shards)
        for obj in range(1, 6):
            k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
            k.set_contents_from_string('foo')
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)


    # Stop gateways, trash bucket, init, sync, and restart for every secondary
    for secondary in zonegroup_conns.rw_zones[1:]:
        try:
            secondary.zone.stop()

            trash_bucket(secondary.zone, bucket.name)

            # reset the secondary's sync state for this bucket ...
            cmd = ['bucket', 'sync', 'init'] + secondary.zone.zone_args()
            cmd += ['--bucket', bucket.name]
            cmd += ['--source-zone', primary.name]
            secondary.zone.cluster.admin(cmd)

            # ... and drive the sync by hand, since the gateway is stopped
            cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
            cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
            secondary.zone.cluster.admin(cmd)

            bucket_keys_eq(primary.zone, secondary.zone, bucket.name)

        finally:
            # Do this as a finally so we bring the zone back up even on error.
            secondary.zone.start()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
1693
def test_role_sync():
    """
    Create a role in each zone and, after a metadata checkpoint, compare it
    between its source zone and every other zone.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _, zone_role = create_role_per_zone(conns)

    zonegroup_meta_checkpoint(master_zg)

    for src, role in zone_role:
        # a zone is never compared against itself
        targets = [dst for dst in conns.zones if not (src.zone == dst.zone)]
        for dst in targets:
            check_role_eq(src, dst, role)
1707
@attr('data_sync_init')
def test_bucket_full_sync_after_data_sync_init():
    """
    Run 'data sync init' on a stopped secondary that has not synced the
    bucket yet, then expect the objects to arrive once it is restarted.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    primary, secondary = conns.rw_zones[0], conns.rw_zones[1]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    try:
        # stop secondary zone before it starts a bucket full sync
        secondary.zone.stop()

        # write some objects that don't sync yet
        for i in range(1, 6):
            new_key(primary, bucket.name, f'obj{i * 11}').set_contents_from_string('foo')

        init_cmd = ['data', 'sync', 'init', *secondary.zone.zone_args(),
                    '--source-zone', primary.name]
        secondary.zone.cluster.admin(init_cmd)
    finally:
        # Do this as a finally so we bring the zone back up even on error.
        secondary.zone.start()

    # expect all objects to replicate via 'bucket full sync'
    zonegroup_bucket_checkpoint(conns, bucket.name)
    zonegroup_data_checkpoint(conns)
1738
@attr('data_sync_init')
@attr('bucket_reshard')
def test_resharded_bucket_full_sync_after_data_sync_init():
    """
    Same as the plain full-sync-after-init case, but the primary's bucket
    goes through several reshards (with writes in between) while the
    secondary is stopped.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    primary, secondary = conns.rw_zones[0], conns.rw_zones[1]

    def write_five(multiplier):
        # objects are named after the first five multiples of 'multiplier'
        for i in range(1, 6):
            new_key(primary, bucket.name, f'obj{i * multiplier}').set_contents_from_string('foo')

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    try:
        # stop secondary zone before it starts a bucket full sync
        secondary.zone.stop()

        # Write zeroth generation
        write_five(11)

        # Write several more generations
        for shard_count in [17, 19, 23, 29, 31, 37]:
            reshard_bucket(primary.zone, bucket.name, shard_count)
            write_five(shard_count)

        init_cmd = ['data', 'sync', 'init', *secondary.zone.zone_args(),
                    '--source-zone', primary.name]
        secondary.zone.cluster.admin(init_cmd)
    finally:
        # Do this as a finally so we bring the zone back up even on error.
        secondary.zone.start()

    # expect all objects to replicate via 'bucket full sync'
    zonegroup_bucket_checkpoint(conns, bucket.name)
    zonegroup_data_checkpoint(conns)
1778
@attr('data_sync_init')
def test_bucket_incremental_sync_after_data_sync_init():
    """
    Run 'data sync init' on a stopped secondary whose bucket sync is already
    incremental, then expect the remaining objects to arrive after restart.

    A dummy object is synced first so that every zone has finished full sync
    for the bucket. The secondary is then stopped, more objects are written
    to the primary, 'data sync init' is run for the secondary, and the
    secondary is restarted (also on failure) before waiting on the bucket
    and data checkpoints.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    primary = zonegroup_conns.rw_zones[0]
    secondary = zonegroup_conns.rw_zones[1]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object and wait for sync. this forces each zone to finish
    # a full sync and switch to incremental
    # (pass the bucket's name, like every other new_key() call in this file)
    k = new_key(primary, bucket.name, 'dummy')
    k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    try:
        # stop secondary zone before it syncs the rest
        secondary.zone.stop()

        # Write more objects to primary
        for obj in range(1, 6):
            k = new_key(primary, bucket.name, f'obj{obj * 11}')
            k.set_contents_from_string('foo')

        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
        cmd += ['--source-zone', primary.name]
        secondary.zone.cluster.admin(cmd)
    finally:
        # Do this as a finally so we bring the zone back up even on error.
        secondary.zone.start()

    # expect remaining objects to replicate via 'bucket incremental sync'
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
    zonegroup_data_checkpoint(zonegroup_conns)
1815
@attr('data_sync_init')
@attr('bucket_reshard')
def test_resharded_bucket_incremental_sync_latest_after_data_sync_init():
    """
    Reshard a bucket through several generations, wait for the secondary to
    catch up, then run 'data sync init' on the stopped secondary while more
    objects are written to the last generation.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    primary, secondary = conns.rw_zones[0], conns.rw_zones[1]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    def put_objects(factor):
        # write five objects to the primary, named after multiples of 'factor'
        for i in range(1, 6):
            key = new_key(primary, bucket.name, f'obj{i * factor}')
            key.set_contents_from_string('foo')

    # zeroth generation on the primary
    put_objects(11)

    # every reshard starts a new generation, which gets its own objects
    shard_counts = [17, 19, 23, 29, 31, 37]
    for shard_count in shard_counts:
        reshard_bucket(primary.zone, bucket.name, shard_count)
        put_objects(shard_count)

    # let the secondary reach the latest generation first
    zonegroup_bucket_checkpoint(conns, bucket.name)

    try:
        # take the secondary down before it can sync anything further
        secondary.zone.stop()

        # more objects for the last generation
        put_objects(shard_counts[-1])

        init_cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
        init_cmd += ['--source-zone', primary.name]
        secondary.zone.cluster.admin(init_cmd)
    finally:
        # always restart the zone, even if something above failed
        secondary.zone.start()

    # the remaining last-gen objects should arrive via 'bucket incremental sync'
    zonegroup_bucket_checkpoint(conns, bucket.name)
    zonegroup_data_checkpoint(conns)
1863
@attr('data_sync_init')
@attr('bucket_reshard')
def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
    """
    Sync only the zeroth generation to the secondary, then stop it, reshard
    the bucket several times on the primary and run 'data sync init' before
    starting the secondary again.
    """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    primary, secondary = conns.rw_zones[0], conns.rw_zones[1]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    def put_objects(factor):
        # write five objects to the primary, named after multiples of 'factor'
        for i in range(1, 6):
            key = new_key(primary, bucket.name, f'obj{i * factor}')
            key.set_contents_from_string('foo')

    # zeroth generation on the primary
    put_objects(11)

    # let the secondary catch up with generation zero
    zonegroup_bucket_checkpoint(conns, bucket.name)

    try:
        # take the secondary down before the later generations exist
        secondary.zone.stop()

        # every reshard starts a new generation, which gets its own objects
        for shard_count in (17, 19, 23, 29, 31, 37):
            reshard_bucket(primary.zone, bucket.name, shard_count)
            put_objects(shard_count)

        init_cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
        init_cmd += ['--source-zone', primary.name]
        secondary.zone.cluster.admin(init_cmd)
    finally:
        # always restart the zone, even if something above failed
        secondary.zone.start()

    # all generations should arrive via 'bucket incremental sync'
    zonegroup_bucket_checkpoint(conns, bucket.name)
    zonegroup_data_checkpoint(conns)
1906
def sync_info(cluster, bucket=None):
    """
    Run the 'sync info' admin command and return its decoded JSON output.

    cluster: object whose admin(cmd) returns a (json_text, retcode) pair
    bucket:  optional bucket name, passed as '--bucket'

    Raises AssertionError when the command exits with a non-zero code.
    """
    cmd = ['sync', 'info']
    if bucket:
        cmd += ['--bucket', bucket]
    (result_json, retcode) = cluster.admin(cmd)
    # the message names this command, not 'sync policy get'
    assert retcode == 0, 'failed to get sync info'

    return json.loads(result_json)
1916
def get_sync_policy(cluster, bucket=None):
    """
    Run 'sync policy get' (optionally for one bucket) through cluster.admin()
    and return the decoded JSON output; asserts on a non-zero return code.
    """
    bucket_opts = ['--bucket', bucket] if bucket else []
    output, status = cluster.admin(['sync', 'policy', 'get'] + bucket_opts)
    assert status == 0, 'failed to get sync policy'
    return json.loads(output)
1926
def create_sync_policy_group(cluster, group, status="allowed", bucket=None):
    """
    Run 'sync group create' for the given group id and status (optionally
    for one bucket) and return the decoded JSON output; asserts on failure.
    """
    argv = ['sync', 'group', 'create', '--group-id', group, '--status', status]
    if bucket:
        argv.extend(('--bucket', bucket))
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to create sync policy group id=%s, bucket=%s' % (group, bucket)
    return json.loads(output)
1935
def set_sync_policy_group_status(cluster, group, status, bucket=None):
    """
    Run 'sync group modify' to set the status of a sync policy group
    (optionally for one bucket) and return the decoded JSON output.
    """
    bucket_opts = ['--bucket', bucket] if bucket else []
    argv = ['sync', 'group', 'modify', '--group-id', group, '--status', status] + bucket_opts
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to set sync policy group id=%s, bucket=%s' % (group, bucket)
    return json.loads(output)
1944
def get_sync_policy_group(cluster, group, bucket=None):
    """
    Run 'sync group get' for one group id (optionally for one bucket) and
    return the decoded JSON output; asserts on a non-zero return code.
    """
    argv = ['sync', 'group', 'get', '--group-id', group]
    if bucket:
        argv.extend(('--bucket', bucket))
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to get sync policy group id=%s, bucket=%s' % (group, bucket)
    return json.loads(output)
1953
def remove_sync_policy_group(cluster, group, bucket=None):
    """
    Run 'sync group remove' for one group id (optionally for one bucket) and
    return the decoded JSON output; asserts on a non-zero return code.
    """
    bucket_opts = ['--bucket', bucket] if bucket else []
    output, rc = cluster.admin(['sync', 'group', 'remove', '--group-id', group] + bucket_opts)
    assert rc == 0, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket)
    return json.loads(output)
1962
def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket=None):
    """
    Run 'sync group flow create' with flow type 'symmetrical' over the given
    zones (optionally for one bucket) and return the decoded JSON output.
    """
    argv = [
        'sync', 'group', 'flow', 'create',
        '--group-id', group,
        '--flow-id', flow_id,
        '--flow-type', 'symmetrical',
        '--zones=%s' % zones,
    ]
    if bucket:
        argv.extend(('--bucket', bucket))
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to create sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket)
    return json.loads(output)
1971
def create_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket=None):
    """
    Run 'sync group flow create' with flow type 'directional' from src_zones
    to dest_zones (optionally for one bucket) and return the decoded JSON
    output; asserts on a non-zero return code.
    """
    argv = [
        'sync', 'group', 'flow', 'create',
        '--group-id', group,
        '--flow-id', flow_id,
        '--flow-type', 'directional',
        '--source-zone=%s' % src_zones,
        '--dest-zone=%s' % dest_zones,
    ]
    if bucket:
        argv.extend(('--bucket', bucket))
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to create sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket)
    return json.loads(output)
1980
def remove_sync_group_flow_symmetrical(cluster, group, flow_id, zones=None, bucket=None):
    """
    Run 'sync group flow remove' with flow type 'symmetrical'; '--zones' and
    '--bucket' are only passed when given. Returns the decoded JSON output
    and asserts on a non-zero return code.
    """
    argv = [
        'sync', 'group', 'flow', 'remove',
        '--group-id', group,
        '--flow-id', flow_id,
        '--flow-type', 'symmetrical',
    ]
    if zones:
        argv.append('--zones=%s' % zones)
    if bucket:
        argv.extend(('--bucket', bucket))
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to remove sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket)
    return json.loads(output)
1991
def remove_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket=None):
    """
    Run 'sync group flow remove' with flow type 'directional' for the given
    source/destination zones (optionally for one bucket) and return the
    decoded JSON output; asserts on a non-zero return code.
    """
    argv = [
        'sync', 'group', 'flow', 'remove',
        '--group-id', group,
        '--flow-id', flow_id,
        '--flow-type', 'directional',
        '--source-zone=%s' % src_zones,
        '--dest-zone=%s' % dest_zones,
    ]
    if bucket:
        argv.extend(('--bucket', bucket))
    output, rc = cluster.admin(argv)
    assert rc == 0, 'failed to remove sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket)
    return json.loads(output)
2000
def create_sync_group_pipe(cluster, group, pipe_id, src_zones, dest_zones, bucket=None, args=None):
    """
    Run 'sync group pipe create' and return its decoded JSON output.

    cluster:    object whose admin(cmd) returns a (json_text, retcode) pair
    group:      sync policy group id
    pipe_id:    id of the pipe to create
    src_zones:  value for '--source-zones'
    dest_zones: value for '--dest-zones'
    bucket:     optional bucket name, passed as '--bucket=<name>'
    args:       optional iterable of extra command-line arguments

    Raises AssertionError when the command exits with a non-zero code.
    """
    cmd = ['sync', 'group', 'pipe', 'create', '--group-id', group, '--pipe-id', pipe_id, '--source-zones=%s' % src_zones, '--dest-zones=%s' % dest_zones]
    if bucket:
        cmd.append('--bucket=' + bucket)
    if args:
        # extend a fresh list; the caller's 'args' is never modified
        cmd.extend(args)
    (result_json, retcode) = cluster.admin(cmd)
    assert retcode == 0, 'failed to create sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, pipe_id, src_zones, dest_zones, bucket)
    return json.loads(result_json)
2012
def remove_sync_group_pipe(cluster, group, pipe_id, bucket=None, args=None):
    """
    Run 'sync group pipe remove' and return its decoded JSON output.

    cluster: object whose admin(cmd) returns a (json_text, retcode) pair
    group:   sync policy group id
    pipe_id: id of the pipe to remove
    bucket:  optional bucket name, passed as '--bucket=<name>'
    args:    optional extra command-line argument(s); either a single
             string or an iterable of strings

    Raises AssertionError when the command exits with a non-zero code.
    """
    cmd = ['sync', 'group', 'pipe', 'remove', '--group-id', group, '--pipe-id', pipe_id]
    if bucket:
        cmd.append('--bucket=' + bucket)
    if args:
        if isinstance(args, str):
            cmd.append(args)
        else:
            # flatten a list of arguments instead of nesting it inside cmd
            cmd.extend(args)
    (result_json, retcode) = cluster.admin(cmd)
    # only names that exist in this function may appear in the message
    assert retcode == 0, 'failed to remove sync group pipe groupid=%s, pipe_id=%s, bucket=%s' % (group, pipe_id, bucket)
    return json.loads(result_json)
2024
def create_zone_bucket(zone):
    """Create a bucket with a generated name in the given zone and return it."""
    name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone.name, name)
    return zone.create_bucket(name)
2030
def create_object(zone_conn, bucket, objname, content):
    """Write 'content' to the object 'objname' in 'bucket' through zone_conn."""
    key = new_key(zone_conn, bucket.name, objname)
    key.set_contents_from_string(content)
2034
def create_objects(zone_conn, bucket, obj_arr, content):
    """Call create_object() with the same content for every name in obj_arr."""
    for name in obj_arr:
        create_object(zone_conn, bucket, name, content)
2038
def check_object_exists(bucket, objname, content=None):
    """
    Assert that bucket.get_key(objname) finds a key and, when 'content' is
    given, that the key's contents (read as ascii) equal it.
    """
    k = bucket.get_key(objname)
    assert_not_equal(k, None)
    # identity test against None; an empty string is still compared
    if content is not None:
        assert_equal(k.get_contents_as_string(encoding='ascii'), content)
2044
def check_objects_exist(bucket, obj_arr, content=None):
    """Run check_object_exists() for every object name in obj_arr."""
    for name in obj_arr:
        check_object_exists(bucket, name, content)
2048
def check_object_not_exists(bucket, objname):
    """Assert that bucket.get_key(objname) returns None."""
    assert_equal(bucket.get_key(objname), None)
2052
def check_objects_not_exist(bucket, obj_arr):
    """Run check_object_not_exists() for every object name in obj_arr."""
    for name in obj_arr:
        check_object_not_exists(bucket, name)
2056
@attr('sync_policy')
def test_sync_policy_config_zonegroup():
    """
    test_sync_policy_config_zonegroup:
        test configuration of all sync commands

    Creates, reads back and removes a sync policy group with flows and a
    pipe, first at zonegroup level and then for a single bucket.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zonegroup)

    zonegroup_conns = ZonegroupConns(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1 = z1.cluster

    zones = z1.name + "," + z2.name

    c1.admin(['sync', 'policy', 'get'])

    # (a) zonegroup level
    create_sync_policy_group(c1, "sync-group")
    set_sync_policy_group_status(c1, "sync-group", "enabled")
    get_sync_policy_group(c1, "sync-group")

    get_sync_policy(c1)

    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
    create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)

    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
    get_sync_policy_group(c1, "sync-group")

    zonegroup.period.update(z1, commit=True)

    # (b) bucket level
    # only the first zone connection is used; still require two of them
    zc1, _ = zonegroup_conns.zones[0:2]
    bucket = create_zone_bucket(zc1)
    bucket_name = bucket.name

    create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
    get_sync_policy_group(c1, "sync-bucket", bucket_name)

    get_sync_policy(c1, bucket_name)

    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
    create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)

    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
    get_sync_policy_group(c1, "sync-bucket", bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    # tear down the bucket-level policy
    remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
    remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
    remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
    remove_sync_policy_group(c1, "sync-bucket", bucket_name)

    get_sync_policy(c1, bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    # tear down the zonegroup-level policy
    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
    remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
    remove_sync_policy_group(c1, "sync-group")

    get_sync_policy(c1)

    zonegroup.period.update(z1, commit=True)
2127
@attr('sync_policy')
def test_sync_flow_symmetrical_zonegroup_all():
    """
    test_sync_flow_symmetrical_zonegroup_all:
        allows sync from all the zones to all other zones (default case)
    """

    zonegroup = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zonegroup)

    zonegroup_conns = ZonegroupConns(zonegroup)

    (zoneA, zoneB) = zonegroup.zones[0:2]
    (zcA, zcB) = zonegroup_conns.zones[0:2]

    c1 = zoneA.cluster

    c1.admin(['sync', 'policy', 'get'])

    zones = zoneA.name + ',' + zoneB.name
    create_sync_policy_group(c1, "sync-group")
    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
    set_sync_policy_group_status(c1, "sync-group", "enabled")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    objnames = ['obj1', 'obj2']
    content = 'asdasd'

    # create bucket & object in all zones
    bucketA = create_zone_bucket(zcA)
    create_object(zcA, bucketA, objnames[0], content)

    bucketB = create_zone_bucket(zcB)
    create_object(zcB, bucketB, objnames[1], content)

    zonegroup_meta_checkpoint(zonegroup)
    # 'zonegroup_data_checkpoint' currently fails for the zones not
    # allowed to sync. So as a workaround, data checkpoint is done
    # for only the ones configured.
    zone_data_checkpoint(zoneB, zoneA)
    # bucketB's object is checked in zoneA below, so wait for that
    # direction as well
    zone_data_checkpoint(zoneA, zoneB)

    # verify if objects are synced across the zones
    bucket = get_bucket(zcB, bucketA.name)
    check_object_exists(bucket, objnames[0], content)

    bucket = get_bucket(zcA, bucketB.name)
    check_object_exists(bucket, objnames[1], content)

    remove_sync_policy_group(c1, "sync-group")
2184
@attr('sync_policy')
def test_sync_flow_symmetrical_zonegroup_select():
    """
    test_sync_flow_symmetrical_zonegroup_select:
        allow sync between zoneA & zoneB
        verify zoneC doesnt sync the data
    """

    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    if len(zonegroup.zones) < 3:
        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)

    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]

    c1 = zoneA.cluster

    # configure sync policy
    zones = zoneA.name + ',' + zoneB.name
    c1.admin(['sync', 'policy', 'get'])
    create_sync_policy_group(c1, "sync-group")
    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
    set_sync_policy_group_status(c1, "sync-group", "enabled")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    content = 'asdasd'

    # create bucketA & objects in zoneA
    objnamesA = ['obj1', 'obj2', 'obj3']
    bucketA = create_zone_bucket(zcA)
    create_objects(zcA, bucketA, objnamesA, content)

    # create bucketB & objects in zoneB
    objnamesB = ['obj4', 'obj5', 'obj6']
    bucketB = create_zone_bucket(zcB)
    create_objects(zcB, bucketB, objnamesB, content)

    # wait for both directions between zoneA and zoneB
    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)
    zone_data_checkpoint(zoneA, zoneB)

    # verify if objnamesA synced to only zoneB but not zoneC
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_exist(bucket, objnamesA, content)

    bucket = get_bucket(zcC, bucketA.name)
    check_objects_not_exist(bucket, objnamesA)

    # verify if objnamesB synced to only zoneA but not zoneC
    bucket = get_bucket(zcA, bucketB.name)
    check_objects_exist(bucket, objnamesB, content)

    bucket = get_bucket(zcC, bucketB.name)
    check_objects_not_exist(bucket, objnamesB)

    remove_sync_policy_group(c1, "sync-group")
2252
@attr('sync_policy')
def test_sync_flow_directional_zonegroup_select():
    """
    test_sync_flow_directional_zonegroup_select:
        allow sync from only zoneA to zoneB

        verify that data doesn't get synced to zoneC and
        zoneA shouldn't sync data from zoneB either
    """

    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    if len(zonegroup.zones) < 3:
        # name this test (not its symmetrical sibling) in the skip reason
        raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)

    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]

    c1 = zoneA.cluster

    # configure sync policy
    zones = zoneA.name + ',' + zoneB.name
    c1.admin(['sync', 'policy', 'get'])
    create_sync_policy_group(c1, "sync-group")
    create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
    set_sync_policy_group_status(c1, "sync-group", "enabled")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    content = 'asdasd'

    # create bucketA & objects in zoneA
    objnamesA = ['obj1', 'obj2', 'obj3']
    bucketA = create_zone_bucket(zcA)
    create_objects(zcA, bucketA, objnamesA, content)

    # create bucketB & objects in zoneB
    objnamesB = ['obj4', 'obj5', 'obj6']
    bucketB = create_zone_bucket(zcB)
    create_objects(zcB, bucketB, objnamesB, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify if objnamesA synced to only zoneB but not zoneC
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_exist(bucket, objnamesA, content)

    bucket = get_bucket(zcC, bucketA.name)
    check_objects_not_exist(bucket, objnamesA)

    # verify if objnamesB are not synced to either zoneA or zoneC
    bucket = get_bucket(zcA, bucketB.name)
    check_objects_not_exist(bucket, objnamesB)

    bucket = get_bucket(zcC, bucketB.name)
    check_objects_not_exist(bucket, objnamesB)

    # verify the same at bucketA level
    # configure another policy at bucketA level with src and dest
    # zones specified to zoneA and zoneB resp.
    #
    # verify zoneA bucketA syncs to zoneB BucketA but not viceversa.

    # reconfigure zonegroup pipe & flow
    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)

    # change state to allowed
    set_sync_policy_group_status(c1, "sync-group", "allowed")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    # configure sync policy for only bucketA and enable it
    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
    args = ['--source-bucket=*', '--dest-bucket=*']
    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)

    get_sync_policy(c1, bucketA.name)

    zonegroup_meta_checkpoint(zonegroup)

    # create objects in bucketA in zoneA and zoneB
    objnamesC = ['obj7', 'obj8', 'obj9']
    objnamesD = ['obj10', 'obj11', 'obj12']
    create_objects(zcA, bucketA, objnamesC, content)
    create_objects(zcB, bucketA, objnamesD, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify that objnamesC are synced to bucketA in zoneB
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_exist(bucket, objnamesC, content)

    # verify that objnamesD are not synced to bucketA in zoneA
    bucket = get_bucket(zcA, bucketA.name)
    check_objects_not_exist(bucket, objnamesD)

    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
    remove_sync_policy_group(c1, "sync-group")
2369
@attr('sync_policy')
def test_sync_single_bucket():
    """
    test_sync_single_bucket:
        Allow data sync for only bucketA but not for other buckets via
        below 2 methods

        (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
        (b) bucket level: configure policy for bucketA
    """

    zonegroup = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zonegroup)

    zonegroup_conns = ZonegroupConns(zonegroup)

    (zoneA, zoneB) = zonegroup.zones[0:2]
    (zcA, zcB) = zonegroup_conns.zones[0:2]

    c1 = zoneA.cluster

    c1.admin(['sync', 'policy', 'get'])

    zones = zoneA.name + ',' + zoneB.name
    get_sync_policy(c1)

    objnames = ['obj1', 'obj2', 'obj3']
    content = 'asdasd'

    # create bucketA & bucketB in zoneA
    bucketA = create_zone_bucket(zcA)
    bucketB = create_zone_bucket(zcA)

    zonegroup_meta_checkpoint(zonegroup)

    # Method (a): configure pipe for only bucketA

    # configure sync policy & pipe for only bucketA
    create_sync_policy_group(c1, "sync-group")
    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
    args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name]

    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
    set_sync_policy_group_status(c1, "sync-group", "enabled")
    get_sync_policy(c1)
    zonegroup.period.update(zoneA, commit=True)

    sync_info(c1)

    # create objects in bucketA & bucketB
    create_objects(zcA, bucketA, objnames, content)
    # one object per name in bucketB too, so the 'not synced' check below
    # looks for objects that really exist in zoneA
    create_objects(zcA, bucketB, objnames, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify if bucketA objects are synced
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_exist(bucket, objnames, content)

    # bucketB objects should not be synced
    bucket = get_bucket(zcB, bucketB.name)
    check_objects_not_exist(bucket, objnames)

    # Method (b): configure policy at only bucketA level

    # reconfigure group pipe
    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)

    # change state to allowed
    set_sync_policy_group_status(c1, "sync-group", "allowed")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    # configure sync policy for only bucketA and enable it
    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)

    get_sync_policy(c1, bucketA.name)

    # create object in bucketA
    create_object(zcA, bucketA, objnames[2], content)

    # create object in bucketB too
    create_object(zcA, bucketB, objnames[2], content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify if bucketA objects are synced
    bucket = get_bucket(zcB, bucketA.name)
    check_object_exists(bucket, objnames[2], content)

    # bucketB objects should not be synced
    bucket = get_bucket(zcB, bucketB.name)
    check_object_not_exists(bucket, objnames[2])

    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
    remove_sync_policy_group(c1, "sync-group")
2481
@attr('sync_policy')
def test_sync_different_buckets():
    """
    test_sync_different_buckets:
        sync zoneA bucketA to zoneB bucketB via below methods

        (a) zonegroup: symmetrical flow but configure pipe for zoneA bucketA to zoneB bucketB
        (b) bucket: configure another policy at bucketA level with pipe set to
        another bucket(bucketB) in target zone.

        sync zoneA bucketA from zoneB bucketB
        (c) configure another policy at bucketA level with pipe set from
        another bucket(bucketB) in source zone.

    """

    zonegroup = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zonegroup)

    zonegroup_conns = ZonegroupConns(zonegroup)

    (zoneA, zoneB) = zonegroup.zones[0:2]
    (zcA, zcB) = zonegroup_conns.zones[0:2]
    zones = zoneA.name + ',' + zoneB.name

    c1 = zoneA.cluster

    c1.admin(['sync', 'policy', 'get'])

    objnames = ['obj1', 'obj2']
    content = 'asdasd'

    # create bucketA & bucketB in zoneA
    bucketA = create_zone_bucket(zcA)
    bucketB = create_zone_bucket(zcA)

    zonegroup_meta_checkpoint(zonegroup)

    # Method (a): zonegroup - configure pipe for only bucketA

    # configure pipe from zoneA bucketA to zoneB bucketB
    create_sync_policy_group(c1, "sync-group")
    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
    args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketB.name]
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args)
    set_sync_policy_group_status(c1, "sync-group", "enabled")
    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    # create objects in bucketA
    create_objects(zcA, bucketA, objnames, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify that objects are synced to bucketB in zoneB
    # but not to bucketA
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_not_exist(bucket, objnames)

    bucket = get_bucket(zcB, bucketB.name)
    check_objects_exist(bucket, objnames, content)

    # Method (b): configure policy at only bucketA level with pipe
    # set to bucketB in target zone

    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)

    # change state to allowed
    set_sync_policy_group_status(c1, "sync-group", "allowed")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    # configure sync policy for only bucketA and enable it
    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
    args = ['--source-bucket=*', '--dest-bucket=' + bucketB.name]
    create_sync_group_pipe(c1, "sync-bucket", "sync-pipeA", zones, zones, bucketA.name, args)
    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)

    get_sync_policy(c1, bucketA.name)

    objnamesC = ['obj5', 'obj6']

    zonegroup_meta_checkpoint(zonegroup)
    # create objects in bucketA
    create_objects(zcA, bucketA, objnamesC, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify that objects are synced to bucketB in zoneB
    # but not to bucketA
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_not_exist(bucket, objnamesC)

    bucket = get_bucket(zcB, bucketB.name)
    check_objects_exist(bucket, objnamesC, content)

    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
    zonegroup_meta_checkpoint(zonegroup)
    get_sync_policy(c1, bucketA.name)

    # Method (c): configure policy at only bucketA level with pipe
    # set from bucketB in source zone
    # verify zoneA bucketA syncs from zoneB BucketB but not bucketA

    # configure sync policy for only bucketA and enable it
    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
    args = ['--source-bucket=' + bucketB.name, '--dest-bucket=' + '*']
    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args)
    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)

    get_sync_policy(c1, bucketA.name)

    # create objects in bucketA & B in ZoneB
    objnamesD = ['obj7', 'obj8']
    objnamesE = ['obj9', 'obj10']

    create_objects(zcB, bucketA, objnamesD, content)
    create_objects(zcB, bucketB, objnamesE, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneA, zoneB)

    # verify that objects from only bucketB are synced to
    # bucketA in zoneA
    bucket = get_bucket(zcA, bucketA.name)
    check_objects_not_exist(bucket, objnamesD)
    check_objects_exist(bucket, objnamesE, content)

    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
    remove_sync_policy_group(c1, "sync-group")
2630
@attr('sync_policy')
def test_sync_multiple_buckets_to_single():
    """
    test_sync_multiple_buckets_to_single:
        (a) zonegroup-level policy, directional flow zoneA -> zoneB, with
            one pipe per source bucket (bucketA, bucketB), each targeting
            bucketB.
            verify zoneA bucketA & bucketB sync to zoneB bucketB and that
            nothing lands in zoneB bucketA

        (b) switch the zonegroup-level group to "allowed" and configure
            another policy at bucketA level with pipes configured to sync
            from multiple buckets (bucketA & bucketB).
            verify zoneA bucketA & bucketB sync to zoneB bucketA and that
            nothing new lands in zoneB bucketB
    """

    zonegroup = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zonegroup)

    zonegroup_conns = ZonegroupConns(zonegroup)

    # only the first two zones of the master zonegroup take part in the test
    (zoneA, zoneB) = zonegroup.zones[0:2]
    (zcA, zcB) = zonegroup_conns.zones[0:2]
    zones = zoneA.name + ',' + zoneB.name

    c1 = zoneA.cluster

    c1.admin(['sync', 'policy', 'get'])

    objnamesA = [ 'obj1', 'obj2' ]
    objnamesB = [ 'obj3', 'obj4' ]
    content = 'asdasd'

    # create bucketA & bucketB in zoneA
    bucketA = create_zone_bucket(zcA)
    bucketB = create_zone_bucket(zcA)

    zonegroup_meta_checkpoint(zonegroup)

    # Method (a): configure at zonegroup level
    # configure pipe from zoneA bucketA,bucketB to zoneB bucketB
    create_sync_policy_group(c1, "sync-group")
    create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
    source_buckets = [ bucketA.name, bucketB.name ]
    for source_bucket in source_buckets:
        args = ['--source-bucket=' + source_bucket, '--dest-bucket=' + bucketB.name]
        create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, None, args)

    set_sync_policy_group_status(c1, "sync-group", "enabled")
    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    # create objects in bucketA & bucketB
    create_objects(zcA, bucketA, objnamesA, content)
    create_objects(zcA, bucketB, objnamesB, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify that both zoneA bucketA & bucketB objects are synced to
    # bucketB in zoneB but not to bucketA
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_not_exist(bucket, objnamesA)
    check_objects_not_exist(bucket, objnamesB)

    bucket = get_bucket(zcB, bucketB.name)
    check_objects_exist(bucket, objnamesA, content)
    check_objects_exist(bucket, objnamesB, content)

    # Method (b): configure at bucket level
    # reconfigure pipe & flow
    for source_bucket in source_buckets:
        remove_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket)
    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)

    # change state to allowed
    set_sync_policy_group_status(c1, "sync-group", "allowed")

    zonegroup.period.update(zoneA, commit=True)
    get_sync_policy(c1)

    objnamesC = [ 'obj5', 'obj6' ]
    objnamesD = [ 'obj7', 'obj8' ]

    # configure sync policy for only bucketA and enable it
    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
    for source_bucket in source_buckets:
        args = ['--source-bucket=' + source_bucket, '--dest-bucket=' + '*']
        create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, bucketA.name, args)

    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)

    get_sync_policy(c1)

    zonegroup_meta_checkpoint(zonegroup)
    # create objects in bucketA & bucketB
    create_objects(zcA, bucketA, objnamesC, content)
    create_objects(zcA, bucketB, objnamesD, content)

    zonegroup_meta_checkpoint(zonegroup)
    zone_data_checkpoint(zoneB, zoneA)

    # verify that both zoneA bucketA & bucketB objects are synced to
    # bucketA in zoneB but not to bucketB
    bucket = get_bucket(zcB, bucketB.name)
    check_objects_not_exist(bucket, objnamesC)
    check_objects_not_exist(bucket, objnamesD)

    # objnamesC were written to bucketA and objnamesD to bucketB; each set
    # must be checked once (previously objnamesD was checked twice and
    # objnamesC never)
    bucket = get_bucket(zcB, bucketA.name)
    check_objects_exist(bucket, objnamesC, content)
    check_objects_exist(bucket, objnamesD, content)

    # clean up both policy groups so later tests start without them
    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
    remove_sync_policy_group(c1, "sync-group")
2751
@attr('sync_policy')
def test_sync_single_bucket_to_multiple():
    """
    test_sync_single_bucket_to_multiple:
        fan one source bucket out to several destination buckets

        (a) zonegroup-level group with a symmetrical flow and pipes from
            zoneA bucketA to zoneB bucketA & bucketB

        (b) bucket-level group on bucketA with one pipe per destination
            bucket (bucketA & bucketB)

        in both cases verify that objects written to zoneA bucketA are
        present in zoneB bucketA & bucketB
    """
    zg = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zg)

    conns = ZonegroupConns(zg)

    # the test only involves the first two zones of the master zonegroup
    zoneA, zoneB = zg.zones[0:2]
    zcA, zcB = conns.zones[0:2]
    zones = ','.join((zoneA.name, zoneB.name))

    cluster = zoneA.cluster
    cluster.admin(['sync', 'policy', 'get'])

    payload = 'asdasd'
    first_batch = ['obj1', 'obj2']

    # both buckets are created through zoneA
    bucketA = create_zone_bucket(zcA)
    bucketB = create_zone_bucket(zcA)
    buckets = [bucketA, bucketB]  # not read again in this test

    zonegroup_meta_checkpoint(zg)

    dest_names = [bucketA.name, bucketB.name]

    # ---- Method (a): configure at zonegroup level ----
    # one pipe per destination bucket, all sourced from bucketA
    create_sync_policy_group(cluster, "sync-group")
    create_sync_group_flow_symmetrical(cluster, "sync-group", "sync-flow1", zones)

    for name in dest_names:
        pipe_args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + name]
        create_sync_group_pipe(cluster, "sync-group", 'sync-pipe-' + name,
                               zoneA.name, zoneB.name, None, pipe_args)

    # the plain "sync-pipe" is created with the argument list left over
    # from the last loop iteration (bucketA -> bucketB)
    create_sync_group_pipe(cluster, "sync-group", "sync-pipe",
                           zoneA.name, zoneB.name, None, pipe_args)
    set_sync_policy_group_status(cluster, "sync-group", "enabled")
    zg.period.update(zoneA, commit=True)
    get_sync_policy(cluster)

    # write the first batch into bucketA
    create_objects(zcA, bucketA, first_batch, payload)

    zonegroup_meta_checkpoint(zg)
    zone_data_checkpoint(zoneB, zoneA)

    # objects from zoneA bucketA must be present in both
    # bucketA & bucketB in zoneB
    for name in dest_names:
        check_objects_exist(get_bucket(zcB, name), first_batch, payload)

    # ---- Method (b): configure at bucket level ----
    remove_sync_group_pipe(cluster, "sync-group", "sync-pipe")
    create_sync_group_pipe(cluster, "sync-group", "sync-pipe", '*', '*')

    # switch the zonegroup-level group to "allowed"
    set_sync_policy_group_status(cluster, "sync-group", "allowed")

    zg.period.update(zoneA, commit=True)
    get_sync_policy(cluster)

    second_batch = ['obj3', 'obj4']

    # policy scoped to bucketA only: create it, add pipes, then enable it
    create_sync_policy_group(cluster, "sync-bucket", "allowed", bucketA.name)
    create_sync_group_flow_symmetrical(cluster, "sync-bucket", "sync-flowA", zones, bucketA.name)
    for name in dest_names:
        pipe_args = ['--source-bucket=*', '--dest-bucket=' + name]
        create_sync_group_pipe(cluster, "sync-bucket", 'sync-pipe-' + name,
                               zoneA.name, zoneB.name, bucketA.name, pipe_args)

    set_sync_policy_group_status(cluster, "sync-bucket", "enabled", bucketA.name)

    get_sync_policy(cluster)

    zonegroup_meta_checkpoint(zg)
    # write the second batch into bucketA
    create_objects(zcA, bucketA, second_batch, payload)

    zonegroup_meta_checkpoint(zg)
    zone_data_checkpoint(zoneB, zoneA)

    # objects from zoneA bucketA must again be present in both
    # bucketA & bucketB in zoneB
    for name in dest_names:
        check_objects_exist(get_bucket(zcB, name), second_batch, payload)

    # drop the bucket-level group first, then the zonegroup-level one
    remove_sync_policy_group(cluster, "sync-bucket", bucketA.name)
    remove_sync_policy_group(cluster, "sync-group")