#!/usr/bin/python3.10
# -*-coding:Utf-8 -*
#==============================================================================
# TENET: New Tenet Extraction Engine (TEE)
#------------------------------------------------------------------------------
# Module implementing the Tenet Extraction Engine (TEE): it runs the
# extraction process by applying a list of transduction schemes (CTS) to a
# working structure
#==============================================================================
#==============================================================================
# Importing required modules
#==============================================================================
from rdflib import Graph
import sys
import logging
import glob
from pathlib import Path
from importlib.machinery import SourceFileLoader
import importlib.util
import importlib
from .timer import timed
from .transduction.rule import Rule
from .transduction.sequence import Sequence
#==============================================================================
# Parameters
#==============================================================================
# Logging
logger = logging.getLogger(__name__)
#==============================================================================
# Loading Functions
#==============================================================================
def load_cts(config):
""" Load extraction scheme (CTS) from <cts_ref> file """
try:
cts_module = SourceFileLoader(config.cts_ref,
config.cts_file).load_module()
return cts_module.rule_dir, cts_module.prefix_list, cts_module.scheme
except FileNotFoundError:
logger.error(' *** Error while loading scheme (load_cts) ***')
logger.debug('\n' + ' cts_file unknown: {0}'.format(config.cts_file))
def get_new_rule_set(rule_def_set, prefix_list):
""" Get a set of new rules from <rule_def_set> (rule definition dictionary)
"""
try:
rule_set = {}
for rule_key, rule_def in rule_def_set.items():
rule = Rule()
rule.load_dict(rule_def)
rule.load_prefix_list(prefix_list)
rule_set[rule_key] = rule
return rule_set
    except Exception:
logger.error(' *** Error while loading rule set (get_new_rule_set) *** ')
logger.debug(' ----- len(rule_def_set): {0}'.format(len(rule_def_set)))
logger.debug(' ----- rule_key: {0}'.format(rule_key))
logger.debug(' ----- rule_def: {0}'.format(rule_def))
logger.debug(' ----- len(rule_set): {0}'.format(len(rule_set)))
def load_rule_set(config, rule_dir, prefix_list):
""" Load all rules into a set (as dictionary) from definition files
in <rule_dir> directory.
"""
try:
rule_set = {}
# -- Append rule paths to sys.path
path_glob_pattern = config.cts_dir + rule_dir + '**/'
for rule_path in glob.iglob(path_glob_pattern, recursive = True):
sys.path.append(rule_path)
# -- Load rule modules
file_glob_pattern = config.cts_dir + rule_dir + '**/*.py'
for rule_filename in glob.iglob(file_glob_pattern, recursive = True):
if 'query_builder' not in rule_filename:
# -- old ---
# spec = importlib.util.spec_from_file_location(rule_module_name,
# rule_filename)
# rule_module = importlib.util.module_from_spec(spec)
# sys.modules[rule_module_name] = rule_module
# spec.loader.exec_module(rule_module)
# -- ---
# -- Import module
rule_module_name = Path(rule_filename).stem
                rule_module = importlib.import_module(rule_module_name)
# -- Update rule set
if hasattr(rule_module, 'rule_set'):
rule_def_set = rule_module.rule_set
new_rule_set = get_new_rule_set(rule_def_set, prefix_list)
rule_set.update(new_rule_set)
return rule_set
    except Exception:
logger.error(' *** Error while loading rule set (load_rule_set) *** ')
logger.debug(f' ----- path_glob_pattern: {path_glob_pattern}')
logger.debug(f' ----- sys.path: {sys.path}')
logger.debug(f' ----- file_glob_pattern: {file_glob_pattern}')
logger.debug(f' ----- rule_filename: {rule_filename}')
logger.debug(f' ----- rule_module_name: {rule_module_name}')
logger.debug(f' ----- rule_module: {rule_module}')
logger.debug(f' ----- len(rule_def_set): {len(rule_def_set)}')
logger.debug(f' ----- len(new_rule_set): {len(new_rule_set)}')
logger.debug(f' ----- len(rule_set): {len(rule_set)}')
#==============================================================================
# Extraction step
#==============================================================================
def _prepare_sequence(sequence_def, rule_set):
sequence = Sequence()
sequence.load_sequence_from_dict(sequence_def)
sequence.load_rule_list(rule_set)
return sequence
def _apply_refinement(graph, refinement_rule_list):
""" Apply <refinement_rule_list> on <graph> """
try:
all_new_triple_set = []
for rule in refinement_rule_list:
graph_length_before = len(graph)
(graph, extracted_triple_set), exec_time_date = rule.apply(graph)
all_new_triple_set.extend(extracted_triple_set)
graph_length_after = len(graph)
new_triple_count = graph_length_after - graph_length_before
message = "----- (refinement) {0}: {1} new triples ({2})"
message = message.format(rule.label,
new_triple_count,
graph_length_after)
if (new_triple_count > 0):
logger.debug(message)
return graph, all_new_triple_set
    except Exception:
logger.error(" *** Error while processing extraction (_apply_refinement) ***")
logger.debug(" ----- len(refinement_rule_list): {0} ".format(len(refinement_rule_list)))
logger.debug(" ----- last rule: {0} ".format(rule))
def _apply_sequence(graph, sequence, refinement_rule_list):
""" Apply the rules of <sequence> on the working graph <graph> """
try:
logger.info("--- Sequence: {0}".format(sequence.label))
all_new_triple_set = []
for rule in sequence.rule_list:
graph_length_before = len(graph)
# -- apply rule
(graph, extracted_triple_set), exec_time_date = rule.apply(graph)
all_new_triple_set.extend(extracted_triple_set)
new_triple_count = len(graph) - graph_length_before
str = f"----- {rule.label}: "
str += f"{new_triple_count}/{len(extracted_triple_set)} new triple"
if new_triple_count > 1: str += f"s"
str += f" ({len(graph)}, {exec_time_date})"
if (new_triple_count > 0):
logger.info(str)
else:
logger.debug(str)
# -- apply refinement
graph, extracted_triple_set = _apply_refinement(graph, refinement_rule_list)
all_new_triple_set.extend(extracted_triple_set)
return graph, all_new_triple_set
    except Exception:
logger.error(" *** Error while processing extraction (_apply_sequence) ***")
logger.debug(" ----- len(sequence): {0} ".format(len(sequence)))
logger.debug(" ----- last rule: {0} ".format(rule))
logger.debug(" ----- last SPARQL query: \n{0} ".format(rule.get_query()))
logger.debug(" ----- len(extracted_triple_set): {0} ".format(len(extracted_triple_set)))
logger.debug(" ----- new_triple_count: {0} ".format(new_triple_count))
def _serialize_graph(config, graph, step_name):
""" Serialize <graph> to a file """
try:
uuid_str = config.uuid_str
work_file = config.output_file.replace('.ttl', '_' + step_name + '.ttl')
base_ref = "http://{0}/{1}".format(uuid_str, step_name)
message = "--- Serializing graph to {0} "
message = message.format(Path(work_file).stem)
logger.debug(message)
logger.debug("----- step: {0}".format(step_name))
logger.debug("----- id: {0}".format(uuid_str))
logger.debug("----- work_file: {0}".format(work_file))
logger.debug("----- base: {0}".format(base_ref))
graph.serialize(destination=work_file, base=base_ref, format='turtle')
    except Exception:
        logger.error(" *** Error while serializing graph (_serialize_graph) ***")
logger.debug(" ----- work_file: {0} ".format(work_file))
def apply_step(config, graph, rule_set, step_name, step_sequence_def):
""" Apply extraction step on the working graph """
try:
logger.info("-- Applying extraction step: {0}".format(step_name))
# -- Initialize
step_triple_list = []
graph_length_before_step = len(graph)
# -- Prepare refinement rule list
refinement_sequence = _prepare_sequence(step_sequence_def[0], rule_set)
refinement_rule_list = refinement_sequence.rule_list
# -- Apply the sequences of the step
for sequence_def in step_sequence_def[1:]:
sequence = _prepare_sequence(sequence_def, rule_set)
graph, triple_list = _apply_sequence(graph,
sequence,
refinement_rule_list)
step_triple_list.extend(triple_list)
# -- Serialize the working graph updated during the step
_serialize_graph(config, graph, step_name)
# -- Log extracted triple number
str = "----- {0} triples extracted during {1} step"
new_triple_count = len(graph) - graph_length_before_step
logger.info(str.format(new_triple_count, step_name))
return graph, step_triple_list
    except Exception:
logger.error(" *** Error while processing extraction (apply_step) ***")
logger.debug(' ----- step_name = {0}'.format(step_name))
logger.debug(' ----- len(step_sequence_def) = {0}'.format(len(step_sequence_def)))
logger.debug(' ----- step_sequence_def[0] = {0}'.format(step_sequence_def[0]))
logger.debug(' ----- last sequence def = {0}'.format(sequence_def))
logger.debug(' ----- last sequence label = {0}'.format(sequence.label))
#==============================================================================
# Main Function
#==============================================================================
@timed
def apply(config, graph):
""" Apply extraction process on the working graph """
try:
# -- Loading Extraction Scheme
logger.info("-- Loading Extraction Scheme ({0})".format(config.cts_ref))
rule_dir, prefix_list, scheme = load_cts(config)
logger.debug("----- Step number: {0}".format(len(scheme)))
# -- Loading Extraction Rules
logger.info("-- Loading Extraction Rules ({0}*)".format(rule_dir))
rule_set = load_rule_set(config, rule_dir, prefix_list)
logger.debug("----- Total rule number: {0}".format(len(rule_set)))
# -- Apply each step of the scheme
new_triple_list = []
for step_name, step_sequence_def in scheme.items():
graph, new_triple_list = apply_step(config, graph, rule_set,
step_name, step_sequence_def)
# -- Result: file containing only the factoids (last step result)
logger.info("-- Result: file containing only the factoids")
logger.debug("--- Making factoid graph with the last step result")
factoid_graph = Graph()
for new_triple in new_triple_list:
factoid_graph.add(new_triple)
logger.debug("----- Number of factoids: " + str(len(new_triple_list)))
        uuid_str = config.uuid_str
        base_ref = "http://" + uuid_str + '/' + 'factoid'
        logger.debug("----- Graph base: " + base_ref)
        factoid_file = config.output_file.replace('.ttl', '_factoid.ttl')
logger.debug("--- Serializing graph to factoid file (" + factoid_file + ")")
factoid_graph.serialize(destination=factoid_file,
base=base_ref,
format='turtle')
return graph, new_triple_list
    except Exception:
logger.error(' *** Error while processing extraction (apply) ***')
logger.debug(' ----- config.cts_ref = {0}'.format(config.cts_ref))
logger.debug(' ----- rule_dir = {0}'.format(rule_dir))
logger.debug(' ----- scheme = {0}'.format(scheme))
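#==============================================================================
# Usage sketch (illustrative)
#==============================================================================
# A minimal sketch of how this engine might be driven, assuming a config
# object exposing the attributes used above (cts_ref, cts_file, cts_dir,
# uuid_str, output_file); the names and paths below are placeholders, not the
# actual TENET configuration API:
#
#     from types import SimpleNamespace
#     from rdflib import Graph
#
#     config = SimpleNamespace(
#         cts_ref='scheme_example',                 # hypothetical scheme name
#         cts_file='/path/to/scheme_example.py',    # hypothetical CTS module path
#         cts_dir='/path/to/cts/',                  # hypothetical CTS directory
#         uuid_str='run-0001',                      # hypothetical run identifier
#         output_file='/path/to/output.ttl')        # hypothetical output file
#
#     graph = Graph()
#     graph.parse('/path/to/working_graph.ttl')     # hypothetical working graph
#     result = apply(config, graph)                 # @timed may also return timing info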