MONC
Data Types | Functions/Subroutines | Variables
reduction_inter_io_mod Module Reference

Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as calling the MPI function as it is nondeterministic when messages will arrive and hence when one reduction on a process and a reduction on another should be called. More...

Data Types

type  reduction_progress_type
 

Functions/Subroutines

subroutine, public init_reduction_inter_io (io_configuration)
 Initialises the reduction action. More...
 
subroutine handle_recv_data_from_io_server (io_configuration, data_buffer, inter_io_index)
 Handles the receiving of data from some other IO server. This is issued call back style within a thread to handle that data. More...
 
logical function, public check_reduction_inter_io_for_completion (io_configuration)
 Checks this action for completion, when all are completed then the IO server can shutdown as this is called once all MONC processes have deregistered. More...
 
subroutine, public finalise_reduction_inter_io (io_configuration)
 Finalises the reduction action, waiting for all outstanding requests and then freeing data. More...
 
subroutine, public perform_inter_io_reduction (io_configuration, field_values, field_size, reduction_field_name, reduction_op, root, timestep, completion_procedure)
 Actually handles the processing for this data wrt the vertical reduction. More...
 
subroutine clean_progress (myrank)
 
logical function check_and_clean_progress (myrank)
 Checks all the reduction progresses and will remove any that have completed. This is designed to be called from an IO server other than 0 (the master IO server) and it checks if the outstanding async send handle has completed. Checking on the master IO server or checking any progress that is not currently sending is fine and will not impact the correctness (but obviously the progress wont be freed) More...
 
subroutine integrate_io_server_collective_values (reduction_op, reduction_progress, single_server_values, number_elements, collective_values_empty)
 Integrates the collective values from another IO server into the currently stored values. More...
 
subroutine handle_local_moncs_completed_collective (io_configuration, reduction_progress)
 Handles the case where the local MONC processes have completed their collective operation for a specific reduction and, for this IO server, it either needs to send its value to the master IO server or, if it is the master, check for completion. More...
 
subroutine handle_process_recv_from_other_io_server (io_configuration, inter_io_comm, myrank, data_buffer, number_io_servers)
 Handles the data received from another IO server, locates the correct reduction progress, appends the information and then checks for & deals with the situation where that reduction is completed. More...
 
subroutine handle_collective_completed (io_configuration, reduction_progress)
 Handles the situation where collective communication for a specific reduction has completed across all IO servers. More...
 
type(reduction_progress_type) function, pointer find_or_add_reduction_progress (timestep, reduction_operator, root, field_name, completion_procedure)
 Finds or adds a specific reduction progress based upon the timestep and reduction operator. If none can be found then a new progress is added in. With new progresses this procedure will initialise them. More...
 
type(reduction_progress_type) function, pointer find_reduction_progress (timestep, reduction_operator, field_name, issue_read_lock)
 Locates a specific reduction progress based upon the timestep, operator and field name. More...
 
subroutine remove_reduction_progress (reduction_progress)
 Removes a specific reduction progress. More...
 
character(len=string_length) function generate_reduction_key (field_name, timestep, reduction_operator)
 Generates the lookup key that is used for the map storage of reduction progresses. More...
 
type(reduction_progress_type) function, pointer retrieve_reduction_progress (mapentry)
 Helper function to retrieve the reduction progress from a mapentry. More...
 
integer function, public get_reduction_operator (op_string)
 Given the map of action attributes this procedure will identify the reduction operator that has been selected by the configuration. More...
 

Variables

integer, parameter mean =1
 
integer, parameter min =2
 
integer, parameter max =3
 
integer, parameter sum =4
 
integer, parameter my_inter_io_tag =1
 
integer, parameter perform_clean_every =200
 
character(len= *), parameter my_inter_io_name ="reductioninterio"
 
integer, volatile reduction_progress_rwlock
 
integer, volatile inter_io_description_mutex
 
integer, volatile clean_progress_mutex
 
integer, volatile reduction_count_mutex
 
