#include <seastar/rpc/rpc.hh>
#include <seastar/core/sleep.hh>
#include <seastar/rpc/lz4_compressor.hh>
+#include <seastar/util/log.hh>
+#include <seastar/core/loop.hh>
using namespace seastar;
template <typename Input>
inline sstring read(serializer, Input& in, rpc::type<sstring>) {
auto size = read_arithmetic_type<uint32_t>(in);
- sstring ret(sstring::initialized_later(), size);
- in.read(ret.begin(), size);
+ sstring ret = uninitialized_string(size);
+ in.read(ret.data(), size);
return ret;
}
static std::unique_ptr<rpc::protocol<serializer>::client> client;
static double x = 30.0;
- myrpc.set_logger([] (const sstring& log) {
- fmt::print("{}", log);
- std::cout << std::endl;
- });
+ static logger log("rpc_demo");
+ myrpc.set_logger(&log);
return app.run_deprecated(ac, av, [&] {
auto&& config = app.configuration();
auto test9_1 = myrpc.make_client<long (long a, long b, int c)>(9); // send optional
auto test9_2 = myrpc.make_client<long (long a, long b, int c, long d)>(9); // send more data than handler expects
auto test10 = myrpc.make_client<long ()>(10); // receive less then replied
- auto test10_1 = myrpc.make_client<future<long, int> ()>(10); // receive all
- auto test11 = myrpc.make_client<future<long, rpc::optional<int>> ()>(11); // receive more then replied
+ auto test10_1 = myrpc.make_client<future<rpc::tuple<long, int>> ()>(10); // receive all
+ auto test11 = myrpc.make_client<future<rpc::tuple<long, rpc::optional<int>>> ()>(11); // receive more then replied
auto test12 = myrpc.make_client<void (int sleep_ms, sstring payload)>(12); // large payload vs. server limits
auto test_nohandler = myrpc.make_client<void ()>(100000000); // non existing verb
auto test_nohandler_nowait = myrpc.make_client<rpc::no_wait_type ()>(100000000); // non existing verb, no_wait call
(void)test9_1(*client, 1, 2, 3).then([] (long r) { fmt::print("test9.1 got {:d}\n", r); });
(void)test9_2(*client, 1, 2, 3, 4).then([] (long r) { fmt::print("test9.2 got {:d}\n", r); });
(void)test10(*client).then([] (long r) { fmt::print("test10 got {:d}\n", r); });
- (void)test10_1(*client).then([] (long r, int rr) { fmt::print("test10_1 got {:d} and {:d}\n", r, rr); });
- (void)test11(*client).then([] (long r, rpc::optional<int> rr) { fmt::print("test11 got {:d} and {:d}\n", r, bool(rr)); });
+ (void)test10_1(*client).then([] (rpc::tuple<long, int> r) { fmt::print("test10_1 got {:d} and {:d}\n", std::get<0>(r), std::get<1>(r)); });
+ (void)test11(*client).then([] (rpc::tuple<long, rpc::optional<int> > r) { fmt::print("test11 got {:d} and {:d}\n", std::get<0>(r), bool(std::get<1>(r))); });
(void)test_nohandler(*client).then_wrapped([](future<> f) {
try {
f.get();
}
});
(void)sleep(500us).then([c] { c->cancel(); });
- (void)test_message_to_big(*client, sstring(sstring::initialized_later(), 10'000'001)).then_wrapped([](future<> f) {
+ (void)test_message_to_big(*client, uninitialized_string(10'000'001)).then_wrapped([](future<> f) {
try {
f.get();
fmt::print("test message to big shold not get here\n");
// server is configured for 10MB max, throw 25MB worth of requests at it.
auto now = rpc::rpc_clock_type::now();
return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable {
- return test12(*client, 100, sstring(sstring::initialized_later(), 1'000'000)).then([idx, now] {
+ return test12(*client, 100, uninitialized_string(1'000'000)).then([idx, now] {
auto later = rpc::rpc_clock_type::now();
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
fmt::print("idx {:d} completed after {:d} ms\n", idx, delta.count());
});
myrpc.register_handler(10, [] {
fmt::print("test 10\n");
- return make_ready_future<long, int>(1, 2);
+ return make_ready_future<rpc::tuple<long, int>>(rpc::tuple<long, int>(1, 2));
});
myrpc.register_handler(11, [] {
fmt::print("test 11\n");