概念
asynctask:异步任务,从字面上来说,就是在我们的ui主线程运行的时候,异步的完成一些操作。asynctask允许我们的执行一个异步的任务在后台。我们可以将耗时的操作放在异步任务当中来执行,并随时将任务执行的结果返回给我们的ui线程来更新我们的ui控件。通过asynctask我们可以轻松的解决多线程之间的通信问题。
怎么来理解asynctask呢?通俗一点来说,asynctask就相当于android给我们提供了一个多线程编程的一个框架,其介于thread和handler之间,我们如果要定义一个asynctask,就需要定义一个类来继承asynctask这个抽象类,并实现其唯一的一doinbackgroud 抽象方法。
类简介
asynctask = handler 两个线程池的维护一个任务队列线程池,一个执行线程池
其中:线程池用于线程调度、复用 & 执行任务;
handler
用于异步通信
我们来看看asynctask这个抽象类的定义,当我们定义一个类来继承asynctask这个类的时候,我们需要为其指定3个泛型参数:
asynctask
- params: 这个泛型指定的是我们传递给异步任务执行时的参数的类型
- progress: 这个泛型指定的是我们的异步任务在执行的时候将执行的进度返回给ui线程的参数的类型
- result: 这个泛型指定的异步任务执行完后返回给ui线程的结果的类型
简单例子
private class downloadfilestask extends asynctask{ //忽略 } public void onclick(view v) { try { url url = new ; new downloadfilestask().execute(url); } catch (malformedurlexception e) { e.printstacktrace(); } }
执行流程
一、在执行完 asynctask.excute() 后
二、方法分析
常用的asynctask继承的方法有
- onpreexecute():异步任务开启之前回调,在主线程中执行
- doinbackground():执行异步任务,在线程池中执行
- onprogressupdate():当doinbackground中调用publishprogress时回调,在主线程中执行
- onpostexecute():在异步任务执行之后回调,在主线程中执行
- oncancelled():在异步任务被取消时回调
小demo
效果图:
activity
public class asythreadactivity extends activity implements view.onclicklistener { @bindview(r.id.btn_click) button btnclick; @bindview(r.id.progress) progressbar progress; @bindview(r.id.btn_chancel) button btnchancel; @bindview(r.id.text) textview text; mythread myasythread = new mythread(); @override protected void oncreate(@nullable bundle savedinstancestate) { super.oncreate(savedinstancestate); setcontentview(r.layout.activity_asythread); butterknife.bind(this); initclcik(); } private void initclcik() { btnclick.setonclicklistener(this); btnchancel.setonclicklistener(this); myasythread.setontextchance(new mythread.ontextchance() { @override public void textchance(string s) { text.settext(s); } @override public void progresschance(integer number) { progress.setprogress(number); } }); } @override public void onclick(view v) { switch (v.getid()) { case r.id.btn_click: myasythread.execute(); break; case r.id.btn_chancel: myasythread.cancel(true); break; default: break; } } }
myasynctask
public class mythread extends asynctask{ private ontextchance ontextchance; public mythread() { super(); } @override protected string doinbackground(string... params) { int count = 0; while(count<99){ count = count 1; publishprogress(count); try { thread.sleep(50); } catch (interruptedexception e) { e.printstacktrace(); } } return null; } @override protected void onpreexecute() { ontextchance.textchance("加载中"); super.onpreexecute(); } @override protected void onpostexecute(string s) { ontextchance.textchance("加载完毕"); super.onpostexecute(s); } @override protected void onprogressupdate(integer... values) { ontextchance.progresschance(values[0]); ontextchance.textchance("loading..." values[0] "%"); super.onprogressupdate(values); } @override protected void oncancelled() { ontextchance.textchance("已取消"); ontextchance.progresschance(0); super.oncancelled(); }
源码分析
好吧 ,在想写之前刚刚好瞄到一位小哥哥的文章写得超级好,然后我就偷懒一把,如下。
源码分析是基于api24的源码,我将会按照下面asynctask运行的过程来分析
一、主分支
首先,execute()方法,开启异步任务
接着,onpreexecute()方法,异步任务开启前
接着,doinbackground()方法,异步任务正在执行
最后,onpostexecute()方法,异步任务完成
二、次分支
onprogressupdate()方法,异步任务更新ui
oncancelled()方法,异步任务取消
主分支部分
代码开始的地方,是在创建asynctask类之后执行的execute()方法
public final asynctaskexecute(params... params) { return executeonexecutor(sdefaultexecutor, params); }
execute()方法会调用executeonexecutor()方法
public final asynctaskexecuteonexecutor(executor exec, params... params) { if (mstatus != status.pending) { switch (mstatus) { case running: throw new illegalstateexception("cannot execute task:" " the task is already running."); case finished: throw new illegalstateexception("cannot execute task:" " the task has already been executed " "(a task can be executed only once)"); } } mstatus = status.running; onpreexecute(); mworker.mparams = params; exec.execute(mfuture); return this; }
asynctask定义了一个mstatus变量,表示异步任务的运行状态,分别是pending、running、finished,当只有pending状态时,asynctask才会执行,这样也就保证了asynctask只会被执行一次
继续往下执行,mstatus会被标记为running,接着执行,onpreexecute(),将参数赋值给mworker,然后还有execute(mfuture)
这里的mworker和mfuture究竟是什么,我们往下追踪,来到asynctask的构造函数中,可以找到这两个的初始化
public asynctask() { mworker = new workerrunnable() { public result call() throws exception { mtaskinvoked.set(true); process.setthreadpriority(process.thread_priority_background); //noinspection unchecked result result = doinbackground(mparams); binder.flushpendingcommands(); return postresult(result); } }; mfuture = new futuretask (mworker) { @override protected void done() { try { postresultifnotinvoked(get()); } catch (interruptedexception e) { android.util.log.w(log_tag, e); } catch (executionexception e) { throw new runtimeexception("an error occurred while executing doinbackground()", e.getcause()); } catch (cancellationexception e) { postresultifnotinvoked(null); } } }; }
一、分析mworker
mworker是一个workerrunnable对象,跟踪workerrunnable
private static abstract class workerrunnableimplements callable { params[] mparams; }
实际上,workerrunnable是asynctask的一个抽象内部类,实现了callable接口
二、分析mfuture
mfuture是一个futuretask对象,跟踪futuretask
public futuretask(callablecallable) { if (callable == null) throw new nullpointerexception(); this.callable = callable; this.state = new; // ensure visibility of callable }
实际上,futuretask是java.util.concurrent包下的一个类,参数是个callable,并且将它赋值给futuretask类中的callable
三、回过头来看
回到我们asynctask初始化mfuture,这里的参数是mworker也就不奇怪了,因为mworker就是一个callable,我们在上面赋值给futuretask类中的callable就是这个mworker
mfuture = new futuretask(mworker)
而关于mworker和mfuture的初始化早在我们activity中初始化好了,因为构造函数是跟asynctask类的创建而执行的
new downloadfilestask()
知道了mworker和mfuture是什么后,我们回到原来的executeonexecutor()方法,在这里将mworker的参数传过去后,就开始用线程池execute这个mfuture
mworker.mparams = params; exec.execute(mfuture);
一、分析exec
exec是通过executeonexecutor()参数传进来的
public final asynctaskexecuteonexecutor(executor exec, params... params)
也就是我们execute()方法传过来的
public final asynctaskexecute(params... params) { return executeonexecutor(sdefaultexecutor, params); }
这里可以看到exec就是这个sdefaultexecutor
二、分析sdefaultexecutor
我们跟踪这个sdefaultexecutor,截取有关它的代码
public static final executor serial_executor = new serialexecutor(); private static volatile executor sdefaultexecutor = serial_executor; private static class serialexecutor implements executor { final arraydequemtasks = new arraydeque (); runnable mactive; public synchronized void execute(final runnable r) { mtasks.offer(new runnable() { public void run() { try { r.run(); } finally { schedulenext(); } } }); if (mactive == null) { schedulenext(); } } protected synchronized void schedulenext() { if ((mactive = mtasks.poll()) != null) { thread_pool_executor.execute(mactive); } } }
从serialexecutor可以发现,exec.execute(mfuture)就是在调用serialexecutor类的execute(final runnable r)方法,这里的参数r就是mfuture
继续往下走,serialexecutor的execute()方法会将r封装成runnable,并添加到mtasks任务队列中
继续往下走,如果这时候没有正在活动的asynctask任务,那么就会调用serialexecutor的schedulenext()方法,来执行下一个asynctask任务
if (mactive == null) { schedulenext(); }
继续往下走,通过mtasks.poll()取出,将封装在mtask的runnable交给mactive,最后真正执行的这个mactive的是thread_pool_executor,即执行的这个mactive,也就是包装在runnable里面的mfuture
protected synchronized void schedulenext() { if ((mactive = mtasks.poll()) != null) { thread_pool_executor.execute(mactive); } }
mfuture被执行了,也就会执行它的run()方法
public void run() { try { r.run(); } finally { schedulenext(); } }
我们跟踪到mfuture的run()方法中,切换到futuretask类
public void run() { if (state != new || !u.compareandswapobject(this, runner, null, thread.currentthread())) return; try { callablec = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); ran = true; } catch (throwable ex) { result = null; ran = false; setexception(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } }
这一段代码其实就是将之前在mfutrue创建对象时候传进来的mworker交给c
callablec = callable;
然后再调用c的call()方法,也就是mworker的call()方法
result = c.call();
代码又重新的定位到了mworker类的call()方法
mworker = new workerrunnable() { public result call() throws exception { mtaskinvoked.set(true); process.setthreadpriority(process.thread_priority_background); //noinspection unchecked result result = doinbackground(mparams); binder.flushpendingcommands(); return postresult(result); } };
可以发现,这里就调用了我们的doinbackground()方法,最后还返回postresult(),我们跟踪这个postresult()方法
private result postresult(result result) { @suppresswarnings("unchecked") message message = gethandler().obtainmessage(message_post_result, new asynctaskresult(this, result)); message.sendtotarget(); return result; }
观察代码,首先是gethandler(),它是一个单例,返回shandler
private static handler gethandler() { synchronized (asynctask.class) { if (shandler == null) { shandler = new internalhandler(); } return shandler; } }
也就是说postresult方法会通过shandler发送一个message_post_result的消息,这个时候我们追踪到shandler
private static internalhandler shandler; private static class internalhandler extends handler { public internalhandler() { super(looper.getmainlooper()); } @suppresswarnings({"unchecked", "rawuseofparameterizedtype"}) @override public void handlemessage(message msg) { asynctaskresult result = (asynctaskresult) msg.obj; switch (msg.what) { case message_post_result: // there is only one result result.mtask.finish(result.mdata[0]); break; case message_post_progress: result.mtask.onprogressupdate(result.mdata); break; } } }
可以发现,shandler收到message_post_progress消息后会调用result.mtask.finish(result.mdata[0]),那么我们还必须知道result是个什么东西
asynctaskresult result = (asynctaskresult) msg.obj;
result是asynctask的内部类,实际上就是个实体类,用来存储变量的
private static class asynctaskresult { final asynctask mtask; final data[] mdata; asynctaskresult(asynctask task, data... data) { mtask = task; mdata = data; } }
result.mtask也就是asynctask,最后调用result.mtask.finish(result.mdata[0]),即asynctask的finish()方法,我们跟踪到finish()方法
private void finish(result result) { if (iscancelled()) { oncancelled(result); } else { onpostexecute(result); } mstatus = status.finished; }
这里判断asynctask是否已经取消,如果不取消就执行我们的onpostexecute(),最后将状态设置为finished,整一个asynctask的方法都执行完了,我们只需要继承asynctask实现其中的方法就可以按分析的顺序往下执行了
次分支部分
在asynctask中的finish()方法,我们可以看到oncancelled()方法跟onpostexecute()一起的,只要iscancelled()的值为true,就执行oncancelled()方法
private void finish(result result) { if (iscancelled()) { oncancelled(result); } else { onpostexecute(result); } mstatus = status.finished; }
我们代码跟踪iscancelled()方法
public final boolean iscancelled() { return mcancelled.get(); }
发现是在mcancelled中获取的,那我们就必须知道这个mcancelled是什么,代码跟踪到mcancelled
private final atomicboolean mcancelled = new atomicboolean();
mcancelled实际上就是个boolean对象,那我们搜索它是在哪个时候设置的
public final boolean cancel(boolean mayinterruptifrunning) { mcancelled.set(true); return mfuture.cancel(mayinterruptifrunning); }
可以发现,只要我们在asynctask类中调用这个方法即可停止异步任务
而onprogressupdate()方法,是在shandler中执行,shandler收到message_post_progress消息后,执行,我们搜索message_post_progress在什么时候发送的
private static class internalhandler extends handler { public internalhandler() { super(looper.getmainlooper()); } @suppresswarnings({"unchecked", "rawuseofparameterizedtype"}) @override public void handlemessage(message msg) { asynctaskresult result = (asynctaskresult) msg.obj; switch (msg.what) { case message_post_result: // there is only one result result.mtask.finish(result.mdata[0]); break; case message_post_progress: result.mtask.onprogressupdate(result.mdata); break; } } }
可以发现,只要我们在asynctask类中调用publishprogress()方法即可执行onprogressupdate()方法
protected final void publishprogress(progress... values) { if (!iscancelled()) { gethandler().obtainmessage(message_post_progress, new asynctaskresult
源码原文:https://blog.csdn.net/qq_30379689/article/details/53203556
说实话上面源码分析写得真好,我在看了上面的源码感觉少了一丢丢东西,自己在额外的补充一点东西
补充:thread_pool_executor
/** * 源码分析:thread_pool_executor.execute() * 说明: * a. thread_pool_executor实际上是1个已配置好的可执行并行任务的线程池 * b. 调用thread_pool_executor.execute()实际上是调用线程池的execute()去执行具体耗时任务 * c. 而该耗时任务则是步骤2中初始化workerrunnable实例对象时复写的call() * 注:下面先看任务执行线程池的线程配置过程,看完后请回到步骤2中的源码分析call() */ // 步骤1:参数设置 //获得当前cpu的核心数 private static final int cpu_count = runtime.getruntime().availableprocessors(); //设置线程池的核心线程数2-4之间,但是取决于cpu核数 private static final int core_pool_size = math.max(2, math.min(cpu_count - 1, 4)); //设置线程池的最大线程数为 cpu核数*2 1 private static final int maximum_pool_size = cpu_count * 2 1; //设置线程池空闲线程存活时间30s private static final int keep_alive_seconds = 30; //初始化线程工厂 private static final threadfactory sthreadfactory = new threadfactory() { private final atomicinteger mcount = new atomicinteger(1); public thread newthread(runnable r) { return new thread(r, "asynctask #" mcount.getandincrement()); } }; //初始化存储任务的队列为linkedblockingqueue 最大容量为128 private static final blockingqueuespoolworkqueue = new linkedblockingqueue (128); // 步骤2: 根据参数配置执行任务线程池,即 thread_pool_executor public static final executor thread_pool_executor; static { threadpoolexecutor threadpoolexecutor = new threadpoolexecutor( core_pool_size, maximum_pool_size, keep_alive_seconds, timeunit.seconds, spoolworkqueue, sthreadfactory); // 设置核心线程池的 超时时间也为30s threadpoolexecutor.allowcorethreadtimeout(true); thread_pool_executor = threadpoolexecutor; }
还有就是 我们可以通过源码可以知道 使用asynctask执行的后台线程只有一个,这个结局看起来很悲伤。我们原以为执行如下。
但是现实是这样子的
被串行设置了 如下 synchronized
private static class serialexecutor implements executor { // serialexecutor = 静态内部类 // 即 是所有实例化的asynctask对象公有的 // serialexecutor 内部维持了1个双向队列; // 容量根据元素数量调节 final arraydequemtasks = new arraydeque (); runnable mactive; // execute()被同步锁synchronized修饰 // 即说明:通过锁使得该队列保证asynctask中的任务是串行执行的 // 即 多个任务需1个个加到该队列中;然后 执行完队列头部的再执行下一个,以此类推 public synchronized void execute(final runnable r) { // 将实例化后的futuretask类 的实例对象传入 // 即相当于:向队列中加入一个新的任务 mtasks.offer(new runnable() { public void run() { try { r.run(); } finally { schedulenext();->>分析3 } } }); // 若当前无任务执行,则去队列中取出1个执行 if (mactive == null) { schedulenext(); } } // 分析3 protected synchronized void schedulenext() { // 1. 取出队列头部任务 if ((mactive = mtasks.poll()) != null) { // 2. 执行取出的队列头部任务 // 即 调用执行任务线程池类(thread_pool_executor)->>继续往下看 thread_pool_executor.execute(mactive); } } }
好吧,我到现在还是没有明白为什么他要如此限制,等哪位小哥哥,知道这个为什么的时候告诉我一声,拜托拜托
如果我们还是想并行执行,参考如下
方法一,绕过 excute 方法避免serialexecutor 对象
方法二,绕过 excute 方法避免被转换为 自定义线程池
方法三,在excute 方法将默认对象换为我们的 自定义线程池对象
//自定义线程池 private static final int cpu_count = runtime.getruntime().availableprocessors(); private static final int core_pool_size = math.max(2, math.min(cpu_count - 1, 4)); private static final int maximum_pool_size = cpu_count * 2 1; private static final int keep_alive_seconds = 60; public static final executor my_thread_pool_executor; private static final threadfactory sthreadfactory = new threadfactory() { private final atomicinteger mcount = new atomicinteger(1); @override public thread newthread(runnable r) { return new thread(r, "asynctask #" mcount.getandincrement()); } }; private static final blockingqueuespoolworkqueue = new linkedblockingqueue (128); static { threadpoolexecutor threadpoolexecutor = new threadpoolexecutor( core_pool_size, maximum_pool_size, keep_alive_seconds, timeunit.seconds, spoolworkqueue, sthreadfactory); threadpoolexecutor.allowcorethreadtimeout(true); my_thread_pool_executor = threadpoolexecutor; }
case r.id.btasynctaskserial://串行执行 new myasynctask(mactivity, "task#1 ").execute("123"); new myasynctask(mactivity, "task#2 ").execute("123"); new myasynctask(mactivity, "task#3 ").execute("123"); break; case r.id.btasynctaskparallel://并行执行 -- 这里使用 asynctask 自带的线程池 new myasynctask(mactivity, "task#1 ").executeonexecutor(asynctask.thread_pool_executor, "123"); new myasynctask(mactivity, "task#2 ").executeonexecutor(asynctask.thread_pool_executor, "123"); new myasynctask(mactivity, "task#3 ").executeonexecutor(asynctask.thread_pool_executor, "123"); break; case r.id.btasynctaskparallelbyus://并行执行 -- 自定义线程池 new myasynctask(mactivity, "task#1 ").executeonexecutor(my_thread_pool_executor, "123"); new myasynctask(mactivity, "task#2 ").executeonexecutor(my_thread_pool_executor, "123"); new myasynctask(mactivity, "task#3 ").executeonexecutor(my_thread_pool_executor, "123"); break; case r.id.btasynctaskparallelbyus2://并行执行 -- 自定义线程池 另外一种方式 myasynctask.setdefaultexecutor(my_thread_pool_executor);//替换掉默认的 asynctask.serial_executor new myasynctask(mactivity, "task#a ").execute("abc"); new myasynctask(mactivity, "task#b ").execute("abc"); new myasynctask(mactivity, "task#c ").execute("abc"); break;
重要的事再次强调我到现在还是没有明白为什么他要如此限制,等哪位小哥哥,知道这个为什么的时候告诉我一声,拜托拜托
thanks