Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coroutinize http client code #2429

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
797dc34
http/client: Coroutinize connection::write_body
xemul Sep 12, 2024
32365d4
http/client: Coroutinize connection::maybe_wait_for_continue
xemul Sep 12, 2024
e8df44b
http/client: Coroutinize connection::send_request_head
xemul Sep 12, 2024
dd51c2c
http/client: Coroutinize connection::recv_reply
xemul Sep 12, 2024
ad9833d
http/client: Restore indentation of connection::recv_reply
xemul Sep 12, 2024
9b46ea5
http/client: Coroutinize connection::do_make_request
xemul Sep 12, 2024
86d6a8b
http/client: Restore indentation of connection::do_make_request
xemul Sep 12, 2024
14e3efc
http/client: Squash maybe_wait_for_continue into do_make_request
xemul Sep 12, 2024
c5a63be
http/client Squash send_request_head into do_make_request
xemul Sep 12, 2024
9b473bf
http/client: Coroutinize connection::make_request
xemul Sep 12, 2024
ffacf24
http/client: Coroutinize connection::close
xemul Sep 12, 2024
cb1b4b6
http/client: Restore indentation of connection::close
xemul Sep 12, 2024
bd36cf3
http/client: Coroutinize client::make_connection
xemul Sep 12, 2024
bdfdf16
http/client: Restore indentation of client::make_connection
xemul Sep 12, 2024
2d1550e
http/client: Coroutinize client::put_connection
xemul Sep 12, 2024
215e024
http/client: Coroutinize client::shrink_connections
xemul Sep 12, 2024
88de135
http/client: Restore indentation of client::shrink_connections
xemul Sep 12, 2024
b10c8b6
http/client: Coroutinize client::set_maximum_connections
xemul Sep 12, 2024
685f6f2
http/client: Coroutinize client::with_connection
xemul Sep 12, 2024
200ea10
http/client: Coroutinize client::with_new_connection
xemul Sep 12, 2024
4250bca
http/client: Coroutinize client::close
xemul Sep 12, 2024
688ac04
http/client: Restore indentation of client::close
xemul Sep 12, 2024
3e22a25
http/client: Coroutinize client::do_make_request
xemul Sep 12, 2024
02df7b8
http/client: Sanitize client::do_make_request unexpected body logging
xemul Sep 12, 2024
4dfb090
http/client: Coroutinize client::get_connection
xemul Sep 12, 2024
539fa2a
http/client: Restore indentation of client::get_connection
xemul Sep 12, 2024
3231966
http/client: Coroutinize client::do_make_request (con-less overload)
xemul Sep 12, 2024
67134fd
http/client: Restore indentation of client::do_make_request overload
xemul Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions include/seastar/http/client.hh
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ public:
private:
future<reply_ptr> do_make_request(request& rq);
void setup_request(request& rq);
future<> send_request_head(const request& rq);
future<reply_ptr> maybe_wait_for_continue(const request& req);
future<> write_body(const request& rq);
future<reply_ptr> recv_reply();

Expand Down Expand Up @@ -193,7 +191,7 @@ private:
future<> shrink_connections();

template <std::invocable<connection&> Fn>
auto with_connection(Fn&& fn, abort_source*);
futurize_t<std::invoke_result_t<Fn, connection&>> with_connection(Fn fn, abort_source*);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heh, i believe Avi would like this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has less than 4 closing corner braces in a row, so not that bad

Meanwhile in scylla (scroll right):

