V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Virace
V2EX  ›  Python

线程池任务结束程序不会继续执行

  •  
  •   Virace · 2021-03-17 22:35:05 +08:00 · 1801 次点击
    这是一个创建于 1339 天前的主题,其中的信息可能已经有所发展或是发生改变。
    res = {}
    with ThreadPoolExecutor() as e:
        fs = {}
        for root, dirs, files in os.walk('.'):
            for file in files:
                fs[e.submit(upload_by_file, os.path.join(root, file))] = os.path.splitext(file)[0]
    
        for f in as_completed(fs):
            try:
                data = f.result()
            except Exception as exc:
                print(f'generated an exception: {exc}, {fs[f]}')
            else:
                res[fs[f]] = data
                print(fs[f], data)
    
    with open('test.json', 'w+', encoding='utf-8') as f:
        json.dump(res, f)
    

    upload_by_file 就是一个上传文件的函数

    测试一共 249 个文件, 通过输出可以知道 249 个文件都已经上传完毕, 但是程序并没有执行到保存 json 文件. 而是假死了.

    与文件 IO 相关, 代码这么写有时候就会卡住, 而有时候不会.

    环境是 Windows 10, Python 3.9

    9 条回复    2021-03-18 19:40:42 +08:00
    hsfzxjy
        1
    hsfzxjy  
       2021-03-17 22:37:11 +08:00 via Android
    看看 upload_by_file 的代码?
    Virace
        2
    Virace  
    OP
       2021-03-17 22:39:56 +08:00
    @hsfzxjy 就是 open file 然后 post. 难道是文件没有 close 的问题?
    Virace
        3
    Virace  
    OP
       2021-03-17 22:46:24 +08:00
    @hsfzxjy def upload_by_file(self, file):
    file_data = open(file, 'rb')
    data = {
    'file': (file_data.name, file_data, self.get_mimetype(file))
    }

    m = MultipartEncoder(fields=data)
    # noinspection PyTypeChecker
    res = self.requests.post(self.API_URL, data=m, headers={'Content-Type': m.content_type, "Connection": "close"})
    if res.status_code == 200:
    return res.text
    ClericPy
        4
    ClericPy  
       2021-03-17 23:38:21 +08:00
    卡住一般就是 data = f.result() 这里没指定超时然后无限等待了?

    不过多线程有个略坑的地方, 就是线程里面跑的函数不太容易在外部终止它(就像 asyncio.Task 的 cancel), 所以就算 TimeoutError 了, 线程里头的任务该跑还是跑, 就是不继续阻塞而已.

    你这种任务的话, 最好搞点日志或者标记统计一下谁卡住了...

    你这代码 res = self.requests.post(self.API_URL, data=m, headers={'Content-Type': m.content_type, "Connection": "close"})
    没指定超时时间的话, 貌似默认超时是 None, 我忘记有没有最大超时了, 总之很久或者永久. 另: timeout 支持同时指定连接超时和读取超时, 前者超时趁早重试, 后者超时检查网络.
    Virace
        5
    Virace  
    OP
       2021-03-18 00:41:35 +08:00
    @ClericPy 没有, 一共 249 个文件, 确认在 else 位置输出了 print(fs[f], data) . 就很奇怪, 而且不是 100%触发, 卡住之后虫重新跑就没事. 不一定什么时候卡住
    no1xsyzy
        6
    no1xsyzy  
       2021-03-18 10:56:16 +08:00
    先在 data = f.result() 前打桩吧,用 print(..., end="\r") 可以避免影响输出观感
    Virace
        7
    Virace  
    OP
       2021-03-18 11:06:29 +08:00 via Android
    @no1xsyzy 这个程序运行卡住 给我的感觉就是, 没有出 with. with 以外的代码都不执行, 就算是 debug 内部代码都执行完了
    no1xsyzy
        8
    no1xsyzy  
       2021-03-18 12:41:41 +08:00
    确实 Executor.__exit__ 也会调用 self.shutdown(wait=True),因此调用了 join
    先打桩锁定一下死在哪儿

    猜想的解决方案:在 with 的最后添加一句 e.shutdown(wait=False, cancel_futures=True)
    可能是 worker 里 work_queue.get(block=True),要用 cancel_futures 去传递一个 None 作循环唤醒器。
    Virace
        9
    Virace  
    OP
       2021-03-18 19:40:42 +08:00
    @no1xsyzy 好尝试一下~ 骚的是问题不能复现~  只能看后续能不能遇到了  = =
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2873 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 07:46 · PVG 15:46 · LAX 23:46 · JFK 02:46
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.