Java7并发编程实战手册
线程管理
Thread/Runnable/Thread.State
线程的信息获取和设置
线程的中断
sleep/yield
join
daemon
UncaughtExceptionHandler
Thread#setDefaultUncaughtExceptionHandler
ThreadLocal/InheritableThreadLocal
ThreadGroup
uncaughtException
ThreadFactory
进群:697699179可以获取Java各类入门学习资料!
这是我的微信公众号【编程study】各位大佬有空可以关注下,每天更新Java学习方法,感谢!
学习中遇到问题有不明白的地方,推荐加小编Java学习群:697699179内有视频教程 ,直播课程 ,等学习资料,期待你的加入
线程同步基础
synchronized 同步方法 this
synchronized 属性对象 object
同步代码块中使用条件
Object#wait/notify/notifyAll
虚假唤醒(while)
Lock
ReentrantLock
try/finally
ReadWriteLock
公平性 fair
Condition
while
lock/unlock之间
线程同步辅助类
Semaphore
内部计数器
acquire/release
acquireUninterruptibly
忽略线程中断且不会抛出任何异常
CountDownLatch
内部计数器被初始化之后就不能被再次初始化,唯一能改值的是countDown
CylicBarrier
BrokenBarrierException
reset
Phaser
在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步
Phaser(3) 指定参与阶段同步的线程数是3个
被Phaser类置于休眠的线程不会响应中断事件
awaitAdvanceInterruptibly(int phaser),被中断会抛出InterruptedException
onAdavance,阶段改变时被自动执行
Exchanger
只能同步两个线程
exchange调用后将休眠直到其他的线程到达
线程执行器
ThreadPoolExecutor/Executors
shutdown,当执行完所有待运行的任务后,它将结束执行,调用完毕立即返回,结合awaitTermination判断是否线程池是否关闭
awaitTermination(long timeout, TimeUnit unit)
提供很多方法获取自身状态的信息
newCachedThreadPool/newFixedThreadPool
Future/Callbale
submit
call() throws Exception
ThreadPoolExecutor#invokeAny
返回第一个完成任务且没有抛出异常的任务的执行结果
ThreadPoolExecutor#invokeAll
等待所有任务的完成
landon:是否可以用这个方法做场景心跳
while(!isDone) { -longstartTime = getCurrent() - 场景调度器提交场景任务,等待所有场景任务执行完毕(invokeAll),这样亦可以保证潜在的顺序问题,因为每次都是将当前的tick执行完毕 - 而非之前submit,如果一次tick执行超过50ms则下次循环又会向线程池提交任务,会出现同一个任务多个线程执行的潜在情况 -longendTime = getCurrent() -longsleepTime = startTime + TickInterval - endTime -if(sleepTime >0) sleep(sleepTime) -elseprintln("busy,oneTick executeTime:"+ sleepTime) }
ScheduledThreadPoolExecutor
schedule
scheduleAtFixedRate
scheduleWithFixDelay
Future
cancel
get
FutureTask
done,允许在执行器中的任务执行结束之后还可以执行一些代码
FutureTask implements RunnableFuture
FutureTask(Callable callable)
CompletionService
submit,提交任务
poll/take,获取任务已经完成的Future对象
即任务完成后将Future对象放到一个完成的阻塞队列中
RejectedExecutionHandler
Fork/Join框架
分治
fork-将一个任务拆分成更小的多个任务
join-等待子任务的完成执行
工作窃取算法-work-stealing algorithm
ForkJoinPool
ForkJoinTask
RecursiveAction 任务无返回结果
RecursiveTask 任务有返回结果
递归
if(problemsize>defaultsize){ tasks = divide(task) execute(tasks) }else{resolve problem using another algorithm}
Task extends RecursiveAction // 无返回结果
compute
if(...)// divide{Taskt1 =newTask(...)Taskt2 =newTask(...) invokeAll(t1,t2)// 同步调用,执行创建的多个子任务}else{...}
ForkJoinPool#execute(task) --默认创建一个线程数等于计算机cpu数目的线程池
合并任务的结果
if(problemsize>defaultsize) tasks = divide(task) execute(tasks) groupResultsreturnresultelseresolve problemreturnresult
ForkJoinTask#get 等待返回任务计算结果
异步运行任务
同步方式如invokeAll,任务被挂起,直到任务被发送到fork/join线程池中执行。该方式允许ForkJoinPool采用工作窃取算法
异步方式如fork时(立即返回),无法使用该算法
ForkJoinTask#V join()
get和join有区别
任务中抛出异常
ForkJoinTask#isCompletedAbnormally 检查主任务或者子任务是否抛出了异常
getException 获取异常信息
任务抛出运行时异常,会影响其父任务...父任务..
取消任务
在任务开始前可以取消
例:在数字数组中寻找一组数字,拆分为更小的问题,但仅关心数字的一次出现。当我们找到他时,就会取消其他子任务
可存储发送到线程池中的所有任务,当发现当前任务找到数字后,取消非当前任务的所有任务
如果任务正在运行或者已经执行结束,则不能取消,cancel返回false。因此可以尝试去取消所有的任务而不用担心可能带来的间接影响
并发集合
ConcurrentLinkedDeque 非阻塞
getFirst.../peekFirst.../removeFirst.../pollFirst... - 双端队列
LinkedBlockingDeque 阻塞
put/take/poll... 双端队列
PriorityBlockingQueue
队列的元素必须实现Comparable接口
按照排序结果决定插入元素的位置
DelayQueue
元素必须实现Delayed接口
public interface Delayed extends Comparable
两个待实现方法
compareTo(Delayed o)
getDelay(Timeunit unit)
从队列取元素时,到期的元素会返回(未来的元素等待到期_延迟时间)
landon:是否可以用于游戏服务器中的计时器 如果有多个timer 按照到期时间排队_
ConcurrentSkipListMap
根据键值排序所有元素
内部机制-Skip List
# firstEntry/lastEntry/subMap/...
landon:可以和TreeMap做比较,一个线程不安全,一个线程安全
ThreadLocalRandom
# current,该方法是一个静态方法,如果调用线程还未关联随机数对象,就会初始化一个(localInit)
Atomic Variable
compareAndSet,这是是最主要的方法
判断内存变量的值是否是expect,如果是说明未被其他线程改过,可以直接用update新值更新
否则说明被其他线程改过,进而可以选择下一步处理方式
AtomicReference#public final boolean compareAndSet(V expect, V update)
CAS
AtomicIntegerArray -原子数组
LinkedTransferQueue
AtomicReference--实现单例
publicclassSingleton{privatestaticfinalAtomicReference INSTANCE =newAtomicReference();privateSingleton(){}publicstaticSingletongetInstance(){for(;;) { Singleton current = INSTANCE.get();if(current !=null) {returncurrent; } current =newSingleton();if(INSTANCE.compareAndSet(null, current)) {returncurrent; } } }}
定制并发类
定制ThreadPoolExecutor
继承该类
覆写_记得调用super
shutdown
shutdownNow
输出如等待执行的任务数目 getQueue().size...
getCompletedTaskCount,获得已执行过的任务数
getActiveCount,获得正在执行的任务数
beforeExecute
afterExecute
实现基于优先级的Executor类
ThreadPoolExector参数传入PriorityBlockingQueue
如果是fixed两个线程,那么前2个任务是被2个线程执行的;后面的排队任务按照优先级顺序执行
实现ThreadFactory接口生成定制线程
覆写newThread方法
返回的Thread可以是定制的Thread对象(MyThread extends Thread)
可外部直接调用Threadfactory#newThread返回线程
在Executor对象中使用ThreadFactory
线程池参数中指定线程工厂
Executors内部有一个DefaultThreadFactory
Executors$DefaultThreadFactory
定制运行在定时线程池中的任务
继承ScheduledThreadPoolExecutor
覆写protected RunnableScheduledFuture decorateTask(
Runnable runnable, RunnableScheduledFuture task)
可自定义一个调度类extends FutureTask implements RunnableScheduledFuture参考ScheduledThreadPoolExecutor$ScheduledFutureTask
通过实现ThreadFactory接口为Fork/Join框架生成定制线程
ForkJoinPool内部实现
一个任务队列,存放等待被执行的任务
一个执行这些任务的线程池
ForkJoinWorkerThread持有一个ForkJoinPool.WorkQueue workQueue
work-stealing mechanics
MyWorkerThread extends ForkJoinWorkerThread
onStart
onTermination
调用super
MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory
newThread
定制运行在Fork/Join框架中的任务
MyWorkTask extends ForkJoinTask(Void)
getRawResult
exec
调用compute抽象方法
实现定制Lock类
ReentrantLock内部有一个很重要的类Sync(AQS)
abstract static class Sync extends AbstractQueuedSynchronizer
static final class FairSync extends Sync
static final class NonfairSync extends Sync
自定义实现一个MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
tryActuire
tryRelease
自定义实现MyLock implements Lock
lock
unlock
tryLock
newCondition
实现基于优先级的传输队列
MyPriorityTransferQueue extends PriorityBlockingQueue implements TransferQueuetryTransfer
transfer
hasWaitingConsumer
getWaitingConsumerCount
take
实现自己的原子对象
MyCounter extends AtomicInteger
for(;;) {intvalue=get();if(value==10)returnfalse;else{intnewValue =value+1; boolean changed = compareAndSet(value,newValue);if(changed)returntrue; } }
测试并发应用程序
监控Lock接口
ReentrantLock内部方法都是protected的,所以可以继承
MyLock extends ReentrantLock
调用Thread getOwner()
调用Collection getQueuedThreads()
...
监控Phaser类
getPhase
getRegisteredParties
getArrivedParties
...
监控执行器框架
ThreadPoolExecutor
getPoolSize
getCorePoolSize
getActiveCount
getTaskCount
...
监控Fork/Join池
getPoolSize
getParallelism
getActiveThreadCount
getStealCount
...
输出高效的日志信息
输出必要的信息
为消息设定恰当的级别
使用FindBugs分析并发代码
配置Eclipse调试并发代码
可选择Default suspend policy for new breakpoints的值为Suspend VM
默认为Suspend Thread
配置NetBeans调试并发代码
使用MultithreadedTC测试并发代码
MultithreadedTestCase
waitForTick