关于Eventbus的问题
1.线程只要非UI线程和非UI线程就可以了,为什么EventBus中要有好几种Threadmode呢?这有什么好处?
2.EventBus的post方法是怎么调用相应register的相应方法的?
4月18日重新又看下代码
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case PostThread: invokeSubscriber(subscription, event); break; case MainThread: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BackgroundThread: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case Async: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
可见PostThread模式的意思是post事件的时候在哪类线程,最终就在哪类线程调用方法. mainThread模式当post的时候不在主线程,是通过mainThreadPost.enqueue去执行的. 看mainThreadPost为啥能执行.
final class HandlerPoster extends Handler
private final HandlerPoster mainThreadPoster;
EventBus(EventBusBuilder builder) { //省略 mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); //省略 } 可以发现这里使用了Looper.getMainLooer(); 就是说让HandlerPoster这个Handler的handleMessage方法会在主线程执行. 这是enqueue方法的内容
void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { //A处 throw new EventBusException("Could not send handler message"); } } } }
可以看到A处代码sendMessage. 然后就到了handleMessage方法.
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); //B处 long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }
所以B处代码一定是在主线程执行的了.
而Async模式
case Async: asyncPoster.enqueue(subscription, event); break;
class AsyncPoster implements Runnable { public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } }
就是把这个任务放到一个线程池执行.所以必须Async了.
case BackgroundThread: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); }
看一下这个BackGroundThread如果post事件的时候在主线程.利用backGroundPoster.enqueue去执行. 也就是把这个事件放到一个线程池去执行. 如果判断出post的时候不是主线程.则直接执行.那么当前就不在主线程上必然就是background了.
这里的线程池默认是这样 private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
4月18日重新又看下代码
`
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case PostThread: invokeSubscriber(subscription, event); break; case MainThread: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BackgroundThread: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case Async: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
`
private static final String ON_EVENT_METHOD_NAME = "onEvent";
注册的时候最终会调用的方法
private synchronized void register(Object subscriber, boolean sticky, int priority) { //A strart ListsubscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass()); //A end for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod, sticky, priority); } }
1.首先通过反射拿到带包名的类名,跳过java,javax,android开头的类. 2.然后通过反射拿到方法的修饰符等.过滤掉非public方法.判断方法以指定字符串开头,并且方法的参数只能有一个等判断 3.然后通过截取字符串知道方法的threadMode. 4.然后攒出SubscriberMethod对象. 5.等for循环结束后这里就能拿到一个Lis<SubscriberMethod> subscriberMethods列表. 6.另外有一个缓存 private static final Map<String, List<SubscriberMethod>> methodCache = new HashMap<String, List<SubscriberMethod>>(); 这里有一个缓存,key是订阅者的全类名,value为全部找到的SubscriberMethod的列表.
在A处的代码如下:
ListfindSubscriberMethods(Class subscriberClass) { String key = subscriberClass.getName(); List subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key); } if (subscriberMethods != null) { return subscriberMethods; } subscriberMethods = new ArrayList (); Class clazz = subscriberClass; HashSet eventTypesFound = new HashSet (); StringBuilder methodKeyBuilder = new StringBuilder(); while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) { // Skip system classes, this just degrades performance break; } // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again) Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { String methodName = method.getName(); if (methodName.startsWith(ON_EVENT_METHOD_NAME)) { int modifiers = method.getModifiers(); //在java.lang.reflect包中有一个Modifier.java这么一个类,这个类中定义了一些常量表示方法的修饰符. //这里的意思就是public方法. if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { //这里通过反射拿到方法的参数所对应的Class Class [] parameterTypes = method.getParameterTypes(); //然后限制方法的参数只能有一个 if (parameterTypes.length == 1) { //根据方法名截取字符串知道方法的threadMode. String modifierString = methodName.substring(ON_EVENT_METHOD_NAME.length()); ThreadMode threadMode; //默认threadMode为postThread即post的是什么线程就是什么线程 if (modifierString.length() == 0) { threadMode = ThreadMode.PostThread; } else if (modifierString.equals("MainThread")) { threadMode = ThreadMode.MainThread; } else if (modifierString.equals("BackgroundThread")) { threadMode = ThreadMode.BackgroundThread; } else if (modifierString.equals("Async")) { threadMode = ThreadMode.Async; } else { if (skipMethodVerificationForClasses.containsKey(clazz)) { continue; } else { throw new EventBusException("Illegal onEvent method, check for typos: " + method); } } //这里拿到方法参数对应的Class对象 Class eventType = parameterTypes[0]; methodKeyBuilder.setLength(0); methodKeyBuilder.append(methodName); methodKeyBuilder.append('>').append(eventType.getName()); //这里的methodKey为方法名>方法参数全类名 String methodKey = methodKeyBuilder.toString(); //hashSet的add方法有返回值,如果加入成功为true. //这个的eventTypesFound为一个HashSet if (eventTypesFound.add(methodKey)) { // Only add if not already found in a sub class //这个的method为方法对应的反射Method对象. subscriberMethods.add(new SubscriberMethod(method, threadMode, eventType)); } } } else if (!skipMethodVerificationForClasses.containsKey(clazz)) { Log.d(EventBus.TAG, "Skipping method (not public, static or abstract): " + clazz + "." + methodName); } } } //当for循环结束就拿到一个List 对象 clazz = clazz.getSuperclass(); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " has no public methods called " + ON_EVENT_METHOD_NAME); } else { synchronized (methodCache) { methodCache.put(key, subscriberMethods); } return subscriberMethods; } }
然后就到了EventBus类中的subscribe方法 1.subscriptionsByEventType的声明如下: private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; 即一个维护了方法参数Class对象和一个Subscription列表的映射 Subscription对象是一个维护订阅者,订阅者方法对象,和订阅优先级的对象. 因为第一次会put进subscriptionsByEventType的映射,所以以后再注册就是重复注册了.
- typesBySubscriber的声明如下: private final Map<Object, List<Class<?>>> typesBySubscriber; 维护一个订阅者和订阅方法参数Class对象列表的映射. 3.sticky的情况先略过.
// Must be called in synchronized block private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) { //这里是方法的参数的Class对象 Class eventType = subscriberMethod.eventType; CopyOnWriteArrayListsubscriptions = subscriptionsByEventType.get(eventType); //Subsciption是一个维护订阅者,订阅的方法对象和订阅优先级的对象. Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList (); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again) // subscriberMethod.method.setAccessible(true); //如果新的订阅者的优先级更高,那么放到subscriptions列表的更前面一位 int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || newSubscription.priority > subscriptions.get(i).priority) { subscriptions.add(i, newSubscription); break; } } //private final Map
然后看post方法怎么把post的东西在相应订阅者身上调用.
/** Posts the given event to the event bus. */public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List
1.这里的currentPostingThreadState对象的声明如下: private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } };
就是一个ThreadLocal里面存了PostingThreadState,确保获取每一个线程自己的PostingThreadState. 这个PostingThreadState的声明如下:
/** For ThreadLocal, much faster to set (and get multiple values). */ final static class PostingThreadState { final List
然后就到了postSingEvent方法.
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
有一个eventInheritance变量标识事件是不是继承的.然后根据是否做不同的处理. 先看下不是继承的情况.
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) { CopyOnWriteArrayListsubscriptions; synchronized (this) { //这里的eventClass是EventBus所post的事件的class对象 //即一个维护了方法参数Class对象和一个Subscription列表的映射 //Subscription对象是一个维护订阅者,订阅者方法对象,和订阅优先级的对象. subscriptions = subscriptionsByEventType.get(eventClass); } //到这里就能拿到订阅这个事件的所有subscription对象. if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { //这个时候postingState已经有了订阅者,订阅的方法,是否在主线程,要发送的事件对象等信息. postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
然后就到了postToSubscription方法 根据不同的threadMode区分调用.
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case PostThread: invokeSubscriber(subscription, event); break; case MainThread: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BackgroundThread: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case Async: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
如果是postThread或者是MainThread的话调用invokdeSubscriber方法. 这里很简单,就是一个反射调用.该有的信息都有了.
void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
如果是BackgroundThread并且是是是是在主线程中调用的话. backgroundPoster.enqueue(subscription, event); 这里边做了什么?
enqueue中最终会调用eventBus.getExecutorService().execute(this); 大概是有一个线程池去执行这个BackgroundPoster.执行的具体任务看run方法 最终有一个eventBus.invokeSubscriber(pendingPost);
void invokeSubscriber(PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } }
然后是反射调用: void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
final class BackgroundPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }}