[Pvfs2-cvs] commit by slang in pvfs2/src/kernel/linux-2.6: Makefile.in acl.c dcache.c dir.c file.c inode.c namei.c pvfs2-bufmap.c pvfs2-bufmap.h pvfs2-cache.c pvfs2-kernel.h pvfs2-utils.c super.c symlink.c xattr-default.c xattr_default.c

CVS commit program cvs at parl.clemson.edu
Thu Oct 19 18:17:11 EDT 2006


Update of /projects/cvsroot/pvfs2/src/kernel/linux-2.6
In directory parlweb1:/tmp/cvs-serv5758/src/kernel/linux-2.6

Modified Files:
      Tag: WALT3
	Makefile.in acl.c dcache.c dir.c file.c inode.c namei.c 
	pvfs2-bufmap.c pvfs2-bufmap.h pvfs2-cache.c pvfs2-kernel.h 
	pvfs2-utils.c super.c symlink.c xattr-default.c 
Removed Files:
      Tag: WALT3
	xattr_default.c 
Log Message:
Reverse merge of HEAD into the WALT3 branch.


Index: Makefile.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/Makefile.in,v
diff -p -u -r1.21.10.1 -r1.21.10.2
--- Makefile.in	18 Sep 2006 15:05:21 -0000	1.21.10.1
+++ Makefile.in	19 Oct 2006 22:17:07 -0000	1.21.10.2
@@ -99,14 +99,14 @@ default: links
 # link to real source directory if out-of-tree build
 links:
 	$(E)for i in $(csrc) $(hsrc); do \
-	    if [[ ! -f $$i  &&  ! -L $$i ]] ; then \
+	    if [ ! -f $$i  -a  ! -L $$i ] ; then \
 		ln -s $(relative_src_dir)/$(here)/$$i ;\
 	    fi ;\
 	done
 
 clean:
 	$(E)for i in $(csrc) $(hsrc); do \
-	    if [[ -L $$i ]] ; then \
+	    if [ -L $$i ] ; then \
 		rm -f $$i ;\
 	    fi ;\
 	done

Index: acl.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/acl.c,v
diff -p -u -r1.12.6.1 -r1.12.6.2
--- acl.c	18 Sep 2006 15:05:21 -0000	1.12.6.1
+++ acl.c	19 Oct 2006 22:17:07 -0000	1.12.6.2
@@ -18,6 +18,7 @@
 #include "pvfs2-bufmap.h"
 
 #if !defined(PVFS2_LINUX_KERNEL_2_4) && defined(HAVE_GENERIC_GETXATTR) && defined(CONFIG_FS_POSIX_ACL)
+#include "pvfs2-internal.h"
 
 #ifdef HAVE_POSIX_ACL_H
 #include <linux/posix_acl.h>
@@ -225,8 +226,8 @@ pvfs2_get_acl(struct inode *inode, int t
         gossip_err("pvfs2_get_acl: Could not allocate value ptr\n");
         return ERR_PTR(-ENOMEM);
     }
-    gossip_debug(GOSSIP_ACL_DEBUG, "inode %ld, key %s, type %d\n", 
-            (long) inode->i_ino, key, type);
+    gossip_debug(GOSSIP_ACL_DEBUG, "inode %llu, key %s, type %d\n", 
+            llu(get_handle_from_ino(inode)), key, type);
     ret = pvfs2_inode_getxattr(inode, "", key, value, PVFS_MAX_XATTR_VALUELEN);
     /* if the key exists, convert it to an in-memory rep */
     if (ret > 0)
@@ -238,8 +239,8 @@ pvfs2_get_acl(struct inode *inode, int t
         acl = NULL;
     }
     else {
-        gossip_err("inode %ld retrieving acl's failed with error %d\n",
-                (long) inode->i_ino, ret);
+        gossip_err("inode %llu retrieving acl's failed with error %d\n",
+                llu(get_handle_from_ino(inode)), ret);
         acl = ERR_PTR(ret);
     }
     if (value) {
@@ -289,8 +290,9 @@ pvfs2_set_acl(struct inode *inode, int t
                 }
                 else /* okay, go ahead and do just that */
                 {
+                    if (inode->i_mode != mode)
+                        SetModeFlag(pvfs2_inode);
                     inode->i_mode = mode;
-                    SetModeFlag(pvfs2_inode);
                     mark_inode_dirty_sync(inode);
                     if (error == 0) /* equivalent. so dont set acl! */
                         acl = NULL;
@@ -317,8 +319,8 @@ pvfs2_set_acl(struct inode *inode, int t
             return -EINVAL;
         }
     }
-    gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_set_acl: inode %ld, key %s type %d\n",
-            (long) inode->i_ino, name, type);
+    gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_set_acl: inode %llu, key %s type %d\n",
+            llu(get_handle_from_ino(inode)), name, type);
     /* If we do have an access control list, then we need to encode that! */
     if (acl) 
     {
@@ -527,7 +529,8 @@ int pvfs2_init_acl(struct inode *inode, 
             inode->i_mode &= ~current->fs->umask;
             gossip_debug(GOSSIP_ACL_DEBUG, "inode->i_mode before %o and "
                     "after %o\n", old_mode, inode->i_mode);
-            SetModeFlag(pvfs2_inode);
+            if (old_mode != inode->i_mode)
+                SetModeFlag(pvfs2_inode);
         }
     }
     if (get_acl_flag(inode) == 1 && acl)
@@ -639,8 +642,8 @@ static int pvfs2_check_acl(struct inode 
 {
     struct posix_acl *acl = NULL;
 
-    gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_check_acl: called on inode %ld\n",
-            (long) inode->i_ino);
+    gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_check_acl: called on inode %llu\n",
+            llu(get_handle_from_ino(inode)));
 
     acl = pvfs2_get_acl(inode, ACL_TYPE_ACCESS);
 
@@ -655,8 +658,8 @@ static int pvfs2_check_acl(struct inode 
         int error = posix_acl_permission(inode, acl, mask);
         posix_acl_release(acl);
         gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_check_acl: posix_acl_permission "
-                " (inode %ld, acl %p, mask %x) returned %d\n",
-                (long) inode->i_ino, acl, mask, error);
+                " (inode %llu, acl %p, mask %x) returned %d\n",
+                 llu(get_handle_from_ino(inode)), acl, mask, error);
         return error;
     }
     gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_check_acl returning EAGAIN\n");
@@ -671,12 +674,12 @@ int pvfs2_permission(struct inode *inode
     ret = generic_permission(inode, mask, pvfs2_check_acl);
     if (ret != 0)
     {
-        gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_permission failed: inode: %ld mask = %o"
+        gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_permission failed: inode: %llu mask = %o"
                 "mode = %o current->fsuid = %d "
                 "inode->i_uid = %d, inode->i_gid = %d "
                 "in_group_p = %d "
                 "(ret = %d)\n",
-                (long) inode->i_ino, mask, inode->i_mode, current->fsuid, 
+                llu(get_handle_from_ino(inode)), mask, inode->i_mode, current->fsuid, 
                 inode->i_uid, inode->i_gid, 
                 in_group_p(inode->i_gid),
                 ret);
@@ -688,8 +691,8 @@ int pvfs2_permission(struct inode *inode
                 inode->i_mode & S_IRWXG);
     }
     else {
-        gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_permission succeeded on inode %ld\n",
-                (long) inode->i_ino);
+        gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_permission succeeded on inode %llu\n",
+                llu(get_handle_from_ino(inode)));
     }
     return ret;
 #else
@@ -697,11 +700,11 @@ int pvfs2_permission(struct inode *inode
     int mode = inode->i_mode;
     int error;
 
-    gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_permission: inode: %ld mask = %o"
+    gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_permission: inode: %llu mask = %o"
             "mode = %o current->fsuid = %d "
             "inode->i_uid = %d, inode->i_gid = %d"
             "in_group_p = %d\n", 
-            (long) inode->i_ino, mask, mode, current->fsuid,
+            llu(get_handle_from_ino(inode)), mask, mode, current->fsuid,
             inode->i_uid, inode->i_gid,
             in_group_p(inode->i_gid));
 

