logo       


Less expensive EventQueue patch: msg#00021

Subject: Less expensive EventQueue patch
This patch stops the CacheEventQueue from spawning unnecessary threads, and
keeps its processor thread alive for a short time after the queue empties in
case new events come in.

Pay close attention to the (javadoced) THREAD_TIMEOUT constant. It
determines how long the processor thread stays alive, waiting for more events
to come in, after it has emptied the queue (60 seconds in this patch). It may
be desirable at some point in the future to make this a configuration option.

I did my very best not to let formatting inconsistencies find their way
in... but there is:
A) No way to tell Eclipse to put throws declarations on the next line, and
B) No standard for throws declarations in the Apache coding standards (that I
could find).

Also, this file's whitespace doesn't conform to the Apache standards, so my
patch fixes that as well.

-Travis Savo


Index: CacheEventQueue.java
===================================================================
RCS file: /home/cvspublic/jakarta-turbine-jcs/src/java/org/apache/jcs/engine/CacheEventQueue.java,v
retrieving revision 1.7
diff -u -r1.7 CacheEventQueue.java
--- CacheEventQueue.java        17 Apr 2004 14:00:11 -0000      1.7
+++ CacheEventQueue.java        14 May 2004 00:56:33 -0000
@@ -1,6 +1,5 @@
 package org.apache.jcs.engine;
 
