MONC
Functions/Subroutines | Variables
io_server_mod Module Reference

The main IO server functionality which handles waiting for commands and data both of which are delt with. The lower level details of the communication, configuration parsing etc are all held elsewhere. The server can be thought of similar to a bus, with command and data channels. The command gives context to what is on the data channel and not all commands require data (such as deregistration of MONC process) More...

Functions/Subroutines

subroutine, public io_server_run (options_database, io_communicator_arg, provided_threading, total_global_processes, continuation_run, io_configuration_file)
 Called to start the IO server and once this subroutine returns then it indicates that the IO server has finished. The runtime is spent in here awaiting commands and then dealing with them. Termination occurs when all MONC processes have deregistered, note that to trigger this then at least one MONC process must first register. More...
 
subroutine check_for_condi_conflict (raw_contents, options_database)
 Handle potential conditional diagnostics conflict Provides a more helpful error in the case where conditional diagnostics are requested as output, but their components are not enabled. We check this by searching the io_xml_configuration. More...
 
logical function await_command (command, source, data_buffer)
 Awaits a command or shutdown from MONC processes and other IO servers. More...
 
subroutine termination_callback (io_configuration, values, field_name, timestep)
 This is the termination callback which is called once all MONCs have deregistered, no sends are active by inter IO communications and all threads are idle. This shuts down the inter IO listening and kickstarts finalisation and closure. More...
 
subroutine handle_command_message (command, source, data_buffer)
 Called to handle a specific command that has been recieved. More...
 
subroutine handle_inter_io_communication_command (arguments, data_buffer)
 Handles inter IO server communications. More...
 
subroutine free_individual_registered_monc_aspects ()
 Frees up the memory associated with individual registered MONCs. This is done at the end for all MONCs as we can't deallocate dynamically in a threaded environment without excessive ordering and locking in case some data processing is queued or in progress. More...
 
subroutine handle_deregistration_command (arguments, data_buffer)
 Deregisteres a specific MONC source process. More...
 
subroutine pull_back_data_message_and_handle (source, data_set)
 Retrieves the message from MONC off the data channel and throws this to a thread in the thread pool to actually process We do it this way to enforce ordering between the command (including the data set ID) and the raw data itself. More...
 
subroutine handle_data_message (arguments, data_buffer)
 Handles the command for data download from a specific process. This will allocate the receive buffer and then call to get the data. Once it has been received then the data is run against handling rules. More...
 
subroutine handle_monc_registration (arguments, data_buffer)
 Handles registration from some MONC process. The source process sends some data description to this IO server which basically tells the IO server the size of the array datas (which might be different on different processes in the case of uneven decomposition.) Based upon this a communication (MPI) data type is constructed and the data size in bytes determined. More...
 
integer function, dimension(2) send_configuration_to_registree (source)
 Sends the data and field descriptions to the MONC process that just registered with the IO server. More...
 
subroutine init_data_definition (source, monc_defn)
 Initialise the sizing of data definitions from a MONC process. The IO server determines, from configuration, the structure of each data definition but the size of the arrays depends upon the MONC process (due to uneven distribution of data etc...) This receives the sizing message and then builds the MPI datatype for each data definition that the IO server will receive from that specific MONC process. The field sizings are for all fields in every data definition, and these are applied to each data definition which will simply ignore non matching fields. More...
 
subroutine get_monc_information_data (source)
 Retrieves MONC information data, this is sent by MONC (and received) regardless, but only actioned if the data has not already been set. More...
 
subroutine register_present_field_names_to_federators (data_description, recv_count)
 Registers with the writer federator the set of fields (prognostic and diagnostic) that are available, this is based on the array/optional fields present from MONC and the non-optional scalars. This is quite an expensive operation, so only done once. More...
 
subroutine handle_monc_dimension_information (data_description, monc_defn)
 Handles the provided local MONC dimension and data layout information. More...
 

Variables

integer mpi_type_data_sizing_description
 The MPI type for field sizing (i.e. array size etc send when MONCs register) More...
 
integer mpi_type_definition_description
 The MPI data type for data descriptions sent to MONCs. More...
 
integer mpi_type_field_description
 The MPI data type for field descriptions sent to MONCs. More...
 
type(io_configuration_type), save, volatile io_configuration
 Internal representation of the IO configuration. More...
 
logical, volatile contine_poll_messages
 Whether to continue waiting command messages from any MONC processes. More...
 
logical, volatile initialised_present_data
 
logical, volatile contine_poll_interio_messages
 
logical, volatile already_registered_finishing_call
 
