V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
rekulas
V2EX  ›  Go 编程语言

解压 zlib 数据流,困扰了一天多了没能解决

  •  
  •   rekulas ·
    del-xiong · 262 天前 · 1867 次点击
    这是一个创建于 262 天前的主题,其中的信息可能已经有所发展或是发生改变。

    对接 discord 的 zlib 数据流,它是分段传输的,只有第一个片段包含头部信息,后续片段不包含头部信息。解压后面的片段需要依赖第一个段的 header
    python 里很好实现

    deobj = zlib.decompressobj()
    

    然后只要用 deobj 按顺序解压每个片段就行了
    deobj.decompress(数据流)

    改写到 go 里,发现 zlib.NewReader()只能用于第一个片段,后续片段无法解压(会报 invalid header 错误) 感觉 go 的 zlib 没有提供一个能保存上下文状态的对象,每次解压都要从头开始,导致后续不完整的 zlib 流无法解压

    reader := bytes.NewReader(bin1)
    zlib.NewReader(reader)
    
    reader = bytes.NewReader(bin2)
    zlib.NewReader(reader) // 会报错
    
    binmerge := append(bin1, bin2...)
    reader = bytes.NewReader(binmerge)
    zlib.NewReader(reader) // 正常
    
    

    如果把 bin1 bin2 合并起来是可以解压的,但显然这样不合理,内存和资源占用会越来越大
    所以我想只依赖 bin1 的 header 信息,然后用 bin2 的数据流来解压
    除此之外还尝试过讲 bin1 的头 N 个字节放到 bin2 前面,也没成功,似乎 header 不是单纯拷贝字节那么简单,可能涉及到其他计算
    问过 gpt ,没能解决。
    搜过 google 等各种资料,也没有解决。
    不熟悉 zlib 格式,想问问各位有没有办法实现

    2 个对应的测试数据我打包放在这里了 https://drive.google.com/file/d/1zdAbgWVWewqcovaHxRq3ZhPObPnr3m-5/view?usp=sharing

    第 1 条附言  ·  260 天前
    python 的测试代码
    ```
    import zlib,time


    f = open('1.bin', 'rb')
    z1 = f.read()
    f.close()
    f = open('2.bin', 'rb')
    z2 = f.read()

    deobj = zlib.decompressobj()
    print(deobj.decompress(z1).decode('utf-8'))
    time.sleep(3)
    print(deobj.decompress(z2).decode('utf-8'))

    ```
    MoYi123
        1
    MoYi123  
       262 天前
    你可以写一个比较复杂的 reader 吧
    用一个 queue 来实现 reader,
    Read(p []byte) (n int, err error) 可以知道已经读了多少, 可以释放队列头部已使用的压缩数据
    到 zlib.NewReader 读不出数据的时候, 再往 queue 里添加新的压缩数据

    没试, 应该是可以的.
    rekulas
        2
    rekulas  
    OP
       262 天前
    @MoYi123 目前考虑也是自己写,看了下 zlib 源码 打算复制过来魔改试试,还没成功
    boboliu
        3
    boboliu  
       262 天前
    r := io.MultiReader(bin1, bin2, bin3)
    zlib.NewReader(r)
    rekulas
        4
    rekulas  
    OP
       262 天前
    @boboliu 这相当于合并起来了 会引起资源问题
    lance6716
        5
    lance6716  
       262 天前 via Android
    按照你的描述,它本身就是“一个”流,你直接把流的 reader 传过去就行啊,为啥要拆成“两个”bin1 bin2 呢
    rekulas
        6
    rekulas  
    OP
       262 天前
    @lance6716 并不是我要拆开,而且平台给的数据就是这样的
    比如第 5 秒的时候,平台给我一个流 1, 然后继续处理任务
    第 50 秒,平台发给我流 2,通知我任务状态

    我要流式处理,肯定得这样的, js python 里面都很简单, go 居然没能成功
    boboliu
        7
    boboliu  
       262 天前
    @rekulas 没听懂,什么叫资源问题

    就 #6 而言,chunk 当然是你自己负责整成流,优雅的方案就是开一个 buffer
    rekulas
        8
    rekulas  
    OP
       261 天前
    @boboliu 看我上面发的啊,这种情况下你要解压就要把所有流合并到一起,处理第 200 个数据要把 1-200 全部处理一遍,你觉得合理?
    bv
        9
    bv  
       261 天前
    是这样吗?

    input := new(bytes.Buffer)
    output, _ := zlib.NewReader(input)
    // 压缩的数据流往 input 里面写入。
    // 从 output 读取解压后的数据流。
    guonaihong
        10
    guonaihong  
       261 天前
    把 chan 包装成一个 io.Reader, 收数据的地方直接并发 chan , 读的地方 select chan 就行。

    type myReader struct {
    c chan []byte
    }

    func (m *myReader) Read(p []byte) (n int, err error) {

    copy()
    }
    guonaihong
        11
    guonaihong  
       261 天前
    忽略我上一个回答,直接用 io.Pipe 。然后 zlib 解决套下 io.Pipe 的 reader 对象。另外收 gzip 数据的地方并发写就行。
    https://pkg.go.dev/io#Pipe
    rekulas
        12
    rekulas  
    OP
       261 天前
    @guonaihong pipe 我也试过没成功 ,也可能用法不对 空了我再试试
    bv
        13
    bv  
       261 天前
    和 zlib 无关,只是流式解析没处理好而已。

    package main

    import (
    "bytes"
    "compress/zlib"
    "fmt"
    "io"
    "os"
    "sync"
    )

    //goland:noinspection GoUnhandledErrorResult
    func main() {
    bin1, _ := os.Open("1.bin")
    defer bin1.Close()
    bin2, _ := os.Open("2.bin")
    defer bin2.Close()

    input := new(bytes.Buffer)
    input.ReadFrom(bin1)
    zr, err := zlib.NewReader(input)
    if err != nil {
    fmt.Printf("zlib error: %v\n", err)
    return
    }

    wg := new(sync.WaitGroup)
    wg.Add(1)
    go func() {
    defer wg.Done()
    io.Copy(os.Stderr, zr)
    zr.Close()
    fmt.Printf("\noutput over\n")
    }()

    input.ReadFrom(bin2)

    wg.Wait()
    fmt.Printf("main over\n")
    }

    输出结果:
    {"t":null,"s":null,"op":10,"d":{"heartbeat_interval":41250,"_trace":["[\"gateway-prd-us-east1-c-0bwh\",{\"micros\":0.0}]"]}}{"t":null,"s":null,"op":11,"d":null}
    output over
    main over
    bv
        14
    bv  
       261 天前
    如果是 HTTP 客户端可以改造的更简单一些,例如:

    func Get(u string) (*http.Response, error) {
    resp, err := http.Get(u)
    if err != nil {
    return nil, err
    }

    encoding := resp.Header.Get("Content-Encoding")
    if encoding == "deflate" { // 代表 body 使用了 zlib 压缩
    body := resp.Body
    rc, exx := zlib.NewReader(body)
    if exx != nil {
    _ = body.Close()
    return nil, exx
    }
    resp.Body = rc
    }

    return resp, nil
    }
    guonaihong
        15
    guonaihong  
       261 天前
    @rekulas 有一个简单的方法验证, 如果对端传过来的 gzip 包,都缓存到 bytes.Buffer ,完毕可以解出来。那就说明你的 io.Pipe 的用法不对。
    rekulas
        16
    rekulas  
    OP
       261 天前
    @bv 感谢测试 但是这样也是合并到一起解压的吧 并没能实现下一个包延迟处理的效果
    bv
        17
    bv  
       261 天前
    我大概理解了你的想法:就好比分卷压缩,只要得到第一个压缩包(第一个压缩包内含有元数据),跳过任意个块包,也照样可以解压后面的任何一块压缩包。

    这应该实现不了吧,块与块之间大概率是存在依赖关系的,环环相扣,一环缺失就会导致后面数据无效。不太了解 zlib 的二进制格式,OP 要想深入研究可自行查阅资料。
    比如:ts (MPEG2-TS) 这种分块格式是经过设计的,每一块内都含有元数据,不依赖前后 ts 数据块,每一块都单独可用。

    还有:压缩包只是个容器,里面的数据才是有用的,一段数据被分块压缩后,怎么知道想要的数据被压缩分块到了哪个块区?这也是一个问题。
    rekulas
        18
    rekulas  
    OP
       261 天前
    @bv 有一点点小区别 并不是跳过任意块包,只要按顺序 1,2,3,4....依次解压即可
    在 py 和 js 中确实是可以实现的, 只是 go 里面我不清楚怎么实现, 感觉它的 zlib 接口还比较简陋
    lesismal
        19
    lesismal  
       239 天前
    上接: https://www.v2ex.com/t/1024087#reply11


    package main

    import (
    "bytes"
    "compress/zlib"
    "fmt"
    "os"
    )

    func main() {
    bin1, _ := os.Open("1.bin")
    defer bin1.Close()
    bin2, _ := os.Open("2.bin")
    defer bin2.Close()

    input := new(bytes.Buffer)
    input.ReadFrom(bin1)
    zr, err := zlib.NewReader(input)
    if err != nil {
    fmt.Printf("zlib error: %v\n", err)
    return
    }

    defer zr.Close()

    buf := make([]byte, 1024)
    n1, err := zr.Read(buf)
    fmt.Println("read 1 over:", n1, err)
    fmt.Println("buf 1:", string(buf[:n1]))
    input.ReadFrom(bin2)
    n2, err := zr.Read(buf[n1:])
    fmt.Println("read 2 over:", n2, err)
    fmt.Println("buf 2:", string(buf[n1:n1+n2]))
    }


    output:

    read 1 over: 124 <nil>
    buf 1: {"t":null,"s":null,"op":10,"d":{"heartbeat_interval":41250,"_trace":["[\"gateway-prd-us-east1-c-0bwh\",{\"micros\":0.0}]"]}}
    read 2 over: 36 <nil>
    buf 2: {"t":null,"s":null,"op":11,"d":null}
    lesismal
        20
    lesismal  
       239 天前
    BTW ,OP 自己的 python 代码里用的就是同一个 deobj = zlib.decompressobj(),go 里用了不同的 zlib reader 读取两个片段、第二个片段没有 header 、当然就出错了
    rekulas
        21
    rekulas  
    OP
       238 天前
    @lesismal 感谢大佬帮忙, 白天有点事 晚上回来测测
    body007
        22
    body007  
       238 天前
    测试没问题,就像你用 python 一样,只需要创建一个 zlib.NewReader ,使用 io.Pipe 就可以了。

    ```go

    package main

    import (
    "compress/zlib"
    "errors"
    "io"
    "os"
    )

    func main() {
    err := test()
    if err != nil {
    panic(err)
    }
    }

    func test() error {
    var (
    ir, iw = io.Pipe()
    done = errors.New("done")
    )

    go func() {
    list := []string{"1.bin", "2.bin"}
    for _, f := range list {
    fr, err := os.Open(f)
    if err != nil {
    iw.CloseWithError(err)
    return
    }

    _, err = io.Copy(iw, fr)
    if err1 := fr.Close(); err == nil {
    err = err1
    }

    if err != nil {
    iw.CloseWithError(err)
    return
    }
    }
    iw.CloseWithError(done)
    }()

    fw, err := os.Create("dst.txt")
    if err != nil {
    return err
    }
    defer fw.Close()

    zr, err := zlib.NewReader(ir)
    if err != nil {
    return err
    }

    _, err = io.Copy(fw, zr)
    if err1 := zr.Close(); err == nil {
    err = err1
    }

    if errors.Is(done, err) {
    err = nil
    }
    return err
    }

    ```
    rekulas
        23
    rekulas  
    OP
       238 天前
    @lesismal 非常感谢 lesismal 大佬提供的方案, 测试确实可行, 我相信这也应该是最佳方案了, 也谢谢楼上其他的小伙伴们 🌹🌹🌹


    @body007 我也试过 pipe, 这样确实可以解密但并不符合我想实现的目标, 因为在执行 zr, err := zlib.NewReader(ir)之前, 已经把 2 个 bin 文件数据合并到一起了, 而真实情况下 1 和 2 是分别解压的, 当然也可能我对 pipe 的理解不到位, 不清楚是否还有其他通过 pipe 方式实现流式解压的
    lesismal
        24
    lesismal  
       238 天前
    Welcome!
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2010 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 00:25 · PVG 08:25 · LAX 16:25 · JFK 19:25
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.