integer, volatile previous_clean_reduction_count
 
integer, volatile reduction_count
 
type(hashmap_type), volatile reduction_progresses
 
logical, volatile initialised =.false.
 

Detailed Description

Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as calling the MPI function as it is nondeterministic when messages will arrive and hence when one reduction on a process and a reduction on another should be called.

Function/Subroutine Documentation

◆ check_and_clean_progress()

logical function reduction_inter_io_mod::check_and_clean_progress ( integer, intent(in)  myrank)
private

Checks all the reduction progresses and will remove any that have completed. This is designed to be called from an IO server other than 0 (the master IO server) and it checks if the outstanding async send handle has completed. Checking on the master IO server or checking any progress that is not currently sending is fine and will not impact the correctness (but obviously the progress wont be freed)

Definition at line 174 of file reduction-inter-io.F90.

175  integer, intent(in) :: myrank
176 
177  integer :: i, entries, completed, ierr, num_to_remove, have_lock
178  type(list_type) :: entries_to_remove
179  type(iterator_type) :: iterator
180  type(mapentry_type) :: mapentry
181  type(reduction_progress_type), pointer :: specific_reduction_progress
182  character(len=STRING_LENGTH) :: entry_key
183  class(*), pointer :: generic
184  logical :: destroy_lock
185 
186  check_and_clean_progress=.true.
187  have_lock=forthread_mutex_trylock(clean_progress_mutex)
188  if (have_lock == 0) then
189  call check_thread_status(forthread_rwlock_rdlock(reduction_progress_rwlock))
190  iterator=c_get_iterator(reduction_progresses)
191  do while (c_has_next(iterator))
192  mapentry=c_next_mapentry(iterator)
193  destroy_lock=.false.
194  specific_reduction_progress=>retrieve_reduction_progress(mapentry)
195  if (myrank /= specific_reduction_progress%root) then
196  call check_thread_status(forthread_mutex_lock(specific_reduction_progress%mutex))
197  if (specific_reduction_progress%async_handle /= mpi_request_null) then
198  call wait_for_mpi_request(specific_reduction_progress%async_handle)
199  !if (completed == 1) then
200  if (specific_reduction_progress%async_handle == mpi_request_null) then
201  if (allocated(specific_reduction_progress%send_buffer)) deallocate(specific_reduction_progress%send_buffer)
202  destroy_lock=.true.
203  call c_add_string(entries_to_remove, mapentry%key)
204  else
205  check_and_clean_progress=.false.
206  end if
207  end if
208  call check_thread_status(forthread_mutex_unlock(specific_reduction_progress%mutex))
209  if (destroy_lock) call check_thread_status(forthread_mutex_destroy(specific_reduction_progress%mutex))
210  end if
211  end do
212  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
213 
214  if (.not. c_is_empty(entries_to_remove)) then
215  call check_thread_status(forthread_rwlock_wrlock(reduction_progress_rwlock))
216  iterator=c_get_iterator(entries_to_remove)
217  do while (c_has_next(iterator))
218  entry_key=c_next_string(iterator)
219  generic=>c_get_generic(reduction_progresses, entry_key)
220  call c_remove(reduction_progresses, entry_key)
221  deallocate(generic)
222  end do
223  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
224  end if
225  call c_free(entries_to_remove)
226  call check_thread_status(forthread_mutex_unlock(clean_progress_mutex))
227  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ check_reduction_inter_io_for_completion()

logical function, public reduction_inter_io_mod::check_reduction_inter_io_for_completion ( type(io_configuration_type), intent(inout)  io_configuration)

Checks this action for completion, when all are completed then the IO server can shutdown as this is called once all MONC processes have deregistered.

Parameters
io_configurationConfiguration state of the IO server
Returns
Whether the action has completed

Definition at line 82 of file reduction-inter-io.F90.

83  type(io_configuration_type), intent(inout) :: io_configuration
84 
85  check_reduction_inter_io_for_completion=check_and_clean_progress(io_configuration%my_io_rank)
Here is the call graph for this function:

◆ clean_progress()

subroutine reduction_inter_io_mod::clean_progress ( integer, intent(in)  myrank)
private

Definition at line 154 of file reduction-inter-io.F90.

155  integer, intent(in) :: myrank
156 
157  logical :: cc_dummy
158 
159  call check_thread_status(forthread_mutex_lock(reduction_count_mutex))
160  reduction_count=reduction_count+1
161  if (previous_clean_reduction_count + perform_clean_every .lt. reduction_count) then
162  previous_clean_reduction_count=reduction_count
163  call check_thread_status(forthread_mutex_unlock(reduction_count_mutex))
164  cc_dummy=check_and_clean_progress(myrank)
165  else
166  call check_thread_status(forthread_mutex_unlock(reduction_count_mutex))
167  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ finalise_reduction_inter_io()

subroutine, public reduction_inter_io_mod::finalise_reduction_inter_io ( type(io_configuration_type), intent(inout)  io_configuration)

Finalises the reduction action, waiting for all outstanding requests and then freeing data.

Parameters
io_configurationConfiguration state of the IO server

Definition at line 90 of file reduction-inter-io.F90.

91  type(io_configuration_type), intent(inout) :: io_configuration
92 
93  type(reduction_progress_type) :: progress
94  type(iterator_type) :: iterator
95 
96  if (initialised) then
97  call check_thread_status(forthread_rwlock_rdlock(reduction_progress_rwlock))
98  if (.not. c_is_empty(reduction_progresses)) then
99  iterator=c_get_iterator(reduction_progresses)
100  do while (c_has_next(iterator))
101  progress=retrieve_reduction_progress(c_next_mapentry(iterator))
102  if (progress%async_handle /= mpi_request_null) then
103  call wait_for_mpi_request(progress%async_handle)
104  end if
105  end do
106  end if
107  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
108  call check_thread_status(forthread_rwlock_destroy(reduction_progress_rwlock))
109  call check_thread_status(forthread_mutex_destroy(inter_io_description_mutex))
110  call check_thread_status(forthread_mutex_destroy(clean_progress_mutex))
111  call check_thread_status(forthread_mutex_destroy(reduction_count_mutex))
112  initialised=.false.
113  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ find_or_add_reduction_progress()

type(reduction_progress_type) function, pointer reduction_inter_io_mod::find_or_add_reduction_progress ( integer, intent(in)  timestep,
integer, intent(in)  reduction_operator,
integer, intent(in)  root,
character(len=*), intent(in)  field_name,
procedure(handle_completion), optional  completion_procedure 
)
private

Finds or adds a specific reduction progress based upon the timestep and reduction operator. If none can be found then a new progress is added in. With new progresses this procedure will initialise them.

Parameters
timestepThe timestep to match
reduction_operatorThe reduction operator to match
field_nameThe name of the field that the reduction type represents
num_vectorsThe number of reduction vectors (items to reduce) to be stored
Returns
A reduction progress data object

Definition at line 360 of file reduction-inter-io.F90.

361  integer, intent(in) :: timestep, reduction_operator, root
362  type(reduction_progress_type), pointer :: find_or_add_reduction_progress
363  character(len=*), intent(in) :: field_name
364  procedure(handle_completion), optional :: completion_procedure
365 
366  class(*), pointer :: generic
367  type(reduction_progress_type), pointer :: new_progress
368 
369  find_or_add_reduction_progress=>find_reduction_progress(timestep, reduction_operator, field_name)
370  if (.not. associated(find_or_add_reduction_progress)) then
371  call check_thread_status(forthread_rwlock_wrlock(reduction_progress_rwlock))
372  find_or_add_reduction_progress=>find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock=.false.)
373  if (.not. associated(find_or_add_reduction_progress)) then
374  allocate(new_progress)
375  call check_thread_status(forthread_mutex_init(new_progress%mutex, -1))
376  new_progress%timestep=timestep
377  new_progress%reduction_operator=reduction_operator
378  new_progress%contributed_moncs=0
379  new_progress%contributed_io_servers=0
380  new_progress%root=root
381  new_progress%async_handle=mpi_request_null
382  new_progress%field_name=field_name
383  if (present(completion_procedure)) then
384  new_progress%completion_procedure=>completion_procedure
385  else
386  new_progress%completion_procedure=>null()
387  end if
388  generic=>new_progress
389  call c_put_generic(reduction_progresses, generate_reduction_key(field_name, timestep, reduction_operator), &
390  generic, .false.)
391  find_or_add_reduction_progress=>new_progress
392  end if
393  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
394  end if
395  if (.not. associated(find_or_add_reduction_progress%completion_procedure) .and. present(completion_procedure)) then
396  find_or_add_reduction_progress%completion_procedure=>completion_procedure
397  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ find_reduction_progress()