type(field_description_type), dimension(:), allocatable registree_field_descriptions
 
type(definition_description_type), dimension(:), allocatable registree_definition_descriptions
 
integer, volatile monc_registration_lock
 

Detailed Description

The main IO server functionality which handles waiting for commands and data both of which are delt with. The lower level details of the communication, configuration parsing etc are all held elsewhere. The server can be thought of similar to a bus, with command and data channels. The command gives context to what is on the data channel and not all commands require data (such as deregistration of MONC process)

Function/Subroutine Documentation

◆ await_command()

logical function io_server_mod::await_command ( integer, intent(out)  command,
integer, intent(out)  source,
character, dimension(:), allocatable  data_buffer 
)
private

Awaits a command or shutdown from MONC processes and other IO servers.

Parameters
commandThe command received is output
sourceThe source process received is output
Returns
Whether to continue polling for commands (and whether to process the current output)

Definition at line 166 of file ioserver.F90.

167  integer, intent(out) :: command, source
168  character, dimension(:), allocatable :: data_buffer
169 
170  logical :: completed, inter_io_complete
171 
172  completed=.false.
173  await_command=.false.
174  do while(.not. completed)
175  if (.not. contine_poll_messages .and. .not. contine_poll_interio_messages) return
176  if (contine_poll_messages) then
177  if (test_for_command(command, source)) then
178  await_command=.true.
179  return
180  end if
181  end if
182  if (contine_poll_interio_messages .and. allocated(io_configuration%inter_io_communications)) then
183  inter_io_complete=test_for_inter_io(io_configuration%inter_io_communications, &
184  io_configuration%number_inter_io_communications, io_configuration%io_communicator, command, source, data_buffer)
185  if (inter_io_complete) then
186  await_command=.true.
187  return
188  end if
189  end if
190  if (.not. contine_poll_messages .and. .not. already_registered_finishing_call) then
191  if (check_diagnostic_federator_for_completion(io_configuration) .and. threadpool_is_idle()) then
192  already_registered_finishing_call=.true.
193  call perform_global_callback(io_configuration, "termination", 1, termination_callback)
194  end if
195  end if
196  if (.not. completed) call pause_for_mpi_interleaving()
197  end do
Here is the call graph for this function:
Here is the caller graph for this function:

◆ check_for_condi_conflict()

subroutine io_server_mod::check_for_condi_conflict ( character, dimension(:), intent(in)  raw_contents,
type(hashmap_type), intent(inout)  options_database 
)
private

Handle potential conditional diagnostics conflict Provides a more helpful error in the case where conditional diagnostics are requested as output, but their components are not enabled. We check this by searching the io_xml_configuration.

Parameters
raw_contents,intendedto be the io_xml_configuration character array
options_database

Definition at line 145 of file ioserver.F90.

146  character, dimension(:), intent(in) :: raw_contents
147  type(hashmap_type), intent(inout) :: options_database
148  character(len=size(raw_contents)) :: string_to_process
149  integer :: i
150 
151  if (.not. options_get_logical(options_database, "conditional_diagnostics_column_enabled")) then
152  do i=1, size(raw_contents)
153  string_to_process(i:i)=raw_contents(i)
154  end do
155  if (index(string_to_process,"CondDiags_") .ne. 0) then
156  call log_log(log_error, &
157  "Conditional diagnostics are DISABLED but requested via xml. Enable or remove request to resolve.")
158  end if
159  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ free_individual_registered_monc_aspects()

subroutine io_server_mod::free_individual_registered_monc_aspects
private

Frees up the memory associated with individual registered MONCs. This is done at the end for all MONCs as we can't deallocate dynamically in a threaded environment without excessive ordering and locking in case some data processing is queued or in progress.

Definition at line 250 of file ioserver.F90.

251  integer :: i, specific_monc_data_type
252  type(iterator_type) :: types_iterator
253 
254  do i=1, size(io_configuration%registered_moncs)
255  types_iterator=c_get_iterator(io_configuration%registered_moncs(i)%registered_monc_types)
256  do while (c_has_next(types_iterator))
257  specific_monc_data_type=c_get_integer(c_next_mapentry(types_iterator))
258  call free_mpi_type(specific_monc_data_type)
259  end do
260  if (allocated(io_configuration%registered_moncs(i)%field_start_locations)) &
261  deallocate(io_configuration%registered_moncs(i)%field_start_locations)
262  if (allocated(io_configuration%registered_moncs(i)%field_end_locations)) &
263  deallocate(io_configuration%registered_moncs(i)%field_end_locations)
264  if (allocated(io_configuration%registered_moncs(i)%definition_names)) &
265  deallocate(io_configuration%registered_moncs(i)%definition_names)
266  if (allocated(io_configuration%registered_moncs(i)%dimensions)) deallocate(io_configuration%registered_moncs(i)%dimensions)
267  end do
Here is the call graph for this function:
Here is the caller graph for this function:

