多線程開發可以更好的發揮多核cpu性能,常用的多線程設計模式有:Future、Master-Worker、Guard
Susperionsion(保護性暫掛模式)、不變模式、生產者-消費者
模式;jdk除了定義了若干并發的數據結構,也內置了多線程框架和各種線程池;
鎖(分為內部鎖、重入鎖、讀寫鎖)、ThreadLocal、信號量等在并發控制中發揮著巨大的作用。
一、Future模型
1.什么是Future模型
該模型是將異步請求和代理模式聯合的模型產物。見下圖:
客戶端發送一個長時間的請求,服務端不需等待該數據處理完成便立即返回一個偽造的代理數據(相當于商品訂單,不是商品本身),用戶也無需等待,先去執行其他的若干操作后,再去調用服務器已經完成組裝的真實數據。該模型充分利用了等待的時間片段。
2.Future模式的核心結構:
Main:啟動系統,調用Client發出請求;
Client:返回Data對象,理解返回FutureData(偽造的數據或未來數據),并開啟ClientThread線程裝配RealData(真實數據);
Data:返回數據的接口;
FutureData:Future數據,構造很快,但是是一個虛擬的數據,需要裝配RealData;
RealData:真實數據,構造比較慢。
3.Future模式的代碼實現:
(1)Main函數:
public class Main {
public static void main(String[] args){
Client client = new Client();
//理解返回一個FutureData
Data data = client.request("name");
System.out.println("請求完畢!");
try{
//處理其他業務
//這個過程中,真是數據RealData組裝完成,重復利用等待時間
Thread.sleep(2000);
}catch (Exception e){
}
//真實數據
System.out.println("數據 = "+ data.getResult());
}
}
(2)Client的實現:
public class Client {
public Data request(final String queryStr){
final FutureData future = new FutureData();
//開啟一個新的線程來構造真實數據
new Thread(){
public void run(){
RealData realData = new RealData(queryStr);
future.setRealData(realData); }
}.start();
return future;
}
}
(3)Data的實現:
public interface Data {
public String getResult();
}
(4)FutureData:
/**
* 是對RealData的一個包裝
* @author limin
*
*/
public class FutureData implements Data {
protected RealData realData =null;
protected boolean isReady = false;
public synchronized void setRealData(RealData realData){
if(isReady){
return;
}
this.realData=realData;
isReady=true;
notifyAll();
}
@Override
public synchronized String getResult() {
while(!isReady){
try{
wait();
}catch (Exception e){
}
}
return realData.result;
}
}
(5)RealData實現:
public class RealData implements Data {
protected String result;
public RealData(String para){
//構造比較慢
StringBuffer sb= new StringBuffer();
for(int i=0;i<10;i++){
sb.append(para);
try{
Thread.sleep(1000);
}catch(Exception e){
}
result= sb.toString();
}
}
@Override
public String getResult() {
return result;
}
}
4.注意:
FutureData是對RealData的包裝,是對真實數據的一個代理,封裝了獲取真實數據的等待過程。它們都實現了共同的接口,所以,針對客戶端程序組是沒有區別的;
客戶端在調用的方法中,單獨啟用一個線程來完成真實數據的組織,這對調用客戶端的main函數式封閉的;
因為咋FutureData中的notifyAll和wait函數,主程序會等待組裝完成后再會繼續主進程,也就是如果沒有組裝完成,main函數會一直等待。
二、Master-Worker模式
Master-Worker模式是常用的并行模式之一,它的核心思想是,系統有兩個進程協作工作:Master進程,負責接收和分配任務;Worker進程,負責處理子任務。當Worker進程將子任務處理完成后,結果返回給Master進程,由Master進程做歸納匯總,最后得到最終的結果。
2.1什么是Master-Worker模式:
該模式的結構圖:
結構圖:
Worker:用于實際處理一個任務;
Master:任務的分配和最終結果的合成;
Main:啟動程序,調度開啟Master。
2.2代碼實現:
下面的是一個簡易的Master-Worker框架實現。
1
(1)Master部分:
[java] view plain copy
package MasterWorker;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
//任務隊列
protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();
//Worker進程隊列
protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();
//子任務處理結果集
protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();
//是否所有的子任務都結束了
public boolean isComplete(){
for(Map.Entry<String , Thread> entry:threadMap.entrySet()){
if(entry.getValue().getState()!=Thread.State.TERMINATED){
return false;
}
}
return true ;
}
//Master的構造,需要一個Worker進程邏輯,和需要Worker進程數量
public Master(Worker worker,int countWorker){
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
for(int i=0;i<countWorker;i++){
threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
}
}
//提交一個任務
public void submit(Object job){
workQueue.add(job);
}
//返回子任務結果集
public Map<String ,Object> getResultMap(){
return resultMap;
}
//開始運行所有的Worker進程,進行處理
public void execute(){
for(Map.Entry<String , Thread> entry:threadMap.entrySet()){
entry.getValue().start();
}
}
}
(2)Worker進程實現:
package MasterWorker;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable{
//任務隊列,用于取得子任務
protected Queue<Object> workQueue;
//子任務處理結果集
protected Map<String ,Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue){
this.workQueue= workQueue;
}
public void setResultMap(Map<String ,Object> resultMap){
this.resultMap=resultMap;
}
//子任務處理的邏輯,在子類中實現具體邏輯
public Object handle(Object input){
return input;
}
@Override
public void run() {
while(true){
//獲取子任務
Object input= workQueue.poll();
if(input==null){
break;
}
//處理子任務
Object re = handle(input);
resultMap.put(Integer.toString(input.hashCode()), re);
}
}
}
(3)運用這個小框架計算1——100的立方和,PlusWorker的實現:
package MasterWorker;
public class PlusWorker extends Worker {
@Override
public Object handle(Object input) {
Integer i =(Integer)input;
return i*i*i;
}
}
(4)進行計算的Main函數:
package MasterWorker;
import java.util.Map;
import java.util.Set;
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
//固定使用5個Worker,并指定Worker
Master m = new Master(new PlusWorker(), 5);
//提交100個子任務
for(int i=0;i<100;i++){
m.submit(i);
}
//開始計算
m.execute();
int re= 0;
//保存最終結算結果
Map<String ,Object> resultMap =m.getResultMap();
//不需要等待所有Worker都執行完成,即可開始計算最終結果
while(resultMap.size()>0 || !m.isComplete()){
Set<String> keys = resultMap.keySet();
String key =null;
for(String k:keys){
key=k;
break;
}
Integer i =null;
if(key!=null){
i=(Integer)resultMap.get(key);
}
if(i!=null){
//最終結果
re+=i;
}
if(key!=null){
//移除已經被計算過的項
resultMap.remove(key);
}
}
}
}
2.3總結:
Master-Worker模式是一種將串行任務并行化的方案,被分解的子任務在系統中可以被并行處理,同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果集。
三、生產者-消費模式
生產者-消費模式,通常有兩類線程,即若干個生產者線程和若干個消費者線程。生產者線程負責提交用戶請求,消費者線程負責具體處理生產者提交的任務。兩者之間通過共享內存緩沖去進行通信。
3.1.架構模式圖:
類圖:
生產者:提交用戶請求,提取用戶任務,并裝入內存緩沖區;
消費者:在內存緩沖區中提取并處理任務;
內存緩沖區:緩存生產者提交的任務或數據,供消費者使用;
任務:生產者向內存緩沖區提交的數據結構;
Main:使用生產者和消費者的客戶端。
3.2代碼實現一個基于生產者-消費者模式的求整數平方的并行計算:
(1)Producer生產者線程:
package ProducerConsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable{
//Volatile修飾的成員變量在每次被線程訪問時,都強迫從共享內存中重讀該成員變量的值。
//而且,當成員變量發生變化時,強迫線程將變化值回寫到共享內存。
//這樣在任何時刻,兩個不同的線程總是看到某個成員變量的同一個值。
private volatile boolean isRunning= true;
//內存緩沖區
private BlockingQueue<PCData> queue;
//總數,原子操作
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME=1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
PCData data=null;
Random r = new Random();
System.out.println("start producer id = "+ Thread .currentThread().getId());
try{
while(isRunning){
Thread.sleep(r.nextInt(SLEEPTIME));
//構造任務數據
data= new PCData(count.incrementAndGet());
System.out.println("data is put into queue ");
//提交數據到緩沖區
if(!queue.offer(data,2,TimeUnit.SECONDS)){
System.out.println("faile to put data: "+ data);
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning=false;
}
}
(2)Consumer消費者線程:
package ProducerConsumer;
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
//緩沖區
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME=1000;
public Consumer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id= "+ Thread .currentThread().getId());
Random r = new Random();
try {
//提取任務
while(true){
PCData data= queue.take();
if(null!= data){
//計算平方
int re= data.getData()*data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}",
data.getData(),data.getData(),re
));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
(3)PCData共享數據模型:
package ProducerConsumer;
public final class PCData {
private final int intData;
public PCData(int d) {
intData=d;
}
public PCData(String d) {
intData=Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+ intData ;
}
}
(4)Main函數:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class Main {
/**
* @param args
*/
public static void main(String[] args) throws InterruptedException{
//建立緩沖區
BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);
//建立生產者
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
//建立消費者
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
//建立線程池
ExecutorService service = Executors.newCachedThreadPool();
//運行生產者
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
//運行消費者
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10*1000);
//停止生產者
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
3.3注意:
volatile關鍵字:Volatile修飾的成員變量在每次被線程訪問時,都強迫從共享內存中重讀該成員變量的值。而且,當成員變量發生變化時,強迫線程將變化值回寫到共享內存。這樣在任何時刻,兩個不同的線程總是看到某個成員變量的同一個值。
生產-消費模式的核心組件是共享內存緩沖區,是兩者的通信橋梁,起到解耦作用,優化系統整體結構。
由于緩沖區的存在,生產者和消費者,無論誰在某一局部時間內速度相對較高,都可以使用緩沖區得到緩解,保證系統正常運行,這在一定程度上緩解了性能瓶頸對系統系能的影響。
四、不變模式
一個類的內部狀態創建后,在整個生命期間都不會發生變化時,就是不變類
4.1 不變模式不需要同步
public final class Product {
//確保無子類
private final String no;
// 私有屬性,不會被其他對象獲取
private final String name;
//final保證屬性不會被2次賦值
private final double price;
public Product(String no, String name, double price) {
//在創建對象時,必須指定數據
// super();
// 因為創建之后,無法進行修改
this.no = no;
this.name = name;
this.price = price;
}
public String getNo() {
return no;
}
public String getName() {
return name;
}
public double getPrice() {
return price;
}
}
4.2下面是JDK提供幾種不變的模式
java.lang.String
java.lang.Boolean
java.lang.Byte
java.lang.Character
java.lang.Double
java.lang.Float
java.lang.Integer
java.lang.Long
java.lang.Short