提交 c14fa2f0 作者: 史连宁

限制下载协程数

上级 e09266a3
......@@ -7,6 +7,7 @@ require (
github.com/denisenkom/go-mssqldb v0.12.3
github.com/disintegration/imaging v1.6.2
github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5
github.com/panjf2000/ants v1.3.0
github.com/schollz/progressbar/v3 v3.14.1
github.com/xuri/excelize/v2 v2.8.0
)
......
......@@ -26,6 +26,8 @@ github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
......
......@@ -19,7 +19,8 @@ var (
solution string
fileDir string
mentioneds string
threadNum int
threadNumIO int
threadNumNet int
)
// 获取命令行参数
......@@ -28,23 +29,45 @@ func getCmdParams() bool {
flag.StringVar(&beginDate, "beginDate", "", "开始时间(必填)(e.g., 2022-07-01)")
flag.StringVar(&endDate, "endDate", "", "结束时间(必填)(e.g., 2022-09-30)")
flag.StringVar(&devIds, "devIds", "", "设备编号(可选)(e.g., 129301230102,12039912093,21399422244)")
flag.StringVar(&solution, "solution", "", "解决方案(必填)(e.g., 01)\n01 计算设备可用率\n02 灯诱及性诱日统计\n03 本地图片核查")
flag.StringVar(&solution, "solution", "", "解决方案(必填)(e.g., 01)\n01 计算设备可用率\n02 灯诱及性诱日统计\n03 本地图片核查\n04 目录下文件数量统计")
flag.StringVar(&fileDir, "fileDir", "", "文件路径(可选)(e.g., D:/iotImage)")
flag.IntVar(&threadNum, "threadNum", 100, "并发线程数量(可选)(e.g., 100)")
flag.IntVar(&threadNumIO, "threadNumIO", 100, "IO操作并发协程数量(可选)(e.g., 100)")
flag.IntVar(&threadNumNet, "threadNumNet", 10, "Net操作并发协程数量(可选)(e.g., 10)")
flag.StringVar(&mentioneds, "mentioneds", "", "消息提醒人员(可选)(e.g., gpguo@jinhetech.com,eszhong@jinhetech.com)")
flag.Parse()
if provIds == "" || beginDate == "" || endDate == "" || solution == "" {
fmt.Println("必要参数缺失,请使用 -help 命令查看入参!")
if solution == "" {
fmt.Println("必要参数(solution)缺失,请使用 -help 命令查看入参!")
return false
} else if solution == "03" && fileDir == "" {
fmt.Println("本地图片核查请配置文件路径 -fileDir!")
}
switch solution {
case "01":
case "02":
if provIds == "" || beginDate == "" || endDate == "" {
fmt.Println("必要参数(provIds beginDate endDate)缺失,请使用 -help 命令查看入参!")
return false
}
break
case "03":
if provIds == "" || beginDate == "" || endDate == "" || fileDir == "" {
fmt.Println("必要参数(provIds beginDate endDate fileDir)缺失,请使用 -help 命令查看入参!")
return false
}
break
case "04":
if fileDir == "" {
fmt.Println("必要参数(fileDir)缺失,请使用 -help 命令查看入参!")
return false
}
break
default:
fmt.Println("无效的参数(solution),请使用 -help 命令查看入参!")
return false
} else {
return true
}
return true
}
func main() {
start := time.Now().Unix()
if !getCmdParams() {
return
......@@ -64,6 +87,7 @@ func main() {
}
provIdList := strings.Split(provIds, ",")
var fileList [][]string
var dataCount int
for _, provId := range provIdList {
prov := provId
var filepath, msg string
......@@ -72,7 +96,11 @@ func main() {
} else if solution == "02" {
filepath, msg = solutions.LightAndSexLureStatistics(prov, beginDate, endDate)
} else if solution == "03" {
filepath, msg = solutions.LocalImageFileCheck(prov, beginDate, endDate, fileDir, threadNum)
var count int
filepath, msg, count = solutions.LocalImageFileCheck(prov, beginDate, endDate, fileDir, threadNumIO, threadNumNet)
dataCount += count
} else if solution == "04" {
solutions.CountFileNum(fileDir)
}
if filepath != "" {
fileList = append(fileList, []string{filepath, msg})
......@@ -82,4 +110,7 @@ func main() {
utils.WechatFileUpload(itm[0], itm[1], &mentionedList, nil)
time.Sleep(1 * time.Second)
}
end := time.Now().Unix()
fmt.Printf("执行结束,共计耗时%dmin,数据量%d\n", (end-start)/60, dataCount)
}
package solutions
import (
"fmt"
"os"
"path/filepath"
)
func CountFileNum(filePath string) {
count := 0
var size int64
err := filepath.Walk(filePath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
count++
}
return nil
})
if err != nil {
fmt.Println("执行异常:" + err.Error())
} else {
fmt.Printf("执行成功:共计文件%d个,文件体积%dMB", count, size/(1024*1024))
}
}
......@@ -6,7 +6,10 @@ import (
"fmt"
"github.com/disintegration/imaging"
"github.com/levigross/grequests"
"github.com/panjf2000/ants"
"github.com/schollz/progressbar/v3"
//"github.com/schollz/progressbar/v3"
"github.com/xuri/excelize/v2"
"math"
"net/http"
......@@ -22,50 +25,91 @@ type datImage struct {
ImageID int64
ImageCode string
OriginUrl sql.NullString
noOriginal bool
noPressed bool
}
func LocalImageFileCheck(provId, beginDate, endDate, fileDir string, threadNum int) (filepath, msg string) {
func LocalImageFileCheck(provId, beginDate, endDate, fileDir string, threadNum int, threadNumNet int) (filepath, msg string, dataNum int) {
fmt.Printf("开始核验T_Dat_Image_%s[%s~%s]的图片\n", provId, beginDate, endDate)
taskGroup, allCount, err := getDBImageList(provId, beginDate, endDate, threadNum)
datImageList, allCount, err := getDBImageList(provId, beginDate, endDate)
if err != nil {
fmt.Println("获取数据异常!")
return "", ""
return "", "", 0
}
wg := sync.WaitGroup{}
wg.Add(len(taskGroup))
noImageChan := make(chan []interface{}, allCount)
fmt.Printf("核验数据量:%d,划分组数:%d\n", allCount, len(taskGroup))
needDownChan := make(chan datImage, allCount)
bar := progressbar.Default(allCount)
for _, tasks := range taskGroup {
tasks := tasks
go func() {
wg := sync.WaitGroup{}
wg.Add(len(datImageList))
p, _ := ants.NewPool(threadNum)
defer p.Release()
fmt.Printf("核验数据量:%d,协程池容量:%d\n", allCount, threadNum)
for _, task := range datImageList {
_ = p.Submit(func() {
defer wg.Done()
for _, task := range tasks {
chackExistFile(needDownChan, task, fileDir)
})
}
wg.Wait()
close(needDownChan)
bar.ChangeMax(len(needDownChan))
}()
noImageChan := make(chan []interface{}, allCount)
wg := sync.WaitGroup{}
pd, _ := ants.NewPool(threadNumNet)
defer pd.Release()
fmt.Printf("下载协程池容量:%d\n", threadNumNet)
for task := range needDownChan {
wg.Add(1)
_ = pd.Submit(func() {
defer wg.Done()
dwonloadOne(noImageChan, task, fileDir)
bar.Add(1)
})
}
wg.Wait()
close(noImageChan)
f, m := genNoImageDataReport(noImageChan, fmt.Sprintf("%s(%s~%s)", static.Citys[provId], beginDate, endDate))
return f, m, len(datImageList)
}
func chackExistFile(needDownChan chan datImage, task datImage, fileDir string) {
originalPath := fmt.Sprintf("%s/%s", fileDir, strings.ReplaceAll(task.ImageCode, "?pa=Y", ""))
pressedPath := strings.ReplaceAll(originalPath, "original", "pressed")
noOriginal := false
noPressed := false
task.noOriginal = false
task.noPressed = false
if _, err := os.Stat(originalPath); err != nil && os.IsNotExist(err) {
noOriginal = downOriginalFile(originalPath, task)
task.noOriginal = true
}
if _, err := os.Stat(pressedPath); err != nil && os.IsNotExist(err) {
noPressed = downPressedFile(pressedPath, task, noOriginal, originalPath)
task.noPressed = true
}
if noOriginal || noPressed {
noImageChan <- []interface{}{task.ImageID, noOriginal, noPressed, task.ImageCode, task.OriginUrl.String}
if task.noOriginal || task.noPressed {
needDownChan <- task
}
bar.Add(1)
}
func dwonloadOne(noImageChan chan []interface{}, task datImage, fileDir string) {
originalPath := fmt.Sprintf("%s/%s", fileDir, strings.ReplaceAll(task.ImageCode, "?pa=Y", ""))
pressedPath := strings.ReplaceAll(originalPath, "original", "pressed")
if task.noOriginal {
task.noOriginal = downOriginalFile(originalPath, task)
}
}()
if task.noPressed {
task.noPressed = downPressedFile(pressedPath, task, task.noOriginal, originalPath)
}
if task.noOriginal || task.noPressed {
noImageChan <- []interface{}{task.ImageID, task.noOriginal, task.noPressed, task.ImageCode, task.OriginUrl.String}
}
wg.Wait()
close(noImageChan)
return genNoImageDataReport(noImageChan, fmt.Sprintf("%s(%s~%s)", static.Citys[provId], beginDate, endDate))
}
func getDBImageList(provId, beginDate, endDate string, threadNum int) ([][]datImage, int64, error) {
func getDBImageList(provId, beginDate, endDate string) ([]datImage, int64, error) {
dbInfo := &eneity.DbInfo{
Server: static.Server,
User: static.User,
......@@ -104,8 +148,7 @@ func getDBImageList(provId, beginDate, endDate string, threadNum int) ([][]datIm
fmt.Println("数据库连接关闭异常:", err)
return nil, 0, err
}
batch := len(datImageList) / threadNum
return splitSlice(datImageList, batch+1), int64(len(datImageList)), nil
return datImageList, int64(len(datImageList)), nil
}
func downOriginalFile(savePath string, di datImage) bool {
......@@ -131,13 +174,13 @@ func downPressedFile(savePath string, di datImage, noOriginal bool, originalPath
if err == nil {
return true
}
if noOriginal {
return false
}
err = genThumbByOriginFile(savePath, originalPath)
if err == nil {
return true
}
//if noOriginal {
// return false
//}
//err = genThumbByOriginFile(savePath, originalPath)
//if err == nil {
// return true
//}
return false
}
......
......@@ -2,7 +2,7 @@ package static
// 数据库连接参数
var (
Server = "218.94.154.74"
Server = "10.51.250.121"
User = "xcanary"
Password = "$L7x5hkeN=YC"
Port = 39782
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论