A thread pool in which a single management ("main") thread hands work to idle threads in the pool. If no thread is idle, the main thread waits until one becomes available. The pool is built on ForThreads, which is a wrapper around pthreads. A number of threads are created once, at initialisation, and work is then passed to these threads rather than a new thread being created for each piece of work.
subroutine, public | threadpool_init (io_configuration)
| Initialises the thread pool and marks each thread as idle.
subroutine, public | threadpool_lock_netcdf_access ()
| Acquires the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed.
subroutine, public | threadpool_unlock_netcdf_access ()
| Releases the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed.
subroutine, public | threadpool_start_thread (proc, arguments, data_buffer)
| Starts an idle thread from the pool to execute a specific procedure with some data. If no thread is available, this waits until one becomes idle.
subroutine | threadpool_thread_entry_procedure (thread_id)
| Entry point for each thread created in the pool. It calls out to the actual procedure to execute, which allows the pool to perform management actions before and after the work runs.
logical function, public | threadpool_is_idle ()
| Determines whether the thread pool is idle (i.e. all threads are idle and waiting for work).
subroutine, public | threadpool_deactivate ()
| Waits for all busy threads to complete and then shuts all the pthreads down. Deactivation is kept separate from finalisation so that actions can be finalised in between.
subroutine, public | threadpool_finalise ()
| Finalises the thread pool.
integer function | find_idle_thread ()
| Finds an idle thread. If none is available, this waits until one becomes free.
integer function | get_index_of_idle_thread ()
| Gets the index of the next idle thread, or -1 if they are all busy. The search starts from the next suggested idle thread rather than from the beginning.
subroutine, public | check_thread_status (ierr)
| Checks the error status of a thread operation and reports an error if it failed.
integer, parameter | default_thread_pool_size =10
| Number of threads in the pool.
logical, dimension(:), allocatable, volatile | thread_busy
logical, dimension(:), allocatable, volatile | thread_start
integer, dimension(:), allocatable, volatile | thread_ids
integer, dimension(:), allocatable, volatile | thread_pass_data
integer, dimension(:), allocatable, volatile | activate_thread_condition_variables
integer, dimension(:), allocatable, volatile | activate_thread_mutex
type(threaded_procedure_container_type), dimension(:), allocatable, volatile | thread_entry_containers
integer, volatile | netcdfmutex
| Mutex used for controlling NetCDF access.
integer, volatile | next_suggested_idle_thread
logical, volatile | threadpool_active
integer, volatile | active_threads
integer, volatile | total_number_of_threads
integer, volatile | active_scalar_mutex
◆ check_thread_status()
subroutine, public threadpool_mod::check_thread_status (integer, intent(in) ierr)
Checks the error status of a thread operation and reports an error if it failed.
- Parameters
ierr | The error/success flag returned from the ForThreads library, which is itself the value returned from pthreads
Definition at line 228 of file threadpool.F90.
  subroutine check_thread_status(ierr)
    integer, intent(in) :: ierr

    if (ierr .ne. 0) then
      call log_log(log_error, "Pthreads error in IO server, error code="//conv_to_string(ierr))
    end if
  end subroutine check_thread_status
◆ find_idle_thread()
integer function threadpool_mod::find_idle_thread ()
private
Finds an idle thread. If none is available, this repeatedly polls the pool until one becomes free.
- Returns
- The id of the idle thread which can be used
Definition at line 201 of file threadpool.F90.
  integer function find_idle_thread()
    find_idle_thread=get_index_of_idle_thread()
    do while (find_idle_thread == -1)
      find_idle_thread=get_index_of_idle_thread()
    end do
  end function find_idle_thread
◆ get_index_of_idle_thread()
integer function threadpool_mod::get_index_of_idle_thread ()
private
Gets the index of the next idle thread, or -1 if none is found. The search starts from the next suggested idle thread rather than from the beginning, as the thread after the one most recently handed out is often idle. If no idle thread is found before the end of the pool, the suggestion is reset to 1, so the wrap-around takes effect on the following call.
- Returns
- The index of the next idle thread or -1 if there is none
Definition at line 211 of file threadpool.F90.
  integer function get_index_of_idle_thread()
    integer :: i

    do i=next_suggested_idle_thread, total_number_of_threads
      if (.not. thread_busy(i)) then
        get_index_of_idle_thread=i
        next_suggested_idle_thread=i+1
        if (next_suggested_idle_thread .gt. total_number_of_threads) next_suggested_idle_thread=1
        return
      end if
    end do
    next_suggested_idle_thread=1
    get_index_of_idle_thread=-1
  end function get_index_of_idle_thread
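The search described above can be tried in isolation, without any threading. The following is a minimal sketch only: the module name idle_search_sketch and the names pool_size, busy, next_suggested and next_idle are hypothetical stand-ins for the module state in threadpool.F90, and the busy flags are set by hand rather than by worker threads.

```fortran
module idle_search_sketch
  implicit none
  ! Hypothetical stand-ins for total_number_of_threads, thread_busy
  ! and next_suggested_idle_thread
  integer, parameter :: pool_size = 4
  logical :: busy(pool_size) = .false.
  integer :: next_suggested = 1
contains
  ! Returns the index of the first idle slot at or after next_suggested,
  ! or -1 if there is none before the end of the pool.
  integer function next_idle()
    integer :: i
    do i = next_suggested, pool_size
      if (.not. busy(i)) then
        next_idle = i
        next_suggested = i + 1
        if (next_suggested > pool_size) next_suggested = 1
        return
      end if
    end do
    ! Nothing idle up to the end of the pool: search from slot 1 next time
    next_suggested = 1
    next_idle = -1
  end function next_idle
end module idle_search_sketch

program idle_search_demo
  use idle_search_sketch
  implicit none
  integer :: first, second

  busy = [.true., .true., .false., .true.]
  next_suggested = 4
  first = next_idle()    ! only slot 4 is examined and it is busy
  second = next_idle()   ! the search restarts at slot 1 and reaches slot 3
  print *, first, second
end program idle_search_demo
```

The first call fails even though slot 3 is idle, because a single call never looks before the suggested index. It is the retry loop in the caller that makes the search cover the whole pool.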
◆ threadpool_deactivate()
subroutine, public threadpool_mod::threadpool_deactivate ()
Waits for all busy threads to complete and then shuts all the pthreads down. Deactivation and finalisation are separate procedures because the pool must be deactivated first (to ensure no threads are running actions), the actions are then finalised (which might involve destroying mutexes), and only then is the threading environment destroyed in finalisation.
Definition at line 173 of file threadpool.F90.
  subroutine threadpool_deactivate()
    integer :: i
    integer, pointer :: retval

    threadpool_active=.false.
    do i=1, total_number_of_threads
      call check_thread_status(forthread_mutex_lock(activate_thread_mutex(i)))
      call check_thread_status(forthread_cond_signal(activate_thread_condition_variables(i)))
      call check_thread_status(forthread_mutex_unlock(activate_thread_mutex(i)))
      call check_thread_status(forthread_join(thread_ids(i),retval))
      call check_thread_status(forthread_mutex_destroy(activate_thread_mutex(i)))
      call check_thread_status(forthread_cond_destroy(activate_thread_condition_variables(i)))
    end do
  end subroutine threadpool_deactivate
◆ threadpool_finalise()
subroutine, public threadpool_mod::threadpool_finalise ()
Finalises the thread pool.
Definition at line 191 of file threadpool.F90.
  subroutine threadpool_finalise()
    call check_thread_status(forthread_mutex_destroy(netcdfmutex))
    call check_thread_status(forthread_mutex_destroy(active_scalar_mutex))
    deallocate(thread_busy, thread_start, thread_ids, thread_pass_data, activate_thread_condition_variables, &
         activate_thread_mutex, thread_entry_containers)
    call check_thread_status(forthread_destroy())
  end subroutine threadpool_finalise
◆ threadpool_init()
subroutine, public threadpool_mod::threadpool_init (type(io_configuration_type), intent(inout) io_configuration)
Initialises the thread pool and marks each thread as idle. The pool size is taken from io_configuration%number_of_threads if that is 1 or more, otherwise default_thread_pool_size is used.
Definition at line 50 of file threadpool.F90.
  subroutine threadpool_init(io_configuration)
    type(io_configuration_type), intent(inout) :: io_configuration

    integer :: n

    call check_thread_status(forthread_init())
    call check_thread_status(forthread_mutex_init(netcdfmutex, -1))
    if (io_configuration%number_of_threads .ge. 1) then
      total_number_of_threads=io_configuration%number_of_threads
    else
      if (io_configuration%my_io_rank==0) then
        call log_log(log_warn, "No setting for IO server thread pool size which must be 1 or more so using default size")
      end if
      total_number_of_threads=default_thread_pool_size
    end if
    allocate(thread_busy(total_number_of_threads), thread_start(total_number_of_threads), &
         thread_ids(total_number_of_threads), thread_pass_data(total_number_of_threads), &
         activate_thread_condition_variables(total_number_of_threads), activate_thread_mutex(total_number_of_threads), &
         thread_entry_containers(total_number_of_threads))
    threadpool_active=.true.
    active_threads=total_number_of_threads
    next_suggested_idle_thread=1
    call check_thread_status(forthread_mutex_init(active_scalar_mutex, -1))
    do n=1, total_number_of_threads
      call check_thread_status(forthread_cond_init(activate_thread_condition_variables(n), -1))
      call check_thread_status(forthread_mutex_init(activate_thread_mutex(n), -1))
      thread_busy(n)=.false.
      thread_start(n)=.false.
      thread_pass_data(n)=n  ! assumed: this line is not shown in the listing; each thread needs its pool index
      call check_thread_status(forthread_create(thread_ids(n), -1, threadpool_thread_entry_procedure, thread_pass_data(n)))
    end do
  end subroutine threadpool_init
◆ threadpool_is_idle()
logical function, public threadpool_mod::threadpool_is_idle ()
Determines whether the thread pool is idle, i.e. all threads are idle and waiting for work. The active_threads counter is decremented when a thread picks up work and incremented when it finishes, so the pool is idle when the counter equals total_number_of_threads.
- Returns
- Whether the thread pool is idle
Definition at line 163 of file threadpool.F90.
  logical function threadpool_is_idle()
    call check_thread_status(forthread_mutex_lock(active_scalar_mutex))
    threadpool_is_idle = active_threads==total_number_of_threads
    call check_thread_status(forthread_mutex_unlock(active_scalar_mutex))
  end function threadpool_is_idle
◆ threadpool_lock_netcdf_access()
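subroutine, public threadpool_mod::threadpool_lock_netcdf_access ()
Acquires the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed. The lock is only taken when the code is compiled with ENFORCE_THREAD_SAFETY defined.
Definition at line 84 of file threadpool.F90.
  subroutine threadpool_lock_netcdf_access()
#ifdef ENFORCE_THREAD_SAFETY
    call check_thread_status(forthread_mutex_lock(netcdfmutex))
#endif
  end subroutine threadpool_lock_netcdf_access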
◆ threadpool_start_thread()
subroutine, public threadpool_mod::threadpool_start_thread (procedure(thread_procedure) proc, integer, dimension(:), intent(in) arguments, character, dimension(:), intent(in), optional, allocatable data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If no thread is available, this waits until one becomes idle.
- Parameters
proc | The procedure for the thread to execute
arguments | Integer arguments to pass into the thread
data_buffer | Optional data to pass into the thread
Definition at line 101 of file threadpool.F90.
  subroutine threadpool_start_thread(proc, arguments, data_buffer)
    procedure(thread_procedure) :: proc
    integer, dimension(:), intent(in) :: arguments
    character, dimension(:), allocatable, intent(in), optional :: data_buffer

    integer :: idle_thread_id

    if (.not. threadpool_active) call log_log(log_error, "Attemping to start IO thread on deactivated thread pool")

    idle_thread_id=find_idle_thread()
    if (idle_thread_id .ne. -1) then
      thread_busy(idle_thread_id)=.true.
      thread_entry_containers(idle_thread_id)%proc=>proc
      allocate(thread_entry_containers(idle_thread_id)%arguments(size(arguments)))
      thread_entry_containers(idle_thread_id)%arguments=arguments
      if (present(data_buffer)) allocate(thread_entry_containers(idle_thread_id)%data_buffer(size(data_buffer)), &
           source=data_buffer)  ! assumed: the continuation line is not shown in the listing
      call check_thread_status(forthread_mutex_lock(activate_thread_mutex(idle_thread_id)))
      thread_start(idle_thread_id)=.true.
      call check_thread_status(forthread_cond_signal(activate_thread_condition_variables(idle_thread_id)))
      call check_thread_status(forthread_mutex_unlock(activate_thread_mutex(idle_thread_id)))
    end if
  end subroutine threadpool_start_thread
◆ threadpool_thread_entry_procedure()
subroutine threadpool_mod::threadpool_thread_entry_procedure (integer thread_id)
private
Entry point for each thread created in the pool. Each thread waits on its own condition variable until it is started or the pool is deactivated, then calls out to the actual procedure to execute. Routing the call through this procedure allows the pool to perform management actions before and after the work runs.
- Parameters
thread_id | The thread pool id (index) of this thread
Definition at line 129 of file threadpool.F90.
  subroutine threadpool_thread_entry_procedure(thread_id)
    integer :: thread_id

    do while (threadpool_active)
      call check_thread_status(forthread_mutex_lock(activate_thread_mutex(thread_id)))
      do while (.not. thread_start(thread_id) .and. threadpool_active)
        call check_thread_status(forthread_cond_wait(activate_thread_condition_variables(thread_id), &
             activate_thread_mutex(thread_id)))
      end do
      call check_thread_status(forthread_mutex_unlock(activate_thread_mutex(thread_id)))
      if (.not. threadpool_active) return
      thread_busy(thread_id)=.true.
      thread_start(thread_id)=.false.

      call check_thread_status(forthread_mutex_lock(active_scalar_mutex))
      active_threads=active_threads-1
      call check_thread_status(forthread_mutex_unlock(active_scalar_mutex))
      if (allocated(thread_entry_containers(thread_id)%data_buffer)) then
        call thread_entry_containers(thread_id)%proc(thread_entry_containers(thread_id)%arguments, &
             data_buffer=thread_entry_containers(thread_id)%data_buffer)
        deallocate(thread_entry_containers(thread_id)%data_buffer)
      else
        call thread_entry_containers(thread_id)%proc(thread_entry_containers(thread_id)%arguments)
      end if
      deallocate(thread_entry_containers(thread_id)%arguments)
      call check_thread_status(forthread_mutex_lock(active_scalar_mutex))
      active_threads=active_threads+1
      call check_thread_status(forthread_mutex_unlock(active_scalar_mutex))
      thread_busy(thread_id)=.false.
    end do
  end subroutine threadpool_thread_entry_procedure
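The hand-over of work through a container (a procedure pointer plus its arguments and an optional buffer) can be illustrated without any threads. The sketch below is an assumption-laden illustration, not the definition used by the IO server: work_procedure, work_container, run_container and sum_arguments are hypothetical names, and the real thread_procedure interface and threaded_procedure_container_type are not shown on this page.

```fortran
module container_sketch
  implicit none

  ! Hypothetical interface for a unit of work
  abstract interface
    subroutine work_procedure(arguments, data_buffer)
      integer, dimension(:), intent(in) :: arguments
      character, dimension(:), allocatable, intent(inout), optional :: data_buffer
    end subroutine work_procedure
  end interface

  ! Hypothetical container: what to run and what to run it with
  type :: work_container
    procedure(work_procedure), pointer, nopass :: proc => null()
    integer, dimension(:), allocatable :: arguments
    character, dimension(:), allocatable :: data_buffer
  end type work_container

  integer :: last_sum = 0, last_buffer_size = -1
contains
  ! Example work: records the sum of the arguments and the buffer size
  subroutine sum_arguments(arguments, data_buffer)
    integer, dimension(:), intent(in) :: arguments
    character, dimension(:), allocatable, intent(inout), optional :: data_buffer
    last_sum = sum(arguments)
    last_buffer_size = -1
    if (present(data_buffer)) last_buffer_size = size(data_buffer)
  end subroutine sum_arguments

  ! Runs the stored procedure, passing the buffer only if one was supplied,
  ! then releases the container's storage so it can be reused.
  subroutine run_container(c)
    type(work_container), intent(inout) :: c
    if (allocated(c%data_buffer)) then
      call c%proc(c%arguments, data_buffer=c%data_buffer)
      deallocate(c%data_buffer)
    else
      call c%proc(c%arguments)
    end if
    deallocate(c%arguments)
  end subroutine run_container
end module container_sketch

program container_demo
  use container_sketch
  implicit none
  type(work_container) :: c

  c%proc => sum_arguments
  allocate(c%arguments(3))
  c%arguments = [1, 2, 3]
  allocate(c%data_buffer(5))
  c%data_buffer = 'x'
  call run_container(c)
  print *, last_sum, last_buffer_size
end program container_demo
```

Branching on whether the buffer is allocated matters here: an unallocated buffer passed to an allocatable dummy argument would count as present, so the only way for the work procedure to see it as absent is to leave it out of the call.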
◆ threadpool_unlock_netcdf_access()
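subroutine, public threadpool_mod::threadpool_unlock_netcdf_access ()
Releases the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed. The lock is only released when the code is compiled with ENFORCE_THREAD_SAFETY defined.
Definition at line 91 of file threadpool.F90.
  subroutine threadpool_unlock_netcdf_access()
#ifdef ENFORCE_THREAD_SAFETY
    call check_thread_status(forthread_mutex_unlock(netcdfmutex))
#endif
  end subroutine threadpool_unlock_netcdf_access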
◆ activate_thread_condition_variables
integer, dimension(:), allocatable, volatile threadpool_mod::activate_thread_condition_variables
private
Definition at line 37 of file threadpool.F90.
  integer, volatile, dimension(:), allocatable :: activate_thread_condition_variables, activate_thread_mutex
◆ activate_thread_mutex
integer, dimension(:), allocatable, volatile threadpool_mod::activate_thread_mutex
private
◆ active_scalar_mutex
integer, volatile threadpool_mod::active_scalar_mutex
private
◆ active_threads
integer, volatile threadpool_mod::active_threads
private
Definition at line 43 of file threadpool.F90.
  integer, volatile :: active_threads, total_number_of_threads, active_scalar_mutex
◆ default_thread_pool_size
integer, parameter threadpool_mod::default_thread_pool_size =10
private
Number of threads in the pool.
Definition at line 34 of file threadpool.F90.
  integer, parameter :: DEFAULT_THREAD_POOL_SIZE=10
◆ netcdfmutex
integer, volatile threadpool_mod::netcdfmutex
private
Mutex used for controlling NetCDF access.
Definition at line 39 of file threadpool.F90.
  integer, volatile :: netcdfmutex
◆ next_suggested_idle_thread
integer, volatile threadpool_mod::next_suggested_idle_thread
private
Definition at line 40 of file threadpool.F90.
  integer, volatile :: next_suggested_idle_thread
◆ thread_busy
logical, dimension(:), allocatable, volatile threadpool_mod::thread_busy
private
Definition at line 35 of file threadpool.F90.
  logical, volatile, dimension(:), allocatable :: thread_busy, thread_start
◆ thread_entry_containers
Definition at line 38 of file threadpool.F90.
  type(threaded_procedure_container_type), volatile, dimension(:), allocatable :: thread_entry_containers
◆ thread_ids
integer, dimension(:), allocatable, volatile threadpool_mod::thread_ids
private
Definition at line 36 of file threadpool.F90.
  integer, volatile, dimension(:), allocatable :: thread_ids, thread_pass_data
◆ thread_pass_data
integer, dimension(:), allocatable, volatile threadpool_mod::thread_pass_data
private
◆ thread_start
logical, dimension(:), allocatable, volatile threadpool_mod::thread_start
private
◆ threadpool_active
logical, volatile threadpool_mod::threadpool_active
private
Definition at line 41 of file threadpool.F90.
  logical, volatile :: threadpool_active
◆ total_number_of_threads
integer, volatile threadpool_mod::total_number_of_threads
private