-
 /*
  * Copyright 2001-2004 The Apache Software Foundation.
  *
@@ -16,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 import java.io.IOException;
 import java.io.Serializable;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.jcs.engine.behavior.ICacheElement;
@@ -29,46 +25,45 @@
 
 /**
  * An event queue is used to propagate ordered cache events to one and only
one
- * target listener.
- *
- * <pre>
+ * target listener. 
+ * <pre> *   
  * Changes:<br>
  * 17 April 2004  Hanson Char
  * <ol><li>Bug fix: add missing synchronization to method
addRemoveEvent();</li>
- * <li>Use the light weight new int[0] for creating the object monitor
queueLock,
- * instead of new Object();</li>
+ * <li>Use the light weight new int[0] for creating the object monitor
queueLock, instead of new Object();</li>
  * <li>Explicitely qualify member variables of CacheEventQueue in inner
classes.
  * Hopefully this will help identify any potential concurrency issue.</li>
  * </ol>
+ * 13 May 2004  Travis Savo<br>
+ * Changed to not spawn a thread when the queue isn't in use, and not kill
said thread until THREAD_TIMEOUT
+ * has passed without a new event.
  * </pre>
  */
 public class CacheEventQueue implements ICacheEventQueue
 {
-    private final static Log log = LogFactory.getLog( CacheEventQueue.class
);
 
+    private final static Log log = LogFactory.getLog( CacheEventQueue.class
);
+    /**
+     * Number of milliseconds after emptying out a queue to wait until
killing the processor thread.
+     */
+    private static final long THREAD_TIMEOUT = 60*1000;
+    
     private static int processorInstanceCount = 0;
-
     // private LinkedQueue queue = new LinkedQueue();
-
     private ICacheListener listener;
     private byte listenerId;
     private String cacheName;
-
     private int failureCount;
     private int maxFailure;
-
     // in milliseconds
     private int waitBeforeRetry;
-
     private boolean destroyed;
-    private Thread t;
-
+    private boolean working;
+    
+    private Thread processorThread;
     // Internal queue implementation
-
     private Object queueLock = new int[0];
-
     // Dummy node
-
     private Node head = new Node();
     private Node tail = head;
 
@@ -79,9 +74,7 @@
      * @param listenerId
      * @param cacheName
      */
-    public CacheEventQueue( ICacheListener listener,
-                            byte listenerId,
-                            String cacheName )
+    public CacheEventQueue( ICacheListener listener, byte listenerId,
String cacheName)
     {
         this( listener, listenerId, cacheName, 10, 500 );
     }
@@ -95,26 +88,19 @@
      * @param maxFailure
      * @param waitBeforeRetry
      */
-    public CacheEventQueue( ICacheListener listener,
-                            byte listenerId,
-                            String cacheName,
-                            int maxFailure,
-                            int waitBeforeRetry )
+    public CacheEventQueue( ICacheListener listener, byte listenerId,
String cacheName, int maxFailure, int waitBeforeRetry)
     {
         if ( listener == null )
         {
             throw new IllegalArgumentException( "listener must not be null"
);
         }
-
         this.listener = listener;
         this.listenerId = listenerId;
         this.cacheName = cacheName;
         this.maxFailure = maxFailure <= 0 ? 10 : maxFailure;
         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 :
waitBeforeRetry;
-
-        this.t = new QProcessor();
-        this.t.start();
-
+        this.processorThread = new QProcessor();
+        this.processorThread.start();
         if ( log.isDebugEnabled() )
         {
             log.debug( "Constructed: " + this );
@@ -129,21 +115,26 @@
         if ( !this.destroyed )
         {
             this.destroyed = true;
-
             // sychronize on queue so the thread will not wait forever,
             // and then interrupt the QueueProcessor
-
             synchronized ( this.queueLock )
             {
-                this.t.interrupt();
+                this.processorThread.interrupt();
             }
-
-            this.t = null;
-
+            this.processorThread = null;
             log.info( "Cache event queue destroyed: " + this );
         }
     }
-
+    
+    /**
+     * Event Q is empty.
+     */
+    public synchronized void stopProcessing()
+    {
+        working = false;
+        processorThread = null;
+    }
+    
     /**
      * @return
      */
@@ -160,6 +151,11 @@
         return ( !this.destroyed );
     }
 
+    private boolean isWorking()
+    {
+        return ( this.working );
+    }
+
     /**
      * @return The {3} value
      */
@@ -172,8 +168,7 @@
      * @param ce The feature to be added to the PutEvent attribute
      * @exception IOException
      */
-    public synchronized void addPutEvent( ICacheElement ce )
-        throws IOException
+    public synchronized void addPutEvent( ICacheElement ce ) throws
IOException
     {
         if ( !this.destroyed )
         {
@@ -185,8 +180,7 @@
      * @param key The feature to be added to the RemoveEvent attribute
      * @exception IOException
      */
-    public synchronized void addRemoveEvent( Serializable key )
-        throws IOException
+    public synchronized void addRemoveEvent( Serializable key ) throws
IOException
     {
         if ( !this.destroyed )
         {
@@ -197,8 +191,7 @@
     /**
      * @exception IOException
      */
-    public synchronized void addRemoveAllEvent()
-        throws IOException
+    public synchronized void addRemoveAllEvent() throws IOException
     {
         if ( !this.destroyed )
         {
@@ -209,8 +202,7 @@
     /**
      * @exception IOException
      */
-    public synchronized void addDisposeEvent()
-        throws IOException
+    public synchronized void addDisposeEvent() throws IOException
     {
         if ( !this.destroyed )
         {
@@ -226,59 +218,55 @@
     private void put( AbstractCacheEvent event )
     {
         Node newNode = new Node();
-
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "Event entering Queue for " + cacheName + ": " +
event );
+        }
         newNode.event = event;
-
-        synchronized ( this.queueLock )
+        synchronized ( queueLock )
         {
             this.tail.next = newNode;
             this.tail = newNode;
-
-            this.queueLock.notify();
+            if ( isAlive() )
+                if ( !isWorking() )
+                {
+                    this.working = true;
+                    processorThread = new QProcessor();
+                    processorThread.start();
+                } else
+                {
+                    queueLock.notify();
+                }
         }
     }
 
-    private AbstractCacheEvent take() throws InterruptedException
+    private AbstractCacheEvent take()
     {
-        synchronized ( this.queueLock )
+        synchronized ( queueLock )
         {
             // wait until there is something to read
-
-            while ( this.head == this.tail )
+            if ( head == tail )
             {
-                this.queueLock.wait();
+                return null;
             }
-
-            // we have the lock, and the list is not empty
-
-            Node node = this.head.next;
-
-            // This is an awful bug.  This will always return null.
-            // This make the event Q and event destroyer.
-            //AbstractCacheEvent value = head.event;
-
-            // corrected
+            Node node = head.next;
             AbstractCacheEvent value = node.event;
-
             if ( log.isDebugEnabled() )
             {
-              log.debug( "head.event = " + this.head.event );
-              log.debug( "node.event = " + node.event );
+                log.debug( "head.event = " + head.event );
+                log.debug( "node.event = " + node.event );
             }
-
             // Node becomes the new head (head is always empty)
-
             node.event = null;
-            this.head = node;
-
+            head = node;
             return value;
         }
     }
 
     ///////////////////////////// Inner classes
/////////////////////////////
-
     private static class Node
     {
+
         Node next = null;
         AbstractCacheEvent event = null;
     }
@@ -287,70 +275,80 @@
      */
     private class QProcessor extends Thread
     {
+
         /**
          * Constructor for the QProcessor object
          */
         QProcessor()
         {
-            super( "CacheEventQueue.QProcessor-" + (
++CacheEventQueue.this.processorInstanceCount ) );
-
+            super( "CacheEventQueue.QProcessor-" + (
++processorInstanceCount ) );
             setDaemon( true );
         }
 
+        
         /**
          * Main processing method for the QProcessor object
          */
         public void run()
         {
             AbstractCacheEvent r = null;
-
-            while ( !CacheEventQueue.this.destroyed )
+            while ( isAlive() )
             {
-                try
+                r = take();
+                if ( log.isDebugEnabled() )
                 {
-                    r = take();
-
-                    if ( log.isDebugEnabled() )
-                    {
-                        log.debug( "r from take() = " + r );
-                    }
-
+                    log.debug( "Event from queue = " + r );
                 }
-                catch ( InterruptedException e )
+                if ( r == null )
                 {
-                    // We were interrupted, just continue -- the while loop
-                    // will exit if we have been properly destroyed.
+                    synchronized ( queueLock )
+                    {
+                        try
+                        {
+                            queueLock.wait( THREAD_TIMEOUT );
+                        } catch ( InterruptedException e )
+                        {
+                            log.warn( "Interrupted while waiting for
another event to come in before we die." );
+                            return;
+                        }
+                        r = take();
+                        if ( log.isDebugEnabled() )
+                        {
+                            log.debug( "Event from queue after sleep = " +
r );
+                        }
+                        if ( r == null )
+                        {
+                            stopProcessing();
+                        }
+                    }
                 }
-
-                if ( !CacheEventQueue.this.destroyed && r != null )
+                if ( isAlive() && r != null )
                 {
                     r.run();
                 }
             }
-            // declare failure as listener is permanently unreachable.
-            // queue = null;
-            CacheEventQueue.this.listener = null;
-            // The listener failure logging more the problem of the user
-            // of the q.
-            log.info( "QProcessor exiting for " + CacheEventQueue.this );
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "QProcessor exiting for " + this );
+            }
         }
     }
 
+
     /**
      * Retries before declaring failure.
      *
      */
     private abstract class AbstractCacheEvent implements Runnable
     {
+
         /**
          * Main processing method for the AbstractCacheEvent object
          */
         public void run()
         {
             IOException ex = null;
-
-            while ( !CacheEventQueue.this.destroyed
-                    && CacheEventQueue.this.failureCount <=
CacheEventQueue.this.maxFailure )
+            while ( !CacheEventQueue.this.destroyed &&
CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
             {
                 try
                 {
@@ -359,32 +357,28 @@
                     CacheEventQueue.this.failureCount = 0;
                     return;
                     // happy and done.
-                }
-                catch ( IOException e )
+                } catch ( IOException e )
                 {
                     CacheEventQueue.this.failureCount++;
                     ex = e;
                 }
                 // Let's get idle for a while before retry.
-                if ( !CacheEventQueue.this.destroyed
-                     && CacheEventQueue.this.failureCount <=
CacheEventQueue.this.maxFailure )
+                if ( !CacheEventQueue.this.destroyed &&
CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
                 {
                     try
                     {
                         log.warn( "...retrying propagation " +
CacheEventQueue.this + "..." + CacheEventQueue.this.failureCount );
-                        Thread.currentThread().sleep(
CacheEventQueue.this.waitBeforeRetry );
-                    }
-                    catch ( InterruptedException ie )
+                        Thread.sleep( CacheEventQueue.this.waitBeforeRetry
);
+                    } catch ( InterruptedException ie )
                     {
                         // ignore;
                     }
                 }
             }
-            // Too bad.  The remote host is unreachable, so we give up.
+            // Too bad. The remote host is unreachable, so we give up.
             if ( ex != null )
             {
                 log.warn( "Giving up propagation " + CacheEventQueue.this,
ex );
-
                 destroy();
             }
             return;
@@ -395,8 +389,7 @@
          *
          * @exception IOException
          */
-        protected abstract void doRun()
-            throws IOException;
+        protected abstract void doRun() throws IOException;
     }
 
     /**
@@ -412,14 +405,11 @@
          * @param ice
          * @exception IOException
          */
-        PutEvent( ICacheElement ice )
-            throws IOException
+        PutEvent( ICacheElement ice) throws IOException
         {
             this.ice = ice;
             /*
-             * this.key = key;
-             * this.obj = CacheUtils.dup(obj);
-             * this.attr = attr;
+             * this.key = key; this.obj = CacheUtils.dup(obj); this.attr =
attr;
              * this.groupName = groupName;
              */
         }
@@ -429,13 +419,11 @@
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             /*
              * CacheElement ce = new CacheElement(cacheName, key, obj);
-             * ce.setElementAttributes( attr );
-             * ce.setGroupName( groupName );
+             * ce.setElementAttributes( attr ); ce.setGroupName( groupName
);
              */
             CacheEventQueue.this.listener.handlePut( ice );
         }
@@ -447,6 +435,7 @@
      */
     private class RemoveEvent extends AbstractCacheEvent
     {
+
         private Serializable key;
 
         /**
@@ -455,8 +444,7 @@
          * @param key
          * @exception IOException
          */
-        RemoveEvent( Serializable key )
-            throws IOException
+        RemoveEvent( Serializable key) throws IOException
         {
             this.key = key;
         }
@@ -466,8 +454,7 @@
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             CacheEventQueue.this.listener.handleRemove(
CacheEventQueue.this.cacheName, key );
         }
@@ -479,13 +466,13 @@
      */
     private class RemoveAllEvent extends AbstractCacheEvent
     {
+
         /**
          * Description of the Method
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             CacheEventQueue.this.listener.handleRemoveAll(
CacheEventQueue.this.cacheName );
         }
@@ -497,16 +484,15 @@
      */
     private class DisposeEvent extends AbstractCacheEvent
     {
+
         /**
          * Description of the Method
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             CacheEventQueue.this.listener.handleDispose(
CacheEventQueue.this.cacheName );
         }
     }
 }
-


Ruby Jobs
Java Jobs
Jobs in California
more...
what
job title, keywords
where
city, state, zip
jobs by job search
Search:
Java, servers, webhosting, windows, cisco ...
more...
<Prev in Thread] Current Thread [Next in Thread>
Google Custom Search

Recently Viewed:
encryption.gpg....    ietf.rfc822/199...    freebsd.devel.i...    lang.haskell.li...    mail.squirrelma...    web.zope.plone....    yellowdog.gener...    text.xml.xalan....    recreation.phot...    kde.devel.educa...    hardware.bus.ca...    printing.ghosts...    voip.peering/20...    assembly/2006-0...    org.user-groups...    culture.interne...    network.i2p/200...    boot-loaders.ya...    xfree86.render/...    qnx.openqnx.dev...    jakarta.velocit...    user-groups.pal...   
Home | blog view | USPTO Patent Archive | advertise | OSDir is an inevitable website. super tiny logo

Free Magazines

Cisco News
Receive a free quarterly e-newsletter with exclusive articles on how Cisco IT uses its own products and solutions to enable the business.
subscribe

Systems Management News, the newspaper for IT systems administration and data center managers! Each issue of Systems Management News is chock-full of news and analysis to help you understand what's happening in your field.
subscribe

The Enterprise Newsweekly eWeek is the essential technology information source for builders of e-business.
subscribe

Oracle Magazine Oracle Magazine contains technology strategy articles, sample code, tips, Oracle and partner news, how to articles for developers and DBAs, and more. Oracle (NASDAQ: ORCL) is the world's largest enterprise software company.
subscribe

Total Telecom Total Telecom is "The Economist of the communications industry".
subscribe