Update of /cvsroot/ssic-linux/openssi/lustre/ldlm
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv587/ldlm
Modified Files:
Tag: OPENSSI-RH
Makefile.am l_lock.c ldlm_extent.c ldlm_lock.c ldlm_lockd.c
ldlm_request.c ldlm_resource.c
Log Message:
Finish lustre-1.0.1 import.
Index: Makefile.am
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/Makefile.am,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** Makefile.am 31 Mar 2003 18:22:12 -0000 1.1
--- Makefile.am 28 Jan 2004 23:48:05 -0000 1.2
***************
*** 4,24 ****
# See the file COPYING in this distribution
! DEFS=
! LDLMSOURCES= l_lock.c ldlm_lock.c ldlm_resource.c \
! ldlm_extent.c ldlm_request.c ldlm_lockd.c
if LIBLUSTRE
lib_LIBRARIES = libldlm.a
! libldlm_a_SOURCES = $(LDLMSOURCES)
! else
! MODULE = ldlm
! modulefs_DATA = ldlm.o
! EXTRA_PROGRAMS = ldlm
!
! ldlm_SOURCES = $(LDLMSOURCES)
endif
include $(top_srcdir)/Rules
-
-
--- 4,18 ----
# See the file COPYING in this distribution
! #
! # ldlm is built into ptlrpc
! #
! DEFS=
if LIBLUSTRE
lib_LIBRARIES = libldlm.a
! libldlm_a_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_lib.c \
! ldlm_plain.c ldlm_extent.c ldlm_request.c ldlm_lockd.c ldlm_internal.h
endif
include $(top_srcdir)/Rules
Index: l_lock.c
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/l_lock.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** l_lock.c 31 Mar 2003 18:22:12 -0000 1.1
--- l_lock.c 28 Jan 2004 23:48:15 -0000 1.2
***************
*** 47,51 ****
#include <linux/lustre_dlm.h>
- #include <linux/obd_class.h>
#include <linux/lustre_lib.h>
--- 47,50 ----
***************
*** 115,116 ****
--- 114,138 ----
return owner;
}
+
+ #ifdef __KERNEL__
+ #include <linux/lustre_version.h>
+ void l_check_no_ns_lock(struct ldlm_namespace *ns)
+ {
+ static long next_msg;
+
+ if (l_has_lock(&ns->ns_lock) && time_after(jiffies, next_msg)) {
+ CERROR("namespace %s lock held during RPCs; tell phil\n",
+ ns->ns_name);
+ #if (LUSTRE_KERNEL_VERSION >= 30)
+ CERROR(portals_debug_dumpstack());
+ #endif
+ next_msg = jiffies + 60 * HZ;
+ }
+ }
+
+ #else
+ void l_check_no_ns_lock(struct ldlm_namespace *ns)
+ {
+ #warning "FIXME: check lock in user space??"
+ }
+ #endif /* __KERNEL__ */
Index: ldlm_extent.c
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/ldlm_extent.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** ldlm_extent.c 22 Apr 2003 16:28:55 -0000 1.1
--- ldlm_extent.c 28 Jan 2004 23:48:16 -0000 1.2
***************
*** 2,6 ****
* vim:expandtab:shiftwidth=8:tabstop=8:
*
! * Copyright (c) 2002 Cluster File Systems, Inc.
* Author: Peter Braam <braam@xxxxxxxxxxxxx>
* Author: Phil Schwan <phil@xxxxxxxxxxxxx>
--- 2,6 ----
* vim:expandtab:shiftwidth=8:tabstop=8:
*
! * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
* Author: Peter Braam <braam@xxxxxxxxxxxxx>
* Author: Phil Schwan <phil@xxxxxxxxxxxxx>
***************
*** 31,43 ****
#include <linux/lustre_lib.h>
! /* This function will be called to judge if one extent overlaps with another
*/
! int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
! {
! if ((a->l_extent.start <= b->l_extent.end) &&
! (a->l_extent.end >= b->l_extent.start))
! RETURN(0);
!
! RETURN(1);
! }
/* The purpose of this function is to return:
--- 31,35 ----
#include <linux/lustre_lib.h>
! #include "ldlm_internal.h"
/* The purpose of this function is to return:
***************
*** 46,56 ****
* - and not overlapping existing conflicting extents outside the requested
one
*
! * An alternative policy is to not shrink the new extent when conflicts exist.
! *
! * To reconstruct our formulas, take a deep breath. */
! static void policy_internal(struct list_head *queue, struct ldlm_extent
*req_ex,
! struct ldlm_extent *new_ex, ldlm_mode_t mode)
{
struct list_head *tmp;
list_for_each(tmp, queue) {
--- 38,56 ----
* - and not overlapping existing conflicting extents outside the requested
one
*
! * An alternative policy is to not shrink the new extent when conflicts exist
*/
! static void
! ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
! struct ldlm_extent *new_ex)
{
struct list_head *tmp;
+ ldlm_mode_t req_mode = req->l_req_mode;
+ __u64 req_start = req->l_policy_data.l_extent.start;
+ __u64 req_end = req->l_policy_data.l_extent.end;
+ ENTRY;
+
+ if (new_ex->start == req_start && new_ex->end == req_end) {
+ EXIT;
+ return;
+ }
list_for_each(tmp, queue) {
***************
*** 58,142 ****
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! #if 0
/* if lock doesn't overlap new_ex, skip it. */
! if (lock->l_extent.end < new_ex->start ||
! lock->l_extent.start > new_ex->end)
continue;
/* Locks are compatible, overlap doesn't matter */
! if (lockmode_compat(lock->l_req_mode, mode))
continue;
! if (lock->l_extent.start < req_ex->start) {
! if (lock->l_extent.end == ~0) {
! new_ex->start = req_ex->start;
! new_ex->end = req_ex->end;
return;
}
! new_ex->start = MIN(lock->l_extent.end + 1,
! req_ex->start);
}
! if (lock->l_extent.end > req_ex->end) {
! if (lock->l_extent.start == 0) {
! new_ex->start = req_ex->start;
! new_ex->end = req_ex->end;
return;
}
! new_ex->end = MAX(lock->l_extent.start - 1,
! req_ex->end);
! }
! #else
! if (lock->l_extent.end < req_ex->start) {
! new_ex->start = MIN(lock->l_extent.end + 1,
! new_ex->start);
! } else {
! if (lock->l_extent.start < req_ex->start &&
! !lockmode_compat(lock->l_req_mode, mode))
! /* Policy: minimize conflict overlap */
! new_ex->start = req_ex->start;
! }
! if (lock->l_extent.start > req_ex->end) {
! new_ex->end = MAX(lock->l_extent.start - 1,
! new_ex->end);
! } else {
! if (lock->l_extent.end > req_ex->end &&
! !lockmode_compat(lock->l_req_mode, mode))
! /* Policy: minimize conflict overlap */
! new_ex->end = req_ex->end;
}
- #endif
}
}
! /* apply the internal policy by walking all the lists */
! int ldlm_extent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
! void *req_cookie, ldlm_mode_t mode, int flags,
! void *data)
{
- struct ldlm_lock *lock = *lockp;
struct ldlm_resource *res = lock->l_resource;
! struct ldlm_extent *req_ex = req_cookie;
! struct ldlm_extent new_ex;
! new_ex.start = 0;
! new_ex.end = ~0;
! if (!res)
! LBUG();
! l_lock(&ns->ns_lock);
! policy_internal(&res->lr_granted, req_ex, &new_ex, mode);
! policy_internal(&res->lr_converting, req_ex, &new_ex, mode);
! policy_internal(&res->lr_waiting, req_ex, &new_ex, mode);
! l_unlock(&ns->ns_lock);
! memcpy(&lock->l_extent, &new_ex, sizeof(new_ex));
! LDLM_DEBUG(lock, "new extent "LPU64" -> "LPU64, new_ex.start,
! new_ex.end);
! if (new_ex.end != req_ex->end || new_ex.start != req_ex->start)
! return ELDLM_LOCK_CHANGED;
! else
! return 0;
}
--- 58,211 ----
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! if (req == lock) {
! EXIT;
! return;
! }
!
/* if lock doesn't overlap new_ex, skip it. */
! if (lock->l_policy_data.l_extent.end < new_ex->start ||
! lock->l_policy_data.l_extent.start > new_ex->end)
continue;
/* Locks are compatible, overlap doesn't matter */
! if (lockmode_compat(lock->l_req_mode, req_mode))
continue;
! if (lock->l_policy_data.l_extent.start < req_start) {
! if (lock->l_policy_data.l_extent.end == ~0) {
! new_ex->start = req_start;
! new_ex->end = req_end;
! EXIT;
return;
}
! new_ex->start =
MIN(lock->l_policy_data.l_extent.end+1,
! req_start);
}
! if (lock->l_policy_data.l_extent.end > req_end) {
! if (lock->l_policy_data.l_extent.start == 0) {
! new_ex->start = req_start;
! new_ex->end = req_end;
! EXIT;
return;
}
! new_ex->end =
MAX(lock->l_policy_data.l_extent.start-1,
! req_end);
}
}
+ EXIT;
}
! /* Determine if the lock is compatible with all locks on the queue. */
! static int
! ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
! int send_cbs)
! {
! struct list_head *tmp;
! struct ldlm_lock *lock;
! ldlm_mode_t req_mode = req->l_req_mode;
! __u64 req_start = req->l_policy_data.l_extent.start;
! __u64 req_end = req->l_policy_data.l_extent.end;
! int compat = 1;
! ENTRY;
!
! list_for_each(tmp, queue) {
! lock = list_entry(tmp, struct ldlm_lock, l_res_link);
!
! if (req == lock)
! RETURN(compat);
!
! /* locks are compatible, overlap doesn't matter */
! if (lockmode_compat(lock->l_req_mode, req_mode))
! continue;
!
! /* if lock doesn't overlap skip it */
! if (lock->l_policy_data.l_extent.end < req_start ||
! lock->l_policy_data.l_extent.start > req_end)
! continue;
!
! if (!send_cbs)
! RETURN(0);
!
! compat = 0;
! if (lock->l_blocking_ast)
! ldlm_add_ast_work_item(lock, req, NULL, 0);
! }
!
! RETURN(compat);
! }
!
! /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
! * - blocking ASTs have already been sent
! * - the caller has already initialized req->lr_tmp
! * - must call this function with the ns lock held
! *
! * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
! * - blocking ASTs have not been sent
! * - the caller has NOT initialized req->lr_tmp, so we must
! * - must call this function with the ns lock held once */
! int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int
first_enq,
! ldlm_error_t *err)
{
struct ldlm_resource *res = lock->l_resource;
! struct ldlm_extent new_ex = {0, ~0};
! struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
! int rc;
! ENTRY;
! LASSERT(list_empty(&res->lr_converting));
! if (!first_enq) {
! LASSERT(res->lr_tmp != NULL);
! rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0);
! if (!rc)
! RETURN(LDLM_ITER_STOP);
! rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0);
! if (!rc)
! RETURN(LDLM_ITER_STOP);
! ldlm_resource_unlink_lock(lock);
! ldlm_grant_lock(lock, NULL, 0, 1);
! RETURN(LDLM_ITER_CONTINUE);
! }
! /* In order to determine the largest possible extent we can
! * grant, we need to scan all of the queues. */
! ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
! ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
! if (new_ex.start != lock->l_policy_data.l_extent.start ||
! new_ex.end != lock->l_policy_data.l_extent.end) {
! *flags |= LDLM_FL_LOCK_CHANGED;
! lock->l_policy_data.l_extent.start = new_ex.start;
! lock->l_policy_data.l_extent.end = new_ex.end;
! }
!
! restart:
! LASSERT(res->lr_tmp == NULL);
! res->lr_tmp = &rpc_list;
! rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1);
! rc += ldlm_extent_compat_queue(&res->lr_waiting, lock, 1);
! res->lr_tmp = NULL;
!
! if (rc != 2) {
! /* If either of the compat_queue()s returned 0, then we
! * have ASTs to send and must go onto the waiting list.
! *
! * bug 2322: we used to unlink and re-add here, which was a
! * terrible folly -- if we goto restart, we could get
! * re-ordered! Causes deadlock, because ASTs aren't sent! */
! if (list_empty(&lock->l_res_link))
! ldlm_resource_add_lock(res, &res->lr_waiting, lock);
! l_unlock(&res->lr_namespace->ns_lock);
! rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
! l_lock(&res->lr_namespace->ns_lock);
! if (rc == -ERESTART)
! GOTO(restart, -ERESTART);
! *flags |= LDLM_FL_BLOCK_GRANTED;
! } else {
! ldlm_resource_unlink_lock(lock);
! ldlm_grant_lock(lock, NULL, 0, 0);
! }
! RETURN(0);
}
Index: ldlm_lock.c
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/ldlm_lock.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** ldlm_lock.c 23 Apr 2003 18:20:49 -0000 1.1
--- ldlm_lock.c 28 Jan 2004 23:48:17 -0000 1.2
***************
*** 25,38 ****
#ifdef __KERNEL__
! #include <linux/slab.h>
! #include <linux/module.h>
! #include <linux/lustre_dlm.h>
! #include <linux/lustre_mds.h>
#else
! #include <liblustre.h>
! #include <linux/kp30.h>
#endif
[...1036 lines suppressed...]
! else if (lock->l_resource->lr_type == LDLM_FLOCK)
! CDEBUG(level, " Pid: %d Extent: "LPU64" -> "LPU64"\n",
! lock->l_policy_data.l_flock.pid,
! lock->l_policy_data.l_flock.start,
! lock->l_policy_data.l_flock.end);
}
***************
*** 1165,1169 ****
return;
! ldlm_lock_dump(D_OTHER, lock);
LDLM_LOCK_PUT(lock);
--- 1126,1130 ----
return;
! ldlm_lock_dump(D_OTHER, lock, 0);
LDLM_LOCK_PUT(lock);
Index: ldlm_lockd.c
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/ldlm_lockd.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** ldlm_lockd.c 24 Apr 2003 04:55:24 -0000 1.1
--- ldlm_lockd.c 28 Jan 2004 23:48:17 -0000 1.2
***************
*** 22,26 ****
*/
! #define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_LDLM
--- 22,28 ----
*/
! #ifndef EXPORT_SYMTAB
! # define EXPORT_SYMTAB
[...1603 lines suppressed...]
! module_exit(ldlm_exit);
! #endif
--- 1359,1375 ----
EXPORT_SYMBOL(l_unlock);
! /* ldlm_lib.c */
! EXPORT_SYMBOL(client_obd_setup);
! EXPORT_SYMBOL(client_obd_cleanup);
! EXPORT_SYMBOL(client_connect_import);
! EXPORT_SYMBOL(client_disconnect_export);
! EXPORT_SYMBOL(target_abort_recovery);
! EXPORT_SYMBOL(target_handle_connect);
! EXPORT_SYMBOL(target_destroy_export);
! EXPORT_SYMBOL(target_cancel_recovery_timer);
! EXPORT_SYMBOL(target_send_reply);
! EXPORT_SYMBOL(target_queue_recovery_request);
! EXPORT_SYMBOL(target_handle_ping);
! EXPORT_SYMBOL(target_handle_disconnect);
! EXPORT_SYMBOL(target_queue_final_reply);
! EXPORT_SYMBOL(ldlm_put_lock_into_req);
Index: ldlm_request.c
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/ldlm_request.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** ldlm_request.c 22 Apr 2003 18:38:35 -0000 1.1
--- ldlm_request.c 28 Jan 2004 23:48:17 -0000 1.2
***************
*** 30,33 ****
--- 30,35 ----
#include <linux/obd.h>
+ #include "ldlm_internal.h"
+
static void interrupted_completion_wait(void *data)
{
***************
*** 41,60 ****
int ldlm_expired_completion_wait(void *data)
{
struct lock_wait_data *lwd = data;
struct ldlm_lock *lock = lwd->lwd_lock;
! struct obd_device *obd = class_conn2obd(lock->l_connh);
! struct obd_import *imp = obd->u.cli.cl_import;
ptlrpc_fail_import(imp, lwd->lwd_generation);
- LDLM_DEBUG(lock, "timed out waiting for completion");
- CERROR("lock timed out for res "LPU64"/"LPU64", mode %s: "
- "entering recovery for %s@%s\n",
- lock->l_resource->lr_name.name[0],
- lock->l_resource->lr_name.name[1],
- ldlm_lockname[lock->l_granted_mode],
- imp->imp_target_uuid.uuid,
- imp->imp_connection->c_remote_uuid.uuid);
-
RETURN(0);
}
--- 43,72 ----
int ldlm_expired_completion_wait(void *data)
{
+ static unsigned long next_dump = 0;
struct lock_wait_data *lwd = data;
struct ldlm_lock *lock = lwd->lwd_lock;
! struct obd_import *imp;
! struct obd_device *obd;
+ if (lock->l_conn_export == NULL) {
+ LDLM_ERROR(lock, "lock timed out; not entering recovery in "
+ "server code, just going back to sleep");
+ if (time_after(jiffies, next_dump)) {
+ unsigned int debug = portal_debug;
+ next_dump = jiffies + 300 * HZ;
+ portal_debug |= D_OTHER;
+ ldlm_namespace_dump(lock->l_resource->lr_namespace);
+ portal_debug = debug;
+ }
+ RETURN(0);
+ }
+
+ obd = lock->l_conn_export->exp_obd;
+ imp = obd->u.cli.cl_import;
ptlrpc_fail_import(imp, lwd->lwd_generation);
+ LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s",
+ imp->imp_target_uuid.uuid,
+ imp->imp_connection->c_remote_uuid.uuid);
RETURN(0);
}
***************
*** 62,65 ****
--- 74,78 ----
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
+ /* XXX ALLOCATE - 160 mytes */
struct lock_wait_data lwd;
unsigned long irqflags;
***************
*** 68,82 ****
int rc = 0;
struct l_wait_info lwi;
-
- obd = class_conn2obd(lock->l_connh);
-
- /* if this is a local lock, then there is no import */
- if (obd != NULL)
- imp = obd->u.cli.cl_import;
-
- lwd.lwd_lock = lock;
-
- lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
- interrupted_completion_wait, &lwd);
ENTRY;
--- 81,84 ----
***************
*** 95,102 ****
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
"sleeping");
! ldlm_lock_dump(D_OTHER, lock);
ldlm_reprocess_all(lock->l_resource);
! noreproc:
if (imp != NULL) {
spin_lock_irqsave(&imp->imp_lock, irqflags);
--- 97,115 ----
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
"sleeping");
! ldlm_lock_dump(D_OTHER, lock, 0);
ldlm_reprocess_all(lock->l_resource);
! noreproc:
!
! obd = class_exp2obd(lock->l_conn_export);
!
! /* if this is a local lock, then there is no import */
! if (obd != NULL)
! imp = obd->u.cli.cl_import;
!
! lwd.lwd_lock = lock;
!
! lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
! interrupted_completion_wait, &lwd);
if (imp != NULL) {
spin_lock_irqsave(&imp->imp_lock, irqflags);
***************
*** 135,139 ****
ldlm_blocking_callback blocking,
void *data,
- void *cp_data,
struct lustre_handle *lockh)
{
--- 148,151 ----
***************
*** 148,152 ****
lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
! blocking, data, cp_data);
if (!lock)
GOTO(out_nolock, err = -ENOMEM);
--- 160,164 ----
lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
! blocking, completion, data);
if (!lock)
GOTO(out_nolock, err = -ENOMEM);
***************
*** 157,167 ****
lock->l_flags |= LDLM_FL_LOCAL;
! err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
! completion);
if (err != ELDLM_OK)
GOTO(out, err);
! if (type == LDLM_EXTENT)
! memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
if ((*flags) & LDLM_FL_LOCK_CHANGED)
memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id));
--- 169,178 ----
lock->l_flags |= LDLM_FL_LOCAL;
! err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
if (err != ELDLM_OK)
GOTO(out, err);
! if (type != LDLM_PLAIN)
! memcpy(cookie, &lock->l_policy_data, cookielen);
if ((*flags) & LDLM_FL_LOCK_CHANGED)
memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id));
***************
*** 181,185 ****
}
! int ldlm_cli_enqueue(struct lustre_handle *connh,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
--- 192,209 ----
}
! static void failed_lock_cleanup(struct ldlm_namespace *ns,
! struct ldlm_lock *lock,
! struct lustre_handle *lockh, int mode)
! {
! /* Set a flag to prevent us from sending a CANCEL (bug 407) */
! l_lock(&ns->ns_lock);
! lock->l_flags |= LDLM_FL_LOCAL_ONLY;
! LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
! l_unlock(&ns->ns_lock);
!
! ldlm_lock_decref_and_cancel(lockh, mode);
! }
!
! int ldlm_cli_enqueue(struct obd_export *exp,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
***************
*** 193,197 ****
ldlm_blocking_callback blocking,
void *data,
- void *cp_data,
struct lustre_handle *lockh)
{
--- 217,220 ----
***************
*** 203,213 ****
is_replay = *flags & LDLM_FL_REPLAY;
! LASSERT(connh != NULL || !is_replay);
! if (connh == NULL) {
rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
type, cookie, cookielen, mode,
flags, completion, blocking, data,
! cp_data, lockh);
RETURN(rc);
}
--- 226,236 ----
is_replay = *flags & LDLM_FL_REPLAY;
! LASSERT(exp != NULL || !is_replay);
! if (exp == NULL) {
rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
type, cookie, cookielen, mode,
flags, completion, blocking, data,
! lockh);
RETURN(rc);
}
***************
*** 218,243 ****
lock = ldlm_handle2lock(lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
! LASSERT(connh == lock->l_connh);
} else {
lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
! mode, blocking, data, cp_data);
if (lock == NULL)
GOTO(out_nolock, rc = -ENOMEM);
- /* ugh. I set this early (instead of waiting for _enqueue)
- * because the completion AST might arrive early, and we need
- * (in just this one case) to run the completion_cb even if it
- * arrives before the reply. */
- lock->l_completion_ast = completion;
- LDLM_DEBUG(lock, "client-side enqueue START");
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
! if (type == LDLM_EXTENT)
! memcpy(&lock->l_extent, cookie,
! sizeof(body->lock_desc.l_extent));
}
if (req == NULL) {
! req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE,
1,
&size, NULL);
if (!req)
--- 241,260 ----
lock = ldlm_handle2lock(lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
! LASSERT(exp == lock->l_conn_export);
} else {
lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
! mode, blocking, completion, data);
if (lock == NULL)
GOTO(out_nolock, rc = -ENOMEM);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
! if (type != LDLM_PLAIN)
! memcpy(&lock->l_policy_data, cookie, cookielen);
! LDLM_DEBUG(lock, "client-side enqueue START");
}
if (req == NULL) {
! req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
&size, NULL);
if (!req)
***************
*** 262,266 ****
req->rq_replen = lustre_msg_size(1, &size);
}
! lock->l_connh = connh;
lock->l_export = NULL;
lock->l_blocking_ast = blocking;
--- 279,283 ----
req->rq_replen = lustre_msg_size(1, &size);
}
! lock->l_conn_export = exp;
lock->l_export = NULL;
lock->l_blocking_ast = blocking;
***************
*** 273,290 ****
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
! /* Set a flag to prevent us from sending a CANCEL (bug 407) */
! l_lock(&ns->ns_lock);
! lock->l_flags |= LDLM_FL_LOCAL_ONLY;
! l_unlock(&ns->ns_lock);
!
! ldlm_lock_decref_and_cancel(lockh, mode);
!
if (rc == ELDLM_LOCK_ABORTED) {
! /* caller expects reply buffer 0 to have been swabbed
*/
! reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
! CERROR ("Can't unpack ldlm_reply\n");
! GOTO (out_req, rc = -EPROTO);
}
}
--- 290,301 ----
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
! failed_lock_cleanup(ns, lock, lockh, mode);
if (rc == ELDLM_LOCK_ABORTED) {
! /* Before we return, swab the reply */
! reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
! CERROR("Can't unpack ldlm_reply\n");
! GOTO(out_req, rc = -EPROTO);
}
}
***************
*** 292,328 ****
}
! reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
! CERROR ("Can't unpack ldlm_reply\n");
! GOTO (out_req, rc = -EPROTO);
}
!
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
! CDEBUG(D_INFO, "local: %p, remote: %p, flags: %d\n", lock,
! (void *)(unsigned long)reply->lock_handle.addr, *flags);
if (type == LDLM_EXTENT) {
CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
"extent "LPU64" -> "LPU64"\n",
! body->lock_desc.l_extent.start,
! body->lock_desc.l_extent.end,
! reply->lock_extent.start, reply->lock_extent.end);
!
! if ((reply->lock_extent.end & ~PAGE_MASK) != ~PAGE_MASK) {
! /* XXX Old versions of BA OST code have a fencepost
bug
! * which will cause them to grant a lock that's one
! * byte too large. This can be safely removed after
BA
! * ships their next release -phik (02 Apr 2003) */
! reply->lock_extent.end--;
! } else if ((reply->lock_extent.start & ~PAGE_MASK) ==
! ~PAGE_MASK) {
! reply->lock_extent.start++;
! }
! cookie = &reply->lock_extent; /* FIXME bug 267 */
! cookielen = sizeof(reply->lock_extent);
}
--- 303,332 ----
}
! reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
! CERROR("Can't unpack ldlm_reply\n");
! GOTO(out_req, rc = -EPROTO);
}
!
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
! CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
! lock, reply->lock_handle.cookie, *flags);
if (type == LDLM_EXTENT) {
CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
"extent "LPU64" -> "LPU64"\n",
! body->lock_desc.l_policy_data.l_extent.start,
! body->lock_desc.l_policy_data.l_extent.end,
! reply->lock_policy_data.l_extent.start,
! reply->lock_policy_data.l_extent.end);
! cookie = &reply->lock_policy_data; /* FIXME bug 267 */
! cookielen = sizeof(struct ldlm_extent);
! } else if (type == LDLM_FLOCK) {
! cookie = &reply->lock_policy_data;
! cookielen = sizeof(struct ldlm_flock);
}
***************
*** 355,367 ****
}
}
!
! if (!is_replay) {
l_lock(&ns->ns_lock);
! lock->l_completion_ast = NULL;
! rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
! completion);
l_unlock(&ns->ns_lock);
! if (lock->l_completion_ast)
! lock->l_completion_ast(lock, *flags, NULL);
}
--- 359,378 ----
}
}
! if ((*flags) & LDLM_FL_AST_SENT) {
l_lock(&ns->ns_lock);
! lock->l_flags |= LDLM_FL_CBPENDING;
l_unlock(&ns->ns_lock);
! LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
! }
!
! if (!is_replay) {
! rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
! if (lock->l_completion_ast != NULL) {
! int err = lock->l_completion_ast(lock, *flags, NULL);
! if (err)
! failed_lock_cleanup(ns, lock, lockh, mode);
! if (!rc)
! rc = err;
! }
}
***************
*** 377,418 ****
}
- int ldlm_match_or_enqueue(struct lustre_handle *connh,
- struct ptlrpc_request *req,
- struct ldlm_namespace *ns,
- struct lustre_handle *parent_lock_handle,
- struct ldlm_res_id res_id,
- __u32 type,
- void *cookie, int cookielen,
- ldlm_mode_t mode,
- int *flags,
- ldlm_completion_callback completion,
- ldlm_blocking_callback blocking,
- void *data,
- void *cp_data,
- struct lustre_handle *lockh)
- {
- int rc;
- ENTRY;
- if (connh == NULL) {
- /* Just to make sure that I understand things --phil */
- LASSERT(*flags & LDLM_FL_LOCAL_ONLY);
- }
-
- LDLM_DEBUG_NOLOCK("resource "LPU64"/"LPU64, res_id.name[0],
- res_id.name[1]);
- rc = ldlm_lock_match(ns, *flags, &res_id, type, cookie, cookielen,
mode,
- lockh);
- if (rc == 0) {
- rc = ldlm_cli_enqueue(connh, req, ns, parent_lock_handle,
- res_id, type, cookie, cookielen, mode,
- flags, completion, blocking, data,
- cp_data, lockh);
- if (rc != ELDLM_OK)
- CERROR("ldlm_cli_enqueue: err: %d\n", rc);
- RETURN(rc);
- }
- RETURN(0);
- }
-
int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
{
--- 388,391 ----
***************
*** 421,427 ****
int flags = LDLM_FL_REPLAY;
ldlm_lock2handle(lock, &lockh);
! return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, junk,
lock->l_resource->lr_type, NULL, 0, -1,
&flags,
! NULL, NULL, NULL, 0, &lockh);
}
--- 394,400 ----
int flags = LDLM_FL_REPLAY;
ldlm_lock2handle(lock, &lockh);
! return ldlm_cli_enqueue(lock->l_conn_export, NULL, NULL, NULL, junk,
lock->l_resource->lr_type, NULL, 0, -1,
&flags,
! NULL, NULL, NULL, &lockh);
}
***************
*** 449,453 ****
{
struct ldlm_request *body;
- struct lustre_handle *connh;
struct ldlm_reply *reply;
struct ldlm_lock *lock;
--- 422,425 ----
***************
*** 463,475 ****
}
*flags = 0;
- connh = lock->l_connh;
! if (!connh)
RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
LDLM_DEBUG(lock, "client-side convert");
! req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1,
&size,
! NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
--- 435,446 ----
}
*flags = 0;
! if (lock->l_conn_export == NULL)
RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
LDLM_DEBUG(lock, "client-side convert");
! req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
! LDLM_CONVERT, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
***************
*** 495,499 ****
GOTO (out, rc = -EPROTO);
}
!
res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
--- 466,470 ----
GOTO (out, rc = -EPROTO);
}
!
res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
***************
*** 523,528 ****
RETURN(0);
! if (lock->l_connh) {
int local_only;
LDLM_DEBUG(lock, "client-side cancel");
--- 494,500 ----
RETURN(0);
! if (lock->l_conn_export) {
int local_only;
+ struct obd_import *imp;
LDLM_DEBUG(lock, "client-side cancel");
***************
*** 540,545 ****
}
! req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh),
! LDLM_CANCEL, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
--- 512,523 ----
}
! imp = class_exp2cliimp(lock->l_conn_export);
! if (imp == NULL || imp->imp_invalid) {
! CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
! imp);
! goto local_cancel;
! }
!
! req = ptlrpc_prep_req(imp, LDLM_CANCEL, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
***************
*** 569,577 ****
ldlm_lock_cancel(lock);
} else {
- LDLM_DEBUG(lock, "client-side local cancel");
if (lock->l_resource->lr_namespace->ns_client) {
! CERROR("Trying to cancel local lock\n");
LBUG();
}
ldlm_lock_cancel(lock);
ldlm_reprocess_all(lock->l_resource);
--- 547,555 ----
ldlm_lock_cancel(lock);
} else {
if (lock->l_resource->lr_namespace->ns_client) {
! LDLM_ERROR(lock, "Trying to cancel local lock\n");
LBUG();
}
+ LDLM_DEBUG(lock, "client-side local cancel");
ldlm_lock_cancel(lock);
ldlm_reprocess_all(lock->l_resource);
***************
*** 643,648 ****
}
! int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
! struct ldlm_res_id res_id, int flags)
{
struct ldlm_resource *res;
--- 621,627 ----
}
! static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
! struct ldlm_res_id res_id, int
flags,
! void *opaque)
{
struct ldlm_resource *res;
***************
*** 663,668 ****
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! if (lock->l_readers || lock->l_writers)
continue;
/* See CBPENDING comment in ldlm_cancel_lru */
--- 642,659 ----
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! if (opaque != NULL && lock->l_ast_data != opaque) {
! LDLM_ERROR(lock, "data %p doesn't match opaque %p",
! lock->l_ast_data, opaque);
! //LBUG();
continue;
+ }
+
+ if (lock->l_readers || lock->l_writers) {
+ if (flags & LDLM_FL_WARN) {
+ LDLM_ERROR(lock, "lock in use");
+ //LBUG();
+ }
+ continue;
+ }
/* See CBPENDING comment in ldlm_cancel_lru */
***************
*** 711,717 ****
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server.
! * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback. */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
! struct ldlm_res_id *res_id, int flags)
{
int i;
--- 702,709 ----
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server.
! * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
! * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
! struct ldlm_res_id *res_id, int flags, void
*opaque)
{
int i;
***************
*** 722,726 ****
if (res_id)
! RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags));
l_lock(&ns->ns_lock);
--- 714,719 ----
if (res_id)
! RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
! opaque));
l_lock(&ns->ns_lock);
***************
*** 734,738 ****
rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
! flags);
if (rc)
--- 727,731 ----
rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
! flags, opaque);
if (rc)
***************
*** 815,824 ****
{
int i, rc = LDLM_ITER_CONTINUE;
!
l_lock(&ns->ns_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
struct list_head *tmp, *next;
list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
! struct ldlm_resource *res =
list_entry(tmp, struct ldlm_resource,
lr_hash);
--- 808,817 ----
{
int i, rc = LDLM_ITER_CONTINUE;
!
l_lock(&ns->ns_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
struct list_head *tmp, *next;
list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
! struct ldlm_resource *res =
list_entry(tmp, struct ldlm_resource,
lr_hash);
***************
*** 835,838 ****
--- 828,858 ----
}
+ /* non-blocking function to manipulate a lock whose cb_data is being put
away.*/
+ void ldlm_change_cbdata(struct ldlm_namespace *ns,
+ struct ldlm_res_id *res_id,
+ ldlm_iterator_t iter,
+ void *data)
+ {
+ struct ldlm_resource *res;
+ ENTRY;
+
+ if (ns == NULL) {
+ CERROR("must pass in namespace");
+ LBUG();
+ }
+
+ res = ldlm_resource_get(ns, NULL, *res_id, 0, 0);
+ if (res == NULL) {
+ EXIT;
+ return;
+ }
+
+ l_lock(&ns->ns_lock);
+ ldlm_resource_foreach(res, iter, data);
+ l_unlock(&ns->ns_lock);
+ ldlm_resource_putref(res);
+ EXIT;
+ }
+
/* Lock replay */
***************
*** 876,880 ****
else
flags = LDLM_FL_REPLAY;
!
size = sizeof(*body);
req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
--- 896,900 ----
else
flags = LDLM_FL_REPLAY;
!
size = sizeof(*body);
req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
***************
*** 883,888 ****
/* We're part of recovery, so don't wait for it. */
! req->rq_level = LUSTRE_CONN_RECOVD;
!
body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
ldlm_lock2desc(lock, &body->lock_desc);
--- 903,908 ----
/* We're part of recovery, so don't wait for it. */
! req->rq_send_state = LUSTRE_IMP_REPLAY;
!
body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
ldlm_lock2desc(lock, &body->lock_desc);
***************
*** 897,908 ****
if (rc != ELDLM_OK)
GOTO(out, rc);
!
reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
! CERROR ("Can't unpack ldlm_reply\n");
GOTO (out, rc = -EPROTO);
}
!
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
--- 917,928 ----
if (rc != ELDLM_OK)
GOTO(out, rc);
!
reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
! CERROR("Can't unpack ldlm_reply\n");
GOTO (out, rc = -EPROTO);
}
!
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
***************
*** 919,923 ****
struct ldlm_lock *lock;
int rc = 0;
!
ENTRY;
INIT_LIST_HEAD(&list);
--- 939,943 ----
struct ldlm_lock *lock;
int rc = 0;
!
ENTRY;
INIT_LIST_HEAD(&list);
Index: ldlm_resource.c
===================================================================
RCS file: /cvsroot/ssic-linux/openssi/lustre/ldlm/ldlm_resource.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** ldlm_resource.c 23 Apr 2003 18:21:00 -0000 1.1
--- ldlm_resource.c 28 Jan 2004 23:48:18 -0000 1.2
***************
*** 24,33 ****
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
! #include <linux/lustre_dlm.h>
#else
! #include <liblustre.h>
#endif
#include <linux/obd_class.h>
kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
--- 24,34 ----
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
! # include <linux/lustre_dlm.h>
#else
! # include <liblustre.h>
#endif
#include <linux/obd_class.h>
+ #include "ldlm_internal.h"
kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
***************
*** 35,64 ****
spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED;
struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list);
! static struct proc_dir_entry *ldlm_ns_proc_dir = NULL;
! int ldlm_proc_setup(struct obd_device *obd)
{
int rc;
ENTRY;
LASSERT(ldlm_ns_proc_dir == NULL);
! LASSERT(obd != NULL);
! rc = lprocfs_obd_attach(obd, 0);
! if (rc) {
CERROR("LProcFS failed in ldlm-init\n");
! RETURN(rc);
}
! ldlm_ns_proc_dir = obd->obd_proc_entry;
RETURN(0);
}
! void ldlm_proc_cleanup(struct obd_device *obd)
{
if (ldlm_ns_proc_dir) {
! lprocfs_obd_detach(obd);
ldlm_ns_proc_dir = NULL;
}
}
- #ifdef __KERNEL__
static int lprocfs_uint_rd(char *page, char **start, off_t off,
int count, int *eof, void *data)
--- 36,119 ----
spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED;
struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list);
! struct proc_dir_entry *ldlm_type_proc_dir = NULL;
! struct proc_dir_entry *ldlm_ns_proc_dir = NULL;
! struct proc_dir_entry *ldlm_svc_proc_dir = NULL;
! #ifdef __KERNEL__
! static int ldlm_proc_dump_ns(struct file *file, const char *buffer, unsigned
long count, void *data)
! {
! ldlm_dump_all_namespaces();
! RETURN(count);
! }
!
! int ldlm_proc_setup(void)
{
int rc;
+ struct lprocfs_vars list[] = {
+ { "dump_namespaces", NULL, ldlm_proc_dump_ns, NULL },
+ { NULL }};
ENTRY;
LASSERT(ldlm_ns_proc_dir == NULL);
!
! ldlm_type_proc_dir = lprocfs_register(OBD_LDLM_DEVICENAME,
! proc_lustre_root,
! NULL, NULL);
! if (IS_ERR(ldlm_type_proc_dir)) {
CERROR("LProcFS failed in ldlm-init\n");
! rc = PTR_ERR(ldlm_type_proc_dir);
! GOTO(err, rc);
}
!
! ldlm_ns_proc_dir = lprocfs_register("namespaces",
! ldlm_type_proc_dir,
! NULL, NULL);
! if (IS_ERR(ldlm_ns_proc_dir)) {
! CERROR("LProcFS failed in ldlm-init\n");
! rc = PTR_ERR(ldlm_ns_proc_dir);
! GOTO(err_type, rc);
! }
!
! ldlm_svc_proc_dir = lprocfs_register("services",
! ldlm_type_proc_dir,
! NULL, NULL);
! if (IS_ERR(ldlm_svc_proc_dir)) {
! CERROR("LProcFS failed in ldlm-init\n");
! rc = PTR_ERR(ldlm_svc_proc_dir);
! GOTO(err_ns, rc);
! }
!
! rc = lprocfs_add_vars(ldlm_type_proc_dir, list, NULL);
!
RETURN(0);
+
+ err_ns:
+ lprocfs_remove(ldlm_ns_proc_dir);
+ err_type:
+ lprocfs_remove(ldlm_type_proc_dir);
+ err:
+ ldlm_type_proc_dir = NULL;
+ ldlm_ns_proc_dir = NULL;
+ ldlm_svc_proc_dir = NULL;
+ RETURN(rc);
}
! void ldlm_proc_cleanup(void)
{
+ if (ldlm_svc_proc_dir) {
+ lprocfs_remove(ldlm_svc_proc_dir);
+ ldlm_svc_proc_dir = NULL;
+ }
+
if (ldlm_ns_proc_dir) {
! lprocfs_remove(ldlm_ns_proc_dir);
ldlm_ns_proc_dir = NULL;
}
+
+ if (ldlm_type_proc_dir) {
+ lprocfs_remove(ldlm_type_proc_dir);
+ ldlm_type_proc_dir = NULL;
+ }
}
static int lprocfs_uint_rd(char *page, char **start, off_t off,
int count, int *eof, void *data)
***************
*** 68,73 ****
--- 123,166 ----
}
+ static int lprocfs_read_lru_size(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+ {
+ struct ldlm_namespace *ns = data;
+ return lprocfs_uint_rd(page, start, off, count, eof,
+ &ns->ns_max_unused);
+ }
#define MAX_STRING_SIZE 128
+ static int lprocfs_write_lru_size(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+ {
+ struct ldlm_namespace *ns = data;
+ char dummy[MAX_STRING_SIZE + 1];
+ unsigned long tmp;
+
+ dummy[MAX_STRING_SIZE] = '\0';
+ copy_from_user(dummy, buffer, MAX_STRING_SIZE);
+
+ if (count == 6 && memcmp(dummy, "clear", 5) == 0) {
+ CDEBUG(D_DLMTRACE,
+ "dropping all unused locks from namespace %s\n",
+ ns->ns_name);
+ tmp = ns->ns_max_unused;
+ ns->ns_max_unused = 0;
+ ldlm_cancel_lru(ns);
+ ns->ns_max_unused = tmp;
+ return count;
+ }
+
+ tmp = simple_strtoul(dummy, NULL, 0);
+ CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
+ ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
+ ns->ns_max_unused = (unsigned int)tmp;
+
+ ldlm_cancel_lru(ns);
+
+ return count;
+ }
+
void ldlm_proc_namespace(struct ldlm_namespace *ns)
{
***************
*** 82,124 ****
memset(lock_vars, 0, sizeof(lock_vars));
lock_vars[0].read_fptr = lprocfs_rd_u64;
-
lock_vars[0].name = lock_name;
snprintf(lock_name, MAX_STRING_SIZE, "%s/resource_count",
ns->ns_name);
-
lock_vars[0].data = &ns->ns_resources;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_count", ns->ns_name);
-
lock_vars[0].data = &ns->ns_locks;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
! snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_unused_count",
! ns->ns_name);
! lock_vars[0].data = &ns->ns_nr_unused;
! lock_vars[0].read_fptr = lprocfs_uint_rd;
! lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
}
#endif
#undef MAX_STRING_SIZE
- #define LDLM_MAX_UNUSED 20
struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
{
struct ldlm_namespace *ns = NULL;
struct list_head *bucket;
ENTRY;
OBD_ALLOC(ns, sizeof(*ns));
if (!ns)
! RETURN(NULL);
! ns->ns_hash = vmalloc(sizeof(*ns->ns_hash) * RES_HASH_SIZE);
if (!ns->ns_hash)
GOTO(out_ns, NULL);
- atomic_add(sizeof(*ns->ns_hash) * RES_HASH_SIZE, &obd_memory);
-
OBD_ALLOC(ns->ns_name, strlen(name) + 1);
if (!ns->ns_name)
--- 175,227 ----
memset(lock_vars, 0, sizeof(lock_vars));
lock_vars[0].read_fptr = lprocfs_rd_u64;
lock_vars[0].name = lock_name;
snprintf(lock_name, MAX_STRING_SIZE, "%s/resource_count",
ns->ns_name);
lock_vars[0].data = &ns->ns_resources;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_count", ns->ns_name);
lock_vars[0].data = &ns->ns_locks;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
! if (ns->ns_client) {
! snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_unused_count",
! ns->ns_name);
! lock_vars[0].data = &ns->ns_nr_unused;
! lock_vars[0].read_fptr = lprocfs_uint_rd;
! lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
!
! snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_size",
! ns->ns_name);
! lock_vars[0].data = ns;
! lock_vars[0].read_fptr = lprocfs_read_lru_size;
! lock_vars[0].write_fptr = lprocfs_write_lru_size;
! lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
! }
}
#endif
#undef MAX_STRING_SIZE
struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
{
struct ldlm_namespace *ns = NULL;
struct list_head *bucket;
+ int rc;
ENTRY;
+ rc = ldlm_get_ref();
+ if (rc) {
+ CERROR("ldlm_get_ref failed: %d\n", rc);
+ RETURN(NULL);
+ }
+
OBD_ALLOC(ns, sizeof(*ns));
if (!ns)
! GOTO(out_ref, NULL);
! OBD_VMALLOC(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
if (!ns->ns_hash)
GOTO(out_ns, NULL);
OBD_ALLOC(ns->ns_name, strlen(name) + 1);
if (!ns->ns_name)
***************
*** 141,145 ****
INIT_LIST_HEAD(&ns->ns_unused_list);
ns->ns_nr_unused = 0;
! ns->ns_max_unused = LDLM_MAX_UNUSED;
spin_lock(&ldlm_namespace_lock);
--- 244,248 ----
INIT_LIST_HEAD(&ns->ns_unused_list);
ns->ns_nr_unused = 0;
! ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
spin_lock(&ldlm_namespace_lock);
***************
*** 153,166 ****
out_hash:
POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
! vfree(ns->ns_hash);
! atomic_sub(sizeof(*ns->ns_hash) * RES_HASH_SIZE, &obd_memory);
out_ns:
OBD_FREE(ns, sizeof(*ns));
! return NULL;
}
extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
! /* If 'local_only' is true, don't try to tell the server, just cleanup.
* This is currently only used for recovery, and we make certain assumptions
* as a result--notably, that we shouldn't cancel locks with refs. -phil
--- 256,270 ----
out_hash:
POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
! OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
out_ns:
OBD_FREE(ns, sizeof(*ns));
! out_ref:
! ldlm_put_ref(0);
! RETURN(NULL);
}
extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
! /* If flags contains FL_LOCAL_ONLY, don't try to tell the server, just
cleanup.
* This is currently only used for recovery, and we make certain assumptions
* as a result--notably, that we shouldn't cancel locks with refs. -phil
***************
*** 168,175 ****
* Called with the ns_lock held. */
static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
! int local_only)
{
struct list_head *tmp, *pos;
int rc = 0, client = res->lr_namespace->ns_client;
ENTRY;
--- 272,280 ----
* Called with the ns_lock held. */
static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
! int flags)
{
struct list_head *tmp, *pos;
int rc = 0, client = res->lr_namespace->ns_client;
+ int local_only = (flags & LDLM_FL_LOCAL_ONLY);
ENTRY;
***************
*** 187,198 ****
/* ... without sending a CANCEL message. */
lock->l_flags |= LDLM_FL_LOCAL_ONLY;
! /* ... and without calling the cancellation callback
*/
! lock->l_flags |= LDLM_FL_CANCEL;
LDLM_LOCK_PUT(lock);
continue;
}
! /* At shutdown time, don't call the cancellation callback */
! lock->l_flags |= LDLM_FL_CANCEL;
if (client) {
--- 292,304 ----
/* ... without sending a CANCEL message. */
lock->l_flags |= LDLM_FL_LOCAL_ONLY;
! /* caller may also specify additional flags */
! lock->l_flags |= flags;
! LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
LDLM_LOCK_PUT(lock);
continue;
}
!
! lock->l_flags |= flags;
if (client) {
***************
*** 219,223 ****
}
! int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int local_only)
{
int i;
--- 325,329 ----
}
! int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
{
int i;
***************
*** 236,242 ****
ldlm_resource_getref(res);
! cleanup_resource(res, &res->lr_granted, local_only);
! cleanup_resource(res, &res->lr_converting,
local_only);
! cleanup_resource(res, &res->lr_waiting, local_only);
/* XXX what a mess: don't force cleanup if we're
--- 342,348 ----
ldlm_resource_getref(res);
! cleanup_resource(res, &res->lr_granted, flags);
! cleanup_resource(res, &res->lr_converting, flags);
! cleanup_resource(res, &res->lr_waiting, flags);
/* XXX what a mess: don't force cleanup if we're
***************
*** 244,248 ****
* case, we probably still have outstanding lock refs
* which reference these resources. -phil */
! if (!ldlm_resource_putref(res) && !local_only) {
CERROR("Resource refcount nonzero (%d) after "
"lock cleanup; forcing cleanup.\n",
--- 350,355 ----
* case, we probably still have outstanding lock refs
* which reference these resources. -phil */
! if (!ldlm_resource_putref(res) &&
! !(flags & LDLM_FL_LOCAL_ONLY)) {
CERROR("Resource refcount nonzero (%d) after "
"lock cleanup; forcing cleanup.\n",
***************
*** 260,264 ****
/* Cleanup, but also free, the namespace */
! int ldlm_namespace_free(struct ldlm_namespace *ns)
{
if (!ns)
--- 367,371 ----
/* Cleanup, but also free, the namespace */
! int ldlm_namespace_free(struct ldlm_namespace *ns, int force)
{
if (!ns)
***************
*** 270,281 ****
spin_unlock(&ldlm_namespace_lock);
! ldlm_namespace_cleanup(ns, 0);
POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
! vfree(ns->ns_hash /* , sizeof(*ns->ns_hash) * RES_HASH_SIZE */);
! atomic_sub(sizeof(*ns->ns_hash) * RES_HASH_SIZE, &obd_memory);
OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
OBD_FREE(ns, sizeof(*ns));
return ELDLM_OK;
}
--- 377,403 ----
spin_unlock(&ldlm_namespace_lock);
! /* At shutdown time, don't call the cancellation callback */
! ldlm_namespace_cleanup(ns, LDLM_FL_CANCEL);
!
! #ifdef __KERNEL__
! {
! struct proc_dir_entry *dir;
! dir = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
! if (dir == NULL) {
! CERROR("dlm namespace %s has no procfs dir?\n",
! ns->ns_name);
! } else {
! lprocfs_remove(dir);
! }
! }
! #endif
POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
! OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
OBD_FREE(ns, sizeof(*ns));
+ ldlm_put_ref(force);
+
return ELDLM_OK;
}
***************
*** 376,379 ****
--- 498,502 ----
LASSERT(ns != NULL);
LASSERT(ns->ns_hash != NULL);
+ LASSERT(name.name[0] != 0);
l_lock(&ns->ns_lock);
***************
*** 402,405 ****
--- 525,530 ----
struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
{
+ LASSERT(res != NULL);
+ LASSERT(res != (void *)0x5a5a5a5a);
atomic_inc(&res->lr_refcount);
CDEBUG(D_INFO, "getref res: %p count: %d\n", res,
***************
*** 454,460 ****
list_del_init(&res->lr_hash);
list_del_init(&res->lr_childof);
OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
- l_unlock(&ns->ns_lock);
spin_lock(&ns->ns_counter_lock);
--- 579,585 ----
list_del_init(&res->lr_hash);
list_del_init(&res->lr_childof);
+ l_unlock(&ns->ns_lock);
OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
spin_lock(&ns->ns_counter_lock);
***************
*** 476,484 ****
ldlm_resource_dump(res);
CDEBUG(D_OTHER, "About to add this lock:\n");
! ldlm_lock_dump(D_OTHER, lock);
if (lock->l_destroyed) {
CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
! return;
}
--- 601,609 ----
ldlm_resource_dump(res);
CDEBUG(D_OTHER, "About to add this lock:\n");
! ldlm_lock_dump(D_OTHER, lock, 0);
if (lock->l_destroyed) {
CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
! goto out;
}
***************
*** 486,489 ****
--- 611,615 ----
list_add_tail(&lock->l_res_link, head);
+ out:
l_unlock(&res->lr_namespace->ns_lock);
}
***************
*** 540,554 ****
{
struct list_head *tmp;
! char name[256];
! if (RES_NAME_SIZE != 3)
LBUG();
! snprintf(name, sizeof(name), "%Lx %Lx %Lx",
! (unsigned long long)res->lr_name.name[0],
! (unsigned long long)res->lr_name.name[1],
! (unsigned long long)res->lr_name.name[2]);
!
! CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name,
atomic_read(&res->lr_refcount));
CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
--- 666,677 ----
{
struct list_head *tmp;
! int pos;
! if (RES_NAME_SIZE != 4)
LBUG();
! CDEBUG(D_OTHER, "--- Resource: %p ("LPU64"/"LPU64"/"LPU64"/"LPU64
! ") (rc: %d)\n", res, res->lr_name.name[0],
res->lr_name.name[1],
! res->lr_name.name[2], res->lr_name.name[3],
atomic_read(&res->lr_refcount));
CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
***************
*** 557,578 ****
CDEBUG(D_OTHER, "Granted locks:\n");
list_for_each(tmp, &res->lr_granted) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! ldlm_lock_dump(D_OTHER, lock);
}
!
CDEBUG(D_OTHER, "Converting locks:\n");
list_for_each(tmp, &res->lr_converting) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! ldlm_lock_dump(D_OTHER, lock);
}
!
CDEBUG(D_OTHER, "Waiting locks:\n");
list_for_each(tmp, &res->lr_waiting) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! ldlm_lock_dump(D_OTHER, lock);
}
}
--- 680,702 ----
CDEBUG(D_OTHER, "Granted locks:\n");
+ pos = 0;
list_for_each(tmp, &res->lr_granted) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! ldlm_lock_dump(D_OTHER, lock, ++pos);
}
! pos = 0;
CDEBUG(D_OTHER, "Converting locks:\n");
list_for_each(tmp, &res->lr_converting) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! ldlm_lock_dump(D_OTHER, lock, ++pos);
}
! pos = 0;
CDEBUG(D_OTHER, "Waiting locks:\n");
list_for_each(tmp, &res->lr_waiting) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
! ldlm_lock_dump(D_OTHER, lock, ++pos);
}
}
-------------------------------------------------------
The SF.Net email is sponsored by EclipseCon 2004
Premiere Conference on Open Tools Development and Integration
See the breadth of Eclipse activity. February 3-5 in Anaheim, CA.
http://www.eclipsecon.org/osdn
|