Python Rules (7): Some Useful Rule Sets

Description

First, sort out some rule sets. The goal is 100 rule sets in ten weeks.

Content

1 Table of Contents

Let's list them first, then work through them one by one.

| No. | Name | Codename | Effect |
| --- | --- | --- | --- |
| 1 | Read MySQL in steps | PullMysqlByStep | Fetch data from MySQL in batches |
| 2 | Split an article into paragraphs | | Assuming the input is a text file, split it into several paragraphs |
| 3 | Split a paragraph into short sentences | | Split the paragraph at punctuation marks into standalone short sentences (the next step is word segmentation) |
| 4 | Create MySQL data only if it does not exist | | Given a df and a key field, insert rows whose key value does not yet exist |
| 5 | Update MySQL data only if it exists | | Given a df and a key field, update rows whose key value already exists |
| 6 | Clean up short sentences, keep text | | Keep text that may form a person or company name |

2 Example

Let me start with the conclusion: overall this matches my expectations. The writing format is slightly more cumbersome than completing a task directly, but it is clearer, easier to reuse, and easy to check or fine-tune.

The code looks relatively long, but the overall structure is simple and easy to organize. Take the first example (PullMysqlByStep): in future use, a single line of code will pull up the defined rule set. Because this post shows the development process, the rules are defined, implemented, and called one by one.

Inputs: database connection, fields to read, table name, folder path, id field name

Outputs: several pkl files at the specified location

Naming: the table name and the (sorted) field list are hashed to identify a task. Under the folder path there are data and log folders, named and organized according to this hash.

Note: each batch of data should not exceed 1 GB; otherwise, adjust the read step size.

Rule flow:

  • 1 Pass in the database connection parameters, fields to read, table name, id field name, and folder path
  • 2 Generate a hash from the fields and table name
  • 3 Using the hash and folder path, make sure the folders exist (data/hash and log/hash)
  • 4 Using the hash and folder path, read the existing log; if there is none, use an empty list
  • 5 Determine the amount of data to read (max and min are configurable), form the read batches (a list of id tuples), and take the difference with the already-read log
  • 6 Loop over the id-tuple list to issue MySQL reads, saving the retrieved data and logs
  • 7 When the loop completes (normally or interrupted), report the result
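
The rule flow above can be sketched as a simple driver loop. The names here (run_ruleset, rule['func']) are illustrative stand-ins, not the actual fs API used below:

```python
# Illustrative driver: rules run in layer order; each returns True/False
# to signal whether the rule set may continue. Hypothetical names only.
def run_ruleset(rules, var_space):
    for rule in sorted(rules, key=lambda r: r['layer']):
        if not rule['func'](var_space):
            return {'status': False, 'stopped_at': rule['rule_id']}
    return {'status': True, 'stopped_at': None}

demo_rules = [
    {'rule_id': 'r_0000', 'layer': 0,
     'func': lambda vs: vs.setdefault('project_hash', 'abc') == 'abc'},
    {'rule_id': 'r_0001', 'layer': 1,
     'func': lambda vs: 'project_hash' in vs},
]
print(run_ruleset(demo_rules, {}))
```

The key design point is that every rule reads from and writes to a shared variable space, and its boolean return decides whether the set keeps going.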

2.1 Initial variable space

# Test variable space
# MySQL connection
cfg_config ={}
cfg_config['user'] = 'xxx'
cfg_config['port'] = 1234
cfg_config['host'] = 'IP'
cfg_config['password'] = 'xxx'
cfg_config['db'] = 'DB'

# Folder path
folder_path = './test_mysql_pull/'

# Required fields; ','.join(query_var_list) -> '*'
query_var_list = ['*']

# Table name
query_table_name  = 'xxx'

# id field name
query_id_name = 'id'

cur_dict ={}
cur_dict['input_dict'] = {}
cur_dict['input_dict']['cfg_config'] = cfg_config
cur_dict['input_dict']['folder_path'] = folder_path
cur_dict['input_dict']['query_var_list'] = query_var_list
cur_dict['input_dict']['query_table_name'] = query_table_name
cur_dict['input_dict']['query_id_name'] = query_id_name

2.2 r0: Pass in the database connection parameters, fields to read, table name, id field name, and folder path

Rule definition:

r0000_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0000', 
              'name':'rulev002_check_and_pass_dict_keys_in_sets',
              'description':'Check and pass through the key variables', 
              'layer':0,
              'params':{'vname':'input_dict', 'key_list':['cfg_config' ,'folder_path', 'query_var_list',
                                                          'query_table_name','query_id_name']},
              'conditions':[],
              'condition_mappings':[], 
              'out_comes':['cfg_mysql', 'folder_path', 'query_var_list', 'query_table_name', 'query_id_name']}

Rule function:

# Check and pass through the fields of input_dict
def rulev002_check_and_pass_dict_keys_in_sets(vname = None, key_list = None, VarSpace = None, out_comes = None,verbose=False):
    some_dict = VarSpace[vname]
    dict_key_set = set(some_dict.keys())
    if verbose:
        print(dict_key_set)
    key_set = set(key_list)
    
    if key_set.issubset(dict_key_set):
        # Each function outputs a list of results
        res_list = []
        for k in list(key_list):
            if verbose:
                print(k)
            res_list.append(some_dict.get(k))
        # Always check list lengths before zip: zip silently pairs lists of different lengths
        assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
        tem_dict = dict(zip(out_comes, res_list))
        VarSpace.update(tem_dict)
        
        return True
    else:
        return False
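
The same check-and-pass pattern can be exercised standalone, using a plain dict as the variable space (a condensed sketch; no fs framework required):

```python
# Condensed version of the check-and-pass rule: verify the keys exist,
# then copy them into the variable space under the out_comes names.
def check_and_pass(vname, key_list, var_space, out_comes):
    some_dict = var_space[vname]
    if not set(key_list).issubset(some_dict):
        return False  # a required key is missing: stop the rule set
    res_list = [some_dict[k] for k in key_list]
    assert len(res_list) == len(out_comes), 'outputs and names must align'
    var_space.update(dict(zip(out_comes, res_list)))
    return True

vs = {'input_dict': {'cfg_config': {'db': 'DB'}, 'folder_path': './t/'}}
ok = check_and_pass('input_dict', ['cfg_config', 'folder_path'],
                    vs, ['cfg_mysql', 'folder_path'])
print(ok, vs['cfg_mysql'])
```

Note how the rename happens here: cfg_config from input_dict lands in the variable space as cfg_mysql, exactly as in the out_comes of r0000.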

Execute the rule

some_rule = r0000_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )

2.3 r1: Generate a hash from the fields and table name

Define rules

r0001_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0001', 
              'name':'rulev002_gen_md5_hash_by_list',
              'description':'Generate a hash that represents the query from the query fields and table name', 
              'layer':1,
              'params':{'fs':fs},
              'conditions':['query_var_list','query_table_name'],
              'condition_mappings':['query_var_list','query_table_name'], 
              'out_comes':['project_hash']}

Rule function

# Generate an md5 hash from the given list of strings
def rulev002_gen_md5_hash_by_list(query_var_list = None,query_table_name = None, VarSpace = None, out_comes = None, verbose=False,fs = None):
    # ------- input section
    # query_var_list = 
    # query_table_name = 
    res_list = []
    # ------- input section END
    
    
    # ------- main body
    the_key_list  = query_var_list + [query_table_name]
    the_result = fs.md5_trans('_'.join(the_key_list))
    res_list.append(the_result)
    # ------- main body END
    
    
    # ------- write to the variable space
    # Always check list lengths before zip: zip silently pairs lists of different lengths
    assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
    tem_dict = dict(zip(out_comes, res_list))
    # intermediate rule
    VarSpace.update(tem_dict)
    
    # ------- write to the variable space END
    
    # Signals whether the rule set may continue
    if the_result:
        return True
    else:
        return False
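
Assuming fs.md5_trans is a plain MD5 over the joined string (an assumption; the helper's internals are not shown here), the project hash can be reproduced with the standard library:

```python
import hashlib

# Assumed equivalent of fs.md5_trans: hex MD5 digest of a UTF-8 string.
def md5_trans(s):
    return hashlib.md5(s.encode('utf-8')).hexdigest()

query_var_list = ['*']
query_table_name = 'xxx'
# Same construction as the rule: join fields and table name with '_'.
project_hash = md5_trans('_'.join(query_var_list + [query_table_name]))
print(project_hash)  # a 32-character hex digest
```

Because the hash is deterministic, re-running the same query definition always maps to the same data/hash and log/hash locations.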

Execute the rule

some_rule = r0001_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )

2.4 r2: Using the hash and folder path, ensure the folders exist (data/hash and log/hash)

Define rules

r0002_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0002', 
              'name':'rulev002_ensure_project_folders',
              'description':'Ensure the data and log folders exist', 
              'layer':2,
              'params':{'fs':fs, 'ensure_folders':['data', 'log']},
              'conditions':['folder_path'],
              'condition_mappings':['folder_path'], 
              'out_comes':['is_project_folder_ok']}

Rule function

# Ensure that the given subfolders exist under a directory
def rulev002_ensure_project_folders(fs=None, ensure_folders=None, VarSpace = None,folder_path = None, out_comes= None, verbose=False):
    # ------- input section
    res_list = []
    # ------- input section END
    
    
    # ------- main body
    create_status = True
    for the_folder_path in ensure_folders:
        full_path = fs.amend_path_slash(folder_path + the_folder_path)
        # Aggregate the creation status across all folders
        create_status = fs.create_folder_if_notexist(full_path) and create_status
    res_list.append(create_status)
    # ------- main body END
    
    # ------- write to the variable space
    # Always check list lengths before zip: zip silently pairs lists of different lengths
    assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
    tem_dict = dict(zip(out_comes, res_list))
    # intermediate rule
    VarSpace.update(tem_dict)
    

    # ------- write to the variable space END

    # Signals whether the rule set may continue
    if create_status:
        return True
    else:
        return False

Execute the rule

some_rule = r0002_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )

2.5 r3: Using the hash and folder path, read the existing log; if there is none, use an empty list

Define rules

r0003_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0003', 
              'name':'rulev002_read_log',
              'description':'Read the log to form slice_list_already (slices already read)', 
              'layer':3,
              'params':{'fs':fs, 'log_path':'log'},
              'conditions':['folder_path', 'project_hash' ],
              'condition_mappings':['folder_path','project_hash'], 
              'out_comes':['slice_list_already']}

Rule function

import os 
# Using the hash and folder path, read the existing log; empty list if absent
def rulev002_read_log(fs = None, log_path = None,VarSpace = None, out_comes =None, verbose= False,folder_path=None,
                     project_hash = None):
    # ------- input section
    res_list = []
    # ------- input section END
    
    
    # ------- main body
    log_txt_name = fs.amend_path_slash(folder_path+log_path) + project_hash + '.log'
    is_file_exists = os.path.exists(log_txt_name)
    if not is_file_exists:
        res_list.append([])
    # If the log has data, parse it (to be filled in later)
    else:
        pass
    
    # ------- main body END
    
    # ------- write to the variable space
    # Always check list lengths before zip: zip silently pairs lists of different lengths
    assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
    tem_dict = dict(zip(out_comes, res_list))
    # intermediate rule
    VarSpace.update(tem_dict)
    # ------- write to the variable space END

    # Signals whether the rule set may continue
    return True

Execute the rule

some_rule = r0003_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )

2.6 r4: Determine the amount of data to read (max and min configurable), form the read batches (id_tuple list), and take the difference with the already-read log

Define the rule

r0004_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0004', 
              'name':'rulev002_gen_slice_list',
              'description':"Generate this run's slice_list from the given min, max, and step, removing slices already read", 
              'layer':4,
              'params':{'fs':fs, 'min_idx':1,'max_idx':10000,'step':1000 },
              'conditions':['slice_list_already' ],
              'condition_mappings':['slice_list_already'], 
              'out_comes':['cur_slice_list']}

Rule function

# Determine the data volume to read (max/min configurable), form the read batches (id_tuple list), and subtract the already-read log
def rulev002_gen_slice_list(VarSpace = None, out_comes = None,verbose=False,
                            fs = None, min_idx = None, max_idx = None, step= None,
                            slice_list_already =None):
    # ------- input section
    res_list = []
    # ------- input section END
    
    
    # ------- main body
    slice_list = fs.slice_list_by_batch1(min_idx, max_idx, step)
    cur_slice_list = sorted(list(set(slice_list) - set(slice_list_already)))
    
    res_list.append(cur_slice_list)
    # ------- main body END
    
    # ------- write to the variable space
    # Always check list lengths before zip: zip silently pairs lists of different lengths
    assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
    tem_dict = dict(zip(out_comes, res_list))
    # intermediate rule
    VarSpace.update(tem_dict)
    # ------- write to the variable space END

    # Signals whether the rule set may continue
    return True
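
Assuming fs.slice_list_by_batch1 yields half-open (start, end) id ranges, which matches the slice boundaries in the pickle names printed in 2.7 (1..1001, 1001..2001, ...), the batching and difference step can be reproduced like this:

```python
# Assumed behaviour of fs.slice_list_by_batch1: half-open id ranges
# of width `step`, starting at min_idx and stopping before max_idx.
def slice_list_by_batch(min_idx, max_idx, step):
    return [(i, i + step) for i in range(min_idx, max_idx, step)]

slice_list = slice_list_by_batch(1, 10000, 1000)
slice_list_already = [(1, 1001)]  # pretend the first batch was read earlier
cur_slice_list = sorted(set(slice_list) - set(slice_list_already))
print(len(slice_list), cur_slice_list[0])
```

The set difference is what makes the pull resumable: already-logged batches are simply skipped on the next run.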

Execute the rule

some_rule = r0004_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )

2.7 r5: Loop over the id_tuple list to issue MySQL reads, saving the retrieved data and logs

Define the rule

r0005_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0005', 
              'name':'rulev002_read_mysql_iter_slice_list',
              'description':'Issue the MySQL read loop according to the slice list', 
              'layer':5,
              'params':{'fs':fs, 'the_sql_template':'select %s from %s where %s >= %s and %s < %s ',
                         'data_folder':'data', 'log_folder':'log'},
              'conditions':['folder_path', 'project_hash','cur_slice_list',
                            'query_var_list', 'query_table_name', 'query_id_name' ,'cfg_mysql'],
              
              'condition_mappings':['folder_path', 'project_hash','cur_slice_list',
                                    'query_var_list', 'query_table_name', 'query_id_name','cfg_mysql' ], 
              'out_comes':['mysql_read_status']}

Rule function

# Loop over the id_tuple list to issue MySQL reads, saving retrieved data and logs
def rulev002_read_mysql_iter_slice_list(VarSpace = None, out_comes = None,verbose=False, fs=None,
                                        the_sql_template=None, data_folder =None, log_folder =None,
                                        folder_path = None, project_hash = None,cur_slice_list=None,
                                        query_var_list=None,query_table_name=None,query_id_name=None,
                                        cfg_mysql=None):
    # ------- input section
    res_list = []
    # ------- input section END
    
    
    # ------- main body
    # 1 data needs a folder (data/project_hash)
    data_folder = fs.amend_path_slash(folder_path+data_folder)
    project_folder = fs.amend_path_slash(data_folder+project_hash)
    fs.create_folder_if_notexist(project_folder)
    # 2 log needs a file (log/project_hash.log)
    log_txt_name = fs.amend_path_slash(folder_path+log_folder) + project_hash + '.log'
    

    for the_slice in cur_slice_list:
        # selected fields
        query_var_str = ','.join(query_var_list)
        # build the sql statement
        the_sql = the_sql_template % (query_var_str, query_table_name,query_id_name,the_slice[0],query_id_name,the_slice[1])
        # fetch the result
        tem_res = fs.mysql_exe_sql_with_cursor(the_sql=the_sql,cfg_mysql= cfg_mysql)
        
        # slice data name
        slice_data_name = '_'.join(['slice',str(the_slice[0]).zfill(12), str(the_slice[1]).zfill(12)])
        # store the result as pkl
        fs.to_pickle(tem_res, slice_data_name, path =project_folder )
        
        # log the result (space separated)
        log_str = ' '.join(['ok',slice_data_name])
        fs.logging_str_a_row(fpath=log_txt_name, some_str = log_str, fs=fs)
    
    res_list.append(True)
    # ------- main body END
    
    # ------- write to the variable space
    # Always check list lengths before zip: zip silently pairs lists of different lengths
    assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
    tem_dict = dict(zip(out_comes, res_list))
    # intermediate rule
    VarSpace.update(tem_dict)
    # ------- write to the variable space END

    # Signals whether the rule set may continue
    return True

Execute the rule

some_rule = r0005_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )
---
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000000001_000000001001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000001001_000000002001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000002001_000000003001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000003001_000000004001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000004001_000000005001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000005001_000000006001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000006001_000000007001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000007001_000000008001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000008001_000000009001.pkl
data save to pickle:  ./test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/slice_000000009001_000000010001.pkl

This step completes the core requirement; what remains is the final output rule.
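
The SQL string built inside the loop can be checked in isolation; the template and values mirror the rule definition above:

```python
# Build the per-slice SQL exactly as the rule does, for the first batch.
the_sql_template = 'select %s from %s where %s >= %s and %s < %s '
the_sql = the_sql_template % (','.join(['*']), 'xxx', 'id', 1, 'id', 1001)
print(the_sql)
```

Note that %-formatting splices values directly into the SQL text; for untrusted input, a parameterized query through the driver would be the safer choice.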

2.8 r6: When the loop completes (normally or interrupted), report the result

Define the rule

r0006_dict = {'rule_set':'PullMysqlByStep', 
              'rule_id':'r_0006', 
              'name':'rulev002_report_mysql_query_result',
              'description':'Report the result of the MySQL query', 
              'layer':9999,
              'params':{'fs':fs, 
                         'data_folder':'data', 'log_folder':'log', 'verbose':True},
              'conditions':['folder_path', 'project_hash','cur_slice_list'],
              
              'condition_mappings':['folder_path', 'project_hash','cur_slice_list'], 
              'out_comes':['total_success_recs','cur_success_recs',
                           'cur_start_dt','cur_end_dt','cur_mean_seconds',
                           'data_path','log_path','cur_slice_list']}

Rule function

import pandas as pd
# When the loop completes (normally/interrupted), report the result
def rulev002_report_mysql_query_result(VarSpace = None, out_comes = None,verbose=False, fs=None,
                                        data_folder=None, log_folder=None,
                                       folder_path=None, project_hash=None, cur_slice_list = None,
                                       ):
    # ------- input section
    res_list = []
    # ------- input section END
    
    
    # ------- main body
    # log path
    log_txt_name = fs.amend_path_slash(folder_path+log_folder) + project_hash + '.log'
    log_df = fs.parse_log1(log_txt_name)
    if verbose:
        print(log_df)
    # all successful slices
    success_slice_list = list(log_df[log_df['status']=='ok']['slice_tuple'].apply(lambda x: tuple(x[x.find('_') +1:].split('_'))))
    if verbose:
        print('>>>  success slice list in log', success_slice_list)
    # build a slice_list in the same format as the log
    cur_slice_list1 = []
    for the_slice in cur_slice_list:
        tem1 = str(the_slice[0]).zfill(12)
        tem2= str(the_slice[1]).zfill(12)
        tem_tuple = (tem1,tem2)
        cur_slice_list1.append(tem_tuple)
        
    # slices that succeeded this run
    cur_success_slice_list = sorted(list(set(cur_slice_list1) & set(success_slice_list)))
    # build slice fields that can be matched against the log
    slice_match_list = [] 
    for the_slice in cur_success_slice_list:
        slice_match_list.append('_'.join(['slice',str(the_slice[0]).zfill(12), str(the_slice[1]).zfill(12)]))
    if verbose:
        print(slice_match_list)
    # timestamps of this run's successes
    sel = log_df['slice_tuple'].apply(lambda x: True if x in slice_match_list else False)
    if verbose:
        print('>>>>',sel.sum())
    if sel.sum() > 0:
        cur_success_ts = log_df[sel]['create_time'].apply(lambda x: x.replace('_',' ')).apply(pd.to_datetime)

        # most recent update time
        last_create_time = cur_success_ts.apply(str).max()
        # start time
        start_create_time = cur_success_ts.apply(str).min()
        # mean time
        mean_exe_time = cur_success_ts.sort_values().diff().apply(lambda x:x.seconds).mean()
        # data storage path
        data_folder = fs.amend_path_slash(folder_path+data_folder)
        project_folder = fs.amend_path_slash(data_folder+project_hash)
        # log storage path
        log_save_path = log_txt_name
    else:
        # most recent update time
        last_create_time = None
        # start time
        start_create_time = None
        # mean time
        mean_exe_time = None
        # data storage path
        data_folder = fs.amend_path_slash(folder_path+data_folder)
        project_folder = fs.amend_path_slash(data_folder+project_hash)
        # log storage path
        log_save_path = log_txt_name

    
    
    # res1 total successful count: total_success_recs
    res_list.append(len(success_slice_list))
    # res2 successful count this run: cur_success_recs
    res_list.append(len(cur_success_slice_list))
    # res3 start time of this run: cur_start_dt
    res_list.append(start_create_time)
    # res4 end time of this run: cur_end_dt
    res_list.append(last_create_time)
    # res5 mean seconds this run: cur_mean_seconds
    res_list.append(mean_exe_time)
    # res6 data storage path: data_path
    res_list.append(project_folder)
    # res7 log storage path: log_path
    res_list.append(log_save_path)
    # res8 successful slice list this run: cur_slice_list
    res_list.append(cur_success_slice_list)
    
    
    # ------- main body END
    
    # ------- write to the variable space
    # Always check list lengths before zip: zip silently pairs lists of different lengths
    assert len(res_list) == len(out_comes), 'the output list and the naming list must be the same length'
    tem_dict = dict(zip(out_comes, res_list))
    
    
    # This is an output rule
    VarSpace['output_dict'] = tem_dict
    # ------- write to the variable space END

    # Signals whether the rule set may continue
    return True

Execute the rule

some_rule = r0006_dict
cur_rule_name = some_rule['name']
cur_rule_input_kw = fs.get_rule_input_kw(some_rule, cur_dict)
fs.run_a_rule_v2(cur_rule_name,cur_rule_input_kw, fs = fs )

Final result

...
'output_dict': {'total_success_recs': 10,
  'cur_success_recs': 10,
  'cur_start_dt': '2021-01-01 13:18:51',
  'cur_end_dt': '2021-01-01 13:18:53',
  'cur_mean_seconds': 0.2222222222222222,
  'data_path': './test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/',
  'log_path': './test_mysql_pull/log/8e611428930a02d8554b0d76d735ad73.log',
  'cur_slice_list': [('000000000001', '000000001001'),
   ('000000001001', '000000002001'),
   ('000000002001', '000000003001'),
   ('000000003001', '000000004001'),
   ('000000004001', '000000005001'),
   ('000000005001', '000000006001'),
   ('000000006001', '000000007001'),
   ('000000007001', '000000008001'),
   ('000000008001', '000000009001'),
   ('000000009001', '000000010001')]}

3 Rule set form

I found a small problem: the rule set should live in a database. In that case the data (mainly the parameters) must be JSON-serializable, but earlier the fs (function dictionary object) was passed as a parameter, which would cause trouble later. So I revise the convention:

  • The keyword fs cannot be used as a rule parameter; it is reserved for the function dictionary, and every rule function receives this variable (even if unused)

Only the function that builds a rule's input parameters needs to change; nothing else does. Correspondingly, the fs parameter is removed from the rule definitions and no longer needs to be written.

# v2: extract input parameters from a rule's definition
def get_rule_input_kw_v2(some_rule_info_dict = None, VarSpace= None, fs=None):
    some_rule = some_rule_info_dict
    
    # Does not modify the parameters in the rule definition
    some_rule_dict = dict(some_rule)
    assert isinstance(some_rule['params'],dict),'params must be a dict'
    assert isinstance(some_rule['conditions'],list),'conditions (variable names in the variable space) must be a list'
    assert isinstance(some_rule['condition_mappings'],list),'condition-to-argument mappings must be a list'
    assert len(some_rule['conditions']) == len(some_rule['condition_mappings']),'conditions and mappings must be the same length'
    
    # Check whether any parameters are needed
    #if (len(some_rule['params'].keys()) + len(some_rule['conditions']))==0:
        #print('None')
        #return None
    
    var_name_list = list(some_rule['condition_mappings']).copy()
    var_list = list(some_rule['conditions']).copy()


    kw_dict = some_rule['params'].copy()
    # Extract the corresponding variables from VarSpace
    if len(var_list) > 0:
        for i ,var in enumerate(var_list):
            new_k = var_name_list[i]
            kw_dict[new_k] = VarSpace[var]
            
    # Names of the function outcomes in the variable space
    kw_dict['out_comes'] =  some_rule['out_comes']
            
    # Finally, attach the variable space
    kw_dict['VarSpace'] = VarSpace
    kw_dict['fs'] =fs
    return kw_dict
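
A condensed, self-contained version of this extraction (minus the assertions) shows the shape of the resulting kwargs:

```python
# Condensed sketch of the v2 extraction: merge params with the
# VarSpace-mapped conditions, then attach out_comes, VarSpace and fs.
def get_rule_input_kw(rule, var_space, fs=None):
    kw = dict(rule['params'])
    for cond, name in zip(rule['conditions'], rule['condition_mappings']):
        kw[name] = var_space[cond]
    kw['out_comes'] = rule['out_comes']
    kw['VarSpace'] = var_space
    kw['fs'] = fs
    return kw

rule = {'params': {'step': 1000},
        'conditions': ['query_table_name'],
        'condition_mappings': ['query_table_name'],
        'out_comes': ['project_hash']}
kw = get_rule_input_kw(rule, {'query_table_name': 'xxx'})
print(sorted(kw))
```

Since fs is always attached here rather than stored in params, every value left in the rule definition stays JSON-serializable, which is exactly what the revised convention requires.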

Tabulating the rule set above gives a DataFrame (PullMysqlByStep_df):

[Figure: screenshot of the tabulated rule set]


Execute the rule set

fs.run_ruleset_v2('pulltest', PullMysqlByStep_df, cur_dict, fs)
---
{'name': 'pulltest',
 'status': True,
 'msg': 'ok',
 'duration': 7,
 'data': {'total_success_recs': 10,
  'cur_success_recs': 0,
  'cur_start_dt': None,
  'cur_end_dt': None,
  'cur_mean_seconds': None,
  'data_path': './test_mysql_pull/data/8e611428930a02d8554b0d76d735ad73/',
  'log_path': './test_mysql_pull/log/8e611428930a02d8554b0d76d735ad73.log',
  'cur_slice_list': []}}

Save the rule set

# Generate rr_id
PullMysqlByStep_df.loc[:,'rr_id'] = fs.gen_md5_for_columns(PullMysqlByStep_df, ['rule_set','rule_id'],fs)

# Insert
fs.mongo_opr_insert_df_slice(func_lmongo, 'rules', 'rules', PullMysqlByStep_df, 'rr_id', fs)
# Update
# fs.mongo_opr_update_df_slice(func_lmongo, 'rules', 'rules', PullMysqlByStep_df, 'rr_id', fs)

4 Rule set document

Regarding using rule sets as an interface: each rule set is documented in markdown, then the document is serialized and stored in its metadata. It is too long to include here, so I will leave it out.

Summary

  • 1 Rule functions are written from a template, which makes the work routine and not tiring
  • 2 This approach does provide higher reliability
  • 3 The logic of a single rule set should not be too heavy; keeping it within 10 rules is recommended, and further rule sets can be stacked on top. (10 rules is roughly the limit of what can be finished in one day, and there may be hundreds of computation steps behind them)
  • 4 In use, a rule set can be applied directly against the admitted source table, or the template can be re-parameterized