fastcgi completely asynchronous

This changes the fastcgi implementation from a blocking I/O to an
async implementation on top of libevent' bufferevents.

Should improve the responsiveness of gmid especially when using remote
fastcgi applications.
This commit is contained in:
Omar Polo 2021-09-26 17:00:07 +00:00
parent 83fe545a2b
commit 741b69be96
3 changed files with 190 additions and 196 deletions

346
fcgi.c
View File

@ -149,7 +149,7 @@ prepare_header(struct fcgi_header *h, int type, int id, size_t size,
}
static int
fcgi_begin_request(int sock, int id)
fcgi_begin_request(struct bufferevent *bev, int id)
{
struct fcgi_begin_req_record r;
@ -165,13 +165,14 @@ fcgi_begin_request(int sock, int id)
r.body.role0 = FCGI_RESPONDER;
r.body.flags = FCGI_KEEP_CONN;
if (write(sock, &r, sizeof(r)) != sizeof(r))
if (bufferevent_write(bev, &r, sizeof(r)) == -1)
return -1;
return 0;
}
static int
fcgi_send_param(int sock, int id, const char *name, const char *value)
fcgi_send_param(struct bufferevent *bev, int id, const char *name,
const char *value)
{
struct fcgi_header h;
uint32_t namlen, vallen, padlen;
@ -196,95 +197,44 @@ fcgi_send_param(int sock, int id, const char *name, const char *value)
prepare_header(&h, FCGI_PARAMS, id, size, padlen);
if (write(sock, &h, sizeof(h)) != sizeof(h) ||
write(sock, s, sizeof(s)) != sizeof(s) ||
write(sock, name, namlen) != namlen ||
write(sock, value, vallen) != vallen ||
write(sock, padding, padlen) != padlen)
if (bufferevent_write(bev, &h, sizeof(h)) == -1 ||
bufferevent_write(bev, s, sizeof(s)) == -1 ||
bufferevent_write(bev, name, namlen) == -1 ||
bufferevent_write(bev, value, vallen) == -1 ||
bufferevent_write(bev, padding, padlen) == -1)
return -1;
return 0;
}
static int
fcgi_end_param(int sock, int id)
fcgi_end_param(struct bufferevent *bev, int id)
{
struct fcgi_header h;
prepare_header(&h, FCGI_PARAMS, id, 0, 0);
if (write(sock, &h, sizeof(h)) != sizeof(h))
if (bufferevent_write(bev, &h, sizeof(h)) == -1)
return -1;
prepare_header(&h, FCGI_STDIN, id, 0, 0);
if (write(sock, &h, sizeof(h)) != sizeof(h))
if (bufferevent_write(bev, &h, sizeof(h)) == -1)
return -1;
return 0;
}
static int
fcgi_abort_request(int sock, int id)
fcgi_abort_request(struct bufferevent *bev, int id)
{
struct fcgi_header h;
prepare_header(&h, FCGI_ABORT_REQUEST, id, 0, 0);
if (write(sock, &h, sizeof(h)) != sizeof(h))
if (bufferevent_write(bev, &h, sizeof(h)) == -1)
return -1;
return 0;
}
static int
must_read(int sock, char *d, size_t len)
{
ssize_t r;
#if DEBUG_FCGI
if (debug_socket == -1) {
struct sockaddr_un addr;
if ((debug_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
err(1, "socket");
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strlcpy(addr.sun_path, "./debug.sock", sizeof(addr.sun_path));
if (connect(debug_socket, (struct sockaddr*)&addr, sizeof(addr))
== -1)
err(1, "connect");
}
#endif
for (;;) {
switch (r = read(sock, d, len)) {
case -1:
case 0:
return -1;
default:
#if DEBUG_FCGI
write(debug_socket, d, r);
#endif
if (r == (ssize_t)len)
return 0;
len -= r;
d += r;
}
}
}
static int
fcgi_read_header(int sock, struct fcgi_header *h)
{
if (must_read(sock, (char*)h, sizeof(*h)) == -1)
return -1;
if (h->version != FCGI_VERSION_1) {
errno = EINVAL;
return -1;
}
return 0;
}
static inline int
recid(struct fcgi_header *h)
{
@ -346,128 +296,160 @@ end:
c->next(0, 0, c);
}
static int
consume(int fd, size_t len)
{
size_t l;
char buf[64];
while (len != 0) {
if ((l = len) > sizeof(buf))
l = sizeof(buf);
if (must_read(fd, buf, l) == -1)
return 0;
len -= l;
}
return 1;
}
static void
close_all(struct fcgi *f)
{
size_t i;
struct client *c;
for (i = 0; i < MAX_USERS; i++) {
c = &clients[i];
if (c->fcgi != f->id)
continue;
if (c->code != 0)
close_conn(0, 0, c);
else
start_reply(c, CGI_ERROR, "CGI error");
}
fcgi_close_backend(f);
}
void
fcgi_close_backend(struct fcgi *f)
{
event_del(&f->e);
close(f->fd);
f->fd = -1;
f->pending = 0;
f->s = FCGI_OFF;
bufferevent_free(f->bev);
f->bev = NULL;
close(fcgi->fd);
fcgi->fd = -1;
fcgi->pending = 0;
fcgi->s = FCGI_OFF;
}
void
handle_fcgi(int sock, short event, void *d)
fcgi_read(struct bufferevent *bev, void *d)
{
struct fcgi *f = d;
struct fcgi_header h;
struct fcgi *fcgi = d;
struct evbuffer *src = EVBUFFER_INPUT(bev);
struct fcgi_header hdr;
struct fcgi_end_req_body end;
struct client *c;
struct mbuf *mbuf;
size_t len;
if (fcgi_read_header(sock, &h) == -1)
goto err;
#if DEBUG_FCGI
if (debug_socket == -1) {
struct sockaddr_un addr;
c = try_client_by_id(recid(&h));
if (c == NULL || c->fcgi != f->id)
goto err;
if ((debug_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
err(1, "socket");
len = reclen(&h);
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strlcpy(addr.sun_path, "./debug.sock", sizeof(addr.sun_path));
if (connect(debug_socket, (struct sockaddr*)&addr, sizeof(addr))
== -1)
err(1, "connect");
}
#endif
switch (h.type) {
case FCGI_END_REQUEST:
if (len != sizeof(end))
goto err;
if (must_read(sock, (char*)&end, sizeof(end)) == -1)
goto err;
/* TODO: do something with the status? */
for (;;) {
if (EVBUFFER_LENGTH(src) < sizeof(hdr))
return;
f->pending--;
c->fcgi = -1;
c->next = close_conn;
event_once(c->fd, EV_WRITE, &copy_mbuf, c, NULL);
break;
memcpy(&hdr, EVBUFFER_DATA(src), sizeof(hdr));
case FCGI_STDERR:
/* discard stderr (for now) */
if (!consume(sock, len))
goto err;
break;
case FCGI_STDOUT:
if ((mbuf = calloc(1, sizeof(*mbuf) + len)) == NULL)
fatal("calloc");
mbuf->len = len;
if (must_read(sock, mbuf->data, len) == -1) {
free(mbuf);
c = try_client_by_id(recid(&hdr));
if (c == NULL) {
log_err(NULL,
"got invalid client id from fcgi backend %d",
recid(&hdr));
goto err;
}
if (TAILQ_EMPTY(&c->mbufhead)) {
TAILQ_INSERT_HEAD(&c->mbufhead, mbuf, mbufs);
event_once(c->fd, EV_WRITE, &copy_mbuf, c, NULL);
} else
TAILQ_INSERT_TAIL(&c->mbufhead, mbuf, mbufs);
break;
len = reclen(&hdr);
default:
log_err(NULL, "got invalid fcgi record (type=%d)", h.type);
goto err;
if (EVBUFFER_LENGTH(src) < sizeof(hdr) + len + hdr.padding)
return;
#if DEBUG_FCGI
write(debug_soocket, EVBUFFER_DATA(src),
sizeof(hdr) + len + hdr.padding);
#endif
evbuffer_drain(src, sizeof(hdr));
switch (hdr.type) {
case FCGI_END_REQUEST:
if (len != sizeof(end)) {
log_err(NULL,
"got invalid end request record size");
goto err;
}
bufferevent_read(bev, &end, sizeof(end));
/* TODO: do something with the status? */
fcgi->pending--;
c->fcgi = -1;
c->next = close_conn;
event_once(c->fd, EV_WRITE, &copy_mbuf, c, NULL);
break;
case FCGI_STDERR:
/* discard stderr (for now) */
evbuffer_drain(src, len);
break;
case FCGI_STDOUT:
if ((mbuf = calloc(1, sizeof(*mbuf) + len)) == NULL)
fatal("calloc");
mbuf->len = len;
bufferevent_read(bev, mbuf->data, len);
if (TAILQ_EMPTY(&c->mbufhead))
event_once(c->fd, EV_WRITE, &copy_mbuf,
c, NULL);
TAILQ_INSERT_TAIL(&c->mbufhead, mbuf, mbufs);
break;
default:
log_err(NULL, "got invalid fcgi record (type=%d)",
hdr.type);
goto err;
}
evbuffer_drain(src, hdr.padding);
if (fcgi->pending == 0 && shutting_down) {
fcgi_error(bev, EVBUFFER_EOF, fcgi);
return;
}
}
if (!consume(sock, h.padding))
goto err;
if (f->pending == 0 && shutting_down)
fcgi_close_backend(f);
return;
err:
close_all(f);
fcgi_error(bev, EVBUFFER_ERROR, fcgi);
}
void
send_fcgi_req(struct fcgi *f, struct client *c)
fcgi_write(struct bufferevent *bev, void *d)
{
/*
* There's no much use for the write callback.
*/
return;
}
void
fcgi_error(struct bufferevent *bev, short err, void *d)
{
struct fcgi *fcgi = d;
struct client *c;
size_t i;
if (!(err & EVBUFFER_ERROR) ||
!(err & EVBUFFER_EOF))
log_warn(NULL, "unknown event error (%x)",
err);
for (i = 0; i < MAX_USERS; ++i) {
c = &clients[i];
if (c->fcgi != fcgi->id)
continue;
if (c->code != 0)
close_conn(0, 0, 0);
else
start_reply(c, CGI_ERROR, "CGI error");
}
fcgi_close_backend(fcgi);
}
void
fcgi_req(struct fcgi *f, struct client *c)
{
char addr[NI_MAXHOST], buf[22];
int e;
@ -486,50 +468,50 @@ send_fcgi_req(struct fcgi *f, struct client *c)
c->next = NULL;
fcgi_begin_request(f->fd, c->id);
fcgi_send_param(f->fd, c->id, "GATEWAY_INTERFACE", "CGI/1.1");
fcgi_send_param(f->fd, c->id, "GEMINI_URL_PATH", c->iri.path);
fcgi_send_param(f->fd, c->id, "QUERY_STRING", c->iri.query);
fcgi_send_param(f->fd, c->id, "REMOTE_ADDR", addr);
fcgi_send_param(f->fd, c->id, "REMOTE_HOST", addr);
fcgi_send_param(f->fd, c->id, "REQUEST_METHOD", "");
fcgi_send_param(f->fd, c->id, "SERVER_NAME", c->iri.host);
fcgi_send_param(f->fd, c->id, "SERVER_PROTOCOL", "GEMINI");
fcgi_send_param(f->fd, c->id, "SERVER_SOFTWARE", GMID_VERSION);
fcgi_begin_request(f->bev, c->id);
fcgi_send_param(f->bev, c->id, "GATEWAY_INTERFACE", "CGI/1.1");
fcgi_send_param(f->bev, c->id, "GEMINI_URL_PATH", c->iri.path);
fcgi_send_param(f->bev, c->id, "QUERY_STRING", c->iri.query);
fcgi_send_param(f->bev, c->id, "REMOTE_ADDR", addr);
fcgi_send_param(f->bev, c->id, "REMOTE_HOST", addr);
fcgi_send_param(f->bev, c->id, "REQUEST_METHOD", "");
fcgi_send_param(f->bev, c->id, "SERVER_NAME", c->iri.host);
fcgi_send_param(f->bev, c->id, "SERVER_PROTOCOL", "GEMINI");
fcgi_send_param(f->bev, c->id, "SERVER_SOFTWARE", GMID_VERSION);
if (tls_peer_cert_provided(c->ctx)) {
fcgi_send_param(f->fd, c->id, "AUTH_TYPE", "CERTIFICATE");
fcgi_send_param(f->fd, c->id, "REMOTE_USER",
fcgi_send_param(f->bev, c->id, "AUTH_TYPE", "CERTIFICATE");
fcgi_send_param(f->bev, c->id, "REMOTE_USER",
tls_peer_cert_subject(c->ctx));
fcgi_send_param(f->fd, c->id, "TLS_CLIENT_ISSUER",
fcgi_send_param(f->bev, c->id, "TLS_CLIENT_ISSUER",
tls_peer_cert_issuer(c->ctx));
fcgi_send_param(f->fd, c->id, "TLS_CLIENT_HASH",
fcgi_send_param(f->bev, c->id, "TLS_CLIENT_HASH",
tls_peer_cert_hash(c->ctx));
fcgi_send_param(f->fd, c->id, "TLS_VERSION",
fcgi_send_param(f->bev, c->id, "TLS_VERSION",
tls_conn_version(c->ctx));
fcgi_send_param(f->fd, c->id, "TLS_CIPHER",
fcgi_send_param(f->bev, c->id, "TLS_CIPHER",
tls_conn_cipher(c->ctx));
snprintf(buf, sizeof(buf), "%d",
tls_conn_cipher_strength(c->ctx));
fcgi_send_param(f->fd, c->id, "TLS_CIPHER_STRENGTH", buf);
fcgi_send_param(f->bev, c->id, "TLS_CIPHER_STRENGTH", buf);
tim = tls_peer_cert_notbefore(c->ctx);
strftime(buf, sizeof(buf), "%FT%TZ",
gmtime_r(&tim, &tminfo));
fcgi_send_param(f->fd, c->id, "TLS_CLIENT_NOT_BEFORE", buf);
fcgi_send_param(f->bev, c->id, "TLS_CLIENT_NOT_BEFORE", buf);
tim = tls_peer_cert_notafter(c->ctx);
strftime(buf, sizeof(buf), "%FT%TZ",
gmtime_r(&tim, &tminfo));
fcgi_send_param(f->fd, c->id, "TLS_CLIENT_NOT_AFTER", buf);
fcgi_send_param(f->bev, c->id, "TLS_CLIENT_NOT_AFTER", buf);
TAILQ_FOREACH(p, &c->host->params, envs) {
fcgi_send_param(f->fd, c->id, p->name, p->value);
fcgi_send_param(f->bev, c->id, p->name, p->value);
}
} else
fcgi_send_param(f->fd, c->id, "AUTH_TYPE", "");
fcgi_send_param(f->bev, c->id, "AUTH_TYPE", "");
if (fcgi_end_param(f->fd, c->id) == -1)
close_all(f);
if (fcgi_end_param(f->bev, c->id) == -1)
fcgi_error(f->bev, EVBUFFER_ERROR, f);
}

9
gmid.h
View File

@ -68,7 +68,8 @@ struct fcgi {
char *port;
char *prog;
int fd;
struct event e;
struct bufferevent *bev;
/* number of pending clients */
int pending;
@ -382,8 +383,10 @@ int executor_main(struct imsgbuf*);
/* fcgi.c */
void fcgi_close_backend(struct fcgi *);
void handle_fcgi(int, short, void*);
void send_fcgi_req(struct fcgi*, struct client*);
void fcgi_read(struct bufferevent *, void *);
void fcgi_write(struct bufferevent *, void *);
void fcgi_error(struct bufferevent *, short, void *);
void fcgi_req(struct fcgi *, struct client *);
/* sandbox.c */
void sandbox_server_process(void);

View File

@ -615,7 +615,7 @@ apply_fastcgi(struct client *c)
break;
case FCGI_READY:
c->fcgi = id;
send_fcgi_req(f, c);
fcgi_req(f, c);
break;
}
@ -1229,13 +1229,23 @@ handle_imsg_fcgi_fd(struct imsgbuf *ibuf, struct imsg *imsg, size_t len)
id = imsg->hdr.peerid;
f = &fcgi[id];
if ((f->fd = imsg->fd) != -1) {
f->s = FCGI_READY;
event_set(&f->e, imsg->fd, EV_READ | EV_PERSIST, &handle_fcgi,
&fcgi[id]);
event_add(&f->e, NULL);
} else {
if ((f->fd = imsg->fd) == -1)
f->s = FCGI_OFF;
else {
mark_nonblock(f->fd);
f->s = FCGI_READY;
f->bev = bufferevent_new(f->fd, fcgi_read, fcgi_write,
fcgi_error, f);
if (f->bev == NULL) {
close(f->fd);
log_err(NULL, "%s: failed to allocate client buffer",
__func__);
f->s = FCGI_OFF;
}
bufferevent_enable(f->bev, EV_READ|EV_WRITE);
}
for (i = 0; i < MAX_USERS; ++i) {
@ -1245,11 +1255,11 @@ handle_imsg_fcgi_fd(struct imsgbuf *ibuf, struct imsg *imsg, size_t len)
if (c->fcgi != id)
continue;
if (f->fd == -1) {
if (f->s == FCGI_OFF) {
c->fcgi = -1;
start_reply(c, TEMP_FAILURE, "internal server error");
} else
send_fcgi_req(f, c);
fcgi_req(f, c);
}
}
@ -1280,8 +1290,7 @@ handle_imsg_quit(struct imsgbuf *ibuf, struct imsg *imsg, size_t len)
if (fcgi[i].path == NULL && fcgi[i].prog == NULL)
break;
if (!event_pending(&fcgi[i].e, EV_READ, NULL) ||
fcgi[i].pending != 0)
if (fcgi[i].bev == NULL || fcgi[i].pending != 0)
continue;
fcgi_close_backend(&fcgi[i]);