/*	$NetBSD: pgfs_puffs.c,v 1.5 2014/10/18 07:11:07 snj Exp $	*/

/*-
 * Copyright (c)2010,2011 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * puffs node ops and fs ops.
 */

#include <sys/cdefs.h>
#ifndef lint
__RCSID("$NetBSD: pgfs_puffs.c,v 1.5 2014/10/18 07:11:07 snj Exp $");
#endif /* not lint */

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <puffs.h>
#include <inttypes.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <util.h>

#include <libpq-fe.h>
#include <libpq/libpq-fs.h>	/* INV_* */

#include "pgfs.h"
#include "pgfs_db.h"
#include "pgfs_subs.h"
#include "pgfs_debug.h"

/*
 * Recover the pgfs file id stored in a puffs node cookie.
 * The cookie is converted through uintptr_t before being cast to fileid_t.
 */
static fileid_t
cookie_to_fileid(puffs_cookie_t cookie)
{
	const uintptr_t bits = (uintptr_t)cookie;

	return (fileid_t)bits;
}

/*
 * Encode a pgfs file id as a puffs node cookie.
 * Asserts that the conversion round-trips through cookie_to_fileid().
 */
static puffs_cookie_t
fileid_to_cookie(fileid_t id)
{
	const uintptr_t bits = (uintptr_t)id;
	puffs_cookie_t cookie = (puffs_cookie_t)bits;

	/* XXX the round trip does not hold on 32-bit ports */
	assert(id == cookie_to_fileid(cookie));
	return cookie;
}

/*
 * Return the puffs cookie that corresponds to PGFS_ROOT_FILEID.
 */
puffs_cookie_t
pgfs_root_cookie(void)
{
	puffs_cookie_t root;

	root = fileid_to_cookie(PGFS_ROOT_FILEID);
	return root;
}

/*
 * puffs getattr op: fill *va with all attributes (GETATTR_ALL) of the node.
 *
 * The per-file lock is held for the whole operation, including retries.
 * Each attempt runs in its own read-only transaction; an attempt that
 * fails is rolled back and repeated only when the error is EAGAIN.
 * Returns 0 on success or the error of the last attempt.
 */
int
pgfs_node_getattr(struct puffs_usermount *pu, puffs_cookie_t opc,
    struct vattr *va, const struct puffs_cred *pcr)
{
	struct Xconn *xc;
	struct fileid_lock_handle *lock;
	fileid_t fileid = cookie_to_fileid(opc);
	int error;

	DPRINTF("%llu\n", fileid);
	lock = fileid_lock(fileid, puffs_cc_getcc(pu));
	for (;;) {
		xc = begin_readonly(pu, "getattr");
		error = getattr(xc, fileid, va, GETATTR_ALL);
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			break;
		}
		rollback(xc);
		if (error != EAGAIN) {
			break;
		}
	}
	fileid_unlock(lock);
	return error;
}

#define	PGFS_DIRCOOKIE_DOT	0	/* . entry */
#define	PGFS_DIRCOOKIE_DOTDOT	1	/* .. entry */
#define	PGFS_DIRCOOKIE_EOD	2	/* end of directory */

/*
 * puffs readdir op: store directory entries into the result buffer,
 * starting at the directory cookie *readoff.
 *
 * Cookies PGFS_DIRCOOKIE_DOT and PGFS_DIRCOOKIE_DOTDOT are synthesized
 * here ("." and ".."); larger cookies are read from the dirent table in
 * cookie order.  Each fetched row also carries the cookie of the row that
 * follows it, or PGFS_DIRCOOKIE_EOD for the last row.
 *
 * *readoff is advanced after every entry that fits in the buffer.
 * *eofflag is set to 1 when the end of the directory is reached and to 0
 * when the buffer fills up first.  *ncookies is reset to 0 on entry and
 * bumped by PUFFS_STORE_DCOOKIE for every stored entry.
 *
 * Returns 0 on success, EINVAL when the requested offset does not match
 * an existing entry, or the error of a failed database operation.  Only
 * the final atime update/commit is retried on EAGAIN; errors raised while
 * collecting entries are returned after a rollback without a retry.
 */