test/boost/user_function_test.cc:769:        e.execute_cql("CREATE FUNCTION my_func(val int) CALLED ON NULL INPUT RETURNS map<int, frozen<set<frozen<list<frozen<tuple<text, bigint>>>>>>> LANGUAGE Lua AS  \

😱

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, i meant, he always prefers an explicit return type. this one make it more explicit. so..


template <typename Fn>
requires std::invocable<Fn, connection&>
Expand Down
177 changes: 69 additions & 108 deletions src/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module;
#endif

#include <concepts>
#include <coroutine>
#include <memory>
#include <optional>
#include <stdexcept>
Expand All @@ -35,6 +36,7 @@ module seastar;
#include <seastar/core/loop.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/reactor.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/net/tls.hh>
#include <seastar/http/client.hh>
#include <seastar/http/request.hh>
Expand Down Expand Up @@ -77,40 +79,19 @@ connection::connection(connected_socket&& fd, internal::client_ref cr)
future<> connection::write_body(const request& req) {
if (req.body_writer) {
if (req.content_length != 0) {
return req.body_writer(internal::make_http_content_length_output_stream(_write_buf, req.content_length, req._bytes_written)).then([&req] {
if (req.content_length == req._bytes_written) {
return make_ready_future<>();
} else {
return make_exception_future<>(std::runtime_error(format("partial request body write, need {} sent {}", req.content_length, req._bytes_written)));
}
});
co_await req.body_writer(internal::make_http_content_length_output_stream(_write_buf, req.content_length, req._bytes_written));
if (req.content_length != req._bytes_written) {
throw std::runtime_error(format("partial request body write, need {} sent {}", req.content_length, req._bytes_written));
}
} else {
co_await req.body_writer(internal::make_http_chunked_output_stream(_write_buf));
co_await _write_buf.write("0\r\n\r\n");
}
return req.body_writer(internal::make_http_chunked_output_stream(_write_buf)).then([this] {
return _write_buf.write("0\r\n\r\n");
});
} else if (!req.content.empty()) {
return _write_buf.write(req.content);
} else {
return make_ready_future<>();
co_await _write_buf.write(req.content);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this add an allocation in the common case that the buffer isn't full?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but all users we have use lambda body writer and don't step on this branch. But lambda body writer also adds allocation here

}
}

future<connection::reply_ptr> connection::maybe_wait_for_continue(const request& req) {
if (req.get_header("Expect") == "") {
return make_ready_future<reply_ptr>(nullptr);
}

return _write_buf.flush().then([this] {
return recv_reply().then([] (reply_ptr rep) {
if (rep->_status == reply::status_type::continue_) {
return make_ready_future<reply_ptr>(nullptr);
} else {
return make_ready_future<reply_ptr>(std::move(rep));
}
});
});
}

void connection::setup_request(request& req) {
if (req._version.empty()) {
req._version = "1.1";
Expand All @@ -123,62 +104,50 @@ void connection::setup_request(request& req) {
}
}

future<> connection::send_request_head(const request& req) {
return _write_buf.write(req.request_line()).then([this, &req] {
return req.write_request_headers(_write_buf).then([this] {
return _write_buf.write("\r\n", 2);
});
});
}

future<connection::reply_ptr> connection::recv_reply() {
http_response_parser parser;
return do_with(std::move(parser), [this] (auto& parser) {
parser.init();
return _read_buf.consume(parser).then([this, &parser] {
if (parser.eof()) {
http_log.trace("Parsing response EOFed");
throw std::system_error(ECONNABORTED, std::system_category());
}
if (parser.failed()) {
http_log.trace("Parsing response failed");
throw std::runtime_error("Invalid http server response");
}
parser.init();
co_await _read_buf.consume(parser);
if (parser.eof()) {
http_log.trace("Parsing response EOFed");
throw std::system_error(ECONNABORTED, std::system_category());
}
if (parser.failed()) {
http_log.trace("Parsing response failed");
throw std::runtime_error("Invalid http server response");
}

auto resp = parser.get_parsed_response();
sstring length_header = resp->get_header("Content-Length");
resp->content_length = strtol(length_header.c_str(), nullptr, 10);
if ((resp->_version != "1.1") || seastar::internal::case_insensitive_cmp()(resp->get_header("Connection"), "close")) {
_persistent = false;
}
return make_ready_future<reply_ptr>(std::move(resp));
});
});
auto resp = parser.get_parsed_response();
sstring length_header = resp->get_header("Content-Length");
resp->content_length = strtol(length_header.c_str(), nullptr, 10);
if ((resp->_version != "1.1") || seastar::internal::case_insensitive_cmp()(resp->get_header("Connection"), "close")) {
_persistent = false;
}
co_return resp;
}

future<connection::reply_ptr> connection::do_make_request(request& req) {
setup_request(req);
return send_request_head(req).then([this, &req] {
return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) {
if (cont) {
return make_ready_future<reply_ptr>(std::move(cont));
}
co_await _write_buf.write(req.request_line());
co_await req.write_request_headers(_write_buf);
co_await _write_buf.write("\r\n", 2);

if (req.get_header("Expect") != "") {
co_await _write_buf.flush();
reply_ptr rep = co_await recv_reply();
if (rep->_status != reply::status_type::continue_) {
co_return rep;
}
}

return write_body(req).then([this] {
return _write_buf.flush().then([this] {
return recv_reply();
});
});
});
});
co_await write_body(req);
co_await _write_buf.flush();
co_return co_await recv_reply();
}

