DummyAuthClientServer dummy_auth;
public:
- static const unsigned max_in_flight = 64;
- static const unsigned max_connections = 128;
+ const unsigned max_in_flight = 0;
+ const unsigned max_connections = 0;
static const unsigned max_message_len = 1024 * 1024 * 4;
SyntheticWorkload(int servers, int clients, string type, int random_num,
- Messenger::Policy srv_policy, Messenger::Policy cli_policy)
+ Messenger::Policy srv_policy, Messenger::Policy cli_policy,
+ int _max_in_flight = 64, int _max_connections = 128)
: client_policy(cli_policy),
dispatcher(false, this),
rng(time(NULL)),
- dummy_auth(g_ceph_context) {
+ dummy_auth(g_ceph_context),
+ max_in_flight(_max_in_flight),
+ max_connections(_max_connections) {
+
dummy_auth.auth_registry.refresh_config();
Messenger *msgr;
int base_port = 16800;
}
}
+ void send_large_message(bool inject_network_congestion=false) {
+ std::lock_guard l{lock};
+ ConnectionRef conn = _get_random_connection();
+ uuid_d uuid;
+ uuid.generate_random();
+ MCommand *m = new MCommand(uuid);
+ vector<string> cmds;
+ cmds.push_back("command");
+ // set the random data to make the large message
+ bufferlist bl;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*256; i++)
+ bl.append(s);
+ // bl is around 6M
+ m->set_data(bl);
+ m->cmd = cmds;
+ m->set_priority(200);
+ // setup after connection is ready
+ if (inject_network_congestion && conn->is_connected()) {
+ g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100");
+ } else {
+ g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0");
+ }
+ conn->send_message(m);
+ }
+
void drop_connection() {
std::lock_guard l{lock};
if (available_connections.size() < 10)
g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
}
+// This is test for network block, means ::send return EAGAIN
+TEST_P(MessengerTest, SyntheticInjectTest5) {
+ SyntheticWorkload test_msg(1, 8, GetParam(), 100,
+ Messenger::Policy::stateful_server(0),
+ Messenger::Policy::lossless_client(0),
+ 64, 2);
+ bool simulate_network_congestion = true;
+ for (int i = 0; i < 2; ++i)
+ test_msg.generate_connection();
+ for (int i = 0; i < 5000; ++i) {
+ if (!(i % 10)) {
+ ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ if (i < 1600) {
+ // means that we would stuck 1600 * 6M (9.6G) around with 2 connections
+ test_msg.send_large_message(simulate_network_congestion);
+ } else {
+ simulate_network_congestion = false;
+ test_msg.send_large_message(simulate_network_congestion);
+ }
+ }
+ test_msg.wait_for_done();
+}
+
class MarkdownDispatcher : public Dispatcher {
ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");