Go to the documentation of this file.
17 use mpi,
only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_statuses_ignore, mpi_character, mpi_byte
32 integer,
dimension(:),
allocatable :: send_requests
33 character,
dimension(:),
allocatable :: send_buffer
41 character(len=STRING_LENGTH) :: field_name
87 if (
allocated(broadcast_item%send_requests))
then
89 deallocate(broadcast_item%send_requests)
90 if (
allocated(broadcast_item%send_buffer))
deallocate(broadcast_item%send_buffer)
121 if (.not. broadcast_item%handled)
then
138 character,
dimension(:),
intent(inout) :: data_buffer
139 integer,
intent(in) :: inter_io_index
143 character(len=STRING_LENGTH) :: field_name
150 if (
associated(broadcast_item%completion_procedure))
then
152 broadcast_item%handled=.true.
157 allocate(broadcast_item%cached_values(
size(data_values)), source=data_values)
158 broadcast_item%cached_values=data_values
161 if (
allocated(data_values))
deallocate(data_values)
175 timestep, completion_procedure)
178 integer,
intent(in) :: field_size, root, timestep
179 character(len=*),
intent(in) :: field_name
183 integer :: inter_io_comm_index, i, ierr
190 if (io_configuration%my_io_rank == root .and. .not. broadcast_item%handled)
then
191 broadcast_item%handled=.true.
193 allocate(broadcast_item%send_requests(io_configuration%number_of_io_servers))
196 do i=0, io_configuration%number_of_io_servers-1
197 if (i .ne. io_configuration%my_io_rank)
then
199 call mpi_isend(broadcast_item%send_buffer,
size(broadcast_item%send_buffer), mpi_byte, i, &
200 io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
201 io_configuration%io_communicator, broadcast_item%send_requests(i+1), ierr)
204 broadcast_item%send_requests(i+1)=mpi_request_null
210 if (
allocated(broadcast_item%cached_values) .and. .not. broadcast_item%handled)
then
211 broadcast_item%handled=.true.
213 if (
allocated(broadcast_item%cached_values))
deallocate(broadcast_item%cached_values)
226 integer,
intent(in) :: timestep
227 character(len=*),
intent(in) :: field_name
228 real(DEFAULT_PRECISION),
dimension(:),
intent(in) :: values
232 class(*),
pointer :: generic
234 allocate(threaded_callback_state)
236 threaded_callback_state%field_name=field_name
237 threaded_callback_state%timestep=timestep
238 allocate(threaded_callback_state%values(
size(values)), source=values)
239 threaded_callback_state%completion_procedure=>completion_procedure
242 generic=>threaded_callback_state
254 integer,
dimension(:),
intent(in) :: arguments
255 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
257 class(*),
pointer :: generic
264 threaded_callback_state=>generic
269 if (
associated(threaded_callback_state))
then
271 threaded_callback_state%field_name, threaded_callback_state%timestep)
272 deallocate(threaded_callback_state%values)
273 deallocate(threaded_callback_state)
293 integer :: completion_flag, ierr, num_to_remove, have_lock
294 character(len=STRING_LENGTH) :: entry_key
296 logical :: destroy_lock
299 class(*),
pointer :: generic
302 if (have_lock==0)
then
310 if (
allocated(specific_broadcast_item_at_index%send_requests))
then
312 call mpi_testall(
size(specific_broadcast_item_at_index%send_requests), specific_broadcast_item_at_index%send_requests, &
313 completion_flag, mpi_statuses_ignore, ierr)
315 if (completion_flag == 1)
then
316 deallocate(specific_broadcast_item_at_index%send_requests)
317 if (
allocated(specific_broadcast_item_at_index%send_buffer))
deallocate(specific_broadcast_item_at_index%send_buffer)
322 if (specific_broadcast_item_at_index%handled)
then
323 if (
allocated(specific_broadcast_item_at_index%cached_values))
then
324 deallocate(specific_broadcast_item_at_index%cached_values)
346 call c_free(entries_to_remove)
357 character(len=*),
intent(in) :: field_name
358 integer,
intent(in) :: timestep
362 class(*),
pointer :: generic
370 if (
present(completion_procedure))
then
390 character(len=*),
intent(in) :: field_name
391 integer,
intent(in) :: timestep
392 logical,
intent(in) :: do_read_lock
395 class(*),
pointer :: generic
401 if (
associated(generic))
then
418 class(*),
pointer :: generic
422 if (
associated(generic))
then
Conversion between common inbuilt FORTRAN data types.
Returns whether a collection is empty.
integer, parameter my_inter_io_tag
subroutine issue_thread_call_to_completion(field_name, timestep, values, completion_procedure)
Issues the call into the thread pool to call the completion procedure, this runs in a separate thread...
logical, volatile initialised
integer function forthread_mutex_lock(mutex_id)
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
Puts a generic key-value pair into the map.
Abstraction layer around MPI, this issues and marshals the lower level communication details.
Broadcast inter IO communication which sends a value from one IO server to all others....
integer function forthread_mutex_init(mutex_id, attr_id)
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
Handles receiving data from another IO server and processing this. If the request has already been re...
integer, volatile bcast_clean_reduction_count
Collection data structures.
Gets a specific string element out of the list, stack, queue or map with the corresponding key.
logical function, public check_broadcast_inter_io_for_completion(io_configuration)
Checks the statuses for broadcast completion and returns whether they are all finished or not.
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
integer, volatile bcast_count
Inter IO server communication specific functionality. This manages all of the communication that migh...
integer, volatile inter_io_description_mutex
integer function forthread_rwlock_destroy(rwlock_id)
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI.
character function, dimension(:), allocatable, public package_inter_io_communication_message(field_name, timestep, field_values, other_int)
Packages up fields into an io binary message (allocated here) which is used for sending.
integer function forthread_mutex_destroy(mutex_id)
integer, parameter perform_clean_every
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data....
Gets a specific generic element out of the list, stack, queue or map with the corresponding key.
subroutine clean_broadcast_progress_if_needed()
Calls out to do a broadcast progress clean if needed (i.e. every n steps).
type(inter_io_broadcast) function, pointer find_broadcast_item(field_name, timestep, do_read_lock)
Finds a specific broadcast item or null if none is found.
subroutine, public perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, timestep, completion_procedure)
Performs an inter IO broadcast of data from the root to all other IO servers. Note that this is on th...
type(hashmap_type), volatile thread_callback_params
Retrieves the generic value held at the specific map index or null if index > map elements.
integer, volatile bcast_count_mutex
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
integer, volatile clean_progress_mutex
Converts data types to strings.
type(io_configuration_type), pointer stored_io_configuration
character(len= *), parameter my_inter_io_name
Type keeping track of broadcast statuses.
subroutine, public finalise_broadcast_inter_io()
Finalises the broadcast inter IO functionality.
integer function forthread_rwlock_init(rwlock_id, attr_id)
Overall IO configuration.
subroutine, public register_inter_io_communication(io_configuration, message_tag, handling_procedure, name)
Registers an inter IO communication operation.
Frees up all the allocatable (heap) memory associated with a list, stack, queue or map.
integer, volatile thread_callback_params_id
integer, volatile thread_callback_params_mutex
integer function forthread_mutex_trylock(mutex_id)
integer, parameter, public double_precision
Double precision (64 bit) kind.
Contains common definitions for the data and datatypes used by MONC.
integer function, public find_inter_io_from_name(io_configuration, name)
Locates the index of an inter IO entry from the operator name or returns 0 if none is found.
integer, parameter, public string_length
Default length of strings.
subroutine thread_call_to_completion(arguments, data_buffer)
Called by the thread pool, this will call onto the completion procedure before cleaning up @arguments...
integer function forthread_mutex_unlock(mutex_id)
subroutine, public init_broadcast_inter_io(io_configuration)
Initialises the broadcast inter IO functionality.
subroutine clean_broadcast_progress()
Performs a clean of the broadcast progresses that no longer need to be stored.
subroutine, public unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, other_int)
Unpackages some binary data into its individual fields. The field values are allocated here and the s...
List data structure which implements a doubly linked list. This list will preserve its order.
type(inter_io_broadcast) function, pointer find_or_add_broadcast_item(field_name, timestep, completion_procedure)
Locates and returns or adds and returns a specific broadcast item representing a timestep and field.
type(hashmap_type), volatile broadcast_statuses
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Removes a specific element from the list or map.
integer function forthread_rwlock_wrlock(lock_id)
integer function forthread_rwlock_rdlock(lock_id)
integer, volatile broadcast_statuses_rwlock
Adds a string to the end of the list.
Parses the XML configuration file to produce the io configuration description which contains the data...
integer function forthread_rwlock_unlock(lock_id)
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
type(inter_io_broadcast) function, pointer retrieve_broadcast_item(mapentry)
Locates a broadcast item within a mapentry or null if none exists.