int
pgfs_node_readdir(struct puffs_usermount *pu, puffs_cookie_t opc,
    struct dirent *dent, off_t *readoff, size_t *reslen,
    const struct puffs_cred *pcr, int *eofflag, off_t *cookies,
    size_t *ncookies)
{
	fileid_t parent_fileid;
	fileid_t child_fileid;
	uint64_t cookie;
	uint64_t nextcookie;
	uint64_t offset;
	struct Xconn *xc = NULL;
	static const Oid types[] = {
		TEXTOID,	/* name */
		INT8OID,	/* cookie */
		INT8OID,	/* nextcookie */
		INT8OID,	/* child_fileid */
	};
	const char *name;
	char *nametofree = NULL;
	struct fetchstatus s;
	int error;
	bool fetching;
	bool bufferfull;

	parent_fileid = cookie_to_fileid(opc);
	offset = *readoff;
	DPRINTF("%llu %" PRIu64 "\n", parent_fileid, offset);
	*ncookies = 0;
	fetching = false;
next:
	/* one pass through here per directory entry */
	if (offset == PGFS_DIRCOOKIE_DOT) {
		name = ".";
		child_fileid = parent_fileid;
		cookie = offset;
		nextcookie = PGFS_DIRCOOKIE_DOTDOT;
		goto store_and_next;
	}
	if (offset == PGFS_DIRCOOKIE_DOTDOT) {
		/* ".." of the root directory is the root itself */
		if (parent_fileid != PGFS_ROOT_FILEID) {
			if (xc == NULL) {
				xc = begin(pu, "readdir1");
			}
			error = lookupp(xc, parent_fileid, &child_fileid);
			if (error != 0) {
				rollback(xc);
				return error;
			}
		} else {
			child_fileid = parent_fileid;
		}
		name = "..";
		cookie = offset;
		/* the first normal entry is requested with cookie EOD + 1 */
		nextcookie = PGFS_DIRCOOKIE_EOD + 1;
		goto store_and_next;
	}
	if (offset == PGFS_DIRCOOKIE_EOD) {
		*eofflag = 1;
		goto done;
	}
	/* offset > PGFS_DIRCOOKIE_EOD; normal entries */
	if (xc == NULL) {
		xc = begin(pu, "readdir2");
	}
	if (!fetching) {
		static struct cmd *c;

		/*
		 * a simpler query like "ORDER BY name OFFSET :offset - 3"
		 * would work well for most of cases.  however, it doesn't for
		 * applications which expect readdir cookies are kept valid
		 * even after unlink of other entries in the directory.
		 * eg. cvs, bonnie++
		 *
		 * 2::int8 == PGFS_DIRCOOKIE_EOD
		 */
		CREATECMD(c,
			"SELECT name, cookie, "
			"lead(cookie, 1, 2::int8) OVER (ORDER BY cookie), "
			"child_fileid "
			"FROM dirent "
			"WHERE parent_fileid = $1 "
			"AND cookie >= $2 "
			"ORDER BY cookie", INT8OID, INT8OID);
		error = sendcmd(xc, c, parent_fileid, offset);
		if (error != 0) {
			rollback(xc);
			return error;
		}
		/* the query is issued once; later passes only fetch rows */
		fetching = true;
		fetchinit(&s, xc);
	}
	/*
	 * fetch and process an entry
	 */
	error = FETCHNEXT(&s, types, &nametofree, &cookie, &nextcookie,
	    &child_fileid);
	if (error == ENOENT) {
		DPRINTF("ENOENT\n");
		/*
		 * no row at all for the very first normal cookie means the
		 * directory has no entries besides "." and "..".
		 */
		if (offset == PGFS_DIRCOOKIE_EOD + 1) {
			DPRINTF("empty directory\n");
			*eofflag = 1;
			goto done;
		}
		fetchdone(&s);
		rollback(xc);
		return EINVAL;
	}
	if (error != 0) {
		DPRINTF("error %d\n", error);
		fetchdone(&s);
		rollback(xc);
		return error;
	}
	/*
	 * the requested offset has to name an existing entry, except for
	 * EOD + 1 which stands for "whatever the first entry is".
	 */
	if (offset != cookie && offset != PGFS_DIRCOOKIE_EOD + 1) {
		free(nametofree);
		fetchdone(&s);
		rollback(xc);
		return EINVAL;
	}
	name = nametofree;
store_and_next:
	/*
	 * store an entry and continue processing unless the result buffer
	 * is full.
	 */
	bufferfull = !puffs_nextdent(&dent, name, child_fileid, DT_UNKNOWN,
	    reslen);
	/* nametofree is NULL for "." and ".."; free(NULL) is harmless */
	free(nametofree);
	nametofree = NULL;
	if (bufferfull) {
		*eofflag = 0;
		goto done;
	}
	/*
	 * NOTE(review): this records the cookie of the entry just stored,
	 * not nextcookie -- confirm which value the consumer of the cookies
	 * array expects.
	 */
	PUFFS_STORE_DCOOKIE(cookies, ncookies, cookie);
	offset = nextcookie;
	*readoff = offset;
	goto next;
done:
	/*
	 * cleanup and update atime of the directory.
	 */
	assert(nametofree == NULL);
	if (fetching) {
		fetchdone(&s);
		fetching = false;
	}
	/*
	 * reuse the transaction opened above if there is one; the retry
	 * label always starts a fresh one after a rollback.
	 */
	if (xc == NULL) {
retry:
		xc = begin(pu, "readdir3");
	}
	error = update_atime(xc, parent_fileid);
	if (error != 0) {
		goto got_error;
	}
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	return 0;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
	return error;
}

/*
 * puffs lookup op: resolve pcn->pcn_name inside the directory opc.
 *
 * The caller's credentials must grant PUFFS_VEXEC on the directory, and
 * additionally PUFFS_VWRITE when this is the last path component of an
 * operation other than NAMEI_LOOKUP.  ".." is resolved with lookupp();
 * any other name is looked up in the dirent table.
 *
 * On success the child's cookie, vnode type and size are stored in pni
 * and 0 is returned.  An access failure or a missing entry (ENOENT) is
 * returned after committing the read-only transaction.  Failed attempts
 * are rolled back and repeated when the error is EAGAIN.
 */
int
pgfs_node_lookup(struct puffs_usermount *pu, puffs_cookie_t opc,
    struct puffs_newinfo *pni, const struct puffs_cn *pcn)
{
	struct vattr dva;
	struct vattr cva;
	struct puffs_cred * const pcr = pcn->pcn_cred;
	fileid_t parent_fileid;
	const char *name;
	fileid_t child_fileid;
	struct Xconn *xc;
	mode_t access_mode;
	int error;
	int saved_error;