◆ get_monc_information_data()

subroutine io_server_mod::get_monc_information_data ( integer, intent(in)  source)
private

Retrieves MONC information data, this is sent by MONC (and received) regardless, but only actioned if the data has not already been set.

Parameters
sourceMONC source process

Definition at line 468 of file ioserver.F90.

469  integer, intent(in) :: source
470 
471  character, dimension(:), allocatable :: buffer
472  character(len=STRING_LENGTH) :: q_field_name, cd_field_name
473  integer :: buffer_size, z_size, num_q_fields, n, current_point, recv_count
474  type(data_sizing_description_type) :: field_description
475  real(kind=default_precision) :: dreal
476  logical :: field_found
477 
478 
479  z_size=c_get_integer(io_configuration%dimension_sizing, "z")
480  num_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")
481 
482  buffer_size=(kind(dreal)*z_size)*2 + (string_length * num_q_fields) &
483  + 2*ncond*string_length + 2*ndiag*string_length
484  allocate(buffer(buffer_size))
485  recv_count=data_receive(mpi_byte, buffer_size, source, buffer)
486  if (.not. io_configuration%general_info_set) then
487  call check_thread_status(forthread_mutex_lock(io_configuration%general_info_mutex))
488  if (.not. io_configuration%general_info_set) then
489  io_configuration%general_info_set=.true.
490  allocate(io_configuration%zn_field(z_size))
491  allocate(io_configuration%z_field(z_size))
492  io_configuration%zn_field=transfer(buffer(1:kind(dreal)*z_size), io_configuration%zn_field)
493  current_point=(kind(dreal)*z_size)
494  if (num_q_fields .gt. 0) then
495  do n=1, num_q_fields
496  q_field_name=transfer(buffer(current_point+1:current_point+string_length), q_field_name)
497  current_point=current_point+string_length
498  call replace_character(q_field_name, " ", "_")
499  call c_add_string(io_configuration%q_field_names, q_field_name)
500  end do
501  end if
502  io_configuration%z_field=transfer(buffer(current_point+1:current_point+kind(dreal)*z_size), &
503  io_configuration%z_field)
504  current_point=current_point+(kind(dreal)*z_size)
505 
506  do n=1,ncond
507  cond_request(n)=transfer(buffer(current_point+1:current_point+string_length), cd_field_name)
508  current_point=current_point+string_length
509  cond_long(n)=transfer(buffer(current_point+1:current_point+string_length), cd_field_name)
510  current_point=current_point+string_length
511  end do
512  do n=1,ndiag
513  diag_request(n)=transfer(buffer(current_point+1:current_point+string_length), cd_field_name)
514  current_point=current_point+string_length
515  diag_long(n)=transfer(buffer(current_point+1:current_point+string_length), cd_field_name)
516  current_point=current_point+string_length
517  end do
518 
519  end if
520  call provide_q_field_names_to_writer_federator(io_configuration%q_field_names)
521  call check_thread_status(forthread_mutex_unlock(io_configuration%general_info_mutex))
522  end if
523  deallocate(buffer)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_command_message()

subroutine io_server_mod::handle_command_message ( integer, intent(in)  command,
integer, intent(in)  source,
character, dimension(:), intent(inout), allocatable  data_buffer 
)
private

Called to handle a specific command that has been recieved.

Parameters
commandThe command which has been received from some process
sourceThe PID of the source (MONC) process

Definition at line 218 of file ioserver.F90.

219  integer, intent(in) :: command, source
220  character, dimension(:), allocatable, intent(inout) :: data_buffer
221 
222  if (command == register_command) then
223  call threadpool_start_thread(handle_monc_registration, (/ source /))
224  else if (command == deregister_command) then
225  call threadpool_start_thread(handle_deregistration_command, (/ source /))
226  else if (command == inter_io_communication) then
227  call threadpool_start_thread(handle_inter_io_communication_command, (/ source /), data_buffer=data_buffer)
228  deallocate(data_buffer)
229  else if (command .ge. data_command_start) then
230  call pull_back_data_message_and_handle(source, command-data_command_start)
231  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_data_message()

