Class MemoryAwareThreadPoolExecutor
java.lang.Object
    java.util.concurrent.AbstractExecutorService
        java.util.concurrent.ThreadPoolExecutor
            org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor

All Implemented Interfaces:
Executor, ExecutorService

Direct Known Subclasses:
FairOrderedMemoryAwareThreadPoolExecutor, OrderedMemoryAwareThreadPoolExecutor

public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor

A ThreadPoolExecutor which blocks the task submission when there are too many tasks in the queue. Both per-Channel and per-Executor limits can be applied.

When a task (i.e. Runnable) is submitted, MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object) to get the estimated size of the task in bytes, and uses it to calculate the amount of memory occupied by the unprocessed tasks.

If the total size of the unprocessed tasks exceeds either the per-Channel or the per-Executor threshold, any further execute(Runnable) call will block until the tasks in the queue are processed so that the total size goes under the threshold.

Using an alternative task size estimation strategy

Although the default implementation does its best to guess the size of an object of unknown type, it is always a good idea to use an alternative ObjectSizeEstimator implementation instead of the DefaultObjectSizeEstimator to avoid incorrect task size calculation, especially when:
- you are using MemoryAwareThreadPoolExecutor independently from ExecutionHandler,
- you are submitting a task whose type is not ChannelEventRunnable, or
- the message type of the MessageEvent in the ChannelEventRunnable is not ChannelBuffer.

The following example implements an ObjectSizeEstimator which understands a user-defined object:

    public class MyRunnable implements Runnable {

        private final byte[] data;

        public MyRunnable(byte[] data) {
            this.data = data;
        }

        public void run() {
            // Process 'data' ..
        }
    }

    public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {

        @Override
        public int estimateSize(Object o) {
            if (o instanceof MyRunnable) {
                return ((MyRunnable) o).data.length + 8;
            }
            return super.estimateSize(o);
        }
    }

    ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(
            16, 65536, 1048576, 30, TimeUnit.SECONDS,
            new MyObjectSizeEstimator(),
            Executors.defaultThreadFactory());

    pool.execute(new MyRunnable(data));

Event execution order

Please note that this executor does not maintain the order of the ChannelEvents for the same Channel. For example, you can even receive a "channelClosed" event before a "messageReceived" event. The events can be processed as depicted below:

    --------------------------------> Timeline -------------------------------->

    Thread X: --- Channel A (Event 1) --- Channel A (Event 2) --------------------------->

    Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->

    Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->

To maintain the event order, you must use OrderedMemoryAwareThreadPoolExecutor.
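
To make the ordering guarantee concrete, the following is a minimal sketch, using only JDK classes, of how events for one channel can be kept in submission order while still running on a shared multi-threaded pool. It is an illustration of the general technique, not the implementation of OrderedMemoryAwareThreadPoolExecutor; the names SerialExecutor and OrderingDemo are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs its tasks one at a time, in submission order, on a shared pool. */
final class SerialExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final Executor pool;
    private boolean running; // guarded by 'this'

    SerialExecutor(Executor pool) {
        this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable task) {
        pending.add(task);
        if (!running) {
            running = true;
            pool.execute(this::drain); // at most one drain loop is active at a time
        }
    }

    private void drain() {
        for (;;) {
            Runnable next;
            synchronized (this) {
                next = pending.poll();
                if (next == null) {
                    running = false;
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // Keep draining; a real implementation would report the failure.
            }
        }
    }
}

final class OrderingDemo {
    /** Submits 'count' numbered events for one hypothetical channel and returns the order they ran in. */
    static List<Integer> runInOrder(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialExecutor channelA = new SerialExecutor(pool); // one serial executor per channel
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final int event = i;
            channelA.execute(() -> {
                seen.add(event);
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(seen);
    }
}
```

Submitting the same tasks directly to the four-thread pool would give no such guarantee, which is the situation shown in the timeline above.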
Nested Class Summary
-
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
Constructor Summary
Constructor and Description

MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
    Creates a new instance.

MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
    Creates a new instance.

MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
    Creates a new instance.

MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
    Creates a new instance.
-
Method Summary
Modifier and Type, Method, and Description

protected void beforeExecute(Thread t, Runnable r)

protected void decreaseCounter(Runnable task)

protected void doExecute(Runnable task)
    Put the actual execution logic here.

protected void doUnorderedExecute(Runnable task)
    Executes the specified task without maintaining the event order.

void execute(Runnable command)

long getMaxChannelMemorySize()
    Returns the maximum total size of the queued events per channel.

long getMaxTotalMemorySize()
    Returns the maximum total size of the queued events for this pool.

boolean getNotifyChannelFuturesOnShutdown()
    Returns whether the ChannelFutures of the ChannelEventRunnables should be notified about the shutdown of this MemoryAwareThreadPoolExecutor.

ObjectSizeEstimator getObjectSizeEstimator()
    Returns the ObjectSizeEstimator of this pool.

protected void increaseCounter(Runnable task)

boolean remove(Runnable task)

void setMaxChannelMemorySize(long maxChannelMemorySize)
    Sets the maximum total size of the queued events per channel.

void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown)
    If set to false, no queued ChannelEventRunnable's ChannelFuture will get notified once shutdownNow() is called.

void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator)
    Sets the ObjectSizeEstimator of this pool.

protected boolean shouldCount(Runnable task)
    Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption.

List<Runnable> shutdownNow()
    This will call shutdownNow(boolean) with the value of getNotifyChannelFuturesOnShutdown().

List<Runnable> shutdownNow(boolean notify)
    See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown.

protected void terminated()
-
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, toString
-
Methods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
Constructor Detail
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
Creates a new instance.
Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
Creates a new instance.
Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
Creates a new instance.
Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
threadFactory - the ThreadFactory of this pool
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
Creates a new instance.
Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
threadFactory - the ThreadFactory of this pool
objectSizeEstimator - the ObjectSizeEstimator of this pool
Method Detail
-
terminated
protected void terminated()
Overrides:
terminated in class ThreadPoolExecutor
-
shutdownNow
public List<Runnable> shutdownNow()
This will call shutdownNow(boolean) with the value of getNotifyChannelFuturesOnShutdown().
Specified by:
shutdownNow in interface ExecutorService
Overrides:
shutdownNow in class ThreadPoolExecutor
-
shutdownNow
public List<Runnable> shutdownNow(boolean notify)
See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown. If true is given to this method, it also notifies all ChannelFutures of the ChannelEventRunnables that were not executed.

Be aware that if you call this with false, you will need to handle the notification of the ChannelFutures yourself. Only use this if you really have a use case for it.
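
As a rough illustration of what handling the notification yourself can involve, the sketch below uses only JDK classes: it takes the list of unexecuted tasks returned by ThreadPoolExecutor.shutdownNow() and fails the future attached to each one. NotifiableTask is a hypothetical stand-in for a ChannelEventRunnable with its ChannelFuture, and ShutdownDemo is likewise an assumed name; neither is part of Netty.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical task type that carries a future somebody may be waiting on. */
final class NotifiableTask implements Runnable {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    private final Runnable body;

    NotifiableTask(Runnable body) {
        this.body = body;
    }

    @Override
    public void run() {
        try {
            body.run();
            future.complete(null);
        } catch (RuntimeException e) {
            future.completeExceptionally(e);
        }
    }
}

final class ShutdownDemo {
    /** Fails the future of every task that was returned without having been run. */
    static int failUnexecuted(List<Runnable> unexecuted) {
        int failed = 0;
        for (Runnable r : unexecuted) {
            if (r instanceof NotifiableTask) {
                ((NotifiableTask) r).future.completeExceptionally(
                        new CancellationException("executor was shut down"));
                failed++;
            }
        }
        return failed;
    }

    /** Queues two tasks behind a blocked worker, shuts down, and returns how many futures were failed. */
    static int demo() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // occupies the only worker until it is interrupted
            } catch (InterruptedException ignored) {
                // interrupted by shutdownNow(); just exit
            }
        });
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.execute(new NotifiableTask(() -> { }));
        pool.execute(new NotifiableTask(() -> { }));
        List<Runnable> unexecuted = pool.shutdownNow(); // interrupts the worker, drains the queue
        return failUnexecuted(unexecuted);
    }
}
```

Without a step like failUnexecuted, anything waiting on those futures would never be told that the tasks were dropped.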
-
getObjectSizeEstimator
public ObjectSizeEstimator getObjectSizeEstimator()
Returns the ObjectSizeEstimator of this pool.
-
setObjectSizeEstimator
public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator)
Sets the ObjectSizeEstimator of this pool.
-
getMaxChannelMemorySize
public long getMaxChannelMemorySize()
Returns the maximum total size of the queued events per channel.
-
setMaxChannelMemorySize
public void setMaxChannelMemorySize(long maxChannelMemorySize)
Sets the maximum total size of the queued events per channel. Specify 0 to disable.
-
getMaxTotalMemorySize
public long getMaxTotalMemorySize()
Returns the maximum total size of the queued events for this pool.
-
setNotifyChannelFuturesOnShutdown
public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown)
If set to false, no queued ChannelEventRunnable's ChannelFuture will get notified once shutdownNow() is called. If set to true, every queued ChannelEventRunnable will get marked as failed via ChannelFuture.setFailure(Throwable).

Please only set this to false if you want to handle the notification yourself and know what you are doing. Default is true.
-
getNotifyChannelFuturesOnShutdown
public boolean getNotifyChannelFuturesOnShutdown()
Returns whether the ChannelFutures of the ChannelEventRunnables should be notified about the shutdown of this MemoryAwareThreadPoolExecutor.
-
execute
public void execute(Runnable command)
Specified by:
execute in interface Executor
Overrides:
execute in class ThreadPoolExecutor
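
The class description says that execute(Runnable) blocks once the estimated size of the unprocessed tasks exceeds a threshold. The sketch below shows one way such back-pressure can be built with plain JDK monitors. It is not Netty's implementation: ByteBudgetExecutor and ByteBudgetDemo are hypothetical names, the ToLongFunction stands in for an ObjectSizeEstimator, only a single global limit is modelled, and the budget is released when a task finishes, which is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.ToLongFunction;

/** Hypothetical: blocks execute() while the estimated size of unfinished tasks is at or over a limit. */
final class ByteBudgetExecutor implements Executor {
    private final Executor delegate;
    private final long maxTotalBytes;                 // 0 disables the limit
    private final ToLongFunction<Runnable> estimator; // stand-in for a size estimator
    private long pendingBytes;                        // guarded by 'this'

    ByteBudgetExecutor(Executor delegate, long maxTotalBytes, ToLongFunction<Runnable> estimator) {
        this.delegate = delegate;
        this.maxTotalBytes = maxTotalBytes;
        this.estimator = estimator;
    }

    @Override
    public void execute(Runnable task) {
        long size = estimator.applyAsLong(task);
        acquire(size);
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    release(size);
                }
            });
        } catch (RejectedExecutionException e) {
            release(size); // the task never ran, so hand its budget back
            throw e;
        }
    }

    private synchronized void acquire(long size) {
        boolean interrupted = false;
        while (maxTotalBytes != 0 && pendingBytes >= maxTotalBytes) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        pendingBytes += size;
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    private synchronized void release(long size) {
        pendingBytes -= size;
        notifyAll();
    }

    synchronized long pendingBytes() {
        return pendingBytes;
    }
}

final class ByteBudgetDemo {
    /** Returns true if a third 60-byte task blocks under a 100-byte limit until one task finishes. */
    static boolean thirdSubmitBlocksUntilRelease() {
        Queue<Runnable> queued = new ArrayDeque<>(); // tasks run only when the demo says so
        ByteBudgetExecutor executor = new ByteBudgetExecutor(queued::add, 100, task -> 60L);
        executor.execute(() -> { });                 // pending: 60
        executor.execute(() -> { });                 // pending: 120, now over the limit
        Thread submitter = new Thread(() -> executor.execute(() -> { }));
        submitter.start();
        try {
            submitter.join(200);
            boolean blocked = submitter.isAlive();   // still waiting for budget
            queued.poll().run();                     // finish one task: pending drops to 60
            submitter.join(5000);
            return blocked && !submitter.isAlive() && executor.pendingBytes() == 120;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Passing 0 as the limit disables blocking entirely, mirroring the "Specify 0 to disable" convention used by the constructors of this class.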
-
doExecute
protected void doExecute(Runnable task)
Put the actual execution logic here. The default implementation simply calls doUnorderedExecute(Runnable).
-
doUnorderedExecute
protected final void doUnorderedExecute(Runnable task)
Executes the specified task without maintaining the event order.
-
remove
public boolean remove(Runnable task)
Overrides:
remove in class ThreadPoolExecutor
-
beforeExecute
protected void beforeExecute(Thread t, Runnable r)
Overrides:
beforeExecute in class ThreadPoolExecutor
-
increaseCounter
protected void increaseCounter(Runnable task)
-
decreaseCounter
protected void decreaseCounter(Runnable task)
-
shouldCount
protected boolean shouldCount(Runnable task)
Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption. When overriding this method, you must call super.shouldCount() to make sure important tasks are not counted.