future<reply> connection::make_request(request req) {
return do_with(std::move(req), [this] (auto& req) {
return do_make_request(req).then([] (reply_ptr rep) {
return make_ready_future<reply>(std::move(*rep));
});
});
reply_ptr rep = co_await do_make_request(req);
co_return std::move(*rep);
}

input_stream<char> connection::in(reply& rep) {
Expand All @@ -195,12 +164,10 @@ void connection::shutdown() noexcept {
}

future<> connection::close() {
return when_all(_read_buf.close(), _write_buf.close()).discard_result().then([this] {
auto la = _fd.local_address();
return std::move(_closed).then([la = std::move(la)] {
http_log.trace("destroyed connection {}", la);
});
});
co_await when_all(_read_buf.close(), _write_buf.close());
auto la = _fd.local_address();
co_await std::move(_closed);
http_log.trace("destroyed connection {}", la);
}

class basic_connection_factory : public connection_factory {
Expand Down Expand Up @@ -271,61 +238,55 @@ future<client::connection_ptr> client::get_connection(abort_source* as) {

future<client::connection_ptr> client::make_connection(abort_source* as) {
_total_new_connections++;
return _new_connections->make(as).then([cr = internal::client_ref(this)] (connected_socket cs) mutable {
http_log.trace("created new http connection {}", cs.local_address());
auto con = seastar::make_shared<connection>(std::move(cs), std::move(cr));
return make_ready_future<connection_ptr>(std::move(con));
});
auto cr = internal::client_ref(this);
connected_socket cs = co_await _new_connections->make(as);
http_log.trace("created new http connection {}", cs.local_address());
auto con = seastar::make_shared<connection>(std::move(cs), std::move(cr));
co_return con;
}

future<> client::put_connection(connection_ptr con) {
if (con->_persistent && (_nr_connections <= _max_connections)) {
http_log.trace("push http connection {} to pool", con->_fd.local_address());
_pool.push_back(*con);
_wait_con.signal();
return make_ready_future<>();
co_return;
}

http_log.trace("dropping connection {}", con->_fd.local_address());
return con->close().finally([con] {});
co_await con->close();
}

future<> client::shrink_connections() {
if (_nr_connections <= _max_connections) {
return make_ready_future<>();
}
while (_nr_connections > _max_connections) {
if (!_pool.empty()) {
connection_ptr con = _pool.front().shared_from_this();
_pool.pop_front();
co_await con->close();
continue;
}

if (!_pool.empty()) {
connection_ptr con = _pool.front().shared_from_this();
_pool.pop_front();
return con->close().finally([this, con] {
return shrink_connections();
});
co_await _wait_con.wait();
}

return _wait_con.wait().then([this] {
return shrink_connections();
});
}

future<> client::set_maximum_connections(unsigned nr) {
if (nr > _max_connections) {
_max_connections = nr;
_wait_con.broadcast();
return make_ready_future<>();
co_return;
}

_max_connections = nr;
return shrink_connections();
co_await shrink_connections();
}

template <std::invocable<connection&> Fn>
auto client::with_connection(Fn&& fn, abort_source* as) {
return get_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable {
return fn(*con).finally([this, con = std::move(con)] () mutable {
return put_connection(std::move(con));
});
});
futurize_t<std::invoke_result_t<Fn, connection&>> client::with_connection(Fn fn, abort_source* as) {
connection_ptr con = co_await get_connection(as);
auto f = co_await coroutine::as_future(futurize_invoke(std::move(fn), *con));
co_await put_connection(std::move(con));
co_return co_await std::move(f);
}

template <typename Fn>
Expand Down