subroutine io_server_mod::handle_data_message ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Handles the command for data download from a specific process. This will allocate the receive buffer and then call to get the data. Once it has been received then the data is run against handling rules.

Parameters
arguments,element1 is the source & element 2 is the data_set
data_bufferThe actual data from MONC read from the data channel

Definition at line 322 of file ioserver.F90.

323  integer, dimension(:), intent(in) :: arguments
324  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
325 
326  integer :: monc_location, data_set, source, matched_datadefn_index
327 
328  source=arguments(1)
329  data_set=arguments(2)
330 
331  call check_thread_status(forthread_rwlock_rdlock(monc_registration_lock))
332  monc_location=get_monc_location(io_configuration, source)
333 
334  call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
335  io_configuration%registered_moncs(monc_location)%active_threads=&
336  io_configuration%registered_moncs(monc_location)%active_threads+1
337  call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
338 
339  matched_datadefn_index=retrieve_data_definition(io_configuration, &
340  io_configuration%registered_moncs(monc_location)%definition_names(data_set))
341 
342  if (matched_datadefn_index .gt. 0) then
343  call inform_writer_federator_time_point(io_configuration, source, data_set, data_buffer)
344  call pass_fields_to_diagnostics_federator(io_configuration, source, data_set, data_buffer)
345  call provide_monc_data_to_writer_federator(io_configuration, source, data_set, data_buffer)
346  call check_writer_for_trigger(io_configuration, source, data_set, data_buffer)
347  else
348  call log_log(log_warn, "IO server can not find matching data definition with name "&
349  //io_configuration%registered_moncs(monc_location)%definition_names(data_set))
350  end if
351 
352  call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
353  io_configuration%registered_moncs(monc_location)%active_threads=&
354  io_configuration%registered_moncs(monc_location)%active_threads-1
355  call check_thread_status(forthread_cond_signal(io_configuration%registered_moncs(monc_location)%deactivate_condition_variable))
356  call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
357  call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_deregistration_command()

subroutine io_server_mod::handle_deregistration_command ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Deregisteres a specific MONC source process.

Parameters
sourceThe MONC process PID that we are deregistering

Definition at line 272 of file ioserver.F90.

273  integer, dimension(:), intent(in) :: arguments
274  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
275 
276  integer :: monc_location, source
277 
278  source=arguments(1)
279  monc_location=get_monc_location(io_configuration, source)
280  call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
281  do while (io_configuration%registered_moncs(monc_location)%active_threads .gt. 0)
282  call check_thread_status(forthread_cond_wait(io_configuration%registered_moncs(monc_location)%deactivate_condition_variable,&
283  io_configuration%registered_moncs(monc_location)%active_mutex))
284  end do
285  call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
286  call check_thread_status(forthread_rwlock_wrlock(monc_registration_lock))
287  io_configuration%active_moncs=io_configuration%active_moncs-1
288  if (io_configuration%active_moncs==0) contine_poll_messages=.false.
289  call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_inter_io_communication_command()

subroutine io_server_mod::handle_inter_io_communication_command ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Handles inter IO server communications.

Parameters
argumentsThe thread based arguments, this is the index of the inter IO server description

Definition at line 236 of file ioserver.F90.

237  integer, dimension(:), intent(in) :: arguments
238  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
239 
240  integer :: source
241 
242  source=arguments(1)
243 
244  call io_configuration%inter_io_communications(source)%handling_procedure(io_configuration, data_buffer, source)
Here is the caller graph for this function:

◆ handle_monc_dimension_information()

subroutine io_server_mod::handle_monc_dimension_information ( type(data_sizing_description_type), dimension(:)  data_description,
type(io_configuration_registered_monc_type), intent(inout)  monc_defn 
)
private

Handles the provided local MONC dimension and data layout information.

Parameters
data_descriptionThe data descriptions sent over from MONC
monc_defnThe corresponding MONC definition data structure

Definition at line 562 of file ioserver.F90.

563  type(io_configuration_registered_monc_type), intent(inout) :: monc_defn
564  type(data_sizing_description_type), dimension(:) :: data_description
565 
566  type(data_sizing_description_type) :: field_description
567  integer :: i
568  logical :: field_found
569 
570  field_found=get_data_description_from_name(data_description, local_sizes_key, field_description)
571  if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local size information")
572  do i=1,3
573  monc_defn%local_dim_sizes(i)=field_description%dim_sizes(i)
574  end do
575  field_found=get_data_description_from_name(data_description, local_start_points_key, field_description)
576  if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local start point information")
577  do i=1,3
578  monc_defn%local_dim_starts(i)=field_description%dim_sizes(i)
579  end do
580  field_found=get_data_description_from_name(data_description, local_end_points_key, field_description)
581  if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local end point information")
582  do i=1,3
583  monc_defn%local_dim_ends(i)=field_description%dim_sizes(i)
584  end do
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_monc_registration()

subroutine io_server_mod::handle_monc_registration ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Handles registration from some MONC process. The source process sends some data description to this IO server which basically tells the IO server the size of the array datas (which might be different on different processes in the case of uneven decomposition.) Based upon this a communication (MPI) data type is constructed and the data size in bytes determined.

Parameters
sourceThe PID of the MONC process that is registering itself

Definition at line 364 of file ioserver.F90.

365  integer, dimension(:), intent(in) :: arguments
366  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
367 
368  integer :: configuration_send_request(2), ierr, number_data_definitions, this_monc_index, source
369 
370  source=arguments(1)
371  configuration_send_request=send_configuration_to_registree(source)
372  number_data_definitions=io_configuration%number_of_data_definitions
373 
374  call check_thread_status(forthread_rwlock_wrlock(monc_registration_lock))
375 
376  io_configuration%number_of_moncs=io_configuration%number_of_moncs+1
377  this_monc_index=io_configuration%number_of_moncs
378  if (io_configuration%number_of_moncs .gt. size(io_configuration%registered_moncs)) then
379  call log_log(log_error, "You have a high ratio of computational cores to IO servers, the limit is currently 100")
380  ! The extension of the MONC registration array is broken as the pointers involved in the map does not get copied across
381  ! we could manually do this, but that is for another day! If you need to extend these limits either increase the constants
382  ! or fix the extension, I don't think it will be too hard to fix the extension bit (copy the maps manually)
383  call extend_registered_moncs_array(io_configuration)
384  end if
385 
386  io_configuration%active_moncs=io_configuration%active_moncs+1
387  call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))
388 
389  call c_put_integer(io_configuration%monc_to_index, conv_to_string(source), this_monc_index)
390 
391  call check_thread_status(forthread_mutex_init(io_configuration%registered_moncs(this_monc_index)%active_mutex, -1))
392  call check_thread_status(forthread_cond_init(&
393  io_configuration%registered_moncs(this_monc_index)%deactivate_condition_variable, -1))
394  io_configuration%registered_moncs(this_monc_index)%active_threads=0
395  io_configuration%registered_moncs(this_monc_index)%source_id=source
396 
397  allocate(io_configuration%registered_moncs(this_monc_index)%field_start_locations(number_data_definitions), &
398  io_configuration%registered_moncs(this_monc_index)%field_end_locations(number_data_definitions), &
399  io_configuration%registered_moncs(this_monc_index)%definition_names(number_data_definitions), &
400  io_configuration%registered_moncs(this_monc_index)%dimensions(number_data_definitions))
401 
402  ! Wait for configuration to have been sent to registree
403  call waitall_for_mpi_requests(configuration_send_request, 2)
404  call init_data_definition(source, io_configuration%registered_moncs(this_monc_index))
Here is the call graph for this function:
Here is the caller graph for this function:

