/*	$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $	*/

/*-
 * Copyright (c)2010,2011 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * backend db operations
 */

#include <sys/cdefs.h>
#ifndef lint
__RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
#endif /* not lint */

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <inttypes.h>
#include <puffs.h>
#include <stdbool.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <util.h>

#include <libpq-fe.h>

#include "pgfs_db.h"
#include "pgfs_waitq.h"
#include "pgfs_debug.h"

bool pgfs_dosync = false;

struct Xconn {
	TAILQ_ENTRY(Xconn) list;
	PGconn *conn;
	struct puffs_cc *blocker;
	struct puffs_cc *owner;
	bool in_trans;
	int id;
	const char *label;
};

static void
dumperror(struct Xconn *xc, const PGresult *res)
{
	static const struct {
		const char *name;
		int code;
	} fields[] = {
#define F(x)	{ .name = #x, .code = x, }
		F(PG_DIAG_SEVERITY),
		F(PG_DIAG_SQLSTATE),
		F(PG_DIAG_MESSAGE_PRIMARY),
		F(PG_DIAG_MESSAGE_DETAIL),
		F(PG_DIAG_MESSAGE_HINT),
		F(PG_DIAG_STATEMENT_POSITION),
		F(PG_DIAG_INTERNAL_POSITION),
		F(PG_DIAG_INTERNAL_QUERY),
		F(PG_DIAG_CONTEXT),
		F(PG_DIAG_SOURCE_FILE),
		F(PG_DIAG_SOURCE_LINE),
		F(PG_DIAG_SOURCE_FUNCTION),
#undef F
	};
	unsigned int i;

	if (!pgfs_dodprintf) {
		return;
	}
	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
	    PQresultStatus(res) == PGRES_FATAL_ERROR);
	for (i = 0; i < __arraycount(fields); i++) {
		const char *val = PQresultErrorField(res, fields[i].code);

		if (val == NULL) {
			continue;
		}
		fprintf(stderr, "%s: %s\n", fields[i].name, val);
	}
}

TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);

static struct Xconn *
getxc(struct puffs_cc *cc)
{
	struct Xconn *xc;

	assert(cc != NULL);
retry:
	TAILQ_FOREACH(xc, &xclist, list) {
		if (xc->blocker == NULL) {
			assert(xc->owner == NULL);
			xc->owner = cc;
			DPRINTF("xc %p acquire %p\n", xc, cc);
			return xc;
		} else {
			assert(xc->owner == xc->blocker);
		}
	}
	DPRINTF("no free conn %p\n", cc);
	waiton(&xcwaitq, cc);
	goto retry;
}

static void
relxc(struct Xconn *xc)
{

	assert(xc->in_trans);
	assert(xc->owner != NULL);
	xc->in_trans = false;
	xc->owner = NULL;
	wakeup_one(&xcwaitq);
}

static void
pqwait(struct Xconn *xc)
{
	PGconn *conn = xc->conn;
	struct puffs_cc *cc = xc->owner;

	if (PQflush(conn)) {
		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
	}
	if (!PQisBusy(conn)) {
		return;
	}
	assert(xc->blocker == NULL);
	xc->blocker = cc;
	DPRINTF("yielding %p\n", cc);
	/* XXX is it safe to yield before entering mainloop? */
	puffs_cc_yield(cc);
	DPRINTF("yield returned %p\n", cc);
	assert(xc->owner == cc);
	assert(xc->blocker == cc);
	xc->blocker = NULL;
}

static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}

struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;
	Oid *paramtypes;
	uint32_t prepared_mask;	/* for which connections this is prepared? */
	unsigned int flags;	/* CMD_ flags */
};

#define	CMD_NOPREPARE	1	/* don't prepare this command */

struct cmd *
createcmd(const char *cmd, unsigned int flags, ...)
{
	struct cmd *c;
	va_list ap;
	const char *cp;
	unsigned int i;
	static unsigned int cmdid;

	c = emalloc(sizeof(*c));
	c->cmd = estrdup(cmd);
	c->nparams = 0;
	va_start(ap, flags);
	for (cp = cmd; *cp != 0; cp++) {
		if (*cp == '$') { /* XXX */
			c->nparams++;
		}
	}
	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
	for (i = 0; i < c->nparams; i++) {
		Oid type = va_arg(ap, Oid);
		assert(type == BYTEA ||
		    type == INT4OID || type == INT8OID || type == OIDOID ||
		    type == TEXTOID || type == TIMESTAMPTZOID);
		c->paramtypes[i] = type;
	}
	va_end(ap);
	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
	if ((flags & CMD_NOPREPARE) != 0) {
		c->prepared_mask = ~0;
	} else {
		c->prepared_mask = 0;
	}
	c->flags = flags;
	return c;
}

