libvmx
VMX Codec
Loading...
Searching...
No Matches
thread_tasks.h
Go to the documentation of this file.
1/*
2* MIT License
3*
4* Copyright (c) 2025 Open Media Transport Contributors
5*
6* Permission is hereby granted, free of charge, to any person obtaining a copy
7* of this software and associated documentation files (the "Software"), to deal
8* in the Software without restriction, including without limitation the rights
9* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10* copies of the Software, and to permit persons to whom the Software is
11* furnished to do so, subject to the following conditions:
12*
13* The above copyright notice and this permission notice shall be included in all
14* copies or substantial portions of the Software.
15*
16* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22* SOFTWARE.
23*
24*/
25
26#pragma once
27#include <thread>
28#include <queue>
29#include <mutex>
30#include <condition_variable>
31#include <functional>
32
34{
35 std::thread thread;
36 std::queue<std::function<void()>> queue;
37 std::mutex mtx;
38 std::condition_variable cv;
39 std::condition_variable complete;
40 bool running = false;
41
42 void Join()
43 {
44 std::unique_lock<std::mutex> lock(mtx);
45 if (queue.size() > 0)
46 {
47 complete.wait(lock);
48 }
49 }
50
51 void TaskLoop()
52 {
53 while (running)
54 {
55 std::unique_lock<std::mutex> lock(mtx);
56 if (queue.size() == 0)
57 {
58 complete.notify_all();
59 cv.wait(lock);
60 }
61 if (!running) break;
62 std::function<void()> func = NULL;
63 if (queue.size() > 0)
64 {
65 func = queue.front();
66 queue.pop();
67 }
68 if (func)
69 {
70 func();
71 }
72
73 }
74 }
76 {
77 running = true;
78 queue = std::queue<std::function<void()>>();
79 thread = std::thread(&ThreadTask::TaskLoop, this);
80 }
81 void Push(std::function<void()> task)
82 {
83 {
84 std::lock_guard<std::mutex> lock(mtx);
85 queue.push(task);
86 }
87 cv.notify_all();
88 }
89 void Destroy()
90 {
91 {
92 std::lock_guard<std::mutex> lock(mtx);
93 running = false;
94 }
95 cv.notify_all();
96 thread.join();
97 }
98};
99
105
106static ThreadTasks* CreateTasks(int numThreads)
107{
108 ThreadTasks* th = new ThreadTasks();
109 th->numThreads = numThreads;
110 th->tasks = new ThreadTask*[numThreads];
111 for (int i = 0; i < numThreads; i++)
112 {
113 ThreadTask* task = new ThreadTask();
114 th->tasks[i] = task;
115 task->Initialize();
116 }
117 return th;
118}
119
120static void DestroyTasks(ThreadTasks* tasks)
121{
122 if (tasks)
123 {
124 for (int i = 0; i < tasks->numThreads; i++)
125 {
126 tasks->tasks[i]->Destroy();
127 ThreadTask* task = tasks->tasks[i];
128 delete task;
129 }
130 delete tasks;
131 }
132}
133
Definition thread_tasks.h:34
std::condition_variable complete
Definition thread_tasks.h:39
std::condition_variable cv
Definition thread_tasks.h:38
bool running
Definition thread_tasks.h:40
void Initialize()
Definition thread_tasks.h:75
std::queue< std::function< void()> > queue
Definition thread_tasks.h:36
void Join()
Definition thread_tasks.h:42
void Destroy()
Definition thread_tasks.h:89
std::thread thread
Definition thread_tasks.h:35
void TaskLoop()
Definition thread_tasks.h:51
std::mutex mtx
Definition thread_tasks.h:37
void Push(std::function< void()> task)
Definition thread_tasks.h:81
Definition thread_tasks.h:101
int numThreads
Definition thread_tasks.h:102
ThreadTask ** tasks
Definition thread_tasks.h:103