1.项目介绍
本项目基于物联量平台远程的视频监控项目,通过MQTT协议实现两个设备间的数据上报与订阅。通过这个项目来演示,两个MQTT设备如何互相订阅,进行消息流转。在阿里云服务器上创建2个设备,分为为设备A和设备B;设备A负责采集本地摄像头画面上传,设备B负责接收设备A上传的数据然后解析显示出来。在阿里云服务器上需要配置云产品流转,让设备A的数据上传后自动发送给设备B。这样就完成了视频画面数据的流转。不过因为阿里云的最大数据限制,每次最大发送10240字节的数据。
#define SERVER_IP "asfdda.iot-as-mqtt.cn-shanghai.aliyuncs.com"//服务器IP #define SERVER_PORT 1883 //端口号 #define ClientID "aasfsaXABf.Imasfas|securemode=2,signmethod=hmacsha256,timestamp=1678323607797|" #define Username "ImsfeA&a1sadf8XABf" #define Password "15566ab496e81da728a3792ebe532fd4a3f4026a2b831df5af24da06"//密文 #define SET_TOPIC "/sys/a14dXABf/ImagfA/thing/service/property/set" //订阅 #define POST_TOPIC "/sys/a14sdf8XABf/ImdfeA/thing/event/property/post" //发布 int main() { pthread_t id; signal(SIGPIPE,SIG_IGN);/*忽略SIGPIPE信号*/ signal(SIGALRM,signal_func);/*闹钟信号*/ sockfd=socket(AF_INET,SOCK_STREAM,0); if(sockfd==-1) { printf("网络套接字打开失败n"); return 0; } /*设置发送缓冲区大小*/ int nSendBuf=40*1024;//设置为 20K if(setsockopt(sockfd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))) { printf("setsockopt(SO_SNDBUF) 设置错误!n"); return 0; } /*域名解析*/ struct hostent *hostent; while(1) { hostent=gethostbyname(SERVER_IP); if(hostent==NULL) { printf("域名解析失败n"); sleep(1); } else break; } printf("主机名:%sn",hostent->h_name); printf("协议类型:%sn",(hostent->h_addrtype == AF_INET)?"AF_INET":"AF_INET6"); printf("IP地址长度:%dn",hostent->h_length); char *ip; for(int i=0;hostent->h_addr_list[i];i++) { ip=inet_ntoa(*(struct in_addr *)hostent->h_addr_list[i]); printf("ip=%sn",ip); } /*连接服务器*/ struct sockaddr_in addr; addr.sin_family=AF_INET;//IPV4 addr.sin_port=htons(SERVER_PORT);/*端口号*/ addr.sin_addr.s_addr=inet_addr(ip);//服务器IP if(connect(sockfd, (struct sockaddr *)&addr,sizeof(struct sockaddr_in))==0) { printf("服务器连接成功n"); while(1) { MQTT_Init(); /*登录服务器*/ if(MQTT_Connect(ClientID,Username,Password)==0) { break; } sleep(1); printf("服务器连接中....n"); } printf("连接成功rn"); //订阅物联网平台数据 stat=MQTT_SubscribeTopic(SET_TOPIC,1,1); if(stat) { close(sockfd); printf("订阅失败rn"); exit(0); } printf("订阅成功rn"); /*创建线程*/ pthread_create(&id, NULL,pth_work_func,NULL); pthread_detach(id);//设置分离属性 alarm(3);//闹钟函数,时间到达会产生SIGALRM信号 int a=0; while(1) { sprintf(mqtt_message,"{"method":"thing.event.property.post","params":{"image":"阿里云物联网平台测试"}}"); MQTT_PublishData(POST_TOPIC,mqtt_message,0);//发布数据 } } }
3.云产品流转
云产品流转文档: https: //help. aliyun. com/document_detail/68677. html
3.1 什么是云产品流转
设备基于 Topic 与物联网平台进行通信时, 您可以在数据流转中, 编写 SQL 对 Topic 中的数据进行处理, 并配置转发规则将处理后的数据转发到其他设备 Topic 或阿里云其他服务。
3.2 云产品流转配置
1 .创建解析器
2.关联数据源
3.关联数据目的
4.编写解析器脚本
解析器说明文档: https: //help. aliyun. com/document_detail/270931. html
格式示例:
//通过 payload 函数, 获取设备上报的消息内容, 并按照 JSON 格式转换。 var data = payload("json"); //直接流转物模型上报数据。 writeIotTopic(1000, topic, data);
topic 如下:
编辑好后发布即可, 至此, 阿里物联网平台配置完成。
4.代码实现
4.1 设备 A 发送方
1 .USB 摄像头应用编程
采用 Linux 下 V4L2 框架初始化 USB 摄像头, 采集图像数据。
/* 摄像头初始化 返回值:成功返回摄像头描述符,失败返回负数 */ int Video_Init(struct CAMERA *camera) { int video_fd; int i=0; /*1.打开设备节点*/ video_fd=open(VIDEO_DEV,O_RDWR); if(video_fd==-1)return -1; /*2.设置摄像头格式*/ struct v4l2_format format; memset(&format,0,sizeof(format)); format.type=V4L2_BUF_TYPE_VIDEO_CAPTURE;//视频捕获格式 format.fmt.pix.width=320; format.fmt.pix.height=240; format.fmt.pix.pixelformat=V4L2_PIX_FMT_YUYV;//图像数据格式yuyv if(ioctl(video_fd,VIDIOC_S_FMT,&format))return -2; printf("图像尺寸:%d * %dn",format.fmt.pix.width,format.fmt.pix.height); camera->image_w=format.fmt.pix.width; camera->image_h=format.fmt.pix.height; /*3.向内核请求缓冲区*/ struct v4l2_requestbuffers reqbuf; memset(&reqbuf,0,sizeof(reqbuf)); reqbuf.count=4;/*缓冲区个数*/ reqbuf.type=V4L2_BUF_TYPE_VIDEO_CAPTURE;//视频捕获格式 reqbuf.memory=V4L2_MEMORY_MMAP;/*内存映射*/ if(ioctl(video_fd,VIDIOC_REQBUFS,&reqbuf))return -3; printf("缓冲区个数:%dn",reqbuf.count); /*4.将缓冲区映射到进程空间*/ struct v4l2_buffer quebuff; for(i=0;imamp_buff[i]=mmap(NULL,quebuff.length,PROT_READ|PROT_WRITE,MAP_SHARED,video_fd,quebuff.m.offset); printf("buff[%d]=%pn",i,camera->mamp_buff[i]); camera->mmap_size=quebuff.length; } /*5.将缓冲区添加到采集队列*/ for(i=0;i
2.图片编码处理
实时采集图像数据, 将图片数据编码为 jpg 图像格式, 再进 base64 格式编码。Base64 编码是一种将二进制数据转换为 ASCII 字符的方法, 它使用 64 个字符来表示任意序列的二进制数据。 Base64 编码后的数据长度会比原始二进制数据略长, 但可以方便地被转换为文本格式并在网络上进行传输。
3.base64 格式编码
#include static const char * base64char = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; /* 函数功能:将图片编码为base64格式 形参:bindata 源图片数据 base64 编码后的数据 binlength --源文件大小 返回值:返回编码后的base64数据 */ char * base64_encode( const unsigned char * bindata, char * base64, int binlength ) { int i, j; unsigned char current; for ( i = 0, j = 0 ; i < binlength ; i += 3 ) { current = (bindata[i] >> 2) ; current &= (unsigned char)0x3F; base64[j++] = base64char[(int)current]; current = ( (unsigned char)(bindata[i] << 4 ) ) & ( (unsigned char)0x30 ) ; if ( i + 1 >= binlength ) { base64[j++] = base64char[(int)current]; base64[j++] = '='; base64[j++] = '='; break; } current |= ( (unsigned char)(bindata[i+1] >> 4) ) & ( (unsigned char) 0x0F ); base64[j++] = base64char[(int)current]; current = ( (unsigned char)(bindata[i+1] << 2) ) & ( (unsigned char)0x3C ) ; if ( i + 2 >= binlength ) { base64[j++] = base64char[(int)current]; base64[j++] = '='; break; } current |= ( (unsigned char)(bindata[i+2] >> 6) ) & ( (unsigned char) 0x03 ); base64[j++] = base64char[(int)current]; current = ( (unsigned char)bindata[i+2] ) & ( (unsigned char)0x3F ) ; base64[j++] = base64char[(int)current]; } base64[j] = ''; return base64; }
4. base64 格式解码
/* 函数功能:base64格式数据解码 形参:base64 base64格式数据 bindata 保存解码成功的图像数据 返回值:成功返回解码的图像大小 */ static const char * base64char = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; int base64_decode( const char * base64, unsigned char * bindata ) { int i, j; unsigned char k; unsigned char temp[4]; for ( i = 0, j = 0; base64[i] != '' ; i += 4 ) { memset( temp, 0xFF, sizeof(temp) ); for ( k = 0 ; k < 64 ; k ++ ) { if ( base64char[k] == base64[i] ) temp[0]= k; } for ( k = 0 ; k < 64 ; k ++ ) { if ( base64char[k] == base64[i+1] ) temp[1]= k; } for ( k = 0 ; k < 64 ; k ++ ) { if ( base64char[k] == base64[i+2] ) temp[2]= k; } for ( k = 0 ; k < 64 ; k ++ ) { if ( base64char[k] == base64[i+3] ) temp[3]= k; } bindata[j++] = ((unsigned char)(((unsigned char)(temp[0] << 2))&0xFC)) | ((unsigned char)((unsigned char)(temp[1]>>4)&0x03)); if ( base64[i+2] == '=' ) break; bindata[j++] = ((unsigned char)(((unsigned char)(temp[1] << 4))&0xF0)) | ((unsigned char)((unsigned char)(temp[2]>>2)&0x0F)); if ( base64[i+3] == '=' ) break; bindata[j++] = ((unsigned char)(((unsigned char)(temp[2] << 6))&0xF0)) | ((unsigned char)(temp[3]&0x3F)); } return j; }
5.数据上报
Linux 下 socket 网络编程, 连接阿里云服务器, 接入阿里云物联网平台, 通过 MQTT 协议实时上报数据。
4.2 设备 B 订阅方
1 .数据获取
Linux 下 socket 网络编程, 连接阿里云服务器, 接入阿里云物联网平台, 订阅设备 A 端发送的消息。
2.数据解析
物联网平台下发消息格式为 JSON 格式, 进行消息数据解析, 提取图像数据, 将图像数据进行 base64 格式解码得到 JPG 图片数据。
3.JPG 图片解析
解析 JPG 图片获取到 RGB 颜色数据。
//显示JPEG 编译时加-ljpeg int LCD_ShowJPEG(unsigned char *jpg_buffer,int size,struct ImageDecodingInfo *image_rgb) { struct jpeg_decompress_struct cinfo; //存放图像的数据 struct jpeg_error_mgr jerr; //存放错误信息 unsigned char *buffer; unsigned int i,j; unsigned int color; static int written; /*init jpeg压缩对象错误处理程序*/ cinfo.err = jpeg_std_error(&jerr); //初始化标准错误,用来存放错误信息 jpeg_create_decompress(&cinfo); //创建解压缩结构信息 jpeg_mem_src(&cinfo, (unsigned char *)jpg_buffer, size); //jpeg_stdio_src(&cinfo, infile); /*读jpeg头*/ jpeg_read_header(&cinfo, TRUE); /*开始解压*/ jpeg_start_decompress(&cinfo); #if 0 printf("JPEG图片高度: %dn",cinfo.output_height); printf("JPEG图片宽度: %dn",cinfo.output_width); printf("JPEG图片颜色位数(字节单位): %dn",cinfo.output_components); #endif image_rgb->Height=cinfo.output_height; image_rgb->Width=cinfo.output_width; unsigned char *rgb_data=image_rgb->rgb; /*为一条扫描线上的像素点分配存储空间,一行一行的解码*/ int row_stride = cinfo.output_width * cinfo.output_components; buffer = (unsigned char *)malloc(row_stride); //将图片内容显示到framebuffer上,cinfo.output_scanline表示当前行的位置,读取数据是会自动增加 i=0; while(cinfo.output_scanline < cinfo.output_height) { //读取一行的数据 jpeg_read_scanlines(&cinfo,&buffer,1); memcpy(rgb_data + i * cinfo.output_width * 3, buffer, row_stride); i++; } /*完成解压,摧毁解压对象*/ jpeg_finish_decompress(&cinfo); //结束解压 jpeg_destroy_decompress(&cinfo); //释放结构体占用的空间 /*释放内存缓冲区*/ free(buffer); return 0; }
4.GTK 窗口渲染
创建 GTK 窗口, 将原始图片进行缩放, 实时渲染图像数据。
/*****************************BMP图片放大缩小************************ **image_rgb --图像结构体信息 **int lcd_width,int lcd_hight --屏幕大小 **返回值:0 -- 成功; 其它值 -- 失败 *********************************************************************/ int ZoomInandOut(struct ImageDecodingInfo *image_rgb,int lcd_width,int lcd_hight) { //printf("源图片宽:%dn",image_rgb->Width); //printf("源图片高:%dn",image_rgb->Height); u32 w=image_rgb->Width; u32 h=image_rgb->Height; u8 *src_rgb=image_rgb->rgb;//源图片RGB值 unsigned long oneline_byte=w*3;//一行字节数 float zoom_count=0; /*按比例缩放*/ zoom_count=(lcd_width/(w*1.0)) > (lcd_hight/(h*1.0)) ? (lcd_hight/(h*1.0)):(lcd_width/(w*1.0)); int new_w,new_h; new_w=zoom_count*w;//新图片宽 new_h=zoom_count*h;//新图片高 //printf("新图片宽:%dn",new_w); //printf("新图片高:%dn", new_h); //printf("缩放比例:%.0f%%n",(new_w*1.0/w)*100); unsigned long new_oneline_byte=new_w*3; unsigned char *newbmp_buff=(unsigned char *)malloc(new_h*new_oneline_byte);//动态分配新图片RGB颜色数据缓冲区 if(newbmp_buff==NULL) { printf("[%s line %d]动态分配空间失败n",__FUNCTION__,__LINE__); return -1; } memset(newbmp_buff, 0, new_h*new_oneline_byte); /************************图像处理算法(双线性插值)*******************************/ int i,j; for(i=0;irgb,newbmp_buff,new_h*new_oneline_byte);//新图像RGB数据 image_rgb->Width=new_w;//新图像宽 image_rgb->Height=new_h;//新图像高 free(newbmp_buff); return 0; }
4.3 项目效果
审核编辑黄宇
-
物联网
+关注
关注
2909文章
44578浏览量
372882 -
阿里云
+关注
关注
3文章
952浏览量
43010 -
MQTT
+关注
关注
5文章
650浏览量
22489
发布评论请先 登录
相关推荐
评论