26struct ThreadPool::ThreadPoolThread final :
public Thread
28 ThreadPoolThread (
ThreadPool& p,
const Options& options)
29 :
Thread { options.threadName, options.threadStackSizeBytes },
38 if (! pool.runNextJob (*
this))
43 std::atomic<ThreadPoolJob*> currentJob {
nullptr };
47 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
59 jassert (pool ==
nullptr || ! pool->
contains (
this));
80 listeners.add (listener);
85 listeners.remove (listener);
91 return t->currentJob.load();
100 jassert (options.numberOfThreads > 0);
102 for (
int i = jmax (1, options.numberOfThreads); --i >= 0;)
103 threads.
add (
new ThreadPoolThread (*
this, options));
105 for (
auto*
t : threads)
106 t->startThread (options.desiredThreadPriority);
110 size_t threadStackSizeBytes,
113 .withThreadStackSizeBytes (threadStackSizeBytes)
114 .withDesiredThreadPriority (desiredThreadPriority) }
124void ThreadPool::stopThreads()
126 for (
auto*
t : threads)
127 t->signalThreadShouldExit();
129 for (
auto*
t : threads)
135 jassert (
job !=
nullptr);
136 jassert (
job->pool ==
nullptr);
138 if (
job->pool ==
nullptr)
141 job->shouldStop =
false;
142 job->isActive =
false;
150 for (
auto*
t : threads)
160 JobStatus runJob()
override {
return job(); }
173 JobStatus runJob()
override {
job();
return ThreadPoolJob::jobHasFinished; }
189 return threads.
size();
216 if (index > 0 && !
job->isActive)
217 jobs.move (index, 0);
231 jobFinishedSignal.
wait (2);
247 if (jobs.contains (
job))
252 job->signalJobShouldExit();
258 jobs.removeFirstMatchingValue (
job);
278 for (
int i = jobs.size(); --i >= 0;)
280 auto*
job = jobs.getUnchecked (i);
289 job->signalJobShouldExit();
319 jobFinishedSignal.
wait (20);
330 for (
auto*
job : jobs)
332 s.
add (
job->getJobName());
344 for (
int i = 0; i < jobs.size(); ++i)
346 if (
auto*
job = jobs[i])
358 job->isActive =
true;
368bool ThreadPool::runNextJob (ThreadPoolThread& thread)
370 if (
auto* job = pickNextJobToRun())
373 thread.currentJob = job;
377 result = job->runJob();
384 thread.currentJob =
nullptr;
386 OwnedArray<ThreadPoolJob> deletionList;
389 const ScopedLock sl (lock);
391 if (jobs.contains (job))
393 job->isActive =
false;
397 jobs.removeFirstMatchingValue (job);
398 addToDeleteList (deletionList, job);
400 jobFinishedSignal.
signal();
405 jobs.move (jobs.indexOf (job), -1);
416void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job)
const
418 job->shouldStop =
true;
421 if (job->shouldBeDeleted)
422 deletionList.add (job);
int size() const noexcept
ObjectClass * add(ObjectClass *newObject)
void add(String stringToAdd)
void signalJobShouldExit()
void setJobName(const String &newName)
String getJobName() const
void addListener(Thread::Listener *)
static ThreadPoolJob * getCurrentThreadPoolJob()
void removeListener(Thread::Listener *)
ThreadPoolJob(const String &name)
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
int getNumThreads() const noexcept
ThreadPoolJob * getJob(int index) const noexcept
int getNumJobs() const noexcept
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
bool isJobRunning(const ThreadPoolJob *job) const noexcept
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
bool contains(const ThreadPoolJob *job) const noexcept
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
static Thread *JUCE_CALLTYPE getCurrentThread()
bool wait(double timeOutMilliseconds) const
Thread(const String &threadName, size_t threadStackSize=osDefaultStackSize)
bool threadShouldExit() const
static uint32 getMillisecondCounter() noexcept
bool wait(double timeOutMilliseconds=-1.0) const