Index: dcache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/dcache.c,v
diff -p -u -r1.28.6.1 -r1.28.6.2
--- dcache.c	18 Sep 2006 15:05:21 -0000	1.28.6.1
+++ dcache.c	19 Oct 2006 22:17:07 -0000	1.28.6.2
@@ -40,8 +40,7 @@ static int pvfs2_d_revalidate_common(str
         /* first perform a lookup to make sure that the object not only
          * exists, but is still in the expected place in the name space 
          */
-        if(!(PVFS2_SB(inode->i_sb)->root_handle ==
-            pvfs2_ino_to_handle(inode->i_ino)))
+        if (!is_root_handle(inode))
         {
             gossip_debug(GOSSIP_DCACHE_DEBUG, "pvfs2_d_revalidate_common: attempting lookup.\n");
             new_op = op_alloc(PVFS2_VFS_OP_LOOKUP);
@@ -51,14 +50,19 @@ static int pvfs2_d_revalidate_common(str
             }
             new_op->upcall.req.lookup.sym_follow = PVFS2_LOOKUP_LINK_NO_FOLLOW;
             parent = PVFS2_I(parent_inode);
-            if (parent && parent->refn.handle && parent->refn.fs_id)
+            if (parent && parent->refn.handle != PVFS_HANDLE_NULL && parent->refn.fs_id != PVFS_FS_ID_NULL)
             {
                 new_op->upcall.req.lookup.parent_refn = parent->refn;
             }
             else
             {
+#if defined(HAVE_IGET4_LOCKED) || defined(HAVE_IGET5_LOCKED)
+                gossip_lerr("Critical error: i_ino cannot be relied upon when using iget5/iget4\n");
+                op_release(new_op);
+                return 0;
+#endif
                 new_op->upcall.req.lookup.parent_refn.handle =
-                    pvfs2_ino_to_handle(parent_inode->i_ino);
+                    get_handle_from_ino(parent_inode);
                 new_op->upcall.req.lookup.parent_refn.fs_id =
                     PVFS2_SB(parent_inode->i_sb)->fs_id;
             }
@@ -69,9 +73,8 @@ static int pvfs2_d_revalidate_common(str
                 new_op, "pvfs2_lookup", 
                 get_interruptible_flag(parent_inode));
 
-            if((new_op->downcall.status != 0) ||
-               (new_op->downcall.resp.lookup.refn.handle !=
-               pvfs2_ino_to_handle(inode->i_ino)))
+            if((new_op->downcall.status != 0) || 
+                    !match_handle(new_op->downcall.resp.lookup.refn.handle, inode))
             {
                 gossip_debug(GOSSIP_DCACHE_DEBUG, "pvfs2_d_revalidate_common: lookup failure or no match.\n");
                 op_release(new_op);
@@ -87,7 +90,7 @@ static int pvfs2_d_revalidate_common(str
 
         /* now perform revalidation */
         gossip_debug(GOSSIP_DCACHE_DEBUG, " (inode %llu)\n",
-                    llu(pvfs2_ino_to_handle(inode->i_ino)));
+                    llu(get_handle_from_ino(inode)));
         gossip_debug(GOSSIP_DCACHE_DEBUG, "pvfs2_d_revalidate_common: calling pvfs2_internal_revalidate().\n");
         ret = pvfs2_internal_revalidate(inode);
     }

Index: dir.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/dir.c,v
diff -p -u -r1.40.4.1 -r1.40.4.2
--- dir.c	18 Sep 2006 15:05:21 -0000	1.40.4.1
+++ dir.c	19 Oct 2006 22:17:09 -0000	1.40.4.2
@@ -125,22 +125,20 @@ static int pvfs2_readdir(
     pvfs2_kernel_op_t *new_op = NULL;
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(dentry->d_inode);
 
-  restart_readdir:
 
     pos = (PVFS_ds_position)file->f_pos;
     /* are we done? */
     if (pos == PVFS_READDIR_END)
     {
-        gossip_debug(GOSSIP_DIR_DEBUG, "Skipping to graceful termination path since we are done\n");
+        gossip_debug(GOSSIP_DIR_DEBUG, 
+                     "Skipping to graceful termination "
+                     "path since we are done\n");
         pvfs2_inode->directory_version = 0;
-        pvfs2_inode->num_readdir_retries =
-            PVFS2_NUM_READDIR_RETRIES;
         return 0;
     }
 
     gossip_debug(GOSSIP_DIR_DEBUG, "pvfs2_readdir called on %s (pos=%d, "
-                "retry=%d, v=%llu)\n", dentry->d_name.name, (int)pos,
-                (int)pvfs2_inode->num_readdir_retries,
+                "v=%llu)\n", dentry->d_name.name, (int)pos,
                 llu(pvfs2_inode->directory_version));
 
     switch (pos)
@@ -153,8 +151,9 @@ static int pvfs2_readdir(
         token_set = 1;
         if (pvfs2_inode->directory_version == 0)
         {
-            ino = dentry->d_inode->i_ino;
-            gossip_debug(GOSSIP_DIR_DEBUG, "calling filldir of . with pos = %d\n", pos);
+            ino = get_ino_from_handle(dentry->d_inode);
+            gossip_debug(GOSSIP_DIR_DEBUG, 
+                         "calling filldir of . with pos = %d\n", pos);
             if (filldir(dirent, ".", 1, pos, ino, DT_DIR) < 0)
             {
                 break;
@@ -167,8 +166,9 @@ static int pvfs2_readdir(
         token_set = 1;
         if (pvfs2_inode->directory_version == 0)
         {
-            ino = parent_ino(dentry);
-            gossip_debug(GOSSIP_DIR_DEBUG, "calling filldir of .. with pos = %d\n", pos);
+            ino = get_parent_ino_from_dentry(dentry);
+            gossip_debug(GOSSIP_DIR_DEBUG, 
+                         "calling filldir of .. with pos = %d\n", pos);
             if (filldir(dirent, "..", 2, pos, ino, DT_DIR) < 0)
             {
                 break;
@@ -192,15 +192,21 @@ static int pvfs2_readdir(
 	    return -ENOMEM;
 	}
 
-	if (pvfs2_inode && pvfs2_inode->refn.handle &&
-            pvfs2_inode->refn.fs_id)
+	if (pvfs2_inode && pvfs2_inode->refn.handle != PVFS_HANDLE_NULL &&
+            pvfs2_inode->refn.fs_id != PVFS_FS_ID_NULL)
 	{
 	    new_op->upcall.req.readdir.refn = pvfs2_inode->refn;
 	}
 	else
 	{
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+            gossip_lerr("Critical error: i_ino cannot be relied "
+                        "on when using iget4/5\n");
+            op_release(new_op);
+            return -EINVAL;
+#endif
 	    new_op->upcall.req.readdir.refn.handle =
-		pvfs2_ino_to_handle(dentry->d_inode->i_ino);
+		get_handle_from_ino(dentry->d_inode);
 	    new_op->upcall.req.readdir.refn.fs_id =
 		PVFS2_SB(dentry->d_inode->i_sb)->fs_id;
 	}
@@ -217,7 +223,8 @@ static int pvfs2_readdir(
         ret = readdir_index_get(&buffer_index);
         if (ret < 0)
         {
-            gossip_err("pvfs2_readdir: readdir_index_get() failure (%d)\n", ret);
+            gossip_err("pvfs2_readdir: readdir_index_get() "
+                       "failure (%d)\n", ret);
             goto err;
         }
         new_op->upcall.req.readdir.buf_index = buffer_index;
@@ -237,55 +244,34 @@ static int pvfs2_readdir(
             long bytes_decoded;
 
             if ((bytes_decoded = readdir_handle_ctor(&rhandle, 
-                            new_op->downcall.trailer_buf,
-                            buffer_index)) < 0)
+                                                     new_op->downcall.trailer_buf,
+                                                     buffer_index)) < 0)
             {
                 ret = bytes_decoded;
                 gossip_err("pvfs2_readdir: Could not decode trailer buffer "
-                        " into a readdir response %d\n", ret);
+                           " into a readdir response %d\n", ret);
                 goto err;
             }
+
             if (bytes_decoded != new_op->downcall.trailer_size)
             {
-                gossip_err("pvfs2_readdir: # bytes decoded (%ld) != trailer size (%ld)\n",
-                        bytes_decoded, (long) new_op->downcall.trailer_size);
+                gossip_err("pvfs2_readdir: # bytes "
+                           "decoded (%ld) != trailer size (%ld)\n",
+                           bytes_decoded, (long) new_op->downcall.trailer_size);
                 ret = -EINVAL;
                 goto err;
             }
+
             if (rhandle.readdir_response.pvfs_dirent_outcount == 0)
             {
                 goto graceful_termination_path;
             }
 
-            if (pvfs2_inode->directory_version == 0)
+            if (pvfs2_inode->directory_version !=
+                rhandle.readdir_response.directory_version)
             {
                 pvfs2_inode->directory_version =
-                        rhandle.readdir_response.directory_version;
-            }
-
-            if (pvfs2_inode->num_readdir_retries > -1)
-            {
-                if (pvfs2_inode->directory_version !=
-                    rhandle.readdir_response.directory_version)
-                {
-                    gossip_debug(GOSSIP_DIR_DEBUG, "detected directory change on listing; "
-                                "starting over\n");
-
-                    file->f_pos = 0;
-                    pvfs2_inode->directory_version =
-                        rhandle.readdir_response.directory_version;
-
-                    readdir_handle_dtor(&rhandle);
-                    op_release(new_op);
-                    pvfs2_inode->num_readdir_retries--;
-                    goto restart_readdir;
-                }
-            }
-            else
-            {
-                gossip_debug(GOSSIP_DIR_DEBUG, "Giving up on readdir retries to avoid "
-                            "possible livelock (%d tries attempted)\n",
-                            PVFS2_NUM_READDIR_RETRIES);
+                    rhandle.readdir_response.directory_version;
             }
 
             for (i = 0; i < rhandle.readdir_response.pvfs_dirent_outcount; i++)
@@ -293,48 +279,54 @@ static int pvfs2_readdir(
                 len = rhandle.readdir_response.dirent_array[i].d_length;
                 current_entry = rhandle.readdir_response.dirent_array[i].d_name;
                 current_ino = pvfs2_handle_to_ino(
-                        rhandle.readdir_response.dirent_array[i].handle);
+                    rhandle.readdir_response.dirent_array[i].handle);
 
-                gossip_debug(GOSSIP_DIR_DEBUG, "calling filldir for %s with len %d, pos %ld\n",
-                        current_entry, len, (unsigned long) pos);
+                gossip_debug(GOSSIP_DIR_DEBUG, 
+                             "calling filldir for %s with len %d, pos %ld\n",
+                             current_entry, len, (unsigned long) pos);
                 if (filldir(dirent, current_entry, len, pos,
                             current_ino, DT_UNKNOWN) < 0)
                 {
 graceful_termination_path:
                     pvfs2_inode->directory_version = 0;
-                    pvfs2_inode->num_readdir_retries = PVFS2_NUM_READDIR_RETRIES;
-
                     ret = 0;
                     break;
                 }
                 file->f_pos++;
                 pos++;
             }
-            /* For the first time around, use the token returned by the readdir response */
-            if (token_set == 1) {
+            /* For the first time around, use the token 
+             * returned by the readdir response */
+            if (token_set == 1) 
+            {
                 if (i == rhandle.readdir_response.pvfs_dirent_outcount)
                     file->f_pos = rhandle.readdir_response.token;
                 else 
                     file->f_pos = i;
             }
-            gossip_debug(GOSSIP_DIR_DEBUG, "pos = %d, file->f_pos should have been %ld\n", pos, 
-                    (unsigned long) file->f_pos);
+            gossip_debug(GOSSIP_DIR_DEBUG, 
+                         "pos = %d, file->f_pos should have been %ld\n", pos, 
+                         (unsigned long) file->f_pos);
         }
         else
         {
             readdir_index_put(buffer_index);
-            gossip_debug(GOSSIP_DIR_DEBUG, "Failed to readdir (downcall status %d)\n",
-                        new_op->downcall.status);
+            gossip_debug(GOSSIP_DIR_DEBUG, 
+                         "Failed to readdir (downcall status %d)\n",
+                         new_op->downcall.status);
         }
 err:
         readdir_handle_dtor(&rhandle);
         op_release(new_op);
         break;
-    }
-    }
+    } /* end default: block */
+    } /* end switch block */
+
     if (ret == 0)
     {
-        gossip_debug(GOSSIP_DIR_DEBUG, "pvfs2_readdir about to update_atime %p\n", dentry->d_inode);
+        gossip_debug(GOSSIP_DIR_DEBUG, 
+                     "pvfs2_readdir about to update_atime %p\n", 
+                     dentry->d_inode);
 
         SetAtimeFlag(pvfs2_inode);
         dentry->d_inode->i_atime = CURRENT_TIME;
@@ -504,6 +496,7 @@ static int pvfs2_readdirplus_common(
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(dentry->d_inode);
     filldirplus_t filldirplus = NULL;
     filldirpluslite_t filldirplus_lite = NULL;
+    PVFS_object_ref ref;
 
     direntplus = info->direntplus;
     if (info->lite == 0)
@@ -515,7 +508,6 @@ static int pvfs2_readdirplus_common(
         filldirplus_lite = info->u.plus_lite.filldirplus_lite;
     }
 
-restart_readdir:
 
     pos = (PVFS_ds_position)file->f_pos;
     /* are we done? */
@@ -523,13 +515,10 @@ restart_readdir:
     {
         gossip_debug(GOSSIP_DIR_DEBUG, "Skipping to graceful termination path since we are done\n");
         pvfs2_inode->directory_version = 0;
-        pvfs2_inode->num_readdir_retries =
-            PVFS2_NUM_READDIR_RETRIES;
         return 0;
     }
     gossip_debug(GOSSIP_DIR_DEBUG, "pvfs2_readdirplus called on %s (pos=%d, "
-                "retry=%d, v=%llu)\n", dentry->d_name.name, (int)pos,
-                (int)pvfs2_inode->num_readdir_retries,
+                "v=%llu)\n", dentry->d_name.name, (int)pos,
                 llu(pvfs2_inode->directory_version));
 
     switch (pos)
@@ -544,8 +533,10 @@ restart_readdir:
             token_set = 1;
             if (pvfs2_inode->directory_version == 0)
             {
-                ino = dentry->d_inode->i_ino;
-                inode = iget(dentry->d_inode->i_sb, ino);
+                ino = get_ino_from_handle(dentry->d_inode);
+                ref.fs_id = get_fsid_from_ino(dentry->d_inode);
+                ref.handle = get_handle_from_ino(dentry->d_inode);
+                inode = pvfs2_iget(dentry->d_inode->i_sb, &ref);
                 if (inode)
                 {
                     if (info->lite == 0)
@@ -584,8 +575,10 @@ restart_readdir:
             token_set = 1;
             if (pvfs2_inode->directory_version == 0)
             {
-                ino = parent_ino(dentry);
-                inode = iget(dentry->d_inode->i_sb, ino);
+                ino = get_parent_ino_from_dentry(dentry);
+                ref.fs_id = get_fsid_from_ino(dentry->d_parent->d_inode);
+                ref.handle = get_handle_from_ino(dentry->d_parent->d_inode);
+                inode = pvfs2_iget(dentry->d_inode->i_sb, &ref);
                 if (inode) 
                 {
                     if (info->lite == 0)
@@ -634,15 +627,20 @@ restart_readdir:
             {
                 return -ENOMEM;
             }
-            if (pvfs2_inode && pvfs2_inode->refn.handle &&
-                pvfs2_inode->refn.fs_id)
+            if (pvfs2_inode && pvfs2_inode->refn.handle != PVFS_HANDLE_NULL
+                    && pvfs2_inode->refn.fs_id != PVFS_FS_ID_NULL)
             {
                 new_op->upcall.req.readdirplus.refn = pvfs2_inode->refn;
             }
             else
             {
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+                gossip_lerr("Critical error: i_ino cannot be relied on when using iget4/5\n");
+                op_release(new_op);
+                return -EINVAL;
+#endif
                 new_op->upcall.req.readdirplus.refn.handle =
-                    pvfs2_ino_to_handle(dentry->d_inode->i_ino);
+                    get_handle_from_ino(dentry->d_inode);
                 new_op->upcall.req.readdirplus.refn.fs_id =
                     PVFS2_SB(dentry->d_inode->i_sb)->fs_id;
             }
@@ -707,29 +705,11 @@ restart_readdir:
                         rhandle.readdirplus_response.directory_version;
                 }
 
-                if (pvfs2_inode->num_readdir_retries > -1)
-                {
-                    if (pvfs2_inode->directory_version !=
-                        rhandle.readdirplus_response.directory_version)
-                    {
-                        gossip_debug(GOSSIP_DIR_DEBUG, "detected directory change on listing; "
-                                    "starting over\n");
-
-                        file->f_pos = 0;
-                        pvfs2_inode->directory_version =
-                            rhandle.readdirplus_response.directory_version;
-
-                        readdirplus_handle_dtor(&rhandle);
-                        op_release(new_op);
-                        pvfs2_inode->num_readdir_retries--;
-                        goto restart_readdir;
-                    }
-                }
-                else
+                if (pvfs2_inode->directory_version !=
+                    rhandle.readdirplus_response.directory_version)
                 {
-                    gossip_debug(GOSSIP_DIR_DEBUG, "Giving up on readdirplus retries to avoid "
-                                "possible livelock (%d tries attempted)\n",
-                                PVFS2_NUM_READDIR_RETRIES);
+                    pvfs2_inode->directory_version =
+                        rhandle.readdirplus_response.directory_version;
                 }
 
                 for (i = 0; i < rhandle.readdirplus_response.pvfs_dirent_outcount; i++)
@@ -750,8 +730,10 @@ restart_readdir:
 
                     if (stat_error == 0)
                     {
+                        ref.fs_id = get_fsid_from_ino(dentry->d_inode);
+                        ref.handle = handle;
                         /* locate inode in the icache, but don't getattr() */
-                        filled_inode = iget_locked(dentry->d_inode->i_sb, current_ino);
+                        filled_inode = pvfs2_iget_locked(dentry->d_inode->i_sb, &ref);
                         if (filled_inode == NULL) {
                             gossip_err("Could not allocate inode\n");
                             ret = -ENOMEM;
@@ -788,9 +770,7 @@ restart_readdir:
                                 filled_inode->i_bdev = NULL;
                                 filled_inode->i_cdev = NULL;
                                 filled_inode->i_mapping->a_ops = &pvfs2_address_operations;
-#ifndef PVFS2_LINUX_KERNEL_2_4
                                 filled_inode->i_mapping->backing_dev_info = &pvfs2_backing_dev_info;
-#endif
                                 /* Make sure that we unlock the inode */
                                 unlock_new_inode(filled_inode);
                             }
@@ -846,8 +826,6 @@ restart_readdir:
                     {
 graceful_termination_path:
                         pvfs2_inode->directory_version = 0;
-                        pvfs2_inode->num_readdir_retries =
-                            PVFS2_NUM_READDIR_RETRIES;
 
                         ret = 0;
                         break;

Index: file.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/file.c,v
diff -p -u -r1.117.4.1 -r1.117.4.2
--- file.c	18 Sep 2006 15:05:21 -0000	1.117.4.1
+++ file.c	19 Oct 2006 22:17:09 -0000	1.117.4.2
@@ -17,7 +17,7 @@
 #include <linux/fs.h>
 #include <linux/pagemap.h>
 
-enum {
+enum io_type {
     IO_READ = 0,
     IO_WRITE = 1,
     IO_READV = 0,
@@ -48,8 +48,8 @@ int pvfs2_file_open(
 {
     int ret = -EINVAL;
 
-    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_open: called on %s (inode is %d)\n",
-                file->f_dentry->d_name.name, (int)inode->i_ino);
+    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_open: called on %s (inode is %llu)\n",
+                file->f_dentry->d_name.name, llu(get_handle_from_ino(inode)));
 
     inode->i_mapping->host = inode;
     inode->i_mapping->a_ops = &pvfs2_address_operations;
@@ -101,315 +101,213 @@ int pvfs2_file_open(
     return ret;
 }
 
+enum dest_type {
+    COPY_TO_ADDRESSES = 0,
+    COPY_TO_PAGES     = 1
+};
+
 struct rw_options {
-    int type;
-    /* sigh.. we will never pass sparse type checks.. */
-    char *buf;
-    size_t count;
-    loff_t *offset;
+    /* whether or not it is a synchronous I/O operation */
+    int            async;
+    /* whether it is a READ/WRITE operation */
+    enum io_type   type; 
+    /* whether we are copying to addresses/pages */
+    enum dest_type copy_dest_type; 
+    struct file   *file;
+    struct inode  *inode;
+    pvfs2_inode_t *pvfs2_inode;
+    loff_t readahead_size;
+    /* whether the destination addresses are in user/kernel */
+    int copy_to_user;
+    const char *fnstr;
+    /* Asynch I/O control block */
+    struct kiocb *iocb;
+    union {
+        struct {
+            struct iovec *iov;
+            unsigned long nr_segs;
+        } address;
+        struct {
+            struct page  **pages;
+            unsigned long nr_pages;
+        } pages;
+    } dest;
     union {
         struct {
-            struct inode *inode;
-            int copy_to_user;
-            loff_t readahead_size;
-        } read;
+            loff_t        *offset;
+        } io;
         struct {
-            struct file *file;
-        } write;
-    } io;
+            struct xtvec  *xtvec;
+            unsigned long  xtnr_segs;
+        } iox;
+    } off;
 };
 
-static ssize_t do_read_write(struct rw_options *rw)
+/*
+ * Post and wait for the I/O upcall to finish
+ * @rw - contains state information to initiate the I/O operation
+ * @vec - contains the memory vector regions
+ * @nr_segs - number of memory vector regions
+ * @total_size - total expected size of the I/O operation
+ */
+static ssize_t wait_for_io(struct rw_options *rw, struct iovec *vec,
+        int nr_segs, size_t total_size)
 {
     pvfs2_kernel_op_t *new_op = NULL;
     int buffer_index = -1;
-    struct inode *inode;
-    pvfs2_inode_t *pvfs2_inode = NULL;
-    char *current_buf = NULL;
-    size_t count;
-    loff_t *offset;
     ssize_t ret;
-    ssize_t total_count;
-    char *fnstr = NULL;
-    size_t readahead_size;
-    int copy_to_user;
-    struct file *file;
 
-    total_count = 0;
-    ret = -EINVAL;
-    file = NULL;
-    inode = NULL;
-    if (!rw)
-        goto out;
-    count = rw->count;
-    if (count == 0)
+    if (!rw || !vec || nr_segs < 0 || total_size <= 0 
+            || !rw->pvfs2_inode || !rw->inode || !rw->fnstr)
     {
-        ret = 0;
+        gossip_lerr("invalid parameters (rw: %p, vec: %p, nr_segs: %d, "
+                "total_size: %zd)\n", rw, vec, nr_segs, total_size);
+        ret = -EINVAL;
         goto out;
     }
-    current_buf = (char *) rw->buf;
-    if (!current_buf)
-        goto out;
-    offset = rw->offset;
-    if (!offset)
-        goto out;
-    if (rw->type == IO_READ)
+    new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
+    if (!new_op)
     {
-        inode = rw->io.read.inode;
-        if (!inode)
-            goto out;
-        file = NULL;
-        copy_to_user = rw->io.read.copy_to_user;
-        ret = -EFAULT;
-        if (copy_to_user && 
-                !access_ok(VERIFY_WRITE, (char __user *) current_buf, count))
-            goto out;
-        fnstr = "pvfs2_file_read";
-        readahead_size = rw->io.read.readahead_size;
+        ret = -ENOMEM;
+        goto out;
     }
-    else
+    /* synchronous I/O */
+    new_op->upcall.req.io.async_vfs_io = PVFS_VFS_SYNC_IO; 
+    new_op->upcall.req.io.readahead_size = (int32_t) rw->readahead_size;
+    new_op->upcall.req.io.io_type = 
+        (rw->type == IO_READV) ? PVFS_IO_READ : PVFS_IO_WRITE;
+    new_op->upcall.req.io.refn = rw->pvfs2_inode->refn;
+    /* get a shared buffer index */
+    ret = pvfs_bufmap_get(&buffer_index);
+    if (ret < 0)
     {
-        file = rw->io.write.file;
-        copy_to_user = 1;
-        readahead_size = 0;
-        if (!file)
-            goto out;
-        inode = file->f_dentry->d_inode;
-        if (!inode)
-            goto out;
-        fnstr = "pvfs2_file_write";
-        ret = -EFAULT;
-        if (!access_ok(VERIFY_READ, (char __user *) current_buf, count))
-            goto out;
-        if(file->f_pos > i_size_read(inode))
-        {
-            i_size_write(inode, file->f_pos);
-        }
-        /* perform generic linux kernel tests for sanity of write arguments */
-        /* NOTE: this is particularly helpful in handling fsize rlimit properly */
-#ifdef PVFS2_LINUX_KERNEL_2_4
-        ret = pvfs2_precheck_file_write(file, inode, &count, offset);
-#else
-        ret = generic_write_checks(file, offset, &count, S_ISBLK(inode->i_mode));
-#endif
-        if (ret != 0 || count == 0)
-        {
-            gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_write: failed generic argument checks.\n");
-            goto out;
-        }
-        gossip_debug(GOSSIP_FILE_DEBUG, "%s: proceeding with offset : %ld, size %ld\n",
-                fnstr, (unsigned long) *offset, (unsigned long) count);
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get failure (%ld)\n",
+                rw->fnstr, (long) ret);
+        goto out;
     }
-    pvfs2_inode = PVFS2_I(inode);
+    gossip_debug(GOSSIP_FILE_DEBUG, "GET op %p -> buffer_index %d\n", new_op, buffer_index);
 
-    while(total_count < count)
+    new_op->upcall.req.io.buf_index = buffer_index;
+    new_op->upcall.req.io.count = total_size;
+    new_op->upcall.req.io.offset = *(rw->off.io.offset);
+
+    gossip_debug(GOSSIP_FILE_DEBUG, "%s: copy_to_user %d nr_segs %u, "
+            "offset: %llu total_size: %zd\n", rw->fnstr, rw->copy_to_user, 
+            nr_segs, llu(*(rw->off.io.offset)), total_size);
+    if (rw->type == IO_WRITEV)
     {
-        size_t each_count, amt_complete;
-
-        new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
-        if (!new_op)
+        /* 
+         * copy data from application/kernel by pulling it out 
+         * of the iovec.
+         */
+        if (rw->copy_to_user)
         {
-            ret = -ENOMEM;
-            goto out;
+            ret = pvfs_bufmap_copy_iovec_from_user(
+                    buffer_index, vec, nr_segs, total_size);
+        }
+        else {
+            ret = pvfs_bufmap_copy_iovec_from_kernel(
+                    buffer_index, vec, nr_segs, total_size);
         }
-
-        new_op->upcall.req.io.async_vfs_io = PVFS_VFS_SYNC_IO; /* synchronous I/O */
-        new_op->upcall.req.io.readahead_size = readahead_size;
-        new_op->upcall.req.io.io_type = 
-            (rw->type == IO_READ) ? PVFS_IO_READ : PVFS_IO_WRITE;
-        new_op->upcall.req.io.refn = pvfs2_inode->refn;
-
-        ret = pvfs_bufmap_get(&buffer_index);
         if (ret < 0)
         {
-            gossip_err("do_read_write: pvfs_bufmap_get() "
-                        "failure (%ld)\n", (long) ret);
+            gossip_lerr("Failed to copy-in buffers. Please make sure "
+                        "that the pvfs2-client is running. %ld\n", 
+                        (long) ret);
             goto out;
         }
-        gossip_debug(GOSSIP_FILE_DEBUG, "GET op %p -> buffer_index %d\n", new_op, buffer_index);
-        /* how much to transfer in this loop iteration */
-        each_count = (((count - total_count) > pvfs_bufmap_size_query()) ?
-                      pvfs_bufmap_size_query() : (count - total_count));
+    }
+    ret = service_operation(new_op, rw->fnstr,
+         get_interruptible_flag(rw->inode));
 
-        new_op->upcall.req.io.buf_index = buffer_index;
-        new_op->upcall.req.io.count = each_count;
-        new_op->upcall.req.io.offset = *offset;
-        if (rw->type == IO_WRITE)
-        {
-            /* copy data from application */
-            ret = pvfs_bufmap_copy_from_user(buffer_index, current_buf, each_count);
-            if(ret < 0)
-            {
-                gossip_debug(GOSSIP_FILE_DEBUG, "%s: Failed to copy user buffer.\n", fnstr);
-                goto out;
-            }
-        }
-        ret = service_operation(
-            new_op, fnstr, 
-            get_interruptible_flag(inode));
+    if (ret < 0)
+    {
+          /* this macro is defined in pvfs2-kernel.h */
+          handle_io_error();
 
-        if (ret < 0)
-        {
-            /* this macro is defined in pvfs2-kernel.h */
-            handle_io_error();
+          /*
+            don't write an error to syslog on signaled operation
+            termination unless we've got debugging turned on, as
+            this can happen regularly (i.e. ctrl-c)
+          */
+          if (ret == -EINTR)
+          {
+              gossip_debug(GOSSIP_FILE_DEBUG, "%s: returning error %ld\n", 
+                      rw->fnstr, (long) ret);
+          }
+          else
+          {
+              gossip_err(
+                    "%s: error in %s handle %llu, "
+                    "FILE: %s\n  -- returning %ld\n",
+                    rw->fnstr, 
+                    rw->type == IO_READV ? "vectored read from" : "vectored write to",
+                    llu(get_handle_from_ino(rw->inode)),
+                    (rw->file && rw->file->f_dentry && rw->file->f_dentry->d_name.name ?
+                     (char *)rw->file->f_dentry->d_name.name : "UNKNOWN"),
+                    (long) ret);
+          }
+          goto out;
+    }
 
-            /*
-              don't write an error to syslog on signaled operation
-              termination unless we've got debugging turned on, as
-              this can happen regularly (i.e. ctrl-c)
-            */
-            if (ret == -EINTR)
+    if (rw->type == IO_READV)
+    {
+        /*
+         * copy data to application/kernel by pushing it out to the iovec.
+         */
+        if (new_op->downcall.resp.io.amt_complete)
+        {
+            if (rw->copy_to_user)
             {
-                gossip_debug(GOSSIP_FILE_DEBUG, "%s: returning error %ld\n", fnstr, (long) ret);
+                ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, vec, 
+                        nr_segs, new_op->downcall.resp.io.amt_complete);
             }
             else
             {
-                gossip_err(
-                    "%s: error writing to handle %llu, "
-                    "-- returning %ld\n",
-                    fnstr,
-                    llu(pvfs2_ino_to_handle(inode->i_ino)),
-                    (long) ret);
+                ret = pvfs_bufmap_copy_to_kernel_iovec(buffer_index, vec,
+                        nr_segs, new_op->downcall.resp.io.amt_complete);
             }
-            goto out;
-        }
-        if (rw->type == IO_READ)
-        {
-            /* copy data out to destination */
-            if (new_op->downcall.resp.io.amt_complete)
+            if (ret < 0)
             {
-                if (copy_to_user)
-                {
-                    ret = pvfs_bufmap_copy_to_user(
-                        current_buf, buffer_index,
-                        new_op->downcall.resp.io.amt_complete);
-                }
-                else
-                {
-                    ret = pvfs_bufmap_copy_to_kernel(
-                        current_buf, buffer_index,
-                        new_op->downcall.resp.io.amt_complete);
-                }
-                if (ret)
-                {
-                    gossip_debug(GOSSIP_FILE_DEBUG, "Failed to copy user buffer.\n");
-                    /* put error code in downcall so that handle_io_error()
-                     * preserves properly
-                     */
-                    new_op->downcall.status = ret;
-                    handle_io_error();
-                    goto out;
-                }
+                gossip_lerr("%s: Failed to copy-out buffers.  Please make sure "
+                            "that the pvfs2-client is running (%ld)\n",
+                            rw->fnstr, (long) ret);
+                /* put error codes in downcall so that handle_io_error()
+                 * preserves it properly */
+                new_op->downcall.status = ret;
+                handle_io_error();
+                goto out;
             }
         }
-        current_buf += new_op->downcall.resp.io.amt_complete;
-        *offset += new_op->downcall.resp.io.amt_complete;
-        total_count += new_op->downcall.resp.io.amt_complete;
-        amt_complete = new_op->downcall.resp.io.amt_complete;
-        /*
-          tell the device file owner waiting on I/O that this read has
-          completed and it can return now.  in this exact case, on
-          wakeup the daemon will free the op, so we *cannot* touch it
-          after this.
-        */
-        wake_up_daemon_for_return(new_op);
-        new_op = NULL;
-        pvfs_bufmap_put(buffer_index);
-        buffer_index = -1;
-        /* if we got a short read/write, fall out and return what we
-         * got so far
-         */
-        if (amt_complete < each_count)
-        {
-            break;
-        }
-    }
-    if (total_count > 0) {
-        ret = total_count;
     }
+    ret = new_op->downcall.resp.io.amt_complete;
+    /*
+      tell the device file owner waiting on I/O that this read has
+      completed and it can return now.  in this exact case, on
+      wakeup the daemon will free the op, so we *cannot* touch it
+      after this.
+    */
+    wake_up_daemon_for_return(new_op);
+    new_op = NULL;
 out:
-    if (buffer_index >= 0) {
+    if (buffer_index >= 0)
+    {
         pvfs_bufmap_put(buffer_index);
         gossip_debug(GOSSIP_FILE_DEBUG, "PUT buffer_index %d\n", buffer_index);
+        buffer_index = -1;
     }
     if (new_op) 
-        op_release(new_op);
-    if (ret > 0 && inode != NULL && pvfs2_inode != NULL)
     {
-        if (rw->type == IO_READ)
-        {
-            SetAtimeFlag(pvfs2_inode);
-            inode->i_atime = CURRENT_TIME;
-        }
-        else {
-            SetMtimeFlag(pvfs2_inode);
-            inode->i_mtime = CURRENT_TIME;
-        }
-        mark_inode_dirty_sync(inode);
+        op_release(new_op);
+        new_op = NULL;
     }
     return ret;
 }
 
-/** Read data from a specified offset in a file (referenced by inode).
- *  Data may be placed either in a user or kernel buffer.
- */
-ssize_t pvfs2_inode_read(
-    struct inode *inode,
-    char __user *buf,
-    size_t count,
-    loff_t *offset,
-    int copy_to_user,
-    loff_t readahead_size)
-{
-    struct rw_options rw;
-    rw.type = IO_READ;
-    rw.buf  = buf;
-    rw.count = count;
-    rw.offset = offset;
-    rw.io.read.inode = inode;
-    rw.io.read.copy_to_user = copy_to_user;
-    rw.io.read.readahead_size = readahead_size;
-    return do_read_write(&rw); 
-}
-
-/** Read data from a specified offset in a file into a user buffer.
- */
-ssize_t pvfs2_file_read(
-    struct file *file,
-    char __user *buf,
-    size_t count,
-    loff_t *offset)
-{
-    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_read: called on %s [off %lu size %lu]\n",
-                (file && file->f_dentry && file->f_dentry->d_name.name ?
-                 (char *)file->f_dentry->d_name.name : "UNKNOWN"),
-                (unsigned long) *offset, (unsigned long) count);
-
-    return pvfs2_inode_read(
-        file->f_dentry->d_inode, buf, count, offset, 1, 0);
-}
-
-/** Write data from a contiguous user buffer into a file at a specified
- *  offset.
- */
-static ssize_t pvfs2_file_write(
-    struct file *file,
-    const char __user *buf,
-    size_t count,
-    loff_t *offset)
-{
-    struct rw_options rw;
-    rw.type = IO_WRITE;
-    rw.buf  = (char *) buf;
-    rw.count = count;
-    rw.offset = offset;
-    rw.io.write.file = file;
-    return do_read_write(&rw);
-}
-
 /*
- * The reason we need to do this is to be able to support readv() and writev()
- * of larger than PVFS_DEFAULT_DESC_SIZE (4 MB). What that means is that
+ * The reason we need to do this is to be able to support 
+ * readv and writev that are
+ * larger than PVFS_DEFAULT_DESC_SIZE (4 MB). What that means is that
  * we will create a new io vec descriptor for those memory addresses that 
  * go beyond the limit
  * Return value for this routine is -ve in case of errors
@@ -558,54 +456,100 @@ static long estimate_max_iovecs(const st
     return max_nr_iovecs;
 }
 
-static ssize_t do_readv_writev(int type, struct file *file,
-        const struct iovec *iov, unsigned long nr_segs, loff_t *offset)
+/*
+ * Common entry point for read/write/readv/writev
+ */
+static ssize_t do_direct_readv_writev(struct rw_options *rw)
 {
-    ssize_t ret;
+    ssize_t ret, total_count;
+    struct inode *inode = NULL;
+    pvfs2_inode_t *pvfs2_inode = NULL;
+    struct file *file;
     unsigned int to_free;
-    unsigned long seg;
-    ssize_t total_count, count;
-    size_t  each_count;
-    struct inode *inode = file->f_dentry->d_inode;
-    pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
-    unsigned long new_nr_segs = 0;
+    size_t count;
+    struct iovec *iov;
+    unsigned long nr_segs, seg, new_nr_segs = 0;
     long max_new_nr_segs = 0;
     unsigned int  seg_count = 0, *seg_array = NULL;
     struct iovec *iovecptr = NULL, *ptr = NULL;
-    pvfs2_kernel_op_t *new_op = NULL;
-    int buffer_index = -1;
-    size_t amt_complete = 0;
-    char *fnstr = (type == IO_READV) ? "pvfs2_file_readv" : "pvfs2_file_writev";
+    loff_t *offset;
 
-    ret = -EINVAL;
     total_count = 0;
+    ret = -EINVAL;
+    file = NULL;
+    inode = NULL;
     count =  0;
     to_free = 0;
-    /* Compute total and max number of segments after split */
-    if ((max_new_nr_segs = estimate_max_iovecs(iov, nr_segs, &count)) < 0)
+    if (!rw || !rw->fnstr)
     {
-        return -EINVAL;
+        gossip_lerr("Invalid parameters\n");
+        goto out;
     }
-    if (count == 0)
+    offset = rw->off.io.offset;
+    if (!offset)
     {
-        return 0;
+        gossip_err("%s: Invalid offset\n", rw->fnstr);
+        goto out;
+    }
+    inode = rw->inode;
+    if (!inode)
+    {
+        gossip_err("%s: Invalid inode\n", rw->fnstr);
+        goto out;
+    }
+    pvfs2_inode = rw->pvfs2_inode;
+    if (!pvfs2_inode)
+    {
+        gossip_err("%s: Invalid pvfs2 inode\n", rw->fnstr);
+        goto out;
+    }
+    file  = rw->file;
+    iov = rw->dest.address.iov;
+    nr_segs = rw->dest.address.nr_segs;
+    if (iov == NULL || nr_segs < 0)
+    {
+        gossip_err("%s: Invalid iovec %p or nr_segs %ld\n",
+                rw->fnstr, iov, nr_segs);
+        goto out;
+    }
+    /* Compute total and max number of segments after split */
+    if ((max_new_nr_segs = estimate_max_iovecs(iov, nr_segs, &count)) < 0)
+    {
+        gossip_lerr("%s: could not estimate iovec %ld\n", rw->fnstr, max_new_nr_segs);
+        goto out;
     }
-    if (type == IO_WRITEV)
+    if (rw->type == IO_WRITEV)
     {
-        /* perform generic linux kernel tests for sanity of write arguments */
-        /* NOTE: this is particularly helpful in handling fsize rlimit properly */
+        if (!file)
+        {
+            gossip_err("%s: Invalid file pointer\n", rw->fnstr);
+            goto out;
+        }
+        if (file->f_pos > i_size_read(inode))
+        {
+            i_size_write(inode, file->f_pos);
+        }
+        /* perform generic linux kernel tests for sanity of write 
+         * arguments 
+         */
 #ifdef PVFS2_LINUX_KERNEL_2_4
         ret = pvfs2_precheck_file_write(file, inode, &count, offset);
 #else
         ret = generic_write_checks(file, offset, &count, S_ISBLK(inode->i_mode));
 #endif
-        if (ret != 0 || count == 0)
+        if (ret != 0)
         {
-            gossip_debug(GOSSIP_FILE_DEBUG, "%s: failed generic argument checks.\n", fnstr);
+            gossip_err("%s: failed generic argument checks.\n", rw->fnstr);
             goto out;
         }
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: proceeding with offset : %llu, size %zd\n",
+                rw->fnstr, llu(*offset), count);
+    }
+    if (count == 0)
+    {
+        ret = 0;
+        goto out;
     }
-    total_count = 0;
     /*
      * if the total size of data transfer requested is greater than
      * the kernel-set blocksize of PVFS2, then we split the iovecs
@@ -626,15 +570,16 @@ static ssize_t do_readv_writev(int type,
                         &seg_count, &seg_array)  /* OUT */ ) < 0)
         {
             gossip_err("%s: Failed to split iovecs to satisfy larger "
-                    " than blocksize readv/writev request %zd\n", fnstr, ret);
+                    " than blocksize readv/writev request %zd\n", rw->fnstr, ret);
             goto out;
         }
         gossip_debug(GOSSIP_FILE_DEBUG, "%s: Splitting iovecs from %lu to %lu [max_new %lu]\n", 
-                fnstr, nr_segs, new_nr_segs, max_new_nr_segs);
+                rw->fnstr, nr_segs, new_nr_segs, max_new_nr_segs);
         /* We must free seg_array and iovecptr */
         to_free = 1;
     }
-    else {
+    else 
+    {
         new_nr_segs = nr_segs;
         /* use the given iovec description */
         iovecptr = (struct iovec *) iov;
@@ -647,152 +592,783 @@ static ssize_t do_readv_writev(int type,
     }
     ptr = iovecptr;
 
-    gossip_debug(GOSSIP_FILE_DEBUG, "%s %d@%llu\n", fnstr, (int) count, *offset);
+    gossip_debug(GOSSIP_FILE_DEBUG, "%s %zd@%llu\n", 
+            rw->fnstr, count, llu(*offset));
     gossip_debug(GOSSIP_FILE_DEBUG, "%s: new_nr_segs: %lu, seg_count: %u\n", 
-            fnstr, new_nr_segs, seg_count);
+            rw->fnstr, new_nr_segs, seg_count);
 #ifdef PVFS2_KERNEL_DEBUG
     for (seg = 0; seg < new_nr_segs; seg++)
     {
         gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %p to %p [%d bytes]\n", 
-                fnstr,
+                rw->fnstr,
                 seg + 1, iovecptr[seg].iov_base, 
                 iovecptr[seg].iov_base + iovecptr[seg].iov_len, 
                 (int) iovecptr[seg].iov_len);
     }
     for (seg = 0; seg < seg_count; seg++)
     {
-        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %u\n", fnstr, seg + 1, seg_array[seg]);
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %u\n",
+                rw->fnstr, seg + 1, seg_array[seg]);
     }
 #endif
     seg = 0;
     while (total_count < count)
     {
-        new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
-        if (!new_op)
-        {
-            ret = -ENOMEM;
-            goto out;
-        }
-        new_op->upcall.req.io.async_vfs_io = PVFS_VFS_SYNC_IO; /* synchronous I/O */
-        /* disable read-ahead */
-        new_op->upcall.req.io.readahead_size = 0;
-        new_op->upcall.req.io.io_type = 
-            (type == IO_READV) ? PVFS_IO_READ : PVFS_IO_WRITE;
-        new_op->upcall.req.io.refn = pvfs2_inode->refn;
+        size_t each_count, amt_complete;
 
-        /* get a shared buffer index */
-        ret = pvfs_bufmap_get(&buffer_index);
+        /* how much to transfer in this loop iteration */
+        each_count = (((count - total_count) > pvfs_bufmap_size_query()) ?
+                      pvfs_bufmap_size_query() : (count - total_count));
+        /* and push the I/O through */
+        ret = wait_for_io(rw, ptr, seg_array[seg], each_count);
         if (ret < 0)
         {
-            gossip_err("%s: pvfs_bufmap_get() failure (%zd)\n", fnstr, ret);
             goto out;
         }
-        gossip_debug(GOSSIP_FILE_DEBUG, "GET op %p -> buffer_index %d\n", new_op, buffer_index);
-
-        /* how much to transfer in this loop iteration */
-        each_count = (((count - total_count) > pvfs_bufmap_size_query()) ?
-                      pvfs_bufmap_size_query() : (count - total_count));
+        /* advance the iovec pointer */
+        ptr += seg_array[seg];
+        seg++;
+        *offset += ret;
+        total_count += ret;
+        amt_complete = ret;
 
-        new_op->upcall.req.io.buf_index = buffer_index;
-        new_op->upcall.req.io.count = each_count;
-        new_op->upcall.req.io.offset = *offset;
-        if (type == IO_WRITEV)
+        /* if we got a short I/O operation,
+         * fall out and return what we got so far 
+         */
+        if (amt_complete < each_count)
+        {
+            break;
+        }
+    }
+    if (total_count > 0)
+    {
+        ret = total_count;
+    }
+out:
+    if (to_free) 
+    {
+        kfree(iovecptr);
+        kfree(seg_array);
+    }
+    if (ret > 0 && inode != NULL && pvfs2_inode != NULL)
+    {
+        if (rw->type == IO_READV)
+        {
+            SetAtimeFlag(pvfs2_inode);
+            inode->i_atime = CURRENT_TIME;
+        }
+        else 
+        {
+            SetMtimeFlag(pvfs2_inode);
+            inode->i_mtime = CURRENT_TIME;
+        }
+        mark_inode_dirty_sync(inode);
+    }
+    return ret;
+}
+
+/** Read data from a specified offset in a file (referenced by inode).
+ *  Data may be placed either in a user or kernel buffer.
+ */
+ssize_t pvfs2_inode_read(
+    struct inode *inode,
+    char __user *buf,
+    size_t count,
+    loff_t *offset,
+    int copy_to_user,
+    loff_t readahead_size)
+{
+    struct rw_options rw;
+    struct iovec vec;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_READ;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.readahead_size = readahead_size;
+    rw.copy_to_user = copy_to_user;
+    rw.fnstr = __FUNCTION__;
+    vec.iov_base = buf;
+    vec.iov_len  = count;
+    rw.inode = inode;
+    rw.pvfs2_inode = PVFS2_I(inode);
+    rw.file = NULL;
+    rw.dest.address.iov = &vec;
+    rw.dest.address.nr_segs = 1;
+    rw.off.io.offset = offset;
+    return do_direct_readv_writev(&rw); 
+}
+
+/** Read data from a specified offset in a file into a user buffer.
+ */
+ssize_t pvfs2_file_read(
+    struct file *file,
+    char __user *buf,
+    size_t count,
+    loff_t *offset)
+{
+    struct rw_options rw;
+    struct iovec vec;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_READ;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    vec.iov_base = buf;
+    vec.iov_len  = count;
+    rw.inode = file->f_dentry->d_inode;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.file = file;
+    rw.dest.address.iov = &vec;
+    rw.dest.address.nr_segs = 1;
+    rw.off.io.offset = offset;
+
+    if (IS_IMMUTABLE(rw.inode)) 
+    {
+        rw.readahead_size = (rw.inode)->i_size;
+        return generic_file_read(file, buf, count, offset);
+    }
+    else 
+    {
+        rw.readahead_size = 0;
+        return do_direct_readv_writev(&rw);
+    }
+}
+
+/** Write data from a contiguous user buffer into a file at a specified
+ *  offset.
+ */
+static ssize_t pvfs2_file_write(
+    struct file *file,
+    const char __user *buf,
+    size_t count,
+    loff_t *offset)
+{
+    struct rw_options rw;
+    struct iovec vec;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_WRITE;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.readahead_size = 0;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    vec.iov_base  = (char *) buf;
+    vec.iov_len   = count;
+    rw.file = file;
+    rw.inode = file->f_dentry->d_inode;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.dest.address.iov = &vec;
+    rw.dest.address.nr_segs = 1;
+    rw.off.io.offset = offset;
+    return do_direct_readv_writev(&rw);
+}
+
+/** Reads data to several contiguous user buffers (an iovec) from a file at a
+ * specified offset.
+ */
+static ssize_t pvfs2_file_readv(
+    struct file *file,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    loff_t *offset)
+{
+    struct rw_options rw;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_READV;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    rw.inode = file->f_dentry->d_inode;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.file  = file;
+    rw.dest.address.iov = (struct iovec *) iov;
+    rw.dest.address.nr_segs = nr_segs;
+    rw.off.io.offset = offset;
+
+    rw.readahead_size = 0;
+    return do_direct_readv_writev(&rw);
+}
+
+/** Write data from several contiguous user buffers (an iovec) into a file at
+ * a specified offset.
+ */
+static ssize_t pvfs2_file_writev(
+    struct file *file,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    loff_t *offset)
+{
+    struct rw_options rw;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_WRITEV;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.readahead_size = 0;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    rw.file = file;
+    rw.inode = file->f_dentry->d_inode;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.dest.address.iov = (struct iovec *) iov;
+    rw.dest.address.nr_segs = nr_segs;
+    rw.off.io.offset = offset;
+
+    return do_direct_readv_writev(&rw);
+}
+
+
+/* Construct a trailer of <file offset, length> pairs in a buffer that we
+ * pass in as an upcall trailer to client-core. This is used by client-core
+ * to construct a Request_hindexed type to stage the non-contiguous I/O
+ * to the file.
+ */
+static int construct_file_offset_trailer(char **trailer, 
+        PVFS_size *trailer_size, int seg_count, struct xtvec *xptr)
+{
+    int i;
+    struct read_write_x *rwx;
+
+    *trailer_size = seg_count * sizeof(struct read_write_x);
+    *trailer = (char *) vmalloc(*trailer_size);
+    if (*trailer == NULL)
+    {
+        *trailer_size = 0;
+        return -ENOMEM;
+    }
+    rwx = (struct read_write_x *) *trailer;
+    for (i = 0; i < seg_count; i++) 
+    {
+        rwx->off = xptr[i].xtv_off;
+        rwx->len = xptr[i].xtv_len;
+        rwx++;
+    }
+    return 0;
+}
+
+/*
+ * The reason we need to do this is to be able to support readx() and writex()
+ * requests larger than PVFS_DEFAULT_DESC_SIZE (4 MB). What that means is that
+ * we will create a new xtvec descriptor for those file offsets that
+ * go beyond the limit.
+ * The return value of this routine is negative in case of errors
+ * and 0 in case of success.
+ * Further, the new_nr_segs pointer is updated to hold the new value
+ * of number of xtvecs, the new_xtvec pointer is updated to hold the pointer
+ * to the new split xtvec, and the size array is an array of integers holding
+ * the number of xtvecs that straddle PVFS_DEFAULT_DESC_SIZE.
+ * The max_new_nr_segs value is computed by the caller and passed in.
+ * (It will be (count of all xtv_len/ block_size) + 1).
+ */
+static int split_xtvecs(unsigned long max_new_nr_segs,  /* IN */
+        unsigned long nr_segs,              /* IN */
+        const struct xtvec *original_xtvec, /* IN */
+        unsigned long *new_nr_segs, struct xtvec **new_vec,  /* OUT */
+        unsigned int *seg_count, unsigned int **seg_array)   /* OUT */
+{
+    int seg, count, begin_seg, tmpnew_nr_segs;
+    struct xtvec *new_xtvec = NULL, *orig_xtvec;
+    unsigned int *sizes = NULL, sizes_count = 0;
+
+    if (nr_segs <= 0 || original_xtvec == NULL 
+            || new_nr_segs == NULL || new_vec == NULL
+            || seg_count == NULL || seg_array == NULL || max_new_nr_segs <= 0)
+    {
+        gossip_err("Invalid parameters to split_xtvecs\n");
+        return -EINVAL;
+    }
+    *new_nr_segs = 0;
+    *new_vec = NULL;
+    *seg_count = 0;
+    *seg_array = NULL;
+    /* allocate a temp structure to hold a copy of the passed in xtvec descriptor */
+    orig_xtvec = (struct xtvec *) kmalloc(nr_segs * sizeof(struct xtvec),
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (orig_xtvec == NULL)
+    {
+        gossip_err("split_xtvecs: Could not allocate memory for %lu bytes!\n", 
+                (unsigned long)(nr_segs * sizeof(struct xtvec)));
+        return -ENOMEM;
+    }
+    new_xtvec = (struct xtvec *) kmalloc(max_new_nr_segs * sizeof(struct xtvec), 
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (new_xtvec == NULL)
+    {
+        kfree(orig_xtvec);
+        gossip_err("split_xtvecs: Could not allocate memory for %lu bytes!\n", 
+                (unsigned long)(max_new_nr_segs * sizeof(struct xtvec)));
+        return -ENOMEM;
+    }
+    sizes = (unsigned int *) kmalloc(max_new_nr_segs * sizeof(unsigned int), 
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (sizes == NULL)
+    {
+        kfree(new_xtvec);
+        kfree(orig_xtvec);
+        gossip_err("split_xtvecs: Could not allocate memory for %lu bytes!\n", 
+                (unsigned long)(max_new_nr_segs * sizeof(int)));
+        return -ENOMEM;
+    }
+    /* copy the passed in xtvec to a temp structure */
+    memcpy(orig_xtvec, original_xtvec, nr_segs * sizeof(struct xtvec));
+    memset(new_xtvec, 0, max_new_nr_segs * sizeof(struct xtvec));
+    memset(sizes, 0, max_new_nr_segs * sizeof(int));
+    begin_seg = 0;
+    count = 0;
+    tmpnew_nr_segs = 0;
+repeat:
+    for (seg = begin_seg; seg < nr_segs; seg++)
+    {
+        if (tmpnew_nr_segs >= max_new_nr_segs || sizes_count >= max_new_nr_segs)
+        {
+            kfree(sizes);
+            kfree(orig_xtvec);
+            kfree(new_xtvec);
+            gossip_err("split_xtvecs: exceeded the index limit (%d)\n", 
+                    tmpnew_nr_segs);
+            return -EINVAL;
+        }
+        if (count + orig_xtvec[seg].xtv_len < pvfs_bufmap_size_query())
+        {
+            count += orig_xtvec[seg].xtv_len;
+            
+            memcpy(&new_xtvec[tmpnew_nr_segs], &orig_xtvec[seg], 
+                    sizeof(struct xtvec));
+            tmpnew_nr_segs++;
+            sizes[sizes_count]++;
+        }
+        else
+        {
+            new_xtvec[tmpnew_nr_segs].xtv_off = orig_xtvec[seg].xtv_off;
+            new_xtvec[tmpnew_nr_segs].xtv_len = 
+                (pvfs_bufmap_size_query() - count);
+            tmpnew_nr_segs++;
+            sizes[sizes_count]++;
+            sizes_count++;
+            begin_seg = seg;
+            orig_xtvec[seg].xtv_off += (pvfs_bufmap_size_query() - count);
+            orig_xtvec[seg].xtv_len -= (pvfs_bufmap_size_query() - count);
+            count = 0;
+            break;
+        }
+    }
+    if (seg != nr_segs) {
+        goto repeat;
+    }
+    else
+    {
+        sizes_count++;
+    }
+    *new_nr_segs = tmpnew_nr_segs;
+    /* new_xtvec is freed by the caller */
+    *new_vec = new_xtvec;
+    *seg_count = sizes_count;
+    /* seg_array is also freed by the caller */
+    *seg_array = sizes;
+    kfree(orig_xtvec);
+    return 0;
+}
+
+static long 
+estimate_max_xtvecs(const struct xtvec *curr, unsigned long nr_segs, ssize_t *total_count)
+{
+    unsigned long i;
+    long max_nr_xtvecs; /* running estimate of the xtvec count after splitting */
+    ssize_t total, count; /* total: fill of the current block; count: sum of every xtv_len */
+    /* Returns the estimated maximum number of xtvecs after splitting at pvfs_bufmap_size_query(), or -EINVAL */
+    total = 0;
+    count = 0;
+    max_nr_xtvecs = 0;
+    for (i = 0; i < nr_segs; i++) 
+    {
+        const struct xtvec *xv = &curr[i];
+        count += xv->xtv_len;
+	if (unlikely((ssize_t)(count|xv->xtv_len) < 0)) /* sign bit set in the length or in the running sum */
+            return -EINVAL;
+        if (total + xv->xtv_len < pvfs_bufmap_size_query()) /* segment stays inside the current block */
+        {
+            total += xv->xtv_len;
+            max_nr_xtvecs++;
+        }
+        else /* segment reaches or crosses a block boundary */
+        {
+            total = (total + xv->xtv_len - pvfs_bufmap_size_query()); /* overshoot past the boundary; NOTE(review): kept whole, not reduced modulo the block size */
+            max_nr_xtvecs += (total / pvfs_bufmap_size_query() + 2); /* whole blocks inside the overshoot, plus 2 */
+        }
+    }
+    *total_count = count; /* written only when no -EINVAL was returned */
+    return max_nr_xtvecs;
+}
+
+/*
+ * Post and wait for the I/O upcall to finish.
+ * @rw  - contains state information to initiate the I/O operation
+ * @vec - contains the memory regions
+ * @nr_segs - number of memory vector regions
+ * @xtvec - contains the file regions
+ * @xtnr_segs - number of file vector regions
+ */
+static ssize_t wait_for_iox(struct rw_options *rw, struct iovec *vec, int nr_segs,
+        struct xtvec *xtvec, int xtnr_segs, size_t total_size)
+{
+    pvfs2_kernel_op_t *new_op = NULL;
+    int buffer_index = -1;
+    ssize_t ret;
+
+    if (!rw || !vec || nr_segs < 0 || total_size <= 0
+            || !xtvec || xtnr_segs < 0)
+    {
+        gossip_lerr("invalid parameters (rw: %p, vec: %p, nr_segs: %d, "
+                "xtvec %p, xtnr_segs %d, total_size: %zd\n", rw, vec, nr_segs,
+                xtvec, xtnr_segs, total_size);
+        ret = -EINVAL;
+        goto out;
+    }
+    if (!rw->pvfs2_inode || !rw->inode || !rw->fnstr)
+    {
+        gossip_lerr("invalid parameters (pvfs2_inode: %p, inode: %p, fnstr: %p\n",
+                rw->pvfs2_inode, rw->inode, rw->fnstr);
+        ret = -EINVAL;
+        goto out;
+    }
+    new_op = op_alloc_trailer(PVFS2_VFS_OP_FILE_IOX);
+    if (!new_op)
+    {
+        ret = -ENOMEM;
+        goto out;
+    }
+    new_op->upcall.req.iox.io_type = 
+        (rw->type == IO_READX) ? PVFS_IO_READ : PVFS_IO_WRITE;
+    new_op->upcall.req.iox.refn = rw->pvfs2_inode->refn;
+
+    /* get a shared buffer index */
+    ret = pvfs_bufmap_get(&buffer_index);
+    if (ret < 0)
+    {
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get() "
+                    "failure (%ld)\n", rw->fnstr, (long) ret);
+        goto out;
+    }
+    new_op->upcall.req.iox.buf_index = buffer_index;
+    new_op->upcall.req.iox.count     = total_size;
+    /* construct the upcall trailer buffer */
+    if ((ret = construct_file_offset_trailer(&new_op->upcall.trailer_buf, 
+                    &new_op->upcall.trailer_size, xtnr_segs, xtvec)) < 0)
+    {
+        gossip_err("%s: construct_file_offset_trailer "
+                "failure (%ld)\n", rw->fnstr, (long) ret);
+        goto out;
+    }
+    gossip_debug(GOSSIP_FILE_DEBUG, "%s: copy_to_user %d nr_segs %d, "
+            "xtnr_segs: %d "
+            "total_size: %zd\n",
+            rw->fnstr, rw->copy_to_user, 
+            nr_segs, xtnr_segs,
+            total_size);
+
+    if (rw->type == IO_WRITEX)
+    {
+        /* copy data from application by pulling it out
+         * of the iovec.
+         */
+        if (rw->copy_to_user)
         {
-            /* 
-             * copy data from application by pulling it out  of the iovec.
-             * Number of segments to copy so that we don't overflow the block-size
-             * is set in seg_array[], and ptr points to the appropriate
-             * beginning of the iovec from where data needs to be copied out,
-             * and each_count indicates the size in bytes that needs to be pulled
-             * out.  */
-            gossip_debug(GOSSIP_FILE_DEBUG, "%s nr_segs %u, offset: %llu each_count: %d\n",
-                    fnstr, seg_array[seg], *offset, (int) each_count);
             ret = pvfs_bufmap_copy_iovec_from_user(
-                    buffer_index, ptr, seg_array[seg], each_count);
+                    buffer_index, vec, nr_segs, total_size);
+        }
+        else {
+            ret = pvfs_bufmap_copy_iovec_from_kernel(
+                    buffer_index, vec, nr_segs, total_size);
+        }
+        if (ret < 0)
+        {
+            gossip_lerr("%s: failed to copy-in user buffer. Please make sure "
+                    " that the pvfs2-client is running. %ld\n",
+                    rw->fnstr, (long) ret);
+            goto out;
+        }
+    }
+    /* whew! finally service this operation */
+    ret = service_operation(new_op, rw->fnstr,
+            get_interruptible_flag(rw->inode));
+    if (ret < 0)
+    {
+          /* this macro is defined in pvfs2-kernel.h */
+          handle_io_error();
+
+          /*
+            don't write an error to syslog on signaled operation
+            termination unless we've got debugging turned on, as
+            this can happen regularly (i.e. ctrl-c)
+          */
+          if (ret == -EINTR)
+          {
+              gossip_debug(GOSSIP_FILE_DEBUG, "%s: returning error %ld\n", 
+                      rw->fnstr, (long) ret);
+          }
+          else
+          {
+              gossip_err(
+                "%s: error in %s handle %llu, "
+                "FILE: %s\n  -- returning %ld\n",
+                rw->fnstr, 
+                rw->type == IO_READX ? "noncontig read from" : "noncontig write to",
+                llu(get_handle_from_ino(rw->inode)),
+                (rw->file && rw->file->f_dentry && rw->file->f_dentry->d_name.name ?
+                     (char *) rw->file->f_dentry->d_name.name : "UNKNOWN"),
+                    (long) ret);
+          }
+          goto out;
+    }
+    gossip_debug(GOSSIP_FILE_DEBUG, "downcall returned %lld\n",
+            llu(new_op->downcall.resp.iox.amt_complete));
+    if (rw->type == IO_READX)
+    {
+        /* copy data to application by pushing it out to the iovec.
+         */
+        if (new_op->downcall.resp.iox.amt_complete)
+        {
+            if (rw->copy_to_user)
+            {
+                ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, vec,
+                        nr_segs, new_op->downcall.resp.iox.amt_complete);
+            }
+            else
+            {
+                ret = pvfs_bufmap_copy_to_kernel_iovec(buffer_index, vec, 
+                        nr_segs, new_op->downcall.resp.iox.amt_complete);
+            }
             if (ret < 0)
             {
-                gossip_err("%s: Failed to copy user buffer.  Please make sure "
-                            "that the pvfs2-client is running. %zd\n", fnstr, ret);
+                gossip_lerr("%s: failed to copy-out user buffers. Please make sure "
+                        " that the pvfs2-client is running. (%ld)\n", 
+                        rw->fnstr, (long) ret);
+                /* put error codes in downcall so that handle_io_error()
+                 * preserves it properly */
+                new_op->downcall.status = ret;
+                handle_io_error();
                 goto out;
             }
         }
-        ret = service_operation(new_op, fnstr,
-             get_interruptible_flag(inode));
+    }
+    ret = new_op->downcall.resp.iox.amt_complete;
+    gossip_debug(GOSSIP_FILE_DEBUG, "wait_for_iox returning %ld\n", (long) ret);
+     /*
+      tell the device file owner waiting on I/O that this I/O has
+      completed and it can return now.  in this exact case, on
+      wakeup the device will free the op, so we *cannot* touch it
+      after this.
+    */
+    wake_up_daemon_for_return(new_op);
+    new_op = NULL;
+out:
+    if (buffer_index >= 0)
+    {
+        pvfs_bufmap_put(buffer_index);
+        gossip_debug(GOSSIP_FILE_DEBUG, "PUT buffer_index %d\n", buffer_index);
+        buffer_index = -1;
+    }
+    if (new_op) 
+    {
+        if (new_op->upcall.trailer_buf)
+            vfree(new_op->upcall.trailer_buf);
+        op_release(new_op);
+        new_op = NULL;
+    }
+    return ret;
+}
 