type(reduction_progress_type) function, pointer reduction_inter_io_mod::find_reduction_progress ( integer, intent(in)  timestep,
integer, intent(in)  reduction_operator,
character(len=*), intent(in)  field_name,
logical, intent(in), optional  issue_read_lock 
)
private

Locates a specific reduction progress based upon the timestep, operator and field name.

Parameters
timestepThe timestep to search for
reduction_operatorThe reduction operator to search for
field_nameThe field name which must match
reduction_progress_locationOptional location which is set to be the index of the matching progress item
Returns
Pointer to the reduction progress or null if none is found

Definition at line 406 of file reduction-inter-io.F90.

407  integer, intent(in) :: timestep, reduction_operator
408  logical, intent(in), optional :: issue_read_lock
409  type(reduction_progress_type), pointer :: find_reduction_progress
410  character(len=*), intent(in) :: field_name
411 
412  class(*), pointer :: generic
413  logical :: do_read_lock
414 
415  if (present(issue_read_lock)) then
416  do_read_lock=issue_read_lock
417  else
418  do_read_lock=.true.
419  end if
420 
421  if (do_read_lock) call check_thread_status(forthread_rwlock_rdlock(reduction_progress_rwlock))
422  generic=>c_get_generic(reduction_progresses, generate_reduction_key(field_name, timestep, reduction_operator))
423  if (do_read_lock) call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
424  if (associated(generic)) then
425  select type(generic)
426  type is (reduction_progress_type)
427  find_reduction_progress=>generic
428  end select
429  else
430  find_reduction_progress=>null()
431  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ generate_reduction_key()

character(len=string_length) function reduction_inter_io_mod::generate_reduction_key ( character(len=*), intent(in)  field_name,
integer, intent(in)  timestep,
integer, intent(in)  reduction_operator 
)
private

Generates the lookup key that is used for the map storage of reduction progresses.

Parameters
field_nameThe field name
timestepThe timestep
reduction_operatorThe reduction operator

Definition at line 455 of file reduction-inter-io.F90.

456  character(len=*), intent(in) :: field_name
457  integer, intent(in) :: timestep, reduction_operator
458 
459  generate_reduction_key=trim(field_name)//"#"//trim(conv_to_string(timestep))//"#"// trim(conv_to_string(reduction_operator))
Here is the caller graph for this function:

◆ get_reduction_operator()

integer function, public reduction_inter_io_mod::get_reduction_operator ( character(len=*), intent(in)  op_string)

Given the map of action attributes this procedure will identify the reduction operator that has been selected by the configuration.

Parameters
action_attributesAction attributes from the IO server configuration
Returns
The reduction operator

Definition at line 487 of file reduction-inter-io.F90.

