"""Iterative template extraction for GCash transaction messages.

Pipeline:
1. Normalize raw messages with hand-written abstraction rules (ABSTRACT_RULES).
2. Messages matching no rule are clustered in batches with TF-IDF + DBSCAN to
   discover new templates; each cluster's most central member becomes the
   template and up to 10 originals are kept as samples.
3. Optionally (``--extract_values``) re-apply the saved templates to pull
   parameter values back out of raw messages.
"""

import argparse
import json
import re

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

# --- Rules and helpers ---

# Pre-compiled rules of the form {'pattern': <compiled regex>}. Empty by default;
# extract_templates_iterative() consults these before the abstraction rules.
PREDEFINED_RULES = [
]

# Abstraction rules: each entry declares a source regex and the template it
# abstracts to. The capture groups in 'source_regex' correspond, in order of
# appearance, to the <placeholder> tokens in 'template'.
ABSTRACT_RULES = [
    {
        # Example: the raw regex and its placeholder template made explicit.
        'source_regex': r'^Sent GCash to (.+?) with account ending in (\d+)$',
        'template': 'Sent GCash to <收款人名称> with account ending in <银行4位数尾号>',
    },
    {
        # Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <流水号>
        'source_regex': r'(?i)^Received GCash from (.+?) with account ending in (\d+) (via .+|and invno:.+)$',
        'template': 'Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <流水号>',
    },
    {
        # Payment to <商户名>, Merchant Transaction Number: <交易单号>
        'source_regex': r'^Payment to (.+?), Merchant Transaction Number: (.+)$',
        'template': 'Payment to <收款人名称>, Merchant Transaction Number: <交易单号>',
    },
    # The entries below were migrated automatically from simple rules that
    # used to live in normalize_text().
    {
        'source_regex': r'^You have sent PHP [\d,]+\.\d{2} to an unverified account [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \.\..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\. Go to GCash Help Center to know how to secure your transactions\.$',
        'template': 'You have sent PHP <金额> to an unverified account <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <交易单号>. Go to GCash Help Center to know how to secure your transactions.',
    },
    {
        'source_regex': r'^You have sent PHP [\d,]+\.\d{2} to .+? [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: .*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\.$',
        'template': 'You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <交易单号>.',
    },
    {
        'source_regex': r'^You have received\s+(?:PHP\s+)?[\d,.]+\s+of GCash from\s+.+?\. Your new balance is\s+(?:PHP\s+)?[\d,.]+\s+\d{1,2}-\d{1,2}-\d{2,4}\s+\d{1,2}:\d{1,2}(?::\d{1,2})?\s+[AP]M\. Ref\. No\.\s+.+?\. Use now to buy load, purchase items, send money, pay bills, and a lot more!$',
        'template': 'You have received <金额> of GCash from <付款人名称>. Your new balance is <金额>. <日期时间>. Ref. No. <交易单号>. Use now to buy load, purchase items, send money, pay bills, and a lot more!',
    },
    {
        'source_regex': r'^You have received PHP [\d,.]+\s+of GCash from .+? w/ MSG: .*\. (?:Your new balance is PHP [\d,.]*\.\s)?Ref\. No\. \d+\.(?: To access your funds,.*)?$',
        'template': 'You have received PHP <金额> of GCash from <付款人名称> w/ MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <交易单号>.',
    },
    {
        'source_regex': r'^You have paid P[\d,.]+\s+via GCash to .+? on \d{1,2}-\d{1,2}-\d{2,4}\s\d{1,2}:\d{1,2}:\d{1,2}\s+[AP]M\. Ref\. No\.\s+\d+\. QRPH Invoice No\.\s+\d+\.$',
        'template': 'You have paid P<金额> via GCash to <收款人名称> on <日期时间>. Ref. No. <交易单号>. QRPH Invoice No. <流水号>.',
    },
    {
        'source_regex': r'(?i)^Received GCash from [^<]+$',
        'template': 'Received GCash from <付款人名称>',
    },
    {
        'source_regex': r'^Payment to ([^,]+)$',
        'template': 'Payment to <收款人名称>',
    },
    {
        'source_regex': r'^(.+?) with (Ref\. no\.|Parent Ref\.No\.|Reference No\.) (.+)$',
        'template': '<交易类型> with Ref. no. <交易单号>',
    },
    {
        'source_regex': r'^Buy Load Transaction for .+$',
        'template': 'Buy Load Transaction for <付款人号码>',
    },
    {
        'source_regex': r'^Transfer from \S+ to \S+$',
        'template': 'Transfer from <付款人号码> to <收款人号码>',
    },
]

# NOTE: a global PLACEHOLDER_PATTERNS table is no longer the primary source of
# sub-patterns. When nothing better is known, the generic '(.+?)' is used.


def _safe_group_name(name, count=0):
    """Map a placeholder name to a regex-safe named-group identifier.

    This single helper MUST be used both when *building* named-group regexes
    (build_regex_from_template_data) and when *decoding* match.groupdict()
    back to placeholder names (extract_values_with_templates); otherwise the
    two sides disagree and extracted values cannot be attributed to their
    placeholders.

    Args:
        name: original placeholder name (may contain non-ASCII characters).
        count: 0 for the first occurrence of the name in a template; for
            duplicates, pass 1, 2, ... to keep group names unique.

    Returns:
        A string usable inside ``(?P<...>)``.
    """
    safe_name = re.sub(r"[^0-9A-Za-z_]", "_", name)
    # Very short sanitized names (e.g. all-CJK placeholders collapse to
    # "____") collide easily; mix in a checksum of the original characters.
    if len(safe_name) <= 5:
        char_sum = sum(ord(c) for c in name)
        safe_name = f"{safe_name}_{char_sum % 100}"
    if count > 0:
        safe_name = f"{safe_name}_{count}"
    return safe_name


def extract_group_patterns_from_regex(regex_str: str):
    """Extract the inner patterns of top-level capture groups, in order.

    Notes and limitations:
    - Only capture groups appearing in textual order are extracted (including
      named groups ``(?P<name>...)``); non-capturing groups ``(?:...)``,
      lookarounds, inline flags, and ``(?P=name)`` backreferences are skipped.
    - Very complex regexes (conditional groups, deeply mixed named/unnamed
      nesting) are not guaranteed to be handled 100% correctly; verify
      special rules manually.

    Returns:
        List of strings, e.g. ``['\\d{4}', '.+?']`` (outer parens stripped).
    """
    s = regex_str
    n = len(s)
    i = 0
    stack = []  # each item: (start_index, is_capturing)
    results = []
    while i < n:
        ch = s[i]
        if ch == '\\':
            # Skip the escaped character entirely.
            i += 2
            continue
        if ch == '(':
            # Decide capturing vs non-capturing by looking ahead.
            is_capturing = True
            if i + 1 < n and s[i + 1] == '?':
                # Only (?P<name>...) is a capturing form among the (?...)
                # constructs; (?P=name), (?:...), lookarounds etc. are not.
                is_capturing = s[i + 2:i + 4] == 'P<'
            stack.append((i, is_capturing))
            i += 1
            continue
        if ch == ')':
            if not stack:
                i += 1
                continue
            start, is_capturing = stack.pop()
            if is_capturing:
                # Inner pattern without the surrounding parentheses.
                results.append(s[start + 1:i])
            i += 1
            continue
        i += 1
    return results


def get_placeholder(action):
    """Return the placeholder name matching a JSON message's action type."""
    if 'Received' in action:
        return '付款人号码'
    elif 'Sent' in action:
        return '收款人号码'
    elif 'Refunded' in action:
        return '付款人号码'
    else:
        return '付款人号码'  # default


