EventBus
Published: 2019-06-26


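Questions about EventBus

1. A thread is either the UI thread or a non-UI thread, so why does EventBus need several ThreadModes? What is the benefit?

2. How does EventBus's post method end up calling the matching method on each registered subscriber?

Re-read the code on April 18.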

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case PostThread:
                invokeSubscriber(subscription, event);
                break;
            case MainThread:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BackgroundThread:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case Async:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

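So PostThread mode means the subscriber method is invoked on whatever thread post() was called from. In MainThread mode, when post() is not called on the main thread, the event is handed to mainThreadPoster.enqueue instead. Let's see why mainThreadPoster is able to run it on the main thread.

    final class HandlerPoster extends Handler

    private final HandlerPoster mainThreadPoster;

    EventBus(EventBusBuilder builder) {
        // omitted
        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        // omitted
    }

Note that Looper.getMainLooper() is passed in, so the handleMessage method of HandlerPoster (which is a Handler) runs on the main thread. Here is the enqueue method: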

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {        // A
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

The code at A calls sendMessage, which leads to the handleMessage method.

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost); // B
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

So the code at B is guaranteed to run on the main thread.
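The queue-plus-flag pattern used by HandlerPoster can be tried outside Android. The following is only a sketch under assumptions: a single-thread executor named "fake-main" stands in for the main Looper, `MainPosterSketch` and `demo` are made-up names, and the `maxMillisInsideHandleMessage` rescheduling is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-Java stand-in for HandlerPoster: one "main" thread drains a shared queue. */
final class MainPosterSketch {
    // Assumption: a dedicated thread named "fake-main" plays the role of the main Looper.
    private final ExecutorService fakeMain =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private boolean active; // plays the role of handlerActive; guarded by "this"

    /** Callable from any thread; mirrors enqueue() followed by sendMessage(). */
    void enqueue(Runnable pendingPost) {
        synchronized (this) {
            queue.add(pendingPost);
            if (!active) {
                active = true;
                fakeMain.execute(this::drain); // wake the "main" thread only once
            }
        }
    }

    /** Runs on the fake main thread; mirrors the loop in handleMessage(). */
    private void drain() {
        while (true) {
            Runnable pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    pendingPost = queue.poll(); // check again under the lock
                    if (pendingPost == null) {
                        active = false;
                        return;
                    }
                }
            }
            pendingPost.run();
        }
    }

    /** Posts three tasks from the calling thread and reports the thread each one ran on. */
    static List<String> demo() {
        MainPosterSketch poster = new MainPosterSketch();
        List<String> ranOn = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 3; i++) {
            poster.enqueue(() -> ranOn.add(Thread.currentThread().getName()));
        }
        poster.fakeMain.shutdown(); // work that was already submitted still runs
        try {
            poster.fakeMain.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ranOn);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Whichever thread calls enqueue, the tasks only ever run inside drain, and drain only ever runs on the one designated thread. That is the same reason B above runs on the main thread.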

Now the Async mode:

    case Async:
        asyncPoster.enqueue(subscription, event);
        break;

    class AsyncPoster implements Runnable {
        // other members omitted
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
    }

This simply hands the task to a thread pool, so the subscriber always runs asynchronously and never on the posting thread.
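A plain-Java sketch of the same idea follows, assuming a `Runnable` stands in for a PendingPost. The class name, the helper `allRanOffCallerThread`, and the body of `run()` are illustrative guesses, since the excerpt above does not show AsyncPoster's own run method.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch of the Async mode: every enqueued event gets its own execute() call. */
final class AsyncPosterSketch implements Runnable {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService pool = Executors.newCachedThreadPool();

    void enqueue(Runnable pendingPost) {
        queue.add(pendingPost);
        pool.execute(this); // one pool task per event, so events may run in parallel
    }

    /** Assumed shape of run(): take one pending post and execute it. */
    @Override
    public void run() {
        Runnable pendingPost = queue.poll();
        if (pendingPost != null) {
            pendingPost.run();
        }
    }

    /** Returns true if all events ran and none of them ran on the calling thread. */
    static boolean allRanOffCallerThread(int events) {
        AsyncPosterSketch poster = new AsyncPosterSketch();
        Thread caller = Thread.currentThread();
        Set<Thread> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(events);
        for (int i = 0; i < events; i++) {
            poster.enqueue(() -> {
                workers.add(Thread.currentThread());
                done.countDown();
            });
        }
        boolean finished;
        try {
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        } finally {
            poster.pool.shutdown();
        }
        return finished && !workers.contains(caller);
    }

    public static void main(String[] args) {
        System.out.println(allRanOffCallerThread(4));
    }
}
```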

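    case BackgroundThread:
        if (isMainThread) {
            backgroundPoster.enqueue(subscription, event);
        } else {
            invokeSubscriber(subscription, event);
        }

Look at BackgroundThread. If post() is called on the main thread, the event goes through backgroundPoster.enqueue, that is, it is handed to a thread pool. If post() is called off the main thread, the subscriber is invoked directly: the current thread is not the main thread, so it is already a background thread.

The thread pool used here defaults to:

    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();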
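The four modes combined with the two possible posting threads give eight cases. The sketch below restates the switch in postToSubscription as a pure function so the whole table can be printed. The `Delivery` enum and the name `deliveryFor` are invented for illustration and are not part of EventBus.

```java
/** Restates the postToSubscription switch as a side-effect-free decision table. */
final class ThreadModeSketch {
    enum ThreadMode { PostThread, MainThread, BackgroundThread, Async }

    // Hypothetical labels for the four things postToSubscription can do.
    enum Delivery { DIRECT_CALL, MAIN_QUEUE, BACKGROUND_QUEUE, ASYNC_POOL }

    static Delivery deliveryFor(ThreadMode mode, boolean isMainThread) {
        switch (mode) {
            case PostThread:
                return Delivery.DIRECT_CALL;
            case MainThread:
                return isMainThread ? Delivery.DIRECT_CALL : Delivery.MAIN_QUEUE;
            case BackgroundThread:
                return isMainThread ? Delivery.BACKGROUND_QUEUE : Delivery.DIRECT_CALL;
            case Async:
                return Delivery.ASYNC_POOL;
            default:
                throw new IllegalStateException("Unknown thread mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (ThreadMode mode : ThreadMode.values()) {
            System.out.println(mode + ": posted on main -> " + deliveryFor(mode, true)
                    + ", posted off main -> " + deliveryFor(mode, false));
        }
    }
}
```

Read this way, the modes answer the first question at the top: the subscriber states where it needs to run, and EventBus only switches threads in the cases where the posting thread does not already satisfy that.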


    private static final String ON_EVENT_METHOD_NAME = "onEvent";

Registration eventually calls this method:

    private synchronized void register(Object subscriber, boolean sticky, int priority) {
        // A start
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
        // A end
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod, sticky, priority);
        }
    }

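1. Get the fully qualified class name via reflection, and skip classes whose names start with java., javax., or android.
2. Read each method's modifiers via reflection and filter out non-public methods; check that the method name starts with the given prefix and that the method takes exactly one parameter.
3. Derive the method's threadMode from the part of the method name that follows the prefix.
4. Build a SubscriberMethod object.
5. When the for loop finishes, we have a List<SubscriberMethod> subscriberMethods.
6. There is also a cache, private static final Map<String, List<SubscriberMethod>> methodCache = new HashMap<String, List<SubscriberMethod>>(); its key is the subscriber's fully qualified class name and its value is the list of all SubscriberMethods found.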

The code at A is as follows:

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        String key = subscriberClass.getName();
        List<SubscriberMethod> subscriberMethods;
        synchronized (methodCache) {
            subscriberMethods = methodCache.get(key);
        }
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        subscriberMethods = new ArrayList<SubscriberMethod>();
        Class<?> clazz = subscriberClass;
        HashSet<String> eventTypesFound = new HashSet<String>();
        StringBuilder methodKeyBuilder = new StringBuilder();
        while (clazz != null) {
            String name = clazz.getName();
            if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {
                // Skip system classes, this just degrades performance
                break;
            }

            // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
            Method[] methods = clazz.getDeclaredMethods();
            for (Method method : methods) {
                String methodName = method.getName();
                if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {
                    int modifiers = method.getModifiers();
                    // java.lang.reflect.Modifier defines constants for the method modifiers.
                    // This check means: public, and none of the ignored modifiers.
                    if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                        // Get the Class objects of the method's parameters via reflection
                        Class<?>[] parameterTypes = method.getParameterTypes();
                        // The method must take exactly one parameter
                        if (parameterTypes.length == 1) {
                            // Derive the threadMode from the method-name suffix
                            String modifierString = methodName.substring(ON_EVENT_METHOD_NAME.length());
                            ThreadMode threadMode;
                            // The default is PostThread: run on whatever thread posted the event
                            if (modifierString.length() == 0) {
                                threadMode = ThreadMode.PostThread;
                            } else if (modifierString.equals("MainThread")) {
                                threadMode = ThreadMode.MainThread;
                            } else if (modifierString.equals("BackgroundThread")) {
                                threadMode = ThreadMode.BackgroundThread;
                            } else if (modifierString.equals("Async")) {
                                threadMode = ThreadMode.Async;
                            } else {
                                if (skipMethodVerificationForClasses.containsKey(clazz)) {
                                    continue;
                                } else {
                                    throw new EventBusException("Illegal onEvent method, check for typos: " + method);
                                }
                            }
                            // The Class object of the method's parameter
                            Class<?> eventType = parameterTypes[0];
                            methodKeyBuilder.setLength(0);
                            methodKeyBuilder.append(methodName);
                            methodKeyBuilder.append('>').append(eventType.getName());
                            // methodKey is "methodName>fully qualified parameter class name"
                            String methodKey = methodKeyBuilder.toString();
                            // HashSet.add returns true if the element was actually added.
                            // eventTypesFound is a HashSet<String>
                            if (eventTypesFound.add(methodKey)) {
                                // Only add if not already found in a sub class
                                // method is the reflective Method object of the subscriber method
                                subscriberMethods.add(new SubscriberMethod(method, threadMode, eventType));
                            }
                        }
                    } else if (!skipMethodVerificationForClasses.containsKey(clazz)) {
                        Log.d(EventBus.TAG, "Skipping method (not public, static or abstract): " + clazz + "."
                                + methodName);
                    }
                }
            }
            // When the for loop ends, the matches of this class are in the List<SubscriberMethod>
            clazz = clazz.getSuperclass();
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass + " has no public methods called "
                    + ON_EVENT_METHOD_NAME);
        } else {
            synchronized (methodCache) {
                methodCache.put(key, subscriberMethods);
            }
            return subscriberMethods;
        }
    }
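The naming convention can be exercised on its own with a much smaller scan. This is a simplified sketch and not the library code: `OnEventScanSketch`, `DemoSubscriber`, and `scan` are hypothetical, thread modes are plain strings, and the superclass walk, the ignored-modifier check, and the cache are omitted.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

/** Finds public one-argument methods named onEvent* and maps them to a thread mode. */
final class OnEventScanSketch {
    static final String PREFIX = "onEvent";

    /** Hypothetical subscriber used only to exercise the scan. */
    public static class DemoSubscriber {
        public void onEvent(String event) { }
        public void onEventMainThread(Integer event) { }
        public void onEventAsync(Long event) { }
        void onEventBackgroundThread(String event) { }   // not public: skipped
        public void onEvent(String first, String second) { } // two parameters: skipped
        public void unrelated(String event) { }          // wrong prefix: skipped
    }

    /** Turns the method-name suffix into a thread mode name. */
    static String threadModeFor(String suffix) {
        switch (suffix) {
            case "":
                return "PostThread"; // no suffix means "run on the posting thread"
            case "MainThread":
            case "BackgroundThread":
            case "Async":
                return suffix;
            default:
                throw new IllegalArgumentException("Illegal onEvent method suffix: " + suffix);
        }
    }

    /** Returns "methodName>parameterClassName" mapped to the derived thread mode. */
    static Map<String, String> scan(Class<?> subscriberClass) {
        Map<String, String> found = new TreeMap<>();
        for (Method method : subscriberClass.getDeclaredMethods()) {
            String name = method.getName();
            if (!name.startsWith(PREFIX)) continue;
            if (!Modifier.isPublic(method.getModifiers())) continue;
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length != 1) continue;
            String key = name + ">" + parameterTypes[0].getName();
            found.put(key, threadModeFor(name.substring(PREFIX.length())));
        }
        return found;
    }

    public static void main(String[] args) {
        scan(DemoSubscriber.class).forEach((key, mode) -> System.out.println(key + " -> " + mode));
    }
}
```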

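Next comes the subscribe method in the EventBus class.

1. subscriptionsByEventType is declared as: private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; It maps the Class object of a subscriber method's parameter (the event type) to a list of Subscriptions. A Subscription holds the subscriber, the subscriber method, and the subscription priority. The first registration puts the Subscription into this map, so registering the same subscriber again for the same event type counts as a duplicate registration and throws an EventBusException.
2. typesBySubscriber is declared as: private final Map<Object, List<Class<?>>> typesBySubscriber; It maps a subscriber to the list of Class objects of its subscriber methods' parameters.
3. The sticky case is skipped for now.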
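Besides filling these two maps, subscribe keeps each Subscription list ordered by priority (the `for (int i = 0; i <= size; i++)` loop in the code that follows). Here is a minimal sketch of just that insertion step, with a made-up `Sub` class in place of Subscription:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Isolates the priority-ordered insertion performed inside subscribe(). */
final class PriorityInsertSketch {
    /** Hypothetical stand-in for Subscription: only a name and a priority. */
    static final class Sub {
        final String name;
        final int priority;

        Sub(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Inserts before the first entry with a strictly lower priority, else appends. */
    static void insert(List<Sub> subscriptions, Sub newSubscription) {
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
    }

    /** Registers four subscribers and returns their names in delivery order. */
    static List<String> demoOrder() {
        List<Sub> subscriptions = new CopyOnWriteArrayList<>();
        insert(subscriptions, new Sub("low", 0));
        insert(subscriptions, new Sub("high", 10));
        insert(subscriptions, new Sub("mid", 5));
        insert(subscriptions, new Sub("late", 5)); // same priority as "mid"
        List<String> names = new ArrayList<>();
        for (Sub sub : subscriptions) {
            names.add(sub.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demoOrder()); // prints [high, mid, late, low]
    }
}
```

Because the comparison is strict, subscribers with equal priority keep their registration order.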
    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {
        // The Class object of the method's parameter
        Class<?> eventType = subscriberMethod.eventType;
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        // A Subscription holds the subscriber, the subscriber method, and the subscription priority.
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<Subscription>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
        // subscriberMethod.method.setAccessible(true);

        // A new subscription with a higher priority is placed earlier in the subscriptions list
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        // private final Map<Object, List<Class<?>>> typesBySubscriber;
        // Maps a subscriber to the list of Class objects of its subscriber methods' parameters.
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<Class<?>>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        if (sticky) {
            Object stickyEvent;
            synchronized (stickyEvents) {
                stickyEvent = stickyEvents.get(eventType);
            }
            if (stickyEvent != null) {
                // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
                // --> Strange corner case, which we don't take care of here.
                postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
            }
        }
    }

Now let's see how the post method gets the posted event invoked on the matching subscribers.

    /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        // Add the event to be sent to the current thread's PostingThreadState queue.
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

Here currentPostingThreadState is declared as:

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

It is a ThreadLocal that stores a PostingThreadState, which guarantees that every thread gets its own PostingThreadState. PostingThreadState is declared as:

    /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<Object>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }
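The per-thread isolation is easy to confirm in isolation. The sketch below uses a cut-down `PostingState` (an illustrative name with only two of the fields) and `ThreadLocal.withInitial`, which is equivalent to overriding initialValue().

```java
import java.util.ArrayList;
import java.util.List;

/** Shows that each thread sees its own posting state through a ThreadLocal. */
final class PostingStateSketch {
    /** Cut-down stand-in for PostingThreadState. */
    static final class PostingState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
    }

    static final ThreadLocal<PostingState> STATE = ThreadLocal.withInitial(PostingState::new);

    /** Returns true if a second thread gets a fresh state instead of the caller's one. */
    static boolean otherThreadSeesOwnState() {
        PostingState mine = STATE.get();
        mine.eventQueue.add("event queued by the caller");

        PostingState[] seenByOther = new PostingState[1];
        Thread other = new Thread(() -> {
            seenByOther[0] = STATE.get(); // first access on this thread creates a new state
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return seenByOther[0] != mine          // a different object per thread
                && seenByOther[0].eventQueue.isEmpty() // the caller's event is not visible there
                && STATE.get() == mine;        // the caller still gets its own state back
    }

    public static void main(String[] args) {
        System.out.println(otherThreadSeesOwnState());
    }
}
```

This is why post() can mutate eventQueue and isPosting without any locking: no other thread ever touches the same instance.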

Then comes the postSingleEvent method.

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

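The eventInheritance flag controls whether the event's supertypes are also considered when looking up subscribers, and the two cases are handled differently. Let's look at the case without inheritance first.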

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            // eventClass is the Class object of the event passed to post().
            // subscriptionsByEventType maps the Class object of a method parameter to a list of Subscriptions.
            // A Subscription holds the subscriber, the subscriber method, and the subscription priority.
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        // At this point we have every Subscription registered for this event.
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    // postingState now knows the subscriber, the subscriber method,
                    // whether we are on the main thread, and the event to deliver.
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
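For the eventInheritance branch, the code of lookupAllEventTypes is not shown in this post. The sketch below is therefore only a guess at what such a lookup has to collect: the event class, its superclasses, and the interfaces they implement. All names in it are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Collects an event class together with its superclasses and interfaces. */
final class EventTypesSketch {
    // Hypothetical event hierarchy used only for the demonstration.
    interface Marker { }
    static class BaseEvent implements Marker { }
    static class LoginEvent extends BaseEvent { }

    /** Walks up the superclass chain, adding each class and its interfaces once. */
    static List<Class<?>> allEventTypes(Class<?> eventClass) {
        List<Class<?>> eventTypes = new ArrayList<>();
        Class<?> clazz = eventClass;
        while (clazz != null) {
            eventTypes.add(clazz);
            addInterfaces(eventTypes, clazz.getInterfaces());
            clazz = clazz.getSuperclass();
        }
        return eventTypes;
    }

    /** Adds interfaces recursively so that super-interfaces are included too. */
    static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }

    public static void main(String[] args) {
        for (Class<?> type : allEventTypes(LoginEvent.class)) {
            System.out.println(type.getName());
        }
    }
}
```

With a list like this, posting a LoginEvent would also reach subscribers registered for BaseEvent, Marker, or Object, because postSingleEvent calls postSingleEventForEventType once per collected type.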

Then comes the postToSubscription method, which dispatches according to the threadMode.

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case PostThread:
                invokeSubscriber(subscription, event);
                break;
            case MainThread:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BackgroundThread:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case Async:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

For PostThread, and for MainThread when post() is already on the main thread, invokeSubscriber is called directly. It is simple: a reflective call, since all the information it needs is already at hand.

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
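The two catch blocks matter: Method.invoke wraps anything the subscriber throws in an InvocationTargetException, and the real cause has to be unwrapped with getCause(). The following runnable sketch shows that behaviour. `DemoSubscriber`, `deliver`, and `demo` are invented names, and the result is returned as a string where EventBus would call handleSubscriberException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Demonstrates the reflective call and the unwrapping of subscriber exceptions. */
final class ReflectiveInvokeSketch {
    /** Hypothetical subscriber that rejects empty events. */
    public static class DemoSubscriber {
        String lastEvent;

        public void onEvent(String event) {
            if (event.isEmpty()) {
                throw new IllegalArgumentException("empty event");
            }
            lastEvent = event;
        }
    }

    /** Invokes the method reflectively and reports what happened. */
    static String deliver(Object subscriber, Method method, Object event) {
        try {
            method.invoke(subscriber, event);
            return "delivered";
        } catch (InvocationTargetException e) {
            // The subscriber's own exception is the cause, not the wrapper.
            return "subscriber threw " + e.getCause().getClass().getSimpleName();
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

    /** Looks up onEvent(String) on a fresh DemoSubscriber and delivers the event. */
    static String demo(String event) {
        try {
            Method method = DemoSubscriber.class.getMethod("onEvent", String.class);
            return deliver(new DemoSubscriber(), method, event);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello")); // prints delivered
        System.out.println(demo(""));      // prints subscriber threw IllegalArgumentException
    }
}
```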

If the mode is BackgroundThread and post() was called on the main thread, backgroundPoster.enqueue(subscription, event) is used. What does it do?

enqueue eventually calls eventBus.getExecutorService().execute(this); so, roughly speaking, a thread pool runs this BackgroundPoster. The actual work is in its run method, which ends with a call to eventBus.invokeSubscriber(pendingPost);

    void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

That in turn makes the reflective call:

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

The full BackgroundPoster class:

    final class BackgroundPoster implements Runnable {

        private final PendingPostQueue queue;
        private final EventBus eventBus;

        private volatile boolean executorRunning;

        BackgroundPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }

        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!executorRunning) {
                    executorRunning = true;
                    eventBus.getExecutorService().execute(this);
                }
            }
        }

        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    }
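Unlike the Async mode, BackgroundPoster submits itself to the pool only when no runner is active, so background events are processed one after another. The sketch below reproduces that with plain `Runnable`s. The names are illustrative, the poll timeout is shortened from 1000 ms to 50 ms, and, as a deliberate simplification, the running flag is only ever changed while holding the lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch of the BackgroundThread mode: a single active runner drains the queue in order. */
final class BackgroundPosterSketch implements Runnable {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private boolean executorRunning; // guarded by "this"

    void enqueue(Runnable pendingPost) {
        synchronized (this) {
            queue.add(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                pool.execute(this); // start a runner only if none is active
            }
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Wait briefly for more work before giving the pool thread back.
                Runnable pendingPost = queue.poll(50, TimeUnit.MILLISECONDS);
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll(); // check again under the lock
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                pendingPost.run();
            }
        } catch (InterruptedException e) {
            synchronized (this) {
                executorRunning = false;
            }
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues the numbers 0..events-1 and returns the order in which they were processed. */
    static List<Integer> demo(int events) {
        BackgroundPosterSketch poster = new BackgroundPosterSketch();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(events);
        for (int i = 0; i < events; i++) {
            final int n = i;
            poster.enqueue(() -> {
                order.add(n);
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            poster.pool.shutdown();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(demo(5)); // prints [0, 1, 2, 3, 4]
    }
}
```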

Reposted from: https://my.oschina.net/tanghaoo/blog/661098
