两个线程,线程 1(processData)生成数据 p 并写入数据库,另一个线程 2(someSocket)将线程 1 生成的数据 p 通过 socket 发送到客户端。
当线程 1 中的for
循环结束时,如何通知线程 2 while(res == TRUE)
应当结束了,线程 2 因为 pthread_cond_wait
一直在阻塞,但是此时线程 1 不会再发出信号了。这是不是就是死锁了。。。
代码如下
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <mysql.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#define TRUE 1
#define FALSE 0
#define MAX_STRING 128
#define PORT 3389
#define SA struct sockaddr
pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;
// 存储温湿度结构体
typedef struct {
int temp;
int humd;
}humiture;
// 可变长消息体
typedef struct {
int nLen;
char data[ 0];
}MyMessage;
//全局变量
humiture p;
int res = TRUE;
void error(char *msg)
{
fprintf(stderr, "%s: %s\n", msg, strerror(errno));
exit(1);
}
void info(char *msg)
{
fprintf(stdout,"%s\n",msg);
}
void finish_with_error(MYSQL *con)
{
fprintf(stderr, "%s\n", mysql_error(con));
mysql_close(con);
exit(1);
}
// socket 发送数据
int sendall(int s, char *buf, int *len)
{
int total = 0;
int bytesleft = *len;
int n;
while(total < *len) {
n = send(s, buf+total, bytesleft, 0);
if (n == -1) { break; }
total += n;
bytesleft -= n;
}
*len = total;
return n==-1?-1:0;
}
// 生成数据
humiture collectData()
{
int temperature,humidity;
srand((unsigned)time(NULL)); // 根据时间来播种随机数种子
// 生成数据
temperature = rand()%40+10; // 生成 10~50 的随机数 当做温度
humidity = rand()%70+10; // 生成 10~80 的随机数当做湿度
humiture p = {humidity, temperature};
return p;
}
// 启动 MySQL 建立连接
MYSQL* startMysql()
{
MYSQL *con = mysql_init(NULL);
if (con == NULL)
{
fprintf(stderr, "%s\n", mysql_error(con));
exit(1);
}
if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
{
finish_with_error(con);
}
return con;
}
// 生成数据并存入数据库
void * processData()
{
MYSQL * con = startMysql();
for(int i = 0; i <20;i++)
{
pthread_mutex_lock(&plock);
p = collectData();
pthread_cond_signal(&pready);
pthread_mutex_unlock(&plock);
char query[MAX_STRING] = {0};
snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p.temp, p.humd);
if (mysql_query(con, query))
{
finish_with_error(con);
}
sleep(2);
}
// 循环结束给出信号
res = FALSE;
mysql_close(con);
mysql_library_end();
return NULL;
}
void * someSocket()
{
int sockfd, connfd;
struct sockaddr_in servaddr, cli;
socklen_t len;
char buff[10];
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
error("socket creation failed...");
}
else
info("Socket successfully created...");
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(PORT);
int reuse = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(int)) == -1)
error("Can't set the reuse option on the socket...");
if ((bind(sockfd, (SA*)&servaddr, sizeof(servaddr))) != 0) {
error("socket bind failed...");
}
else
fprintf(stdout,"%s\n","Socket successfully binded...");
if ((listen(sockfd, 5)) != 0) {
error("Listen failed...");
}
else
info("Server listening...");
while(TRUE)
{
len = sizeof(cli);
connfd = accept(sockfd, (SA*)&cli, &len);
if (connfd < 0) {
error("server acccept failed...");
}
else
info("server acccept the client...");
MyMessage * myMessage = (MyMessage*)malloc(sizeof(MyMessage)+sizeof(humiture));
int needSend = sizeof(MyMessage)+sizeof(humiture);
char *buffer =(char*)malloc(needSend);
while(res == TRUE)
{
myMessage->nLen = htonl(sizeof(humiture));
pthread_mutex_lock(&plock);
pthread_cond_wait(&pready,&plock);
memcpy(myMessage->data,&p,sizeof(humiture));
pthread_mutex_unlock(&plock);
memcpy(buffer,myMessage,needSend);
sendall(connfd,buffer,&needSend);
recv(connfd,buff,sizeof(buff),0);
}
// 当需要停止的时候发送 0 字节信息让客户端停止循环
if(res == FALSE)
{
// 将发送消息定义为 0
myMessage->nLen = htonl(res);
char *buffer =(char*)malloc(sizeof(int));
memcpy(buffer,myMessage,sizeof(MyMessage));
send(connfd,buffer,sizeof(MyMessage),0);
shutdown(connfd,SHUT_RDWR);
free(buffer);
}
free(myMessage);
free(buffer);
close(connfd);
break;
}
close(sockfd);
return NULL;
}
int main(void)
{
pthread_t t0,t1;
if(pthread_create(&t0, NULL,processData,NULL)==-1)
{
error("Can't create thread processData");
}
if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
{
error("Can't create thread someSocket");
}
void *reslut;
if(pthread_join(t0,&reslut)==-1)
{
error("Can't reclaim thread t0");
}
if(pthread_join(t1,&reslut)==-1)
{
error("Can't reclaim thread t1");
}
return 0;
}
1
wevsty 2021-08-10 09:29:11 +08:00
线程 1 退出的时候再发一个 pthread_cond_signal 这样线程 2 收到信号就不会锁死了。最后线程 2 检查一下退出标志再发数据不就行了么?
|
2
commoccoom OP @wevsty 确实,我人傻了😂
|
3
FranzKafka95 2021-08-10 13:11:31 +08:00 via Android
或者线程 1 执行完 for 循环以后先别置 res 为 false,再加一个条件变量在这儿等,等到线程 2 发送完以后通过信号量通知线程 1 再置为 false,等到线程 2 再执行 while 循环时 res 已经为 flalse,这样就可以同步了。
|
4
commoccoom OP @FranzKafka95 后续我的想法是线程 1(processData)会一直循环,线程 2(someSocket)是接收别的信号,然后启动或者停止这样。所以线程 1 不能阻塞,线程 2 可以阻塞。线程 1 现在加了循环次数是因为之前我发现有内存泄漏,所以加个停止条件看看哪里有问题😂。
|