def normalize_text(text):
    """Abstract a raw message into its template form.

    Applies every ABSTRACT_RULES substitution in order, then the one complex
    JSON-embedded rule that could not be migrated to the table (it needs a
    lambda to choose the placeholder from the action verb).
    """
    for ar in ABSTRACT_RULES:
        try:
            if ar.get('source_regex') and ar.get('template'):
                text = re.sub(ar['source_regex'], ar['template'], text)
        except re.error:
            # Skip malformed rules rather than abort normalization.
            continue
    # Handle all JSON-format messages uniformly (placeholder depends on verb).
    text = re.sub(
        r'\\\"(Received money from|Sent money to|Received settlement from|Reversed settlement from|Refunded money via|Sent money via)\\\",\\\"target\\\":\\\"(.+?)\\\"',
        lambda m: f'\\\"{m.group(1)}\\\",\\\"target\\\":\\\"<{get_placeholder(m.group(1))}>\\\"',
        text
    )
    return text


def template_to_regex(template):
    """Convert a template string into a parameter-extracting regex.

    Escapes the literal parts of the template and replaces each
    ``<placeholder>`` with a generic ``(.+?)`` capture group. For more precise
    sub-patterns, supply capture groups in the rule's source_regex instead;
    they are picked up elsewhere.
    """
    escaped_template = re.escape(template)
    default_pat = r'(.+?)'
    placeholders = re.findall(r'<([^>]+)>', template)
    for name in placeholders:
        escaped_placeholder = re.escape(f'<{name}>')
        escaped_template = escaped_template.replace(escaped_placeholder, default_pat, 1)
    return escaped_template


def _strip_outer_parens(pat: str) -> str:
    """Drop one pair of surrounding parentheses to avoid double capture."""
    pat = pat.strip()
    if pat.startswith('(') and pat.endswith(')'):
        return pat[1:-1]
    return pat


def build_regex_from_template_data(template_data):
    """Build a matching regex string from a template's metadata.

    Prefers ``template_data['source_regex']`` verbatim; otherwise generates a
    named-group regex from ``content`` + ``placeholders``; with no placeholder
    metadata it falls back to template_to_regex().
    """
    if 'source_regex' in template_data and template_data['source_regex']:
        return template_data['source_regex']

    template = template_data.get('content', '')
    placeholders_meta = template_data.get('placeholders')
    if not placeholders_meta:
        return template_to_regex(template)

    escaped_template = re.escape(template)
    # placeholders_meta may be [{'name':..., 'pattern':...}, ...] or
    # ['name1', 'name2'] (legacy). A counter disambiguates duplicate names.
    placeholder_count = {}
    for ph in placeholders_meta:
        if isinstance(ph, dict):
            name = ph.get('name')
            pattern = ph.get('pattern') if ph.get('pattern') else '(.+?)'
        else:
            name = ph
            pattern = '(.+?)'  # legacy format carries no sub-pattern
        count = placeholder_count.get(name, 0)
        placeholder_count[name] = count + 1
        named_group = f"(?P<{_safe_group_name(name, count)}>{_strip_outer_parens(pattern)})"
        escaped_placeholder = re.escape(f'<{name}>')
        # Replace only the first occurrence so duplicates get distinct groups.
        escaped_template = escaped_template.replace(escaped_placeholder, named_group, 1)
    return escaped_template


def extract_parameters(template, message):
    """Extract parameter values from a message using a plain template string.

    Returns a dict mapping ``<placeholder>`` tokens to the captured values,
    or {} when the message does not match.
    """
    pattern = template_to_regex(template)
    match = re.search(pattern, message)
    if match:
        values = match.groups()
        placeholders = re.findall(r'<[^>]+>', template)
        parameters = {}
        for i, placeholder in enumerate(placeholders):
            if i < len(values):
                parameters[placeholder] = values[i]
        return parameters
    return {}


