情况: 在 main()
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# 建立全局变量字典
GOLVAR = Manager().dict()
# SQL 处理队列
SQLQueue = Manager().Queue()
# 处理 SQL 队列功能,一个单独进程在运行
ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
# 启动
ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR)
def AA(someData,sqlqueue):
#略
XXX
sqlCommand = XXX
sqlqueue.put(sqlCommand)
retrun
def BB(someData,sqlqueue):
#略,和 AA 结构一样,最后往队列里 put(sqlCommand)
sqlqueue.put(sqlCommand)
retrun
def CC(someData,sqlqueue):
#略
sqlqueue.put(sqlCommand)
retrun
# 开动制造
while True:
# AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句
# 进去的 SQL 语句只有四种,
# INSERT INTO tblname (x) VALUE (x);
# INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层);
# UPDATE SET...
# DELETE FROM...
# SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条
time.sleep(100)
# 处理 SQL 队列
def procSQLcmd(sqlinfo, sqlqueue, golvar):
import time
import datetime
from dbutils.pooled_db import PooledDB
import pymysql
from concurrent.futures import ThreadPoolExecutor
from MYFunc import SQLcmdData
from myFunc import colrRedB
from myFunc import TranDicttoSQLcmd
from warnings import filterwarnings
filterwarnings("error", category=pymysql.Warning)
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=600, # 连接池允许的最大连接数,0 和 None 表示不限制连接数
mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=1, # ping MySQL 服务端,检查是否服务可用。
host=sqlinfo['ip'],
port=sqlinfo['port'],
user=sqlinfo['user'],
password=sqlinfo['password'],
database=sqlinfo['database'],
charset=sqlinfo['charset']
)
DBconn = POOL.connection()
def exeCu(conn, sqltext):
try:
cur = conn.cursor()
cur.execute(sqltext)
# cur.commit()
cur.close()
except pymysql.Warning as e:
# print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}"))
sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"')
resonsql = str(e).replace("'","\\'").replace('"','\\"')
SQLErrorDict = {'sqlcmd': sqlsqlcmd,
'reson': resonsql,
'UpdateTime': datetime.datetime.now().replace(microsecond=0)}
SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None)
SQLcmdData(sqlinfo, SQLCmd)
return
while True:
if sqlqueue.qsize() == 0:
# 开关
if golvar['stopsqlflag'] == False:
time.sleep(2)
break
# SQL 语句执行,必须按队列 FIFO 顺序写入
while not sqlqueue.empty():
with ThreadPoolExecutor(1) as executor:
executor.submit(exeCu, DBconn, sqlqueue.get())
DBconn.close()
return
请教问题:
1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环
2 、从 MySQL 的服务器的性能判断来看,
SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%');
SHOW PROCESSLIST;
MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?)
3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善
4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆?
5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。
1
BBCCBB 2021-08-28 20:07:18 +08:00
看你这个貌似没用到批量写入? 可以尝试批量
|
2
heyjei 2021-08-28 20:37:14 +08:00
代码没细看,但思路其实很简单,攒一波数据,到 1 千条或者 1 千条没到但 1 秒钟到了,再批量输入。如果批量写入的方案还是不满足,可以把数据写入到文件里,然后再定时调用 load data infile,load data infile 的写入速度可以达到磁盘的最大 IO 速度(前提是使用 MyISAM,并且没有索引)
|
3
heyjei 2021-08-28 20:45:00 +08:00
还有一种改动最小的一种方式:
我们的 SQL 语句是 insert into table_name (column1, column2) values (value1, value2) 在下面的语句中,你不要把整个语句 put 进去,把 (value1, value2) put 进去 sqlqueue.put(sqlCommand) 在下面的语句,get 之后,不要立即执行,攒够 1000 个数据,或者 1 秒超时,然后拼接 SQL 成完整的语句并执行。 # SQL 语句执行,必须按队列 FIFO 顺序写入 while not sqlqueue.empty(): with ThreadPoolExecutor(1) as executor: executor.submit(exeCu, DBconn, sqlqueue.get()) |
4
uti6770werty OP |
5
liprais 2021-08-28 21:19:46 +08:00
你的 mysql 服务器有几个核心?
600 个 connection 太多了 |
6
uti6770werty OP 上面忘了说一个事情,就是就算是峰值 17W 条数据里也好,平时 5,6 千条也好,很多时候都队列的数据,是表里已经有的了,表的索引机制已经避免了重复插入数据,所以存表里的数据量其实不多的。。。
@liprais 8 核,16 线程,CentOS 6 + MySQL 5.5,按月份分表,最多的表数据不过 800 万 |
7
uti6770werty OP @liprais PooledDB 的 connection 很奇怪的,它这里 600 只是最高允许 600 而已,我现在只是一台数据处理电脑向 MySQL 写数据而已,SHOW PROCESSLIST 看,也就 7,8 条连接
|
8
noparking188 2021-08-29 09:01:51 +08:00
所以这段代码真是生产上用的嘛?
看这段代码是用线程做并发,线程开销比较大,这种 IO 密集任务不大行,可以换协成程测下效果,更轻量级,几年前写过类似脚本用的 gevent,ayncio 我没研究过,楼主也可以看看,个人感觉主要是想办法提高并发处理能力吧,和 MySQL 没啥关系 说的不对还望指正 |
9
noparking188 2021-08-29 09:15:39 +08:00
个人感觉比较简单的做法就是换 redis 做队列,生成 SQL 单独一个程序跑,消费队列数据发 SQL 请求的一个程序,要提高并发起简单地多个进程就行了,多进程+多线程(协程)的方式,用 supervisor 托管更好
当然这样的方案是建立在示例代码用线程并发造成网络 IO 请求瓶颈的猜测上 |
10
todd7zhang 2021-08-30 10:16:35 +08:00
没动啊,既然严格要求 FIFO, 为啥去执行 SQL 的时候,还要开 ThreadPoolExecutor ?这种情况,从 sqlqueue 拿出来虽然是顺序的,但是感觉执行过去就可能乱序啊,毕竟 while 里面在不停的开 executor 。
我感觉都不用 POOL, 就一个 conn 不停的执行 sql 就好了? with conn.cursor() as cr: while not sqlqueue.empty(): cr.execute(sqlqueue.get()) |