-        if (ret < 0)
-        {
-              /* this macro is defined in pvfs2-kernel.h */
-              handle_io_error();
+static ssize_t do_direct_readx_writex(struct rw_options *rw)
+{
+    ssize_t ret, total_count, count_mem, count_stream;
+    struct inode *inode = NULL;
+    pvfs2_inode_t *pvfs2_inode = NULL;
+    unsigned int to_free;
+    struct iovec *iov; 
+    unsigned long seg, nr_segs, xtnr_segs;
+    struct xtvec *xtvec; 
+    long max_new_nr_segs_mem, max_new_nr_segs_stream;
+    unsigned long new_nr_segs_mem = 0, new_nr_segs_stream = 0;
+    unsigned int seg_count_mem, *seg_array_mem = NULL;
+    unsigned int seg_count_stream, *seg_array_stream = NULL;
+    struct iovec *iovecptr = NULL, *ptr = NULL;
+    struct xtvec *xtvecptr = NULL, *xptr = NULL;
 
-              /*
-                don't write an error to syslog on signaled operation
-                termination unless we've got debugging turned on, as
-                this can happen regularly (i.e. ctrl-c)
-              */
-              if (ret == -EINTR)
-              {
-                  gossip_debug(GOSSIP_FILE_DEBUG, "%s: returning error %zd\n", fnstr, ret);
-              }
-              else
-              {
-                  gossip_err(
-                        "%s: error on handle %llu, "
-                        "FILE: %s\n  -- returning %zd\n",
-                        fnstr, llu(pvfs2_ino_to_handle(inode->i_ino)),
-                        (file && file->f_dentry && file->f_dentry->d_name.name ?
-                         (char *)file->f_dentry->d_name.name : "UNKNOWN"),
-                        ret);
-              }
-              goto out;
-        }
+    total_count = 0;
+    ret = -EINVAL;
+    to_free = 0;
+    inode = NULL;
+    count_mem = 0;
+    max_new_nr_segs_mem = 0;
+    count_stream = 0;
+    max_new_nr_segs_stream = 0;
 
-        if (type == IO_READV)
+    if (!rw || !rw->fnstr)
+    {
+        gossip_lerr("Invalid parameters\n");
+        goto out;
+    }
+    inode = rw->inode;
+    if (!inode)
+    {
+        gossip_err("%s: invalid inode\n", rw->fnstr);
+        goto out;
+    }
+    pvfs2_inode = rw->pvfs2_inode;
+    if (!pvfs2_inode)
+    {
+        gossip_err("%s: Invalid pvfs2 inode\n", rw->fnstr);
+        goto out;
+    }
+    iov  = rw->dest.address.iov;
+    nr_segs = rw->dest.address.nr_segs;
+    if (iov == NULL || nr_segs < 0)
+    {
+        gossip_err("%s: Invalid iovec %p or nr_segs %ld\n",
+                rw->fnstr, iov, nr_segs);
+        goto out;
+    }
+    /* Compute total and max number of segments after split of the memory vector */
+    if ((max_new_nr_segs_mem = estimate_max_iovecs(iov, nr_segs, &count_mem)) < 0)
+    {
+        gossip_lerr("%s: could not estimate iovec %ld\n", rw->fnstr, max_new_nr_segs_mem);
+        goto out;
+    }
+    xtvec = rw->off.iox.xtvec;
+    xtnr_segs = rw->off.iox.xtnr_segs;
+    if (xtvec == NULL || xtnr_segs < 0)
+    {
+        gossip_err("%s: Invalid xtvec %p or xtnr_segs %ld\n",
+                rw->fnstr, xtvec, xtnr_segs);
+        goto out;
+    }
+    /* Calculate the total stream length and max segments after split of the stream vector */
+    if ((max_new_nr_segs_stream = estimate_max_xtvecs(xtvec, xtnr_segs, &count_stream)) < 0)
+    {
+        gossip_lerr("%s: could not estimate xtvec %ld\n", rw->fnstr, max_new_nr_segs_stream);
+        goto out;
+    }
+    if (count_mem == 0)
+    {
+        return 0;
+    }
+    if (count_mem != count_stream) 
+    {
+        gossip_err("%s: mem count %ld != stream count %ld\n",
+                rw->fnstr, (long) count_mem, (long) count_stream);
+        goto out;
+    }
+    /*
+     * if the total size of data transfer requested is greater than
+     * the kernel-set blocksize of PVFS2, then we split the iovecs
+     * such that no iovec description straddles a block size limit
+     */
+    if (count_mem > pvfs_bufmap_size_query())
+    {
+        /*
+         * Split up the given iovec description such that
+         * no iovec descriptor straddles over the block-size limitation.
+         * This makes our job of staging the I/O easier.
+         * In addition, this function will also compute an array with seg_count
+         * entries that will store the number of segments that straddle the
+         * block-size boundaries.
+         */
+        if ((ret = split_iovecs(max_new_nr_segs_mem, nr_segs, iov, /* IN */
+                        &new_nr_segs_mem, &iovecptr, /* OUT */
+                        &seg_count_mem, &seg_array_mem)  /* OUT */ ) < 0)
         {
-            gossip_debug(GOSSIP_FILE_DEBUG, "%s: nr_segs %u, offset: %llu each_count:%d\n",
-                fnstr, (int) seg_array[seg], *offset, (int) each_count);
-            /*
-             * copy data to application by pushing it out to the iovec.
-             * Number of segments to copy so that we don't
-             * overflow the block-size is set in seg_array[], and
-             * ptr points to the appropriate beginning of the
-             * iovec from where data needs to be copied to, and
-             * new_op->downcall.resp.io.amt_complete indicates
-             * the size in bytes that needs to be pushed out
-             */
-            if (new_op->downcall.resp.io.amt_complete)
-            {
-                ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, ptr, seg_array[seg],
-                        new_op->downcall.resp.io.amt_complete);
-                if (ret < 0)
-                {
-                    gossip_err("Failed to copy user buffer.  Please make sure "
-                                "that the pvfs2-client is running.\n");
-                    /* put error codes in downcall so that handle_io_error()
-                     * preserves it properly */
-                    new_op->downcall.status = ret;
-                    handle_io_error();
-                    goto out;
-                }
-            }
+            gossip_err("%s: Failed to split iovecs to satisfy larger "
+                    " than blocksize readx request %ld\n", rw->fnstr, (long) ret);
+            goto out;
         }
-        /* advance the iovec pointer */
-        ptr += seg_array[seg];
-        seg++;
-        *offset += new_op->downcall.resp.io.amt_complete;
-        total_count += new_op->downcall.resp.io.amt_complete;
-        amt_complete = new_op->downcall.resp.io.amt_complete;
+        /* We must free seg_array_mem and iovecptr, xtvecptr and seg_array_stream */
+        to_free = 1;
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: Splitting iovecs from %lu to %lu [max_new %lu]\n", 
+                rw->fnstr, nr_segs, new_nr_segs_mem, max_new_nr_segs_mem);
+        /* 
+         * Split up the given xtvec description such that
+         * no xtvec descriptor straddles over the block-size limitation.
+         */
+        if ((ret = split_xtvecs(max_new_nr_segs_stream, xtnr_segs, xtvec, /* IN */
+                        &new_nr_segs_stream, &xtvecptr, /* OUT */
+                        &seg_count_stream, &seg_array_stream) /* OUT */) < 0)
+        {
+            gossip_err("Failed to split iovecs to satisfy larger "
+                    " than blocksize readx request %ld\n", (long) ret);
+            goto out;
+        }
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: Splitting xtvecs from %lu to %lu [max_new %lu]\n", 
+                rw->fnstr, xtnr_segs, new_nr_segs_stream, max_new_nr_segs_stream);
+    }
+    else 
+    {
+        new_nr_segs_mem = nr_segs;
+        /* use the given iovec description */
+        iovecptr = (struct iovec *) iov;
+        /* There is only 1 element in the seg_array_mem */
+        seg_count_mem = 1;
+        /* and its value is the number of segments passed in */
+        seg_array_mem = (unsigned int *) &nr_segs;
+        
+        new_nr_segs_stream = xtnr_segs;
+        /* use the given file description */
+        xtvecptr = (struct xtvec *) xtvec;
+        /* There is only 1 element in the seg_array_stream */
+        seg_count_stream = 1;
+        /* and its value is the number of segments passed in */
+        seg_array_stream = (unsigned int *) &xtnr_segs;
+        /* We don't have to free up anything */
+        to_free = 0;
+    }
+#ifdef PVFS2_KERNEL_DEBUG
+    for (seg = 0; seg < new_nr_segs_mem; seg++)
+    {
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %p to %p [%ld bytes]\n",
+                rw->fnstr,
+                seg + 1, iovecptr[seg].iov_base,
+                iovecptr[seg].iov_base + iovecptr[seg].iov_len,
+                (long) iovecptr[seg].iov_len);
+    }
+    for (seg = 0; seg < new_nr_segs_stream; seg++)
+    {
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %ld to %ld [%ld bytes]\n",
+                rw->fnstr,
+                seg + 1, (long) xtvecptr[seg].xtv_off,
+                (long) xtvecptr[seg].xtv_off + xtvecptr[seg].xtv_len,
+                (long) xtvecptr[seg].xtv_len);
+    }
+#endif
+    seg = 0;
+    ptr = iovecptr;
+    xptr = xtvecptr;
 