	parent_fileid = cookie_to_fileid(opc);
	name = pcn->pcn_name;
	DPRINTF("%llu %s\n", parent_fileid, name);
	assert(strcmp(name, ".")); /* . is handled by framework */
retry:
	xc = begin_readonly(pu, "lookup");
	/* attributes of the directory, needed for the permission check */
	error = getattr(xc, parent_fileid, &dva,
	    GETATTR_TYPE|GETATTR_MODE|GETATTR_UID|GETATTR_GID);
	if (error != 0) {
		goto got_error;
	}
	access_mode = PUFFS_VEXEC;
	if ((pcn->pcn_flags & NAMEI_ISLASTCN) != 0 &&
	    pcn->pcn_nameiop != NAMEI_LOOKUP) {
		access_mode |= PUFFS_VWRITE;
	}
	error = puffs_access(dva.va_type, dva.va_mode, dva.va_uid, dva.va_gid,
	    access_mode, pcr);
	if (error != 0) {
		/* not a database failure: finish the transaction normally */
		goto commit_and_return;
	}
	if (!strcmp(name, "..")) {
		error = lookupp(xc, parent_fileid, &child_fileid);
		if (error != 0) {
			goto got_error;
		}
	} else {
		static struct cmd *c;
		static const Oid types[] = { INT8OID, };
		struct fetchstatus s;

		CREATECMD(c, "SELECT child_fileid "
			"FROM dirent "
			"WHERE parent_fileid = $1 AND name = $2",
			INT8OID, TEXTOID);
		error = sendcmd(xc, c, parent_fileid, name);
		if (error != 0) {
			DPRINTF("sendcmd %d\n", error);
			goto got_error;
		}
		fetchinit(&s, xc);
		error = FETCHNEXT(&s, types, &child_fileid);
		fetchdone(&s);
		if (error == ENOENT) {
			/* no such entry; report it after a clean commit */
			goto commit_and_return;
		}
		if (error != 0) {
			goto got_error;
		}
	}
	/* type and size of the child are reported back through pni */
	error = getattr(xc, child_fileid, &cva, GETATTR_TYPE|GETATTR_SIZE);
	if (error != 0) {
		goto got_error;
	}
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	puffs_newinfo_setcookie(pni, fileid_to_cookie(child_fileid));
	puffs_newinfo_setvtype(pni, cva.va_type);
	puffs_newinfo_setsize(pni, cva.va_size);
	return 0;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
	return error;
commit_and_return:
	/*
	 * keep the error to report while the commit result is checked; a
	 * failed commit takes precedence and goes through got_error.
	 */
	saved_error = error;
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	return saved_error;
}

/*
 * puffs mkdir op: create directory pcn->pcn_name under the directory opc.
 *
 * The new directory is owned by the uid/gid taken from the caller's
 * credentials and gets mode va->va_mode.  The link count of the parent is
 * incremented in the same transaction.  On success the cookie of the new
 * directory is stored in pni.  A failed attempt is rolled back and
 * repeated as long as the error is EAGAIN.
 */
