]>
git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/apps/memcached/tests/test_memcached.py
3 # This file is open source software, licensed to you under the terms
4 # of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 # distributed with this work for additional information regarding copyright
6 # ownership. You may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
19 from contextlib
import contextmanager
33 class TimeoutError(Exception):
37 def tcp_connection(timeout
=1):
38 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
40 s
.connect(server_addr
)
43 return s
.recv(16*1024)
50 raise unittest
.SkipTest('Slow')
63 def tcp_call(msg
, timeout
=1):
64 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
66 s
.connect(server_addr
)
68 s
.shutdown(socket
.SHUT_WR
)
73 def udp_call_for_fragments(msg
, timeout
=1):
74 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
75 sock
.settimeout(timeout
)
76 this_req_id
= random
.randint(-32768, 32767)
78 datagram
= struct
.pack(">hhhh", this_req_id
, 0, 1, 0) + msg
.encode()
79 sock
.sendto(datagram
, server_addr
)
84 data
, addr
= sock
.recvfrom(1500)
85 req_id
, seq
, n
, res
= struct
.unpack_from(">hhhh", data
)
88 if n_determined
and n_determined
!= n
:
89 raise Exception('Inconsitent number of total messages, %d and %d' % (n_determined
, n
))
92 if req_id
!= this_req_id
:
93 raise Exception('Invalid request id: ' + req_id
+ ', expected ' + this_req_id
)
96 raise Exception('Duplicate message for seq=' + seq
)
98 messages
[seq
] = content
99 if len(messages
) == n
:
102 for k
, v
in sorted(messages
.items(), key
=lambda e
: e
[0]):
107 def udp_call(msg
, **kwargs
):
108 return b
''.join(udp_call_for_fragments(msg
, **kwargs
))
110 class MemcacheTest(unittest
.TestCase
):
111 def set(self
, key
, value
, flags
=0, expiry
=0):
112 self
.assertEqual(call('set %s %d %d %d\r\n%s\r\n' % (key
, flags
, expiry
, len(value
), value
)), b
'STORED\r\n')
114 def delete(self
, key
):
115 self
.assertEqual(call('delete %s\r\n' % key
), b
'DELETED\r\n')
117 def assertHasKey(self
, key
):
118 resp
= call('get %s\r\n' % key
)
119 if not resp
.startswith(('VALUE %s' % key
).encode()):
120 self
.fail('Key \'%s\' should be present, but got: %s' % (key
, resp
.decode()))
122 def assertNoKey(self
, key
):
123 resp
= call('get %s\r\n' % key
)
124 if resp
!= b
'END\r\n':
125 self
.fail('Key \'%s\' should not be present, but got: %s' % (key
, resp
.decode()))
127 def setKey(self
, key
):
128 self
.set(key
, 'some value')
130 def getItemVersion(self
, key
):
131 m
= re
.match(r
'VALUE %s \d+ \d+ (?P<version>\d+)' % key
, call('gets %s\r\n' % key
).decode())
132 return int(m
.group('version'))
134 def getStat(self
, name
, call_fn
=None):
135 if not call_fn
: call_fn
= call
136 resp
= call_fn('stats\r\n').decode()
137 m
= re
.search(r
'STAT %s (?P<value>.+)' % re
.escape(name
), resp
, re
.MULTILINE
)
138 return m
.group('value')
141 self
.assertEqual(call('flush_all\r\n'), b
'OK\r\n')
146 class TcpSpecificTests(MemcacheTest
):
147 def test_recovers_from_errors_in_the_stream(self
):
148 with
tcp_connection() as conn
:
149 self
.assertEqual(conn('get\r\n'), b
'ERROR\r\n')
150 self
.assertEqual(conn('get key\r\n'), b
'END\r\n')
152 def test_incomplete_command_results_in_error(self
):
153 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
154 s
.connect(server_addr
)
156 s
.shutdown(socket
.SHUT_WR
)
157 self
.assertEqual(recv_all(s
), b
'ERROR\r\n')
160 def test_stream_closed_results_in_error(self
):
161 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
162 s
.connect(server_addr
)
163 s
.shutdown(socket
.SHUT_WR
)
164 self
.assertEqual(recv_all(s
), b
'')
167 def test_unsuccesful_parsing_does_not_leave_data_behind(self
):
168 with
tcp_connection() as conn
:
169 self
.assertEqual(conn('set key 0 0 5\r\nhello\r\n'), b
'STORED\r\n')
170 self
.assertRegex(conn('delete a b c\r\n'), b
'^(CLIENT_)?ERROR.*\r\n$')
171 self
.assertEqual(conn('get key\r\n'), b
'VALUE key 0 5\r\nhello\r\nEND\r\n')
172 self
.assertEqual(conn('delete key\r\n'), b
'DELETED\r\n')
174 def test_flush_all_no_reply(self
):
175 self
.assertEqual(call('flush_all noreply\r\n'), b
'')
177 def test_set_no_reply(self
):
178 self
.assertEqual(call('set key 0 0 5 noreply\r\nhello\r\nget key\r\n'), b
'VALUE key 0 5\r\nhello\r\nEND\r\n')
181 def test_delete_no_reply(self
):
183 self
.assertEqual(call('delete key noreply\r\nget key\r\n'), b
'END\r\n')
185 def test_add_no_reply(self
):
186 self
.assertEqual(call('add key 0 0 1 noreply\r\na\r\nget key\r\n'), b
'VALUE key 0 1\r\na\r\nEND\r\n')
189 def test_replace_no_reply(self
):
190 self
.assertEqual(call('set key 0 0 1\r\na\r\n'), b
'STORED\r\n')
191 self
.assertEqual(call('replace key 0 0 1 noreply\r\nb\r\nget key\r\n'), b
'VALUE key 0 1\r\nb\r\nEND\r\n')
194 def test_cas_noreply(self
):
195 self
.assertNoKey('key')
196 self
.assertEqual(call('cas key 0 0 1 1 noreply\r\na\r\n'), b
'')
197 self
.assertNoKey('key')
199 self
.assertEqual(call('add key 0 0 5\r\nhello\r\n'), b
'STORED\r\n')
200 version
= self
.getItemVersion('key')
202 self
.assertEqual(call('cas key 1 0 5 %d noreply\r\naloha\r\n' % (version
+ 1)), b
'')
203 self
.assertEqual(call('get key\r\n'), b
'VALUE key 0 5\r\nhello\r\nEND\r\n')
205 self
.assertEqual(call('cas key 1 0 5 %d noreply\r\naloha\r\n' % (version
)), b
'')
206 self
.assertEqual(call('get key\r\n'), b
'VALUE key 1 5\r\naloha\r\nEND\r\n')
211 def test_connection_statistics(self
):
212 with
tcp_connection() as conn
:
213 curr_connections
= int(self
.getStat('curr_connections', call_fn
=conn
))
214 total_connections
= int(self
.getStat('total_connections', call_fn
=conn
))
215 with
tcp_connection() as conn2
:
216 self
.assertEqual(curr_connections
+ 1, int(self
.getStat('curr_connections', call_fn
=conn
)))
217 self
.assertEqual(total_connections
+ 1, int(self
.getStat('total_connections', call_fn
=conn
)))
218 self
.assertEqual(total_connections
+ 1, int(self
.getStat('total_connections', call_fn
=conn
)))
220 self
.assertEqual(curr_connections
, int(self
.getStat('curr_connections', call_fn
=conn
)))
222 class UdpSpecificTests(MemcacheTest
):
223 def test_large_response_is_split_into_mtu_chunks(self
):
224 max_datagram_size
= 1400
225 data
= '1' * (max_datagram_size
*3)
226 self
.set('key', data
)
228 chunks
= list(udp_call_for_fragments('get key\r\n'))
231 self
.assertLessEqual(len(chunk
), max_datagram_size
)
233 self
.assertEqual(b
''.join(chunks
).decode(),
234 'VALUE key 0 %d\r\n%s\r\n' \
235 'END\r\n' % (len(data
), data
))
239 class TestCommands(MemcacheTest
):
240 def test_basic_commands(self
):
241 self
.assertEqual(call('get key\r\n'), b
'END\r\n')
242 self
.assertEqual(call('set key 0 0 5\r\nhello\r\n'), b
'STORED\r\n')
243 self
.assertEqual(call('get key\r\n'), b
'VALUE key 0 5\r\nhello\r\nEND\r\n')
244 self
.assertEqual(call('delete key\r\n'), b
'DELETED\r\n')
245 self
.assertEqual(call('delete key\r\n'), b
'NOT_FOUND\r\n')
246 self
.assertEqual(call('get key\r\n'), b
'END\r\n')
248 def test_error_handling(self
):
249 self
.assertEqual(call('get\r\n'), b
'ERROR\r\n')
252 def test_expiry(self
):
253 self
.assertEqual(call('set key 0 1 5\r\nhello\r\n'), b
'STORED\r\n')
254 self
.assertEqual(call('get key\r\n'), b
'VALUE key 0 5\r\nhello\r\nEND\r\n')
256 self
.assertEqual(call('get key\r\n'), b
'END\r\n')
259 def test_expiry_at_epoch_time(self
):
260 expiry
= int(time
.time()) + 1
261 self
.assertEqual(call('set key 0 %d 5\r\nhello\r\n' % expiry
), b
'STORED\r\n')
262 self
.assertEqual(call('get key\r\n'), b
'VALUE key 0 5\r\nhello\r\nEND\r\n')
264 self
.assertEqual(call('get key\r\n'), b
'END\r\n')
266 def test_multiple_keys_in_get(self
):
267 self
.assertEqual(call('set key1 0 0 2\r\nv1\r\n'), b
'STORED\r\n')
268 self
.assertEqual(call('set key 0 0 2\r\nv2\r\n'), b
'STORED\r\n')
269 resp
= call('get key1 key\r\n')
270 self
.assertRegex(resp
, b
'^(VALUE key1 0 2\r\nv1\r\nVALUE key 0 2\r\nv2\r\nEND\r\n)|(VALUE key 0 2\r\nv2\r\nVALUE key1 0 2\r\nv1\r\nEND\r\n)$')
274 def test_flush_all(self
):
275 self
.set('key', 'value')
276 self
.assertEqual(call('flush_all\r\n'), b
'OK\r\n')
277 self
.assertNoKey('key')
279 def test_keys_set_after_flush_remain(self
):
280 self
.assertEqual(call('flush_all\r\n'), b
'OK\r\n')
282 self
.assertHasKey('key')
286 def test_flush_all_with_timeout_flushes_all_keys_even_those_set_after_flush(self
):
288 self
.assertEqual(call('flush_all 2\r\n'), b
'OK\r\n')
289 self
.assertHasKey('key')
292 self
.assertNoKey('key')
293 self
.assertNoKey('key2')
296 def test_subsequent_flush_is_merged(self
):
298 self
.assertEqual(call('flush_all 2\r\n'), b
'OK\r\n') # Can flush in anything between 1-2
299 self
.assertEqual(call('flush_all 4\r\n'), b
'OK\r\n') # Can flush in anything between 3-4
301 self
.assertHasKey('key')
304 self
.assertNoKey('key')
305 self
.assertNoKey('key2')
308 def test_immediate_flush_cancels_delayed_flush(self
):
309 self
.assertEqual(call('flush_all 2\r\n'), b
'OK\r\n')
310 self
.assertEqual(call('flush_all\r\n'), b
'OK\r\n')
313 self
.assertHasKey('key')
317 def test_flushing_in_the_past(self
):
321 key2_time
= int(time
.time())
322 self
.assertEqual(call('flush_all %d\r\n' % (key2_time
- 1)), b
'OK\r\n')
324 self
.assertNoKey("key1")
325 self
.assertNoKey("key2")
328 def test_memcache_does_not_crash_when_flushing_with_already_expred_items(self
):
329 self
.assertEqual(call('set key1 0 2 5\r\nhello\r\n'), b
'STORED\r\n')
331 self
.assertEqual(call('flush_all\r\n'), b
'OK\r\n')
333 def test_response_spanning_many_datagrams(self
):
334 key1_data
= '1' * 1000
335 key2_data
= '2' * 1000
336 key3_data
= '3' * 1000
337 self
.set('key1', key1_data
)
338 self
.set('key2', key2_data
)
339 self
.set('key3', key3_data
)
341 resp
= call('get key1 key2 key3\r\n').decode()
343 pattern
= '^VALUE (?P<v1>.*?\r\n.*?)\r\nVALUE (?P<v2>.*?\r\n.*?)\r\nVALUE (?P<v3>.*?\r\n.*?)\r\nEND\r\n$'
344 self
.assertRegex(resp
, pattern
)
346 m
= re
.match(pattern
, resp
)
347 self
.assertEqual(set([m
.group('v1'), m
.group('v2'), m
.group('v3')]),
348 set(['key1 0 %d\r\n%s' % (len(key1_data
), key1_data
),
349 'key2 0 %d\r\n%s' % (len(key2_data
), key2_data
),
350 'key3 0 %d\r\n%s' % (len(key3_data
), key3_data
)]))
356 def test_version(self
):
357 self
.assertRegex(call('version\r\n'), b
'^VERSION .*\r\n$')
360 self
.assertEqual(call('add key 0 0 1\r\na\r\n'), b
'STORED\r\n')
361 self
.assertEqual(call('add key 0 0 1\r\na\r\n'), b
'NOT_STORED\r\n')
364 def test_replace(self
):
365 self
.assertEqual(call('add key 0 0 1\r\na\r\n'), b
'STORED\r\n')
366 self
.assertEqual(call('replace key 0 0 1\r\na\r\n'), b
'STORED\r\n')
368 self
.assertEqual(call('replace key 0 0 1\r\na\r\n'), b
'NOT_STORED\r\n')
370 def test_cas_and_gets(self
):
371 self
.assertEqual(call('cas key 0 0 1 1\r\na\r\n'), b
'NOT_FOUND\r\n')
372 self
.assertEqual(call('add key 0 0 5\r\nhello\r\n'), b
'STORED\r\n')
373 version
= self
.getItemVersion('key')
375 self
.assertEqual(call('set key 1 0 5\r\nhello\r\n'), b
'STORED\r\n')
376 self
.assertEqual(call('gets key\r\n').decode(), 'VALUE key 1 5 %d\r\nhello\r\nEND\r\n' % (version
+ 1))
378 self
.assertEqual(call('cas key 0 0 5 %d\r\nhello\r\n' % (version
)), b
'EXISTS\r\n')
379 self
.assertEqual(call('cas key 0 0 5 %d\r\naloha\r\n' % (version
+ 1)), b
'STORED\r\n')
380 self
.assertEqual(call('gets key\r\n').decode(), 'VALUE key 0 5 %d\r\naloha\r\nEND\r\n' % (version
+ 2))
384 def test_curr_items_stat(self
):
385 self
.assertEqual(0, int(self
.getStat('curr_items')))
387 self
.assertEqual(1, int(self
.getStat('curr_items')))
389 self
.assertEqual(0, int(self
.getStat('curr_items')))
391 def test_how_stats_change_with_different_commands(self
):
392 get_count
= int(self
.getStat('cmd_get'))
393 set_count
= int(self
.getStat('cmd_set'))
394 flush_count
= int(self
.getStat('cmd_flush'))
395 total_items
= int(self
.getStat('total_items'))
396 get_misses
= int(self
.getStat('get_misses'))
397 get_hits
= int(self
.getStat('get_hits'))
398 cas_hits
= int(self
.getStat('cas_hits'))
399 cas_badval
= int(self
.getStat('cas_badval'))
400 cas_misses
= int(self
.getStat('cas_misses'))
401 delete_misses
= int(self
.getStat('delete_misses'))
402 delete_hits
= int(self
.getStat('delete_hits'))
403 curr_connections
= int(self
.getStat('curr_connections'))
404 incr_hits
= int(self
.getStat('incr_hits'))
405 incr_misses
= int(self
.getStat('incr_misses'))
406 decr_hits
= int(self
.getStat('decr_hits'))
407 decr_misses
= int(self
.getStat('decr_misses'))
417 call('set key1 0 0 1\r\na\r\n')
425 call('add key1 0 0 1\r\na\r\n')
428 call('add key2 0 0 1\r\na\r\n')
432 call('replace key1 0 0 1\r\na\r\n')
436 call('replace key3 0 0 1\r\na\r\n')
439 call('cas key4 0 0 1 1\r\na\r\n')
443 call('cas key1 0 0 1 %d\r\na\r\n' % self
.getItemVersion('key1'))
450 call('cas key1 0 0 1 %d\r\na\r\n' % (self
.getItemVersion('key1') + 1))
456 call('delete key1\r\n')
459 call('delete key1\r\n')
462 call('incr num 1\r\n')
464 call('decr num 1\r\n')
467 call('set num 0 0 1\r\n0\r\n')
471 call('incr num 1\r\n')
473 call('decr num 1\r\n')
479 self
.assertEqual(get_count
, int(self
.getStat('cmd_get')))
480 self
.assertEqual(set_count
, int(self
.getStat('cmd_set')))
481 self
.assertEqual(flush_count
, int(self
.getStat('cmd_flush')))
482 self
.assertEqual(total_items
, int(self
.getStat('total_items')))
483 self
.assertEqual(get_hits
, int(self
.getStat('get_hits')))
484 self
.assertEqual(get_misses
, int(self
.getStat('get_misses')))
485 self
.assertEqual(cas_misses
, int(self
.getStat('cas_misses')))
486 self
.assertEqual(cas_hits
, int(self
.getStat('cas_hits')))
487 self
.assertEqual(cas_badval
, int(self
.getStat('cas_badval')))
488 self
.assertEqual(delete_misses
, int(self
.getStat('delete_misses')))
489 self
.assertEqual(delete_hits
, int(self
.getStat('delete_hits')))
490 self
.assertEqual(0, int(self
.getStat('curr_items')))
491 self
.assertEqual(curr_connections
, int(self
.getStat('curr_connections')))
492 self
.assertEqual(incr_misses
, int(self
.getStat('incr_misses')))
493 self
.assertEqual(incr_hits
, int(self
.getStat('incr_hits')))
494 self
.assertEqual(decr_misses
, int(self
.getStat('decr_misses')))
495 self
.assertEqual(decr_hits
, int(self
.getStat('decr_hits')))
498 self
.assertEqual(call('incr key 0\r\n'), b
'NOT_FOUND\r\n')
500 self
.assertEqual(call('set key 0 0 1\r\n0\r\n'), b
'STORED\r\n')
501 self
.assertEqual(call('incr key 0\r\n'), b
'0\r\n')
502 self
.assertEqual(call('get key\r\n'), b
'VALUE key 0 1\r\n0\r\nEND\r\n')
504 self
.assertEqual(call('incr key 1\r\n'), b
'1\r\n')
505 self
.assertEqual(call('incr key 2\r\n'), b
'3\r\n')
506 self
.assertEqual(call('incr key %d\r\n' % (pow(2, 64) - 1)), b
'2\r\n')
507 self
.assertEqual(call('incr key %d\r\n' % (pow(2, 64) - 3)), b
'18446744073709551615\r\n')
508 self
.assertRegex(call('incr key 1\r\n').decode(), r
'0(\w+)?\r\n')
510 self
.assertEqual(call('set key 0 0 2\r\n1 \r\n'), b
'STORED\r\n')
511 self
.assertEqual(call('incr key 1\r\n'), b
'2\r\n')
513 self
.assertEqual(call('set key 0 0 2\r\n09\r\n'), b
'STORED\r\n')
514 self
.assertEqual(call('incr key 1\r\n'), b
'10\r\n')
517 self
.assertEqual(call('decr key 0\r\n'), b
'NOT_FOUND\r\n')
519 self
.assertEqual(call('set key 0 0 1\r\n7\r\n'), b
'STORED\r\n')
520 self
.assertEqual(call('decr key 1\r\n'), b
'6\r\n')
521 self
.assertEqual(call('get key\r\n'), b
'VALUE key 0 1\r\n6\r\nEND\r\n')
523 self
.assertEqual(call('decr key 6\r\n'), b
'0\r\n')
524 self
.assertEqual(call('decr key 2\r\n'), b
'0\r\n')
526 self
.assertEqual(call('set key 0 0 2\r\n20\r\n'), b
'STORED\r\n')
527 self
.assertRegex(call('decr key 11\r\n').decode(), r
'^9( )?\r\n$')
529 self
.assertEqual(call('set key 0 0 3\r\n100\r\n'), b
'STORED\r\n')
530 self
.assertRegex(call('decr key 91\r\n').decode(), r
'^9( )?\r\n$')
532 self
.assertEqual(call('set key 0 0 2\r\n1 \r\n'), b
'STORED\r\n')
533 self
.assertEqual(call('decr key 1\r\n'), b
'0\r\n')
535 self
.assertEqual(call('set key 0 0 2\r\n09\r\n'), b
'STORED\r\n')
536 self
.assertEqual(call('decr key 1\r\n'), b
'8\r\n')
538 def test_incr_and_decr_on_invalid_input(self
):
539 error_msg
= b
'CLIENT_ERROR cannot increment or decrement non-numeric value\r\n'
540 for cmd
in ['incr', 'decr']:
541 for value
in ['', '-1', 'a', '0x1', '18446744073709551616']:
542 self
.assertEqual(call('set key 0 0 %d\r\n%s\r\n' % (len(value
), value
)), b
'STORED\r\n')
543 prev
= call('get key\r\n')
544 self
.assertEqual(call(cmd
+ ' key 1\r\n'), error_msg
, "cmd=%s, value=%s" % (cmd
, value
))
545 self
.assertEqual(call('get key\r\n'), prev
)
548 def wait_for_memcache_tcp(timeout
=4):
549 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
550 timeout_at
= time
.time() + timeout
552 if time
.time() >= timeout_at
:
555 s
.connect(server_addr
)
558 except ConnectionRefusedError
:
562 def wait_for_memcache_udp(timeout
=4):
563 timeout_at
= time
.time() + timeout
565 if time
.time() >= timeout_at
:
568 udp_call('version\r\n', timeout
=0.2)
570 except socket
.timeout
:
573 if __name__
== '__main__':
574 parser
= argparse
.ArgumentParser(description
="memcache protocol tests")
575 parser
.add_argument('--server', '-s', action
="store", help="server adddress in <host>:<port> format", default
="localhost:11211")
576 parser
.add_argument('--udp', '-U', action
="store_true", help="Use UDP protocol")
577 parser
.add_argument('--fast', action
="store_true", help="Run only fast tests")
578 args
= parser
.parse_args()
580 host
, port
= args
.server
.split(':')
581 server_addr
= (host
, int(port
))
585 wait_for_memcache_udp()
588 wait_for_memcache_tcp()
590 runner
= unittest
.TextTestRunner()
591 loader
= unittest
.TestLoader()
592 suite
= unittest
.TestSuite()
593 suite
.addTest(loader
.loadTestsFromTestCase(TestCommands
))
595 suite
.addTest(loader
.loadTestsFromTestCase(UdpSpecificTests
))
597 suite
.addTest(loader
.loadTestsFromTestCase(TcpSpecificTests
))
598 result
= runner
.run(suite
)
599 if not result
.wasSuccessful():