-        /*
-          tell the device file owner waiting on I/O that this read has
-          completed and it can return now.  in this exact case, on
-          wakeup the daemon will free the op, so we *cannot* touch it
-          after this.
-        */
-        wake_up_daemon_for_return(new_op);
-        new_op = NULL;
-        pvfs_bufmap_put(buffer_index);
-        buffer_index = -1;
+    while (total_count < count_mem)
+    {
+        size_t  each_count, amt_complete;
 
+        /* how much to transfer in this loop iteration */
+        each_count = (((count_mem - total_count) > pvfs_bufmap_size_query()) ?
+                      pvfs_bufmap_size_query() : (count_mem - total_count));
+        /* and push the I/O through */
+        ret = wait_for_iox(rw, ptr, seg_array_mem[seg],
+                xptr, seg_array_stream[seg], each_count);
+        if (ret < 0)
+        {
+            goto out;
+        }
+        /* Advance the iovec pointer */
+        ptr += seg_array_mem[seg];
+        /* Advance the xtvec pointer */
+        xptr += seg_array_stream[seg];
+        seg++;
+        total_count += ret;
+        amt_complete = ret;
         /* if we got a short I/O operations,
          * fall out and return what we got so far 
          */
@@ -802,24 +1378,20 @@ static ssize_t do_readv_writev(int type,
         }
     }
     if (total_count > 0)
-    {
-        ret = total_count;
-    }
-out:
-    if (buffer_index >= 0) {
-        pvfs_bufmap_put(buffer_index);
-        gossip_debug(GOSSIP_FILE_DEBUG, "PUT buffer_index %d\n", buffer_index);
+    {
+        ret = total_count;
     }
-    if (new_op)
-        op_release(new_op);
-    if (to_free) 
+out:
+    if (to_free)
     {
         kfree(iovecptr);
-        kfree(seg_array);
+        kfree(seg_array_mem);
+        kfree(xtvecptr);
+        kfree(seg_array_stream);
     }
     if (ret > 0 && inode != NULL && pvfs2_inode != NULL)
     {
-        if (type == IO_READV)
+        if (rw->type == IO_READX)
         {
             SetAtimeFlag(pvfs2_inode);
             inode->i_atime = CURRENT_TIME;
@@ -834,30 +1406,72 @@ out:
     return ret;
 }
 
-
-/** Reads data to several contiguous user buffers (an iovec) from a file at a
- * specified offset.
- */
-static ssize_t pvfs2_file_readv(
+#ifndef HAVE_READX_FILE_OPERATIONS
+static ssize_t pvfs2_file_readx(
     struct file *file,
     const struct iovec *iov,
     unsigned long nr_segs,
-    loff_t *offset)
+    const struct xtvec *xtvec,
+    unsigned long xtnr_segs) __attribute__((unused));
+#endif
+static ssize_t pvfs2_file_readx(
+    struct file *file,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    const struct xtvec *xtvec,
+    unsigned long xtnr_segs)
 {
-    return do_readv_writev(IO_READV, file, iov, nr_segs, offset);
-}
+    struct rw_options rw;
 
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_READX;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    rw.inode = file->f_dentry->d_inode;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.file  = file;
+    rw.dest.address.iov = (struct iovec *) iov;
+    rw.dest.address.nr_segs = nr_segs;
+    rw.off.iox.xtvec = (struct xtvec *) xtvec;
+    rw.off.iox.xtnr_segs = xtnr_segs;
 
-/** Write data from a several contiguous user buffers (an iovec) into a file at
- * a specified offset.
- */
-static ssize_t pvfs2_file_writev(
+    return do_direct_readx_writex(&rw);
+}
+
+#ifndef HAVE_WRITEX_FILE_OPERATIONS
+static ssize_t pvfs2_file_writex(
     struct file *file,
     const struct iovec *iov,
     unsigned long nr_segs,
-    loff_t *offset)
+    const struct xtvec *xtvec,
+    unsigned long xtnr_segs) __attribute__((unused));
+#endif
+static ssize_t pvfs2_file_writex(
+    struct file *file,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    const struct xtvec *xtvec,
+    unsigned long xtnr_segs)
 {
-    return do_readv_writev(IO_WRITEV, file, iov, nr_segs, offset);
+    struct rw_options rw;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = 0;
+    rw.type = IO_WRITEX;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    rw.inode = file->f_dentry->d_inode;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.file  = file;
+    rw.dest.address.iov = (struct iovec *) iov;
+    rw.dest.address.nr_segs = nr_segs;
+    rw.off.iox.xtvec = (struct xtvec *) xtvec;
+    rw.off.iox.xtnr_segs = xtnr_segs;
+
+    return do_direct_readx_writex(&rw);
 }
 
 
@@ -1738,6 +2352,7 @@ fill_default_kiocb(pvfs2_kiocb *x,
     iocb->ki_cancel = aio_cancel;
     return;
 }
+
 /*
  * This function will do the following,
  * On an error, it returns a -ve error number.
@@ -1753,340 +2368,147 @@ fill_default_kiocb(pvfs2_kiocb *x,
  * that get completion notification from interrupt
  * context, we get completion notification from a process
  * context (i.e. the client daemon).
+ * TODO: We do not handle vectored aio requests yet
  */
-static ssize_t 
-pvfs2_file_aio_read(struct kiocb *iocb, char __user *buffer,
-        size_t count, loff_t offset)
+static ssize_t do_direct_aio_read_write(struct rw_options *rw)
 {
-    struct file *filp = NULL;
-    struct inode *inode = NULL;
-    ssize_t error = -EINVAL;
+    struct file *filp;
+    struct inode *inode;
+    ssize_t error;
+    pvfs2_inode_t *pvfs2_inode;
+    struct iovec *iov;
+    unsigned long nr_segs, max_new_nr_segs;
+    size_t count;
+    struct kiocb *iocb;
+    loff_t *offset;
+    pvfs2_kiocb *x;
 
-    if (count == 0)
-    {
-        return 0;
-    }
-    if (iocb->ki_pos != offset)
-    {
-        return -EINVAL;
-    }
-    if (unlikely(((ssize_t)count)) < 0)
+    error = -EINVAL;
+    if (!rw || !rw->fnstr || !rw->off.io.offset)
     {
-        return -EINVAL;
+        gossip_lerr("Invalid parameters (rw %p)\n", rw);
+        goto out_error;
     }
-    if (access_ok(VERIFY_WRITE, buffer, count) == 0)
+    inode = rw->inode;
+    filp  = rw->file;
+    iocb  = rw->iocb;
+    pvfs2_inode = rw->pvfs2_inode;
+    offset = rw->off.io.offset;
+    if (!inode || !filp || !pvfs2_inode || !iocb || !offset)
     {
-        return -EFAULT;
+        gossip_lerr("Invalid parameters\n");
+        goto out_error;
     }
-    /* Each I/O operation is not allowed to be greater than our block size */
-    if (count > pvfs_bufmap_size_query())
+    if (iocb->ki_pos != *offset)
     {
-        gossip_err("aio_read: cannot transfer (%d) bytes"
-                " (larger than block size %d)\n",
-                (int) count, pvfs_bufmap_size_query());
-        return -EINVAL;
+        gossip_lerr("iocb offsets don't match (%llu %llu)\n",
+                llu(iocb->ki_pos), llu(*offset));
+        goto out_error;
     }
-    filp = iocb->ki_filp;
-    error = -EINVAL;
-    if (filp && filp->f_mapping 
-             && (inode = filp->f_mapping->host))
+    iov = rw->dest.address.iov;
+    nr_segs = rw->dest.address.nr_segs;
+    if (iov == NULL || nr_segs < 0)
     {
-        ssize_t ret = 0;
-        pvfs2_kiocb *x = NULL;
-
-        /* First time submission */
-        if ((x = (pvfs2_kiocb *) iocb->private) == NULL)
-        {
-            int buffer_index = -1;
-            pvfs2_kernel_op_t *new_op = NULL;
-            pvfs2_kiocb pvfs_kiocb;
-            char __user *current_buf = buffer;
-            pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
-            
-            new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
-            if (!new_op)
-            {
-                error = -ENOMEM;
-                goto out_error;
-            }
-            /* Increase ref count */
-            get_op(new_op);
-            /* (A)synchronous I/O */
-            new_op->upcall.req.io.async_vfs_io = 
-                is_sync_kiocb(iocb) ? PVFS_VFS_SYNC_IO 
-                                    : PVFS_VFS_ASYNC_IO;
-            new_op->upcall.req.io.readahead_size = 0;
-            new_op->upcall.req.io.io_type = PVFS_IO_READ;
-            new_op->upcall.req.io.refn = pvfs2_inode->refn;
-            error = pvfs_bufmap_get(&buffer_index);
-            if (error < 0)
-            {
-                gossip_err("pvfs2_file_aio_read: pvfs_bufmap_get() "
-                        " failure %d\n", (int) ret);
-                /* drop ref count and possibly de-allocate */
-                put_op(new_op);
-                goto out_error;
-            }
-            gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: pvfs_bufmap_get %d\n",
-                    buffer_index);
-            new_op->upcall.req.io.buf_index = buffer_index;
-            new_op->upcall.req.io.count = count;
-            new_op->upcall.req.io.offset = offset;
-            /*
-             * if it is a synchronous operation, we
-             * don't allocate anything here
-             */
-            if (is_sync_kiocb(iocb))
-            {
-                x = &pvfs_kiocb;
-            }
-            else /* asynchronous iocb */
-            {
-                x = kiocb_alloc();
-                if (x == NULL)
-                {
-                    error = -ENOMEM;
-                    /* drop the buffer index */
-                    pvfs_bufmap_put(buffer_index);
-                    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: pvfs_bufmap_put %d\n",
-                            buffer_index);
-                    /* drop the reference count and deallocate */
-                    put_op(new_op);
-                    goto out_error;
-                }
-                gossip_debug(GOSSIP_FILE_DEBUG, "kiocb_alloc: %p\n", x);
-                /* 
-                 * destructor function to make sure that we free
-                 * up this allocated piece of memory 
-                 */
-                iocb->ki_dtor = pvfs2_aio_dtor;
-            }
-            /* If user requested synchronous type of operation */
-            if (is_sync_kiocb(iocb))
-            {
-                /* 
-                 * Stage the operation!
-                 */
-                ret = service_operation(
-                        new_op, "pvfs2_file_aio_read",  
-                        get_interruptible_flag(inode));
-                if (ret < 0)
-                {
-                    handle_sync_aio_error();
-                    /*
-                      don't write an error to syslog on signaled operation
-                      termination unless we've got debugging turned on, as
-                      this can happen regularly (i.e. ctrl-c)
-                    */
-                    if (ret == -EINTR)
-                    {
-                        gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: returning error %d\n" 
-                                , (int) ret);
-                    }
-                    else
-                    {
-                        gossip_err(
-                            "pvfs2_file_aio_read: error reading from "
-                            " handle %llu, "
-                            "\n  -- returning %d\n",
-                            llu(pvfs2_ino_to_handle(inode->i_ino)),
-                            (int) ret);
-                    }
-                    error = ret;
-                    goto out_error;
-                }
-                /* copy data out to destination */
-                if (new_op->downcall.resp.io.amt_complete)
-                {
-                    ret = pvfs_bufmap_copy_to_user(
-                            current_buf, buffer_index, 
-                            new_op->downcall.resp.io.amt_complete);
-                }
-                if (ret)
-                {
-                    gossip_debug(GOSSIP_FILE_DEBUG, "Failed to copy user buffer %d\n", (int) ret);
-                    new_op->downcall.status = ret;
-                    handle_sync_aio_error();
-                    error = ret;
-                    goto out_error;
-                }
-                error = new_op->downcall.resp.io.amt_complete;
-                wake_up_daemon_for_return(new_op);
-                pvfs_bufmap_put(buffer_index);
-                gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: pvfs_bufmap_put %d\n",
-                        buffer_index);
-                if (error > 0)
-                {
-                    SetAtimeFlag(pvfs2_inode);
-                    inode->i_atime = CURRENT_TIME;
-                    mark_inode_dirty_sync(inode);
-                }
-                /* new_op is freed by the client-daemon */
-                goto out_error;
-            }
-            else
-            {
-                /* 
-                 * We need to set the cancellation callbacks + 
-                 * other state information
-                 * here if the asynchronous request is going to
-                 * be successfully submitted 
-                 */
-                fill_default_kiocb(x, current, iocb, PVFS_IO_READ,
-                        buffer_index, new_op, current_buf,
-                        offset, count,
-                        &pvfs2_aio_cancel);
-                /*
-                 * We need to be able to retrieve this structure from
-                 * the op structure as well, since the client-daemon
-                 * needs to send notifications upon aio_completion.
-                 */
-                new_op->priv = x;
-                /* and stash it away in the kiocb structure as well */
-                iocb->private = x;
-                /*
-                 * Add it to the list of ops to be serviced
-                 * but don't wait for it to be serviced. 
-                 * Return immediately 
-                 */
-                service_operation(new_op, "pvfs2_file_aio_read", 
-                        PVFS2_OP_ASYNC);
-                gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: queued "
-                        " read operation [%ld for %d]\n",
-                            (unsigned long) offset, (int) count);
-                error = -EIOCBQUEUED;
-                /*
-                 * All cleanups done upon completion
-                 * (OR) cancellation!
-                 */
-            }
-        }
-        /* I don't think this path will ever be taken */
-        else { /* retry and see what is the status! */
-            error = pvfs2_aio_retry(iocb);
-        }
+        gossip_lerr("Invalid iovector (%p) or invalid iovec count (%ld)\n",
+                iov, nr_segs);
+        goto out_error;
     }
