Skip to content
Snippets Groups Projects
Commit d4ad52f8 authored by David Rouquet's avatar David Rouquet
Browse files

Stuff for multiprocessing including remove etree from config

parent 76ced0b3
Branches
No related tags found
No related merge requests found
......@@ -44,11 +44,11 @@ class Config:
technical_dir_path=None):
# -- Config XML Tree
self.config_tree = etree.parse(config_file)
config_tree = etree.parse(config_file)
# -- Base parameters
self.config_file = config_file
c_base = self.config_tree.xpath("base")[0]
c_base = config_tree.xpath("base")[0]
self.uuid_str = uuid_str
self.technical_base_name = uuid_str
self.source_corpus = source_corpus
......@@ -61,7 +61,7 @@ class Config:
# self.cts_ref = ""
# -- Directories
c_dir = self.config_tree.xpath("directory")[0]
c_dir = config_tree.xpath("directory")[0]
if base_dir == None:
self.base_dir = c_dir.get("base_dir")
else:
......@@ -79,7 +79,7 @@ class Config:
self.sentence_output_dir = ''
# -- Config File Definition
c_file = self.config_tree.xpath("file")[0]
c_file = config_tree.xpath("file")[0]
self.schema_file = ""
self.semantic_net_file = self.structure_dir
self.semantic_net_file += c_file.get("semantic_net_schema") + ".ttl"
......@@ -91,7 +91,7 @@ class Config:
# self.cts_file = ""
# -- Ontology References
c_ref = self.config_tree.xpath("reference")[0]
c_ref = config_tree.xpath("reference")[0]
self.base_uri = c_ref.get("default_base_uri")
self.onto_suffix = c_ref.get("default_ontology_suffix")
self.onto_seed_suffix = c_ref.get("default_ontology_seed_suffix")
......@@ -105,7 +105,7 @@ class Config:
self.frame_ontology_seed_file = target_ontology + self.onto_seed_suffix
# -- Output
c_out = self.config_tree.xpath("output")[0]
c_out = config_tree.xpath("output")[0]
self.output_ontology_namespace = c_out.get("ontology_namespace")
# self.output_file = self.sentence_output_dir + self.uuid_str + ".ttl"
self.output_file = self.sentence_output_dir + self.technical_base_name + ".ttl"
......@@ -176,8 +176,9 @@ class Config:
return self._output_dir
def _set_output_dir(self, output_dir_complement=''):
c_dir = self.config_tree.xpath("directory")[0]
self._output_dir = self.base_output_dir
config_tree = etree.parse(self.config_file)
c_dir = config_tree.xpath("directory")[0]
self._output_dir = self.base_output_dir + '/'
# self._output_dir += self.uuid_str + output_dir_complement
self._output_dir += self.technical_base_name + output_dir_complement
self._output_dir += '-' + datetime.now().strftime('%Y%m%d')
......@@ -223,7 +224,8 @@ class Config:
return self._schema_file
def _set_schema_file(self, schema_file_cmpl):
c_file = self.config_tree.xpath("file")[0]
config_tree = etree.parse(self.config_file)
c_file = config_tree.xpath("file")[0]
self._schema_file = self.structure_dir
if self.source_type == 'amr':
self._schema_file += c_file.get("amr_input_data_schema")
......@@ -337,3 +339,71 @@ class Config:
config_str += '\n' + ' *** - *** '
return config_str
def to_dict(self):
    """Serialize this Config into a plain dict of built-in types.

    Intended for multiprocessing: a Config holding an lxml tree is not
    picklable, so the parent process sends this dict to workers, which
    restore the state with update_from_dict().

    Returns:
        dict: attribute name -> current attribute value.
    """
    # NOTE(review): the original literal listed "input_doc_dir" twice;
    # the duplicate (identical) entry has been removed.
    config_dict = {
        "config_file": self.config_file,
        "uuid_str": self.uuid_str,
        "source_corpus": self.source_corpus,
        "target_ref": self.target_ref,
        "base_dir": self.base_dir,
        "structure_dir": self.structure_dir,
        "cts_dir": self.cts_dir,
        "target_frame_dir": self.target_frame_dir,
        "input_doc_dir": self.input_doc_dir,
        "base_output_dir": self.base_output_dir,
        "output_dir": self.output_dir,
        "technical_dir_path": self.technical_dir_path,
        "sentence_output_dir": self.sentence_output_dir,
        "process_level": self.process_level,
        "source_type": self.source_type,
        "extraction_scheme": self.extraction_scheme,
        "config_param_file": self.config_param_file,
        "base_ontology_file": self.base_ontology_file,
        "cts_file": self.cts_file,
        "base_uri": self.base_uri,
        "onto_suffix": self.onto_suffix,
        "onto_seed_suffix": self.onto_seed_suffix,
        "source_sentence_file": self.source_sentence_file,
        "frame_ontology_file": self.frame_ontology_file,
        "frame_ontology_seed_file": self.frame_ontology_seed_file,
        "output_ontology_namespace": self.output_ontology_namespace,
        "output_file": self.output_file,
        "schema_file": self.schema_file,
    }
    return config_dict
