This commit is contained in:
dev07 2025-09-09 15:56:47 +08:00
parent 6b5b05de17
commit be83924799

209
sql_ml.py
View File

@ -17,6 +17,18 @@ PREDEFINED_RULES = [
# {'name': 'Sent GCash to Account', 'pattern': re.compile(r'^Sent GCash to .+? with account ending in \d+$')} # {'name': 'Sent GCash to Account', 'pattern': re.compile(r'^Sent GCash to .+? with account ending in \d+$')}
] ]
# Abstract rules: each entry declares, side by side, the full matching regex,
# the normalized template text, the placeholder names and the regex fragment
# of every capture group (in the same order as the placeholders).
ABSTRACT_RULES = [
    {
        # Example: the original regex with its placeholders and per-group
        # regex fragments spelled out explicitly.
        "source_regex": r"^Sent GCash to (.+?) with account ending in (\d+)$",
        "template": "Sent GCash to <收款人名称> with account ending in <银行4位数尾号>",
        "placeholders": ["收款人名称", "银行4位数尾号"],
        "group_patterns": [r"(.+?)", r"(\d+)"],
    },
    # Further abstract rules can be appended here.
]
# 占位符到正则表达式的映射 # 占位符到正则表达式的映射
PLACEHOLDER_PATTERNS = { PLACEHOLDER_PATTERNS = {
'<金额>': r'([\d,.]+)', '<金额>': r'([\d,.]+)',
@ -191,6 +203,58 @@ def template_to_regex(template):
return escaped_template return escaped_template
def _strip_outer_group(pattern):
    """Return *pattern* without one pair of parentheses that wraps all of it.

    The parentheses are removed only when the leading ``(`` is closed by the
    final ``)`` and opens a plain capturing group. Patterns such as
    ``(a)|(b)``, ``(a\\)`` or ``(?:a|b)`` are returned unchanged (apart from
    surrounding whitespace being stripped) because removing their first and
    last characters would corrupt them.
    """
    pattern = pattern.strip()
    if len(pattern) < 2 or pattern[0] != '(' or pattern[-1] != ')':
        return pattern
    # Extension groups ("(?:...)", "(?P<x>...)", look-arounds) must stay whole.
    if pattern.startswith('(?'):
        return pattern

    depth = 0
    in_class = False
    last = len(pattern) - 1
    i = 0
    while i <= last:
        ch = pattern[i]
        if ch == '\\':
            # Skip the escaped character; it can never open or close a group.
            i += 2
            continue
        if in_class:
            if ch == ']':
                in_class = False
        elif ch == '[':
            in_class = True
            # A ']' directly after '[' or '[^' is a literal member of the set.
            if pattern[i + 1:i + 2] == '^':
                i += 1
            if pattern[i + 1:i + 2] == ']':
                i += 1
        elif ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
            if depth == 0 and i != last:
                # The first group closes early, so it does not wrap everything.
                return pattern
        i += 1
    return pattern[1:-1] if depth == 0 else pattern


def build_regex_from_template_data(template_data):
    """Build an uncompiled regex string from a template's metadata.

    Resolution order:

    1. A non-empty ``template_data['source_regex']`` is returned verbatim.
    2. Without ``placeholders`` metadata the legacy ``template_to_regex`` is
       applied to ``template_data['content']``.
    3. Otherwise the template text is escaped and every ``<name>`` placeholder
       is replaced by a named group ``(?P<group>pattern)``.

    ``placeholders`` may be a list of ``{'name': ..., 'pattern': ...}`` dicts
    or a list of bare names; bare names take their pattern from
    ``PLACEHOLDER_PATTERNS`` (default ``(.+?)``).

    The group name is the placeholder name itself whenever that name is a
    valid identifier (this includes non-ASCII names, which ``re`` accepts in
    ``str`` patterns). Only names that are not identifiers are sanitized by
    replacing every character outside ``[0-9A-Za-z_]`` with ``_``. Using the
    real name keeps distinct non-ASCII placeholders from collapsing into the
    same all-underscore group name, which would make the regex uncompilable.

    Entries without a usable string name are skipped.

    Args:
        template_data: mapping with optional keys ``source_regex``,
            ``content`` and ``placeholders``.

    Returns:
        The regex as a string; it is not compiled or validated here.
    """
    source_regex = template_data.get('source_regex')
    if source_regex:
        return source_regex

    template = template_data.get('content') or ''
    placeholders_meta = template_data.get('placeholders')
    if not placeholders_meta:
        # No placeholder metadata: fall back to the legacy conversion.
        return template_to_regex(template)

    # Escape the literal text; placeholders are substituted afterwards using
    # their escaped form so both sides agree on every Python version.
    regex = re.escape(template)

    for ph in placeholders_meta:
        name = ph.get('name') if isinstance(ph, dict) else ph
        if not isinstance(name, str) or not name:
            # A placeholder without a name cannot be located in the template.
            continue

        if isinstance(ph, dict):
            pattern = ph.get('pattern')
        else:
            # Legacy format (name only): look up the global default pattern.
            pattern = PLACEHOLDER_PATTERNS.get(f'<{name}>', '(.+?)')
        if not isinstance(pattern, str) or not pattern:
            pattern = '.+?'

        # Drop a wrapping capture group so the named group does not nest a
        # redundant positional group.
        inner = _strip_outer_group(pattern)

        if name.isidentifier():
            group_name = name
        else:
            group_name = re.sub(r"[^0-9A-Za-z_]", "_", name)

        regex = regex.replace(
            re.escape(f'<{name}>'),
            f"(?P<{group_name}>{inner})",
        )

    return regex
def extract_parameters(template, message): def extract_parameters(template, message):
""" """
从消息中提取参数值 从消息中提取参数值
@ -269,6 +333,8 @@ def extract_templates_iterative(input_files, output_file, rules, batch_size=1000
""" """
print("--- 开始迭代式模板提取 ---") print("--- 开始迭代式模板提取 ---")
final_templates = {} # template -> list of original contents final_templates = {} # template -> list of original contents
# 存储模板的元信息(占位符与其正则、原始正则等)
templates_meta = {} # template -> { 'placeholders': [ {name, pattern}, ... ], 'source_regex': str }
unmatched_batch = [] unmatched_batch = []
batch_num = 1 batch_num = 1
@ -310,7 +376,31 @@ def extract_templates_iterative(input_files, output_file, rules, batch_size=1000
final_templates[normalized_content].append(content) final_templates[normalized_content].append(content)
matched_by_rule = True matched_by_rule = True
break break
# 额外检查抽象化规则source_regex + template + placeholders
if not matched_by_rule:
for ar in ABSTRACT_RULES:
try:
if re.match(ar['source_regex'], content):
tpl = ar['template']
if tpl not in final_templates:
final_templates[tpl] = []
if len(final_templates[tpl]) < 10:
final_templates[tpl].append(content)
# 注册元信息(占位符与其组正则)
if tpl not in templates_meta:
phs = []
for name, gp in zip(ar.get('placeholders', []), ar.get('group_patterns', [])):
phs.append({'name': name, 'pattern': gp})
templates_meta[tpl] = {
'placeholders': phs,
'source_regex': ar['source_regex']
}
matched_by_rule = True
break
except re.error:
# 如果抽象规则本身有错误,跳过
continue
if matched_by_rule: if matched_by_rule:
continue continue
@ -359,10 +449,33 @@ def extract_templates_iterative(input_files, output_file, rules, batch_size=1000
with open(output_file, 'w', encoding='utf-8') as f: with open(output_file, 'w', encoding='utf-8') as f:
for template, data_list in sorted(final_templates.items()): for template, data_list in sorted(final_templates.items()):
if max_samples_per_template == 0: # 新格式:{"template":..., "regex":{name: pattern, ...}, "data":[...]}
json.dump({"content": template}, f, ensure_ascii=False) output_obj = {'template': template}
# 构建 regex 映射placeholder name -> pattern
regex_map = {}
if template in templates_meta:
for ph in templates_meta[template].get('placeholders', []):
if isinstance(ph, dict):
regex_map[ph.get('name')] = ph.get('pattern')
else:
# ph 可能是名字字符串,尝试从全局映射取 pattern
regex_map[ph] = PLACEHOLDER_PATTERNS.get(f'<{ph}>', '(.+?)')
# 优先保留 source_regex as special key if present
if templates_meta[template].get('source_regex'):
output_obj['source_regex'] = templates_meta[template]['source_regex']
else: else:
json.dump({"content": template, "data": data_list[:max_samples_per_template]}, f, ensure_ascii=False) # 没有元信息时,尝试根据模板内占位符名用默认映射构建
placeholders = re.findall(r'<([^>]+)>', template)
for name in placeholders:
regex_map[name] = PLACEHOLDER_PATTERNS.get(f'<{name}>', '(.+?)')
output_obj['regex'] = regex_map
# 示例数据
output_obj['data'] = data_list[:max_samples_per_template] if max_samples_per_template != 0 else []
json.dump(output_obj, f, ensure_ascii=False)
f.write('\n') f.write('\n')
print(f"所有模板已成功写入到 '{output_file}'") print(f"所有模板已成功写入到 '{output_file}'")
@ -378,14 +491,43 @@ def extract_values_with_templates(input_files, template_file, output_file, conte
""" """
print("--- 开始使用模板提取参数值 ---") print("--- 开始使用模板提取参数值 ---")
# 读取模板 # 读取模板及其元信息。支持两种格式:旧的 {content, placeholders, source_regex} 或 新的 {template, regex, source_regex, data}
templates = [] templates_meta = []
with open(template_file, 'r', encoding='utf-8') as f: with open(template_file, 'r', encoding='utf-8') as f:
for line in f: for line in f:
template_data = json.loads(line) try:
templates.append(template_data['content']) raw = json.loads(line)
except json.JSONDecodeError:
print(f"已加载 {len(templates)} 个模板") continue
# 规范化到内部使用的 tmeta 格式:{ 'content': ..., 'placeholders': [ {name, pattern}, ... ], 'source_regex': ... }
tmeta = {}
if 'template' in raw:
tmeta['content'] = raw.get('template')
# raw may have 'regex' mapping name->pattern
regex_map = raw.get('regex', {})
phs = []
for name, pat in regex_map.items():
phs.append({'name': name, 'pattern': pat})
# also consider raw['source_regex'] and raw['data'] if present
if 'source_regex' in raw:
tmeta['source_regex'] = raw.get('source_regex')
if phs:
tmeta['placeholders'] = phs
elif 'content' in raw:
# backward-compatible
tmeta['content'] = raw.get('content')
if 'placeholders' in raw:
tmeta['placeholders'] = raw.get('placeholders')
if 'source_regex' in raw:
tmeta['source_regex'] = raw.get('source_regex')
else:
# skip unknown format
continue
templates_meta.append(tmeta)
print(f"已加载 {len(templates_meta)} 个模板(含元信息/已规范化)")
# 从原始数据中提取值 # 从原始数据中提取值
extracted_values = [] extracted_values = []
@ -411,11 +553,52 @@ def extract_values_with_templates(input_files, template_file, output_file, conte
continue continue
# 尝试匹配每个模板 # 尝试匹配每个模板
for template in templates: for tmeta in templates_meta:
parameters = extract_parameters(template, content) # 构建用于匹配的正则
try:
regex_str = build_regex_from_template_data(tmeta)
match = re.search(regex_str, content)
except re.error:
# 如果生成的正则有问题,退回到老方法
match = None
parameters = {}
if match:
# 如果有命名组,优先使用命名组
if match.groupdict():
# 将安全组名还原为原始占位符名(如果有元信息)
gd = match.groupdict()
# 如果模板元信息里有占位符名映射,则把原名映射回来
ph_map = {}
if 'placeholders' in tmeta and isinstance(tmeta['placeholders'], list):
for ph in tmeta['placeholders']:
if isinstance(ph, dict):
orig = ph.get('name')
else:
orig = ph
safe = re.sub(r"[^0-9A-Za-z_]", "_", orig)
ph_map[safe] = orig
for safe_name, val in gd.items():
orig_name = ph_map.get(safe_name, safe_name)
parameters[f'<{orig_name}>'] = val
else:
# 否则使用位置组,借助模板中的占位符顺序
values = match.groups()
placeholders = re.findall(r'<[^>]+>', tmeta.get('content', ''))
for i, placeholder in enumerate(placeholders):
if i < len(values):
parameters[placeholder] = values[i]
else:
# 退回老方法:通过模板字符串替换占位符生成正则
tpl = tmeta.get('content')
if tpl:
parameters = extract_parameters(tpl, content)
if parameters: if parameters:
extracted_values.append({ extracted_values.append({
'template': template, 'template': tmeta.get('content'),
'message': content, 'message': content,
'parameters': parameters 'parameters': parameters
}) })