◆ init_data_definition()

subroutine io_server_mod::init_data_definition ( integer, intent(in)  source,
type(io_configuration_registered_monc_type), intent(inout)  monc_defn 
)
private

Initialise the sizing of data definitions from a MONC process. The IO server determines, from configuration, the structure of each data definition but the size of the arrays depends upon the MONC process (due to uneven distribution of data etc...) This receives the sizing message and then builds the MPI datatype for each data definition that the IO server will receive from that specific MONC process. The field sizings are for all fields in every data definition, and these are applied to each data definition which will simply ignore non matching fields.

Parameters
sourceThe source MONC PID
monc_defnThe corresponding MONC definition data structure

Definition at line 433 of file ioserver.F90.

434  integer, intent(in) :: source
435  type(io_configuration_registered_monc_type), intent(inout) :: monc_defn
436 
437  type(data_sizing_description_type) :: data_description(io_configuration%number_of_distinct_data_fields+4)
438  integer :: created_mpi_type, data_size, recv_count, i
439  type(data_sizing_description_type) :: field_description
440  logical :: field_found
441 
442  recv_count=data_receive(mpi_type_data_sizing_description, io_configuration%number_of_distinct_data_fields+4, &
443  source, description_data=data_description)
444 
445  call handle_monc_dimension_information(data_description, monc_defn)
446 
447  do i=1, io_configuration%number_of_data_definitions
448  created_mpi_type=build_mpi_datatype(io_configuration%data_definitions(i), data_description, data_size, &
449  monc_defn%field_start_locations(i), monc_defn%field_end_locations(i), monc_defn%dimensions(i))
450 
451  call c_put_integer(monc_defn%registered_monc_types, conv_to_string(i), created_mpi_type)
452  call c_put_integer(monc_defn%registered_monc_buffer_sizes, conv_to_string(i), data_size)
453 
454  monc_defn%definition_names(i)=io_configuration%data_definitions(i)%name
455  end do
456  if (.not. initialised_present_data) then
457  initialised_present_data=.true.
458  field_found=get_data_description_from_name(data_description, number_q_indicies_key, field_description)
459  call c_put_integer(io_configuration%dimension_sizing, "active_q_indicies", field_description%dim_sizes(1))
460  call register_present_field_names_to_federators(data_description, recv_count)
461  end if
462  call get_monc_information_data(source)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ io_server_run()