def run_dbscan_on_corpus(corpus, eps, min_samples, max_samples=10):
    """Cluster a batch of messages and return {template: [sample originals]}.

    Noise points (label -1) each become their own template; for real clusters
    the member closest to the TF-IDF centroid is used as the template.
    """
    if not corpus:
        return {}
    processed_corpus = [normalize_text(text) for text in corpus]
    try:
        vectorizer = TfidfVectorizer()
        X = vectorizer.fit_transform(processed_corpus)
        db = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine', n_jobs=-1).fit(X)
        labels = db.labels_
        dbscan_templates = {}
        for label in set(labels):
            class_member_indices = np.where(labels == label)[0]
            if label == -1:
                # Noise points: keep each normalized text as its own template.
                for idx in class_member_indices:
                    original = corpus[idx]
                    normalized = processed_corpus[idx]
                    if normalized not in dbscan_templates:
                        dbscan_templates[normalized] = []
                    if len(dbscan_templates[normalized]) < max_samples:
                        dbscan_templates[normalized].append(original)
                continue
            # Real cluster: pick the member nearest the centroid as template.
            cluster_vectors = X[class_member_indices]
            centroid = np.asarray(cluster_vectors.mean(axis=0))
            similarities = cosine_similarity(cluster_vectors, centroid)
            most_representative_idx_in_cluster = np.argmax(similarities)
            original_corpus_idx = class_member_indices[most_representative_idx_in_cluster]
            most_representative_normalized = processed_corpus[original_corpus_idx]
            cluster_originals = [corpus[idx] for idx in class_member_indices]
            dbscan_templates[most_representative_normalized] = cluster_originals[:max_samples]
        return dbscan_templates
    except ValueError:
        # TfidfVectorizer raises when every token is a stop word / too short.
        print("警告: DBSCAN批次处理失败,可能因为内容过于单一或简短。将内容视为独立模板。")
        return {processed_corpus[i]: [corpus[i]][:max_samples] for i in range(len(corpus))}


def _merge_discovered_templates(final_templates, newly_found, cap=10):
    """Merge DBSCAN-discovered templates into the running template map.

    Keeps at most ``cap`` sample messages per template; mutates
    ``final_templates`` in place.
    """
    for template, originals in newly_found.items():
        if template in final_templates:
            remaining = cap - len(final_templates[template])
            final_templates[template].extend(originals[:remaining])
        else:
            final_templates[template] = originals[:cap]


def _register_template_meta(templates_meta, ar):
    """Record placeholder/regex metadata for an abstraction rule's template.

    Pairs each ``<placeholder>`` in the template (in order) with the matching
    top-level capture-group pattern from the rule's source_regex, falling back
    to the generic '(.+?)' when no group is available.
    """
    tpl = ar['template']
    if tpl in templates_meta:
        return
    placeholders = ar.get('placeholders')
    if not placeholders:
        placeholders = re.findall(r'<([^>]+)>', ar['template'])
    try:
        group_patterns = extract_group_patterns_from_regex(ar['source_regex'])
    except Exception:
        group_patterns = []
    phs = []
    for idx, name in enumerate(placeholders):
        pat = group_patterns[idx] if idx < len(group_patterns) and group_patterns[idx] else '(.+?)'
        phs.append({'name': name, 'pattern': pat})
    templates_meta[tpl] = {
        'placeholders': phs,
        'source_regex': ar['source_regex'],
    }


