新闻详细
新闻当前位置:新闻详细

线程池使用及优势,Java实现通用线程池

专业编程培训机构——完成蜕变以后轻松拿高薪

电话+V: 152079-09430 ,欢迎咨询java线程池底层源码分析,[python实用课程],[C++单片机原理],[C#网站搭建],[Nodejs小程序开发],[ios游戏开发],[安卓游戏开发],[教会用大脑用想法赚钱实现阶层跨越]

一、线程池使用及优势

线程池的主要工作是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数超过了最大数量,超出数量的线程就需要排队等候,等待其他线程执行完毕

它的主要特点可以总结为:线程复用控制最大并发数管理线程

线程池主要优势又如下三点:

Java中的线程池使通过Executor框架实现的,使用线程池用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类

其中Executors是一个工厂方法,提供了快捷创建线程池的方法,常用的线程池又如下几种:

通过查看这三个工厂方法的源码得知:

底层都是创建了ThreadPoolExecutor对象,该类的构造方法有7个参数:

线程池的工作流程如下:

当线程池中队列已满且工作线程达到最大数量时,线程池会拒绝新任务的提交直至队列出现空位或有空闲线程,对于拒绝的任务有不同的处理方式,称为拒绝策略。

线程池提供了四种拒绝策略:

以上拒绝策略均实现了RejectedExecutionHandler接口

二、线程池中空闲的线程处于什么状态?

一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;

源码追踪:

for(;;){

...

workQueue.take();

...

}

publicEtake()...{

...

while(count.get()==0){//这里就是任务队列中的消息数量

notEmpty.await();

}

...

}

publicfinalvoidawait()...{

...

LockSupport.park(this);

...

}

继续往下:

publicstaticvoidpark(Objectblocker){

Threadt=Thread.currentThread();

setBlocker(t,blocker);

U.park(false,0L);

setBlocker(t,null);

}

privatestaticfinalsun.misc.UnsafeU=sun.misc.Unsafe.getUnsafe();

//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。

publicnativevoidpark(booleanisAbsolute,longtime);

上面就是java11线程池中阻塞的源码追踪;

二.对比object的wait()方法:

@FastNative

publicfinalnativevoidwait(longtimeout,intnanos)throwsInterruptedException;

还有Thread的sleep()方法:

@FastNative

privatestaticnativevoidsleep(Objectlock,longmillis,intnanos)throws...;

可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep();

这3个方法最终实现都是通过cc++实现的native方法.

三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:

12.4.3 状态转换

Java语言定义了5种线程状态,在任意一个时间点,一个线程只能有且只有其中的一种

状态,这5种状态分别如下。

1)新建(New):创建后尚未启动的线程处于这种状态。

2)运行(Runable):Runable包括了操作系统线程状态中的Running和Ready,也就是处于此

状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。

3)无限期等待(Waiting):处于这种状态的线程不会被分配CPU执行时间,它们要等待被

其他线程显式地唤醒。以下方法会让线程陷入无限期的等待状态:

●没有设置Timeout参数的Object.wait()方法。

●没有设置Timeout参数的Thread.join()方法。

●LockSupport.park()方法。

4)限期等待(TimedWaiting):处于这种状态的线程也不会被分配CPU执行时间,不过无

须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程

进入限期等待状态:

●Thread.sleep()方法。

●设置了Timeout参数的Object.wait()方法。

●设置了Timeout参数的Thread.join()方法。

●LockSupport.parkNanos()方法。

●LockSupport.parkUntil()方法。

5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等

待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状

态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将

进入这种状态。