subroutine, public io_server_mod::io_server_run ( type(hashmap_type), intent(inout)  options_database,
integer, intent(in)  io_communicator_arg,
integer, intent(in)  provided_threading,
integer, intent(in)  total_global_processes,
logical, intent(in)  continuation_run,
character(len=long_string_length), intent(in)  io_configuration_file 
)

Called to start the IO server and once this subroutine returns then it indicates that the IO server has finished. The runtime is spent in here awaiting commands and then dealing with them. Termination occurs when all MONC processes have deregistered, note that to trigger this then at least one MONC process must first register.

Parameters
io_communicator_argThe IO communicator containing just the IO servers
io_xml_configurationTextual XML configuration that is used to set up the IO server

Definition at line 66 of file ioserver.F90.

68  type(hashmap_type), intent(inout) :: options_database
69  integer, intent(in) :: io_communicator_arg, provided_threading, total_global_processes
70  logical, intent(in) :: continuation_run
71  character(len=LONG_STRING_LENGTH), intent(in) :: io_configuration_file
72 
73  integer :: command, source, my_rank, ierr
74  character, dimension(:), allocatable :: data_buffer, io_xml_configuration
75  type(hashmap_type) :: diagnostic_generation_frequency
76 
77 
78  if (continuation_run) then
79  ! Handle case where we need to allocate this due to no IO server config
80  call read_io_server_configuration(options_get_string(options_database, "checkpoint"), &
81  io_xml_configuration, io_communicator_arg)
82  end if
83 
84  if (.not. allocated(io_xml_configuration)) then
85  io_xml_configuration=get_io_xml(io_configuration_file)
86  if (continuation_run) then
87  call mpi_comm_rank(io_communicator_arg, my_rank, ierr)
88  if (my_rank == 0) then
89  call log_log(log_warn, "No IO server configuration in checkpoint file - starting from XML provided file instead")
90  end if
91  end if
92  end if
93 
94  call check_for_condi_conflict(io_xml_configuration, options_database)
95  call configuration_parse(options_database, io_xml_configuration, io_configuration)
96  deallocate(io_xml_configuration)
97  call threadpool_init(io_configuration)
98  call initialise_mpi_communication(provided_threading)
99  call check_thread_status(forthread_rwlock_init(monc_registration_lock, -1))
100  call check_thread_status(forthread_mutex_init(io_configuration%general_info_mutex, -1))
101  initialised_present_data=.false.
102  contine_poll_messages=.true.
103  contine_poll_interio_messages=.true.
104  already_registered_finishing_call=.false.
105  io_configuration%io_communicator=io_communicator_arg
106  io_configuration%number_of_io_servers=get_number_io_servers(io_communicator_arg)
107  io_configuration%number_of_global_moncs=total_global_processes-io_configuration%number_of_io_servers
108  io_configuration%my_io_rank=get_my_io_rank(io_communicator_arg)
109  call initialise_logging(io_configuration%my_io_rank)
110  registree_definition_descriptions=build_definition_description_type_from_configuration(io_configuration)
111  registree_field_descriptions=build_field_description_type_from_configuration(io_configuration)
112  diagnostic_generation_frequency=initialise_diagnostic_federator(io_configuration)
113  call initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
114  call c_free(diagnostic_generation_frequency)
115  call initialise_writer_field_manager(io_configuration, continuation_run)
116 
117  mpi_type_data_sizing_description=build_mpi_type_data_sizing_description()
118  mpi_type_definition_description=build_mpi_type_definition_description()
119  mpi_type_field_description=build_mpi_type_field_description()
120 
121  call register_command_receive()
122 
123  do while (await_command(command, source, data_buffer))
124  call handle_command_message(command, source, data_buffer)
125  end do
126  call threadpool_deactivate()
127  call finalise_writer_field_manager()
128  call finalise_writer_federator()
129  call finalise_diagnostic_federator(io_configuration)
130  call check_thread_status(forthread_rwlock_destroy(monc_registration_lock))
131  call free_individual_registered_monc_aspects()
132  call cancel_requests()
133  call free_mpi_type(mpi_type_data_sizing_description)
134  call free_mpi_type(mpi_type_definition_description)
135  call free_mpi_type(mpi_type_field_description)
136  call threadpool_finalise()
Here is the call graph for this function:
Here is the caller graph for this function:

◆ pull_back_data_message_and_handle()

subroutine io_server_mod::pull_back_data_message_and_handle ( integer, intent(in)  source,
integer, intent(in)  data_set 
)
private

Retrieves the message from MONC off the data channel and throws this to a thread in the thread pool to actually process We do it this way to enforce ordering between the command (including the data set ID) and the raw data itself.

Parameters
sourceSource PID of the MONC process
data_setID of the data set being communicated

Definition at line 296 of file ioserver.F90.

297  integer, intent(in) :: source, data_set
298 
299  integer :: specific_monc_data_type, specific_monc_buffer_size, recv_count, monc_location
300  character, dimension(:), allocatable :: data_buffer
301 
302  call check_thread_status(forthread_rwlock_rdlock(monc_registration_lock))
303  monc_location=get_monc_location(io_configuration, source)
304 
305  specific_monc_data_type=c_get_integer(io_configuration%registered_moncs(monc_location)%registered_monc_types, &
306  conv_to_string(data_set))
307  specific_monc_buffer_size=c_get_integer(io_configuration%registered_moncs(monc_location)%registered_monc_buffer_sizes, &
308  conv_to_string(data_set))
309 
310  allocate(data_buffer(specific_monc_buffer_size))
311  recv_count=data_receive(specific_monc_data_type, 1, source, dump_data=data_buffer, data_dump_id=data_set)
312 
313  call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))
314  call threadpool_start_thread(handle_data_message, (/ source, data_set /), data_buffer=data_buffer)
315  deallocate(data_buffer)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ register_present_field_names_to_federators()

subroutine io_server_mod::register_present_field_names_to_federators ( type(data_sizing_description_type), dimension(:), intent(in)  data_description,
integer, intent(in)  recv_count 
)
private

Registers with the writer federator the set of fields (prognostic and diagnostic) that are available, this is based on the array/optional fields present from MONC and the non-optional scalars. This is quite an expensive operation, so only done once.

Parameters
data_descriptionArray of data descriptions from MONC
recv_countNumber of data descriptions

Definition at line 531 of file ioserver.F90.

532  type(data_sizing_description_type), dimension(:), intent(in) :: data_description
533  integer, intent(in) :: recv_count
534 
535  type(hashset_type) :: present_field_names
536  type(hashmap_type) :: diagnostics_field_names_and_roots
537  integer :: i, j
538 
539  do i=1, recv_count
540  call c_add_string(present_field_names, data_description(i)%field_name)
541  end do
542  do i=1, io_configuration%number_of_data_definitions
543  do j=1, io_configuration%data_definitions(i)%number_of_data_fields
544  if (io_configuration%data_definitions(i)%fields(j)%field_type == scalar_field_type .and. .not. &
545  io_configuration%data_definitions(i)%fields(j)%optional) then
546  call c_add_string(present_field_names, io_configuration%data_definitions(i)%fields(j)%name)
547  end if
548  end do
549  end do
550  call c_add_string(present_field_names, "time")
551  call c_add_string(present_field_names, "timestep")
552  call inform_writer_federator_fields_present(io_configuration, present_field_names)
553  diagnostics_field_names_and_roots=determine_diagnostics_fields_available(present_field_names)
554  call inform_writer_federator_fields_present(io_configuration, diag_field_names_and_roots=diagnostics_field_names_and_roots)
555  call c_free(present_field_names)
556  call c_free(diagnostics_field_names_and_roots)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ send_configuration_to_registree()

integer function, dimension(2) io_server_mod::send_configuration_to_registree ( integer, intent(in)  source)
private

Sends the data and field descriptions to the MONC process that just registered with the IO server.

