tenet_extraction.py
    #!/usr/bin/python3.10
    # -*- coding: utf-8 -*-
    
    #==============================================================================
    # TENET: New Tenet Extraction Engine (TEE)
    #------------------------------------------------------------------------------
    # Module providing a specific tenet extraction engine, which runs the 
    # extraction process by applying a list of transduction schemes (CTS) to a 
    # working structure 
    #==============================================================================
    
    #==============================================================================
    # Importing required modules
    #==============================================================================
    
    from rdflib import Graph
    import sys
    import logging
    import glob
    from pathlib import Path
    import importlib.util
    from .timer import timed
    from .transduction.rule import Rule
    from .transduction.sequence import Sequence
    
    
    #==============================================================================
    # Parameters
    #==============================================================================
    
    # Logging
    logger = logging.getLogger(__name__)
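
    # Logging configuration is left to the caller; a minimal setup when running
    # the engine standalone could be (illustrative, not part of the engine):
    #
    #     logging.basicConfig(level=logging.DEBUG,
    #                         format='%(asctime)s %(levelname)s %(message)s')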
    
      
    #==============================================================================
    # Loading Functions
    #==============================================================================
    
    def load_cts(config):
        """ Load the extraction scheme (CTS) from the <cts_file> given in 
            <config> 
        """
        
        try:
            # -- Load the CTS module directly from its file path (spec-based
            # loading, instead of the deprecated SourceFileLoader.load_module)
            spec = importlib.util.spec_from_file_location(config.cts_ref,
                                                          config.cts_file)
            cts_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(cts_module)
        
            return cts_module.rule_dir, cts_module.prefix_list, cts_module.scheme
        
        except FileNotFoundError:
            logger.error(' *** Error while loading scheme (load_cts) ***') 
            logger.debug(' ----- cts_file unknown: {0}'.format(config.cts_file)) 
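
    # As a minimal sketch (assumption: the names and values below are
    # illustrative, not taken from an actual scheme), a CTS module loaded by
    # load_cts() is expected to expose three attributes:
    #
    #     rule_dir = 'rules/'                                # rule directory
    #     prefix_list = [('ex', 'http://example.org/ns#')]   # SPARQL prefixes
    #     scheme = {'extraction': [refinement_sequence_def,  # per-step list of
    #                              sequence_def_1,           # sequence defs,
    #                              sequence_def_2]}          # refinement first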
        
        
    def get_new_rule_set(rule_def_set, prefix_list):
        """ Get a set of new rules from <rule_def_set> (rule definition dictionary)  
        """
        
        try:
            rule_set = {}
            
            for rule_key, rule_def in rule_def_set.items():
                rule = Rule()
                rule.load_dict(rule_def)
                rule.load_prefix_list(prefix_list)
                rule_set[rule_key] = rule
         
            return rule_set
        
        except Exception:
            logger.error(' *** Error while loading rule set (get_new_rule_set) *** ') 
            logger.debug(' ----- len(rule_def_set): {0}'.format(len(rule_def_set))) 
            logger.debug(' ----- rule_key: {0}'.format(rule_key)) 
            logger.debug(' ----- rule_def: {0}'.format(rule_def))   
            logger.debug(' ----- len(rule_set): {0}'.format(len(rule_set))) 
    
        
    def load_rule_set(config, rule_dir, prefix_list):
        """ Load all rules into a set (as dictionary) from definition files  
            in <rule_dir> directory. 
        """
        
        try:
            rule_set = {}
            
            # -- Append rule directories to sys.path, so that rule modules can 
            # be imported by name below
            path_glob_pattern = config.cts_dir + rule_dir + '**/'
            for rule_path in glob.iglob(path_glob_pattern, recursive=True):
                sys.path.append(rule_path)
        
            # -- Load rule modules
            file_glob_pattern = config.cts_dir + rule_dir + '**/*.py'
            for rule_filename in glob.iglob(file_glob_pattern, recursive=True):
                
                if 'query_builder' not in rule_filename:
                    
                    # -- Import the module by its stem name (its directory was
                    # appended to sys.path above)
                    rule_module_name = Path(rule_filename).stem
                    rule_module = importlib.import_module(rule_module_name)
                    
                    # -- Update rule set
                    if hasattr(rule_module, 'rule_set'):
                        rule_def_set = rule_module.rule_set
                        new_rule_set = get_new_rule_set(rule_def_set, prefix_list)
                        rule_set.update(new_rule_set)
        
            return rule_set
        
        except Exception:
            logger.error(' *** Error while loading rule set (load_rule_set) *** ') 
            logger.debug(f' ----- path_glob_pattern: {path_glob_pattern}') 
            logger.debug(f' ----- sys.path: {sys.path}') 
            logger.debug(f' ----- file_glob_pattern: {file_glob_pattern}') 
            logger.debug(f' ----- rule_filename: {rule_filename}')  
            logger.debug(f' ----- rule_module_name: {rule_module_name}')
            logger.debug(f' ----- rule_module: {rule_module}')   
            logger.debug(f' ----- len(rule_def_set): {len(rule_def_set)}')
            logger.debug(f' ----- len(new_rule_set): {len(new_rule_set)}') 
            logger.debug(f' ----- len(rule_set): {len(rule_set)}')  
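
    # Each rule module found under <rule_dir> is expected to define a
    # module-level dictionary named 'rule_set', mapping rule keys to rule
    # definition dictionaries; a minimal sketch (hypothetical key, with the
    # definition content consumed by Rule.load_dict):
    #
    #     rule_set = {'extract-entity-rule': {...rule definition dict...}}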
            
    
      
    #==============================================================================
    # Extraction step
    #==============================================================================
            
    def _prepare_sequence(sequence_def, rule_set):
        """ Build a Sequence from <sequence_def> and load its rules from 
            <rule_set> 
        """
        sequence = Sequence()
        sequence.load_sequence_from_dict(sequence_def)
        sequence.load_rule_list(rule_set)
        return sequence
    
    
    def _apply_refinement(graph, refinement_rule_list):
        """ Apply <refinement_rule_list> on <graph> """ 
        
        try:    
            all_new_triple_set = []
        
            for rule in refinement_rule_list:
                graph_length_before = len(graph)
                (graph, extracted_triple_set), exec_time_date = rule.apply(graph)
                all_new_triple_set.extend(extracted_triple_set)
                
                graph_length_after = len(graph)  
                new_triple_count = graph_length_after - graph_length_before
                message = "----- (refinement) {0}: {1} new triples ({2})"
                message = message.format(rule.label, 
                                         new_triple_count, 
                                         graph_length_after)
                if (new_triple_count > 0):
                    logger.debug(message)
            
            return graph, all_new_triple_set
       
        except Exception:
            logger.error(" *** Error while processing extraction (_apply_refinement) ***") 
            logger.debug(" ----- len(refinement_rule_list): {0} ".format(len(refinement_rule_list))) 
            logger.debug(" ----- last rule: {0} ".format(rule)) 
        
        
    def _apply_sequence(graph, sequence, refinement_rule_list):
        """ Apply the rules of <sequence> on the working graph <graph> """ 
        
        try:
            logger.info("--- Sequence: {0}".format(sequence.label))    
            all_new_triple_set = []
        
            for rule in sequence.rule_list:
                
                graph_length_before = len(graph)
                
                # -- apply rule
                (graph, extracted_triple_set), exec_time_date = rule.apply(graph)
                all_new_triple_set.extend(extracted_triple_set)
                
                new_triple_count = len(graph) - graph_length_before
                msg = f"----- {rule.label}: "
                msg += f"{new_triple_count}/{len(extracted_triple_set)} new triple"
                if new_triple_count != 1: msg += "s" 
                msg += f" ({len(graph)}, {exec_time_date})"
                if (new_triple_count > 0):
                    logger.info(msg)
                else:
                    logger.debug(msg)
                    
                # -- apply refinement
                graph, extracted_triple_set = _apply_refinement(graph, refinement_rule_list)
                all_new_triple_set.extend(extracted_triple_set)
        
            return graph, all_new_triple_set
       
        except Exception:
            logger.error(" *** Error while processing extraction (_apply_sequence) ***") 
            logger.debug(" ----- len(sequence.rule_list): {0} ".format(len(sequence.rule_list))) 
            logger.debug(" ----- last rule: {0} ".format(rule)) 
            logger.debug(" ----- last SPARQL query: \n{0} ".format(rule.get_query())) 
            logger.debug(" ----- len(extracted_triple_set): {0} ".format(len(extracted_triple_set))) 
            logger.debug(" ----- new_triple_count: {0} ".format(new_triple_count)) 
            
        
    def _serialize_graph(config, graph, step_name):
        """ Serialize <graph> to a file """ 
        
        try:
            uuid_str = config.uuid_str
            work_file = config.output_file.replace('.ttl', '_' + step_name + '.ttl')
            base_ref = "http://{0}/{1}".format(uuid_str, step_name)
            
            message = "--- Serializing graph to {0} "
            message = message.format(Path(work_file).stem)
            logger.debug(message)
            logger.debug("----- step: {0}".format(step_name))
            logger.debug("----- id: {0}".format(uuid_str))
            logger.debug("----- work_file: {0}".format(work_file))
            logger.debug("----- base: {0}".format(base_ref))
            
            graph.serialize(destination=work_file, base=base_ref, format='turtle')
    
        except Exception:
            logger.error(" *** Error while serializing graph (_serialize_graph) ***") 
            logger.debug(" ----- work_file: {0} ".format(work_file)) 
            
         
    def apply_step(config, graph, rule_set, step_name, step_sequence_def):
        """ Apply extraction step on the working graph """
        
        try:
            logger.info("-- Applying extraction step: {0}".format(step_name))
            
            # -- Initialize
            step_triple_list = []
            graph_length_before_step = len(graph)
            
            # -- Prepare refinement rule list
            refinement_sequence = _prepare_sequence(step_sequence_def[0], rule_set)
            refinement_rule_list = refinement_sequence.rule_list
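            # (by convention, the first sequence definition of a step holds the
            # refinement rules; the remaining definitions are the extraction
            # sequences applied below)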
            
            # -- Apply the sequences of the step
            for sequence_def in step_sequence_def[1:]:
                sequence = _prepare_sequence(sequence_def, rule_set)
                graph, triple_list = _apply_sequence(graph, 
                                                     sequence, 
                                                     refinement_rule_list)
                step_triple_list.extend(triple_list)
             
            # -- Serialize the working graph updated during the step
            _serialize_graph(config, graph, step_name)
            
            
            # -- Log extracted triple number
            msg = "----- {0} triples extracted during {1} step"
            new_triple_count = len(graph) - graph_length_before_step
            logger.info(msg.format(new_triple_count, step_name))
             
            return graph, step_triple_list
        
        except Exception:
            logger.error(" *** Error while processing extraction (apply_step) ***") 
            logger.debug(' ----- step_name = {0}'.format(step_name)) 
            logger.debug(' ----- len(step_sequence_def) = {0}'.format(len(step_sequence_def))) 
            logger.debug(' ----- step_sequence_def[0] = {0}'.format(step_sequence_def[0])) 
            logger.debug(' ----- last sequence def = {0}'.format(sequence_def)) 
            logger.debug(' ----- last sequence label = {0}'.format(sequence.label)) 
        
       
    #==============================================================================
    # Main Function
    #==============================================================================   
    
    @timed
    def apply(config, graph):
        """ Apply extraction process on the working graph """
        
        try:
            
            # -- Loading Extraction Scheme
            logger.info("-- Loading Extraction Scheme ({0})".format(config.cts_ref))
            rule_dir, prefix_list, scheme = load_cts(config)
            logger.debug("----- Step number: {0}".format(len(scheme)))
                
            # -- Loading Extraction Rules
            logger.info("-- Loading Extraction Rules ({0}*)".format(rule_dir))
            rule_set = load_rule_set(config, rule_dir, prefix_list)
            logger.debug("----- Total rule number: {0}".format(len(rule_set)))
            
            # -- Apply each step of the scheme (new_triple_list keeps only the
            # triples of the last step, which provide the factoids below)
            new_triple_list = []
            for step_name, step_sequence_def in scheme.items():
                graph, new_triple_list = apply_step(config, graph, rule_set,
                                                    step_name, step_sequence_def)
        
            # -- Result: file containing only the factoids (last step result)
            logger.info("-- Result: file containing only the factoids")
            logger.debug("--- Making factoid graph with the last step result")
            factoid_graph = Graph()
            for new_triple in new_triple_list:
                factoid_graph.add(new_triple) 
            logger.debug("----- Number of factoids: " + str(len(new_triple_list)))    
            uuid_str = config.uuid_str
            base_ref = "http://{0}/factoid".format(uuid_str)
            logger.debug("----- Graph base: " + base_ref) 
            factoid_file = config.output_file.replace('.ttl', '_factoid.ttl')
            logger.debug("--- Serializing graph to factoid file (" + factoid_file + ")")
            factoid_graph.serialize(destination=factoid_file, 
                                    base=base_ref, 
                                    format='turtle')
                                    
            return graph, new_triple_list   
        
        except Exception:
            logger.error(' *** Error while processing extraction (apply) ***') 
            logger.debug(' ----- config.cts_ref = {0}'.format(config.cts_ref))  
            logger.debug(' ----- rule_dir = {0}'.format(rule_dir))  
            logger.debug(' ----- scheme = {0}'.format(scheme))
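

    # Minimal usage sketch (hypothetical values; 'config' must at least provide
    # the attributes read above: cts_ref, cts_file, cts_dir, uuid_str and
    # output_file; the @timed decorator is assumed to return the pair
    # (result, exec_time_date), as the rule.apply() call sites above suggest):
    #
    #     from rdflib import Graph
    #     working_graph = Graph().parse('work.ttl', format='turtle')
    #     (working_graph, factoid_list), exec_time = apply(config, working_graph)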