def update_from_dict(self, config_dict):
    """Restore configuration state from a dict produced by to_dict().

    Counterpart of to_dict(), used to rebuild a Config inside a worker
    process from the picklable dict sent by the parent.

    Note: any attribute absent from config_dict is set to None
    (dict.get default) rather than left unchanged — same behavior as
    the original explicit assignments.

    Args:
        config_dict (dict): attribute name -> value, as built by to_dict().
    """
    # NOTE(review): the original assigned "input_doc_dir" twice; the
    # redundant second assignment has been removed.
    for attr in ("config_file", "uuid_str", "source_corpus", "target_ref",
                 "base_dir", "structure_dir", "cts_dir", "target_frame_dir",
                 "input_doc_dir", "base_output_dir", "output_dir",
                 "technical_dir_path", "sentence_output_dir", "process_level",
                 "source_type", "extraction_scheme", "config_param_file",
                 "base_ontology_file", "cts_file", "base_uri", "onto_suffix",
                 "onto_seed_suffix", "source_sentence_file",
                 "frame_ontology_file", "frame_ontology_seed_file",
                 "output_ontology_namespace", "output_file", "schema_file"):
        setattr(self, attr, config_dict.get(attr))
\ No newline at end of file
......@@ -14,6 +14,7 @@ import logging
import logging.config
import multiprocessing_logging
import multiprocessing
import json
from extraction import config, structure, process
from utility.timer import timed
......@@ -112,7 +113,7 @@ def __serialize_factoid_graph(config, factoid_graph, out_file_path=None):
# AMR Main Methods (to create an ontology)
#==============================================================================
@timed
#@timed
def create_ontology_from_amrld_file(amrld_file_path,
base_ontology_path=None,
onto_prefix=None,
......@@ -142,9 +143,10 @@ def create_ontology_from_amrld_file(amrld_file_path,
logger.info('\n === Process Initialization === ')
__set_context()
if onto_prefix is None: onto_prefix = 'DefaultId'
base_output_dir = os.path.dirname(out_file_path) if out_file_path is not None else None
config = __set_config(OWL_CONFIG_FILE_PATH,
'amr', amrld_file_path, onto_prefix,
out_file_path, technical_dir_path)
base_output_dir, technical_dir_path)
assert os.path.exists(amrld_file_path), f'input file does not exists ({amrld_file_path})'
# -- Extraction Processing
......@@ -167,23 +169,40 @@ def create_ontology_from_amrld_file(amrld_file_path,
return ontology_turtle_string
global result_triple_queue
global sentence_file_list
def pool_function(sentence_indice,sentence_file_list):
def dump_queue(q):
    """Drain queue *q* into a list, using None as an end-of-queue sentinel."""
    q.put(None)
    items = []
    while True:
        item = q.get()
        if item is None:
            break
        items.append(item)
    return items
def pool_function(arg_dic):
global result_triple_queue
global sentence_file_list
print(f'==================== TEST A')
process_config = config.Config(OWL_CONFIG_FILE_PATH, 'default', 'default')
process_config.update_from_dict(arg_dic)
print(f'==================== TEST B')
print(f'==================== process_config (1): {process_config}')
sentence_indice = arg_dic['sentence_list_indice']
print(f'==================== process_config (2): {process_config}')
sentence_file = sentence_file_list[sentence_indice]
print(f'==================== sentence_file: {sentence_file}')
logger.info(f' *** sentence {sentence_indice} *** ')
config.sentence_output_dir = f'-{sentence_indice}'
new_triple_list = __apply_extraction(config, sentence_file)
process_config.sentence_output_dir = f'-{sentence_indice}'
new_triple_list = __apply_extraction(process_config, sentence_file)
print(f'==================== TEST C')
# The following must handled via a global queue
result_triple_list.extend(new_triple_list)
result_triple_queue.extend(new_triple_list)
return(new_triple_list)
@timed
#@timed
def create_ontology_from_amrld_dir(amrld_dir_path,
base_ontology_path=None,
onto_prefix=None,
out_file_path=None,
technical_dir_path=None,
processes=multiprocessing.cpu_count()-1
processes=3#multiprocessing.cpu_count()-1
):
"""
Method to create an ontology (as Turtle String) from a transduction
......@@ -196,7 +215,7 @@ def create_ontology_from_amrld_dir(amrld_dir_path,
onto_prefix: the target ontology prefix if defined (if not defined a prefix based on the amrld filename is used).
out_file_path: a file path where the output ontology is written if defined (the function still outputs the string).
technical_dir_path: a dir path where some technical and log files are written if defined.
processes: the nuber of processes in the multiprocessing pool
processes: the number of processes in the multiprocessing pool
Returns
-------
......@@ -204,16 +223,18 @@ def create_ontology_from_amrld_dir(amrld_dir_path,
Complete Ontology Turtle String (synthesis of all ontology)
"""
global result_triple_queue
global sentence_file_list
logger.info('[TENET] Extraction Processing')
# -- Process Initialization
logger.info('\n === Process Initialization === ')
__set_context()
if onto_prefix is None: onto_prefix = 'DefaultId'
base_output_dir = os.path.dirname(out_file_path) if out_file_path is not None else None
config = __set_config(OWL_CONFIG_FILE_PATH,
'amr', amrld_dir_path, onto_prefix,
out_file_path, technical_dir_path)
base_output_dir, technical_dir_path)
assert os.path.exists(amrld_dir_path), f'input directory does not exists ({amrld_dir_path})'
__count_number_of_graph(config)
......@@ -224,18 +245,29 @@ def create_ontology_from_amrld_dir(amrld_dir_path,
result_triple_list = []
result_triple_queue = multiprocessing.Queue()
sentence_file_list = glob.glob(sentence_dir, recursive = True)
# The following is for multiprocessing logging (must be executed before the pool is created)
multiprocessing_logging.install_mp_handler()
star_iterable = [(i, sentence_file_list) for i in range(len(sentence_file_list))]
config_dict = config.to_dict()
#star_iterable = [(i, config) for i in range(len(sentence_file_list))]
mapIterable = []
for i in range(len(sentence_file_list)):
config_dict['sentence_list_indice'] = i
mapIterable = mapIterable + [config_dict.copy()]
print(config_dict)
with multiprocessing.Pool(processes) as p:
p.starmap(pool_function, star_iterable)
print (f'\n mapIterable: {mapIterable}')
triples = p.map(pool_function, mapIterable)
# -- Final Ontology Generation (factoid_graph)
logger.info('\n === Final Ontology Generation === ')
result_triple_list = dump_queue(result_triple_queue)
factoid_graph = __generate_final_ontology(result_triple_list)
ontology_turtle_string = __serialize_factoid_graph(config, factoid_graph, out_file_path)
......@@ -254,7 +286,7 @@ def create_ontology_from_amrld_dir(amrld_dir_path,
# AMR Main Methods (to generate ODRL statements)
#==============================================================================
@timed
#@timed
def generate_odrl_from_amrld_file(
amrld_file_path, onto_prefix=None, out_file_path=None,
technical_dir_path=None):
......@@ -281,9 +313,10 @@ def generate_odrl_from_amrld_file(
logger.info('\n === Process Initialization === ')
__set_context()
if onto_prefix is None: onto_prefix = 'DefaultId'
base_output_dir = os.path.dirname(out_file_path) if out_file_path is not None else None
config = __set_config(ODRL_CONFIG_FILE_PATH,
'amr', amrld_file_path, onto_prefix,
out_file_path, technical_dir_path)
base_output_dir, technical_dir_path)
assert os.path.exists(amrld_file_path), f'input file does not exists ({amrld_file_path})'
# -- Extraction Processing
......@@ -306,7 +339,7 @@ def generate_odrl_from_amrld_file(
return ontology_turtle_string
@timed
#@timed
def generate_odrl_from_amrld_dir(
amrld_dir_path, onto_prefix=None, out_file_path=None,
technical_dir_path=None):
......@@ -334,9 +367,10 @@ def generate_odrl_from_amrld_dir(
logger.info('\n === Process Initialization === ')
__set_context()
if onto_prefix is None: onto_prefix = 'DefaultId'
base_output_dir = os.path.dirname(out_file_path) if out_file_path is not None else None
config = __set_config(ODRL_CONFIG_FILE_PATH,
'amr', amrld_dir_path, onto_prefix,
out_file_path, technical_dir_path)
base_output_dir, technical_dir_path)
assert os.path.exists(amrld_dir_path), f'input directory does not exists ({amrld_dir_path})'
__count_number_of_graph(config)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment