完善资料让更多小伙伴认识你,还能领取20积分哦, 立即完善>
|
|
相关推荐
1个回答
|
|
本文在前一篇的基础上进行MQTT的移植,并实现对步进电机驱动器的控制。
分两步完成: 1、移植MQTT协议栈,并进行验证; 2、对步进电机进行控制。 一,移植MQTT协议 以前一篇文章完成的代码为基础,在工程目录下的APP文件夹中新建一个文件夹命名为MQTTClient, 把paho.mqtt.embedded-c源码包中的 paho.mqtt.embedded-cMQTTPacketsrc下的: MQTTConnect.h MQTTConnectClient.c MQTTDeserializePublish.c MQTTFormat.h MQTTPacket.c MQTTPacket.h MQTTPublish.h MQTTSerializePublish.c MQTTSubscribe.h MQTTSubscribeClient.c MQTTUnsubscribe.h MQTTUnsubscribeClient.c StackTrace.h 拷贝到MQTTClient文件夹中。 接下来,就是实现两个层面的连接处理: 1、tcp/ip建立连接,或由socket连接完成。 这部分放在 MQTTFreeRTOSImpl.c MQTTFreeRTOSImpl.h 2、MQTT连接。 这部分放在: MQTTClient.c MQTTClient.h MQTTFreeRTOSImpl.h的代码如下: #ifndef MQTTFREERTOSIMPL_H #define MQTTFREERTOSIMPL_H #include “FreeRTOS.h” #include “portmacro.h” typedef struct Timer Timer; struct Timer { portTickType end_time; }; typedef struct Network Network; struct Network { int my_socket; int (*mqttread) (Network*, unsigned char*, int, int); int (*mqttwrite) (Network*, unsigned char*, int, int); }; char expired(Timer*); void countdown_ms(Timer*, unsigned int); void countdown(Timer*, unsigned int); int left_ms(Timer*); void InitTimer(Timer*); int FreeRTOS_MQTT_read(Network*, unsigned char*, int, int); int FreeRTOS_MQTT_write(Network*, unsigned char*, int, int); void FreeRTOS_MQTT_disconnect(Network*); void NewNetwork(Network* n); int ConnectNetwork(Network* n, const char* host, int port); int DisconnectNetwork(Network* n); #endif /* MQTTFREERTOSIMPL_H */ MQTTFreeRTOSImpl.c的代码如下: /* ²Î¿¼https://github.com/baoshi/ESP-RTOS-Paho/tree/63c2c74dfe978f215b3bb05f7e1258454908c4fb */ #include 《string.h》 #include “FreeRTOS.h” #include “portmacro.h” #include “lwip/sockets.h” #include “lwip/inet.h” #include “lwip/netdb.h” #include “lwip/sys.h” #include “MQTTFreeRTOSImpl.h” #define _DEBUG #include “dprintf.h” char expired(Timer* timer) { portTickType now = xTaskGetTickCount(); int32_t left = timer-》end_time - now; return (left 《 0); } void countdown_ms(Timer* timer, unsigned int timeout) { portTickType now = xTaskGetTickCount(); timer-》end_time = now + timeout / portTICK_RATE_MS; } void countdown(Timer* timer, unsigned int timeout) { countdown_ms(timer, timeout); } int left_ms(Timer* timer) { portTickType now = xTaskGetTickCount(); int32_t left = timer-》end_time - now; return (left 《 0) ? 0 : left / portTICK_RATE_MS; } void InitTimer(Timer* timer) { timer-》end_time = 0; } #include “task.h” #include “timers.h” __asm void _nop(void) { nop } void delay(int i) { for(;i》0;i--) _nop(); } int FreeRTOS_MQTT_read(Network* n, unsigned char* buffer, int len, int timeout_ms) { TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */ TimeOut_t xTimeOut; int recvLen = 0; vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */ do { int rc = 0; lwip_setsockopt(n-》my_socket, SOL_SOCKET, SO_RCVTIMEO, &xTicksToWait, sizeof(TickType_t)); rc = recv(n-》my_socket, buffer + recvLen, len - recvLen, 0); //dprintf(“rc=%dn”,rc); delay(50);//must delay enough,otherwise,it will be blocked if (rc 》= 0) recvLen += rc; else if (rc 《0) { recvLen = rc; break; } } while (recvLen 《 len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE); return recvLen; } int FreeRTOS_MQTT_write(Network* n, unsigned char* buffer, int len, int timeout_ms) { TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */ TimeOut_t xTimeOut; int sentLen = 0; vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */ do { int rc = 0; lwip_setsockopt(n-》my_socket, SOL_SOCKET, SO_SNDTIMEO, &xTicksToWait, sizeof(xTicksToWait)); rc = send(n-》my_socket, buffer + sentLen, len - sentLen, 0); if (rc 》 0) sentLen += rc; else if (rc 《 0) { sentLen = rc; break; } } while (sentLen 《 len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE); return sentLen; } void NewNetwork(Network* n) { n-》my_socket = -1; n-》mqttread = FreeRTOS_MQTT_read; n-》mqttwrite = FreeRTOS_MQTT_write; } int host2addr(const char *hostname , struct in_addr *in) { struct addrinfo hints, *servinfo, *p; struct sockaddr_in *h; int rv; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; rv = getaddrinfo(hostname, 0 , &hints , &servinfo); if (rv != 0) { return rv; } // loop through all the results and get the first resolve for (p = servinfo; p != 0; p = p-》ai_next) { h = (struct sockaddr_in *)p-》ai_addr; in-》s_addr = h-》sin_addr.s_addr; } freeaddrinfo(servinfo); // all done with this structure return 0; } int ConnectNetwork(Network* n, const char* host, int port) { struct sockaddr_in addr; int ret; if (host2addr(host, &(addr.sin_addr)) != 0) { return -1; } addr.sin_family = AF_INET; addr.sin_port = htons(port); n-》my_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); if( n-》my_socket 《 0 ) { // error return -1; } ret = connect(n-》my_socket, ( struct sockaddr *)&addr, sizeof(struct sockaddr_in)); if( ret 《 0 ) { // error lwip_close(n-》my_socket); return ret; } return ret; } int DisconnectNetwork(Network* n) { lwip_close(n-》my_socket); n-》my_socket = -1; return 0; } MQTTClient.h代码如下: /******************************************************************************* * Copyright (c) 2014 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ #ifndef __MQTT_CLIENT_C_ #define __MQTT_CLIENT_C_ #include “MQTTPacket.h” #include “MQTTFreeRTOSImpl.h” #define MAX_PACKET_ID 65535 #define MAX_MESSAGE_HANDLERS 5 #define MAX_FAIL_ALLOWED 2 enum QoS { QOS0, QOS1, QOS2 }; // all failure return codes must be negative enum returnCode {DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; void NewTimer(Timer*); typedef struct _MQTTMessage { enum QoS qos; char retained; char dup; unsigned short id; void *payload; size_t payloadlen; } MQTTMessage; typedef struct _MessageData { MQTTString* topic; MQTTMessage* message; } MessageData; typedef void (*messageHandler)(MessageData*); struct _MQTTClient { unsigned int next_packetid; unsigned int command_timeout_ms; size_t buf_size, readbuf_size; unsigned char *buf; unsigned char *readbuf; unsigned int keepAliveInterval; char ping_outstanding; int fail_count; int isconnected; struct MessageHandlers { const char* topicFilter; void (*fp) (MessageData*); } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic void (*defaultMessageHandler) (MessageData*); Network* ipstack; Timer ping_timer; }; typedef struct _MQTTClient MQTTClient; int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options); int MQTTPublish(MQTTClient* c, const char* topic, MQTTMessage* message); int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandler handler); int MQTTUnsubscribe(MQTTClient* c, const char* topic); int MQTTDisconnect(MQTTClient* c); int MQTTYield(MQTTClient* c, int timeout_ms); void NewMQTTClient(MQTTClient*, Network*, unsigned int, unsigned char*, size_t, unsigned char*, size_t); #define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0} #endif MQTTClient.c的代码如下: /******************************************************************************* * Copyright (c) 2014 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ #include “MQTTClient.h” //#define _DEBUG #include “dprintf.h” void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage) { md-》topic = aTopicName; md-》message = aMessgage; } int getNextPacketId(MQTTClient *c) { return c-》next_packetid = (c-》next_packetid == MAX_PACKET_ID) ? 1 : c-》next_packetid + 1; } int sendPacket(MQTTClient* c, int length, Timer* timer) { int rc = FAILURE, sent = 0; while (sent 《 length && !expired(timer)) { rc = c-》ipstack-》mqttwrite(c-》ipstack, &c-》buf[sent], length, left_ms(timer)); if (rc 《 0) // there was an error writing the data break; sent += rc; } if (sent == length) { countdown(&(c-》ping_timer), c-》keepAliveInterval); // record the fact that we have successfully sent the packet rc = SUCCESS; } else rc = FAILURE; return rc; } int decodePacket(MQTTClient* c, int* value, int timeout) { unsigned char i; int multiplier = 1; int len = 0; const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; *value = 0; do { int rc = MQTTPACKET_READ_ERROR; if (++len 》 MAX_NO_OF_REMAINING_LENGTH_BYTES) { rc = MQTTPACKET_READ_ERROR; /* bad data */ goto exit; } rc = c-》ipstack-》mqttread(c-》ipstack, &i, 1, timeout); if (rc != 1) goto exit; *value += (i & 127) * multiplier; multiplier *= 128; } while ((i & 128) != 0); exit: return len; } int readPacket(MQTTClient* c, Timer* timer) { int rc = FAILURE; MQTTHeader header = {0}; int len = 0; int rem_len = 0; int data_real_len=0; /* 1. read the header byte. This has the packet type in it */ if (c-》ipstack-》mqttread(c-》ipstack, c-》readbuf, 1, left_ms(timer)) != 1) goto exit; len = 1; /* 2. read the remaining length. This is variable in itself */ decodePacket(c, &rem_len, left_ms(timer)); len += MQTTPacket_encode(c-》readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ /* 3. read the rest of the buffer using a callback to supply the rest of the data */ if (rem_len 》 0 && ((data_real_len=c-》ipstack-》mqttread(c-》ipstack, c-》readbuf + len, rem_len, left_ms(timer))) != rem_len)) { goto exit; } #if 0 /* Commented @ 2017-Apr-25 1:34:59 */ else { int i; dprintf(“data len=%d ,,,, data_real_len=%dn”,rem_len,data_real_len); dprintf(“c-》readbuf=%pn”,c-》readbuf); for(i=0;i《data_real_len;i++) { printf(“%02X ,”,c-》readbuf[i]); if(i%10==9) printf(“n”); } } #endif /* Commented */ header.byte = c-》readbuf[0]; rc = header.bits.type; exit: dprintf(“readPacket=%dn”, rc); return rc; } // assume topic filter and name is in correct format // # can only be at end // + and # can only be next to separator char isTopicMatched(char* topicFilter, MQTTString* topicName) { char* curf = topicFilter; char* curn = topicName-》lenstring.data; char* curn_end = curn + topicName-》lenstring.len; while (*curf && curn 《 curn_end) { if (*curn == ‘/’ && *curf != ‘/’) break; if (*curf != ‘+’ && *curf != ‘#’ && *curf != *curn) break; if (*curf == ‘+’) { // skip until we meet the next separator, or end of string char* nextpos = curn + 1; while (nextpos 《 curn_end && *nextpos != ‘/’) nextpos = ++curn + 1; } else if (*curf == ‘#’) curn = curn_end - 1; // skip until end of string curf++; curn++; }; return (curn == curn_end) && (*curf == ‘ ’); } int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) { int i; int rc = FAILURE; // we have to find the right message handler - indexed by topic for (i = 0; i 《 MAX_MESSAGE_HANDLERS; ++i) { if (c-》messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c-》messageHandlers[i].topicFilter) || isTopicMatched((char*)c-》messageHandlers[i].topicFilter, topicName))) { if (c-》messageHandlers[i].fp != NULL) { MessageData md; NewMessageData(&md, topicName, message); c-》messageHandlers[i].fp(&md); rc = SUCCESS; } } } if (rc == FAILURE && c-》defaultMessageHandler != NULL) { MessageData md; NewMessageData(&md, topicName, message); c-》defaultMessageHandler(&md); rc = SUCCESS; } return rc; } int keepalive(MQTTClient* c) { int rc = SUCCESS; if (c-》keepAliveInterval == 0) { rc = SUCCESS; goto exit; } if (expired(&(c-》ping_timer))) { if (c-》ping_outstanding) { // if ping failure accumulated above MAX_FAIL_ALLOWED, the connection is broken ++(c-》fail_count); if (c-》fail_count 》= MAX_FAIL_ALLOWED) { rc = DISCONNECTED; goto exit; } } else { Timer timer; int len; InitTimer(&timer); countdown_ms(&timer, 1000); c-》ping_outstanding = 1; len = MQTTSerialize_pingreq(c-》buf, c-》buf_size); if (len 》 0){dprintf(“send pingn”); sendPacket(c, len, &timer);} } // re-arm ping counter countdown(&(c-》ping_timer), c-》keepAliveInterval); } exit: return rc; } int cycle(MQTTClient* c, Timer* timer) { // read the socket, see what work is due int packet_type = readPacket(c, timer); int len = 0, rc = SUCCESS; dprintf(“packet_type=%dn”,packet_type); switch (packet_type) { case CONNACK: case PUBACK: case SUBACK: break; case PUBLISH: { MQTTString topicName; MQTTMessage msg; if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c-》readbuf, c-》readbuf_size) != 1) goto exit; deliverMessage(c, &topicName, &msg); if (msg.qos != QOS0) { if (msg.qos == QOS1) len = MQTTSerialize_ack(c-》buf, c-》buf_size, PUBACK, 0, msg.id); else if (msg.qos == QOS2) len = MQTTSerialize_ack(c-》buf, c-》buf_size, PUBREC, 0, msg.id); if (len 《= 0) rc = FAILURE; else rc = sendPacket(c, len, timer); if (rc == FAILURE) goto exit; // there was a problem } break; } case PUBREC: { unsigned short mypacketid; unsigned char dup, type; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c-》readbuf, c-》readbuf_size) != 1) rc = FAILURE; else if ((len = MQTTSerialize_ack(c-》buf, c-》buf_size, PUBREL, 0, mypacketid)) 《= 0) rc = FAILURE; else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet rc = FAILURE; // there was a problem if (rc == FAILURE) goto exit; // there was a problem break; } case PUBCOMP: break; case PINGRESP: { c-》ping_outstanding = 0; c-》fail_count = 0; } break; } if (c-》isconnected) rc = keepalive(c); exit: if (rc == SUCCESS) rc = packet_type; return rc; } void NewMQTTClient(MQTTClient* c, Network* network, unsigned int command_timeout_ms, unsigned char* buf, size_t buf_size, unsigned char* readbuf, size_t readbuf_size) { int i; c-》ipstack = network; for (i = 0; i 《 MAX_MESSAGE_HANDLERS; ++i) c-》messageHandlers[i].topicFilter = 0; c-》command_timeout_ms = command_timeout_ms; c-》buf = buf; c-》buf_size = buf_size; c-》readbuf = readbuf; c-》readbuf_size = readbuf_size; c-》isconnected = 0; c-》ping_outstanding = 0; c-》fail_count = 0; c-》defaultMessageHandler = NULL; InitTimer(&(c-》ping_timer)); } int MQTTYield(MQTTClient* c, int timeout_ms) { int rc = SUCCESS; Timer timer; InitTimer(&timer); countdown_ms(&timer, timeout_ms); while (!expired(&timer)) { rc = cycle(c, &timer); // cycle could return 0 or packet_type or 65535 if nothing is read // cycle returns DISCONNECTED only if keepalive() fails. if (rc == DISCONNECTED) break; rc = SUCCESS; } return rc; } // only used in single-threaded mode where one command at a time is in process int waitfor(MQTTClient* c, int packet_type, Timer* timer) { int rc = FAILURE; do { if (expired(timer)) break; // we timed out } while ((rc = cycle(c, timer)) != packet_type); return rc; } int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) { Timer connect_timer; int rc = FAILURE; MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; int len = 0; InitTimer(&connect_timer); countdown_ms(&connect_timer, c-》command_timeout_ms); if (c-》isconnected) // don‘t send connect packet again if we are already connected goto exit; if (options == 0) options = &default_options; // set default options if none were supplied c-》keepAliveInterval = options-》keepAliveInterval; countdown(&(c-》ping_timer), c-》keepAliveInterval); if ((len = MQTTSerialize_connect(c-》buf, c-》buf_size, options)) 《= 0) goto exit; if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet goto exit; // there was a problem // this will be a blocking call, wait for the connack if (waitfor(c, CONNACK, &connect_timer) == CONNACK) { unsigned char connack_rc = 255; char sessionPresent = 0; if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, c-》readbuf, c-》readbuf_size) == 1) rc = connack_rc; else rc = FAILURE; } else rc = FAILURE; exit: if (rc == SUCCESS) c-》isconnected = 1; return rc; } int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandler handler) { int rc = FAILURE; Timer timer; int len = 0; MQTTString topicStr = MQTTString_initializer; topicStr.cstring = (char *)topic; InitTimer(&timer); countdown_ms(&timer, c-》command_timeout_ms); if (!c-》isconnected) goto exit; len = MQTTSerialize_subscribe(c-》buf, c-》buf_size, 0, getNextPacketId(c), 1, &topicStr, (int*)&qos); if (len 《= 0) goto exit; if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet { goto exit; // there was a problem } if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c-》readbuf, c-》readbuf_size) == 1) rc = grantedQoS; // 0, 1, 2 or 0x80 if (rc != 0x80) { int i; for (i = 0; i 《 MAX_MESSAGE_HANDLERS; ++i) { if (c-》messageHandlers[i].topicFilter == 0) { c-》messageHandlers[i].topicFilter = topic; c-》messageHandlers[i].fp = handler; rc = 0; break; } } } } else rc = FAILURE; exit: return rc; } int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) { int rc = FAILURE; int len = 0; Timer timer; MQTTString topic = MQTTString_initializer; topic.cstring = (char *)topicFilter; InitTimer(&timer); countdown_ms(&timer, c-》command_timeout_ms); if (!c-》isconnected) goto exit; if ((len = MQTTSerialize_unsubscribe(c-》buf, c-》buf_size, 0, getNextPacketId(c), 1, &topic)) 《= 0) goto exit; if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet goto exit; // there was a problem if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) { unsigned short mypacketid; // should be the same as the packetid above if (MQTTDeserialize_unsuback(&mypacketid, c-》readbuf, c-》readbuf_size) == 1) rc = 0; } else rc = FAILURE; exit: return rc; } int MQTTPublish(MQTTClient* c, const char* topic, MQTTMessage* message) { int rc = FAILURE; int len = 0; Timer timer; MQTTString topicStr = MQTTString_initializer; topicStr.cstring = (char *)topic; InitTimer(&timer); countdown_ms(&timer, c-》command_timeout_ms); if (!c-》isconnected) goto exit; if (message-》qos == QOS1 || message-》qos == QOS2) message-》id = getNextPacketId(c); len = MQTTSerialize_publish(c-》buf, c-》buf_size, 0, message-》qos, message-》retained, message-》id, topicStr, (unsigned char*)message-》payload, message-》payloadlen); if (len 《= 0) goto exit; if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet { goto exit; // there was a problem } if (message-》qos == QOS1) { if (waitfor(c, PUBACK, &timer) == PUBACK) { unsigned short mypacketid; unsigned char dup, type; // We still can receive from broker, treat as recoverable c-》fail_count = 0; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c-》readbuf, c-》readbuf_size) != 1) rc = FAILURE; else rc = SUCCESS; } else { rc = FAILURE; } } else if (message-》qos == QOS2) { if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) { unsigned short mypacketid; unsigned char dup, type; // We still can receive from broker, treat as recoverable c-》fail_count = 0; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c-》readbuf, c-》readbuf_size) != 1) rc = FAILURE; else rc = SUCCESS; } else { rc = FAILURE; } } exit: return rc; } int MQTTDisconnect(MQTTClient* c) { int rc = FAILURE; Timer timer; // we might wait for incomplete incoming publishes to complete int len = MQTTSerialize_disconnect(c-》buf, c-》buf_size); InitTimer(&timer); countdown_ms(&timer, c-》command_timeout_ms); if (len 》 0) rc = sendPacket(c, len, &timer); // send the disconnect packet c-》isconnected = 0; return rc; } 把MQTTClient.c和MQTTFreeRTOSImpl.c加入到IDE中,并把。.APPMQTTClient和。.APP加入到包含路径中。 编译一下通过。 然后写个测试程序:TestTask.c: #include “FreeRTOS.h” #include “task.h” #include “queue.h” #include “MQTTClient.h” #include 《stdio.h》 //#define _DEBUG #include “dprintf.h” #define USR_SD_CARD_INI // testing mosquitto server #define MQTT_HOST “10.0.0.108” #define MQTT_PORT 1883 #define MQTT_USER “” #define MQTT_PASS “” #define mqtt_client_id “MQTT on STM32” #define SUB_TOPIC_1 “FCS/out” void topic_received(MessageData* md) { int i; MQTTMessage* message = md-》message; printf(“Received Topic[”); for (i = 0; i 《 md-》topic-》lenstring.len; ++i) printf(“%c”,md-》topic-》lenstring.data[i]); printf(“], Message[ ”); for (i = 0; i 《 (int)message-》payloadlen; ++i) printf(“%c”,((char*)message-》payload)[i]); printf(“]n”); } xQueueHandle publish_queue; #define PUB_MSG_LEN 16 #include “common.h” void mqtt_task(void *pvParameters) { int ret; struct Network network; MQTTClient client = DefaultClient; //char mqtt_client_id[20]; unsigned char mqtt_buf[100]; unsigned char mqtt_readbuf[100]; MQTTPacket_connectData data = MQTTPacket_connectData_initializer; NewNetwork(&network); while (1) { #ifndef USR_SD_CARD_INI printf(“(Re)connecting to MQTT server %s 。.. ”, MQTT_HOST); ret = ConnectNetwork(&network, MQTT_HOST, MQTT_PORT); #else if(InitMQTTServerInfo()《0) continue; printf(“(Re)connecting to MQTT server %s:%d 。.. ”, getMQTTServerIP(),getMQTTServerPort()); ret = ConnectNetwork(&network, getMQTTServerIP(), getMQTTServerPort()); #endif if (!ret) { printf(“ok.rn”); NewMQTTClient(&client, &network, 1000, mqtt_buf, 100, mqtt_readbuf, 100); data.willFlag = 0; data.MQTTVersion = 3; data.clientID.cstring = mqtt_client_id; #ifndef USR_SD_CARD_INI data.username.cstring = MQTT_USER; data.password.cstring = MQTT_PASS; #else data.username.cstring = getMQTTServerName(); data.password.cstring = getMQTTServerPassword(); #endif data.keepAliveInterval = 60; data.cleansession = 1; printf(“Send MQTT connect 。..”); ret = MQTTConnect(&client, &data); if (!ret) { printf(“ok.rn”); // Subscriptions MQTTSubscribe(&client, SUB_TOPIC_1, QOS1, topic_received); // Empty the publish queue xQueueReset(publish_queue);dprintf(“n”); while (1) { dprintf(“n”); // Receiving / Ping ret = MQTTYield(&client, 1000); dprintf(“n”); if (ret == DISCONNECTED) { dprintf(“DISCONNECTEDn”); break; } } printf(“Connection broken, request restartrn”); } else { printf(“failed.rn”); } DisconnectNetwork(&network);dprintf(“n”); } else { printf(“failed.rn”); } vTaskDelay(1000 / portTICK_RATE_MS); } printf(“MQTT task ended ,ret=%dn”, ret); vTaskDelete(NULL); } #define MQTTClient_TASK_PRIO ( tskIDLE_PRIORITY+2 ) void testTask(void) { printf(“MQTT: TestTask started.。.n”); publish_queue = xQueueCreate(3, PUB_MSG_LEN); xTaskCreate(mqtt_task, “mqtt”, 1024, NULL, MQTTClient_TASK_PRIO, NULL); } 这部分代码如果没有用sd卡的话,可以把 #define USR_SD_CARD_INI 注释掉,并把: #define MQTT_HOST “10.0.0.108” 修改为待连接的ip地址。 然后,把它存到APP目录下,并加入到IDE上。 然后修改一下BSP/netconf.c,在路由动态分配IP,打印消息以后,加入下面代码: extern void testTask(void); testTask(); 意思是,当动态IP分配完毕之后,立即创建一个任务,来执行我们的MQTT测试。 编译链接,把hex档烧到开发板上,然后上电运行。 对了,板子上电前别忘了把sd卡中的ip配置文件修改成本地电脑的ip: [server] ip=10.0.0.108 port=1883 name= password= 好了,下面验证一下: 在电脑上打开cmd,启动mosquitto 可以看到: 好了,MQTT已经跑起来了。 心跳包也正常。 然后,我们再开一个cmd窗口,用mosquito发一个话题为FCS/out的消息如下: mosquitto_pub -h 127.0.0.1 -m “{idx:0,act:1}” -t “FCS/out” 可以看到: 已经收到消息了。 说明MQTTClient程序工作正常。 接下来我们做一个mosquitto_pub另外的一种使用方法来测试一下。 因为后面我们将会发送带有换行符的消息,这样我们在应用程序中可以借用换行符来对消息进行解析。 而在测试中,不能使用mosquitto_pub命令行直接输入消息文本的方式来发送带有换行符的消息。幸好,mosquitto_pub有另一种方法,就是发送文件数据,有两种方式: 1、mosquitto_pub -t my/topic -f 。/data 2、mosquitto_pub -t my/topic -s 《 。/data 我们选用了第二种方式。 为了测试,我们写一个批处理文件,来临时性的设置windows下的path环境变量,以便在本地目录中使用mosquito命令行程序: @set path==%path%;%MOSQUITTO_DIR%“@cmd.exe 把以上代码保存为:test.bat 然后双击运行: 然后我们创建两个文本文件:mytxt1.txt、mytxt2.txt 内容分别如下: mytxt1.txt: {idx:0,act:1} mytxt2.txt {idx:0,act:0} 把两个文本跟test.bat放在同目录下。 然后在刚才test.bat调出来的cmd界面上就可以使用下面两行命令进行测试了: mosquitto_pub -t ”FCS/out“ -s 《 。/mytxt1.txtmosquitto_pub -t ”FCS/out“ -s 《 。/mytxt2.txt 可以看到: ok,测试完成。 二、对步进电机进行控制 本人使用的是16细分的步进电机驱动器,型号为 :ZD6560 v3c 使用共阴极的接法: STM32芯片管脚跟电机驱动器的对应关系如下: PC5(定义为Dir) ——–DIR PC6(定义为Free) ——-EN PC7(定义为Pulse)——-CP 脱机使能逻辑为: 0或悬空:正常工作(hold); 1:脱机。 所谓正常工作是指,无论有无脉冲,驱动器对电机线圈都一直供电,如果有脉冲,则按脉冲数据供电; 所谓脱机是指,无论有无脉冲,驱动器对电机线圈都不供电。 关于步进电机控制的设置说明: 我们使用的设置: 上图是侧面的拨码开关,向下是ON,向上是OFF。 按照说明书的设置规范,我们目前使用的是 16细分 电流1.2A 衰减:慢 本文的STM32芯片使用GPIO在硬件定时器的控制下输出方波的方式提供电机驱动器所需脉冲,之所以这样是因为本文需要对步进电机的步数进行统计和控制。因此,代码中使用了硬件定时器(TIM4)。进而需要在启动代码stm32f10x.s中开启TIM4的定时中断,并在stm32fx_it.c中添加定时器中断服务程序代码。还需要添加TIM4的配置初始化代码等。 对于MQTT消息的解析部分的程序框架可以参考: 由于代码比较多,就不一一贴上来了。 稍微说明一下: 本代码中的步进电机控制只是验证性的控制,如用在实际控制环境中还需要进一步完善或修改。 还是发几张效果图吧: 下一篇,将使用CrossApp框架是用mosquitto库开发PC端或移动端的对步进电机的控制应用程序。 |
|
|
|
只有小组成员才能发言,加入小组>>
调试STM32H750的FMC总线读写PSRAM遇到的问题求解?
1784 浏览 1 评论
X-NUCLEO-IHM08M1板文档中输出电流为15Arms,15Arms是怎么得出来的呢?
1621 浏览 1 评论
1088 浏览 2 评论
STM32F030F4 HSI时钟温度测试过不去是怎么回事?
729 浏览 2 评论
ST25R3916能否对ISO15693的标签芯片进行分区域写密码?
1680 浏览 2 评论
1938浏览 9评论
STM32仿真器是选择ST-LINK还是选择J-LINK?各有什么优势啊?
734浏览 4评论
STM32F0_TIM2输出pwm2后OLED变暗或者系统重启是怎么回事?
570浏览 3评论
596浏览 3评论
stm32cubemx生成mdk-arm v4项目文件无法打开是什么原因导致的?
559浏览 3评论
小黑屋| 手机版| Archiver| 电子发烧友 ( 湘ICP备2023018690号 )
GMT+8, 2024-12-24 20:28 , Processed in 0.951961 second(s), Total 76, Slave 60 queries .
Powered by 电子发烧友网
© 2015 bbs.elecfans.com
关注我们的微信
下载发烧友APP
电子发烧友观察
版权所有 © 湖南华秋数字科技有限公司
电子发烧友 (电路图) 湘公网安备 43011202000918 号 电信与信息服务业务经营许可证:合字B2-20210191 工商网监 湘ICP备2023018690号