作者:Goland貓?
對(duì)于大型的互聯(lián)網(wǎng)應(yīng)用程序,如電商平臺(tái)、社交網(wǎng)絡(luò)、金融交易平臺(tái)等,每秒鐘都會(huì)收到大量的請(qǐng)求。在這些應(yīng)用程序中,需要使用高效的技術(shù)來(lái)應(yīng)對(duì)高并發(fā)的請(qǐng)求,尤其是在短時(shí)間內(nèi)處理大量的請(qǐng)求,如1分鐘百萬(wàn)請(qǐng)求。
同時(shí),為了降低用戶的使用門檻和提升用戶體驗(yàn),前端需要實(shí)現(xiàn)參數(shù)的無(wú)感知傳遞。這樣用戶在使用時(shí),無(wú)需擔(dān)心參數(shù)傳遞的問(wèn)題,能夠輕松地享受應(yīng)用程序的服務(wù)。
在處理1分鐘百萬(wàn)請(qǐng)求時(shí),需要使用高效的技術(shù)和算法,以提高請(qǐng)求的響應(yīng)速度和處理能力。Go語(yǔ)言以其高效性和并發(fā)性而聞名,因此成為處理高并發(fā)請(qǐng)求的優(yōu)秀選擇。Go中有多種模式可供選擇,如基于goroutine和channel的并發(fā)模型、使用池技術(shù)的協(xié)程模型等,以便根據(jù)具體應(yīng)用的需要來(lái)選擇適合的技術(shù)模式。
本文代碼參考搬至
W1
W1 結(jié)構(gòu)體類型,它有五個(gè)成員:
WgSend 用于等待任務(wù)發(fā)送的 goroutine 完成。
Wg 用于等待任務(wù)處理的 goroutine 完成。
MaxNum 表示 goroutine 池的大小。
Ch 是一個(gè)字符串類型的通道,用于傳遞任務(wù)。
DispatchStop 是一個(gè)空結(jié)構(gòu)體類型的通道,用于停止任務(wù)分發(fā)。
?
type?W1?struct?{
?WgSend???????*sync.WaitGroup
?Wg???????????*sync.WaitGroup
?MaxNum???????int
?Ch???????????chan?string
?DispatchStop?chan?struct{}
}
?
接下來(lái)是 Dispatch 方法,它將任務(wù)發(fā)送到通道 Ch 中。它通過(guò) for 循環(huán)來(lái)發(fā)送 10 倍于 MaxNum 的任務(wù),每個(gè)任務(wù)都是一個(gè) goroutine。defer 語(yǔ)句用于在任務(wù)完成時(shí)減少 WgSend 的計(jì)數(shù)。select 語(yǔ)句用于在任務(wù)分發(fā)被中止時(shí)退出任務(wù)發(fā)送。
Dispatch
?
func?(w?*W1)?Dispatch(job?string)?{
?w.WgSend.Add(10?*?w.MaxNum)
?for?i?:=?0;?i?10*w.MaxNum;?i++?{
??go?func(i?int)?{
???defer?w.WgSend.Done()
???select?{
???case?w.Ch?<-?fmt.Sprintf("%d",?i):
????return
???case?<-w.DispatchStop:
????fmt.Println("退出發(fā)送?job:?",?fmt.Sprintf("%d",?i))
????return
???}
??}(i)
?}
}
?
StartPool
然后是 StartPool 方法,它創(chuàng)建了一個(gè) goroutine 池來(lái)處理從通道 Ch 中讀取到的任務(wù)。
如果通道 Ch 還沒(méi)有被創(chuàng)建,那么它將被創(chuàng)建。如果計(jì)數(shù)器 WgSend 還沒(méi)有被創(chuàng)建,那么它也將被創(chuàng)建。如果計(jì)數(shù)器 Wg 還沒(méi)有被創(chuàng)建,那么它也將被創(chuàng)建。
如果通道 DispatchStop 還沒(méi)有被創(chuàng)建,那么它也將被創(chuàng)建。
for 循環(huán)用于創(chuàng)建 MaxNum 個(gè) goroutine 來(lái)處理從通道中讀取到的任務(wù)。defer 語(yǔ)句用于在任務(wù)完成時(shí)減少 Wg 的計(jì)數(shù)。
?
func?(w?*W1)?StartPool()?{
?if?w.Ch?==?nil?{
??w.Ch?=?make(chan?string,?w.MaxNum)
?}
?if?w.WgSend?==?nil?{
??w.WgSend?=?&sync.WaitGroup{}
?}
?if?w.Wg?==?nil?{
??w.Wg?=?&sync.WaitGroup{}
?}
?if?w.DispatchStop?==?nil?{
??w.DispatchStop?=?make(chan?struct{})
?}
?w.Wg.Add(w.MaxNum)
?for?i?:=?0;?i?
?
Stop
最后是 Stop 方法,它停止任務(wù)分發(fā)并等待所有任務(wù)完成。
它關(guān)閉了通道 DispatchStop,等待 WgSend 中的任務(wù)發(fā)送 goroutine 完成,然后關(guān)閉通道 Ch,等待 Wg 中的任務(wù)處理 goroutine 完成。
?
func?(w?*W1)?Stop()?{
?close(w.DispatchStop)
?w.WgSend.Wait()
?close(w.Ch)
?w.Wg.Wait()
}

?
W2
SubWorker
?
type?SubWorker?struct?{
?JobChan?chan?string
}
?
子協(xié)程,它有一個(gè) JobChan,用于接收任務(wù)。
Run:SubWorker 的方法,用于啟動(dòng)一個(gè)子協(xié)程,從 JobChan 中讀取任務(wù)并執(zhí)行。
?
func?(sw?*SubWorker)?Run(wg?*sync.WaitGroup,?poolCh?chan?chan?string,?quitCh?chan?struct{})?{
?if?sw.JobChan?==?nil?{
??sw.JobChan?=?make(chan?string)
?}
?wg.Add(1)
?go?func()?{
??defer?wg.Done()
??for?{
???poolCh?<-?sw.JobChan
???select?{
???case?res?:=?<-sw.JobChan:
????fmt.Printf("完成工作:?%s?
",?res)
???case?<-quitCh:
????fmt.Printf("消費(fèi)者結(jié)束......?
")
????return
???}
??}
?}()
}
?
W2
?
type?W2?struct?{
?SubWorkers?[]SubWorker
?Wg?????????*sync.WaitGroup
?MaxNum?????int
?ChPool?????chan?chan?string
?QuitChan???chan?struct{}
}
?
Dispatch
Dispatch:W2 的方法,用于從 ChPool 中獲取 TaskChan,將任務(wù)發(fā)送給一個(gè) SubWorker 執(zhí)行。
?
func?(w?*W2)?Dispatch(job?string)?{
?jobChan?:=?<-w.ChPool
?select?{
?case?jobChan?<-?job:
??fmt.Printf("發(fā)送任務(wù)?:?%s?完成?
",?job)
??return
?case?<-w.QuitChan:
??fmt.Printf("發(fā)送者(%s)結(jié)束?
",?job)
??return
?}
}
?
StartPool
StartPool:W2 的方法,用于初始化協(xié)程池,啟動(dòng)所有子協(xié)程并把 TaskChan 存儲(chǔ)在 ChPool 中。
?
func?(w?*W2)?StartPool()?{
?if?w.ChPool?==?nil?{
??w.ChPool?=?make(chan?chan?string,?w.MaxNum)
?}
?if?w.SubWorkers?==?nil?{
??w.SubWorkers?=?make([]SubWorker,?w.MaxNum)
?}
?if?w.Wg?==?nil?{
??w.Wg?=?&sync.WaitGroup{}
?}
?for?i?:=?0;?i?
?
Stop
Stop:W2 的方法,用于停止協(xié)程的工作,并等待所有協(xié)程結(jié)束。
?
func?(w?*W2)?Stop()?{
?close(w.QuitChan)
?w.Wg.Wait()
?close(w.ChPool)
}
?
DealW2 函數(shù)則是整個(gè)協(xié)程池的入口,它通過(guò) NewWorker 方法創(chuàng)建一個(gè) W2 實(shí)例,然后調(diào)用 StartPool 啟動(dòng)協(xié)程池,并通過(guò) Dispatch 發(fā)送任務(wù),最后調(diào)用 Stop 停止協(xié)程池。
?
func?DealW2(max?int)?{
?w?:=?NewWorker(w2,?max)
?w.StartPool()
?for?i?:=?0;?i?10*max;?i++?{
??go?w.Dispatch(fmt.Sprintf("%d",?i))
?}
?w.Stop()
}

?
個(gè)人見(jiàn)解
看到這里對(duì)于w2我已經(jīng)有點(diǎn)迷糊了,還能傳遞w.Wg, w.ChPool, w.QuitChan?
?
原來(lái)是golang里如果方法傳遞的不是地址,那么就會(huì)做一個(gè)拷貝,所以這里調(diào)用的wg根本就不是一個(gè)對(duì)象。
傳遞的地方傳遞地址就可以了,如果不傳遞地址,將會(huì)出現(xiàn)死鎖
go?doSomething(i,?&wg,?ch)
func?doSomething(index?int,?wg?*sync.WaitGroup,?ch?chan?int)?{
?
w1也有一個(gè)比較大的問(wèn)題。在處理請(qǐng)求時(shí),每個(gè) Goroutine 都會(huì)占用一定的系統(tǒng)資源,如果請(qǐng)求量過(guò)大,會(huì)造成 Goroutine 數(shù)量的劇增,消耗過(guò)多系統(tǒng)資源,程序可能會(huì)崩潰
探究原文
在這段代碼中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。當(dāng)一個(gè)工作者完成了工作后,它會(huì)將工作結(jié)果發(fā)送到sw.JobChan,此時(shí)可以通過(guò)case res := <-sw.JobChan:來(lái)接收該工作的結(jié)果。
在這個(gè)代碼塊中,還需要處理一個(gè)退出信號(hào)quitCh。因此,第二個(gè)case <-quitCh:用于檢測(cè)是否接收到了退出信號(hào)。如果接收到了退出信號(hào),程序?qū)⒋蛴〕鱿⒉⒔Y(jié)束。
需要注意的是,這兩個(gè)case語(yǔ)句是互斥的,只有當(dāng)工作者完成工作或收到退出信號(hào)時(shí),才會(huì)進(jìn)入其中一個(gè)語(yǔ)句。因此,這個(gè)循環(huán)可以保證在工作者完成工作或收到退出信號(hào)時(shí)退出。
需要讀取兩次sw.JobChan的原因是:第一次讀取用于將工作者的工作通道放回工作者池中,這樣其他工作者就可以使用該通道。第二次讀取用于接收工作者的工作結(jié)果或退出信號(hào)。因此,這兩次讀取是為了確保能夠在正確的時(shí)刻將工作者的工作通道放回工作者池中并正確地處理工作結(jié)果或退出信號(hào)。
根據(jù)w2的特點(diǎn) 我自己寫(xiě)了一個(gè)w2
?
import?(
???"fmt"
???"sync"
)
type?SubWorkerNew?struct?{
???JobChan?chan?string
}
type?W2New?struct?{
???SubWorkers?[]SubWorkerNew
???Wg?????????*sync.WaitGroup
???MaxNum?????int
???ChPool?????chan?chan?string
???QuitChan???chan?struct{}
}
func?NewW2(maxNum?int)?*W2New?{
???subWorkers?:=?make([]SubWorkerNew,?maxNum)
???for?i?:=?0;?i?
?
但是有幾個(gè)點(diǎn)需要注意
1.沒(méi)有考慮JobChan通道的緩沖區(qū)大小,如果有大量任務(wù)被并發(fā)分配,容易導(dǎo)致內(nèi)存占用過(guò)高;
2.每個(gè)線程都會(huì)執(zhí)行無(wú)限循環(huán),此時(shí)線程退出的條件是接收到QuitChan通道的信號(hào),可能導(dǎo)致線程的阻塞等問(wèn)題;
3.Dispatch函數(shù)的默認(rèn)情況下只會(huì)輸出"All workers busy",而不是阻塞,這意味著當(dāng)所有線程都處于忙碌狀態(tài)時(shí),任務(wù)會(huì)丟失
4.線程池啟動(dòng)后無(wú)法動(dòng)態(tài)擴(kuò)展或縮小。
優(yōu)化
這個(gè)優(yōu)化版本改了很多次。有一些需要注意的點(diǎn)是,不然會(huì)一直死鎖
?
1.使用sync.WaitGroup來(lái)確保線程池中所有線程都能夠啟動(dòng)并運(yùn)行;
2.在Stop函數(shù)中,先向SubWorker的JobChan中發(fā)送一個(gè)關(guān)閉信號(hào),再等待所有SubWorker線程退出;
3.在Dispatch函數(shù)中,將默認(rèn)情況下的輸出改為阻塞等待可用通道;
?
w2new
?
package?handle_million_requests
import?(
?"fmt"
?"sync"
?"time"
)
type?SubWorkerNew?struct?{
?Id??????int
?JobChan?chan?string
}
type?W2New?struct?{
?SubWorkers?[]SubWorkerNew
?MaxNum?????int
?ChPool?????chan?chan?string
?QuitChan???chan?struct{}
?Wg?????????*sync.WaitGroup
}
func?NewW2(maxNum?int)?*W2New?{
?chPool?:=?make(chan?chan?string,?maxNum)
?subWorkers?:=?make([]SubWorkerNew,?maxNum)
?for?i?:=?0;?i??1?{
??worker?:=?w.SubWorkers[w.MaxNum-1]
??close(worker.JobChan)
??w.MaxNum--
??w.SubWorkers?=?w.SubWorkers[:w.MaxNum]
?}
}
?
AddWorker和RemoveWorker,用于動(dòng)態(tài)擴(kuò)展/縮小線程池。
在AddWorker函數(shù)中,我們首先將MaxNum增加了1,然后創(chuàng)建一個(gè)新的SubWorkerNew結(jié)構(gòu)體,將其添加到SubWorkers中,并將其JobChan通道添加到ChPool通道中。最后,我們創(chuàng)建一個(gè)新的協(xié)程來(lái)處理新添加的SubWorkerNew并讓它進(jìn)入無(wú)限循環(huán),等待接收任務(wù)。
在RemoveWorker函數(shù)中,我們首先將MaxNum減少1,然后獲取最后一個(gè)SubWorkerNew結(jié)構(gòu)體,將它的JobChan通道發(fā)送到ChPool通道中,并從其通道中讀取任何待處理的任務(wù),最后創(chuàng)建一個(gè)新的協(xié)程來(lái)處理SubWorkerNew,繼續(xù)處理任務(wù)。
測(cè)試用例
?
func?TestW2New(t?*testing.T)?{??
????pool?:=?NewW2(3)??
????pool.StartPool()??
????pool.Dispatch("task?1")??
????pool.Dispatch("task?2")??
????pool.Dispatch("task?3")??
????pool.AddWorker()??
????pool.AddWorker()??
????pool.RemoveWorker()??
????pool.Stop()??
}

?
當(dāng)Dispatch函數(shù)向ChPool通道獲取可用通道時(shí),會(huì)從通道中取出一個(gè)SubWorker的JobChan通道,并將任務(wù)發(fā)送到該通道中。而對(duì)于SubWorker來(lái)說(shuō),并沒(méi)有進(jìn)行任務(wù)的使用次數(shù)限制,所以它可以處理多個(gè)任務(wù)。
在這個(gè)例子中,當(dāng)任務(wù)數(shù)量比SubWorker數(shù)量多時(shí),一個(gè)SubWorker的JobChan通道會(huì)接收到多個(gè)任務(wù),它們會(huì)在SubWorker的循環(huán)中按順序依次處理,直到JobChan中沒(méi)有未處理的任務(wù)為止。因此,如果任務(wù)數(shù)量特別大,可能會(huì)導(dǎo)致某些SubWorker的JobChan通道暫時(shí)處于未處理任務(wù)狀態(tài),而其他的SubWorker在執(zhí)行任務(wù)。
在測(cè)試結(jié)果中,最后三行中出現(xiàn)了多個(gè)"SubWorker 0 processing job",說(shuō)明SubWorker 0的JobChan通道接收了多個(gè)任務(wù),并且在其循環(huán)中處理這些任務(wù)。下面的代碼片段顯示了這個(gè)過(guò)程:
// SubWorker 0 的循環(huán)部分
?
for?{
????select?{
????case?job?:=?<-subWorker.JobChan:
????????fmt.Printf("SubWorker?%d?processing?job?%s
",?subWorker.Id,?job)
????case?<-w.QuitChan:
????????return
????}
}

審核編輯:湯梓紅
?
電子發(fā)燒友App































評(píng)論