V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
dragonszy
V2EX  ›  问与答

请问 InfluxDB 如何返回筛选后的多个 series 的平均值 series

  •  
  •   dragonszy · 2023-01-11 14:08:43 +08:00 · 973 次点击
    这是一个创建于 680 天前的主题,其中的信息可能已经有所发展或是发生改变。

    之前一直用 pandas 做数据处理,刚接触时序数据库 Influxdb 2.6 ,有个地方不太明白,想请教一下。

    假设 tag 是 lat 和 lon ,组合有(32,119),(33,119),(32,120),(33,120),即32<=lat<=33, 119<=lon<=120

    假设 field 有温度 temperature,风速 wind_speed 等字段

    时间戳范围是 2021-01-01 到 2021-12-31

    我想返回这些经纬度 temperature 和 wind_speed 的平均值时间序列,类似下面这些数据,时间和所需的 field 的均值。

    time,temperature,wind_speed
    2021-01-01,10.2,5.2
    2021-01-02,11.1,4.2
    .....
    2021-12-30,5.6,4.1
    2021-12-31,7.6,3.6
    

    还有一个补充问题就是能否像 pandas agg 一样,对于不同列(field)做不同统计,比如 temperature 求 mean ,wind_speed 求 max 之类的。

    请问应该如何写 InfluxQL 或者 Flux 语句,求一下大神指导。

    5 条回复    2023-01-16 10:15:43 +08:00
    sc104501
        1
    sc104501  
       2023-01-11 14:52:44 +08:00
    用最新的 influxdb 的话建议用 flux 。
    首先取温度,做 mean ,结果保存到 a1 。
    然后取风速,做 max ,保存到 a2 。
    最后调用 flux 的 join.inner(),基于“时序+tag”组合两个结果输出就可以了。

    如果找到了更合适的方法告诉我一声,手动狗头。

    另外,经纬度除非可能性特别少,不然还是作为 field 比较合适,max-values-per-tag 根据官方的建议不建议超过 100000 。
    dragonszy
        2
    dragonszy  
    OP
       2023-01-11 15:53:49 +08:00
    @sc104501 感谢回复,“首先取温度,做 mean ,结果保存到 a1”就是这块不太会弄,之前这么写的,返回值就不是时间序列了,而是每个经纬度的平均值。

    ```
    query = f"""
    from(bucket: "xny_data")
    |> range(start: {start_timestamp}, stop: {end_timestamp})
    |> filter(fn: (r) => r["_measurement"] == "weatherData")
    |> filter(fn: (r) => r["lat"] >= "20")
    |> filter(fn: (r) => r["lon"] >= "119")
    |> filter(fn: (r) => r["_field"] == "cloudcover")
    |> mean()
    """
    tables = query_api.query(query, org="xny")
    for table in tables:
    for record in table.records:
    print(record)
    ```
    返回值
    ```
    [<FluxTable: 9 columns, 1 records>, <FluxTable: 9 columns, 1 records>]
    ```

    ```
    FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': datetime.datetime(2020, 12, 31, 16, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 12, 31, 16, 0, tzinfo=tzutc()), '_field': 'cloudcover', '_measurement': 'weatherData', 'lat': '32', 'lon': '119', '_value': 43.14566210045662} FluxRecord() table: 1, {'result': '_result', 'table': 1, '_start': datetime.datetime(2020, 12, 31, 16, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 12, 31, 16, 0, tzinfo=tzutc()), '_field': 'cloudcover', '_measurement': 'weatherData', 'lat': '34', 'lon': '120', '_value': 39.43207762557078}
    ```
    sc104501
        3
    sc104501  
       2023-01-12 10:03:44 +08:00
    @dragonszy 每天一个数据应该是用 aggregateWindow 。按照一个时间窗口汇总数据。
    https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/

    代码没有测试过,可能不对。
    from(bucket: "xny_data")
    |> range(start: {start_timestamp}, stop: {end_timestamp})
    |> filter(fn: (r) => r["_measurement"] == "weatherData")
    |> filter(fn: (r) => r["lat"] >= "20")
    |> filter(fn: (r) => r["lon"] >= "119")
    |> filter(fn: (r) => r["_field"] == "cloudcover")
    |> aggregateWindow(every: 1d, fn: mean)
    dragonszy
        4
    dragonszy  
    OP
       2023-01-13 15:58:41 +08:00
    @sc104501 非常感谢!但是目前还是有个问题,我想得到一个平均值时间序列(不分经纬度),现在采用 aggregateWindow 还是每个经纬度返回一个序列,想知道一下最终得到的是一个根据经纬度平均的序列该怎么做。

    其实意思就是,我有 10 个时间序列,怎么求平均,得到 1 个时间序列。
    sc104501
        5
    sc104501  
       2023-01-16 10:15:43 +08:00
    @dragonszy 意思是返回的 tables 含有多个表(每个表一个经纬度组合)需要合并成一个坐标范围内的平均值吧。

    如果是数据点数量加权,在 aggregateWindow() 前加上 group() 取消分组。
    - 原始数据每个数据的权重相同。
    如果是“经纬度组合后的时间序列”加权,在 aggregateWindow() 后加上 group() 再用 aggregateWindow() 求一个平均值。
    - 范围内的每个坐标点(采集器?)不管产生的数据多少,权重相同。

    关于 group(),group 完全不带参数就是 ungroup 的功能。
    或者直接用 group 按一个其他指定的 tag 分组,应该也可以避免现在的经纬度分组。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3728 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 00:55 · PVG 08:55 · LAX 16:55 · JFK 19:55
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.