int
pgfs_node_mkdir(struct puffs_usermount *pu, puffs_cookie_t opc,
    struct puffs_newinfo *pni, const struct puffs_cn *pcn,
    const struct vattr *va)
{
	struct Xconn *xc;
	fileid_t parent_fileid = cookie_to_fileid(opc);
	fileid_t new_fileid;
	struct puffs_cred * const pcr = pcn->pcn_cred;
	uid_t uid;
	gid_t gid;
	int error;

	DPRINTF("%llu %s\n", parent_fileid, pcn->pcn_name);
	if (puffs_cred_getuid(pcr, &uid) == -1) {
		return errno;
	}
	if (puffs_cred_getgid(pcr, &gid) == -1) {
		return errno;
	}
	do {
		xc = begin(pu, "mkdir");
		error = mklinkfile(xc, parent_fileid, pcn->pcn_name, VDIR,
		    va->va_mode, uid, gid, &new_fileid);
		if (error == 0) {
			error = update_nlink(xc, parent_fileid, 1);
		}
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			puffs_newinfo_setcookie(pni,
			    fileid_to_cookie(new_fileid));
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs create op: create regular file pcn->pcn_name under the directory
 * opc.
 *
 * The file is created with mklinkfile_lo() using mode va->va_mode and the
 * uid/gid from the caller's credentials.  On success the cookie of the
 * new file is stored in pni.  A failed attempt is rolled back and
 * repeated as long as the error is EAGAIN.
 */
int
pgfs_node_create(struct puffs_usermount *pu, puffs_cookie_t opc,
    struct puffs_newinfo *pni, const struct puffs_cn *pcn,
    const struct vattr *va)
{
	struct Xconn *xc;
	fileid_t parent_fileid = cookie_to_fileid(opc);
	fileid_t new_fileid;
	struct puffs_cred * const pcr = pcn->pcn_cred;
	uid_t uid;
	gid_t gid;
	int error;

	DPRINTF("%llu %s\n", parent_fileid, pcn->pcn_name);
	if (puffs_cred_getuid(pcr, &uid) == -1) {
		return errno;
	}
	if (puffs_cred_getgid(pcr, &gid) == -1) {
		return errno;
	}
	do {
		xc = begin(pu, "create");
		error = mklinkfile_lo(xc, parent_fileid, pcn->pcn_name, VREG,
		    va->va_mode, uid, gid, &new_fileid, NULL);
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			puffs_newinfo_setcookie(pni,
			    fileid_to_cookie(new_fileid));
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs write op: write *resid bytes from buf to the file's large object.
 *
 * With PUFFS_IO_APPEND the write position is taken from a seek to the end
 * of the large object; otherwise it is the given offset.  The per-file
 * lock is held for the whole operation, including retries.
 *
 * On success *resid is reduced by the number of bytes written and 0 is
 * returned.  EINVAL is returned for a negative offset and EFBIG when
 * offset + *resid would overflow or exceed INT_MAX.  A failed attempt is
 * rolled back and repeated when the error is EAGAIN.
 */
int
pgfs_node_write(struct puffs_usermount *pu, puffs_cookie_t opc,
    uint8_t *buf, off_t offset, size_t *resid,
    const struct puffs_cred *pcr, int ioflags)
{
	struct Xconn *xc;
	struct fileid_lock_handle *lock;
	fileid_t fileid = cookie_to_fileid(opc);
	size_t resultlen;
	int fd;
	int error;

	if ((ioflags & PUFFS_IO_APPEND) != 0) {
		DPRINTF("%llu append sz %zu\n", fileid, *resid);
	} else {
		DPRINTF("%llu off %" PRIu64 " sz %zu\n", fileid,
		    (uint64_t)offset, *resid);
	}
	lock = fileid_lock(fileid, puffs_cc_getcc(pu));
retry:
	xc = begin(pu, "write");
	error = update_mctime(xc, fileid);
	if (error != 0) {
		goto got_error;
	}
	error = lo_open_by_fileid(xc, fileid, INV_WRITE, &fd);
	if (error != 0) {
		goto got_error;
	}
	if ((ioflags & PUFFS_IO_APPEND) != 0) {
		int32_t off;

		/* find the current end; it becomes the offset to check */
		error = my_lo_lseek(xc, fd, 0, SEEK_END, &off);
		if (error != 0) {
			goto got_error;
		}
		offset = off;
	}
	if (offset < 0) {			/* negative offset */
		error = EINVAL;
		goto got_error;
	}
	if ((uint64_t)(INT64_MAX - offset) < *resid ||	/* int64 overflow */
	    INT_MAX < offset + *resid) {	/* our max filesize */
		error = EFBIG;
		goto got_error;
	}
	/* the append case has already seeked with SEEK_END above */
	if ((ioflags & PUFFS_IO_APPEND) == 0) {
		error = my_lo_lseek(xc, fd, offset, SEEK_SET, NULL);
		if (error != 0) {
			goto got_error;
		}
	}
	error = my_lo_write(xc, fd, (const char *)buf, *resid, &resultlen);
	if (error != 0) {
		goto got_error;
	}
	assert(*resid >= resultlen);
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	/* *resid is only touched once the transaction has committed */
	*resid -= resultlen;
	DPRINTF("resid %zu\n", *resid);
	goto done;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
done:
	fileid_unlock(lock);
	return error;
}

/*
 * puffs read op: read up to *resid bytes at offset from the file's large
 * object into buf.
 *
 * On success *resid is reduced by the number of bytes read and 0 is
 * returned.  A failed attempt is rolled back and repeated as long as the
 * error is EAGAIN.
 */
int
pgfs_node_read(struct puffs_usermount *pu, puffs_cookie_t opc,
    uint8_t *buf, off_t offset, size_t *resid,
    const struct puffs_cred *pcr, int ioflags)
{
	struct Xconn *xc;
	fileid_t fileid = cookie_to_fileid(opc);
	size_t resultlen;
	int fd;
	int error;

	DPRINTF("%llu off %" PRIu64 " sz %zu\n",
	    fileid, (uint64_t)offset, *resid);
	do {
		xc = begin(pu, "read");
		/*
		 * the atime update goes first because it is the step most
		 * likely to conflict with other transactions (e.g. parallel
		 * read-ahead requests); failing early avoids redoing the
		 * expensive my_lo_read.
		 *
		 * XXX a noatime mount option is probably worth implementing.
		 */
		error = update_atime(xc, fileid);
		if (error == 0) {
			error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
		}
		if (error == 0) {
			error = my_lo_lseek(xc, fd, offset, SEEK_SET, NULL);
		}
		if (error == 0) {
			error = my_lo_read(xc, fd, buf, *resid, &resultlen);
		}
		if (error == 0) {
			assert(*resid >= resultlen);
			error = commit(xc);
		}
		if (error == 0) {
			*resid -= resultlen;
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs link op: add the name pcn->pcn_name in directory dir_opc for the
 * existing node targ_opc and update the target's ctime.
 *
 * Directories cannot be linked; EPERM is returned for them.  A failed
 * attempt is rolled back and repeated as long as the error is EAGAIN.
 */
int
pgfs_node_link(struct puffs_usermount *pu, puffs_cookie_t dir_opc,
    puffs_cookie_t targ_opc, const struct puffs_cn *pcn)
{
	struct Xconn *xc;
	fileid_t dir_fileid = cookie_to_fileid(dir_opc);
	fileid_t targ_fileid = cookie_to_fileid(targ_opc);
	struct vattr va;
	int error;

	DPRINTF("%llu %llu %s\n", dir_fileid, targ_fileid, pcn->pcn_name);
	do {
		xc = begin(pu, "link");
		error = getattr(xc, targ_fileid, &va, GETATTR_TYPE);
		if (error == 0 && va.va_type == VDIR) {
			error = EPERM;
		}
		if (error == 0) {
			error = linkfile(xc, dir_fileid, pcn->pcn_name,
			    targ_fileid);
		}
		if (error == 0) {
			error = update_ctime(xc, targ_fileid);
		}
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs remove op: unlink the name pcn->pcn_name, which refers to node
 * targ, from directory opc.
 *
 * Directories are refused with EPERM.  After a successful commit
 * PUFFS_SETBACK_INACT_N2 is requested through puffs_setback().  A failed
 * attempt is rolled back and repeated as long as the error is EAGAIN.
 */
int
pgfs_node_remove(struct puffs_usermount *pu, puffs_cookie_t opc,
    puffs_cookie_t targ, const struct puffs_cn *pcn)
{
	struct Xconn *xc;
	fileid_t fileid = cookie_to_fileid(opc);
	fileid_t targ_fileid = cookie_to_fileid(targ);
	struct vattr va;
	int error;

	do {
		xc = begin(pu, "remove");
		error = getattr(xc, targ_fileid, &va, GETATTR_TYPE);
		if (error == 0 && va.va_type == VDIR) {
			error = EPERM;
		}
		if (error == 0) {
			error = unlinkfile(xc, fileid, pcn->pcn_name,
			    targ_fileid);
		}
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			puffs_setback(puffs_cc_getcc(pu),
			    PUFFS_SETBACK_INACT_N2);
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs rmdir op: remove the directory targ, named pcn->pcn_name, from
 * its parent directory opc.
 *
 * Returns ENOTDIR when the target is not a directory and ENOTEMPTY when
 * isempty() reports that it still has entries.  The parent's link count
 * is decremented in the same transaction.  After a successful commit
 * PUFFS_SETBACK_INACT_N2 is requested through puffs_setback().  A failed
 * attempt is rolled back and repeated as long as the error is EAGAIN.
 */
int
pgfs_node_rmdir(struct puffs_usermount *pu, puffs_cookie_t opc,
    puffs_cookie_t targ, const struct puffs_cn *pcn)
{
	struct Xconn *xc;
	fileid_t parent_fileid = cookie_to_fileid(opc);
	fileid_t targ_fileid = cookie_to_fileid(targ);
	struct vattr va;
	bool empty;
	int error;

	do {
		xc = begin(pu, "rmdir");
		error = getattr(xc, targ_fileid, &va, GETATTR_TYPE);
		if (error == 0 && va.va_type != VDIR) {
			error = ENOTDIR;
		}
		if (error == 0) {
			error = isempty(xc, targ_fileid, &empty);
		}
		if (error == 0 && !empty) {
			error = ENOTEMPTY;
		}
		if (error == 0) {
			error = unlinkfile(xc, parent_fileid, pcn->pcn_name,
			    targ_fileid);
		}
		if (error == 0) {
			error = update_nlink(xc, parent_fileid, -1);
		}
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			puffs_setback(puffs_cc_getcc(pu),
			    PUFFS_SETBACK_INACT_N2);
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs inactive op: run cleanupfile() for the node in its own
 * transaction.  A failed attempt is rolled back and repeated as long as
 * the error is EAGAIN.
 */
int
pgfs_node_inactive(struct puffs_usermount *pu, puffs_cookie_t opc)
{
	struct Xconn *xc;
	fileid_t fileid = cookie_to_fileid(opc);
	int error;

	/*
	 * XXX
	 * this work probably belongs in a separate "reaper" context, since
	 * lo_unlink() can be too expensive to run synchronously.  the
	 * puffs_cc API offers no way to create such a worker context,
	 * though.
	 */

	DPRINTF("%llu\n", fileid);
	do {
		xc = begin(pu, "inactive");
		error = cleanupfile(xc, fileid);
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs setattr op: apply every attribute in *va that is not
 * PUFFS_VNOVAL to the file.
 *
 * Supported attributes are uid/gid, mode, atime/mtime/ctime/birthtime and
 * size.  Setting va_flags is not supported (EOPNOTSUPP).  Each group is
 * permission-checked with the matching puffs_access_*() helper against
 * the current attributes of the file, which are fetched first; the set of
 * attributes to fetch is derived from the same predicates that later
 * select the updates, so every value handed to a permission check has
 * actually been read.
 *
 * A size larger than INT_MAX is rejected with EFBIG.  The per-file lock
 * is held for the whole operation, including retries.  A failed attempt
 * is rolled back and repeated when the error is EAGAIN.
 */
int
pgfs_node_setattr(struct puffs_usermount *pu, puffs_cookie_t opc,
    const struct vattr *va, const struct puffs_cred *pcr)
{
	struct Xconn *xc;
	struct fileid_lock_handle *lock;
	fileid_t fileid = cookie_to_fileid(opc);
	struct vattr ova;
	unsigned int attrs;
	int error;
	/* which attribute groups the caller asked to change */
	const bool set_owner =
	    va->va_uid != (uid_t)PUFFS_VNOVAL ||
	    va->va_gid != (gid_t)PUFFS_VNOVAL;
	const bool set_mode = va->va_mode != (mode_t)PUFFS_VNOVAL;
	const bool set_times =
	    va->va_atime.tv_sec != PUFFS_VNOVAL ||
	    va->va_mtime.tv_sec != PUFFS_VNOVAL ||
	    va->va_ctime.tv_sec != PUFFS_VNOVAL ||
	    va->va_birthtime.tv_sec != PUFFS_VNOVAL;

	DPRINTF("%llu\n", fileid);
	if (va->va_flags != (u_long)PUFFS_VNOVAL) {
		return EOPNOTSUPP;
	}
	/*
	 * collect the current attributes the permission checks below read.
	 */
	attrs = 0;
	if (set_owner) {
		attrs |= GETATTR_UID|GETATTR_GID|GETATTR_MODE;
	}
	if (set_mode) {
		attrs |= GETATTR_TYPE|GETATTR_UID|GETATTR_GID;
	}
	if (set_times) {
		/* includes birthtime: puffs_access_times needs uid/gid/mode */
		attrs |= GETATTR_UID|GETATTR_GID|GETATTR_MODE;
	}
	lock = fileid_lock(fileid, puffs_cc_getcc(pu));
retry:
	xc = begin(pu, "setattr");
	error = getattr(xc, fileid, &ova, attrs);
	if (error != 0) {
		goto got_error;
	}
	if (set_owner) {
		static struct cmd *c;
		/* an unspecified id keeps its current value */
		uint64_t newuid =
		    va->va_uid != (uid_t)PUFFS_VNOVAL ? va->va_uid : ova.va_uid;
		uint64_t newgid =
		    va->va_gid != (gid_t)PUFFS_VNOVAL ? va->va_gid : ova.va_gid;

		error = puffs_access_chown(ova.va_uid, ova.va_gid,
		    newuid, newgid, pcr);
		if (error != 0) {
			goto got_error;
		}
		CREATECMD(c,
			"UPDATE file "
			"SET uid = $1, gid = $2 "
			"WHERE fileid = $3", INT8OID, INT8OID, INT8OID);
		error = simplecmd(xc, c, newuid, newgid, fileid);
		if (error != 0) {
			goto got_error;
		}
		/* later checks in this call see the new owner */
		ova.va_uid = newuid;
		ova.va_gid = newgid;
	}
	if (set_mode) {
		static struct cmd *c;
		uint64_t newmode = va->va_mode;

		error = puffs_access_chmod(ova.va_uid, ova.va_gid, ova.va_type,
		    newmode, pcr);
		if (error != 0) {
			goto got_error;
		}
		CREATECMD(c,
			"UPDATE file "
			"SET mode = $1 "
			"WHERE fileid = $2", INT8OID, INT8OID);
		error = simplecmd(xc, c, newmode, fileid);
		if (error != 0) {
			goto got_error;
		}
		/* the time check below sees the new mode */
		ova.va_mode = newmode;
	}
	if (set_times) {
		error = puffs_access_times(ova.va_uid, ova.va_gid, ova.va_mode,
		    (va->va_vaflags & VA_UTIMES_NULL) != 0, pcr);
		if (error != 0) {
			goto got_error;
		}
		if (va->va_atime.tv_sec != PUFFS_VNOVAL) {
			static struct cmd *c;
			char *ts;

			error = timespec_to_pgtimestamp(&va->va_atime, &ts);
			if (error != 0) {
				goto got_error;
			}
			CREATECMD(c,
				"UPDATE file "
				"SET atime = $1 "
				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
			error = simplecmd(xc, c, ts, fileid);
			free(ts);
			if (error != 0) {
				goto got_error;
			}
		}
		if (va->va_mtime.tv_sec != PUFFS_VNOVAL) {
			static struct cmd *c;
			char *ts;

			error = timespec_to_pgtimestamp(&va->va_mtime, &ts);
			if (error != 0) {
				goto got_error;
			}
			CREATECMD(c,
				"UPDATE file "
				"SET mtime = $1 "
				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
			error = simplecmd(xc, c, ts, fileid);
			free(ts);
			if (error != 0) {
				goto got_error;
			}
		}
		if (va->va_ctime.tv_sec != PUFFS_VNOVAL) {
			static struct cmd *c;
			char *ts;

			error = timespec_to_pgtimestamp(&va->va_ctime, &ts);
			if (error != 0) {
				goto got_error;
			}
			CREATECMD(c,
				"UPDATE file "
				"SET ctime = $1 "
				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
			error = simplecmd(xc, c, ts, fileid);
			free(ts);
			if (error != 0) {
				goto got_error;
			}
		}
		if (va->va_birthtime.tv_sec != PUFFS_VNOVAL) {
			static struct cmd *c;
			char *ts;

			error = timespec_to_pgtimestamp(&va->va_birthtime, &ts);
			if (error != 0) {
				goto got_error;
			}
			CREATECMD(c,
				"UPDATE file "
				"SET btime = $1 "
				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
			error = simplecmd(xc, c, ts, fileid);
			free(ts);
			if (error != 0) {
				goto got_error;
			}
		}
	}
	if (va->va_size != (uint64_t)PUFFS_VNOVAL) {
		int fd;

		/* same file size limit as pgfs_node_write */
		if (va->va_size > INT_MAX) {
			error = EFBIG;
			goto got_error;
		}
		error = lo_open_by_fileid(xc, fileid, INV_READ|INV_WRITE, &fd);
		if (error != 0) {
			goto got_error;
		}
		error = my_lo_truncate(xc, fd, va->va_size);
		if (error != 0) {
			goto got_error;
		}
		error = my_lo_close(xc, fd);
		if (error != 0) {
			goto got_error;
		}
	}
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	goto done;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
done:
	fileid_unlock(lock);
	return error;
}

/*
 * puffs rename op: move the node src, named pcn_src->pcn_name in src_dir,
 * to the name pcn_targ->pcn_name in targ_dir.
 *
 * When the target file id is non-zero the existing target entry is
 * removed first, subject to these checks:
 *   - directory over non-directory: ENOTDIR
 *   - directory over a directory whose link count is not 2: ENOTEMPTY
 *   - non-directory over directory: EISDIR
 * For a directory source, check_path() is consulted with the source and
 * the target directory before anything is modified, and the link counts
 * of the affected parent directories are adjusted.
 *
 * Everything happens in a single transaction.  A failed attempt is rolled
 * back and repeated when the error is EAGAIN.
 */
int
pgfs_node_rename(struct puffs_usermount *pu, puffs_cookie_t src_dir,
    puffs_cookie_t src, const struct puffs_cn *pcn_src,
    puffs_cookie_t targ_dir, puffs_cookie_t targ,
    const struct puffs_cn *pcn_targ)
{
	struct Xconn *xc;
	fileid_t fileid_src_dir = cookie_to_fileid(src_dir);
	fileid_t fileid_src = cookie_to_fileid(src);
	fileid_t fileid_targ_dir = cookie_to_fileid(targ_dir);
	fileid_t fileid_targ = cookie_to_fileid(targ);
	struct vattr va_src;
	struct vattr va_targ;
	int error;

	DPRINTF("%llu %llu %llu %llu\n", fileid_src_dir, fileid_src,
	    fileid_targ_dir, fileid_targ);
retry:
	xc = begin(pu, "rename");
	error = getattr(xc, fileid_src, &va_src, GETATTR_TYPE);
	if (error != 0) {
		goto got_error;
	}
	if (va_src.va_type == VDIR) {
		error = check_path(xc, fileid_src, fileid_targ_dir);
		if (error != 0) {
			goto got_error;
		}
	}
	/* a zero target file id skips the replacement step entirely */
	if (fileid_targ != 0) {
		error = getattr(xc, fileid_targ, &va_targ,
		    GETATTR_TYPE|GETATTR_NLINK);
		if (error != 0) {
			goto got_error;
		}
		if (va_src.va_type == VDIR) {
			if (va_targ.va_type != VDIR) {
				error = ENOTDIR;
				goto got_error;
			}
			/* a link count other than 2 is treated as non-empty */
			if (va_targ.va_nlink != 2) {
				error = ENOTEMPTY;
				goto got_error;
			}
		} else if (va_targ.va_type == VDIR) {
			error = EISDIR;
			goto got_error;
		}
		/* drop the old target entry before linking the new one */
		error = unlinkfile(xc, fileid_targ_dir, pcn_targ->pcn_name,
		    fileid_targ);
		if (error == 0 && va_targ.va_type == VDIR) {
			error = update_nlink(xc, fileid_targ_dir, -1);
		}
		if (error != 0) {
			goto got_error;
		}
	}
	/* enter the source under its new name ... */
	error = linkfile(xc, fileid_targ_dir, pcn_targ->pcn_name, fileid_src);
	if (error == 0 && va_src.va_type == VDIR) {
		error = update_nlink(xc, fileid_targ_dir, 1);
	}
	if (error != 0) {
		goto got_error;
	}
	/* XXX ctime? */
	/* ... and remove the old name */
	error = unlinkfile(xc, fileid_src_dir, pcn_src->pcn_name, fileid_src);
	if (error == 0 && va_src.va_type == VDIR) {
		error = update_nlink(xc, fileid_src_dir, -1);
	}
	if (error != 0) {
		goto got_error;
	}
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	return 0;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
	return error;
}

/*
 * puffs symlink op: create the symbolic link pcn->pcn_name under the
 * directory opc, pointing at target.
 *
 * The link is created with mklinkfile_lo() as a VLNK node and the target
 * string (without its terminating NUL) is written to the large object it
 * returns.  A short write is reported as ENOSPC.  On success the cookie
 * of the new node is stored in pni.  A failed attempt is rolled back and
 * repeated as long as the error is EAGAIN.
 */
int
pgfs_node_symlink(struct puffs_usermount *pu, puffs_cookie_t opc,
    struct puffs_newinfo *pni, const struct puffs_cn *pcn,
    const struct vattr *va, const char *target)
{
	struct Xconn *xc;
	struct puffs_cred *pcr = pcn->pcn_cred;
	fileid_t parent_fileid = cookie_to_fileid(opc);
	fileid_t new_fileid;
	size_t resultlen;
	size_t targetlen;
	uid_t uid;
	gid_t gid;
	int loid;
	int fd;
	int error;

	DPRINTF("%llu %s %s\n", parent_fileid, pcn->pcn_name, target);
	if (puffs_cred_getuid(pcr, &uid) == -1) {
		return errno;
	}
	if (puffs_cred_getgid(pcr, &gid) == -1) {
		return errno;
	}
	do {
		xc = begin(pu, "symlink");
		error = mklinkfile_lo(xc, parent_fileid, pcn->pcn_name, VLNK,
		    va->va_mode, uid, gid, &new_fileid, &loid);
		if (error == 0) {
			error = my_lo_open(xc, loid, INV_WRITE, &fd);
		}
		if (error == 0) {
			targetlen = strlen(target);
			error = my_lo_write(xc, fd, target, targetlen,
			    &resultlen);
		}
		if (error == 0 && resultlen != targetlen) {
			error = ENOSPC; /* XXX */
		}
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			puffs_newinfo_setcookie(pni,
			    fileid_to_cookie(new_fileid));
			return 0;
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs readlink op: read the target of a symbolic link.
 *
 * Up to *buflenp bytes are read from the node's large object into buf.
 * On success *buflenp is set to the number of bytes read and 0 is
 * returned; *buflenp is left untouched on failure.
 *
 * Like the other node ops in this file, every failure -- including a
 * failed commit -- rolls the transaction back, and the whole attempt is
 * repeated when the error is EAGAIN.
 */
int
pgfs_node_readlink(struct puffs_usermount *pu, puffs_cookie_t opc,
    const struct puffs_cred *pcr, char *buf, size_t *buflenp)
{
	fileid_t fileid = cookie_to_fileid(opc);
	struct Xconn *xc;
	size_t resultlen;
	int fd;
	int error;

	DPRINTF("%llu\n", fileid);
retry:
	xc = begin_readonly(pu, "readlink");
	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
	if (error != 0) {
		goto got_error;
	}
	error = my_lo_read(xc, fd, buf, *buflenp, &resultlen);
	if (error != 0) {
		goto got_error;
	}
	assert(resultlen <= *buflenp);
	error = commit(xc);
	if (error != 0) {
		/* a failed commit still needs the rollback below */
		goto got_error;
	}
	*buflenp = resultlen;
	return 0;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
	return error;
}

/*
 * puffs access op: check whether the credentials pcr grant the access
 * requested in mode on the node.
 *
 * The node's type, mode, uid and gid are read in a read-only transaction
 * and handed to puffs_access(), whose result is returned.  A failed
 * attempt is rolled back and repeated as long as the error is EAGAIN.
 */
int
pgfs_node_access(struct puffs_usermount *pu, puffs_cookie_t opc,
    int mode, const struct puffs_cred *pcr)
{
	struct Xconn *xc;
	fileid_t fileid = cookie_to_fileid(opc);
	struct vattr va;
	int error;

	DPRINTF("%llu\n", fileid);
	do {
		xc = begin_readonly(pu, "access");
		error = getattr(xc, fileid, &va,
		    GETATTR_TYPE|GETATTR_MODE|GETATTR_UID|GETATTR_GID);
		if (error == 0) {
			error = commit(xc);
		}
		if (error == 0) {
			/* the check itself runs after the commit */
			return puffs_access(va.va_type, va.va_mode,
			    va.va_uid, va.va_gid, mode, pcr);
		}
		rollback(xc);
	} while (error == EAGAIN);
	return error;
}

/*
 * puffs fsync op: return the result of flush_xacts() for the mount.
 * The node is only used for the debug message.
 */
int
pgfs_node_fsync(struct puffs_usermount *pu, puffs_cookie_t opc,
    const struct puffs_cred *pcr, int flags, off_t offlo, off_t offhi)
{
	fileid_t fileid = cookie_to_fileid(opc);
	int error;

	DPRINTF("%llu\n", fileid);
	error = flush_xacts(pu);
	return error;
}

/*
 * puffs statvfs op: fill *sbp with file system statistics.
 *
 * Three single-value queries are run in one read-only transaction: an
 * estimated row count of the pgfs "file" table, the total size of the
 * relations in the pgfs schema, and the total size of pg_largeobject.
 * The block and file totals reported are fixed large values; the free
 * counts are those totals minus the measured usage.
 *
 * Returns 0 on success.  A failed attempt is rolled back and repeated
 * when the error is EAGAIN; any other error is returned.
 */
int
pgfs_fs_statvfs(struct puffs_usermount *pu, struct statvfs *sbp)
{
	struct Xconn *xc;
	uint64_t nfiles;
	uint64_t bytes;
	uint64_t lo_bytes;
	static struct cmd *c_nfiles;
	static struct cmd *c_bytes;
	static struct cmd *c_lobytes;
	static const Oid types[] = { INT8OID, };
	struct fetchstatus s;
	int error;

retry:
	xc = begin_readonly(pu, "statvfs");
	/*
	 * use an estimate which we can retrieve quickly, instead of
	 * "SELECT count(*) from file".
	 */
	CREATECMD_NOPARAM(c_nfiles,
		"SELECT reltuples::int8 "
		"FROM pg_class c LEFT JOIN pg_namespace n "
		"ON (n.oid=c.relnamespace) "
		"WHERE n.nspname = 'pgfs' AND c.relname = 'file'");
	CREATECMD_NOPARAM(c_bytes,
		"SELECT sum(pg_total_relation_size(c.oid))::int8 "
		"FROM pg_class c LEFT JOIN pg_namespace n "
		"ON (n.oid=c.relnamespace) "
		"WHERE n.nspname = 'pgfs'");
	/*
	 * the following is not correct if someone else is using large objects
	 * in the same database.  we don't bother to join with datafork it as
	 * it's too expensive for the little benefit.
	 */
	CREATECMD_NOPARAM(c_lobytes,
		"SELECT pg_total_relation_size('pg_largeobject')::int8");
	/* number of files (estimate) */
	error = sendcmd(xc, c_nfiles);
	if (error != 0) {
		goto got_error;
	}
	fetchinit(&s, xc);
	error = FETCHNEXT(&s, types, &nfiles);
	fetchdone(&s);
	if (error != 0) {
		goto got_error;
	}
	/* bytes used by the relations in the pgfs schema */
	error = sendcmd(xc, c_bytes);
	if (error != 0) {
		goto got_error;
	}
	fetchinit(&s, xc);
	error = FETCHNEXT(&s, types, &bytes);
	fetchdone(&s);
	if (error != 0) {
		goto got_error;
	}
	/* bytes used by large objects */
	error = sendcmd(xc, c_lobytes);
	if (error != 0) {
		goto got_error;
	}
	fetchinit(&s, xc);
	error = FETCHNEXT(&s, types, &lo_bytes);
	fetchdone(&s);
	if (error != 0) {
		goto got_error;
	}
	error = commit(xc);
	if (error != 0) {
		goto got_error;
	}
	/*
	 * XXX fill f_blocks and f_files with meaningless large values.
	 * there are no easy way to provide meaningful values for them
	 * esp. with tablespaces.
	 */
	sbp->f_bsize = LOBLKSIZE;
	sbp->f_frsize = LOBLKSIZE;
	sbp->f_blocks = INT64_MAX / 100 / sbp->f_frsize;
	/* usage is rounded up to whole fragments */
	sbp->f_bfree = sbp->f_blocks - howmany(bytes + lo_bytes, sbp->f_frsize);
	sbp->f_bavail = sbp->f_bfree;
	sbp->f_bresvd = 0;
	sbp->f_files = INT_MAX;
	sbp->f_ffree = sbp->f_files - nfiles;
	sbp->f_favail = sbp->f_ffree;
	sbp->f_fresvd = 0;
	return 0;
got_error:
	rollback(xc);
	if (error == EAGAIN) {
		goto retry;
	}
	return error;
}