488  character(len=*), intent(in) :: op_string
489 
490  if (op_string .eq. "mean") then
491  get_reduction_operator=mean
492  else if (op_string .eq. "min") then
493  get_reduction_operator=min
494  else if (op_string .eq. "max") then
495  get_reduction_operator=max
496  else if (op_string .eq. "sum") then
497  get_reduction_operator=sum
498  else
499  call log_log(log_error, "Reduction operator '"//trim(op_string)//"' not recognised")
500  end if
Here is the call graph for this function:

◆ handle_collective_completed()

subroutine reduction_inter_io_mod::handle_collective_completed ( type(io_configuration_type), intent(inout)  io_configuration,
type(reduction_progress_type), intent(inout)  reduction_progress 
)
private

Handles the situation where collective communication for a specific reduction has completed across all IO servers.

Parameters
reduction_progressThe reduction progress data type
number_io_serversThe total number of IO servers

Definition at line 338 of file reduction-inter-io.F90.

339  type(io_configuration_type), intent(inout) :: io_configuration
340  type(reduction_progress_type), intent(inout) :: reduction_progress
341 
342  if (reduction_progress%reduction_operator == mean) then
343  reduction_progress%values=reduction_progress%values/io_configuration%number_of_global_moncs
344  end if
345  call reduction_progress%completion_procedure(io_configuration, reduction_progress%values, &
346  reduction_progress%field_name, reduction_progress%timestep)
347  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
348  call check_thread_status(forthread_mutex_destroy(reduction_progress%mutex))
349  if (allocated(reduction_progress%values)) deallocate(reduction_progress%values)
350  call remove_reduction_progress(reduction_progress)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_local_moncs_completed_collective()

subroutine reduction_inter_io_mod::handle_local_moncs_completed_collective ( type(io_configuration_type), intent(inout)  io_configuration,
type(reduction_progress_type), intent(inout)  reduction_progress 
)
private

Handles the case where the local MONC processes have completed their collective operation for a specific reduction and, for this IO server, it either needs to send its value to the master IO server or, if it is the master, check for completion.

Parameters
io_configurationConfiguration state of the IO server
reduction_progressThe specific reduction progress data item that represents this reduction
z_sizeSize in Z
reduction_progress_locationLocation in the reduction progresses list that this single progress item resides at

Definition at line 272 of file reduction-inter-io.F90.

273  type(io_configuration_type), intent(inout) :: io_configuration
274  type(reduction_progress_type), intent(inout) :: reduction_progress
275 
276  integer :: ierr, inter_io_comm_index
277 
278  if (io_configuration%my_io_rank == reduction_progress%root .and. &
279  reduction_progress%contributed_io_servers == io_configuration%number_of_io_servers) then
280  call handle_collective_completed(io_configuration, reduction_progress)
281  else
282  if (io_configuration%my_io_rank /= reduction_progress%root) then
283  inter_io_comm_index=find_inter_io_from_name(io_configuration, my_inter_io_name)
284 
285  reduction_progress%send_buffer=package_inter_io_communication_message(reduction_progress%field_name, &
286  reduction_progress%timestep, reduction_progress%values, reduction_progress%reduction_operator)
287  call lock_mpi()
288  call mpi_isend(reduction_progress%send_buffer, size(reduction_progress%send_buffer), &
289  mpi_byte, reduction_progress%root, &
290  io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
291  io_configuration%io_communicator, reduction_progress%async_handle, ierr)
292  call unlock_mpi()
293  ! Deallocate the current value as this is finished with and has been packed into the send buffer
294  if (allocated(reduction_progress%values)) deallocate(reduction_progress%values)
295  end if
296  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
297  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_process_recv_from_other_io_server()

subroutine reduction_inter_io_mod::handle_process_recv_from_other_io_server ( type(io_configuration_type), intent(inout)  io_configuration,
type(io_configuration_inter_communication_description), intent(inout)  inter_io_comm,
integer, intent(in)  myrank,
character, dimension(:), intent(inout)  data_buffer,
integer, intent(in)  number_io_servers 
)
private

Handles the data received from another IO server, locates the correct reduction progress, appends the information and then checks for & deals with the situation where that reduction is completed.

Parameters
number_io_serversThe total number of IO servers
z_sizeNumber of levels in the vertical

Definition at line 304 of file reduction-inter-io.F90.

305  type(io_configuration_type), intent(inout) :: io_configuration
306  type(io_configuration_inter_communication_description), intent(inout) :: inter_io_comm
307  character, dimension(:), intent(inout) :: data_buffer
308  integer, intent(in) :: number_io_servers, myrank
309 
310  type(reduction_progress_type), pointer :: reduction_progress
311  character(len=STRING_LENGTH) :: field_name
312  integer :: timestep, reduction_op
313  real(kind=default_precision), dimension(:), allocatable :: field_values
314  logical :: collective_values_new
315 
316  call unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, reduction_op)
317  reduction_progress=>find_or_add_reduction_progress(timestep, reduction_op, myrank, field_name)
318 
319  call check_thread_status(forthread_mutex_lock(reduction_progress%mutex))
320  collective_values_new=.not. allocated(reduction_progress%values)
321  if (collective_values_new) allocate(reduction_progress%values(size(field_values)))
322 
323  reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
324  call integrate_io_server_collective_values(reduction_op, reduction_progress, &
325  field_values, size(field_values), collective_values_new)
326  if (reduction_progress%contributed_io_servers == number_io_servers) then
327  call handle_collective_completed(io_configuration, reduction_progress)
328  deallocate(field_values)
329  return
330  end if
331  deallocate(field_values)
332  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_recv_data_from_io_server()

