18 use mpi,
only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_status_ignore, mpi_character, mpi_byte
33 character,
dimension(:),
allocatable :: send_buffer
34 character(len=STRING_LENGTH) :: field_name
35 integer :: contributed_moncs, contributed_io_servers, timestep, reduction_operator, async_handle, mutex, root
71 character,
dimension(:),
intent(inout) :: data_buffer
72 integer,
intent(in) :: inter_io_index
75 io_configuration%my_io_rank, data_buffer, io_configuration%number_of_io_servers)
102 if (progress%async_handle /= mpi_request_null)
then
126 root, timestep, completion_procedure)
129 integer,
intent(in) :: field_size, reduction_op, root, timestep
130 character(len=*),
intent(in) :: reduction_field_name
134 logical :: collective_values_new
140 reduction_progress%contributed_moncs=reduction_progress%contributed_moncs+1
142 collective_values_new=.not.
allocated(reduction_progress%values)
143 if (collective_values_new)
allocate(reduction_progress%values(field_size))
146 if (reduction_progress%contributed_moncs == io_configuration%number_of_moncs)
then
147 reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
155 integer,
intent(in) :: myrank
175 integer,
intent(in) :: myrank
177 integer :: i, entries, completed, ierr, num_to_remove, have_lock
182 character(len=STRING_LENGTH) :: entry_key
183 class(*),
pointer :: generic
184 logical :: destroy_lock
188 if (have_lock == 0)
then
195 if (myrank /= specific_reduction_progress%root)
then
197 if (specific_reduction_progress%async_handle /= mpi_request_null)
then
200 if (specific_reduction_progress%async_handle == mpi_request_null)
then
201 if (
allocated(specific_reduction_progress%send_buffer))
deallocate(specific_reduction_progress%send_buffer)
225 call c_free(entries_to_remove)
238 number_elements, collective_values_empty)
239 integer,
intent(in) :: reduction_op, number_elements
240 logical,
intent(in) :: collective_values_empty
242 real(kind=
double_precision),
dimension(:),
intent(in) :: single_server_values
246 if (collective_values_empty)
then
247 reduction_progress%values=single_server_values
249 if (reduction_op ==
mean .or. reduction_op ==
sum)
then
250 reduction_progress%values=reduction_progress%values+single_server_values
251 else if (reduction_op ==
min .or. reduction_op ==
max)
then
252 do k=1, number_elements
253 if (reduction_op ==
min)
then
254 if (single_server_values(k) .lt. reduction_progress%values(k)) &
255 reduction_progress%values(k)=single_server_values(k)
256 else if (reduction_op ==
max)
then
257 if (single_server_values(k) .gt. reduction_progress%values(k)) &
258 reduction_progress%values(k)=single_server_values(k)
276 integer :: ierr, inter_io_comm_index
278 if (io_configuration%my_io_rank == reduction_progress%root .and. &
279 reduction_progress%contributed_io_servers == io_configuration%number_of_io_servers)
then
282 if (io_configuration%my_io_rank /= reduction_progress%root)
then
286 reduction_progress%timestep, reduction_progress%values, reduction_progress%reduction_operator)
288 call mpi_isend(reduction_progress%send_buffer,
size(reduction_progress%send_buffer), &
289 mpi_byte, reduction_progress%root, &
290 io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
291 io_configuration%io_communicator, reduction_progress%async_handle, ierr)
294 if (
allocated(reduction_progress%values))
deallocate(reduction_progress%values)
307 character,
dimension(:),
intent(inout) :: data_buffer
308 integer,
intent(in) :: number_io_servers, myrank
311 character(len=STRING_LENGTH) :: field_name
312 integer :: timestep, reduction_op
314 logical :: collective_values_new
320 collective_values_new=.not.
allocated(reduction_progress%values)
321 if (collective_values_new)
allocate(reduction_progress%values(
size(field_values)))
323 reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
325 field_values,
size(field_values), collective_values_new)
326 if (reduction_progress%contributed_io_servers == number_io_servers)
then
328 deallocate(field_values)
331 deallocate(field_values)
342 if (reduction_progress%reduction_operator ==
mean)
then
343 reduction_progress%values=reduction_progress%values/io_configuration%number_of_global_moncs
345 call reduction_progress%completion_procedure(io_configuration, reduction_progress%values, &
346 reduction_progress%field_name, reduction_progress%timestep)
349 if (
allocated(reduction_progress%values))
deallocate(reduction_progress%values)
361 integer,
intent(in) :: timestep, reduction_operator, root
363 character(len=*),
intent(in) :: field_name
366 class(*),
pointer :: generic
374 allocate(new_progress)
376 new_progress%timestep=timestep
377 new_progress%reduction_operator=reduction_operator
378 new_progress%contributed_moncs=0
379 new_progress%contributed_io_servers=0
380 new_progress%root=root
381 new_progress%async_handle=mpi_request_null
382 new_progress%field_name=field_name
383 if (
present(completion_procedure))
then
384 new_progress%completion_procedure=>completion_procedure
386 new_progress%completion_procedure=>null()
388 generic=>new_progress
407 integer,
intent(in) :: timestep, reduction_operator
408 logical,
intent(in),
optional :: issue_read_lock
410 character(len=*),
intent(in) :: field_name
412 class(*),
pointer :: generic
413 logical :: do_read_lock
415 if (
present(issue_read_lock))
then
416 do_read_lock=issue_read_lock
424 if (
associated(generic))
then
439 class(*),
pointer :: generic
440 character(len=STRING_LENGTH) :: specific_key
443 reduction_progress%reduction_operator)
448 if (
associated(generic))
deallocate(generic)
456 character(len=*),
intent(in) :: field_name
457 integer,
intent(in) :: timestep, reduction_operator
469 class(*),
pointer :: generic
473 if (
associated(generic))
then
488 character(len=*),
intent(in) :: op_string
490 if (op_string .eq.
"mean")
then
492 else if (op_string .eq.
"min")
then
494 else if (op_string .eq.
"max")
then
496 else if (op_string .eq.
"sum")
then
499 call log_log(
log_error,
"Reduction operator '"//trim(op_string)//
"' not recognised")