You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mk_thread.cpp 4.6KB

8 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #include "mk_thread.h"
  11. #include "mk_tcp_private.h"
  12. #include "Util/logger.h"
  13. #include "Poller/EventPoller.h"
  14. #include "Thread/WorkThreadPool.h"
  15. using namespace std;
  16. using namespace toolkit;
  17. API_EXPORT mk_thread API_CALL mk_thread_from_tcp_session(mk_tcp_session ctx){
  18. assert(ctx);
  19. SessionForC *obj = (SessionForC *)ctx;
  20. return obj->getPoller().get();
  21. }
  22. API_EXPORT mk_thread API_CALL mk_thread_from_tcp_client(mk_tcp_client ctx){
  23. assert(ctx);
  24. TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
  25. return (*client)->getPoller().get();
  26. }
  27. API_EXPORT mk_thread API_CALL mk_thread_from_pool(){
  28. return EventPollerPool::Instance().getPoller().get();
  29. }
  30. API_EXPORT mk_thread API_CALL mk_thread_from_pool_work(){
  31. return WorkThreadPool::Instance().getPoller().get();
  32. }
  33. API_EXPORT void API_CALL mk_async_do(mk_thread ctx,on_mk_async cb, void *user_data){
  34. assert(ctx && cb);
  35. EventPoller *poller = (EventPoller *)ctx;
  36. poller->async([cb,user_data](){
  37. cb(user_data);
  38. });
  39. }
  40. API_EXPORT void API_CALL mk_async_do_delay(mk_thread ctx, size_t ms, on_mk_async cb, void *user_data) {
  41. assert(ctx && cb && ms);
  42. EventPoller *poller = (EventPoller *) ctx;
  43. poller->doDelayTask(ms, [cb, user_data]() {
  44. cb(user_data);
  45. return 0;
  46. });
  47. }
  48. API_EXPORT void API_CALL mk_sync_do(mk_thread ctx,on_mk_async cb, void *user_data){
  49. assert(ctx && cb);
  50. EventPoller *poller = (EventPoller *)ctx;
  51. poller->sync([cb,user_data](){
  52. cb(user_data);
  53. });
  54. }
  55. class TimerForC : public std::enable_shared_from_this<TimerForC>{
  56. public:
  57. using Ptr = std::shared_ptr<TimerForC>;
  58. TimerForC(on_mk_timer cb, void *user_data){
  59. _cb = cb;
  60. _user_data = user_data;
  61. }
  62. ~TimerForC(){}
  63. uint64_t operator()(){
  64. lock_guard<recursive_mutex> lck(_mxt);
  65. if(!_cb){
  66. return 0;
  67. }
  68. return _cb(_user_data);
  69. }
  70. void cancel(){
  71. lock_guard<recursive_mutex> lck(_mxt);
  72. _cb = nullptr;
  73. _task->cancel();
  74. }
  75. void start(uint64_t ms ,EventPoller &poller){
  76. weak_ptr<TimerForC> weak_self = shared_from_this();
  77. _task = poller.doDelayTask(ms, [weak_self]() {
  78. auto strong_self = weak_self.lock();
  79. if (!strong_self) {
  80. return (uint64_t) 0;
  81. }
  82. return (*strong_self)();
  83. });
  84. }
  85. private:
  86. on_mk_timer _cb = nullptr;
  87. void *_user_data = nullptr;
  88. recursive_mutex _mxt;
  89. EventPoller::DelayTask::Ptr _task;
  90. };
  91. API_EXPORT mk_timer API_CALL mk_timer_create(mk_thread ctx,uint64_t delay_ms,on_mk_timer cb, void *user_data){
  92. assert(ctx && cb);
  93. EventPoller *poller = (EventPoller *)ctx;
  94. TimerForC::Ptr *ret = new TimerForC::Ptr(new TimerForC(cb, user_data));
  95. (*ret)->start(delay_ms,*poller);
  96. return ret;
  97. }
  98. API_EXPORT void API_CALL mk_timer_release(mk_timer ctx){
  99. assert(ctx);
  100. TimerForC::Ptr *obj = (TimerForC::Ptr *)ctx;
  101. (*obj)->cancel();
  102. delete obj;
  103. }
  104. class WorkThreadPoolForC : public TaskExecutorGetterImp {
  105. public:
  106. ~WorkThreadPoolForC() override = default;
  107. WorkThreadPoolForC(const char *name, size_t n_thread, int priority) {
  108. //最低优先级
  109. addPoller(name, n_thread, (ThreadPool::Priority) priority, false);
  110. }
  111. EventPoller::Ptr getPoller() {
  112. return dynamic_pointer_cast<EventPoller>(getExecutor());
  113. }
  114. };
  115. API_EXPORT mk_thread_pool API_CALL mk_thread_pool_create(const char *name, size_t n_thread, int priority) {
  116. return new WorkThreadPoolForC(name, n_thread, priority);
  117. }
  118. API_EXPORT int API_CALL mk_thread_pool_release(mk_thread_pool pool) {
  119. assert(pool);
  120. delete (WorkThreadPoolForC *) pool;
  121. return 0;
  122. }
  123. API_EXPORT mk_thread API_CALL mk_thread_from_thread_pool(mk_thread_pool pool) {
  124. assert(pool);
  125. return ((WorkThreadPoolForC *) pool)->getPoller().get();
  126. }
  127. API_EXPORT mk_sem API_CALL mk_sem_create() {
  128. return new toolkit::semaphore;
  129. }
  130. API_EXPORT void API_CALL mk_sem_release(mk_sem sem) {
  131. assert(sem);
  132. delete (toolkit::semaphore *) sem;
  133. }
  134. API_EXPORT void API_CALL mk_sem_post(mk_sem sem, size_t n) {
  135. assert(sem);
  136. ((toolkit::semaphore *) sem)->post(n);
  137. }
  138. API_EXPORT void API_CALL mk_sem_wait(mk_sem sem) {
  139. assert(sem);
  140. ((toolkit::semaphore *) sem)->wait();
  141. }