title: 源码分析之ThreadLocal tags:
- Thread
- ThreadLocal
- ThreadLocalMap
- WeakReference
- InheritableThreadLocal categories: Thread date: 2017-10-12 20:09:11
前言
Java项目中通常为了并发数据准确性经常使用Lock或者synchronized来作为并发的手段。
也就是说作为共享资源必然需要通过同步等手段来实现。那么转换一下思路,
我们确实在每个地方都需要用到共享资源么?
如果我们所有的变量都是私有的 那自然不需要同步就是thread-safe的
ThreadLocal就是这样应运而生。人如其名就是线程私有对象。
实现
ThreadLocal
/*Each thread holds an implicit reference to its copy of a thread-local * variable as long as the thread is alive and the ThreadLocal * instance is accessible; after a thread goes away, all of its copies of * thread-local instances are subject to garbage collection (unless other * references to these copies exist). * * @author Josh Bloch and Doug Lea * @since 1.2 */ public class ThreadLocal
{ /** * ThreadLocals rely on per-thread linear-probe hash maps attached * to each thread (Thread.threadLocals and * inheritableThreadLocals). The ThreadLocal objects act as keys, * searched via threadLocalHashCode. This is a custom hash code * (useful only within ThreadLocalMaps) that eliminates collisions * in the common case where consecutively constructed ThreadLocals * are used by the same threads, while remaining well-behaved in * less common cases. */ private final int threadLocalHashCode = nextHashCode(); /** * The next hash code to be given out. Updated atomically. Starts at * zero. */ private static AtomicInteger nextHashCode = new AtomicInteger(); /** * The difference between successively generated hash codes - turns * implicit sequential thread-local IDs into near-optimally spread * multiplicative hash values for power-of-two-sized tables. */ private static final int HASH_INCREMENT = 0x61c88647; /** * Returns the next hash code. */ private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } /** * Returns the current thread's "initial value" for this * thread-local variable. This method will be invoked the first * time a thread accesses the variable with the { @link #get} * method, unless the thread previously invoked the { @link #set} * method, in which case the initialValue method will not * be invoked for the thread. Normally, this method is invoked at * most once per thread, but it may be invoked again in case of * subsequent invocations of { @link #remove} followed by { @link #get}. * * This implementation simply returns null; if the * programmer desires thread-local variables to have an initial * value other than null, ThreadLocal must be * subclassed, and this method overridden. Typically, an * anonymous inner class will be used. * * @return the initial value for this thread-local */ protected T initialValue() { return null; } /** * Creates a thread local variable. */ public ThreadLocal() { } /** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {
@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) return (T)e.value; } return setInitialValue(); } /** * Variant of set() to establish initialValue. Used instead * of set() in case user has overridden the set() method. * * @return the initial value */ private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } /** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the { @link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */ public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } /** * Removes the current thread's value for this thread-local * variable. If this thread-local variable is subsequently * { @linkplain #get read} by the current thread, its value will be * reinitialized by invoking its { @link #initialValue} method, * unless its value is { @linkplain #set set} by the current thread * in the interim. This may result in multiple invocations of the * initialValue method in the current thread. * * @since 1.5 */ public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); } /** * Get the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @return the map */ ThreadLocalMap getMap(Thread t) { return t.threadLocals; } /** * Create the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the map * @param map the map to store. */ void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } }复制代码
从上述代码可以看到ThreadLocal是存放在当前线程中
Thread t = Thread.currentThread();复制代码
通过上述代码获取到当前线程。而线程存在一个字段threadLocals这个
这个存放了一个ThreadLocalMap(名字是map但是没有实现map的接口 实际确实也是干的map的事情)
其实可以粗略的认为每个线程存在一个HashMap key是ThreadLocal变量 value为对应泛型的值
那么获取对应数据只要使用get即可(也提供了初始化变量功能)
放入数据直接调用set即可
ThreadLocalMap
/** * ThreadLocalMap is a customized hash map suitable only for * maintaining thread local values. No operations are exported * outside of the ThreadLocal class. The class is package private to * allow declaration of fields in class Thread. To help deal with * very large and long-lived usages, the hash table entries use * WeakReferences for keys. However, since reference queues are not * used, stale entries are guaranteed to be removed only when * the table starts running out of space. */ static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference{ /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal k, Object v) { super(k); value = v; } } /** * The initial capacity -- MUST be a power of two. */ private static final int INITIAL_CAPACITY = 16; /** * The table, resized as necessary. * table.length MUST always be a power of two. */ private Entry[] table; /** * The number of entries in the table. */ private int size = 0; /** * The next size value at which to resize. */ private int threshold; // Default to 0 /** * Set the resize threshold to maintain at worst a 2/3 load factor. */ private void setThreshold(int len) { threshold = len * 2 / 3; } /** * Increment i modulo len. */ private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } /** * Decrement i modulo len. */ private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } /** * Construct a new map initially containing (firstKey, firstValue). * ThreadLocalMaps are constructed lazily, so we only create * one when we have at least one entry to put in it. */ ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } /** * Construct a new map including all Inheritable ThreadLocals * from given parent map. Called only by createInheritedMap. * * @param parentMap the map associated with parent thread. */ private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { ThreadLocal key = e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } } }复制代码
看一下ThreadLocalMap 有一个很关键的静态类Entry
private static class Entryextends WeakReference
WeakReference可能大部分开发并没有注意过(Android开发者可能经常会使用)
Java从1.2版本开始引入了4种引用,这4种引用的级别由高到低依次为:
** 强引用 > 软引用 > 弱引用 > 虚引用**
⑴强引用(StrongReference)
强引用是使用最普遍的引用。如果一个对象具有强引用,那垃圾回收器绝不会回收它。当内存空间不足,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足的问题。⑵软引用(SoftReference)
如果一个对象只具有软引用,则内存空间足够,垃圾回收器就不会回收它;如果内存空间不足了,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。软引用可用来实现内存敏感的高速缓存。
软引用可以和一个引用队列(ReferenceQueue)联合使用,如果软引用所引用的对象被垃圾回收器回收,Java虚拟机就会把这个软引用加入到与之关联的引用队列中。
⑶弱引用(WeakReference)
弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程,因此不一定会很快发现那些只具有弱引用的对象。
弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java虚拟机就会把这个弱引用加入到与之关联的引用队列中。
⑷虚引用(PhantomReference)
“虚引用”顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。
虚引用主要用来跟踪对象被垃圾回收器回收的活动。虚引用与软引用和弱引用的一个区别在于:虚引用必须和引用队列 (ReferenceQueue)联合使用。当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之 关联的引用队列中。
ThreadLocalMap中使用Entry来作为迭代元素,其中WeakReference修饰了ThreadLocal,那么当某个ThreadLocal变量没有强引用那么当GC扫描到就会回收该Entry,那么就无内存泄漏之虞了。
当然我们线程中一定会保留ThreadLocal对象的强引用,因此这边也不会回收。因此一个合理的使用每次线程池归还一定要调用remove(方便释放同时也不会对下一次borrow线程造成影响)
/** * Set the value associated with key. * * @param key the thread local object * @param value the value to be set */ private void set(ThreadLocal key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } /** * Re-pack and/or re-size the table. First scan the entire * table removing stale entries. If this doesn't sufficiently * shrink the size of the table, double the table size. */ private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); } /** * Double the capacity of the table. */ private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; }复制代码
这边数据的策略比较常见,当发生hash冲突直接将对应数据放入下一个不为空节点即可。当然涉及到HashMap(类似)免不了的就是扩容和rehash等操作
当然数据在删除的时候可能要将后面不为空的节点重新 计算需要挪动到指定的位置
/** * Remove the entry for key. */ private void remove(ThreadLocal key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } /** * Expunge a stale entry by rehashing any possibly colliding entries * lying between staleSlot and the next null slot. This also expunges * any other stale entries encountered before the trailing null. See * Knuth, Section 6.4 * * @param staleSlot index of slot known to have null key * @return the index of the next null slot after staleSlot * (all between staleSlot and this slot will have been checked * for expunging). */ private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }复制代码
当然为了线程互不干扰我们可以需要调用ThreadLocal.remove方法移除数据
/** * Removes the current thread's value for this thread-local * variable. If this thread-local variable is subsequently * { @linkplain #get read} by the current thread, its value will be * reinitialized by invoking its { @link #initialValue} method, * unless its value is { @linkplain #set set} by the current thread * in the interim. This may result in multiple invocations of the * initialValue method in the current thread. * * @since 1.5 */ public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); }复制代码
InheritableThreadLocal
对于ThreadLocal各位可能会有些问题,比如将对应的数据封装到了线程中,但是后面调用比如异步任务之后就会发现对应的ThreadLocal变量取不出数据了。
这个场景很正常 比如在线程池起个线程发送消息等等
那么我们可以使用InheritableThreadLocal来实现,顾名思义,从这个线程中起的子线程将会可以继承对应变量。
/** * This class extends ThreadLocal to provide inheritance of values * from parent thread to child thread: when a child thread is created, the * child receives initial values for all inheritable thread-local variables * for which the parent has values. Normally the child's values will be * identical to the parent's; however, the child's value can be made an * arbitrary function of the parent's by overriding the childValue * method in this class. * *Inheritable thread-local variables are used in preference to * ordinary thread-local variables when the per-thread-attribute being * maintained in the variable (e.g., User ID, Transaction ID) must be * automatically transmitted to any child threads that are created. * * @author Josh Bloch and Doug Lea * @see ThreadLocal * @since 1.2 */ public class InheritableThreadLocal
extends ThreadLocal { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. * * This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. * @param map the map to store. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }复制代码
women看一下Thread的初始化
/** * Initializes a Thread. * * @param g the Thread group * @param target the object whose run() method gets called * @param name the name of the new Thread * @param stackSize the desired stack size for the new thread, or * zero to indicate that this parameter is to be ignored. */ private void init(ThreadGroup g, Runnable target, String name, long stackSize) { if (name == null) { throw new NullPointerException("name cannot be null"); } Thread parent = currentThread(); SecurityManager security = System.getSecurityManager(); if (g == null) { /* Determine if it's an applet or not */ /* If there is a security manager, ask the security manager what to do. */ if (security != null) { g = security.getThreadGroup(); } /* If the security doesn't have a strong opinion of the matter use the parent thread group. */ if (g == null) { g = parent.getThreadGroup(); } } /* checkAccess regardless of whether or not threadgroup is explicitly passed in. */ g.checkAccess(); /* * Do we have the required permissions? */ if (security != null) { if (isCCLOverridden(getClass())) { security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); } } g.addUnstarted(); this.group = g; this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); this.name = name.toCharArray(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = AccessController.getContext(); this.target = target; setPriority(priority); if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID(); }复制代码
很明显可以看到当parent的inheritableThreadLocals变量不为空将会传递到子线程中。
应用
通常会定义ThreadLocal变量为static final
比如我们会在定义
private static final ThreadLocal
> SQL_LIST_TL = new ThreadLocal<>(); private static final ThreadLocal IP_TL = new ThreadLocal<>(); private static final ThreadLocal USER_TL = new ThreadLocal<>(); private static final ThreadLocal ORG_TL = new ThreadLocal<>(); private static final ThreadLocal MAIN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal > IDS_OWN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal > PERMISSION_IDS_OWN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal DATASOURCE_ROUTING_TL = new ThreadLocal<>(); private static final ThreadLocal ACTION_TL = new ThreadLocal<>(); private static final ThreadLocal TYPE_TL = new ThreadLocal<>(); private static final ThreadLocal CUSTOMER_FOR_SUPPLIER_TL = new ThreadLocal<>(); private static final ThreadLocal CHANNEL_TL = new ThreadLocal<>(); private static final ThreadLocal SECURITY_ENABLE_TL = new ThreadLocal<>(); private static final ThreadLocal COUNT_ENABLE_TL = new ThreadLocal<>(); private static final List THREAD_LOCAL_LIST = new ArrayList<>(); private static final ThreadLocal LOGIN_DOMAIN_TL=new ThreadLocal<>(); static { THREAD_LOCAL_LIST.add(SQL_LIST_TL); THREAD_LOCAL_LIST.add(IP_TL); THREAD_LOCAL_LIST.add(USER_TL); THREAD_LOCAL_LIST.add(ORG_TL); THREAD_LOCAL_LIST.add(IDS_OWN_ORG_TL); THREAD_LOCAL_LIST.add(PERMISSION_IDS_OWN_ORG_TL); THREAD_LOCAL_LIST.add(DATASOURCE_ROUTING_TL); THREAD_LOCAL_LIST.add(ACTION_TL); THREAD_LOCAL_LIST.add(TYPE_TL); THREAD_LOCAL_LIST.add(CUSTOMER_FOR_SUPPLIER_TL); THREAD_LOCAL_LIST.add(CHANNEL_TL); THREAD_LOCAL_LIST.add(SECURITY_ENABLE_TL); THREAD_LOCAL_LIST.add(COUNT_ENABLE_TL); THREAD_LOCAL_LIST.add(MAIN_ORG_TL); THREAD_LOCAL_LIST.add(LOGIN_DOMAIN_TL); } public static void clearThreadLocal() { for (ThreadLocal tl : THREAD_LOCAL_LIST) { if (tl != null) { tl.remove(); } } }复制代码
clearThreadLocal会由统一释放切面的地方进行调用。这样完成threadLocal的清理复制代码