def extract_templates_iterative(input_files, output_file, rules, batch_size=1000,
                                eps=0.4, min_samples=2, max_samples_per_template=0,
                                content_key='content'):
    """Extract templates with a small-batch iterative hybrid strategy.

    For each input line: reuse a known template if the normalized content
    matches one; else try the predefined rules, then the abstraction rules;
    anything unmatched is buffered and clustered with DBSCAN once the buffer
    reaches ``batch_size``. Up to 10 original messages are kept per template.
    Supports multiple input files.

    Args:
        input_files: list of JSONL paths.
        output_file: JSONL output; one {'template', 'regex', 'data', ...} per line.
        rules: PREDEFINED_RULES-style list of {'pattern': compiled regex}.
        batch_size: unmatched-message buffer size that triggers DBSCAN.
        eps / min_samples: DBSCAN parameters.
        max_samples_per_template: sample messages written per template
            (0 = write none).
        content_key: JSON key holding the message text.
    """
    print("--- 开始迭代式模板提取 ---")
    final_templates = {}   # template -> list of original contents (<= 10)
    templates_meta = {}    # template -> {'placeholders': [...], 'source_regex': str}
    unmatched_batch = []
    batch_num = 1
    try:
        print(f"步骤 1: 逐行处理输入文件 {input_files} 并动态构建模板库...")
        for input_file in input_files:
            print(f"\n--- 开始处理文件: {input_file} ---")
            # Count lines once so tqdm can show progress for this file.
            with open(input_file, 'r', encoding='utf-8') as f:
                file_lines = sum(1 for _ in f)
            with open(input_file, 'r', encoding='utf-8') as f:
                for line in tqdm(f, total=file_lines, desc=f"处理 {input_file.split('/')[-1]}"):
                    try:
                        data = json.loads(line)
                        content = data.get(content_key)
                        if not content:
                            continue
                        normalized_content = normalize_text(content)
                        # 1. Already a known template?
                        if normalized_content in final_templates:
                            if len(final_templates[normalized_content]) < 10:
                                final_templates[normalized_content].append(content)
                            continue
                        # 2. Predefined (pre-compiled) rules.
                        matched_by_rule = False
                        for rule in rules:
                            if rule['pattern'].match(content):
                                if normalized_content not in final_templates:
                                    final_templates[normalized_content] = []
                                if len(final_templates[normalized_content]) < 10:
                                    final_templates[normalized_content].append(content)
                                matched_by_rule = True
                                break
                        # 2b. Abstraction rules (source_regex + template).
                        if not matched_by_rule:
                            for ar in ABSTRACT_RULES:
                                try:
                                    if re.match(ar['source_regex'], content):
                                        tpl = ar['template']
                                        if tpl not in final_templates:
                                            final_templates[tpl] = []
                                        if len(final_templates[tpl]) < 10:
                                            final_templates[tpl].append(content)
                                        _register_template_meta(templates_meta, ar)
                                        matched_by_rule = True
                                        break
                                except re.error:
                                    # Skip a broken abstraction rule.
                                    continue
                        if matched_by_rule:
                            continue
                        # 3. Nothing matched: buffer for batch clustering.
                        unmatched_batch.append(content)
                        # 4. Flush the buffer through DBSCAN when full.
                        if len(unmatched_batch) >= batch_size:
                            print(f"\n--- 处理批次 #{batch_num} (大小: {len(unmatched_batch)}) ---")
                            newly_found_templates = run_dbscan_on_corpus(unmatched_batch, eps, min_samples, 10)
                            print(f"批次 #{batch_num}: DBSCAN 发现了 {len(newly_found_templates)} 个潜在模板。")
                            _merge_discovered_templates(final_templates, newly_found_templates, 10)
                            print(f"当前总模板数: {len(final_templates)}")
                            unmatched_batch.clear()
                            batch_num += 1
                    except (json.JSONDecodeError, AttributeError):
                        continue

        # --- Final flush ---
        print("\n--- 文件处理完毕,处理最后一批剩余内容 ---")
        if unmatched_batch:
            print(f"处理最后一个批次 (大小: {len(unmatched_batch)})")
            newly_found_templates = run_dbscan_on_corpus(unmatched_batch, eps, min_samples, 10)
            print(f"最后一个批次: DBSCAN 发现了 {len(newly_found_templates)} 个潜在模板。")
            _merge_discovered_templates(final_templates, newly_found_templates, 10)
        else:
            print("没有剩余内容需要处理。")

        # --- Output ---
        print("\n--- 第 3 部分: 合并结果并保存 ---")
        print(f"总共找到 {len(final_templates)} 个唯一的模板。")
        # Collect every placeholder name seen, for a separate listing file.
        all_placeholders = set()
        with open(output_file, 'w', encoding='utf-8') as f:
            for template, data_list in sorted(final_templates.items()):
                # New format: {"template":..., "regex":{name: pattern, ...}, "data":[...]}
                output_obj = {'template': template}
                regex_map = {}
                if template in templates_meta:
                    for ph in templates_meta[template].get('placeholders', []):
                        if isinstance(ph, dict):
                            regex_map[ph.get('name')] = ph.get('pattern')
                        else:
                            # Legacy: bare name, generic fallback pattern.
                            regex_map[ph] = '(.+?)'
                    # Keep the raw source regex under a dedicated key.
                    if templates_meta[template].get('source_regex'):
                        output_obj['source_regex'] = templates_meta[template]['source_regex']
                else:
                    # No metadata: default-map the placeholders in the template.
                    placeholders = re.findall(r'<([^>]+)>', template)
                    for name in placeholders:
                        regex_map[name] = '(.+?)'
                output_obj['regex'] = regex_map
                # Sample data (0 means write no samples).
                output_obj['data'] = data_list[:max_samples_per_template] if max_samples_per_template != 0 else []
                json.dump(output_obj, f, ensure_ascii=False)
                f.write('\n')
                # Collect placeholders from the regex map and the template text.
                for name in regex_map.keys():
                    all_placeholders.add(f'<{name}>')
                for name in re.findall(r'<([^>]+)>', template):
                    all_placeholders.add(f'<{name}>')
        print(f"所有模板已成功写入到 '{output_file}'。")

        # Write the placeholder listing (best-effort).
        placeholders_file = 'placeholders_list.txt'
        try:
            with open(placeholders_file, 'w', encoding='utf-8') as pf:
                for ph in sorted(all_placeholders):
                    pf.write(ph + '\n')
            print(f"所有占位符已写入到 '{placeholders_file}'(共 {len(all_placeholders)} 个)。")
        except Exception:
            print("警告:写入占位符清单失败。")
    except FileNotFoundError as e:
        print(f"错误:找不到输入文件 {e.filename}。")
        return


