650 lines
31 KiB
Python
650 lines
31 KiB
Python
import json
|
||
import re
|
||
import numpy as np
|
||
from sklearn.feature_extraction.text import TfidfVectorizer
|
||
from sklearn.cluster import DBSCAN
|
||
from sklearn.metrics.pairwise import cosine_similarity
|
||
from tqdm import tqdm
|
||
import argparse
|
||
|
||
# --- 规则和辅助函数 ---
|
||
PREDEFINED_RULES = [
|
||
|
||
]
|
||
|
||
# 抽象化规则:将正则、模板和占位符/每组的正则一并声明
|
||
ABSTRACT_RULES = [
|
||
{
|
||
# 示例:把原始正则和占位符显式化
|
||
'source_regex': r'^Sent GCash to (.+?) with account ending in (\d+)$',
|
||
'template': 'Sent GCash to <收款人名称> with account ending in <银行4位数尾号>',
|
||
'placeholders': ['收款人名称', '银行4位数尾号']
|
||
},
|
||
{
|
||
# Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <网络或发票号>
|
||
'source_regex': r'(?i)^Received GCash from (.+?) with account ending in (\d+) (via .+|and invno:.+)$',
|
||
'template': 'Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <网络或发票号>',
|
||
'placeholders': ['付款人名称', '银行4位数尾号', '网络或发票号']
|
||
},
|
||
{
|
||
# Payment to <商户名>, Merchant Transaction Number: <交易单号>
|
||
'source_regex': r'^Payment to (.+?), Merchant Transaction Number: (.+)$',
|
||
'template': 'Payment to <收款人名称>, Merchant Transaction Number: <交易单号>',
|
||
'placeholders': ['收款人名称', '交易单号']
|
||
},
|
||
# 以下条目为自动迁移自 normalize_text 的简单规则
|
||
{
|
||
'source_regex': r'^You have sent PHP [\d,]+\.\d{2} to an unverified account [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \.\..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\. Go to GCash Help Center to know how to secure your transactions\.$',
|
||
'template': 'You have sent PHP <金额> to an unverified account <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>. Go to GCash Help Center to know how to secure your transactions.',
|
||
'placeholders': ['金额', '收款人号码', '日期', '时间', '消息', '金额', '流水号']
|
||
},{
|
||
'source_regex': r'^You have sent PHP [\d,]+\.\d{2} to .+? [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: .*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\.$',
|
||
'template': 'You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>.',
|
||
'placeholders': ['金额', '收款人名称', '收款人号码', '日期', '时间', '消息', '金额', '参考号']
|
||
},{
|
||
'source_regex': r'^You have received\s+(?:PHP\s+)?[\d,.]+\s+of GCash from\s+.+?\. Your new balance is\s+(?:PHP\s+)?[\d,.]+\s+\d{1,2}-\d{1,2}-\d{2,4}\s+\d{1,2}:\d{1,2}(?::\d{1,2})?\s+[AP]M\. Ref\. No\.\s+.+?\. Use now to buy load, purchase items, send money, pay bills, and a lot more!$',
|
||
'template': 'You have received <金额> of GCash from <付款人名称>. Your new balance is <金额>. <日期时间>. Ref. No. <流水号>. Use now to buy load, purchase items, send money, pay bills, and a lot more!',
|
||
'placeholders': ['金额','付款人名称','金额','日期时间','流水号']
|
||
},{
|
||
'source_regex': r'^You have received PHP [\d,.]+\s+of GCash from .+? w/ MSG: .*\. (?:Your new balance is PHP [\d,.]*\.\s)?Ref\. No\. \d+\.(?: To access your funds,.*)?$',
|
||
'template': 'You have received PHP <金额> of GCash from <付款人名称> w/ MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>.',
|
||
'placeholders': ['金额','付款人名称','消息','金额','参考号']
|
||
},{
|
||
'source_regex': r'^You have paid P[\d,.]+\s+via GCash to .+? on \d{1,2}-\d{1,2}-\d{2,4}\s\d{1,2}:\d{1,2}:\d{1,2}\s+[AP]M\. Ref\. No\.\s+\d+\. QRPH Invoice No\.\s+\d+\.$',
|
||
'template': 'You have paid P<金额> via GCash to <收款人名称> on <日期时间>. Ref. No. <参考号>. QRPH Invoice No. <参考号>.',
|
||
'placeholders': ['金额','收款人名称','日期时间','参考号','参考号']
|
||
},{
|
||
'source_regex': r'(?i)^Received GCash from [^<]+$',
|
||
'template': 'Received GCash from <付款人名称>',
|
||
'placeholders': ['付款人名称']
|
||
},{
|
||
'source_regex': r'^Payment to ([^,]+)$',
|
||
'template': 'Payment to <收款人名称>',
|
||
'placeholders': ['收款人名称']
|
||
},{
|
||
'source_regex': r'^(.+?) with (Ref\. no\.|Parent Ref\.No\.|Reference No\.) (.+)$',
|
||
'template': '<交易类型> with Ref. no. <参考号>',
|
||
'placeholders': ['交易类型','参考号']
|
||
},{
|
||
'source_regex': r'^Buy Load Transaction for .+$',
|
||
'template': 'Buy Load Transaction for <付款人号码>',
|
||
'placeholders': ['付款人号码']
|
||
},{
|
||
'source_regex': r'^Transfer from \S+ to \S+$',
|
||
'template': 'Transfer from <付款人号码> to <收款人号码>',
|
||
'placeholders': ['付款人号码', '收款人号码']
|
||
}
|
||
]
|
||
|
||
# 注:不再使用全局 PLACEHOLDER_PATTERNS 作为主要来源。
|
||
# 如果需要回退模式,会使用通用 '(.+?)' 作为默认。
|
||
|
||
def extract_group_patterns_from_regex(regex_str: str):
|
||
"""
|
||
从一个正则表达式字符串中提取按出现顺序的顶层捕获组的内部模式(不含最外层括号)。
|
||
|
||
说明与限制:
|
||
- 只提取按文本顺序出现的捕获组 (包括命名组 (?P<name>...)),跳过非捕获组 (?:...), lookaround 等。
|
||
- 对非常复杂或不规则的正则(条件组、嵌套的命名/非命名混合、内联 flags 等)不能保证 100% 正确,建议对特殊规则手动验证/修正。
|
||
返回值:字符串列表,例如 ['\\d{4}', '.+?'](不包含外层括号)。
|
||
"""
|
||
s = regex_str
|
||
n = len(s)
|
||
i = 0
|
||
stack = [] # each item: (start_index, is_capturing)
|
||
results = []
|
||
while i < n:
|
||
ch = s[i]
|
||
if ch == '\\':
|
||
# skip escaped char
|
||
i += 2
|
||
continue
|
||
if ch == '(':
|
||
# lookahead to decide capturing vs non-capturing
|
||
is_capturing = True
|
||
if i + 1 < n and s[i+1] == '?':
|
||
# (?P<name>... ) is a capturing named group
|
||
if i + 2 < n and s[i+2] == 'P':
|
||
is_capturing = True
|
||
else:
|
||
# other (?...) forms are non-capturing or lookaround
|
||
is_capturing = False
|
||
stack.append((i, is_capturing))
|
||
i += 1
|
||
continue
|
||
if ch == ')':
|
||
if not stack:
|
||
i += 1
|
||
continue
|
||
start, is_capturing = stack.pop()
|
||
if is_capturing:
|
||
inner = s[start+1:i]
|
||
# strip surrounding whitespace
|
||
results.append(inner)
|
||
i += 1
|
||
continue
|
||
i += 1
|
||
return results
|
||
|
||
def get_placeholder(action):
|
||
"""
|
||
根据JSON消息的action类型返回对应的占位符
|
||
"""
|
||
if 'Received' in action:
|
||
return '付款人号码'
|
||
elif 'Sent' in action:
|
||
return '收款人号码'
|
||
elif 'Refunded' in action:
|
||
return '付款人号码'
|
||
else:
|
||
return '付款人号码' # 默认值
|
||
|
||
def normalize_text(text):
|
||
# 先统一应用 ABSTRACT_RULES 中的简单替换规则(已迁移)
|
||
for ar in ABSTRACT_RULES:
|
||
try:
|
||
if ar.get('source_regex') and ar.get('template'):
|
||
text = re.sub(ar['source_regex'], ar['template'], text)
|
||
except re.error:
|
||
# 跳过不合法的正则
|
||
continue
|
||
|
||
# 保留无法迁移的复杂规则:统一处理所有JSON格式消息(使用lambda)
|
||
text = re.sub(
|
||
r'\\\"(Received money from|Sent money to|Received settlement from|Reversed settlement from|Refunded money via|Sent money via)\\\",\\\"target\\\":\\\"(.+?)\\\"',
|
||
lambda m: f'\\\"{m.group(1)}\\\",\\\"target\\\":\\\"<{get_placeholder(m.group(1))}>\\\"',
|
||
text
|
||
)
|
||
|
||
return text
|
||
|
||
def template_to_regex(template):
|
||
"""
|
||
将模板转换为可用于提取参数的正则表达式
|
||
"""
|
||
# 转义模板中的特殊字符,但保留占位符
|
||
escaped_template = re.escape(template)
|
||
|
||
# 将占位符替换为通用捕获组(因为不再依赖全局占位符映射)
|
||
# 如果需要更精确的子模式,请在 ABSTRACT_RULES 的 source_regex 中提供捕获组,后续会从中提取。
|
||
default_pat = r'(.+?)'
|
||
placeholders = re.findall(r'<([^>]+)>', template)
|
||
for name in placeholders:
|
||
escaped_placeholder = re.escape(f'<{name}>')
|
||
escaped_template = escaped_template.replace(escaped_placeholder, default_pat, 1)
|
||
|
||
return escaped_template
|
||
|
||
|
||
def build_regex_from_template_data(template_data):
|
||
"""
|
||
根据模板的元信息构建一个用于匹配的正则表达式字符串。
|
||
优先使用 template_data['source_regex'],否则从 template_data['content'] + template_data['placeholders'] 生成带命名分组的正则。
|
||
"""
|
||
# 如果有原始正则,直接使用(保持原样)
|
||
if 'source_regex' in template_data and template_data['source_regex']:
|
||
return template_data['source_regex']
|
||
|
||
template = template_data.get('content', '')
|
||
placeholders_meta = template_data.get('placeholders')
|
||
|
||
# 如果没有占位符元信息,退回到旧方法生成正则
|
||
if not placeholders_meta:
|
||
return template_to_regex(template)
|
||
|
||
# 转义模板的普通字符,保留占位符位置
|
||
escaped_template = re.escape(template)
|
||
|
||
def strip_outer_parens(pat: str) -> str:
|
||
pat = pat.strip()
|
||
if pat.startswith('(') and pat.endswith(')'):
|
||
return pat[1:-1]
|
||
return pat
|
||
|
||
# placeholders_meta 可能是 [{'name':..., 'pattern':...}, ...] 或 ['name1','name2']
|
||
# 使用一个计数器来处理重复的占位符名称
|
||
placeholder_count = {}
|
||
# placeholders_meta 可能是 [{'name':..., 'pattern':...}, ...] 或 ['name1','name2']
|
||
for ph in placeholders_meta:
|
||
if isinstance(ph, dict):
|
||
name = ph.get('name')
|
||
pattern = ph.get('pattern') if ph.get('pattern') else '(.+?)'
|
||
else:
|
||
# 旧格式:仅名字
|
||
name = ph
|
||
# 使用默认通配子模式
|
||
pattern = '(.+?)'
|
||
|
||
# 处理重复的占位符名称
|
||
count = placeholder_count.get(name, 0)
|
||
placeholder_count[name] = count + 1
|
||
|
||
# 如果有重复,使用 name_1, name_2 等
|
||
safe_name = re.sub(r"[^0-9A-Za-z_]", "_", name)
|
||
# 为了避免不同名称产生相同的safe_name,添加更多区分信息
|
||
if len(safe_name) <= 5: # 如果转换后很短
|
||
# 使用名称的字符信息来区分
|
||
char_sum = sum(ord(c) for c in name)
|
||
safe_name = f"{safe_name}_{char_sum % 100}"
|
||
if count > 0:
|
||
safe_name = f"{safe_name}_{count}"
|
||
|
||
# 清理 pattern 的外层括号以避免双重捕获
|
||
inner = strip_outer_parens(pattern)
|
||
|
||
named_group = f"(?P<{safe_name}>{inner})"
|
||
|
||
escaped_placeholder = re.escape(f'<{name}>')
|
||
escaped_template = escaped_template.replace(escaped_placeholder, named_group, 1) # 只替换第一个匹配
|
||
|
||
return escaped_template
|
||
|
||
def extract_parameters(template, message):
|
||
"""
|
||
从消息中提取参数值
|
||
"""
|
||
# 生成正则表达式
|
||
pattern = template_to_regex(template)
|
||
|
||
# 匹配消息
|
||
match = re.search(pattern, message)
|
||
|
||
if match:
|
||
# 获取所有捕获组
|
||
values = match.groups()
|
||
|
||
# 获取模板中的占位符
|
||
placeholders = re.findall(r'<[^>]+>', template)
|
||
|
||
# 创建参数字典
|
||
parameters = {}
|
||
for i, placeholder in enumerate(placeholders):
|
||
if i < len(values):
|
||
parameters[placeholder] = values[i]
|
||
|
||
return parameters
|
||
|
||
return {}
|
||
|
||
def run_dbscan_on_corpus(corpus, eps, min_samples, max_samples=10):
|
||
if not corpus: return {}
|
||
|
||
processed_corpus = [normalize_text(text) for text in corpus]
|
||
|
||
try:
|
||
vectorizer = TfidfVectorizer()
|
||
X = vectorizer.fit_transform(processed_corpus)
|
||
|
||
db = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine', n_jobs=-1).fit(X)
|
||
labels = db.labels_
|
||
|
||
dbscan_templates = {}
|
||
unique_labels = set(labels)
|
||
|
||
for label in unique_labels:
|
||
class_member_indices = np.where(labels == label)[0]
|
||
if label == -1: # 处理噪声点
|
||
for idx in class_member_indices:
|
||
original = corpus[idx]
|
||
normalized = processed_corpus[idx]
|
||
if normalized not in dbscan_templates:
|
||
dbscan_templates[normalized] = []
|
||
if len(dbscan_templates[normalized]) < max_samples:
|
||
dbscan_templates[normalized].append(original)
|
||
continue
|
||
|
||
# 处理聚类
|
||
cluster_vectors = X[class_member_indices]
|
||
centroid = np.asarray(cluster_vectors.mean(axis=0))
|
||
similarities = cosine_similarity(cluster_vectors, centroid)
|
||
most_representative_idx_in_cluster = np.argmax(similarities)
|
||
original_corpus_idx = class_member_indices[most_representative_idx_in_cluster]
|
||
most_representative_normalized = processed_corpus[original_corpus_idx]
|
||
cluster_originals = [corpus[idx] for idx in class_member_indices]
|
||
dbscan_templates[most_representative_normalized] = cluster_originals[:max_samples]
|
||
|
||
return dbscan_templates
|
||
except ValueError:
|
||
# 如果批次中所有词都在停用词表中,TfidfVectorizer会报错
|
||
print("警告: DBSCAN批次处理失败,可能因为内容过于单一或简短。将内容视为独立模板。")
|
||
return {processed_corpus[i]: [corpus[i]][:max_samples] for i in range(len(corpus))}
|
||
|
||
|
||
def extract_templates_iterative(input_files, output_file, rules, batch_size=1000, eps=0.4, min_samples=2, max_samples_per_template=0, content_key='content'):
|
||
"""
|
||
使用小批量迭代的混合策略来提取模板,并为每个模板收集最多10个原始数据集。
|
||
支持多个输入文件。
|
||
"""
|
||
print("--- 开始迭代式模板提取 ---")
|
||
final_templates = {} # template -> list of original contents
|
||
# 存储模板的元信息(占位符与其正则、原始正则等)
|
||
templates_meta = {} # template -> { 'placeholders': [ {name, pattern}, ... ], 'source_regex': str }
|
||
unmatched_batch = []
|
||
batch_num = 1
|
||
|
||
try:
|
||
print(f"步骤 1: 逐行处理输入文件 {input_files} 并动态构建模板库...")
|
||
total_lines = 0
|
||
for input_file in input_files:
|
||
with open(input_file, 'r', encoding='utf-8') as f:
|
||
total_lines += sum(1 for _ in f)
|
||
|
||
for input_file in input_files:
|
||
print(f"\n--- 开始处理文件: {input_file} ---")
|
||
# 计算当前文件的行数
|
||
with open(input_file, 'r', encoding='utf-8') as f:
|
||
file_lines = sum(1 for _ in f)
|
||
|
||
with open(input_file, 'r', encoding='utf-8') as f:
|
||
for line in tqdm(f, total=file_lines, desc=f"处理 {input_file.split('/')[-1]}"):
|
||
try:
|
||
data = json.loads(line)
|
||
content = data.get(content_key)
|
||
if not content: continue
|
||
|
||
normalized_content = normalize_text(content)
|
||
|
||
# 1. 检查是否匹配已发现的任何模板
|
||
if normalized_content in final_templates:
|
||
if len(final_templates[normalized_content]) < 10:
|
||
final_templates[normalized_content].append(content)
|
||
continue
|
||
|
||
# 2. 检查是否匹配预定义规则
|
||
matched_by_rule = False
|
||
for rule in rules:
|
||
if rule['pattern'].match(content):
|
||
if normalized_content not in final_templates:
|
||
final_templates[normalized_content] = []
|
||
if len(final_templates[normalized_content]) < 10:
|
||
final_templates[normalized_content].append(content)
|
||
matched_by_rule = True
|
||
break
|
||
# 额外:检查抽象化规则(source_regex + template + placeholders)
|
||
if not matched_by_rule:
|
||
for ar in ABSTRACT_RULES:
|
||
try:
|
||
if re.match(ar['source_regex'], content):
|
||
tpl = ar['template']
|
||
if tpl not in final_templates:
|
||
final_templates[tpl] = []
|
||
if len(final_templates[tpl]) < 10:
|
||
final_templates[tpl].append(content)
|
||
# 注册元信息(占位符与其组正则)
|
||
if tpl not in templates_meta:
|
||
phs = []
|
||
for name in ar.get('placeholders', []):
|
||
# 优先把 source_regex 作为权威,不在此处重复具体子模式(用 None 标记)
|
||
phs.append({'name': name, 'pattern': None})
|
||
templates_meta[tpl] = {
|
||
'placeholders': phs,
|
||
'source_regex': ar['source_regex']
|
||
}
|
||
matched_by_rule = True
|
||
break
|
||
except re.error:
|
||
# 如果抽象规则本身有错误,跳过
|
||
continue
|
||
|
||
if matched_by_rule:
|
||
continue
|
||
|
||
# 3. 如果都未匹配,加入批处理列表
|
||
unmatched_batch.append(content)
|
||
|
||
# 4. 检查是否触发批处理
|
||
if len(unmatched_batch) >= batch_size:
|
||
print(f"\n--- 处理批次 #{batch_num} (大小: {len(unmatched_batch)}) ---")
|
||
newly_found_templates = run_dbscan_on_corpus(unmatched_batch, eps, min_samples, 10)
|
||
|
||
print(f"批次 #{batch_num}: DBSCAN 发现了 {len(newly_found_templates)} 个潜在模板。")
|
||
for template, originals in newly_found_templates.items():
|
||
if template in final_templates:
|
||
remaining = 10 - len(final_templates[template])
|
||
final_templates[template].extend(originals[:remaining])
|
||
else:
|
||
final_templates[template] = originals[:10]
|
||
print(f"当前总模板数: {len(final_templates)}")
|
||
|
||
unmatched_batch.clear()
|
||
batch_num += 1
|
||
|
||
except (json.JSONDecodeError, AttributeError):
|
||
continue
|
||
|
||
# --- 收尾处理 ---
|
||
print("\n--- 文件处理完毕,处理最后一批剩余内容 ---")
|
||
if unmatched_batch:
|
||
print(f"处理最后一个批次 (大小: {len(unmatched_batch)})")
|
||
newly_found_templates = run_dbscan_on_corpus(unmatched_batch, eps, min_samples, 10)
|
||
|
||
print(f"最后一个批次: DBSCAN 发现了 {len(newly_found_templates)} 个潜在模板。")
|
||
for template, originals in newly_found_templates.items():
|
||
if template in final_templates:
|
||
remaining = 10 - len(final_templates[template])
|
||
final_templates[template].extend(originals[:remaining])
|
||
else:
|
||
final_templates[template] = originals[:10]
|
||
else:
|
||
print("没有剩余内容需要处理。")
|
||
|
||
# --- 输出 ---
|
||
print("\n--- 第 3 部分: 合并结果并保存 ---")
|
||
print(f"总共找到 {len(final_templates)} 个唯一的模板。")
|
||
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
for template, data_list in sorted(final_templates.items()):
|
||
# 新格式:{"template":..., "regex":{name: pattern, ...}, "data":[...]}
|
||
output_obj = {'template': template}
|
||
|
||
# 构建 regex 映射:placeholder name -> pattern
|
||
regex_map = {}
|
||
if template in templates_meta:
|
||
for ph in templates_meta[template].get('placeholders', []):
|
||
if isinstance(ph, dict):
|
||
regex_map[ph.get('name')] = ph.get('pattern')
|
||
else:
|
||
# ph 可能是名字字符串,使用通用后备模式
|
||
regex_map[ph] = '(.+?)'
|
||
# 优先保留 source_regex as special key if present
|
||
if templates_meta[template].get('source_regex'):
|
||
output_obj['source_regex'] = templates_meta[template]['source_regex']
|
||
else:
|
||
# 没有元信息时,尝试根据模板内占位符名用默认映射构建
|
||
placeholders = re.findall(r'<([^>]+)>', template)
|
||
for name in placeholders:
|
||
regex_map[name] = '(.+?)'
|
||
|
||
output_obj['regex'] = regex_map
|
||
|
||
# 示例数据
|
||
output_obj['data'] = data_list[:max_samples_per_template] if max_samples_per_template != 0 else []
|
||
|
||
json.dump(output_obj, f, ensure_ascii=False)
|
||
f.write('\n')
|
||
|
||
print(f"所有模板已成功写入到 '{output_file}'。")
|
||
|
||
except FileNotFoundError as e:
|
||
print(f"错误:找不到输入文件 {e.filename}。")
|
||
return
|
||
|
||
def extract_values_with_templates(input_files, template_file, output_file, content_key='content'):
|
||
"""
|
||
使用DBSCAN生成的模板从原始消息中提取参数值
|
||
支持多个输入文件。
|
||
"""
|
||
print("--- 开始使用模板提取参数值 ---")
|
||
|
||
# 读取模板及其元信息。支持两种格式:旧的 {content, placeholders, source_regex} 或 新的 {template, regex, source_regex, data}
|
||
templates_meta = []
|
||
with open(template_file, 'r', encoding='utf-8') as f:
|
||
for line in f:
|
||
try:
|
||
raw = json.loads(line)
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
# 规范化到内部使用的 tmeta 格式:{ 'content': ..., 'placeholders': [ {name, pattern}, ... ], 'source_regex': ... }
|
||
tmeta = {}
|
||
if 'template' in raw:
|
||
tmeta['content'] = raw.get('template')
|
||
# raw may have 'regex' mapping name->pattern
|
||
regex_map = raw.get('regex', {})
|
||
phs = []
|
||
for name, pat in regex_map.items():
|
||
phs.append({'name': name, 'pattern': pat})
|
||
# also consider raw['source_regex'] and raw['data'] if present
|
||
if 'source_regex' in raw:
|
||
tmeta['source_regex'] = raw.get('source_regex')
|
||
if phs:
|
||
tmeta['placeholders'] = phs
|
||
elif 'content' in raw:
|
||
# backward-compatible
|
||
tmeta['content'] = raw.get('content')
|
||
if 'placeholders' in raw:
|
||
tmeta['placeholders'] = raw.get('placeholders')
|
||
if 'source_regex' in raw:
|
||
tmeta['source_regex'] = raw.get('source_regex')
|
||
else:
|
||
# skip unknown format
|
||
continue
|
||
|
||
templates_meta.append(tmeta)
|
||
|
||
print(f"已加载 {len(templates_meta)} 个模板(含元信息/已规范化)")
|
||
|
||
# 从原始数据中提取值
|
||
extracted_values = []
|
||
|
||
total_lines = 0
|
||
for input_file in input_files:
|
||
with open(input_file, 'r', encoding='utf-8') as f:
|
||
total_lines += sum(1 for _ in f)
|
||
|
||
for input_file in input_files:
|
||
print(f"\n--- 开始处理文件: {input_file} ---")
|
||
# 计算当前文件的行数
|
||
with open(input_file, 'r', encoding='utf-8') as f:
|
||
file_lines = sum(1 for _ in f)
|
||
|
||
with open(input_file, 'r', encoding='utf-8') as f:
|
||
for line in tqdm(f, total=file_lines, desc=f"提取 {input_file.split('/')[-1]}"):
|
||
try:
|
||
data = json.loads(line)
|
||
content = data.get(content_key, '')
|
||
|
||
if not content:
|
||
continue
|
||
|
||
# 尝试匹配每个模板
|
||
for tmeta in templates_meta:
|
||
# 构建用于匹配的正则
|
||
try:
|
||
regex_str = build_regex_from_template_data(tmeta)
|
||
match = re.search(regex_str, content)
|
||
except re.error:
|
||
# 如果生成的正则有问题,退回到老方法
|
||
match = None
|
||
|
||
parameters = {}
|
||
if match:
|
||
# 如果有命名组,优先使用命名组
|
||
if match.groupdict():
|
||
# 将安全组名还原为原始占位符名(如果有元信息)
|
||
gd = match.groupdict()
|
||
# 如果模板元信息里有占位符名映射,则把原名映射回来
|
||
ph_map = {}
|
||
if 'placeholders' in tmeta and isinstance(tmeta['placeholders'], list):
|
||
for ph in tmeta['placeholders']:
|
||
if isinstance(ph, dict):
|
||
orig = ph.get('name')
|
||
else:
|
||
orig = ph
|
||
safe = re.sub(r"[^0-9A-Za-z_]", "_", orig)
|
||
ph_map[safe] = orig
|
||
|
||
for safe_name, val in gd.items():
|
||
orig_name = ph_map.get(safe_name, safe_name)
|
||
parameters[f'<{orig_name}>'] = val
|
||
else:
|
||
# 否则使用位置组,优先使用模板元信息中的占位符顺序(来自 templates_meta)
|
||
values = match.groups()
|
||
placeholders_meta = tmeta.get('placeholders')
|
||
if placeholders_meta and isinstance(placeholders_meta, list):
|
||
# placeholders_meta 可能是 [{'name':..., 'pattern':...}, ...] 或 ['name1','name2']
|
||
for i, ph in enumerate(placeholders_meta):
|
||
if i >= len(values):
|
||
break
|
||
name = ph.get('name') if isinstance(ph, dict) else ph
|
||
parameters[f'<{name}>'] = values[i]
|
||
else:
|
||
# 退回到从模板字符串解析占位符(向后兼容旧格式)
|
||
placeholders = re.findall(r'<([^>]+)>', tmeta.get('content', ''))
|
||
for i, name in enumerate(placeholders):
|
||
if i < len(values):
|
||
parameters[f'<{name}>'] = values[i]
|
||
|
||
else:
|
||
# 退回老方法:通过模板字符串替换占位符生成正则
|
||
tpl = tmeta.get('content')
|
||
if tpl:
|
||
parameters = extract_parameters(tpl, content)
|
||
|
||
if parameters:
|
||
extracted_values.append({
|
||
'template': tmeta.get('content'),
|
||
'message': content,
|
||
'parameters': parameters
|
||
})
|
||
# 找到匹配就跳出循环
|
||
break
|
||
|
||
except (json.JSONDecodeError, Exception):
|
||
continue
|
||
|
||
# 保存提取的值
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
for item in extracted_values:
|
||
json.dump(item, f, ensure_ascii=False)
|
||
f.write('\n')
|
||
|
||
print(f"成功从 {len(extracted_values)} 条消息中提取参数,并保存到 '{output_file}'")
|
||
|
||
# --- 使用示例 ---
|
||
# 假设您已经运行了上一个脚本,生成了 'content_filtered.jsonl'
|
||
input_jsonl_files = ['content_filtered.jsonl', 'output.jsonl'] # 默认单个文件,可扩展为多个
|
||
output_template_file = 'templates_iterative.txt'
|
||
BATCH_PROCESSING_SIZE = 10000 # 可以根据你的内存和数据量调整
|
||
|
||
if __name__ == "__main__":
|
||
parser = argparse.ArgumentParser(description='Extract templates from GCash transaction data.')
|
||
parser.add_argument('--input_file', type=str, nargs='+', default=input_jsonl_files, help='Input JSONL file paths (multiple files supported)')
|
||
parser.add_argument('--output_file', type=str, default=output_template_file, help='Output template file path')
|
||
parser.add_argument('--batch_size', type=int, default=BATCH_PROCESSING_SIZE, help='Batch processing size (data volume)')
|
||
parser.add_argument('--eps', type=float, default=0.4, help='DBSCAN eps parameter')
|
||
parser.add_argument('--min_samples', type=int, default=5, help='DBSCAN min_samples parameter')
|
||
parser.add_argument('--extract_values', action='store_true', help='Extract values using generated templates')
|
||
parser.add_argument('--content_key', type=str, default='content', help='Key to extract content from JSON objects (default: content)')
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.extract_values:
|
||
# 执行参数提取
|
||
extract_values_with_templates(
|
||
input_files=args.input_file,
|
||
template_file='templates_iterative.txt',
|
||
output_file=args.output_file,
|
||
content_key=args.content_key
|
||
)
|
||
else:
|
||
# 执行模板提取
|
||
extract_templates_iterative(
|
||
input_files=args.input_file,
|
||
output_file=args.output_file,
|
||
rules=PREDEFINED_RULES,
|
||
batch_size=args.batch_size,
|
||
eps=args.eps,
|
||
min_samples=args.min_samples,
|
||
content_key=args.content_key
|
||
) |