You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mk_tcp.cpp 13KB

8 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. /*
  2. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #include "string.h"
  11. #include "mk_tcp.h"
  12. #include "mk_tcp_private.h"
  13. #include "Http/WebSocketClient.h"
  14. #include "Http/WebSocketSession.h"
  15. #include "Network/Buffer.h"
  16. using namespace toolkit;
  17. using namespace mediakit;
  18. class BufferForC : public Buffer {
  19. public:
  20. BufferForC(const char *data, size_t len, on_mk_buffer_free cb, void *user_data) {
  21. if (len <= 0) {
  22. len = strlen(data);
  23. }
  24. if (!cb) {
  25. auto ptr = malloc(len);
  26. memcpy(ptr, data, len);
  27. data = (char *) ptr;
  28. cb = [](void *user_data, void *data) {
  29. free(data);
  30. };
  31. }
  32. _data = (char *) data;
  33. _size = len;
  34. _cb = cb;
  35. _user_data = user_data;
  36. }
  37. ~BufferForC() override {
  38. _cb(_user_data, _data);
  39. }
  40. char *data() const override {
  41. return _data;
  42. }
  43. size_t size() const override {
  44. return _size;
  45. }
  46. private:
  47. char *_data;
  48. size_t _size;
  49. on_mk_buffer_free _cb;
  50. void *_user_data;
  51. };
  52. API_EXPORT mk_buffer API_CALL mk_buffer_from_char(const char *data, size_t len, on_mk_buffer_free cb, void *user_data) {
  53. assert(data);
  54. return new Buffer::Ptr(std::make_shared<BufferForC>(data, len, cb, user_data));
  55. }
  56. API_EXPORT mk_buffer API_CALL mk_buffer_ref(mk_buffer buffer) {
  57. assert(buffer);
  58. return new Buffer::Ptr(*((Buffer::Ptr *) buffer));
  59. }
  60. API_EXPORT void API_CALL mk_buffer_unref(mk_buffer buffer) {
  61. assert(buffer);
  62. delete (Buffer::Ptr *) buffer;
  63. }
  64. API_EXPORT const char *API_CALL mk_buffer_get_data(mk_buffer buffer) {
  65. assert(buffer);
  66. return (*((Buffer::Ptr *) buffer))->data();
  67. }
  68. API_EXPORT size_t API_CALL mk_buffer_get_size(mk_buffer buffer) {
  69. assert(buffer);
  70. return (*((Buffer::Ptr *) buffer))->size();
  71. }
  72. API_EXPORT const char* API_CALL mk_sock_info_peer_ip(const mk_sock_info ctx, char *buf){
  73. assert(ctx);
  74. SockInfo *sock = (SockInfo *)ctx;
  75. strcpy(buf,sock->get_peer_ip().c_str());
  76. return buf;
  77. }
  78. API_EXPORT const char* API_CALL mk_sock_info_local_ip(const mk_sock_info ctx, char *buf){
  79. assert(ctx);
  80. SockInfo *sock = (SockInfo *)ctx;
  81. strcpy(buf,sock->get_local_ip().c_str());
  82. return buf;
  83. }
  84. API_EXPORT uint16_t API_CALL mk_sock_info_peer_port(const mk_sock_info ctx){
  85. assert(ctx);
  86. SockInfo *sock = (SockInfo *)ctx;
  87. return sock->get_peer_port();
  88. }
  89. API_EXPORT uint16_t API_CALL mk_sock_info_local_port(const mk_sock_info ctx){
  90. assert(ctx);
  91. SockInfo *sock = (SockInfo *)ctx;
  92. return sock->get_local_port();
  93. }
  94. ////////////////////////////////////////////////////////////////////////////////////////
  95. API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx){
  96. assert(ctx);
  97. SessionForC *session = (SessionForC *)ctx;
  98. return (SockInfo *)session;
  99. }
  100. API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg){
  101. assert(ctx);
  102. SessionForC *session = (SessionForC *)ctx;
  103. session->safeShutdown(SockException((ErrCode)err,err_msg));
  104. }
  105. API_EXPORT void API_CALL mk_tcp_session_send_buffer(const mk_tcp_session ctx, mk_buffer buffer) {
  106. assert(ctx && buffer);
  107. SessionForC *session = (SessionForC *) ctx;
  108. session->send(*((Buffer::Ptr *) buffer));
  109. }
  110. API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx, const char *data, size_t len) {
  111. auto buffer = mk_buffer_from_char(data, len, nullptr, nullptr);
  112. mk_tcp_session_send_buffer(ctx, buffer);
  113. mk_buffer_unref(buffer);
  114. }
  115. API_EXPORT void API_CALL mk_tcp_session_send_buffer_safe(const mk_tcp_session ctx, mk_buffer buffer) {
  116. assert(ctx && buffer);
  117. try {
  118. std::weak_ptr<Session> weak_session = ((SessionForC *) ctx)->shared_from_this();
  119. auto ref = mk_buffer_ref(buffer);
  120. ((SessionForC *) ctx)->async([weak_session, ref]() {
  121. auto session_session = weak_session.lock();
  122. if (session_session) {
  123. session_session->send(*((Buffer::Ptr *) ref));
  124. }
  125. mk_buffer_unref(ref);
  126. });
  127. } catch (std::exception &ex) {
  128. WarnL << "can not got the strong pionter of this mk_tcp_session:" << ex.what();
  129. }
  130. }
  131. API_EXPORT mk_tcp_session_ref API_CALL mk_tcp_session_ref_from(const mk_tcp_session ctx) {
  132. auto ref = ((SessionForC *) ctx)->shared_from_this();
  133. return new std::shared_ptr<SessionForC>(std::dynamic_pointer_cast<SessionForC>(ref));
  134. }
  135. API_EXPORT void mk_tcp_session_ref_release(const mk_tcp_session_ref ref) {
  136. delete (std::shared_ptr<SessionForC> *) ref;
  137. }
  138. API_EXPORT mk_tcp_session mk_tcp_session_from_ref(const mk_tcp_session_ref ref) {
  139. return ((std::shared_ptr<SessionForC> *) ref)->get();
  140. }
  141. API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx, const char *data, size_t len) {
  142. auto buffer = mk_buffer_from_char(data, len, nullptr, nullptr);
  143. mk_tcp_session_send_buffer_safe(ctx, buffer);
  144. mk_buffer_unref(buffer);
  145. }
  146. ////////////////////////////////////////SessionForC////////////////////////////////////////////////
  147. static TcpServer::Ptr s_tcp_server[4];
  148. static mk_tcp_session_events s_events_server = {0};
  149. SessionForC::SessionForC(const Socket::Ptr &pSock) : Session(pSock) {
  150. _local_port = get_local_port();
  151. if (s_events_server.on_mk_tcp_session_create) {
  152. s_events_server.on_mk_tcp_session_create(_local_port,this);
  153. }
  154. }
  155. void SessionForC::onRecv(const Buffer::Ptr &buffer) {
  156. if (s_events_server.on_mk_tcp_session_data) {
  157. s_events_server.on_mk_tcp_session_data(_local_port, this, (mk_buffer)&buffer);
  158. }
  159. }
  160. void SessionForC::onError(const SockException &err) {
  161. if (s_events_server.on_mk_tcp_session_disconnect) {
  162. s_events_server.on_mk_tcp_session_disconnect(_local_port,this, err.getErrCode(), err.what());
  163. }
  164. }
  165. void SessionForC::onManager() {
  166. if (s_events_server.on_mk_tcp_session_manager) {
  167. s_events_server.on_mk_tcp_session_manager(_local_port,this);
  168. }
  169. }
  170. void stopAllTcpServer(){
  171. CLEAR_ARR(s_tcp_server);
  172. }
  173. API_EXPORT void API_CALL mk_tcp_session_set_user_data(mk_tcp_session session,void *user_data){
  174. assert(session);
  175. SessionForC *obj = (SessionForC *)session;
  176. obj->_user_data = user_data;
  177. }
  178. API_EXPORT void* API_CALL mk_tcp_session_get_user_data(mk_tcp_session session){
  179. assert(session);
  180. SessionForC *obj = (SessionForC *)session;
  181. return obj->_user_data;
  182. }
  183. API_EXPORT void API_CALL mk_tcp_server_events_listen(const mk_tcp_session_events *events){
  184. if (events) {
  185. memcpy(&s_events_server, events, sizeof(s_events_server));
  186. } else {
  187. memset(&s_events_server, 0, sizeof(s_events_server));
  188. }
  189. }
  190. API_EXPORT uint16_t API_CALL mk_tcp_server_start(uint16_t port, mk_tcp_type type){
  191. type = MAX(mk_type_tcp, MIN(type, mk_type_wss));
  192. try {
  193. s_tcp_server[type] = std::make_shared<TcpServer>();
  194. switch (type) {
  195. case mk_type_tcp:
  196. s_tcp_server[type]->start<SessionForC>(port);
  197. break;
  198. case mk_type_ssl:
  199. s_tcp_server[type]->start<SessionWithSSL<SessionForC> >(port);
  200. break;
  201. case mk_type_ws:
  202. //此处你也可以修改WebSocketHeader::BINARY
  203. s_tcp_server[type]->start<WebSocketSession<SessionForC, HttpSession, WebSocketHeader::TEXT> >(port);
  204. break;
  205. case mk_type_wss:
  206. //此处你也可以修改WebSocketHeader::BINARY
  207. s_tcp_server[type]->start<WebSocketSession<SessionForC, HttpsSession, WebSocketHeader::TEXT> >(port);
  208. break;
  209. default:
  210. return 0;
  211. }
  212. return s_tcp_server[type]->getPort();
  213. } catch (std::exception &ex) {
  214. s_tcp_server[type].reset();
  215. WarnL << ex.what();
  216. return 0;
  217. }
  218. }
  219. ///////////////////////////////////////////////////TcpClientForC/////////////////////////////////////////////////////////
  220. TcpClientForC::TcpClientForC(mk_tcp_client_events *events){
  221. _events = *events;
  222. }
  223. void TcpClientForC::onRecv(const Buffer::Ptr &pBuf) {
  224. if (_events.on_mk_tcp_client_data) {
  225. _events.on_mk_tcp_client_data(_client, (mk_buffer)&pBuf);
  226. }
  227. }
  228. void TcpClientForC::onErr(const SockException &ex) {
  229. if(_events.on_mk_tcp_client_disconnect){
  230. _events.on_mk_tcp_client_disconnect(_client,ex.getErrCode(),ex.what());
  231. }
  232. }
  233. void TcpClientForC::onManager() {
  234. if(_events.on_mk_tcp_client_manager){
  235. _events.on_mk_tcp_client_manager(_client);
  236. }
  237. }
  238. void TcpClientForC::onConnect(const SockException &ex) {
  239. if(_events.on_mk_tcp_client_connect){
  240. _events.on_mk_tcp_client_connect(_client,ex.getErrCode(),ex.what());
  241. }
  242. }
  243. TcpClientForC::~TcpClientForC() {
  244. TraceL << "mk_tcp_client_release:" << _client;
  245. }
  246. void TcpClientForC::setClient(mk_tcp_client client) {
  247. _client = client;
  248. TraceL << "mk_tcp_client_create:" << _client;
  249. }
  250. TcpClientForC::Ptr *mk_tcp_client_create_l(mk_tcp_client_events *events, mk_tcp_type type){
  251. assert(events);
  252. type = MAX(mk_type_tcp, MIN(type, mk_type_wss));
  253. switch (type) {
  254. case mk_type_tcp:
  255. return new TcpClientForC::Ptr(new TcpClientForC(events));
  256. case mk_type_ssl:
  257. return (TcpClientForC::Ptr *)new std::shared_ptr<SessionWithSSL<TcpClientForC> >(new SessionWithSSL<TcpClientForC>(events));
  258. case mk_type_ws:
  259. //此处你也可以修改WebSocketHeader::BINARY
  260. return (TcpClientForC::Ptr *)new std::shared_ptr<WebSocketClient<TcpClientForC, WebSocketHeader::TEXT, false> >(new WebSocketClient<TcpClientForC, WebSocketHeader::TEXT, false>(events));
  261. case mk_type_wss:
  262. //此处你也可以修改WebSocketHeader::BINARY
  263. return (TcpClientForC::Ptr *)new std::shared_ptr<WebSocketClient<TcpClientForC, WebSocketHeader::TEXT, true> >(new WebSocketClient<TcpClientForC, WebSocketHeader::TEXT, true>(events));
  264. default:
  265. return nullptr;
  266. }
  267. }
  268. API_EXPORT mk_sock_info API_CALL mk_tcp_client_get_sock_info(const mk_tcp_client ctx){
  269. assert(ctx);
  270. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
  271. return (SockInfo *)client->get();
  272. }
  273. API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *events, mk_tcp_type type){
  274. auto ret = mk_tcp_client_create_l(events,type);
  275. (*ret)->setClient(ret);
  276. return ret;
  277. }
  278. API_EXPORT void API_CALL mk_tcp_client_release(mk_tcp_client ctx){
  279. assert(ctx);
  280. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
  281. delete client;
  282. }
  283. API_EXPORT void API_CALL mk_tcp_client_connect(mk_tcp_client ctx, const char *host, uint16_t port, float time_out_sec){
  284. assert(ctx);
  285. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
  286. (*client)->startConnect(host,port);
  287. }
  288. API_EXPORT void API_CALL mk_tcp_client_send_buffer(mk_tcp_client ctx, mk_buffer buffer) {
  289. assert(ctx && buffer);
  290. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *) ctx;
  291. (*client)->send(*((Buffer::Ptr *) buffer));
  292. }
  293. API_EXPORT void API_CALL mk_tcp_client_send(mk_tcp_client ctx, const char *data, int len) {
  294. auto buffer = mk_buffer_from_char(data, len, nullptr, nullptr);
  295. mk_tcp_client_send_buffer(ctx, buffer);
  296. mk_buffer_unref(buffer);
  297. }
  298. API_EXPORT void API_CALL mk_tcp_client_send_buffer_safe(mk_tcp_client ctx, mk_buffer buffer) {
  299. assert(ctx && buffer);
  300. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *) ctx;
  301. std::weak_ptr<TcpClient> weakClient = *client;
  302. auto ref = mk_buffer_ref(buffer);
  303. (*client)->async([weakClient, ref]() {
  304. auto strongClient = weakClient.lock();
  305. if (strongClient) {
  306. strongClient->send(*((Buffer::Ptr *) ref));
  307. }
  308. mk_buffer_unref(ref);
  309. });
  310. }
  311. API_EXPORT void API_CALL mk_tcp_client_send_safe(mk_tcp_client ctx, const char *data, int len){
  312. auto buffer = mk_buffer_from_char(data, len, nullptr, nullptr);
  313. mk_tcp_client_send_buffer_safe(ctx, buffer);
  314. mk_buffer_unref(buffer);
  315. }
  316. API_EXPORT void API_CALL mk_tcp_client_set_user_data(mk_tcp_client ctx,void *user_data){
  317. assert(ctx);
  318. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
  319. (*client)->_user_data = user_data;
  320. }
  321. API_EXPORT void* API_CALL mk_tcp_client_get_user_data(mk_tcp_client ctx){
  322. assert(ctx);
  323. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
  324. return (*client)->_user_data;
  325. }