V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
PungentSauce
V2EX  ›  Go 编程语言

看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?

  •  
  •   PungentSauce · 2025 年 7 月 25 日 · 6496 次点击
    这是一个创建于 179 天前的主题,其中的信息可能已经有所发展或是发生改变。

    这里是一些脚本调用的地方,工具源码放在后面两个代码块了。

    util.TaskConsumer[[]string](10).
    		SetP(lineopt.IterExcel2("xxx.xlsx")).
    		SetC(func(index int, row []string) (err error) {
    			if index == 1 {
    				return
    			}
    			// .....
    			// 这里是逻辑处理函数
    			return
    		}).
    		Run()
    
    

    这是两个封装的函数的源码。

    package lineopt
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/xuri/excelize/v2"
    	"iter"
    	"log/slog"
    	"os"
    )
    
    func IterLine2(filePath string) iter.Seq2[int, string] {
    	return func(yield func(int, string) bool) {
    		f, errF := os.OpenFile(filePath, os.O_RDONLY, 0666)
    		if errF != nil {
    			return
    		}
    		defer func(f *os.File) {
    			err := f.Close()
    			if err != nil {
    				fmt.Println(err)
    			}
    		}(f)
    		scanner := bufio.NewScanner(f)
    		index := 1
    		for scanner.Scan() {
    			line := scanner.Text()
    			if !yield(index, line) {
    				return
    			}
    			index += 1
    		}
    	}
    }
    
    func IterLine(filePath string) iter.Seq[string] {
    	return func(yield func(string) bool) {
    		for _, item := range IterLine2(filePath) {
    			if !yield(item) {
    				return
    			}
    		}
    	}
    }
    
    func MapIterExcel2(config ExcelTarget) iter.Seq2[int, []string] {
    	return func(yield func(int, []string) bool) {
    		f, err := excelize.OpenFile(config.FilePath)
    		if err != nil {
    			slog.Error(err.Error())
    			return
    		}
    		defer f.Close()
    		targetSheet := config.TargetSheet
    		if targetSheet == "" {
    			targetSheet = f.GetSheetName(0)
    		}
    		rows, err := f.Rows(targetSheet)
    		if err != nil {
    			slog.Error(err.Error())
    			return
    		}
    		index := 1
    		for rows.Next() {
    			row, err := rows.Columns()
    			if err != nil {
    				slog.Error(err.Error())
    				return
    			}
    			if !yield(index, row) {
    				return
    			}
    			index += 1
    		}
    		return
    	}
    }
    
    func MapIterExcel(config ExcelTarget) iter.Seq[[]string] {
    	return func(yield func([]string) bool) {
    		for _, value := range MapIterExcel2(config) {
    			if !yield(value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcel2(filePath string) iter.Seq2[int, []string] {
    	return func(yield func(int, []string) bool) {
    		for index, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
    			if !yield(index, value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcel(filePath string) iter.Seq[[]string] {
    	return func(yield func([]string) bool) {
    		for _, value := range MapIterExcel2(ExcelTarget{FilePath: filePath}) {
    			if !yield(value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, []string] {
    	return func(yield func(int, []string) bool) {
    		for index, value := range MapIterExcel2(ExcelTarget{
    			FilePath:    filePath,
    			TargetSheet: sheetName,
    		}) {
    			if !yield(index, value) {
    				return
    			}
    		}
    	}
    }
    
    func IterExcelSheet(filePath string, sheetName string) iter.Seq[[]string] {
    	return func(yield func([]string) bool) {
    		for _, value := range MapIterExcel2(ExcelTarget{
    			FilePath:    filePath,
    			TargetSheet: sheetName,
    		}) {
    			if !yield(value) {
    				return
    			}
    		}
    	}
    }
    
    package util
    
    import (
    	"dt/app/util/lineopt"
    	"errors"
    	"iter"
    	"sync"
    )
    
    func ChannelConsume[d any](queue chan d, job func(item d), number ...int) *sync.WaitGroup {
    	counter := 10
    	if len(number) == 1 && number[0] > 0 {
    		counter = number[0]
    	}
    	return StartTogether(func() {
    		for item := range queue {
    			job(item)
    		}
    	}, counter)
    }
    
    // Together 并行执行
    func Together(job func(), counter int) {
    	var wg sync.WaitGroup
    	for i := 1; i <= counter; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			job()
    		}()
    	}
    	wg.Wait()
    }
    
    func StartTogether(job func(), counter int) *sync.WaitGroup {
    	var wg sync.WaitGroup
    	for i := 1; i <= counter; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			job()
    		}()
    	}
    	return &wg
    }
    
    type chanData[d any] struct {
    	index int
    	data  d
    }
    
    func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d) (err error), number ...int) *sync.WaitGroup {
    	counter := 10
    	if len(number) == 1 && number[0] > 0 {
    		counter = number[0]
    	}
    	return StartTogether(func() {
    		for item := range queue {
    			err := job(item.index, item.data)
    			if errors.Is(err, lineopt.Stop) {
    				// 目前不可以直接停止,会导致消费者阻塞掉
    				//return
    			}
    		}
    	}, counter)
    }
    
    type ProducerConsumer[T any] struct {
    	consumerNumber int
    	queue          chan chanData[T]
    	p              iter.Seq2[int, T]
    	c              func(index int, item T) (err error)
    	once           sync.Once
    }
    
    func (itself *ProducerConsumer[T]) SetC(c func(index int, item T) (err error)) *ProducerConsumer[T] {
    	itself.c = c
    	return itself
    }
    
    func (itself *ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) *ProducerConsumer[T] {
    	itself.p = p
    	return itself
    }
    
    // 生产者消费者都有可能发生阻塞,
    // 生产者阻塞的原因是因为 queue 容量不够了
    // 消费者阻塞的原因的是因为 queue 没有 close
    // 生产者只需要实现即可
    func (itself *ProducerConsumer[T]) do() {
    	task := ChannelConsume2(itself.queue, func(index int, item T) (err error) {
    		return itself.c(index, item)
    	}, itself.consumerNumber)
    	defer task.Wait()
    	defer close(itself.queue)
    	for index, v := range itself.p {
    		select {
    		case itself.queue <- chanData[T]{
    			index,
    			v,
    		}:
    			break
    			// 需要一个可以知道提前截止的操作
    		}
    	}
    
    }
    
    func (itself *ProducerConsumer[T]) Run() {
    	itself.once.Do(func() {
    		itself.do()
    	})
    }
    
    func TaskConsumer[T any](consumerNumber ...int) *ProducerConsumer[T] {
    	n := 1
    	if len(consumerNumber) > 0 {
    		n = consumerNumber[0]
    	}
    	return &ProducerConsumer[T]{
    		queue:          make(chan chanData[T], n),
    		consumerNumber: n,
    	}
    }
    
    
    28 条回复    2025-07-28 17:40:02 +08:00
    PungentSauce
        1
    PungentSauce  
    OP
       2025 年 7 月 25 日
    感觉调用的地方有点不像 go 了
    aladdinding
        2
    aladdinding  
       2025 年 7 月 25 日
    yield 乍一看以为是 py
    henix
        3
    henix  
       2025 年 7 月 25 日   ❤️ 1
    用了 Go 最新的 iterator 搞出了一些偏函数式风格的东西。似乎是为了方便并发处理

    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方
    RedisMasterNode
        4
    RedisMasterNode  
       2025 年 7 月 25 日
    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方

    或许没人管,或许也不是多好的代码,但是要有职业素养...
    vincentWdp
        5
    vincentWdp  
       2025 年 7 月 25 日
    一股 Java 味儿
    PungentSauce
        6
    PungentSauce  
    OP
       2025 年 7 月 25 日
    @RedisMasterNode @henix 同学的本地工具代码,不是生产环境用的。
    PungentSauce
        7
    PungentSauce  
    OP
       2025 年 7 月 25 日
    @PungentSauce 不是项目使用的。公司代码都是 java
    PungentSauce
        8
    PungentSauce  
    OP
       2025 年 7 月 25 日
    感觉比较 6 ,但是风格和其他看到的不大一样。
    Nanosk
        9
    Nanosk  
       2025 年 7 月 25 日   ❤️ 4
    这写法看到就烦 谁爱维护谁维护。。
    fumeboy
        10
    fumeboy  
       2025 年 7 月 25 日
    大哥。函数式编程都没见过?
    jheroy
        11
    jheroy  
       2025 年 7 月 25 日   ❤️ 1
    这种代码就是写的人感觉很爽,看的人想骂娘。
    v1
        12
    v1  
       2025 年 7 月 25 日
    链式编程……这不是基本功吗……反过来说,java 那套继承就很……
    archxm
        13
    archxm  
       2025 年 7 月 25 日
    前端风格吗?
    mightybruce
        14
    mightybruce  
       2025 年 7 月 25 日   ❤️ 1
    这代码没什么问题, 不知道你想说什么。
    lysShub
        15
    lysShub  
       2025 年 7 月 25 日   ❤️ 1
    垃圾代理,而且函数变量很影响性能
    strobber16
        16
    strobber16  
       2025 年 7 月 25 日
    这就是 go ,不爽不要来
    nkidgm
        17
    nkidgm  
       2025 年 7 月 25 日
    go 我都是用来做命令行小程序,业务层面的代码可不敢这样子写。
    kneo
        18
    kneo  
       2025 年 7 月 25 日
    有啥问题?
    iceheart
        19
    iceheart  
       2025 年 7 月 26 日 via Android   ❤️ 1
    过两年作者自己也看不懂了。
    Silicon
        20
    Silicon  
       2025 年 7 月 26 日
    这种风骚属于脱离生态后不得已而为之,带有强迫的属性,所以是其他「受害者」。
    上游先有一个预处理器把 excel 干掉,剩下的事情就好办很多……
    geebos
        21
    geebos  
    PRO
       2025 年 7 月 26 日   ❤️ 2
    写了好几年的 go ,很难接受这样的写法
    feedcode
        22
    feedcode  
       2025 年 7 月 26 日
    用了新的 feature, range over functions

    https://go.dev/blog/range-functions
    As of Go 1.23 it now supports ranging over functions that take a single argument. The single argument must itself be a function that takes zero to two arguments and returns a bool; by convention, we call it the yield function.

    ```
    func(yield func() bool)

    func(yield func(V) bool)

    func(yield func(K, V) bool)
    ```
    leowyzhuang
        23
    leowyzhuang  
       2025 年 7 月 26 日
    人和程序有一个能跑就行
    kfpenn
        24
    kfpenn  
       2025 年 7 月 28 日
    看了下这个新特性,所以这个东西只是让以后的库能有一个统一的遍历方法,但里面可能还是用的 scanner.Scan(),Rows.Next () 这些?
    bronyakaka
        25
    bronyakaka  
       2025 年 7 月 28 日
    因为 go 社区很多人吹 go 是函数式编程 结果啥函数式 api 都没有,可能还是泛型的问题
    bronyakaka
        26
    bronyakaka  
       2025 年 7 月 28 日
    可以看下社区模拟函数式 api 的 lo 库,基本上包含了楼主示例里这些封装
    leokun
        27
    leokun  
       2025 年 7 月 28 日
    我说 fp 是防御性编程的一种,大家没意见吧
    R136a1
        28
    R136a1  
       2025 年 7 月 28 日
    函数式编程没啥问题,但是 go 的函数式是真难看,通篇读下来我只看到一堆花括号在天上飞
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   Solana   ·   5809 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 02:46 · PVG 10:46 · LAX 18:46 · JFK 21:46
    ♥ Do have faith in what you're doing.