subroutine reduction_inter_io_mod::handle_recv_data_from_io_server ( type(io_configuration_type), intent(inout)  io_configuration,
character, dimension(:), intent(inout)  data_buffer,
integer, intent(in)  inter_io_index 
)
private

Handles the receiving of data from some other IO server. This is issued call back style within a thread to handle that data.

Parameters
io_configurationConfiguration state of the IO server
inter_io_indexIndex of the inter IO communication description

Definition at line 69 of file reduction-inter-io.F90.

70  type(io_configuration_type), intent(inout) :: io_configuration
71  character, dimension(:), intent(inout) :: data_buffer
72  integer, intent(in) :: inter_io_index
73 
74  call handle_process_recv_from_other_io_server(io_configuration, io_configuration%inter_io_communications(inter_io_index), &
75  io_configuration%my_io_rank, data_buffer, io_configuration%number_of_io_servers)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ init_reduction_inter_io()

subroutine, public reduction_inter_io_mod::init_reduction_inter_io ( type(io_configuration_type), intent(inout)  io_configuration)

Initialises the reduction action.

Parameters
io_configurationThe IO server configuration

Definition at line 50 of file reduction-inter-io.F90.

51  type(io_configuration_type), intent(inout) :: io_configuration
52 
53  if (.not. initialised) then
54  initialised=.true.
55  previous_clean_reduction_count=0
56  reduction_count=0
57  call check_thread_status(forthread_rwlock_init(reduction_progress_rwlock, -1))
58  call check_thread_status(forthread_mutex_init(inter_io_description_mutex, -1))
59  call check_thread_status(forthread_mutex_init(clean_progress_mutex, -1))
60  call check_thread_status(forthread_mutex_init(reduction_count_mutex, -1))
61  call register_inter_io_communication(io_configuration, my_inter_io_tag, handle_recv_data_from_io_server, my_inter_io_name)
62  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ integrate_io_server_collective_values()

subroutine reduction_inter_io_mod::integrate_io_server_collective_values ( integer, intent(in)  reduction_op,
type(reduction_progress_type), intent(inout)  reduction_progress,
real(kind=double_precision), dimension(:), intent(in)  single_server_values,
integer, intent(in)  number_elements,
logical, intent(in)  collective_values_empty 
)
private

Integrates the collective values from another IO server into the currently stored values.

Parameters
reduction_opThe reduction operator to perform
reduction_progressThe progress data type which is updated
single_server_valuesThe values from the IO server which need to be integrated
dim_one_sizeSize in first dimension
target_indexThe index where we are putting the values into the current value array of reduction progress
collective_values_emptyWhether the collective values is empty

Definition at line 237 of file reduction-inter-io.F90.

