Go to the documentation of this file.
38 use mpi,
only : mpi_comm_world, mpi_statuses_ignore, mpi_byte
50 logical,
volatile ::
contine_poll_messages, & !< Whether to continue waiting command messages from any MONC processes
67 provided_threading, total_global_processes, continuation_run, io_configuration_file)
69 integer,
intent(in) :: io_communicator_arg, provided_threading, total_global_processes
70 logical,
intent(in) :: continuation_run
71 character(len=LONG_STRING_LENGTH),
intent(in) :: io_configuration_file
73 integer :: command, source, my_rank, ierr
74 character,
dimension(:),
allocatable :: data_buffer, io_xml_configuration
78 if (continuation_run)
then
81 io_xml_configuration, io_communicator_arg)
84 if (.not.
allocated(io_xml_configuration))
then
85 io_xml_configuration=
get_io_xml(io_configuration_file)
86 if (continuation_run)
then
87 call mpi_comm_rank(io_communicator_arg, my_rank, ierr)
88 if (my_rank == 0)
then
89 call log_log(
log_warn,
"No IO server configuration in checkpoint file - starting from XML provided file instead")
96 deallocate(io_xml_configuration)
114 call c_free(diagnostic_generation_frequency)
146 character,
dimension(:),
intent(in) :: raw_contents
148 character(len=size(raw_contents)) :: string_to_process
151 if (.not.
options_get_logical(options_database,
"conditional_diagnostics_column_enabled"))
then
152 do i=1,
size(raw_contents)
153 string_to_process(i:i)=raw_contents(i)
155 if (index(string_to_process,
"CondDiags_") .ne. 0)
then
157 "Conditional diagnostics are DISABLED but requested via xml. Enable or remove request to resolve.")
167 integer,
intent(out) :: command, source
168 character,
dimension(:),
allocatable :: data_buffer
170 logical :: completed, inter_io_complete
174 do while(.not. completed)
185 if (inter_io_complete)
then
208 real(DEFAULT_PRECISION),
dimension(:) :: values
209 character(len=STRING_LENGTH) :: field_name
219 integer,
intent(in) :: command, source
220 character,
dimension(:),
allocatable,
intent(inout) :: data_buffer
228 deallocate(data_buffer)
237 integer,
dimension(:),
intent(in) :: arguments
238 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
251 integer :: i, specific_monc_data_type
260 if (
allocated(
io_configuration%registered_moncs(i)%field_start_locations)) &
273 integer,
dimension(:),
intent(in) :: arguments
274 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
276 integer :: monc_location, source
281 do while (
io_configuration%registered_moncs(monc_location)%active_threads .gt. 0)
297 integer,
intent(in) :: source, data_set
299 integer :: specific_monc_data_type, specific_monc_buffer_size, recv_count, monc_location
300 character,
dimension(:),
allocatable :: data_buffer
310 allocate(data_buffer(specific_monc_buffer_size))
311 recv_count=
data_receive(specific_monc_data_type, 1, source, dump_data=data_buffer, data_dump_id=data_set)
315 deallocate(data_buffer)
323 integer,
dimension(:),
intent(in) :: arguments
324 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
326 integer :: monc_location, data_set, source, matched_datadefn_index
329 data_set=arguments(2)
340 io_configuration%registered_moncs(monc_location)%definition_names(data_set))
342 if (matched_datadefn_index .gt. 0)
then
348 call log_log(
log_warn,
"IO server can not find matching data definition with name "&
349 //
io_configuration%registered_moncs(monc_location)%definition_names(data_set))
365 integer,
dimension(:),
intent(in) :: arguments
366 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
368 integer :: configuration_send_request(2), ierr, number_data_definitions, this_monc_index, source
379 call log_log(
log_error,
"You have a high ratio of computational cores to IO servers, the limit is currently 100")
393 io_configuration%registered_moncs(this_monc_index)%deactivate_condition_variable, -1))
397 allocate(
io_configuration%registered_moncs(this_monc_index)%field_start_locations(number_data_definitions), &
398 io_configuration%registered_moncs(this_monc_index)%field_end_locations(number_data_definitions), &
399 io_configuration%registered_moncs(this_monc_index)%definition_names(number_data_definitions), &
400 io_configuration%registered_moncs(this_monc_index)%dimensions(number_data_definitions))
411 integer,
intent(in) :: source
414 integer :: ierr, srequest(2)
418 source,
data_tag, mpi_comm_world, srequest(1), ierr)
420 source,
data_tag, mpi_comm_world, srequest(2), ierr)
434 integer,
intent(in) :: source
438 integer :: created_mpi_type, data_size, recv_count, i
440 logical :: field_found
443 source, description_data=data_description)
447 do i=1, io_configuration%number_of_data_definitions
448 created_mpi_type=
build_mpi_datatype(io_configuration%data_definitions(i), data_description, data_size, &
449 monc_defn%field_start_locations(i), monc_defn%field_end_locations(i), monc_defn%dimensions(i))
454 monc_defn%definition_names(i)=io_configuration%data_definitions(i)%name
459 call c_put_integer(io_configuration%dimension_sizing,
"active_q_indicies", field_description%dim_sizes(1))
469 integer,
intent(in) :: source
471 character,
dimension(:),
allocatable :: buffer
472 character(len=STRING_LENGTH) :: q_field_name, cd_field_name
473 integer :: buffer_size, z_size, num_q_fields, n, current_point, recv_count
476 logical :: field_found
482 buffer_size=(kind(dreal)*z_size)*2 + (
string_length * num_q_fields) &
484 allocate(buffer(buffer_size))
485 recv_count=
data_receive(mpi_byte, buffer_size, source, buffer)
493 current_point=(kind(dreal)*z_size)
494 if (num_q_fields .gt. 0)
then
496 q_field_name=transfer(buffer(current_point+1:current_point+
string_length), q_field_name)
502 io_configuration%z_field=transfer(buffer(current_point+1:current_point+kind(dreal)*z_size), &
504 current_point=current_point+(kind(dreal)*z_size)
533 integer,
intent(in) :: recv_count
540 call c_add_string(present_field_names, data_description(i)%field_name)
555 call c_free(present_field_names)
556 call c_free(diagnostics_field_names_and_roots)
568 logical :: field_found
571 if (.not. field_found)
call log_log(
log_error,
"Malformed MONC registration, no local size information")
573 monc_defn%local_dim_sizes(i)=field_description%dim_sizes(i)
576 if (.not. field_found)
call log_log(
log_error,
"Malformed MONC registration, no local start point information")
578 monc_defn%local_dim_starts(i)=field_description%dim_sizes(i)
581 if (.not. field_found)
call log_log(
log_error,
"Malformed MONC registration, no local end point information")
583 monc_defn%local_dim_ends(i)=field_description%dim_sizes(i)
integer function, public get_my_io_rank(io_comm)
Retrieves my IO server rank out of the number of IO servers that are running.
integer, parameter, public log_error
Only log ERROR messages.
Global callback inter IO, which registers the callback with identifiers and then the procedure is act...
Conversion between common inbuilt FORTRAN data types.
Map data structure that holds string (length 20 maximum) key value pairs.
character(len=string_length), parameter, public number_q_indicies_key
Returns whether a collection is empty.
character(len=string_length), dimension(:), allocatable, public cond_request
integer, parameter, public register_command
Reads the IO server state that was stored in a NetCDF checkpoint file.
subroutine, public register_command_receive()
Registers a request for receiving a command from any MONC process on the command channel.
subroutine, public initialise_logging(pid)
Initialises the logging. This is done to make it easier for master logging only, so that we don't hav...
type(field_description_type), dimension(:), allocatable registree_field_descriptions
integer function forthread_mutex_lock(mutex_id)
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
integer, parameter, public deregister_command
integer, parameter, public log_warn
Log WARNING and ERROR messages.
subroutine, public provide_monc_data_to_writer_federator(io_configuration, source, data_id, data_dump)
Data communicated from MONC is provided to this write federator and then included if the configuratio...
character(len=string_length), parameter, public local_start_points_key
subroutine handle_command_message(command, source, data_buffer)
Called to handle a specific command that has been received.
integer function, dimension(2) send_configuration_to_registree(source)
Sends the data and field descriptions to the MONC process that just registered with the IO server.
integer mpi_type_field_description
The MPI data type for field descriptions sent to MONCs.
character(len=string_length), dimension(:), allocatable, public diag_long
Abstraction layer around MPI, this issues and marshals the lower level communication details.
integer function, public build_mpi_type_definition_description()
Builds the MPI data type for sending data descriptions to registree MONCs.
The main IO server functionality which handles waiting for commands and data both of which are dealt w...
subroutine, public finalise_writer_field_manager()
Finalises the writer field manager.
logical, volatile initialised_present_data
logical function, public threadpool_is_idle()
Determines whether the thread pool is idle or not (i.e. all threads are idle and waiting for work)
This federates over the writing of diagnostic and prognostic data to the file system....
subroutine, public perform_global_callback(io_configuration, field_name, timestep, completion_procedure)
Performs a global callback.
integer function forthread_mutex_init(mutex_id, attr_id)
The writer field manager will manage aspects of the fields being provided to the writer federator....
Collection data structures.
subroutine handle_monc_dimension_information(data_description, monc_defn)
Handles the provided local MONC dimension and data layout information.
subroutine, public read_io_server_configuration(checkpoint_filename, io_xml_configuration, io_communicator_arg)
Reads the IO server configuration, which is the XML configuration initially run with and stored in th...
character(len=string_length), dimension(:), allocatable, public diag_request
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
integer, parameter, public scalar_field_type
subroutine, public io_server_run(options_database, io_communicator_arg, provided_threading, total_global_processes, continuation_run, io_configuration_file)
Called to start the IO server and once this subroutine returns then it indicates that the IO server h...
subroutine, public replace_character(str, src_char, tgt_char)
Replaces all occurrences of a character in a string with another character.
subroutine termination_callback(io_configuration, values, field_name, timestep)
This is the termination callback which is called once all MONCs have deregistered,...
subroutine, public pause_for_mpi_interleaving()
Pauses for a specific number of ms to allow for MPI interleaving, this is to avoid starvation.
subroutine handle_monc_registration(arguments, data_buffer)
Handles registration from some MONC process. The source process sends some data description to this I...
type(definition_description_type), dimension(:), allocatable registree_definition_descriptions
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
subroutine, public finalise_diagnostic_federator(io_configuration)
Finalises the diagnostics federator, waiting for all outstanding requests and then freeing data.
integer function forthread_rwlock_destroy(rwlock_id)
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI.
integer, parameter, public long_string_length
Length of longer strings.
integer, parameter, public data_size_stride
logical, volatile contine_poll_messages
Whether to continue waiting command messages from any MONC processes.
integer function, public build_mpi_type_field_description()
Builds the MPI data type for sending field descriptions to registree MONCs.
type(io_configuration_type), save, volatile io_configuration
Internal representation of the IO configuration.
Configuration of a specific data definition.
integer, parameter, public data_tag
This defines some constants and procedures that are useful to the IO server and clients that call it....
integer, volatile monc_registration_lock
subroutine free_individual_registered_monc_aspects()
Frees up the memory associated with individual registered MONCs. This is done at the end for all MONC...
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data....
integer function forthread_rwlock_tryrdlock(lock_id)
subroutine, public pass_fields_to_diagnostics_federator(io_configuration, source, data_id, data_dump)
Entry point into the diagnostics federator this runs the diagnostics, executing the defined rules bas...
character(len=string_length) function, public options_get_string(options_database, key, index)
Retrieves a string value from the database that matches the provided key.
subroutine register_present_field_names_to_federators(data_description, recv_count)
Registers with the writer federator the set of fields (prognostic and diagnostic) that are available,...
subroutine, public inform_writer_federator_time_point(io_configuration, source, data_id, data_dump)
subroutine, public initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
Initialises the write federator and configures it based on the user configuration....
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
subroutine, public initialise_mpi_communication(provided_threading)
Initialises MPI communication.
logical function, public get_data_description_from_name(descriptions, name, field_description)
Look up the data description that corresponds to a specific field keyed by its name.
character(len=string_length), parameter, public local_sizes_key
Retrieves the integer value held at the specific map index or null if index > map elements.
This diagnostics federator will take in data fields sent from a MONC, perform operators on these as r...
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
logical, volatile already_registered_finishing_call
integer, parameter, public data_command_start
subroutine, public configuration_parse(provided_options_database, raw_configuration, parsed_configuration)
This will parse an XML string into the IO configuration.
logical function, public test_for_command(command, source)
Tests for a command message based upon the request already registered.
Converts data types to strings.
subroutine, public inform_writer_federator_fields_present(io_configuration, field_names, diag_field_names_and_roots)
Informs the writer federator that specific fields are present and should be reflected in the diagnost...
integer function forthread_cond_signal(cond_id)
logical function, public test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, source, data_buffer)
Tests for inter IO server communication.
subroutine, public extend_registered_moncs_array(io_configuration)
Extends the data definitions array from the current size to the current size + data size stride.
subroutine, public provide_q_field_names_to_writer_federator(q_provided_field_names)
Provides the Q field names to the write federator, this is required as on initialisation we don't kno...
integer function forthread_rwlock_init(rwlock_id, attr_id)
Overall IO configuration.
integer function forthread_cond_wait(cond_id, mutex_id)
Puts an integer key-value pair into the map.
logical, volatile contine_poll_interio_messages
subroutine, public free_mpi_type(the_type)
Frees an MPI type, used in clean up.
subroutine, public cancel_requests()
Cancels all outstanding communication requests.
subroutine handle_inter_io_communication_command(arguments, data_buffer)
Handles inter IO server communications.
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map.
subroutine, public finalise_writer_federator()
Finalises the write federator and the manipulations.
type(hashmap_type) function, public determine_diagnostics_fields_available(monc_field_names)
Determines the diagnostics fields that are available based upon the input MONC fields on registration...
logical function, public options_get_logical(options_database, key, index)
Retrieves a logical value from the database that matches the provided key.
type(definition_description_type) function, dimension(:), allocatable, public build_definition_description_type_from_configuration(io_configuration)
Builds up the data definition description type from the structured definitions in the IO configuratio...
integer mpi_type_definition_description
The MPI data type for data descriptions sent to MONCs.
integer mpi_type_data_sizing_description
The MPI type for field sizing (i.e. array size etc. sent when MONCs register).
character(len=string_length), parameter, public local_end_points_key
subroutine init_data_definition(source, monc_defn)
Initialise the sizing of data definitions from a MONC process. The IO server determines,...
integer function, public build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, field_end_locations, field_dimensions)
Builds the MPI type that corresponds to the data which will be received from a specific MONC process....
integer, parameter, public inter_io_communication
Field type identifiers.
Configuration that represents the state of a registered MONC process.
subroutine, public threadpool_finalise()
Finalises the thread pool.
recursive character function, dimension(:), allocatable, public get_io_xml(filename, funit_num)
Reads in textual data from a file and returns this, used to read the IO server XML configuration file...
Contains common definitions for the data and datatypes used by MONC.
integer function, public data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
Awaits some data on the data channel. This is of the type, size from the source provided and can eith...
subroutine, public threadpool_deactivate()
This waits for all busy threads to complete and then shuts all the pthreads down. The deactivation an...
integer, parameter, public string_length
Default length of strings.
integer function forthread_mutex_unlock(mutex_id)
subroutine handle_deregistration_command(arguments, data_buffer)
Deregisters a specific MONC source process.
subroutine get_monc_information_data(source)
Retrieves MONC information data, this is sent by MONC (and received) regardless, but only actioned if...
subroutine, public check_writer_for_trigger(io_configuration, source, data_id, data_dump)
Checks all writer entries for any trigger fires and issues the underlying file storage.
type(hashmap_type) function, public initialise_diagnostic_federator(io_configuration)
Initialises the diagnostics action and sets up the diagnostics master definitions.
integer function, public build_mpi_type_data_sizing_description()
Builds the MPI type used for sending to the IO server a description of the data, namely the size of t...
integer function, public get_monc_location(io_configuration, source)
A helper function to get the location of a MONC's configuration in the IO data structure.
subroutine check_for_condi_conflict(raw_contents, options_database)
Handle potential conditional diagnostics conflict. Provides a more helpful error in the case where con...
logical function await_command(command, source, data_buffer)
Awaits a command or shutdown from MONC processes and other IO servers.
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Removes a specific element from the list or map.
character(len=string_length), dimension(:), allocatable, public cond_long
integer function forthread_rwlock_wrlock(lock_id)
Gets a specific integer element out of the list, stack, queue or map with the corresponding key.
integer function forthread_rwlock_rdlock(lock_id)
subroutine handle_data_message(arguments, data_buffer)
Handles the command for data download from a specific process. This will allocate the receive buffer ...
type(field_description_type) function, dimension(:), allocatable, public build_field_description_type_from_configuration(io_configuration)
Builds up the field definition description type from the structured definitions in the IO configurati...
Adds a string to the end of the list.
Manages the options database. Contains administration functions and deduce runtime options from the c...
String utility functionality that is commonly used throughout MONC.
Parses the XML configuration file to produce the io configuration description which contains the data...
integer function forthread_rwlock_unlock(lock_id)
subroutine pull_back_data_message_and_handle(source, data_set)
Retrieves the message from MONC off the data channel and throws this to a thread in the thread pool t...
integer function forthread_cond_init(cond_id, attr_id)
subroutine, public threadpool_init(io_configuration)
Initialises the thread pool and marks each thread as idle.
Hashset structure which will store unique strings. The hashing aspect means that lookup is very fast ...
logical function, public check_diagnostic_federator_for_completion(io_configuration)
Checks whether the diagnostics federator has completed or not, this is really checking all the underl...
integer function, public get_number_io_servers(io_comm)
Retrieves the number of IO servers that are running in total.
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
integer function, public retrieve_data_definition(io_configuration, key)
Retrieves a specific data definition from the configuration which matches a key.
subroutine, public initialise_writer_field_manager(io_configuration, continuation_run)
Initialises the writer field manager.