-out_error:
-    return error;
-}
-
-/*
- * This function will do the following,
- * On an error, it returns a -ve error number.
- * For a synchronous iocb, we copy the user's data into the 
- * buffers before returning and
- * the count of how much was actually written.
- * For a first-time asynchronous iocb, we copy the user's
- * data into the buffers before submitting the 
- * I/O to the client-daemon and do not wait
- * for the matching downcall to be written and we 
- * return a special -EIOCBQUEUED
- * to indicate that we have queued the request.
- * NOTE: Unlike typical aio requests
- * that get completion notification from interrupt
- * context, we get completion notification from a process
- * context (i.e. the client daemon). 
- */
-static ssize_t 
-pvfs2_file_aio_write(struct kiocb *iocb, const char __user *buffer,
-        size_t count, loff_t offset)
-{
-    struct file *filp = NULL;
-    struct inode *inode = NULL;
-    ssize_t error = -EINVAL;
-
-    if (count == 0)
+    if (nr_segs > 1)
     {
-        return 0;
+        gossip_lerr("%s: not implemented yet (aio with %ld segments)\n",
+                rw->fnstr, nr_segs);
+        goto out_error;
     }
-    if (iocb->ki_pos != offset)
+    count = 0;
+    /* Compute total and max number of segments after split */
+    if ((max_new_nr_segs = estimate_max_iovecs(iov, nr_segs, &count)) < 0)
     {
-        return -EINVAL;
+        gossip_lerr("%s: could not estimate iovecs %ld\n", rw->fnstr, max_new_nr_segs);
+        goto out_error;
     }
     if (unlikely(((ssize_t)count)) < 0)
     {
-        return -EINVAL;
+        gossip_lerr("%s: count overflow\n", rw->fnstr);
+        goto out_error;
     }
-    if (access_ok(VERIFY_READ, buffer, count) == 0)
+    /* synchronous I/O */
+    if (!rw->async)
     {
-        return -EFAULT;
+        error = do_direct_readv_writev(rw);
+        goto out_error;
     }
-    filp = iocb->ki_filp;
-    if (filp && filp->f_mapping 
-             && (inode = filp->f_mapping->host))
+    /* Asynchronous I/O */
+    if (rw->type == IO_WRITE)
     {
         int ret;
-        /* perform generic linux kernel tests for 
-         * sanity of write arguments 
-         * NOTE: this is particularly helpful in 
-         * handling fsize rlimit properly 
-         */
+        /* perform generic tests for sanity of write arguments */
 #ifdef PVFS2_LINUX_KERNEL_2_4
         ret = pvfs2_precheck_file_write(filp, inode, &count, offset);
 #else
-        ret = generic_write_checks(filp, &offset, &count,
-                S_ISBLK(inode->i_mode));
+        ret = generic_write_checks(filp, offset, &count, S_ISBLK(inode->i_mode));
 #endif
-        if (ret != 0 || count == 0)
+        if (ret != 0)
         {
-            gossip_err("pvfs2_file_aio_write: failed generic "
-                    " argument checks.\n");
-            return(ret);
+            gossip_err("%s: failed generic "
+                    " argument checks.\n", rw->fnstr);
+            return ret;
         }
     }
-    /* Each I/O operation is not allowed to be greater than our block size */
-    if (count > pvfs_bufmap_size_query())
+    if (count == 0)
     {
-        gossip_err("aio_write: cannot transfer (%d) bytes"
+        error = 0;
+        goto out_error;
+    }
+    else if (count > pvfs_bufmap_size_query())
+    {
+        /* TODO: Asynchronous I/O operation is not allowed to 
+         * be greater than our block size 
+         */
+        gossip_lerr("%s: cannot transfer (%zd) bytes"
                 " (larger than block size %d)\n",
-                (int) count, pvfs_bufmap_size_query());
-        return -EINVAL;
+                rw->fnstr, count, pvfs_bufmap_size_query());
+        goto out_error;
     }
-    error = -EINVAL;
-    if (filp && inode)
+    gossip_debug(GOSSIP_FILE_DEBUG, "Posting asynchronous I/O operation\n");
+    /* First time submission */
+    if ((x = (pvfs2_kiocb *) iocb->private) == NULL)
     {
-        ssize_t ret = 0;
-        pvfs2_kiocb *x = NULL;
-
-        /* First time submission */
-        if ((x = (pvfs2_kiocb *) iocb->private) == NULL)
+        int buffer_index = -1;
+        pvfs2_kernel_op_t *new_op = NULL;
+        char __user *current_buf = (char *) rw->dest.address.iov[0].iov_base;
+        pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
+        
+        new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
+        if (!new_op)
+        {
+            error = -ENOMEM;
+            goto out_error;
+        }
+        /* Increase ref count */
+        get_op(new_op);
+        /* Asynchronous I/O */
+        new_op->upcall.req.io.async_vfs_io = PVFS_VFS_ASYNC_IO;
+        new_op->upcall.req.io.io_type = (rw->type == IO_READ) ?
+                                        PVFS_IO_READ : PVFS_IO_WRITE;
+        new_op->upcall.req.io.refn = pvfs2_inode->refn;
+        error = pvfs_bufmap_get(&buffer_index);
+        if (error < 0)
+        {
+            gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get()"
+                    " failure %ld\n", rw->fnstr, (long) error);
+            /* drop ref count and possibly de-allocate */
+            put_op(new_op);
+            goto out_error;
+        }
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get %d\n",
+                rw->fnstr, buffer_index);
+        new_op->upcall.req.io.buf_index = buffer_index;
+        new_op->upcall.req.io.count = count;
+        new_op->upcall.req.io.offset = *offset;
+        if (rw->type == IO_WRITE)
         {
-            int buffer_index = -1;
-            pvfs2_kernel_op_t *new_op = NULL;
-            pvfs2_kiocb pvfs_kiocb;
-            char __user *current_buf = (char *) buffer;
-            pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
-            
-            new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
-            if (!new_op)
-            {
-                error = -ENOMEM;
-                goto out_error;
-            }
-            /* Increase ref count */
-            get_op(new_op);
-            /* (A)synchronous I/O */
-            new_op->upcall.req.io.async_vfs_io = 
-                is_sync_kiocb(iocb) ? PVFS_VFS_SYNC_IO 
-                                    : PVFS_VFS_ASYNC_IO;
-            new_op->upcall.req.io.io_type = PVFS_IO_WRITE;
-            new_op->upcall.req.io.refn = pvfs2_inode->refn;
-            error = pvfs_bufmap_get(&buffer_index);
-            if (error < 0)
-            {
-                gossip_err("pvfs2_file_aio_write: pvfs_bufmap_get()"
-                        " failure %d\n", (int) ret);
-                /* drop ref count and possibly de-allocate */
-                put_op(new_op);
-                goto out_error;
-            }
-            gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_write: pvfs_bufmap_put %d\n",
-                    buffer_index);
-            new_op->upcall.req.io.buf_index = buffer_index;
-            new_op->upcall.req.io.count = count;
-            new_op->upcall.req.io.offset = offset;
             /* 
-             * copy the data from the application. 
+             * copy the data from the application for writes 
              * Should this be done here even for async I/O? 
              * We could return -EIOCBRETRY here and have 
              * the data copied in the pvfs2_aio_retry routine,
@@ -2096,141 +2518,144 @@ pvfs2_file_aio_write(struct kiocb *iocb,
                     buffer_index, current_buf, count);
             if (error < 0)
             {
-                gossip_debug(GOSSIP_FILE_DEBUG, "Failed to copy user buffer %d\n", (int) ret);
+                gossip_err("%s: Failed to copy user buffer %ld. Make sure that pvfs2-client-core"
+                        " is still running \n", rw->fnstr, (long) error);
                 /* drop the buffer index */
                 pvfs_bufmap_put(buffer_index);
-                gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: pvfs_bufmap_put %d\n",
-                        buffer_index);
+                gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_put %d\n",
+                        rw->fnstr, buffer_index);
                 /* drop the reference count and deallocate */
                 put_op(new_op);
                 goto out_error;
             }
-
-            /* 
-             * if it is a synchronous operation, we
-             * don't allocate anything here 
-             */
-            if (is_sync_kiocb(iocb))
-            {
-                x = &pvfs_kiocb;
-            }
-            else /* asynchronous iocb */
-            {
-                x = kiocb_alloc();
-                if (x == NULL)
-                {
-                    error = -ENOMEM;
-                    /* drop the buffer index */
-                    pvfs_bufmap_put(buffer_index);
-                    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: pvfs_bufmap_put %d\n",
-                            buffer_index);
-                    /* drop the reference count and deallocate */
-                    put_op(new_op);
-                    goto out_error;
-                }
-                gossip_debug(GOSSIP_FILE_DEBUG, "kiocb_alloc: %p\n", x);
-                /* 
-                 * destructor function to make sure that we free 
-                 * up this allocated piece of memory 
-                 */
-                iocb->ki_dtor = pvfs2_aio_dtor;
-            }
-            /* If user requested synchronous type of operation */
-            if (is_sync_kiocb(iocb))
-            {
-                /*
-                 * Stage the operation!
-                 */
-                ret = service_operation(
-                        new_op, "pvfs2_file_aio_write",  
-                        get_interruptible_flag(inode));
-                if (ret < 0)
-                {
-                    handle_sync_aio_error();
-                    /*
-                      don't write an error to syslog on signaled operation
-                      termination unless we've got debugging turned on, as
-                      this can happen regularly (i.e. ctrl-c)
-                    */
-                    if (ret == -EINTR)
-                    {
-                        gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_write: returning error %d\n", 
-                                (int) ret);
-                    }
-                    else
-                    {
-                        gossip_err(
-                            "pvfs2_file_aio_write: error writing to "
-                            " handle %llu, "
-                            "FILE: %s\n  -- "
-                            "returning %d\n",
-                            llu(pvfs2_ino_to_handle(inode->i_ino)),
-                            (filp && filp->f_dentry 
-                             && filp->f_dentry->d_name.name ?
-                             (char *)filp->f_dentry->d_name.name : "UNKNOWN"),
-                            (int) ret);
-                    }
-                    error = ret;
-                    goto out_error;
-                }
-                error = new_op->downcall.resp.io.amt_complete;
-                wake_up_daemon_for_return(new_op);
-                pvfs_bufmap_put(buffer_index);
-                gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_read: pvfs_bufmap_put %d\n",
-                        (int) buffer_index);
-                if (error > 0)
-                {
-                    SetMtimeFlag(pvfs2_inode);
-                    inode->i_mtime = CURRENT_TIME;
-                    mark_inode_dirty_sync(inode);
-                }
-                /* new_op is freed by the client-daemon */
-                goto out_error;
-            }
-            else
-            {
-                /* 
-                 * We need to set the cancellation callbacks + 
-                 * other state information
-                 * here if the asynchronous request is going to
-                 * be successfully submitted 
-                 */
-                fill_default_kiocb(x, current, iocb, PVFS_IO_WRITE,
-                        buffer_index, new_op, current_buf,
-                        offset, count,
-                        &pvfs2_aio_cancel);
-                /*
-                 * We need to be able to retrieve this structure from
-                 * the op structure as well, since the client-daemon
-                 * needs to send notifications upon aio_completion.
-                 */
-                new_op->priv = x;
-                /* and stash it away in the kiocb structure as well */
-                iocb->private = x;
-                /*
-                 * Add it to the list of ops to be serviced
-                 * but don't wait for it to be serviced. 
-                 * Return immediately 
-                 */
-                service_operation(new_op, "pvfs2_file_aio_write", 
-                        PVFS2_OP_ASYNC);
-                gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_aio_write: queued "
-                        " write operation [%ld for %d]\n",
-                            (unsigned long) offset, (int) count);
-                error = -EIOCBQUEUED;
-                /*
-                 * All cleanups done upon completion
-                 * (OR) cancellation!
-                 */
-            }
         }
-        /* I don't think this path is ever taken */
-        else { /* retry and see what is the status! */
-            error = pvfs2_aio_retry(iocb);
+        x = kiocb_alloc();
+        if (x == NULL)
+        {
+            error = -ENOMEM;
+            /* drop the buffer index */
+            pvfs_bufmap_put(buffer_index);
+            gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_put %d\n",
+                    rw->fnstr, buffer_index);
+            /* drop the reference count and deallocate */
+            put_op(new_op);
+            goto out_error;
         }
+        gossip_debug(GOSSIP_FILE_DEBUG, "kiocb_alloc: %p\n", x);
+        /* 
+         * destructor function to make sure that we free
+         * up this allocated piece of memory 
+         */
+        iocb->ki_dtor = pvfs2_aio_dtor;
+        /* 
+         * We need to set the cancellation callbacks + 
+         * other state information
+         * here if the asynchronous request is going to
+         * be successfully submitted 
+         */
+        fill_default_kiocb(x, current, iocb, 
+                (rw->type == IO_READ) ? PVFS_IO_READ : PVFS_IO_WRITE,
+                buffer_index, new_op, current_buf,
+                *offset, count,
+                &pvfs2_aio_cancel);
+        /*
+         * We need to be able to retrieve this structure from
+         * the op structure as well, since the client-daemon
+         * needs to send notifications upon aio_completion.
+         */
+        new_op->priv = x;
+        /* and stash it away in the kiocb structure as well */
+        iocb->private = x;
+        /*
+         * Add it to the list of ops to be serviced
+         * but don't wait for it to be serviced. 
+         * Return immediately 
+         */
+        service_operation(new_op, rw->fnstr, 
+                PVFS2_OP_ASYNC);
+        gossip_debug(GOSSIP_FILE_DEBUG, "%s: queued "
+                " operation [%llu for %zd]\n",
+                rw->fnstr, llu(*offset), count);
+        error = -EIOCBQUEUED;
+        /*
+         * All cleanups done upon completion
+         * (OR) cancellation!
+         */
+    }
+    /* I don't think this path will ever be taken */
+    else { /* retry and see what is the status! */
+        error = pvfs2_aio_retry(iocb);
     }
 out_error:
     return error;
+}
+
+static ssize_t 
+pvfs2_file_aio_read(struct kiocb *iocb, char __user *buffer,
+        size_t count, loff_t offset)
+{
+    struct rw_options rw;
+    struct iovec vec;
+    memset(&rw, 0, sizeof(rw));
+    rw.async = !is_sync_kiocb(iocb);
+    rw.type = IO_READ;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.off.io.offset = &offset;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    rw.iocb = iocb;
+    vec.iov_base = (char __user *) buffer;
+    vec.iov_len  = count;
+    rw.file = iocb->ki_filp;
+    if (!rw.file || !(rw.file)->f_mapping)
+    {
+        return -EINVAL;
+    }
+    rw.inode = (rw.file)->f_mapping->host;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.dest.address.iov = &vec;
+    rw.dest.address.nr_segs = 1;
+
+    if (IS_IMMUTABLE(rw.inode)) 
+    {
+        rw.readahead_size = (rw.inode)->i_size;
+        return generic_file_aio_read(iocb, buffer, count, offset);
+    }
+    else 
+    {
+        rw.readahead_size = 0;
+        return do_direct_aio_read_write(&rw);
+    }
+}
+
+static ssize_t 
+pvfs2_file_aio_write(struct kiocb *iocb, const char __user *buffer,
+        size_t count, loff_t offset)
+{
+    struct rw_options rw;
+    struct iovec vec;
+
+    memset(&rw, 0, sizeof(rw));
+    rw.async = !is_sync_kiocb(iocb);
+    rw.type = IO_WRITE;
+    rw.copy_dest_type = COPY_TO_ADDRESSES;
+    rw.readahead_size = 0;
+    rw.off.io.offset = &offset;
+    rw.copy_to_user = 1;
+    rw.fnstr = __FUNCTION__;
+    rw.iocb = iocb;
+    vec.iov_base = (char __user *) buffer;
+    vec.iov_len  = count;
+    rw.file = iocb->ki_filp;
+    if (!rw.file || !(rw.file)->f_mapping)
+    {
+        return -EINVAL;
+    }
+    rw.inode = (rw.file)->f_mapping->host;
+    rw.pvfs2_inode = PVFS2_I(rw.inode);
+    rw.dest.address.iov = &vec;
+    rw.dest.address.nr_segs = 1;
+    return do_direct_aio_read_write(&rw);
 }
 
 #endif

Index: inode.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/inode.c,v
diff -p -u -r1.67.2.1 -r1.67.2.2
--- inode.c	18 Sep 2006 15:05:22 -0000	1.67.2.1
+++ inode.c	19 Oct 2006 22:17:09 -0000	1.67.2.2
@@ -13,6 +13,7 @@
 #include "pvfs2-kernel.h"
 #include "pvfs2-bufmap.h"
 #include "pvfs2-types.h"