static void
freecmd(struct cmd *c)
{

	free(c->paramtypes);
	free(c->cmd);
	free(c);
}

static int
fetch_noresult(struct Xconn *xc)
{
	PGresult *res;
	ExecStatusType status;
	PGconn *conn = xc->conn;
	int error;

	pqwait(xc);
	res = PQgetResult(conn);
	if (res == NULL) {
		return ENOENT;
	}
	status = PQresultStatus(res);
	if (status == PGRES_COMMAND_OK) {
		assert(PQnfields(res) == 0);
		assert(PQntuples(res) == 0);
		if (!strcmp(PQcmdTuples(res), "0")) {
			error = ENOENT;
		} else {
			error = 0;
		}
	} else if (status == PGRES_FATAL_ERROR) {
		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
		assert(error != 0);
		dumperror(xc, res);
	} else {
		errx(1, "%s not command_ok: %d: %s", __func__,
		    (int)status,
		    PQerrorMessage(conn));
	}
	PQclear(res);
	res = PQgetResult(conn);
	assert(res == NULL);
	if (error != 0) {
		DPRINTF("error %d\n", error);
	}
	return error;
}

static int
preparecmd(struct Xconn *xc, struct cmd *c)
{
	PGconn *conn = xc->conn;
	const uint32_t mask = 1 << xc->id;
	int error;
	int ret;

	if ((c->prepared_mask & mask) != 0) {
		return 0;
	}
	assert((c->flags & CMD_NOPREPARE) == 0);
	DPRINTF("PREPARE: '%s'\n", c->cmd);
	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
	if (!ret) {
		errx(EXIT_FAILURE, "PQsendPrepare: %s",
		    PQerrorMessage(conn));
	}
	error = fetch_noresult(xc);
	if (error != 0) {
		return error;
	}
	c->prepared_mask |= mask;
	return 0;
}

/*
 * vsendcmd:
 *
 * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
 * 0 for text and 1 for binary.
 */

static int
vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
{
	PGconn *conn = xc->conn;
	char **paramvalues;
	int *paramlengths;
	int *paramformats;
	unsigned int i;
	int error;
	int ret;

	assert(xc->owner != NULL);
	assert(xc->blocker == NULL);
	error = preparecmd(xc, c);
	if (error != 0) {
		return error;
	}
	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
	paramlengths = NULL;
	paramformats = NULL;
	DPRINTF("CMD: '%s'\n", c->cmd);
	for (i = 0; i < c->nparams; i++) {
		Oid type = c->paramtypes[i];
		char tmpstore[1024];
		const char *buf = NULL;
		intmax_t v = 0; /* XXXgcc */
		int sz;
		bool binary = false;

		switch (type) {
		case BYTEA:
			buf = va_arg(ap, const void *);
			sz = (int)va_arg(ap, size_t);
			binary = true;
			break;
		case INT8OID:
		case OIDOID:
		case INT4OID:
			switch (type) {
			case INT8OID:
				v = (intmax_t)va_arg(ap, int64_t);
				break;
			case OIDOID:
				v = (intmax_t)va_arg(ap, Oid);
				break;
			case INT4OID:
				v = (intmax_t)va_arg(ap, int32_t);
				break;
			default:
				errx(EXIT_FAILURE, "unknown integer oid %u",
				    type);
			}
			buf = tmpstore;
			sz = snprintf(tmpstore, sizeof(tmpstore),
			    "%jd", v);
			assert(sz != -1);
			assert((size_t)sz < sizeof(tmpstore));
			sz += 1;
			break;
		case TEXTOID:
		case TIMESTAMPTZOID:
			buf = va_arg(ap, char *);
			sz = strlen(buf) + 1;
			break;
		default:
			errx(EXIT_FAILURE, "%s: unknown param type %u",
			    __func__, type);
		}
		if (binary) {
			if (paramlengths == NULL) {
				paramlengths =
				    emalloc(c->nparams * sizeof(*paramformats));
			}
			if (paramformats == NULL) {
				paramformats = ecalloc(1,
				    c->nparams * sizeof(*paramformats));
			}
			paramformats[i] = 1;
			paramlengths[i] = sz;
		}
		paramvalues[i] = emalloc(sz);
		memcpy(paramvalues[i], buf, sz);
		if (binary) {
			DPRINTF("\t[%u]=<BINARY>\n", i);
		} else {
			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
		}
	}
	if ((c->flags & CMD_NOPREPARE) != 0) {
		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
		    (const char * const *)paramvalues, paramlengths,
		    paramformats, resultmode);
	} else {
		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
		    (const char * const *)paramvalues, paramlengths,
		    paramformats, resultmode);
	}
	for (i = 0; i < c->nparams; i++) {
		free(paramvalues[i]);
	}
	free(paramvalues);
	free(paramlengths);
	free(paramformats);
	if (!ret) {
		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
		    PQerrorMessage(conn));
	}
	return 0;
}