def _load_template_metadata(template_file):
    """Load template metadata, normalizing both supported JSONL formats.

    Accepts the new ``{template, regex, source_regex, data}`` format and the
    legacy ``{content, placeholders, source_regex}`` one; returns a list of
    internal ``{'content', 'placeholders', 'source_regex'}`` dicts.
    """
    templates_meta = []
    with open(template_file, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                raw = json.loads(line)
            except json.JSONDecodeError:
                continue
            tmeta = {}
            if 'template' in raw:
                tmeta['content'] = raw.get('template')
                # 'regex' maps placeholder name -> sub-pattern.
                regex_map = raw.get('regex', {})
                phs = [{'name': name, 'pattern': pat} for name, pat in regex_map.items()]
                if 'source_regex' in raw:
                    tmeta['source_regex'] = raw.get('source_regex')
                if phs:
                    tmeta['placeholders'] = phs
            elif 'content' in raw:
                # Backward-compatible legacy format.
                tmeta['content'] = raw.get('content')
                if 'placeholders' in raw:
                    tmeta['placeholders'] = raw.get('placeholders')
                if 'source_regex' in raw:
                    tmeta['source_regex'] = raw.get('source_regex')
            else:
                # Unknown format: skip.
                continue
            templates_meta.append(tmeta)
    return templates_meta


def _decode_named_groups(match, tmeta):
    """Map a match's named groups back to ``<placeholder>`` keyed values.

    Rebuilds the safe-name mapping with the SAME _safe_group_name() helper
    (including the duplicate counter) used when the regex was generated, so
    short/non-ASCII placeholder names round-trip correctly.
    """
    parameters = {}
    ph_map = {}
    name_counts = {}
    if isinstance(tmeta.get('placeholders'), list):
        for ph in tmeta['placeholders']:
            orig = ph.get('name') if isinstance(ph, dict) else ph
            count = name_counts.get(orig, 0)
            name_counts[orig] = count + 1
            ph_map[_safe_group_name(orig, count)] = orig
    for safe_name, val in match.groupdict().items():
        parameters[f'<{ph_map.get(safe_name, safe_name)}>'] = val
    return parameters


def _decode_positional_groups(match, tmeta):
    """Map a match's positional groups to ``<placeholder>`` keyed values.

    Prefers the placeholder order recorded in the template metadata; falls
    back to parsing placeholders out of the template string (legacy format).
    """
    parameters = {}
    values = match.groups()
    placeholders_meta = tmeta.get('placeholders')
    if placeholders_meta and isinstance(placeholders_meta, list):
        for i, ph in enumerate(placeholders_meta):
            if i >= len(values):
                break
            name = ph.get('name') if isinstance(ph, dict) else ph
            parameters[f'<{name}>'] = values[i]
    else:
        placeholders = re.findall(r'<([^>]+)>', tmeta.get('content', ''))
        for i, name in enumerate(placeholders):
            if i < len(values):
                parameters[f'<{name}>'] = values[i]
    return parameters


def extract_values_with_templates(input_files, template_file, output_file, content_key='content'):
    """Extract parameter values from raw messages using saved templates.

    Each message is tried against every loaded template (first match wins);
    matches are written to ``output_file`` as JSONL records with the template,
    the original message, and the extracted parameters. Supports multiple
    input files.
    """
    print("--- 开始使用模板提取参数值 ---")
    templates_meta = _load_template_metadata(template_file)
    print(f"已加载 {len(templates_meta)} 个模板(含元信息/已规范化)")

    extracted_values = []
    for input_file in input_files:
        print(f"\n--- 开始处理文件: {input_file} ---")
        # Count lines once so tqdm can show progress for this file.
        with open(input_file, 'r', encoding='utf-8') as f:
            file_lines = sum(1 for _ in f)
        with open(input_file, 'r', encoding='utf-8') as f:
            for line in tqdm(f, total=file_lines, desc=f"提取 {input_file.split('/')[-1]}"):
                try:
                    data = json.loads(line)
                    content = data.get(content_key, '')
                    if not content:
                        continue
                    for tmeta in templates_meta:
                        try:
                            regex_str = build_regex_from_template_data(tmeta)
                            match = re.search(regex_str, content)
                        except re.error:
                            # Bad generated regex: fall through to legacy path.
                            match = None
                        parameters = {}
                        if match:
                            if match.groupdict():
                                parameters = _decode_named_groups(match, tmeta)
                            else:
                                parameters = _decode_positional_groups(match, tmeta)
                        else:
                            # Legacy fallback: regex from the template string.
                            tpl = tmeta.get('content')
                            if tpl:
                                parameters = extract_parameters(tpl, content)
                        if parameters:
                            extracted_values.append({
                                'template': tmeta.get('content'),
                                'message': content,
                                'parameters': parameters
                            })
                            break  # first matching template wins
                except (json.JSONDecodeError, AttributeError):
                    continue

    with open(output_file, 'w', encoding='utf-8') as f:
        for item in extracted_values:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    print(f"成功从 {len(extracted_values)} 条消息中提取参数,并保存到 '{output_file}'")


# --- Usage defaults ---
# Assumes the upstream script has produced 'content_filtered.jsonl'.
input_jsonl_files = ['content_filtered.jsonl', 'output.jsonl']
output_template_file = 'templates_iterative.jsonl'
BATCH_PROCESSING_SIZE = 10000  # tune to available memory / data volume


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Extract templates from GCash transaction data.')
    parser.add_argument('--input_file', type=str, nargs='+', default=input_jsonl_files,
                        help='Input JSONL file paths (multiple files supported)')
    parser.add_argument('--output_file', type=str, default=output_template_file,
                        help='Output template file path')
    parser.add_argument('--batch_size', type=int, default=BATCH_PROCESSING_SIZE,
                        help='Batch processing size (data volume)')
    parser.add_argument('--eps', type=float, default=0.4, help='DBSCAN eps parameter')
    parser.add_argument('--min_samples', type=int, default=5, help='DBSCAN min_samples parameter')
    parser.add_argument('--extract_values', action='store_true',
                        help='Extract values using generated templates')
    parser.add_argument('--content_key', type=str, default='content',
                        help='Key to extract content from JSON objects (default: content)')
    args = parser.parse_args()

    if args.extract_values:
        # Parameter-extraction mode.
        extract_values_with_templates(
            input_files=args.input_file,
            template_file='templates_iterative.jsonl',
            output_file=args.output_file,
            content_key=args.content_key
        )
    else:
        # Template-extraction mode.
        extract_templates_iterative(
            input_files=args.input_file,
            output_file=args.output_file,
            rules=PREDEFINED_RULES,
            batch_size=args.batch_size,
            eps=args.eps,
            min_samples=args.min_samples,
            content_key=args.content_key
        )