+#include "pvfs2-internal.h"
 
 static int read_one_page(struct page *page)
 {
@@ -160,8 +161,11 @@ struct address_space_operations pvfs2_ad
 void pvfs2_truncate(struct inode *inode)
 {
     loff_t orig_size = i_size_read(inode);
-    gossip_debug(GOSSIP_INODE_DEBUG, "pvfs2: pvfs2_truncate called on inode %d "
-                "with size %ld\n",(int)inode->i_ino, (long) orig_size);
+
+    if (IS_APPEND(inode) || IS_IMMUTABLE(inode))
+        return;
+    gossip_debug(GOSSIP_INODE_DEBUG, "pvfs2: pvfs2_truncate called on inode %llu "
+                "with size %ld\n", llu(get_handle_from_ino(inode)), (long) orig_size);
 
     /* successful truncate when size changes also requires mtime updates 
      * although the mtime updates are propagated lazily!
@@ -348,6 +352,106 @@ struct inode_operations pvfs2_file_inode
 #endif
 };
 
+#if defined(HAVE_IGET5_LOCKED) || defined (HAVE_IGET4_LOCKED)
+
+/*
+ * Given a PVFS2 object identifier (fsid, handle), convert it into a ino_t type
+ * that will be used as a hash-index from where the handle will
+ * be searched for in the VFS hash table of inodes.
+ */
+static inline ino_t pvfs2_handle_hash(PVFS_object_ref *ref)
+{
+    if (!ref)
+        return 0;
+    return pvfs2_handle_to_ino(ref->handle);
+}
+
+/* the ->set callback of iget5_locked and friends. Roughly equivalent to the ->read_inode()
+ * callback if we are using iget and friends 
+ */
+static int pvfs2_set_inode(struct inode *inode, void *data)
+{
+    /* callbacks to set inode number handle */
+    PVFS_object_ref *ref = (PVFS_object_ref *) data;
+    pvfs2_inode_t *pvfs2_inode = NULL;
+
+    pvfs2_inode = PVFS2_I(inode);
+    pvfs2_inode_initialize(pvfs2_inode);
+    pvfs2_inode->refn.fs_id  = ref->fs_id;
+    pvfs2_inode->refn.handle = ref->handle;
+    return 0;
+}
+
+#ifdef HAVE_IGET5_LOCKED
+static int
+pvfs2_test_inode(struct inode *inode, void *data)
+#elif defined(HAVE_IGET4_LOCKED)
+static int 
+pvfs2_test_inode(struct inode *inode, unsigned long ino, void *data)
+#endif
+{
+    /* callbacks to determine if handles match */
+    PVFS_object_ref *ref = (PVFS_object_ref *) data;
+    pvfs2_inode_t *pvfs2_inode = NULL;
+
+    pvfs2_inode = PVFS2_I(inode);
+    return (pvfs2_inode->refn.handle == ref->handle && pvfs2_inode->refn.fs_id == ref->fs_id);
+}
+#endif
+
+/*
+ * Front-end to lookup the inode-cache maintained by the VFS using the PVFS2
+ * file handle instead of the inode number.
+ * Problem with iget() is well-documented in that it can lead to possible
+ * collisions especially for a file-system with 64 bit handles since inode->i_ino
+ * is only a scalar field (32 bits). So the trick now is to use iget4_locked (OR) iget5_locked
+ * if the kernel defines one and set inode number to be just a hash for the
+ * handle
+ * @sb: the file system super block instance
+ * @ref: The PVFS2 object for which we are trying to locate an inode structure
+ * @keep_locked : indicates whether the inode must be simply allocated and not filled
+ * in with the results from a ->getattr. i.e. if keep_locked is set to 0, we do a getattr() and
+ * unlock the inode and if set to 1, we do not issue a getattr() and keep it locked
+ * 
+ * Boy, this function is so ugly with all these macros. I wish I could find a better
+ * way to reduce the macro clutter.
+ */
+struct inode *pvfs2_iget_common(struct super_block *sb, PVFS_object_ref *ref, int keep_locked)
+{
+    struct inode *inode = NULL;
+    unsigned long hash;
+
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+    hash = pvfs2_handle_hash(ref);
+#if defined(HAVE_IGET5_LOCKED)
+    inode = iget5_locked(sb, hash, pvfs2_test_inode, pvfs2_set_inode, ref);
+#elif defined(HAVE_IGET4_LOCKED)
+    inode = iget4_locked(sb, hash, pvfs2_test_inode, ref);
+#endif
+#else
+    hash = (unsigned long) ref->handle;
+#ifdef HAVE_IGET_LOCKED
+    inode = iget_locked(sb, hash);
+#else
+    /* iget() internally issues a call to read_inode() */
+    inode = iget(sb, hash);
+#endif
+#endif
+    if (!keep_locked)
+    {
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED) || defined(HAVE_IGET_LOCKED)
+        if (inode && (inode->i_state & I_NEW))
+        {
+            inode->i_ino = hash; /* needed for stat etc */
+            /* issue a call to read the inode */
+            sb->s_op->read_inode(inode);
+            unlock_new_inode(inode);
+        }
+#endif
+    }
+    return inode;
+}
+
 /** Allocates a Linux inode structure with additional PVFS2-specific
  *  private data (I think -- RobR).
  */
@@ -356,7 +460,7 @@ struct inode *pvfs2_get_custom_inode(
     struct inode *dir,
     int mode,
     dev_t dev,
-    unsigned long ino)
+    PVFS_object_ref object)
 {
     struct inode *inode = NULL;
     pvfs2_inode_t *pvfs2_inode = NULL;
@@ -365,7 +469,7 @@ struct inode *pvfs2_get_custom_inode(
                 "MAJOR(dev)=%u | MINOR(dev)=%u)\n", sb, MAJOR(dev),
                 MINOR(dev));
 
-    inode = iget(sb, ino);
+    inode = pvfs2_iget(sb, &object);
     if (inode)
     {
 	/* initialize pvfs2 specific private data */
@@ -378,10 +482,7 @@ struct inode *pvfs2_get_custom_inode(
             return NULL;
         }
 
-        if (inode->i_ino != PVFS2_SB(inode->i_sb)->root_handle)
-        {
-            inode->i_mode = mode;
-        }
+        inode->i_mode = mode;
         inode->i_mapping->host = inode;
         inode->i_uid = current->fsuid;
         inode->i_gid = current->fsgid;
@@ -429,8 +530,8 @@ struct inode *pvfs2_get_custom_inode(
             goto error;
 	}
 #if !defined(PVFS2_LINUX_KERNEL_2_4) && defined(HAVE_GENERIC_GETXATTR) && defined(CONFIG_FS_POSIX_ACL)
-        gossip_debug(GOSSIP_ACL_DEBUG, "Initializing ACL's for inode %ld\n", 
-                (long) inode->i_ino);
+        gossip_debug(GOSSIP_ACL_DEBUG, "Initializing ACL's for inode %llu\n", 
+                llu(get_handle_from_ino(inode)));
         /* Initialize the ACLs of the new inode */
         pvfs2_init_acl(inode, dir);
 #endif

Index: namei.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/namei.c,v
diff -p -u -r1.76.4.1 -r1.76.4.2
--- namei.c	18 Sep 2006 15:05:22 -0000	1.76.4.1
+++ namei.c	19 Oct 2006 22:17:09 -0000	1.76.4.2
@@ -89,7 +89,7 @@ static struct dentry *pvfs2_lookup(
     new_op = op_alloc(PVFS2_VFS_OP_LOOKUP);
     if (!new_op)
     {
-	return NULL;
+	return ERR_PTR(-ENOMEM);
     }
 
 #ifdef PVFS2_LINUX_KERNEL_2_4
@@ -109,14 +109,20 @@ static struct dentry *pvfs2_lookup(
     {
         sb = dir->i_sb;
         parent = PVFS2_I(dir);
-        if (parent && parent->refn.handle && parent->refn.fs_id)
+        if (parent && parent->refn.handle != PVFS_HANDLE_NULL 
+                && parent->refn.fs_id != PVFS_FS_ID_NULL)
         {
             new_op->upcall.req.lookup.parent_refn = parent->refn;
         }
         else
         {
+#if defined(HAVE_IGET4_LOCKED) || defined(HAVE_IGET5_LOCKED)
+            gossip_lerr("Critical error: i_ino cannot be relied on when using iget5/iget4\n");
+            op_release(new_op);
+            return ERR_PTR(-EINVAL);
+#endif
             new_op->upcall.req.lookup.parent_refn.handle =
-                pvfs2_ino_to_handle(dir->i_ino);
+                get_handle_from_ino(dir);
             new_op->upcall.req.lookup.parent_refn.fs_id =
                 PVFS2_SB(sb)->fs_id;
         }
@@ -155,20 +161,14 @@ static struct dentry *pvfs2_lookup(
     /* lookup inode matching name (or add if not there) */
     if (ret > -1)
     {
-	inode = iget(sb, pvfs2_handle_to_ino(
-                         new_op->downcall.resp.lookup.refn.handle));
+	inode = pvfs2_iget(sb, &new_op->downcall.resp.lookup.refn);
 	if (inode && !is_bad_inode(inode))
 	{
             struct dentry *res;
-	    found_pvfs2_inode = PVFS2_I(inode);
-
-	    /* store the retrieved handle and fs_id */
-	    found_pvfs2_inode->refn = new_op->downcall.resp.lookup.refn;
 
 	    /* update dentry/inode pair into dcache */
 	    dentry->d_op = &pvfs2_dentry_operations;
 
-            gossip_debug(GOSSIP_NAME_DEBUG, "calling pvfs2_d_splice_alias\n");
             res = pvfs2_d_splice_alias(dentry, inode);
 
             gossip_debug(GOSSIP_NAME_DEBUG, "Lookup success (inode ct = %d)\n",
@@ -411,8 +411,9 @@ static int pvfs2_rename(
       use the root handle/fs_id as specified by the
       inode's corresponding superblock
     */
-    if (pvfs2_old_parent_inode->refn.handle &&
-        pvfs2_old_parent_inode->refn.fs_id)
+    if (pvfs2_old_parent_inode &&
+            pvfs2_old_parent_inode->refn.handle != PVFS_HANDLE_NULL &&
+            pvfs2_old_parent_inode->refn.fs_id != PVFS_FS_ID_NULL)
     {
         new_op->upcall.req.rename.old_parent_refn =
             pvfs2_old_parent_inode->refn;
@@ -427,8 +428,9 @@ static int pvfs2_rename(
     }
 
     /* do the same for the new parent */
-    if (pvfs2_new_parent_inode->refn.handle &&
-        pvfs2_new_parent_inode->refn.fs_id)
+    if (pvfs2_new_parent_inode &&
+            pvfs2_new_parent_inode->refn.handle != PVFS_HANDLE_NULL &&
+            pvfs2_new_parent_inode->refn.fs_id != PVFS_FS_ID_NULL)
     {
         new_op->upcall.req.rename.new_parent_refn =
             pvfs2_new_parent_inode->refn;

Index: pvfs2-bufmap.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/pvfs2-bufmap.c,v
diff -p -u -r1.39.14.1 -r1.39.14.2
--- pvfs2-bufmap.c	18 Sep 2006 15:05:22 -0000	1.39.14.1
+++ pvfs2-bufmap.c	19 Oct 2006 22:17:10 -0000	1.39.14.2
@@ -345,20 +345,20 @@ void readdir_index_put(int buffer_index)
  *
  * returns 0 on success, -errno on failure
  */
-int pvfs_bufmap_copy_to_user(void __user *to, int buffer_index, int size)
+int pvfs_bufmap_copy_to_user(void __user *to, int buffer_index, size_t size)
 {
-    int ret = 0, amt_copied = 0, amt_remaining = 0;
-    int cur_copy_size = 0, index = 0;
+    size_t ret = 0, amt_copied = 0, amt_remaining = 0, cur_copy_size = 0;
+    int index = 0;
     void __user *offset = to;
     void *from_kaddr = NULL;
     struct pvfs_bufmap_desc *from = &desc_array[buffer_index];
 
     gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_user: to %p, from %p, index %d, "
-                "size %d\n", to, from, buffer_index, size);
+                "size %zd\n", to, from, buffer_index, size);
 
     if (bufmap_init == 0)
     {
-        gossip_err("pvfs2_bufmap_copy_to_user: not yet "
+        gossip_err("pvfs_bufmap_copy_to_user: not yet "
                     "initialized.\n");
         gossip_err("pvfs2: please confirm that pvfs2-client daemon is running.\n");
         return -EIO;
@@ -376,7 +376,7 @@ int pvfs_bufmap_copy_to_user(void __user
 
         if (ret)
         {
-            gossip_debug(GOSSIP_BUFMAP_DEBUG, "Failed to copy data to user space %d\n", ret);
+            gossip_debug(GOSSIP_BUFMAP_DEBUG, "Failed to copy data to user space %zd\n", ret);
             return -EFAULT;
         }
 
@@ -388,19 +388,19 @@ int pvfs_bufmap_copy_to_user(void __user
 }
 
 int pvfs_bufmap_copy_to_kernel(
-    void *to, int buffer_index, int size)
+    void *to, int buffer_index, size_t size)
 {
-    int amt_copied = 0, amt_remaining = 0;
-    int cur_copy_size = 0, index = 0;
+    size_t amt_copied = 0, amt_remaining = 0, cur_copy_size = 0;
+    int index = 0;
     void *offset = to, *from_kaddr = NULL;
     struct pvfs_bufmap_desc *from = &desc_array[buffer_index];
 
-    gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_kernel: to %p, index %d, size %d\n",
+    gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_kernel: to %p, index %d, size %zd\n",
                 to, buffer_index, size);
 
     if (bufmap_init == 0)
     {
-        gossip_err("pvfs2_bufmap_copy_to_kernel: not yet "
+        gossip_err("pvfs_bufmap_copy_to_kernel: not yet "
                     "initialized.\n");
         gossip_err("pvfs2: please confirm that pvfs2-client daemon is running.\n");
         return -EIO;
@@ -430,20 +430,20 @@ int pvfs_bufmap_copy_to_kernel(
  * returns 0 on success, -errno on failure
  */
 int pvfs_bufmap_copy_from_user(
-    int buffer_index, void __user *from, int size)
+    int buffer_index, void __user *from, size_t size)
 {
-    int ret = 0, amt_copied = 0, amt_remaining = 0;
-    int cur_copy_size = 0, index = 0;
+    size_t ret = 0, amt_copied = 0, amt_remaining = 0, cur_copy_size = 0;
+    int index = 0;
     void __user *offset = from;
     void *to_kaddr = NULL;
     struct pvfs_bufmap_desc *to = &desc_array[buffer_index];
 
     gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_from_user: from %p, index %d, "
-                "size %d\n", from, buffer_index, size);
+                "size %zd\n", from, buffer_index, size);
 
     if (bufmap_init == 0)
     {
-        gossip_err("pvfs2_bufmap_copy_from_user: not yet "
+        gossip_err("pvfs_bufmap_copy_from_user: not yet "
                     "initialized.\n");
         gossip_err("pvfs2: please confirm that pvfs2-client daemon is running.\n");
         return -EIO;
@@ -488,10 +488,10 @@ int pvfs_bufmap_copy_iovec_from_user(
     int buffer_index,
     const struct iovec *iov,
     unsigned long nr_segs,
-    int size)
+    size_t size)
 {
-    int ret = 0, amt_copied = 0; 
-    int cur_copy_size = 0, index = 0;
+    size_t ret = 0, amt_copied = 0, cur_copy_size = 0;
+    int index = 0;
     void *to_kaddr = NULL;
     void __user *from_addr = NULL;
     struct iovec *copied_iovec = NULL;
@@ -499,11 +499,11 @@ int pvfs_bufmap_copy_iovec_from_user(
     unsigned int seg, page_offset = 0;
 
     gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_iovec_from_user: index %d, "
-                "size %d\n", buffer_index, size);
+                "size %zd\n", buffer_index, size);
 
     if (bufmap_init == 0)
     {
-        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs2_bufmap_copy_iovec_from_user: not yet "
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_iovec_from_user: not yet "
                     "initialized; returning\n");
         return -EIO;
     }
@@ -528,7 +528,7 @@ int pvfs_bufmap_copy_iovec_from_user(
     }
     if (amt_copied != size)
     {
-        gossip_err("pvfs2_bufmap_copy_iovec_from_user: computed total (%d) is not equal to (%d)\n",
+        gossip_err("pvfs2_bufmap_copy_iovec_from_user: computed total (%zd) is not equal to (%zd)\n",
                 amt_copied, size);
         kfree(copied_iovec);
         return -EINVAL;
@@ -572,7 +572,7 @@ int pvfs_bufmap_copy_iovec_from_user(
         ret = copy_from_user(to_kaddr + page_offset, from_addr, cur_copy_size);
         pvfs2_kunmap(to->page_array[index]);
 #if 0
-        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs2_bufmap_copy_iovec_from_user: copying from user %p to kernel %p %d bytes (to_kddr: %p,page_offset: %d)\n",
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs2_bufmap_copy_iovec_from_user: copying from user %p to kernel %p %zd bytes (to_kddr: %p,page_offset: %d)\n",
                 from_addr, to_kaddr + page_offset, cur_copy_size, to_kaddr, page_offset); 
 #endif
         if (ret)
@@ -592,12 +592,122 @@ int pvfs_bufmap_copy_iovec_from_user(
         }
     }
     kfree(copied_iovec);
+    return 0;
+}
+
+/* pvfs_bufmap_copy_iovec_from_kernel()
+ *
+ * copies data from several kernel space addresses in an iovec
+ * to a mapped buffer
+ *
+ * Note that the mapped buffer is a series of pages and therefore
+ * the copies have to be split into chunks of at most PAGE_SIZE bytes.
+ * Note that this routine checks that the summation of iov_len
+ * across all the elements of iov is equal to size.
+ *
+ * returns 0 on success, -errno on failure
+ */
+int pvfs_bufmap_copy_iovec_from_kernel(
+    int buffer_index,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    size_t size)
+{
+    size_t amt_copied = 0, cur_copy_size = 0;
+    int index = 0;
+    void *to_kaddr = NULL;
+    void *from_kaddr = NULL;
+    struct iovec *copied_iovec = NULL;
+    struct pvfs_bufmap_desc *to = &desc_array[buffer_index];
+    unsigned int seg, page_offset = 0;
+
+    gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_iovec_from_kernel: index %d, "
+                "size %zd\n", buffer_index, size);
+
+    if (bufmap_init == 0)
+    {
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_iovec_from_kernel: not yet "
+                    "initialized; returning\n");
+        return -EIO;
+    }
+    /*
+     * copy the passed in iovec so that we can change some of its fields
+     */
+    copied_iovec = (struct iovec *) kmalloc(nr_segs * sizeof(struct iovec),
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (copied_iovec == NULL)
+    {
+        gossip_err("pvfs2_bufmap_copy_iovec_from_kernel: failed allocating memory\n");
+        return -ENOMEM;
+    }
+    memcpy(copied_iovec, iov, nr_segs * sizeof(struct iovec));
+    /*
+     * Go through each segment in the iovec and make sure that
+     * the summation of iov_len matches the given size.
+     */
+    for (seg = 0, amt_copied = 0; seg < nr_segs; seg++)
+    {
+        amt_copied += copied_iovec[seg].iov_len;
+    }
     if (amt_copied != size)
     {
-	gossip_err("Failed to copy all the data from user space [%d instead of %d]\n",
+        gossip_err("pvfs2_bufmap_copy_iovec_from_kernel: computed total (%zd) is not equal to (%zd)\n",
                 amt_copied, size);
-	return -EIO;
+        kfree(copied_iovec);
+        return -EINVAL;
     }
+
+    index = 0;
+    amt_copied = 0;
+    seg = 0;
+    page_offset = 0;
+    /* Go through each segment in the iovec and copy its
+     * buffer into the mapped buffer, one page at a time.
+     */
+    while (amt_copied < size)
+    {
+	struct iovec *iv = &copied_iovec[seg];
+        int inc_index = 0;
+
+        if (iv->iov_len < (PAGE_SIZE - page_offset)) 
+        {
+            cur_copy_size = iv->iov_len;
+            seg++;
+            from_kaddr = iv->iov_base;
+            inc_index = 0;
+        }
+        else if (iv->iov_len == (PAGE_SIZE - page_offset))
+        {
+            cur_copy_size = iv->iov_len;
+            seg++;
+            from_kaddr = iv->iov_base;
+            inc_index = 1;
+        }
+        else 
+        {
+            cur_copy_size = (PAGE_SIZE - page_offset);
+            from_kaddr = iv->iov_base;
+            iv->iov_base += cur_copy_size;
+            iv->iov_len -= cur_copy_size;
+            inc_index = 1;
+        }
+        to_kaddr = pvfs2_kmap(to->page_array[index]);
+        memcpy(to_kaddr + page_offset, from_kaddr, cur_copy_size);
+        pvfs2_kunmap(to->page_array[index]);
+#if 0
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs2_bufmap_copy_iovec_from_kernel: copying from kernel %p to kernel %p %zd bytes (to_kddr: %p,page_offset: %d)\n",
+                from_kaddr, to_kaddr + page_offset, cur_copy_size, to_kaddr, page_offset); 
+#endif
+        amt_copied += cur_copy_size;
+        if (inc_index) {
+            page_offset = 0;
+            index++;
+        }
+        else {
+            page_offset += cur_copy_size;
+        }
+    }
+    kfree(copied_iovec);
     return 0;
 }
 
@@ -612,10 +722,11 @@ int pvfs_bufmap_copy_to_user_iovec(
     int buffer_index,
     const struct iovec *iov,
     unsigned long nr_segs,
-    int size)
+    size_t size)
 {
-    int ret = 0, amt_copied = 0;
-    int cur_copy_size = 0, index = 0;
+    size_t ret = 0, amt_copied = 0;
+    size_t cur_copy_size = 0;
+    int index = 0;
     void *from_kaddr = NULL;
     void __user *to_addr = NULL;
     struct iovec *copied_iovec = NULL;
@@ -623,7 +734,7 @@ int pvfs_bufmap_copy_to_user_iovec(
     unsigned int seg, page_offset = 0;
 
     gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_user_iovec: index %d, "
-                "size %d\n", buffer_index, size);
+                "size %zd\n", buffer_index, size);
 
     if (bufmap_init == 0)
     {
@@ -652,7 +763,7 @@ int pvfs_bufmap_copy_to_user_iovec(
     }
     if (amt_copied < size)
     {
-        gossip_err("pvfs2_bufmap_copy_to_user_iovec: computed total (%d) is less than (%d)\n",
+        gossip_err("pvfs2_bufmap_copy_to_user_iovec: computed total (%zd) is less than (%zd)\n",
                 amt_copied, size);
         kfree(copied_iovec);
         return -EINVAL;
@@ -720,6 +831,119 @@ int pvfs_bufmap_copy_to_user_iovec(
     return 0;
 }
 
+/* pvfs_bufmap_copy_to_kernel_iovec()
+ *
+ * copies data to several kernel space addresses in an iovec
+ * from a mapped buffer
+ *
+ * returns 0 on success, -errno on failure
+ */
+int pvfs_bufmap_copy_to_kernel_iovec(
+    int buffer_index,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    size_t size)
+{
+    size_t amt_copied = 0;
+    size_t cur_copy_size = 0;
+    int index = 0;
+    void *from_kaddr = NULL;
+    void *to_kaddr = NULL;
+    struct iovec *copied_iovec = NULL;
+    struct pvfs_bufmap_desc *from = &desc_array[buffer_index];
+    unsigned int seg, page_offset = 0;
+
+    gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_kernel_iovec: index %d, "
+                "size %zd\n", buffer_index, size);
+
+    if (bufmap_init == 0)
+    {
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs2_bufmap_copy_to_kernel_iovec: not yet "
+                    "initialized; returning\n");
+        return -EIO;
+    }
+    /*
+     * copy the passed in iovec so that we can change some of its fields
+     */
+    copied_iovec = (struct iovec *) kmalloc(nr_segs * sizeof(struct iovec),
+            PVFS2_BUFMAP_GFP_FLAGS);
+    if (copied_iovec == NULL)
+    {
+        gossip_err("pvfs2_bufmap_copy_to_kernel_iovec: failed allocating memory\n");
+        return -ENOMEM;
+    }
+    memcpy(copied_iovec, iov, nr_segs * sizeof(struct iovec));
+    /*
+     * Go through each segment in the iovec and make sure that
+     * the summation of iov_len is at least the given size.
+     */
+    for (seg = 0, amt_copied = 0; seg < nr_segs; seg++)
+    {
+        amt_copied += copied_iovec[seg].iov_len;
+    }
+    if (amt_copied < size)
+    {
+        gossip_err("pvfs2_bufmap_copy_to_kernel_iovec: computed total (%zd) is less than (%zd)\n",
+                amt_copied, size);
+        kfree(copied_iovec);
+        return -EINVAL;
+    }
+
+    index = 0;
+    amt_copied = 0;
+    seg = 0;
+    page_offset = 0;
+    /* 
+     * Go through each segment in the iovec and copy from the mapped buffer,
+     * but make sure that we do so one page at a time.
+     */
+    while (amt_copied < size)
+    {
+	struct iovec *iv = &copied_iovec[seg];
+        int inc_index = 0;
+
+        if (iv->iov_len < (PAGE_SIZE - page_offset))
+        {
+            cur_copy_size = iv->iov_len;
+            seg++;
+            to_kaddr = iv->iov_base;
+            inc_index = 0;
+        }
+        else if (iv->iov_len == (PAGE_SIZE - page_offset))
+        {
+            cur_copy_size = iv->iov_len;
+            seg++;
+            to_kaddr = iv->iov_base;
+            inc_index = 1;
+        }
+        else 
+        {
+            cur_copy_size = (PAGE_SIZE - page_offset);
+            to_kaddr = iv->iov_base;
+            iv->iov_base += cur_copy_size;
+            iv->iov_len  -= cur_copy_size;
+            inc_index = 1;
+        }
+        from_kaddr = pvfs2_kmap(from->page_array[index]);
+        memcpy(to_kaddr, from_kaddr + page_offset, cur_copy_size);
+        pvfs2_kunmap(from->page_array[index]);
+#if 0
+        gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs2_bufmap_copy_to_kernel_iovec: copying to kernel %p from kernel %p %d bytes (from_kaddr:%p, page_offset:%d)\n",
+                to_kaddr, from_kaddr + page_offset, cur_copy_size, from_kaddr, page_offset); 
+#endif
+        amt_copied += cur_copy_size;
+        if (inc_index) {
+            page_offset = 0;
+            index++;
+        }
+        else {
+            page_offset += cur_copy_size;
+        }
+    }
+    kfree(copied_iovec);
+    return 0;
+}
+
 #ifdef HAVE_AIO_VFS_SUPPORT
 
 /* pvfs_bufmap_copy_to_user_task()
@@ -734,14 +958,14 @@ int pvfs_bufmap_copy_to_user_iovec(
  * returns number of bytes copied on success,
  * -errno on failure
  */
-int pvfs_bufmap_copy_to_user_task(
+size_t pvfs_bufmap_copy_to_user_task(
         struct task_struct *tsk,
         void __user *to, 
         int buffer_index,
-        int size)
+        size_t size)
 {
-    int ret = 0, amt_copied = 0, amt_remaining = 0;
-    int cur_copy_size = 0, index = 0;
+    size_t ret = 0, amt_copied = 0, amt_remaining = 0, cur_copy_size = 0;
+    int index = 0;
     void *from_kaddr = NULL;
     struct pvfs_bufmap_desc *from = &desc_array[buffer_index];
 
@@ -755,7 +979,7 @@ int pvfs_bufmap_copy_to_user_task(
 
     gossip_debug(GOSSIP_BUFMAP_DEBUG, "pvfs_bufmap_copy_to_user_task: "
             " PID: %d, to %p, from %p, index %d, "
-            " size %d\n", tsk->pid, to, from, buffer_index, size);
+            " size %zd\n", tsk->pid, to, from, buffer_index, size);
 
     if (bufmap_init == 0)
     {

Index: pvfs2-bufmap.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/pvfs2-bufmap.h,v
diff -p -u -r1.13.20.1 -r1.13.20.2
--- pvfs2-bufmap.h	18 Sep 2006 15:05:22 -0000	1.13.20.1
+++ pvfs2-bufmap.h	19 Oct 2006 22:17:10 -0000	1.13.20.2
@@ -41,36 +41,48 @@ void readdir_index_put(
 int pvfs_bufmap_copy_from_user(
     int buffer_index,
     void __user *from,
-    int size);
+    size_t size);
 
 int pvfs_bufmap_copy_iovec_from_user(
     int buffer_index,
     const struct iovec *iov,
     unsigned long nr_segs,
-    int size);
+    size_t size);
+
+int pvfs_bufmap_copy_iovec_from_kernel(
+    int buffer_index,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    size_t size);
 
 int pvfs_bufmap_copy_to_user(
     void __user *to,
     int buffer_index,
-    int size);
+    size_t size);
 
 int pvfs_bufmap_copy_to_user_iovec(
     int buffer_index,
     const struct iovec *iov,
     unsigned long nr_segs,
-    int size);
+    size_t size);
+
+int pvfs_bufmap_copy_to_kernel_iovec(
+    int buffer_index,
+    const struct iovec *iov,
+    unsigned long nr_segs,
+    size_t size);
 
 int pvfs_bufmap_copy_to_kernel(
     void *to,
     int buffer_index,
-    int size);
+    size_t size);
 
 #ifdef HAVE_AIO_VFS_SUPPORT
-int pvfs_bufmap_copy_to_user_task(
+size_t pvfs_bufmap_copy_to_user_task(
         struct task_struct *tsk,
         void __user *to,
         int buffer_index, 
-        int size);
+        size_t size);
 #endif
 
 #endif /* __PVFS2_BUFMAP_H */

Index: pvfs2-cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/pvfs2-cache.c,v
diff -p -u -r1.31.2.1 -r1.31.2.2
--- pvfs2-cache.c	18 Sep 2006 15:05:22 -0000	1.31.2.1
+++ pvfs2-cache.c	19 Oct 2006 22:17:10 -0000	1.31.2.2
@@ -109,8 +109,12 @@ char *get_opname_string(pvfs2_kernel_op_
             return "OP_CANCEL";
         else if (type == PVFS2_VFS_OP_FSYNC)
             return "OP_FSYNC";
+        else if (type == PVFS2_VFS_OP_FSKEY)
+            return "OP_FSKEY";
+        else if (type == PVFS2_VFS_OP_FILE_IOX)
+            return "OP_FILE_IOX";
     }
-    return "OP_INVALID";
+    return "OP_UNKNOWN?";
 }
 
 static pvfs2_kernel_op_t *op_alloc_common(int32_t op_linger, int32_t type)
@@ -251,6 +255,7 @@ static void pvfs2_inode_cache_ctor(
     if (flags & SLAB_CTOR_CONSTRUCTOR)
     {
         memset(pvfs2_inode, 0, sizeof(pvfs2_inode_t));
+        ClearInitFlag(pvfs2_inode);
 
         pvfs2_inode_initialize(pvfs2_inode);
 

Index: pvfs2-kernel.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/pvfs2-kernel.h,v
diff -p -u -r1.119.2.1 -r1.119.2.2
--- pvfs2-kernel.h	18 Sep 2006 15:05:22 -0000	1.119.2.1
+++ pvfs2-kernel.h	19 Oct 2006 22:17:10 -0000	1.119.2.2
@@ -144,7 +144,6 @@ typedef unsigned long sector_t;
 #define PVFS2_SEEK_END                 0x00000002
 #define PVFS2_MAX_NUM_OPTIONS          0x00000004
 #define PVFS2_MAX_MOUNT_OPT_LEN        0x00000080
-#define PVFS2_NUM_READDIR_RETRIES      0x0000000A
 #define PVFS2_MAX_FSKEY_LEN            64
 
 #define MAX_DEV_REQ_UPSIZE (2*sizeof(int32_t) +   \
@@ -170,15 +169,6 @@ sizeof(uint64_t) + sizeof(pvfs2_downcall
 #define MSECS_TO_JIFFIES(ms) (((ms)*HZ+999)/1000)
 #endif
 
-/* translates an inode number to a pvfs2 handle */
-#define pvfs2_ino_to_handle(ino) (PVFS_handle)ino
-
-/* translates a pvfs2 handle to an inode number */
-#define pvfs2_handle_to_ino(handle) (ino_t)pvfs2_handle_l32(handle)
-
-#define pvfs2_handle_l32(handle) (__u32)(handle)
-#define pvfs2_handle_h32(handle) (__u32)(handle >> 32)
-
 /************************************
  * valid pvfs2 kernel operation states
  *
@@ -317,6 +307,15 @@ int pvfs2_xattr_get_default(struct inode
 
 #endif
 
+#ifndef HAVE_STRUCT_XTVEC
+/* Redefine xtvec structure so that we could move helper functions out of the define */
+struct xtvec 
+{
+    __kernel_off_t xtv_off;  /* must be off_t */
+    __kernel_size_t xtv_len; /* must be size_t */
+};
+#endif
+
 /************************************
  * pvfs2 data structures
  ************************************/
@@ -358,9 +357,8 @@ typedef struct
 typedef struct
 {
     PVFS_object_ref refn;
-    int num_readdir_retries;
-    uint64_t directory_version;
     char *link_target;
+    uint64_t directory_version;
     /*
      * Reading/Writing Extended attributes need to acquire the appropriate
      * reader/writer semaphore on the pvfs2_inode_t structure.
@@ -375,6 +373,7 @@ typedef struct
     sector_t last_failed_block_index_read;
     int error_code;
 
+    /* State of in-memory attributes not yet flushed to disk associated with this object */
     unsigned long pinode_flags;
     /* All allocated pvfs2_inode_t objects are chained to a list */
     struct list_head list;
@@ -384,6 +383,7 @@ typedef struct
 #define P_MTIME_FLAG 1
 #define P_CTIME_FLAG 2
 #define P_MODE_FLAG  3
+#define P_INIT_FLAG  4
 
 #define ClearAtimeFlag(pinode) clear_bit(P_ATIME_FLAG, &(pinode)->pinode_flags)
 #define SetAtimeFlag(pinode)   set_bit(P_ATIME_FLAG, &(pinode)->pinode_flags)
@@ -401,6 +401,10 @@ typedef struct
 #define SetModeFlag(pinode)   set_bit(P_MODE_FLAG, &(pinode)->pinode_flags)
 #define ModeFlag(pinode)      test_bit(P_MODE_FLAG, &(pinode)->pinode_flags)
 
+#define ClearInitFlag(pinode) clear_bit(P_INIT_FLAG, &(pinode)->pinode_flags)
+#define SetInitFlag(pinode)   set_bit(P_INIT_FLAG, &(pinode)->pinode_flags)
+#define InitFlag(pinode)      test_bit(P_INIT_FLAG, &(pinode)->pinode_flags)
+
 /** mount options.  only accepted mount options are listed.
  */
 typedef struct
@@ -419,6 +423,16 @@ typedef struct
     * file if set. NOTE: this is disabled by default.
     */
     int suid;
+    /** noatime option (if set) is inspired by the nfs mount option
+    * that requires the file system to disable atime updates for all
+    * files if set. NOTE: this is disabled by default.
+    */
+    int noatime;
+    /** nodiratime option (if set) is inspired by the nfs mount option
+    * that requires the file system to disable atime updates for
+    * directories alone if set. NOTE: this is disabled by default.
+    */
+    int nodiratime;
 } pvfs2_mount_options_t;
 
 /** per superblock private pvfs2 info */
@@ -576,6 +590,60 @@ static inline pvfs2_sb_info_t *PVFS2_SB(
 #endif
 }
 
+static inline PVFS_handle ino_to_pvfs2_handle(ino_t ino)
+{
+    return (PVFS_handle) ino;
+}
+
+static inline ino_t pvfs2_handle_to_ino(PVFS_handle handle)
+{
+    ino_t ino;
+
+    ino = (ino_t) handle;
+    if (sizeof(ino_t) < sizeof(PVFS_handle))
+        ino ^= handle >> (sizeof(PVFS_handle) - sizeof(ino_t)) * 8;
+    return ino;
+}
+
+static inline PVFS_handle get_handle_from_ino(struct inode *inode)
+{
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+    return PVFS2_I(inode)->refn.handle;
+#else
+    return ino_to_pvfs2_handle(inode->i_ino);
+#endif
+}
+
+static inline PVFS_fs_id get_fsid_from_ino(struct inode *inode)
+{
+    return PVFS2_I(inode)->refn.fs_id;
+}
+
+static inline ino_t get_ino_from_handle(struct inode *inode)
+{
+    PVFS_handle handle;
+    ino_t ino;
+
+    handle = get_handle_from_ino(inode);
+    ino = pvfs2_handle_to_ino(handle);
+    return ino;
+}
+
+static inline ino_t get_parent_ino_from_dentry(struct dentry *dentry)
+{
+    return get_ino_from_handle(dentry->d_parent->d_inode);
+}
+
+static inline int is_root_handle(struct inode *inode)
+{
+    return PVFS2_SB(inode->i_sb)->root_handle == get_handle_from_ino(inode);
+}
+
+static inline int match_handle(PVFS_handle resp_handle, struct inode *inode)
+{
+    return resp_handle == get_handle_from_ino(inode);
+}
+
 /****************************
  * defined in pvfs2-cache.c
  ****************************/
@@ -671,7 +739,7 @@ struct inode *pvfs2_get_custom_inode(
     struct inode *dir,
     int mode,
     dev_t dev,
-    unsigned long ino);
+    PVFS_object_ref ref);
 
 int pvfs2_setattr(
     struct dentry *dentry,
@@ -705,6 +773,12 @@ int pvfs2_removexattr(struct dentry *den
 /****************************
  * defined in namei.c
  ****************************/
+struct inode *pvfs2_iget_common(
+        struct super_block *sb,
+        PVFS_object_ref *ref, int keep_locked);
+#define pvfs2_iget(sb, ref)        pvfs2_iget_common(sb, ref, 0)
+#define pvfs2_iget_locked(sb, ref) pvfs2_iget_common(sb, ref, 1)
+
 #ifdef PVFS2_LINUX_KERNEL_2_4
 int pvfs2_permission(struct inode *, int);
 #else
@@ -979,8 +1053,8 @@ do {                                    
 #ifdef USE_MMAP_RA_CACHE
 #define clear_inode_mmap_ra_cache(inode)                  \
 do {                                                      \
-  gossip_debug(GOSSIP_INODE_DEBUG, "calling clear_inode_mmap_ra_cache on %d\n",\
-              (int)inode->i_ino);                         \
+  gossip_debug(GOSSIP_INODE_DEBUG, "calling clear_inode_mmap_ra_cache on %llu\n",\
+              llu(get_handle_from_ino(inode)));                         \
   pvfs2_flush_mmap_racache(inode);                        \
   gossip_debug(GOSSIP_INODE_DEBUG, "clear_inode_mmap_ra_cache finished\n");    \
 } while(0)
@@ -1182,13 +1256,6 @@ static inline loff_t i_size_read(struct 
 static inline void i_size_write(struct inode *inode, loff_t i_size)
 {
     inode->i_size = i_size;
-}
-#endif
-
-#ifndef HAVE_PARENT_INO
-static inline ino_t parent_ino(struct dentry *dentry)
-{
-    return dentry->d_parent->d_inode->i_ino;
 }
 #endif
 

Index: pvfs2-utils.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/pvfs2-utils.c,v
diff -p -u -r1.123.2.1 -r1.123.2.2
--- pvfs2-utils.c	18 Sep 2006 15:05:22 -0000	1.123.2.1
+++ pvfs2-utils.c	19 Oct 2006 22:17:10 -0000	1.123.2.2
@@ -98,6 +98,24 @@ PVFS_fs_id fsid_of_op(pvfs2_kernel_op_t 
     return fsid;
 }
 
+static void pvfs2_set_inode_flags(struct inode *inode, 
+        PVFS_sys_attr *attrs)
+{
+    if (attrs->flags & PVFS_IMMUTABLE_FL)
+        inode->i_flags |= S_IMMUTABLE;
+    else 
+        inode->i_flags &= ~S_IMMUTABLE;
+    if (attrs->flags & PVFS_APPEND_FL)
+        inode->i_flags |= S_APPEND;
+    else
+        inode->i_flags &= ~S_APPEND;
+    if (attrs->flags & PVFS_NOATIME_FL)
+        inode->i_flags |= S_NOATIME;
+    else
+        inode->i_flags &= ~S_NOATIME;
+    return;
+}
+
 /* NOTE: symname is ignored unless the inode is a sym link */
 int copy_attributes_to_inode(
     struct inode *inode,
@@ -129,37 +147,41 @@ int copy_attributes_to_inode(
         */
         inode->i_blksize = pvfs_bufmap_size_query();
         inode->i_blkbits = PAGE_CACHE_SHIFT;
-        gossip_debug(GOSSIP_UTILS_DEBUG, "attrs->mask = %x (%d, objtype = %x), size = %ld\n", 
-                attrs->mask, attrs->mask & PVFS_ATTR_SYS_SIZE, 
-                attrs->objtype,
-                (unsigned long) attrs->size);
+        gossip_debug(GOSSIP_UTILS_DEBUG, "attrs->mask = %x (objtype = %s)\n", 
+                attrs->mask, 
+                attrs->objtype == PVFS_TYPE_METAFILE ? "file" :
+                attrs->objtype == PVFS_TYPE_DIRECTORY ? "directory" :
+                attrs->objtype == PVFS_TYPE_SYMLINK ? "symlink" :
+                 "invalid/unknown");
                 
-
-        if ((attrs->objtype == PVFS_TYPE_METAFILE) &&
-            (attrs->mask & PVFS_ATTR_SYS_SIZE))
+        if (attrs->objtype == PVFS_TYPE_METAFILE)
         {
-            inode_size = (loff_t)attrs->size;
-            rounded_up_size =
-                (inode_size + (4096 - (inode_size % 4096)));
+            pvfs2_set_inode_flags(inode, attrs);
+            if (attrs->mask & PVFS_ATTR_SYS_SIZE)
+            {
+                inode_size = (loff_t)attrs->size;
+                rounded_up_size =
+                    (inode_size + (4096 - (inode_size % 4096)));
 
-            pvfs2_lock_inode(inode);
+                pvfs2_lock_inode(inode);
 #ifdef PVFS2_LINUX_KERNEL_2_4
 #if (PVFS2_LINUX_KERNEL_2_4_MINOR_VER > 21)
-            inode->i_bytes = inode_size;
+                inode->i_bytes = inode_size;
 #endif
 #else
-            /* this is always ok for 2.6.x */
-            inode->i_bytes = inode_size;
+                /* this is always ok for 2.6.x */
+                inode->i_bytes = inode_size;
 #endif
-            inode->i_blocks = (unsigned long)(rounded_up_size / 512);
-            pvfs2_unlock_inode(inode);
+                inode->i_blocks = (unsigned long)(rounded_up_size / 512);
+                pvfs2_unlock_inode(inode);
 
-            /*
-              NOTE: make sure all the places we're called from have
-              the inode->i_sem lock.  we're fine in 99% of the cases
-              since we're mostly called from a lookup.
-            */
-            inode->i_size = inode_size;
+                /*
+                  NOTE: make sure all the places we're called from have
+                  the inode->i_sem lock.  we're fine in 99% of the cases
+                  since we're mostly called from a lookup.
+                */
+                inode->i_size = inode_size;
+            }
         }
         else if ((attrs->objtype == PVFS_TYPE_SYMLINK) &&
                  (symname != NULL))
@@ -229,16 +251,12 @@ int copy_attributes_to_inode(
 
         inode->i_mode |= perm_mode;
 
-        /* NOTE: this will change once we move from the iget() model to the
-         * iget5() interface where i_ino will only be a hash and not the actual
-         * handle itself!
-         * Most file systems have moved to that model
-         */
-        if (inode->i_ino == PVFS2_SB(inode->i_sb)->root_handle)
+        if (is_root_handle(inode))
         {
             /* special case: mark the root inode as sticky */
             inode->i_mode |= S_ISVTX;
-            gossip_debug(GOSSIP_ACL_DEBUG, "Marking inode %ld as sticky\n", (long) inode->i_ino);
+            gossip_debug(GOSSIP_ACL_DEBUG, "Marking inode %llu as sticky\n", 
+                    llu(get_handle_from_ino(inode)));
         }
 
         switch (attrs->objtype)
@@ -289,8 +307,8 @@ int copy_attributes_to_inode(
                 gossip_err("pvfs2:copy_attributes_to_inode: got invalid "
                             "attribute type %x\n", attrs->objtype);
         }
-        gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2: copy_attributes_to_inode: setting inode->i_mode to %o from %o\n",
-                inode->i_mode, old_mode);
+        gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2: copy_attributes_to_inode: setting i_mode to %o, i_size to %lu\n",
+                inode->i_mode, (unsigned long)i_size_read(inode));
     }
     return ret;
 }
@@ -303,7 +321,7 @@ static inline void convert_attribute_mod
     attrs->perms = PVFS_util_translate_mode(mode, suid);
     attrs->mask |= PVFS_ATTR_SYS_PERM;
 
-    gossip_debug(GOSSIP_UTILS_DEBUG, "mode is %d | translated perms is %d\n", mode,
+    gossip_debug(GOSSIP_UTILS_DEBUG, "mode is %o | translated perms is %o\n", mode,
                 attrs->perms);
 
     /* NOTE: this function only called during setattr.  Setattr must not mess
@@ -377,7 +395,7 @@ static inline int copy_attributes_from_i
         tmp_mode = iattr->ia_mode;
         if (tmp_mode & (S_ISVTX))
         {
-            if(inode->i_ino == PVFS2_SB(inode->i_sb)->root_handle)
+            if (is_root_handle(inode))
             {
                 /* allow sticky bit to be set on root (since it shows up that
                  * way by default anyhow), but don't show it to
@@ -418,7 +436,7 @@ int pvfs2_inode_getattr(struct inode *in
     pvfs2_inode_t *pvfs2_inode = NULL;
 
     gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_inode_getattr: called on inode %llu\n",
-                llu(pvfs2_ino_to_handle(inode->i_ino)));
+                llu(get_handle_from_ino(inode)));
 
     if (inode)
     {
@@ -438,15 +456,27 @@ int pvfs2_inode_getattr(struct inode *in
            that call flow looks like:
            lookup --> iget --> read_inode --> here
 
+           In the case we are doing an iget4 or an iget5_locked, there
+           is no call made to read_inode so we actually have valid fields
+           in pvfs2_inode->refn
+
            if the inode were already in the inode cache, it looks like:
            lookup --> revalidate --> here
         */
         if (pvfs2_inode->refn.handle == PVFS_HANDLE_NULL)
         {
-            pvfs2_inode->refn.handle = pvfs2_ino_to_handle(inode->i_ino);
+#if defined(HAVE_IGET4_LOCKED) || defined(HAVE_IGET5_LOCKED)
+            gossip_lerr("Critical error: Invalid handle despite using iget4/iget5\n");
+            return -EINVAL;
+#endif
+            pvfs2_inode->refn.handle = get_handle_from_ino(inode);
         }
         if (pvfs2_inode->refn.fs_id == PVFS_FS_ID_NULL)
         {
+#if defined(HAVE_IGET4_LOCKED) || defined(HAVE_IGET5_LOCKED)
+            gossip_lerr("Critical error: Invalid fsid despite using iget4/iget5\n");
+            return -EINVAL;
+#endif
             pvfs2_inode->refn.fs_id = PVFS2_SB(inode->i_sb)->fs_id;
         }
 
@@ -582,18 +612,28 @@ int pvfs2_flush_inode(struct inode *inod
         wbattr.ia_valid |= ATTR_MTIME;
     if (CtimeFlag(pvfs2_inode))
         wbattr.ia_valid |= ATTR_CTIME;
-    if (AtimeFlag(pvfs2_inode))
+    /*
+     * We do not need to honor atime flushes if
+     * a) object has a noatime marker
+     * b) object is a directory and has a nodiratime marker on the fs
+     * c) entire file system is mounted with noatime option
+     */
+    if (!((inode->i_flags & S_NOATIME)
+            || (inode->i_sb->s_flags & MS_NOATIME)
+            || ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode))) && AtimeFlag(pvfs2_inode))
+    {
         wbattr.ia_valid |= ATTR_ATIME;
+    }
     if (ModeFlag(pvfs2_inode)) 
     {
         wbattr.ia_mode = inode->i_mode;
         wbattr.ia_valid |= ATTR_MODE;
-        gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_flush_inode (%ld) writing mode %o\n",
-                (long) inode->i_ino, inode->i_mode);
+        gossip_debug(GOSSIP_ACL_DEBUG, "pvfs2_flush_inode (%llu) writing mode %o\n",
+                llu(get_handle_from_ino(inode)), inode->i_mode);
     }
 
-    gossip_debug(GOSSIP_UTILS_DEBUG, "*********** pvfs2_flush_inode: %ld "
-            "(ia_valid %d)\n", (long) inode->i_ino, wbattr.ia_valid);
+    gossip_debug(GOSSIP_UTILS_DEBUG, "*********** pvfs2_flush_inode: %llu "
+            "(ia_valid %d)\n", llu(get_handle_from_ino(inode)), wbattr.ia_valid);
     if (wbattr.ia_valid == 0)
     {
         return 0;
@@ -695,8 +735,8 @@ ssize_t pvfs2_inode_getxattr(struct inod
     }
     if (inode)
     {
-        gossip_debug(GOSSIP_XATTR_DEBUG, "getxattr on inode %ld, name %s (uid %o, gid %o)\n", 
-                (long) inode->i_ino, name, current->fsuid, current->fsgid);
+        gossip_debug(GOSSIP_XATTR_DEBUG, "getxattr on inode %llu, name %s (uid %o, gid %o)\n", 
+                llu(get_handle_from_ino(inode)), name, current->fsuid, current->fsgid);
         pvfs2_inode = PVFS2_I(inode);
         /* obtain the xattr semaphore */
         down_read(&pvfs2_inode->xattr_sem);
@@ -765,9 +805,9 @@ ssize_t pvfs2_inode_getxattr(struct inod
                     memcpy(buffer, new_op->downcall.resp.getxattr.val, 
                             new_length);
                     ret = new_length;
-                    gossip_debug(GOSSIP_XATTR_DEBUG, "pvfs2_inode_getxattr: inode %ld key %s "
+                    gossip_debug(GOSSIP_XATTR_DEBUG, "pvfs2_inode_getxattr: inode %llu key %s "
                             " key_sz %d, val_length %d\n", 
-                        (long) inode->i_ino,
+                        llu(get_handle_from_ino(inode)),
                         (char*)new_op->upcall.req.getxattr.key, 
                         (int) new_op->upcall.req.getxattr.key_sz, (int) ret);
                 }
@@ -776,8 +816,8 @@ ssize_t pvfs2_inode_getxattr(struct inod
         else if (ret == -ENOENT)
         {
             ret = -ENODATA; /* if no such keys exists we set this to be errno */
-            gossip_debug(GOSSIP_XATTR_DEBUG, "pvfs2_inode_getxattr: inode %ld key %s does not exist!\n",
-                    (long) inode->i_ino, (char *) new_op->upcall.req.getxattr.key);
+            gossip_debug(GOSSIP_XATTR_DEBUG, "pvfs2_inode_getxattr: inode %llu key %s does not exist!\n",
+                    llu(get_handle_from_ino(inode)), (char *) new_op->upcall.req.getxattr.key);
         }
 
         /* when request is serviced properly, free req op struct */
@@ -824,8 +864,8 @@ int pvfs2_inode_setxattr(struct inode *i
     }
     if (inode)
     {
-        gossip_debug(GOSSIP_XATTR_DEBUG, "setxattr on inode %ld, name %s\n", 
-                (long) inode->i_ino, name);
+        gossip_debug(GOSSIP_XATTR_DEBUG, "setxattr on inode %llu, name %s\n", 
+                llu(get_handle_from_ino(inode)), name);
         if (IS_RDONLY(inode))
         {
             gossip_err("pvfs2_inode_setxattr: Read-only file system\n");
@@ -833,7 +873,8 @@ int pvfs2_inode_setxattr(struct inode *i
         }
         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
         {
-            gossip_err("pvfs2_inode_setxattr: Immutable inode or append-only inode; operation not permitted\n");
+            gossip_err("pvfs2_inode_setxattr: Immutable inode or append-only "
+                    "inode; operation not permitted\n");
             return -EPERM;
         }
         pvfs2_inode = PVFS2_I(inode);
@@ -1072,7 +1113,6 @@ static inline struct inode *pvfs2_create
     int ret = -1;
     pvfs2_kernel_op_t *new_op = NULL;
     pvfs2_inode_t *parent = PVFS2_I(dir);
-    pvfs2_inode_t *pvfs2_inode = NULL;
     struct inode *inode = NULL;
 
     new_op = op_alloc(PVFS2_VFS_OP_CREATE);
@@ -1082,14 +1122,20 @@ static inline struct inode *pvfs2_create
         return NULL;
     }
 
-    if (parent && parent->refn.handle && parent->refn.fs_id)
+    if (parent && parent->refn.handle != PVFS_HANDLE_NULL && parent->refn.fs_id != PVFS_FS_ID_NULL)
     {
         new_op->upcall.req.create.parent_refn = parent->refn;
     }
     else
     {
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+        gossip_lerr("Critical error: i_ino cannot be relied on when using iget4/5\n");
+        *error_code = -EINVAL;
+        op_release(new_op);
+        return NULL;
+#endif
         new_op->upcall.req.create.parent_refn.handle =
-            pvfs2_ino_to_handle(dir->i_ino);
+            get_handle_from_ino(dir);
         new_op->upcall.req.create.parent_refn.fs_id =
             PVFS2_SB(dir->i_sb)->fs_id;
     }
@@ -1112,8 +1158,7 @@ static inline struct inode *pvfs2_create
     if (ret > -1)
     {
         inode = pvfs2_get_custom_inode(
-            dir->i_sb, dir, (S_IFREG | mode), 0, pvfs2_handle_to_ino(
-                new_op->downcall.resp.create.refn.handle));
+            dir->i_sb, dir, (S_IFREG | mode), 0, new_op->downcall.resp.create.refn);
         if (!inode)
         {
             gossip_err("*** Failed to allocate pvfs2 file inode\n");
@@ -1122,12 +1167,8 @@ static inline struct inode *pvfs2_create
             return NULL;
         }
 
-        gossip_debug(GOSSIP_UTILS_DEBUG, "Assigned file inode new number of %d\n",
-                    (int)inode->i_ino);
-
-        pvfs2_inode = PVFS2_I(inode);
-        pvfs2_inode->refn = new_op->downcall.resp.create.refn;
-
+        gossip_debug(GOSSIP_UTILS_DEBUG, "Assigned file inode new number of %llu\n",
+                    llu(get_handle_from_ino(inode)));
         /* finally, add dentry with this new inode to the dcache */
         gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_create_file: Instantiating\n *negative* "
                     "dentry %p for %s\n", dentry,
@@ -1135,8 +1176,8 @@ static inline struct inode *pvfs2_create
 
         dentry->d_op = &pvfs2_dentry_operations;
         d_instantiate(dentry, inode);
-        gossip_debug(GOSSIP_ACL_DEBUG, "Inode (Regular File) %ld -> %s\n",
-                (long) inode->i_ino, dentry->d_name.name);
+        gossip_debug(GOSSIP_ACL_DEBUG, "Inode (Regular File) %llu -> %s\n",
+                llu(get_handle_from_ino(inode)), dentry->d_name.name);
     }
     else
     {
@@ -1159,7 +1200,6 @@ static inline struct inode *pvfs2_create
     int ret = -1;
     pvfs2_kernel_op_t *new_op = NULL;
     pvfs2_inode_t *parent = PVFS2_I(dir);
-    pvfs2_inode_t *pvfs2_inode = NULL;
     struct inode *inode = NULL;
 
     new_op = op_alloc(PVFS2_VFS_OP_MKDIR);
@@ -1169,14 +1209,20 @@ static inline struct inode *pvfs2_create
         return NULL;
     }
 
-    if (parent && parent->refn.handle && parent->refn.fs_id)
+    if (parent && parent->refn.handle != PVFS_HANDLE_NULL && parent->refn.fs_id != PVFS_FS_ID_NULL)
     {
         new_op->upcall.req.mkdir.parent_refn = parent->refn;
     }
     else
     {
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+        gossip_lerr("Critical error: i_ino cannot be relied on when using iget4/5\n");
+        *error_code = -EINVAL;
+        op_release(new_op);
+        return NULL;
+#endif
         new_op->upcall.req.mkdir.parent_refn.handle =
-            pvfs2_ino_to_handle(dir->i_ino);
+            get_handle_from_ino(dir);
         new_op->upcall.req.mkdir.parent_refn.fs_id =
             PVFS2_SB(dir->i_sb)->fs_id;
     }
@@ -1199,8 +1245,7 @@ static inline struct inode *pvfs2_create
     if (ret > -1)
     {
         inode = pvfs2_get_custom_inode(
-            dir->i_sb, dir, (S_IFDIR | mode), 0, pvfs2_handle_to_ino(
-                new_op->downcall.resp.mkdir.refn.handle));
+            dir->i_sb, dir, (S_IFDIR | mode), 0, new_op->downcall.resp.mkdir.refn);
         if (!inode)
         {
             gossip_err("*** Failed to allocate pvfs2 dir inode\n");
@@ -1209,12 +1254,8 @@ static inline struct inode *pvfs2_create
             return NULL;
         }
 
-        gossip_debug(GOSSIP_UTILS_DEBUG, "Assigned dir inode new number of %d\n",
-                    (int) inode->i_ino);
-
-        pvfs2_inode = PVFS2_I(inode);
-        pvfs2_inode->refn = new_op->downcall.resp.mkdir.refn;
-
+        gossip_debug(GOSSIP_UTILS_DEBUG, "Assigned dir inode new number of %llu\n",
+                    llu(get_handle_from_ino(inode)));
         /* finally, add dentry with this new inode to the dcache */
         gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_create_dir: Instantiating\n  *negative* "
                     "dentry %p for %s\n", dentry,
@@ -1222,8 +1263,8 @@ static inline struct inode *pvfs2_create
 
         dentry->d_op = &pvfs2_dentry_operations;
         d_instantiate(dentry, inode);
-        gossip_debug(GOSSIP_ACL_DEBUG, "Inode (Directory) %ld -> %s\n",
-                (long) inode->i_ino, dentry->d_name.name);
+        gossip_debug(GOSSIP_ACL_DEBUG, "Inode (Directory) %llu -> %s\n",
+                llu(get_handle_from_ino(inode)), dentry->d_name.name);
     }
     else
     {
@@ -1247,7 +1288,6 @@ static inline struct inode *pvfs2_create
     int ret = -1;
     pvfs2_kernel_op_t *new_op = NULL;
     pvfs2_inode_t *parent = PVFS2_I(dir);
-    pvfs2_inode_t *pvfs2_inode = NULL;
     struct inode *inode = NULL;
 
     new_op = op_alloc(PVFS2_VFS_OP_SYMLINK);
@@ -1257,14 +1297,20 @@ static inline struct inode *pvfs2_create
         return NULL;
     }
 
-    if (parent && parent->refn.handle && parent->refn.fs_id)
+    if (parent && parent->refn.handle != PVFS_HANDLE_NULL && parent->refn.fs_id != PVFS_FS_ID_NULL)
     {
         new_op->upcall.req.sym.parent_refn = parent->refn;
     }
     else
     {
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+        gossip_lerr("Critical error: i_ino cannot be relied on when using iget4/5\n");
+        *error_code = -EINVAL;
+        op_release(new_op);
+        return NULL;
+#endif
         new_op->upcall.req.sym.parent_refn.handle =
-            pvfs2_ino_to_handle(dir->i_ino);
+            get_handle_from_ino(dir);
         new_op->upcall.req.sym.parent_refn.fs_id =
             PVFS2_SB(dir->i_sb)->fs_id;
     }
@@ -1288,8 +1334,7 @@ static inline struct inode *pvfs2_create
     if (ret > -1)
     {
         inode = pvfs2_get_custom_inode(
-            dir->i_sb, dir, (S_IFLNK | mode), 0, pvfs2_handle_to_ino(
-                new_op->downcall.resp.sym.refn.handle));
+            dir->i_sb, dir, (S_IFLNK | mode), 0, new_op->downcall.resp.sym.refn);
         if (!inode)
         {
             gossip_err("*** Failed to allocate pvfs2 symlink inode\n");
@@ -1298,11 +1343,8 @@ static inline struct inode *pvfs2_create
             return NULL;
         }
 
-        gossip_debug(GOSSIP_UTILS_DEBUG, "Assigned symlink inode new number of %d\n",
-                    (int)inode->i_ino);
-
-        pvfs2_inode = PVFS2_I(inode);
-        pvfs2_inode->refn = new_op->downcall.resp.sym.refn;
+        gossip_debug(GOSSIP_UTILS_DEBUG, "Assigned symlink inode new number of %llu\n",
+                    llu(get_handle_from_ino(inode)));
 
         /* finally, add dentry with this new inode to the dcache */
         gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_create_symlink: Instantiating\n  "
@@ -1311,8 +1353,8 @@ static inline struct inode *pvfs2_create
 
         dentry->d_op = &pvfs2_dentry_operations;
         d_instantiate(dentry, inode);
-        gossip_debug(GOSSIP_ACL_DEBUG, "Inode (Symlink) %ld -> %s\n",
-                (long) inode->i_ino, dentry->d_name.name);
+        gossip_debug(GOSSIP_ACL_DEBUG, "Inode (Symlink) %llu -> %s\n",
+                llu(get_handle_from_ino(inode)), dentry->d_name.name);
     }
     else
     {
@@ -1382,9 +1424,9 @@ int pvfs2_remove_entry(
 
     if (inode && parent && dentry)
     {
-        gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_remove_entry: called on %s\n  (inode %d): "
+        gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_remove_entry: called on %s\n  (inode %llu): "
                     "Parent is %llu | fs_id %d\n", dentry->d_name.name,
-                    (int)inode->i_ino, llu(parent->refn.handle),
+                    llu(get_handle_from_ino(inode)), llu(parent->refn.handle),
                     parent->refn.fs_id);
 
         new_op = op_alloc(PVFS2_VFS_OP_REMOVE);
@@ -1393,14 +1435,19 @@ int pvfs2_remove_entry(
             return -ENOMEM;
         }
 
-        if (parent && parent->refn.handle && parent->refn.fs_id)
+        if (parent && parent->refn.handle != PVFS_HANDLE_NULL && parent->refn.fs_id != PVFS_FS_ID_NULL)
         {
             new_op->upcall.req.remove.parent_refn = parent->refn;
         }
         else
         {
+#if defined(HAVE_IGET5_LOCKED) || defined(HAVE_IGET4_LOCKED)
+            gossip_lerr("Critical error: i_ino cannot be relied on when using iget4/5\n");
+            op_release(new_op);
+            return -ENOMEM;
+#endif
             new_op->upcall.req.remove.parent_refn.handle =
-                pvfs2_ino_to_handle(dir->i_ino);
+                get_handle_from_ino(dir);
             new_op->upcall.req.remove.parent_refn.fs_id =
                 PVFS2_SB(dir->i_sb)->fs_id;
         }
@@ -1425,9 +1472,9 @@ int pvfs2_truncate_inode(
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
     pvfs2_kernel_op_t *new_op = NULL;
 
-    gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2: pvfs2_truncate_inode %d: "
+    gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2: pvfs2_truncate_inode %llu: "
                 "Handle is %llu | fs_id %d | size is %lu\n",
-                (int)inode->i_ino, llu(pvfs2_inode->refn.handle),
+                llu(get_handle_from_ino(inode)), llu(pvfs2_inode->refn.handle),
                 pvfs2_inode->refn.fs_id, (unsigned long)size);
 
     new_op = op_alloc(PVFS2_VFS_OP_TRUNCATE);
@@ -1595,10 +1642,10 @@ struct inode *pvfs2_sb_find_inode_handle
         const struct file_handle *fhandle)
 {
     struct inode *inode = NULL;
-    unsigned long inode_number;
     int err = 0;
     pvfs2_opaque_handle_t opaque_handle;
     PVFS_sys_attr attrs;
+    PVFS_object_ref ref;
 
     /* Decode the buffer */
     err = get_opaque_handle(sb, fhandle, &opaque_handle);
@@ -1608,13 +1655,10 @@ struct inode *pvfs2_sb_find_inode_handle
     /* and convert the opaque handle structure to the PVFS_sys_attr structure */
     convert_opaque_handle_to_sys_attr(&attrs, &opaque_handle);
 
-    /* FIXME:
-     * We ought to move to the iget5 model otherwise we are ending 
-     * up truncating handle 
-     */
-    inode_number = (unsigned long) opaque_handle.handle;
-    gossip_debug(GOSSIP_UTILS_DEBUG, "Obtained inode number %lu\n",
-            (unsigned long) inode_number);
+    ref.handle = opaque_handle.handle;
+    ref.fs_id  = opaque_handle.fsid;
+    gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_sb_find_inode_handle: obtained inode number %llu\n",
+            llu(opaque_handle.handle));
     /* 
      * NOTE: Locate the inode number in the icache if possible.
      * If not allocate a new inode that is returned locked and
@@ -1625,7 +1669,7 @@ struct inode *pvfs2_sb_find_inode_handle
      * Consequently, this approach should scale well since openfh()
      * does not require any network messages.
      */
-    inode = iget_locked(sb, inode_number);
+    inode = pvfs2_iget_locked(sb, &ref);
 
     if (!inode) {
         gossip_err("Could not allocate inode\n");
@@ -1656,9 +1700,7 @@ struct inode *pvfs2_sb_find_inode_handle
             inode->i_bdev            = NULL;
             inode->i_cdev            = NULL;
             inode->i_mapping->a_ops  = &pvfs2_address_operations;
-#ifndef PVFS2_LINUX_KERNEL_2_4
             inode->i_mapping->backing_dev_info = &pvfs2_backing_dev_info;
-#endif
             /* Make sure that we unlock the inode */
             unlock_new_inode(inode);
         }
@@ -1778,8 +1820,8 @@ int pvfs2_flush_mmap_racache(struct inod
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
     pvfs2_kernel_op_t *new_op = NULL;
 
-    gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_flush_mmap_racache %d: Handle is %llu "
-                "| fs_id %d\n",(int)inode->i_ino,
+    gossip_debug(GOSSIP_UTILS_DEBUG, "pvfs2_flush_mmap_racache %llu: Handle is %llu "
+                "| fs_id %d\n", llu(get_handle_from_ino(inode)),
                 pvfs2_inode->refn.handle, pvfs2_inode->refn.fs_id);
 
     new_op = op_alloc(PVFS2_VFS_OP_MMAP_RA_FLUSH);
@@ -1867,13 +1909,16 @@ int pvfs2_cancel_op_in_progress(unsigned
 
 void pvfs2_inode_initialize(pvfs2_inode_t *pvfs2_inode)
 {
-    pvfs2_inode->refn.handle = PVFS_HANDLE_NULL;
-    pvfs2_inode->refn.fs_id = PVFS_FS_ID_NULL;
-    pvfs2_inode->last_failed_block_index_read = 0;
-    pvfs2_inode->link_target = NULL;
-    pvfs2_inode->num_readdir_retries = PVFS2_NUM_READDIR_RETRIES;
-    pvfs2_inode->directory_version = 0;
-    pvfs2_inode->error_code = 0;
+    if (!InitFlag(pvfs2_inode))
+    {
+        pvfs2_inode->refn.handle = PVFS_HANDLE_NULL;
+        pvfs2_inode->refn.fs_id = PVFS_FS_ID_NULL;
+        pvfs2_inode->last_failed_block_index_read = 0;
+        pvfs2_inode->link_target = NULL;
+        pvfs2_inode->directory_version = 0;
+        pvfs2_inode->error_code = 0;
+        SetInitFlag(pvfs2_inode);
+    }
 }
 
 /*
@@ -1885,7 +1930,6 @@ void pvfs2_inode_finalize(pvfs2_inode_t 
     pvfs2_inode->refn.handle = PVFS_HANDLE_NULL;
     pvfs2_inode->refn.fs_id = PVFS_FS_ID_NULL;
     pvfs2_inode->last_failed_block_index_read = 0;
-    pvfs2_inode->num_readdir_retries = PVFS2_NUM_READDIR_RETRIES;
     pvfs2_inode->directory_version = 0;
     pvfs2_inode->error_code = 0;
 }
@@ -1904,19 +1948,18 @@ void pvfs2_op_initialize(pvfs2_kernel_op
 
 void pvfs2_make_bad_inode(struct inode *inode)
 {
-    if (pvfs2_handle_to_ino(PVFS2_SB(inode->i_sb)->root_handle) ==
-        inode->i_ino)
+    if (is_root_handle(inode))
     {
         /*
           if this occurs, the pvfs2-client-core was killed but we
           can't afford to lose the inode operations and such
           associated with the root handle in any case
         */
-        gossip_debug(GOSSIP_UTILS_DEBUG, "*** NOT making bad root inode %lu\n", inode->i_ino);
+        gossip_debug(GOSSIP_UTILS_DEBUG, "*** NOT making bad root inode %llu\n", llu(get_handle_from_ino(inode)));
     }
     else
     {
-        gossip_debug(GOSSIP_UTILS_DEBUG, "*** making bad inode %lu\n", inode->i_ino);
+        gossip_debug(GOSSIP_UTILS_DEBUG, "*** making bad inode %llu\n", llu(get_handle_from_ino(inode)));
         make_bad_inode(inode);
     }
 }

Index: super.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/super.c,v
diff -p -u -r1.72.2.1 -r1.72.2.2
--- super.c	18 Sep 2006 15:05:22 -0000	1.72.2.1
+++ super.c	19 Oct 2006 22:17:11 -0000	1.72.2.2
@@ -5,6 +5,7 @@
  */
 
 #include "pvfs2-kernel.h"
+#include "pvfs2-internal.h"
 
 /* list for storing pvfs2 specific superblocks in use */
 LIST_HEAD(pvfs2_superblocks);
@@ -17,6 +18,9 @@ static void pvfs2_sb_get_fs_key(struct s
 #endif
 static atomic_t pvfs2_inode_alloc_count, pvfs2_inode_dealloc_count;
 
+static char *keywords[] = {"intr", "acl", "suid", "noatime", "nodiratime"};
+static int num_possible_keywords = sizeof(keywords)/sizeof(char *);
+
 static int parse_mount_options(
    char *option_str, struct super_block *sb, int silent)
 {
@@ -24,8 +28,6 @@ static int parse_mount_options(
     pvfs2_sb_info_t *pvfs2_sb = NULL;
     int i = 0, j = 0, num_keywords = 0, got_device = 0;
 
-    static char *keywords[] = {"intr", "acl", "suid"};
-    static int num_possible_keywords = 3;
     static char options[PVFS2_MAX_NUM_OPTIONS][PVFS2_MAX_MOUNT_OPT_LEN];
 
     if (!silent)
@@ -122,6 +124,24 @@ static int parse_mount_options(
                         pvfs2_sb->mnt_options.suid = 1;
                         break;
                     }
+                    else if (strncmp(options[i], "noatime", 7) == 0)
+                    {
+                        if (!silent)
+                        {
+                            gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2: mount option "
+                                       "noatime specified\n");
+                        }
+                        pvfs2_sb->mnt_options.noatime = 1;
+                    }
+                    else if (strncmp(options[i], "nodiratime", 10) == 0)
+                    {
+                        if (!silent)
+                        {
+                            gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2: mount option "
+                                       "nodiratime specified\n");
+                        }
+                        pvfs2_sb->mnt_options.nodiratime = 1;
+                    }
                 }
             }
 
@@ -173,6 +193,7 @@ static struct inode *pvfs2_alloc_inode(s
         new_inode = &pvfs2_inode->vfs_inode;
         gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_alloc_inode: allocated %p\n", pvfs2_inode);
         atomic_inc(&pvfs2_inode_alloc_count);
+        new_inode->i_flags &= ~(S_APPEND|S_IMMUTABLE|S_NOATIME);
     }
     return new_inode;
 }
@@ -183,8 +204,8 @@ static void pvfs2_destroy_inode(struct i
 
     if (pvfs2_inode)
     {
-        gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_destroy_inode: deallocated %p destroying inode %ld\n",
-                    pvfs2_inode, (long)inode->i_ino);
+        gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_destroy_inode: deallocated %p destroying inode %llu\n",
+                    pvfs2_inode, llu(get_handle_from_ino(inode)));
 
         atomic_inc(&pvfs2_inode_dealloc_count);
         pvfs2_inode_finalize(pvfs2_inode);
@@ -197,8 +218,8 @@ static void pvfs2_read_inode(
 {
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
 
-    gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_read_inode: %p (inode = %lu | ct = %d)\n",
-                pvfs2_inode, inode->i_ino, (int)atomic_read(&inode->i_count));
+    gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_read_inode: %p (inode = %llu | ct = %d)\n",
+                pvfs2_inode, llu(get_handle_from_ino(inode)), (int)atomic_read(&inode->i_count));
 
     /*
       at this point we know the private inode data handle/fs_id can't
@@ -244,14 +265,17 @@ static void pvfs2_read_inode(
         pvfs2_inode_initialize(pvfs2_inode);
         inode->u.generic_ip = pvfs2_inode;
         pvfs2_inode->vfs_inode = inode;
+        inode->i_flags &= ~(S_APPEND|S_IMMUTABLE|S_NOATIME);
 
-        gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2: pvfs2_read_inode: allocated %p (inode = %lu | "
-                "ct = %d)\n", pvfs2_inode, inode->i_ino,
-                (int)atomic_read(&inode->i_count));
         if (pvfs2_inode_getattr(inode, PVFS_ATTR_SYS_ALL_NOHINT) != 0)
         {
             pvfs2_make_bad_inode(inode);
         }
+        else {
+            gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2: pvfs2_read_inode: allocated %p (inode = %llu | "
+                    "ct = %d)\n", pvfs2_inode, llu(get_handle_from_ino(inode)),
+                                  (int)atomic_read(&inode->i_count));
+        }
     }
     else
     {
@@ -265,8 +289,8 @@ static void pvfs2_clear_inode(struct ino
 {
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
 
-    gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_clear_inode: deallocated %p, destroying inode %ld\n",
-                pvfs2_inode, (long)inode->i_ino);
+    gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_clear_inode: deallocated %p, destroying inode %llu\n",
+                pvfs2_inode, llu(get_handle_from_ino(inode)));
 
     pvfs2_inode_finalize(pvfs2_inode);
     pvfs2_inode_release(pvfs2_inode);
@@ -280,8 +304,8 @@ static void pvfs2_put_inode(
     struct inode *inode)
 {
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
-    gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_put_inode: pvfs2_inode: %p (i_ino %d) = %d (nlink=%d)\n",
-                pvfs2_inode, (int)inode->i_ino, (int)atomic_read(&inode->i_count),
+    gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_put_inode: pvfs2_inode: %p (inode = %llu) = %d (nlink=%d)\n",
+                pvfs2_inode, llu(get_handle_from_ino(inode)), (int)atomic_read(&inode->i_count),
                 (int)inode->i_nlink);
 
     if (atomic_read(&inode->i_count) == 1)
@@ -510,6 +534,10 @@ int pvfs2_remount(
                 ((PVFS2_SB(sb)->mnt_options.acl == 1) ? MS_POSIXACL : 0));
             sb->s_xattr = pvfs2_xattr_handlers;
 #endif
+            sb->s_flags = ((sb->s_flags & ~MS_NOATIME)  |
+                ((PVFS2_SB(sb)->mnt_options.noatime == 1) ? MS_NOATIME : 0));
+            sb->s_flags = ((sb->s_flags & ~MS_NODIRATIME) |
+                ((PVFS2_SB(sb)->mnt_options.nodiratime == 1) ? MS_NODIRATIME : 0));
         }
 
         new_op = op_alloc(PVFS2_VFS_OP_FS_MOUNT);
@@ -785,7 +813,7 @@ static void pvfs2_dirty_inode(struct ino
     if (inode)
     {
         pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
-        gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_dirty_inode: %ld\n", (long) inode->i_ino);
+        gossip_debug(GOSSIP_SUPER_DEBUG, "pvfs2_dirty_inode: %llu\n", llu(get_handle_from_ino(inode)));
         SetAtimeFlag(pvfs2_inode);
     }
     return;
@@ -834,6 +862,7 @@ struct super_block* pvfs2_get_sb(
     pvfs2_kernel_op_t *new_op = NULL;
     char *dev_name = NULL;
     int ret = -EINVAL;
+    PVFS_object_ref root_object;
 
     if (!data || !sb)
     {
@@ -862,6 +891,10 @@ struct super_block* pvfs2_get_sb(
             gossip_err("Failed to parse mount time options\n");
             goto error_exit;
         }
+        sb->s_flags = ((sb->s_flags & ~MS_NOATIME)  |
+            ((PVFS2_SB(sb)->mnt_options.noatime == 1) ? MS_NOATIME : 0));
+        sb->s_flags = ((sb->s_flags & ~MS_NODIRATIME) |
+            ((PVFS2_SB(sb)->mnt_options.nodiratime == 1) ? MS_NODIRATIME : 0));
         dev_name = PVFS2_SB(sb)->devname;
     }
 
@@ -907,17 +940,20 @@ struct super_block* pvfs2_get_sb(
     sb->s_blocksize_bits = PVFS2_BUFMAP_DEFAULT_DESC_SHIFT;
     sb->s_maxbytes = MAX_LFS_FILESIZE;
 
+    root_object.handle = PVFS2_SB(sb)->root_handle;
+    root_object.fs_id  = PVFS2_SB(sb)->fs_id;
+
     /* alloc and initialize our root directory inode by explicitly requesting
      * the sticky bit to be set */
     root = pvfs2_get_custom_inode(
-        sb, NULL, (S_IFDIR | 0755 | S_ISVTX), 0, PVFS2_SB(sb)->root_handle);
+        sb, NULL, (S_IFDIR | 0755 | S_ISVTX), 0, root_object);
     if (!root)
     {
         ret = -ENOMEM;
         goto error_exit;
     }
-    gossip_debug(GOSSIP_SUPER_DEBUG, "Allocated root inode [%p] with mode %x\n", root, root->i_mode);
-    PVFS2_I(root)->refn.fs_id = PVFS2_SB(sb)->fs_id;
+    gossip_debug(GOSSIP_SUPER_DEBUG, "Allocated root inode [%p] with mode %o\n",
+            root, root->i_mode);
 
     /* allocates and places root dentry in dcache */
     root_dentry = d_alloc_root(root);
@@ -978,6 +1014,7 @@ int pvfs2_fill_sb(
     struct inode *root = NULL;
     struct dentry *root_dentry = NULL;
     pvfs2_mount_sb_info_t *mount_sb_info = (pvfs2_mount_sb_info_t *)data;
+    PVFS_object_ref root_object;
 
     /* alloc and init our private pvfs2 sb info */
     sb->s_fs_info = kmalloc(sizeof(pvfs2_sb_info_t), PVFS2_GFP_FLAGS);
@@ -1003,9 +1040,13 @@ int pvfs2_fill_sb(
         /* mark the superblock as whether it supports acl's or not */
         sb->s_flags = ((sb->s_flags & ~MS_POSIXACL) | 
             ((PVFS2_SB(sb)->mnt_options.acl == 1) ? MS_POSIXACL : 0));
+        sb->s_flags = ((sb->s_flags & ~MS_NOATIME)  |
+            ((PVFS2_SB(sb)->mnt_options.noatime == 1) ? MS_NOATIME : 0));
+        sb->s_flags = ((sb->s_flags & ~MS_NODIRATIME) |
+            ((PVFS2_SB(sb)->mnt_options.nodiratime == 1) ? MS_NODIRATIME : 0));
     }
     else {
-        sb->s_flags = (sb->s_flags & ~MS_POSIXACL);
+        sb->s_flags = (sb->s_flags & ~(MS_POSIXACL | MS_NOATIME | MS_NODIRATIME));
     }
 
 #if defined(HAVE_GENERIC_GETXATTR) && defined(CONFIG_FS_POSIX_ACL)
@@ -1020,17 +1061,17 @@ int pvfs2_fill_sb(
     sb->s_blocksize_bits = PVFS2_BUFMAP_DEFAULT_DESC_SHIFT;
     sb->s_maxbytes = MAX_LFS_FILESIZE;
 
+    root_object.handle = PVFS2_SB(sb)->root_handle;
+    root_object.fs_id  = PVFS2_SB(sb)->fs_id;
     /* alloc and initialize our root directory inode. be explicit about sticky
      * bit */
     root = pvfs2_get_custom_inode(sb, NULL, (S_IFDIR | 0755 | S_ISVTX),
-                                  0, PVFS2_SB(sb)->root_handle);
+                                  0, root_object);
     if (!root)
     {
         return -ENOMEM;
     }
     gossip_debug(GOSSIP_SUPER_DEBUG, "Allocated root inode [%p] with mode %x\n", root, root->i_mode);
-    PVFS2_I(root)->refn.handle = PVFS2_SB(sb)->root_handle;
-    PVFS2_I(root)->refn.fs_id = PVFS2_SB(sb)->fs_id;
 
     /* allocates and places root dentry in dcache */
     root_dentry = d_alloc_root(root);

Index: symlink.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/symlink.c,v
diff -p -u -r1.14.14.1 -r1.14.14.2
--- symlink.c	18 Sep 2006 15:05:22 -0000	1.14.14.1
+++ symlink.c	19 Oct 2006 22:17:11 -0000	1.14.14.2
@@ -6,14 +6,15 @@
 
 #include "pvfs2-kernel.h"
 #include "pvfs2-bufmap.h"
+#include "pvfs2-internal.h"
 
 static int pvfs2_readlink(
     struct dentry *dentry, char __user *buffer, int buflen)
 {
     pvfs2_inode_t *pvfs2_inode = PVFS2_I(dentry->d_inode);
 
-    gossip_debug(GOSSIP_INODE_DEBUG, "pvfs2_readlink called on inode %d\n",
-                (int)dentry->d_inode->i_ino);
+    gossip_debug(GOSSIP_INODE_DEBUG, "pvfs2_readlink called on inode %llu\n",
+                llu(get_handle_from_ino(dentry->d_inode)));
 
     /*
       if we're getting called, the vfs has no doubt already done a

Index: xattr-default.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/kernel/linux-2.6/xattr-default.c,v
diff -p -u -r1.1.16.1 -r1.1.16.2
--- xattr-default.c	18 Sep 2006 15:05:22 -0000	1.1.16.1
+++ xattr-default.c	19 Oct 2006 22:17:11 -0000	1.1.16.2
@@ -26,7 +26,7 @@ int pvfs2_xattr_set_default(struct inode
 
     if (strcmp(name, "") == 0)
         return -EINVAL;
-    if ( !S_ISREG(inode->i_mode) &&
+    if (!S_ISREG(inode->i_mode) &&
        (!S_ISDIR(inode->i_mode) || inode->i_mode & S_ISVTX))
     {
        return -EPERM;

--- xattr_default.c	2006-10-19 18:17:11.000000000 -0400
+++ /dev/null	2004-06-24 14:04:38.000000000 -0400
@@ -1,60 +0,0 @@
-/*
- * (C) 2001 Clemson University and The University of Chicago
- *
- * See COPYING in top-level directory.
- */
-
-/** \file
- *  \ingroup pvfs2linux
- *
- *  Extended attributes for PVFS2 that handles all setxattr
- *  stuff even for those keys that do not have a prefix!
- *  This is the 2.6 kernels way of doing extended attributes
- */
-
-#include "pvfs2-kernel.h"
-#include "pvfs2-bufmap.h"
-
-#if !defined(PVFS2_LINUX_KERNEL_2_4) && defined(HAVE_GENERIC_GETXATTR)
-
-#include <linux/xattr.h>
-
-static int pvfs2_xattr_get_default(struct inode *inode,
-        const char *name, void *buffer, size_t size)
-{
-    if (strcmp(name, "") == 0)
-        return -EINVAL;
-    return pvfs2_inode_getxattr(inode, name, buffer, size);
-}
-
-static int pvfs2_xattr_set_default(struct inode *inode, 
-        const char *name, const void *buffer, size_t size, int flags)
-{
-    int internal_flag = 0;
-
-    if (strcmp(name, "") == 0)
-        return -EINVAL;
-    internal_flag = convert_to_internal_xattr_flags(flags);
-    return pvfs2_inode_setxattr(inode, name, buffer, size, internal_flag);
-}
-
-struct xattr_handler pvfs2_xattr_default_handler = {
-    /* 
-     * NOTE: this is set to be the empty string.
-     * so that all un-prefixed xattrs keys get caught
-     * here!
-     */
-    .prefix = PVFS2_XATTR_NAME_DEFAULT, 
-    .get    = pvfs2_xattr_get_default,
-    .set    = pvfs2_xattr_set_default,
-};
-
-#endif
-/*
- * Local variables:
- *  c-indent-level: 4
- *  c-basic-offset: 4
- * End:
- *
- * vim: ts=8 sts=4 sw=4 expandtab
- */



More information about the Pvfs2-cvs mailing list