int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list ap;
	int error;

	va_start(ap, c);
	error = vsendcmd(xc, 0, c, ap);
	va_end(ap);
	return error;
}

int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list ap;
	int error;

	va_start(ap, c);
	error = vsendcmd(xc, resultmode, c, ap);
	va_end(ap);
	return error;
}

/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list ap;
	int error;

	va_start(ap, c);
	error = vsendcmd(xc, 0, c, ap);
	va_end(ap);
	if (error != 0) {
		return error;
	}
	return fetch_noresult(xc);
}

void
fetchinit(struct fetchstatus *s, struct Xconn *xc)
{
	s->xc = xc;
	s->res = NULL;
	s->cur = 0;
	s->nrows = 0;
	s->done = false;
}

static intmax_t
getint(const char *str)
{
	intmax_t i;
	char *ep;

	errno = 0;
	i = strtoimax(str, &ep, 10);
	assert(errno == 0);
	assert(str[0] != 0);
	assert(*ep == 0);
	return i;
}

static int
vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
{
	PGconn *conn = s->xc->conn;
	unsigned int i;

	assert(conn != NULL);
	if (s->res == NULL) {
		ExecStatusType status;
		int error;

		pqwait(s->xc);
		s->res = PQgetResult(conn);
		if (s->res == NULL) {
			s->done = true;
			return ENOENT;
		}
		status = PQresultStatus(s->res);
		if (status == PGRES_FATAL_ERROR) {
			error = sqltoerrno(
			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
			assert(error != 0);
			dumperror(s->xc, s->res);
			return error;
		}
		if (status != PGRES_TUPLES_OK) {
			errx(1, "not tuples_ok: %s",
			    PQerrorMessage(conn));
		}
		assert((unsigned int)PQnfields(s->res) == n);
		s->nrows = PQntuples(s->res);
		if (s->nrows == 0) {
			DPRINTF("no rows\n");
			return ENOENT;
		}
		assert(s->nrows >= 1);
		s->cur = 0;
	}
	for (i = 0; i < n; i++) {
		size_t size;

		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
		    i, PQftype(s->res, i), types[i],
		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
		    "<BINARY>");
		assert(PQftype(s->res, i) == types[i]);
		assert(!PQgetisnull(s->res, s->cur, i));
		switch(types[i]) {
		case INT8OID:
			*va_arg(ap, int64_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case OIDOID:
			*va_arg(ap, Oid *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case INT4OID:
			*va_arg(ap, int32_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case TEXTOID:
			*va_arg(ap, char **) =
			    estrdup(PQgetvalue(s->res, s->cur, i));
			break;
		case BYTEA:
			size = PQgetlength(s->res, s->cur, i);
			memcpy(va_arg(ap, void *),
			    PQgetvalue(s->res, s->cur, i), size);
			*va_arg(ap, size_t *) = size;
			break;
		default:
			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
			    types[i]);
		}
	}
	s->cur++;
	if (s->cur == s->nrows) {
		PQclear(s->res);
		s->res = NULL;
	}
	return 0;
}

int
fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
{
	va_list ap;
	int error;

	va_start(ap, types);
	error = vfetchnext(s, n, types, ap);
	va_end(ap);
	return error;
}

void
fetchdone(struct fetchstatus *s)
{

	if (s->res != NULL) {
		PQclear(s->res);
		s->res = NULL;
	}
	if (!s->done) {
		PGresult *res;
		unsigned int n;

		n = 0;
		while ((res = PQgetResult(s->xc->conn)) != NULL) {
			PQclear(res);
			n++;
		}
		if (n > 0) {
			DPRINTF("%u rows dropped\n", n);
		}
	}
}

int
simplefetch(struct Xconn *xc, Oid type, ...)
{
	struct fetchstatus s;
	va_list ap;
	int error;

	fetchinit(&s, xc);
	va_start(ap, type);
	error = vfetchnext(&s, 1, &type, ap);
	va_end(ap);
	assert(error != 0 || s.res == NULL);
	fetchdone(&s);
	return error;
}

/*
 * setlabel: set the descriptive label for the connection.
 *
 * we use simple pointer comparison for label equality check.
 */
static void
setlabel(struct Xconn *xc, const char *label)
{
	int error;

	/*
	 * put the label into application_name so that it's shown in
	 * pg_stat_activity.  we are sure that our labels don't need
	 * PQescapeStringConn.
	 *
	 * example:
	 *	SELECT pid,application_name,query FROM pg_stat_activity
	 *	WHERE state <> 'idle'
	 */

	if (label != NULL && label != xc->label) {
		struct cmd *c;
		char cmd_str[1024];

		snprintf(cmd_str, sizeof(cmd_str),
		    "SET application_name TO 'pgfs: %s'", label);
		c = createcmd(cmd_str, CMD_NOPREPARE);
		error = simplecmd(xc, c);
		freecmd(c);
		assert(error == 0);
		xc->label = label;
	} else {
#if 0 /* don't bother to clear label */
		static struct cmd *c;

		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
		error = simplecmd(xc, c);
		assert(error == 0);
#endif
	}
}

struct Xconn *
begin(struct puffs_usermount *pu, const char *label)
{
	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
	static struct cmd *c;
	int error;

	setlabel(xc, label);
	CREATECMD_NOPARAM(c, "BEGIN");
	assert(!xc->in_trans);
	error = simplecmd(xc, c);
	assert(error == 0);
	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
	xc->in_trans = true;
	return xc;
}

struct Xconn *
begin_readonly(struct puffs_usermount *pu, const char *label)
{
	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
	static struct cmd *c;
	int error;

	setlabel(xc, label);
	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
	assert(!xc->in_trans);
	error = simplecmd(xc, c);
	assert(error == 0);
	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
	xc->in_trans = true;
	return xc;
}

void
rollback(struct Xconn *xc)
{
	PGTransactionStatusType status;

	/*
	 * check the status as we are not sure the status of our transaction
	 * after a failed commit.
	 */
	status = PQtransactionStatus(xc->conn);
	assert(status != PQTRANS_ACTIVE);
	assert(status != PQTRANS_UNKNOWN);
	if (status != PQTRANS_IDLE) {
		static struct cmd *c;
		int error;

		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
		CREATECMD_NOPARAM(c, "ROLLBACK");
		error = simplecmd(xc, c);
		assert(error == 0);
	}
	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
	setlabel(xc, NULL);
	relxc(xc);
}

int
commit(struct Xconn *xc)
{
	static struct cmd *c;
	int error;

	CREATECMD_NOPARAM(c, "COMMIT");
	error = simplecmd(xc, c);
	setlabel(xc, NULL);
	if (error == 0) {
		DPRINTF("xc %p commit %p\n", xc, xc->owner);
		relxc(xc);
	}
	return error;
}

int
commit_sync(struct Xconn *xc)
{
	static struct cmd *c;
	int error;

	assert(!pgfs_dosync);
	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
	error = simplecmd(xc, c);
	assert(error == 0);
	return commit(xc);
}

static void
pgfs_notice_receiver(void *vp, const PGresult *res)
{
	struct Xconn *xc = vp;

	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
	fprintf(stderr, "got a notice on %p\n", xc);
	dumperror(xc, res);
}

static int
pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
    int fd, int *done)
{
	struct Xconn *xc;
	PGconn *conn;

	TAILQ_FOREACH(xc, &xclist, list) {
		if (PQsocket(xc->conn) == fd) {
			break;
		}
	}
	assert(xc != NULL);
	conn = xc->conn;
	PQconsumeInput(conn);
	if (!PQisBusy(conn)) {
		if (xc->blocker != NULL) {
			DPRINTF("schedule %p\n", xc->blocker);
			puffs_cc_schedule(xc->blocker);
		} else {
			DPRINTF("no blockers\n");
		}
	}
	*done = 0;
	return 0;
}

int
pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
    const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
{
	const char *keywords[3+1];
	const char *values[3];
	unsigned int i;

	if (nconn > 32) {
		/*
		 * limit from sizeof(cmd->prepared_mask)
		 */
		return EINVAL;
	}
	if (debug) {
		pgfs_dodprintf = true;
	}
	if (synchronous) {
		pgfs_dosync = true;
	}
	i = 0;
	if (dbname != NULL) {
		keywords[i] = "dbname";
		values[i] = dbname;
		i++;
	}
	if (dbuser != NULL) {
		keywords[i] = "user";
		values[i] = dbuser;
		i++;
	}
	keywords[i] = "application_name";
	values[i] = "pgfs";
	i++;
	keywords[i] = NULL;
	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
	for (i = 0; i < nconn; i++) {
		struct Xconn *xc;
		struct Xconn *xc2;
		static int xcid;
		PGconn *conn;
		struct cmd *c;
		int error;

		conn = PQconnectdbParams(keywords, values, 0);
		if (conn == NULL) {
			errx(EXIT_FAILURE,
			    "PQconnectdbParams: unknown failure");
		}
		if (PQstatus(conn) != CONNECTION_OK) {
			/*
			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
			 */
			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
			    PQerrorMessage(conn));
		}
		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
		xc = emalloc(sizeof(*xc));
		xc->conn = conn;
		xc->blocker = NULL;
		xc->owner = NULL;
		xc->in_trans = false;
		xc->id = xcid++;
		xc->label = NULL;
		assert(xc->id < 32);
		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
		TAILQ_INSERT_HEAD(&xclist, xc, list);
		xc2 = begin(pu, NULL);
		assert(xc2 == xc);
		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
		error = simplecmd(xc, c);
		assert(error == 0);
		freecmd(c);
		c = createcmd("SET SESSION CHARACTERISTICS AS "
		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
		    CMD_NOPREPARE);
		error = simplecmd(xc, c);
		assert(error == 0);
		freecmd(c);
		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
		error = simplecmd(xc, c);
		assert(error == 0);
		freecmd(c);
		if (!pgfs_dosync) {
			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
			    CMD_NOPREPARE);
			error = simplecmd(xc, c);
			assert(error == 0);
			freecmd(c);
		}
		if (debug) {
			struct fetchstatus s;
			static const Oid types[] = { INT8OID, };
			uint64_t pid;

			c = createcmd("SELECT pg_backend_pid()::int8;",
			    CMD_NOPREPARE);
			error = sendcmd(xc, c);
			assert(error == 0);
			fetchinit(&s, xc);
			error = FETCHNEXT(&s, types, &pid);
			fetchdone(&s);
			assert(error == 0);
			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
		}
		error = commit(xc);
		assert(error == 0);
		assert(xc->owner == NULL);
	}
	/*
	 * XXX cleanup unlinked files here?  what to do when the filesystem
	 * is shared?
	 */
	return 0;
}

struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
struct puffs_cc *flusher = NULL;

int
flush_xacts(struct puffs_usermount *pu)
{
	struct puffs_cc *cc = puffs_cc_getcc(pu);
	struct Xconn *xc;
	static struct cmd *c;
	uint64_t dummy;
	int error;

	/*
	 * flush all previously issued asynchronous transactions.
	 *
	 * XXX
	 * unfortunately it seems that there is no clean way to tell
	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
	 * too expensive and overkill for our purpose.
	 * besides, PostgreSQL has an optimization to skip XLOG flushing
	 * for transactions which didn't produce WAL records.
	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
	 * doesn't produce any WAL records, doesn't flush the XLOG even if
	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
	 * optimization.
	 * on the other hand, we try to avoid creating unnecessary WAL activity
	 * by serializing flushing and checking XLOG locations.
	 */

	assert(!pgfs_dosync);
	if (flusher != NULL) { /* serialize flushers */
		DPRINTF("%p flush in progress %p\n", cc, flusher);
		waiton(&flushwaitq, cc);
		assert(flusher == NULL);
	}
	DPRINTF("%p start flushing\n", cc);
	flusher = cc;
retry:
	xc = begin(pu, "flush");
	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
	error = sendcmd(xc, c);
	if (error != 0) {
		goto got_error;
	}
	error = simplefetch(xc, INT8OID, &dummy);
	assert(error != 0 || dummy == 1);
	if (error == ENOENT) {
		/*
		 * there seems to be nothing to flush.
		 */
		DPRINTF("%p no sync\n", cc);
		error = 0;
	}
	if (error != 0) {
		goto got_error;
	}
	error = commit_sync(xc);
	if (error != 0) {
		goto got_error;
	}
	goto done;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
done:
	assert(flusher == cc);
	flusher = NULL;
	wakeup_one(&flushwaitq);
	DPRINTF("%p end flushing error=%d\n", cc, error);
	return error;
}