Parameters
sourceThe MPI rank (MPI_COMM_WORLD) of the registree
Returns
The nonblocking send request handles which can be waited for completion later (overlap compute and communication)

Definition at line 410 of file ioserver.F90.

411  integer, intent(in) :: source
412  integer :: send_configuration_to_registree(2)
413 
414  integer :: ierr, srequest(2)
415 
416  call lock_mpi()
417  call mpi_isend(registree_definition_descriptions, size(registree_definition_descriptions), mpi_type_definition_description, &
418  source, data_tag, mpi_comm_world, srequest(1), ierr)
419  call mpi_isend(registree_field_descriptions, size(registree_field_descriptions), mpi_type_field_description, &
420  source, data_tag, mpi_comm_world, srequest(2), ierr)
421  call unlock_mpi()
422 
423  send_configuration_to_registree=srequest
Here is the call graph for this function:
Here is the caller graph for this function:

◆ termination_callback()

subroutine io_server_mod::termination_callback ( type(io_configuration_type), intent(inout)  io_configuration,
real(default_precision), dimension(:)  values,
character(len=string_length)  field_name,
integer  timestep 
)
private

This is the termination callback which is called once all MONCs have deregistered, no sends are active by inter IO communications and all threads are idle. This shuts down the inter IO listening and kickstarts finalisation and closure.

Parameters
io_configurationThe IO server configuration
valuesValues (ignored)
field_nameField name identifier
timestepTimestep identifier

Definition at line 206 of file ioserver.F90.

207  type(io_configuration_type), intent(inout) :: io_configuration
208  real(DEFAULT_PRECISION), dimension(:) :: values
209  character(len=STRING_LENGTH) :: field_name
210  integer :: timestep
211 
212  contine_poll_interio_messages=.false.
Here is the caller graph for this function:

Variable Documentation

◆ already_registered_finishing_call

logical, volatile io_server_mod::already_registered_finishing_call
private

Definition at line 52 of file ioserver.F90.

◆ contine_poll_interio_messages

logical, volatile io_server_mod::contine_poll_interio_messages
private

Definition at line 52 of file ioserver.F90.

52  logical, volatile :: contine_poll_interio_messages, already_registered_finishing_call

◆ contine_poll_messages

logical, volatile io_server_mod::contine_poll_messages
private

Whether to continue waiting command messages from any MONC processes.

Definition at line 50 of file ioserver.F90.

50  logical, volatile :: contine_poll_messages, & !< Whether to continue waiting command messages from any MONC processes
51  initialised_present_data

◆ initialised_present_data

logical, volatile io_server_mod::initialised_present_data
private

Definition at line 50 of file ioserver.F90.

◆ io_configuration

type(io_configuration_type), save, volatile io_server_mod::io_configuration
private

Internal representation of the IO configuration.

Definition at line 49 of file ioserver.F90.

49  type(io_configuration_type), volatile, save :: io_configuration

◆ monc_registration_lock

integer, volatile io_server_mod::monc_registration_lock
private

Definition at line 56 of file ioserver.F90.

56  integer, volatile :: monc_registration_lock

◆ mpi_type_data_sizing_description

integer io_server_mod::mpi_type_data_sizing_description
private

The MPI type for field sizing (i.e. array size etc send when MONCs register)

Definition at line 46 of file ioserver.F90.

46  integer :: mpi_type_data_sizing_description, & !< The MPI type for field sizing (i.e. array size etc send when MONCs register)
47  mpi_type_definition_description, & !< The MPI data type for data descriptions sent to MONCs
48  mpi_type_field_description

◆ mpi_type_definition_description

integer io_server_mod::mpi_type_definition_description
private

The MPI data type for data descriptions sent to MONCs.

Definition at line 46 of file ioserver.F90.

◆ mpi_type_field_description

integer io_server_mod::mpi_type_field_description
private

The MPI data type for field descriptions sent to MONCs.

Definition at line 46 of file ioserver.F90.

◆ registree_definition_descriptions

type(definition_description_type), dimension(:), allocatable io_server_mod::registree_definition_descriptions
private

Definition at line 54 of file ioserver.F90.

54  type(definition_description_type), dimension(:), allocatable :: registree_definition_descriptions

◆ registree_field_descriptions

type(field_description_type), dimension(:), allocatable io_server_mod::registree_field_descriptions
private

Definition at line 53 of file ioserver.F90.

53  type(field_description_type), dimension(:), allocatable :: registree_field_descriptions
logging_mod::log_error
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
logging_mod::log_warn
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
logging_mod::log_log
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
datadefn_mod::default_precision
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17