239  integer, intent(in) :: reduction_op, number_elements
240  logical, intent(in) :: collective_values_empty
241  type(reduction_progress_type), intent(inout) :: reduction_progress
242  real(kind=double_precision), dimension(:), intent(in) :: single_server_values
243 
244  integer :: k
245 
246  if (collective_values_empty) then
247  reduction_progress%values=single_server_values
248  else
249  if (reduction_op == mean .or. reduction_op == sum) then
250  reduction_progress%values=reduction_progress%values+single_server_values
251  else if (reduction_op == min .or. reduction_op == max) then
252  do k=1, number_elements
253  if (reduction_op == min) then
254  if (single_server_values(k) .lt. reduction_progress%values(k)) &
255  reduction_progress%values(k)=single_server_values(k)
256  else if (reduction_op == max) then
257  if (single_server_values(k) .gt. reduction_progress%values(k)) &
258  reduction_progress%values(k)=single_server_values(k)
259  end if
260  end do
261  end if
262  end if
Here is the caller graph for this function:

◆ perform_inter_io_reduction()

subroutine, public reduction_inter_io_mod::perform_inter_io_reduction ( type(io_configuration_type), intent(inout)  io_configuration,
real(kind=double_precision), dimension(:)  field_values,
integer, intent(in)  field_size,
character(len=*), intent(in)  reduction_field_name,
integer, intent(in)  reduction_op,
integer, intent(in)  root,
integer, intent(in)  timestep,
procedure(handle_completion completion_procedure 
)

Actually handles the processing for this data wrt the vertical reduction.

Parameters
io_configurationConfiguration of the IO server
field_valuesThe values to communicate
field_sizeNumber of elements to communicate
reduction_field_nameField name that the reduction will be performed over
reduction_opThe reduction operator to use
rootThe root IO server process
timestepThe timestep this is issued at
completion_procedureCallback completion procedure

Definition at line 125 of file reduction-inter-io.F90.

127  type(io_configuration_type), intent(inout) :: io_configuration
128  real(kind=double_precision), dimension(:) :: field_values
129  integer, intent(in) :: field_size, reduction_op, root, timestep
130  character(len=*), intent(in) :: reduction_field_name
131  procedure(handle_completion) :: completion_procedure
132 
133  type(reduction_progress_type), pointer :: reduction_progress
134  logical :: collective_values_new
135 
136  call clean_progress(io_configuration%my_io_rank)
137  reduction_progress=>find_or_add_reduction_progress(timestep, reduction_op, root, reduction_field_name, completion_procedure)
138 
139  call check_thread_status(forthread_mutex_lock(reduction_progress%mutex))
140  reduction_progress%contributed_moncs=reduction_progress%contributed_moncs+1
141 
142  collective_values_new=.not. allocated(reduction_progress%values)
143  if (collective_values_new) allocate(reduction_progress%values(field_size))
144 
145  call integrate_io_server_collective_values(reduction_op, reduction_progress, field_values, field_size, collective_values_new)
146  if (reduction_progress%contributed_moncs == io_configuration%number_of_moncs) then
147  reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
148  call handle_local_moncs_completed_collective(io_configuration, reduction_progress)
149  else
150  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
151  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ remove_reduction_progress()

subroutine reduction_inter_io_mod::remove_reduction_progress ( type(reduction_progress_type), intent(in)  reduction_progress)
private

Removes a specific reduction progress.

Parameters
reduction_progressThe reduction progress to remove from the list

Definition at line 436 of file reduction-inter-io.F90.

437  type(reduction_progress_type), intent(in) :: reduction_progress
438 
439  class(*), pointer :: generic
440  character(len=STRING_LENGTH) :: specific_key
441 
442  specific_key=generate_reduction_key(reduction_progress%field_name, reduction_progress%timestep,&
443  reduction_progress%reduction_operator)
444  call check_thread_status(forthread_rwlock_wrlock(reduction_progress_rwlock))
445  generic=>c_get_generic(reduction_progresses, specific_key)
446  call c_remove(reduction_progresses, specific_key)
447  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
448  if (associated(generic)) deallocate(generic)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ retrieve_reduction_progress()

type(reduction_progress_type) function, pointer reduction_inter_io_mod::retrieve_reduction_progress ( type(mapentry_type), intent(in)  mapentry)
private

Helper function to retrieve the reduction progress from a mapentry.

Parameters
mapentryThe map entry to retrieve from
Returns
The progress data object in the map entry or null if none is found

Definition at line 465 of file reduction-inter-io.F90.

466  type(mapentry_type), intent(in) :: mapentry
467  type(reduction_progress_type), pointer :: retrieve_reduction_progress
468 
469  class(*), pointer :: generic
470 
471  generic=>c_get_generic(mapentry)
472 
473  if (associated(generic)) then
474  select type(generic)
475  type is (reduction_progress_type)
476  retrieve_reduction_progress=>generic
477  end select
478  else
479  retrieve_reduction_progress=>null()
480  end if
Here is the caller graph for this function:

Variable Documentation

◆ clean_progress_mutex

integer, volatile reduction_inter_io_mod::clean_progress_mutex
private

Definition at line 39 of file reduction-inter-io.F90.

◆ initialised

logical, volatile reduction_inter_io_mod::initialised =.false.
private

Definition at line 42 of file reduction-inter-io.F90.

42  logical, volatile :: initialised=.false.

◆ inter_io_description_mutex

integer, volatile reduction_inter_io_mod::inter_io_description_mutex
private

Definition at line 39 of file reduction-inter-io.F90.

◆ max

integer, parameter reduction_inter_io_mod::max =3
private

Definition at line 27 of file reduction-inter-io.F90.

◆ mean

integer, parameter reduction_inter_io_mod::mean =1
private

Definition at line 27 of file reduction-inter-io.F90.

27  integer, parameter :: MEAN=1, min=2, max=3, sum=4

◆ min

integer, parameter reduction_inter_io_mod::min =2
private

Definition at line 27 of file reduction-inter-io.F90.

◆ my_inter_io_name

character(len=*), parameter reduction_inter_io_mod::my_inter_io_name ="reductioninterio"
private

Definition at line 29 of file reduction-inter-io.F90.

29  character(len=*), parameter :: MY_INTER_IO_NAME="reductioninterio"

◆ my_inter_io_tag

integer, parameter reduction_inter_io_mod::my_inter_io_tag =1
private

Definition at line 28 of file reduction-inter-io.F90.

28  integer, parameter :: MY_INTER_IO_TAG=1, perform_clean_every=200

◆ perform_clean_every

integer, parameter reduction_inter_io_mod::perform_clean_every =200
private

Definition at line 28 of file reduction-inter-io.F90.

◆ previous_clean_reduction_count

integer, volatile reduction_inter_io_mod::previous_clean_reduction_count
private

Definition at line 39 of file reduction-inter-io.F90.

◆ reduction_count

integer, volatile reduction_inter_io_mod::reduction_count
private

Definition at line 39 of file reduction-inter-io.F90.

◆ reduction_count_mutex

integer, volatile reduction_inter_io_mod::reduction_count_mutex
private

Definition at line 39 of file reduction-inter-io.F90.

◆ reduction_progress_rwlock

integer, volatile reduction_inter_io_mod::reduction_progress_rwlock
private

Definition at line 39 of file reduction-inter-io.F90.

39  integer, volatile :: reduction_progress_rwlock, inter_io_description_mutex, clean_progress_mutex, &
40  reduction_count_mutex, previous_clean_reduction_count, reduction_count

◆ reduction_progresses

type(hashmap_type), volatile reduction_inter_io_mod::reduction_progresses
private

Definition at line 41 of file reduction-inter-io.F90.

41  type(hashmap_type), volatile :: reduction_progresses

◆ sum

integer, parameter reduction_inter_io_mod::sum =4
private

Definition at line 27 of file reduction-inter-io.F90.

logging_mod::log_error
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
logging_mod::log_log
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
datadefn_mod::default_precision
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17