三、Java实现通用线程池

  线程池通俗的描述就是预先创建若干空闲线程等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务这样就省去了频繁创建线程的时间因为频繁创建线程是要耗费大量的CPU资源的如果一个应用程序需要频繁地处理大量并发事务不断的创建销毁线程往往会大大地降低系统的效率这时候线程池就派上用场了

  本文旨在使用Java语言编写一个通用的线程池当需要使用线程池处理事务时只需按照指定规范封装好事务处理对象然后用已有的线程池对象去自动选择空闲线程自动调用事务处理对象即可并实现线程池的动态修改(修改当前线程数最大线程数等)下面是实现代码

  //ThreadTaskjava

  packagepolarmanthreadpool;

  /***//**

  *线程任务

  *@authorryang

  *

  */

  publicinterfaceThreadTask{

  publicvoidrun();

  }

  //PooledThreadjava

  packagepolarmanthreadpool;

  importjavautilCollection;importjavautilVector;

  /***//**

  *接受线程池管理的线程

  *@authorryang

  *

  */

  publicclassPooledThreadextendsThread{

  protectedVectortasks=newVector();

  protectedbooleanrunning=false;

  protectedbooleanstopped=false;

  protectedbooleanpaused=false;

  protectedbooleankilled=false;

  privateThreadPoolpool;

  publicPooledThread(ThreadPoolpool){thispool=pool;

  }

  publicvoidputTask(ThreadTasktask){tasksadd(task);

  }

  publicvoidputTasks(ThreadTask[]tasks){for(inti=;i<taskslength;i++)thistasksadd(tasks[i]);

  }

  publicvoidputTasks(Collectiontasks){thistasksaddAll(tasks);

  }

  protectedThreadTaskpopTask(){if(taskssize()>)return(ThreadTask)tasksremove();

  else

  returnnull;

  }

  publicbooleanisRunning(){

  returnrunning;

  }

  publicvoidstopTasks(){

  stopped=true;

  }

  publicvoidstopTasksSync(){

  stopTasks();

  while(isRunning()){try{

  sleep();

  }catch(InterruptedExceptione){

  }

  }

  }

  publicvoidpauseTasks(){

  paused=true;

  }

  publicvoidpauseTasksSync(){

  pauseTasks();

  while(isRunning()){try{

  sleep();

  }catch(InterruptedExceptione){

  }

  }

  }

  publicvoidkill(){if(!running)

  interrupt();

  else

  killed=true;

  }

  publicvoidkillSync(){

  kill();

  while(isAlive()){try{

  sleep();

  }catch(InterruptedExceptione){

  }

  }

  }

  publicsynchronizedvoidstartTasks(){

  running=true;

  thisnotify();

  }

  publicsynchronizedvoidrun(){try{while(true){if(!running||taskssize()==){poolnotifyForIdleThread();//Systemoutprintln(ThreadcurrentThread()getId()+:空闲);thiswait();}else{

  ThreadTasktask;

  while((task=popTask())!=null){taskrun();if(stopped){

  stopped=false;

  if(taskssize()>){tasksclear();Systemoutprintln(ThreadcurrentThread()getId()+:Tasksarestopped);

  break;

  }

  }

  if(paused){

  paused=false;

  if(taskssize()>){Systemoutprintln(ThreadcurrentThread()getId()+:Tasksarepaused);

  break;

  }

  }

  }

  running=false;

  }

  if(killed){

  killed=false;

  break;

  }

  }

  }catch(InterruptedExceptione){

  return;

  }

  //Systemoutprintln(ThreadcurrentThread()getId()+:Killed);

  }

  }

  //ThreadPooljava

  packagepolarmanthreadpool;

  importjavautilCollection;importjavautilIterator;importjavautilVector;

  /***//**

  *线程池

  *@authorryang

  *

  */

  publicclassThreadPool{

  protectedintmaxPoolSize;

  protectedintinitPoolSize;

  protectedVectorthreads=newVector();

  protectedbooleaninitialized=false;

  protectedbooleanhasIdleThread=false;

  publicThreadPool(intmaxPoolSizeintinitPoolSize){thismaxPoolSize=maxPoolSize;thisinitPoolSize=initPoolSize;

  }

  publicvoidinit(){

  initialized=true;

  for(inti=;i<initPoolSize;i++){

  PooledThreadthread=newPooledThread(this);

  threadstart();threadsadd(thread);

  }

  //Systemoutprintln(线程池初始化结束线程数=+threadssize()+最大线程数=+maxPoolSize);

  }

  publicvoidsetMaxPoolSize(intmaxPoolSize){//Systemoutprintln(重设最大线程数最大线程数=+maxPoolSize);thismaxPoolSize=maxPoolSize;

  if(maxPoolSize<getPoolSize())

  setPoolSize(maxPoolSize);

  }

  /***//**

  *重设当前线程数

  *若需杀掉某线程线程不会立刻杀掉而会等到线程中的事务处理完成*但此方法会立刻从线程池中移除该线程不会等待事务处理结束

  *@paramsize

  */

  publicvoidsetPoolSize(intsize){if(!initialized){

  initPoolSize=size;

  return;

  }elseif(size>getPoolSize()){for(inti=getPoolSize();i<size&&i<maxPoolSize;i++){

  PooledThreadthread=newPooledThread(this);

  threadstart();threadsadd(thread);

  }

  }elseif(size<getPoolSize()){while(getPoolSize()>size){PooledThreadth=(PooledThread)threadsremove();thkill();

  }

  }

  //Systemoutprintln(重设线程数线程数=+threadssize());

  }

  publicintgetPoolSize(){returnthreadssize();

  }

  protectedvoidnotifyForIdleThread(){

  hasIdleThread=true;

  }

  protectedbooleanwaitForIdleThread(){

  hasIdleThread=false;

  while(!hasIdleThread&&getPoolSize()>=maxPoolSize){try{Threadsleep();}catch(InterruptedExceptione){

  returnfalse;

  }

  }

  returntrue;

  }

  publicsynchronizedPooledThreadgetIdleThread(){while(true){for(Iteratoritr=erator();itrhasNext();){PooledThreadth=(PooledThread)itrnext();if(!thisRunning())

  returnth;

  }

  if(getPoolSize()<maxPoolSize){

  PooledThreadthread=newPooledThread(this);

  threadstart();threadsadd(thread);

  returnthread;

  }

  //Systemoutprintln(线程池已满等待);

  if(waitForIdleThread()==false)

  returnnull;

  }

  }

  publicvoidprocessTask(ThreadTasktask){

  PooledThreadth=getIdleThread();

  if(th!=null){thputTask(task);thstartTasks();

  }

  }

  publicvoidprocessTasksInSingleThread(ThreadTask[]tasks){

  PooledThreadth=getIdleThread();

  if(th!=null){thputTasks(tasks);thstartTasks();

  }

  }

  publicvoidprocessTasksInSingleThread(Collectiontasks){

  PooledThreadth=getIdleThread();

  if(th!=null){thputTasks(tasks);thstartTasks();

  }

  }

  }

  下面是线程池的测试程序

  //ThreadPoolTestjava

  importjavaioBufferedReader;importjavaioIOException;importjavaioInputStreamReader;

  importpolarmanthreadpoolThreadPool;importpolarmanthreadpoolThreadTask;

  publicclassThreadPoolTest{

  publicstaticvoidmain(String[]args){Systemoutprintln(quit退出);Systemoutprintln(taskA启动任务A时长为秒);Systemoutprintln(size设置当前线程池大小为);Systemoutprintln(max设置线程池最大线程数为);Systemoutprintln();

  finalThreadPoolpool=newThreadPool();poolinit();

  ThreadcmdThread=newThread(){publicvoidrun(){

  BufferedReaderreader=newBufferedReader(newInputStreamReader(Systemin));

  while(true){try{Stringline=readerreadLine();Stringwords[]=linesplit();if(words[]equalsIgnoreCase(quit)){Systemexit();}elseif(words[]equalsIgnoreCase(size)&&wordslength>=){try{intsize=IntegerparseInt(words[]);poolsetPoolSize(size);}catch(Exceptione){

  }

  }elseif(words[]equalsIgnoreCase(max)&&wordslength>=){try{intmax=IntegerparseInt(words[]);poolsetMaxPoolSize(max);}catch(Exceptione){

  }

  }elseif(words[]equalsIgnoreCase(task)&&wordslength>=){try{inttimelen=IntegerparseInt(words[]);SimpleTasktask=newSimpleTask(words[]timelen*);poolprocessTask(task);}catch(Exceptione){

  }

  }

  }catch(IOExceptione){eprintStackTrace();

  }

  }

  }

  };

  cmdThreadstart();

  /**//*

  for(inti=;i<;i++){

  SimpleTasktask=newSimpleTask(Task+i(i+)*);poolprocessTask(task);

  }*/

  }

  }

  classSimpleTaskimplementsThreadTask{

  privateStringtaskName;

  privateinttimeLen;

  publicSimpleTask(StringtaskNameinttimeLen){thistaskName=taskName;thistimeLen=timeLen;

  }

  publicvoidrun(){Systemoutprintln(ThreadcurrentThread()getId()+

  :STARTTASK+taskName+);

  try{Threadsleep(timeLen);}catch(InterruptedExceptione){

  }

  Systemoutprintln(ThreadcurrentThread()getId()+

  :ENDTASK+taskName+);

  }

  }

  使用此线程池相当简单下面两行代码初始化线程池

  ThreadPoolpool=newThreadPool();poolinit();

  要处理的任务实现ThreadTask接口即可(如测试代码里的SimpleTask)这个接口只有一个方法run()

  两行代码即可调用

lishixinzhi/Article/program/Java/hx/201311/27203

【FUTURE PROGRAMMING COURSE】尊享对接老板

电话+V: 152079-09430

机构由一批拥有10年以上开发管理经验,且来自互联网或研究机构的IT精英组成,负责研究、开发教学模式和课程内容。公司具有完善的课程研发体系,一直走在整个行业发展的前端,在行业内竖立起了良好的品质口碑。

java线程池底层源码分析
Copyright2023未知推广科技