"""Mine message templates from GCash transaction logs and extract their values.

The script has two modes:

* template extraction (default): every message is normalized with a fixed,
  ordered list of regex rewrites; messages that are still unknown are clustered
  in batches with TF-IDF + DBSCAN and the most central normalized message of
  each cluster becomes a template;
* value extraction (``--extract_values``): previously generated templates are
  turned into regular expressions and used to pull parameter values out of the
  raw messages.

Input files are JSON Lines; only the ``content`` field of each record is used.
"""
import argparse
import json
import os
import re
from functools import lru_cache

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

# --- Rules and helper functions ---

# Optional hand-written rules: a message whose raw content matches one of these
# patterns is accepted as a template without clustering. The entries below are
# kept (disabled) as examples of the expected shape.
PREDEFINED_RULES = [
    # {'name': 'Transfer', 'pattern': re.compile(r'^Transfer from \d+ to \d+$')},
    # {'name': 'International Remittance', 'pattern': re.compile(r'^International Remittance$')},
    # {'name': 'Bill Payment', 'pattern': re.compile(r'^Bill payment successful for amount \d+$')},
    # {'name': 'New Message', 'pattern': re.compile(r'^You have a new message from \d+\.$')},
    # # Matches e.g. "Sent GCash to GoTyme Bank with account ending in 6784"
    # {'name': 'Sent GCash to Account', 'pattern': re.compile(r'^Sent GCash to .+? with account ending in \d+$')}
]

# Placeholder -> regular expression (one capture group each) used when a
# template is converted back into a matcher.
PLACEHOLDER_PATTERNS = {
    '<金额>': r'([\d,.]+)',
    '<付款人名称>': r'(.+?)',
    '<收款人名称>': r'(.+?)',
    '<付款人号码>': r'([\d\w\+\-\(\)]+)',
    '<收款人号码>': r'([\d\w\+\-\(\)]+)',
    '<银行4位数尾号>': r'(\d{4})',
    '<参考号>': r'(.+?)',
    '<交易单号>': r'(.+?)',
    '<日期时间>': r'(.+?)',
    '<日期>': r'(\d{2}-\d{2}-\d{4})',
    '<时间>': r'(\d{1,2}:\d{2}\s[AP]M)',
    '<消息>': r'(.+?)',
    '<流水号>': r'(.+?)',
    '<网络或发票号>': r'(.+?)',
    '<交易类型>': r'(.+?)',
}

# Upper bound on the number of original messages kept per template while the
# template library is being built.
MAX_COLLECTED_SAMPLES = 10

# Splits a template into literal text (even indexes) and known placeholders
# (odd indexes). Only placeholders listed in PLACEHOLDER_PATTERNS are split out,
# so the placeholder list always lines up with the generated capture groups.
_PLACEHOLDER_SPLIT_RE = re.compile(
    '(' + '|'.join(re.escape(name) for name in PLACEHOLDER_PATTERNS) + ')'
)


def get_placeholder(action):
    """Return the placeholder name (without angle brackets) for a JSON action.

    Actions containing 'Sent' (and not 'Received') refer to the payee number;
    every other action, including 'Received' and 'Refunded', maps to the payer
    number.
    """
    if 'Received' not in action and 'Sent' in action:
        return '收款人号码'
    return '付款人号码'


def _json_target_replacement(match):
    """Rebuild an escaped-JSON ``action``/``target`` pair with a placeholder target."""
    action = match.group(1)
    return f'\\\"{action}\\\",\\\"target\\\":\\\"<{get_placeholder(action)}>\\\"'


# Ordered (pattern, replacement) rewrites applied by normalize_text. The order
# is significant: specific rules must run before the generic ones that would
# otherwise swallow the same text.
_NORMALIZATION_RULES = [
    # Pattern 8: received from a bank account (must run first).
    (
        re.compile(r'(?i)Received GCash from (.+?) with account ending in (\d+) (via .+|and invno:.+)$'),
        r'Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <网络或发票号>',
    ),
    # Pattern 13: send receipt to an unverified account.
    # Shape: You have sent <currency> <amount> to an unverified account <phone>
    # on <date> <time> with MSG: <message>. Your new balance is <currency>
    # <amount>. Ref. No. <serial>. Go to...
    (
        re.compile(r'^You have sent PHP [\d,]+\.\d{2} to an unverified account [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\. Go to GCash Help Center to know how to secure your transactions\.$'),
        r'You have sent PHP <金额> to an unverified account <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>. Go to GCash Help Center to know how to secure your transactions.',
    ),
    # Pattern 12: detailed send receipt (handles multi-part names).
    (
        re.compile(r'^You have sent PHP [\d,]+\.\d{2} to (?:[A-Z\*]+\s)+[A-Z\*]\.\s[\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\.$'),
        r'You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>.',
    ),
    # Pattern 11 (institution): detailed receive receipt.
    (
        re.compile(r'^You have received\s+(?:PHP\s+)?[\d,.]+\s+of GCash from\s+.+?\. Your new balance is\s+(?:PHP\s+)?[\d,.]+\s+\d{1,2}-\d{1,2}-\d{2,4}\s+\d{1,2}:\d{1,2}(?::\d{1,2})?\s+[AP]M\. Ref\. No\.\s+.+?\. Use now to buy load, purchase items, send money, pay bills, and a lot more!$'),
        r'You have received <金额> of GCash from <付款人名称>. Your new balance is <金额>. <日期时间>. Ref. No. <流水号>. Use now to buy load, purchase items, send money, pay bills, and a lot more!',
    ),
    # Pattern 11 (individual): detailed receive receipt; tolerates several
    # phone-number / balance / trailer variants.
    (
        re.compile(r'^You have received PHP [\d,.]+\s+of GCash from .+? w/ MSG: .*\. (?:Your new balance is PHP [\d,.]*\.\s)?Ref\. No\. \d+\.(?: To access your funds,.*)?$'),
        r'You have received PHP <金额> of GCash from <付款人名称> w/ MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>.',
    ),
    # Pattern 12: detailed send receipt, generic name format.
    (
        re.compile(r'^You have sent PHP [\d,]+\.\d{2} to .+? [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\.$'),
        r'You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>.',
    ),
    # Pattern 10: You have paid <amount> via GCash to <payee> on <datetime>.
    # Ref. No. <ref>. QRPH Invoice No. <ref>.
    (
        re.compile(r'^You have paid P[\d,.]+\s+via GCash to .+? on \d{1,2}-\d{1,2}-\d{2,4}\s\d{1,2}:\d{1,2}:\d{1,2}\s+[AP]M\. Ref\. No\.\s+\d+\. QRPH Invoice No\.\s+\d+\.$'),
        r'You have paid P<金额> via GCash to <收款人名称> on <日期时间>. Ref. No. <参考号>. QRPH Invoice No. <参考号>.',
    ),
    # Pattern 9: Sent GCash to <institution> with account ending in <tail>.
    (
        re.compile(r'Sent GCash to (.+?) with account ending in (\d+)$'),
        r'Sent GCash to <收款人名称> with account ending in <银行4位数尾号>',
    ),
    # Pattern 7: received from a generic source. "[^<]" keeps it from touching
    # text already rewritten by the bank rule above.
    (
        re.compile(r'(?i)^Received GCash from [^<]+$'),
        r'Received GCash from <付款人名称>',
    ),
    # Pattern 6: Payment to <merchant>, Merchant Transaction Number: <id>.
    (
        re.compile(r'Payment to (.+?), Merchant Transaction Number: (.+)$'),
        r'Payment to <收款人名称>, Merchant Transaction Number: <交易单号>',
    ),
    # Pattern 5: Payment to <merchant>.
    (
        re.compile(r'^Payment to ([^,]+)$'),
        r'Payment to <收款人名称>',
    ),
    # "<anything> with <one of three reference-number spellings> <ref>".
    (
        re.compile(r'^(.+?) with (Ref\. no\.|Parent Ref\.No\.|Reference No\.) (.+)$'),
        r'<交易类型> with Ref. no. <参考号>',
    ),
    # NOTE(review): Pattern 9 above has already replaced the digits this rule
    # looks for, so it appears unable to match; kept to preserve behaviour.
    (
        re.compile(r'Sent GCash to <收款人名称> with account ending in (\d+)$'),
        r'Sent GCash to <收款人名称> with account ending in <银行4位数尾号>',
    ),
    (
        re.compile(r'^Transfer from \S+ to \S+$'),
        r'Transfer from <付款人号码> to <收款人号码>',
    ),
    # NOTE(review): same pattern as the first rule, which has already consumed
    # every candidate; kept to preserve behaviour.
    (
        re.compile(r'(?i)Received GCash from (.+?) with account ending in (\d+) (via .+|and invno:.+)$'),
        r'Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <流水号>',
    ),
    # Buy Load Transaction.
    (
        re.compile(r'^Buy Load Transaction for .+$'),
        r'Buy Load Transaction for <付款人号码>',
    ),
    # Refund.
    (
        re.compile(r'^Refund from .+$'),
        r'Refund from <收款人名称>',
    ),
    # Escaped-JSON messages: replace the target of every known action type
    # with the placeholder chosen by get_placeholder.
    (
        re.compile(r'\\\"(Received money from|Sent money to|Received settlement from|Reversed settlement from|Refunded money via|Sent money via)\\\",\\\"target\\\":\\\"(.+?)\\\"'),
        _json_target_replacement,
    ),
]


def normalize_text(text):
    """Rewrite the variable parts of a message into ``<placeholder>`` tokens.

    The rewrites in ``_NORMALIZATION_RULES`` are applied in order, each on the
    output of the previous one. Text that matches no rule is returned unchanged.
    """
    for pattern, replacement in _NORMALIZATION_RULES:
        text = pattern.sub(replacement, text)
    return text


def template_to_regex(template):
    """Convert a template into a regex string that captures its placeholders.

    Literal text is escaped with ``re.escape``; every known placeholder is
    replaced by its capture group from ``PLACEHOLDER_PATTERNS``.
    """
    parts = _PLACEHOLDER_SPLIT_RE.split(template)
    return ''.join(
        PLACEHOLDER_PATTERNS[part] if index % 2 else re.escape(part)
        for index, part in enumerate(parts)
    )


# Unbounded on purpose: the key space is the finite set of loaded templates,
# and they are scanned sequentially, which is the worst case for LRU eviction.
@lru_cache(maxsize=None)
def _compile_template(template):
    """Return ``(compiled regex, placeholder names in group order)`` for a template."""
    placeholders = tuple(_PLACEHOLDER_SPLIT_RE.split(template)[1::2])
    return re.compile(template_to_regex(template)), placeholders


def extract_parameters(template, message):
    """Extract placeholder values from ``message`` using ``template``.

    Returns a dict mapping each placeholder (e.g. ``'<金额>'``) to the text it
    captured, or ``{}`` when the message does not match or the template has no
    placeholders. When a placeholder occurs more than once in the template the
    value of its last occurrence is kept.
    """
    pattern, placeholders = _compile_template(template)
    # A full match is tried first: with an unanchored search the trailing lazy
    # groups (".+?") stop after a single character. Falling back to search
    # keeps accepting messages that merely contain the template.
    match = pattern.fullmatch(message) or pattern.search(message)
    if match is None:
        return {}
    return dict(zip(placeholders, match.groups()))


def _add_sample(store, template, original, limit):
    """Record ``original`` under ``template`` unless ``limit`` samples are already kept."""
    samples = store.setdefault(template, [])
    if len(samples) < limit:
        samples.append(original)


def _merge_templates(final_templates, new_templates, limit):
    """Merge ``new_templates`` into ``final_templates``, keeping at most ``limit`` samples each."""
    for template, originals in new_templates.items():
        samples = final_templates.setdefault(template, [])
        remaining = limit - len(samples)
        if remaining > 0:
            samples.extend(originals[:remaining])


def _cluster_templates(corpus, processed_corpus, eps, min_samples, max_samples):
    """Cluster normalized messages and pick one representative per cluster."""
    X = TfidfVectorizer().fit_transform(processed_corpus)
    labels = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine', n_jobs=-1).fit(X).labels_

    templates = {}
    for label in np.unique(labels):
        member_indices = np.where(labels == label)[0]

        if label == -1:
            # Noise points: each distinct normalized text is its own template.
            for idx in member_indices:
                _add_sample(templates, processed_corpus[idx], corpus[idx], max_samples)
            continue

        # Regular cluster: the member closest to the centroid is the template.
        cluster_vectors = X[member_indices]
        centroid = np.asarray(cluster_vectors.mean(axis=0))
        similarities = cosine_similarity(cluster_vectors, centroid)
        representative_idx = member_indices[np.argmax(similarities)]
        templates[processed_corpus[representative_idx]] = [
            corpus[idx] for idx in member_indices[:max_samples]
        ]
    return templates


def run_dbscan_on_corpus(corpus, eps, min_samples, max_samples=10):
    """Derive templates from a batch of raw messages with TF-IDF + DBSCAN.

    Args:
        corpus: list of raw message strings.
        eps: DBSCAN ``eps`` (cosine distance).
        min_samples: DBSCAN ``min_samples``.
        max_samples: maximum number of original messages kept per template.

    Returns:
        dict mapping a normalized template to a list of original messages.
        If vectorizing/clustering raises ``ValueError`` every distinct
        normalized message is returned as its own template.
    """
    if not corpus:
        return {}

    processed_corpus = [normalize_text(text) for text in corpus]
    try:
        return _cluster_templates(corpus, processed_corpus, eps, min_samples, max_samples)
    except ValueError:
        # TfidfVectorizer raises when the batch yields an empty vocabulary.
        print("警告: DBSCAN批次处理失败,可能因为内容过于单一或简短。将内容视为独立模板。")
        fallback = {}
        for normalized, original in zip(processed_corpus, corpus):
            _add_sample(fallback, normalized, original, max_samples)
        return fallback


def _count_lines(input_files):
    """Return the total number of lines across ``input_files``."""
    total = 0
    for path in input_files:
        with open(path, 'r', encoding='utf-8') as f:
            total += sum(1 for _ in f)
    return total


def _iter_lines(input_files):
    """Yield every line of every file in ``input_files``, in order."""
    for path in input_files:
        with open(path, 'r', encoding='utf-8') as f:
            yield from f


def _read_content(line):
    """Return the ``content`` string of one JSONL record, or ``None`` if empty.

    Raises:
        ValueError: the line is not valid JSON, is not a JSON object, or its
            ``content`` is not a string (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    if not line.strip():
        return None
    record = json.loads(line)
    if not isinstance(record, dict):
        raise ValueError('record is not a JSON object')
    content = record.get('content')
    if not content:
        return None
    if not isinstance(content, str):
        raise ValueError('content is not a string')
    return content


def extract_templates_iterative(input_files, output_file, rules, batch_size=1000, eps=0.4,
                                min_samples=2, max_samples_per_template=0):
    """Build a template library from one or more JSONL files in mini-batches.

    Each message is normalized; if its normalized form is already a known
    template, or its raw content matches one of ``rules``, it is recorded
    directly. Everything else is buffered and clustered with
    ``run_dbscan_on_corpus`` whenever ``batch_size`` messages have accumulated
    (and once more at the end).

    Args:
        input_files: list of JSONL paths; each record's ``content`` is used.
        output_file: path of the JSONL file the templates are written to.
        rules: list of dicts with a compiled regex under ``'pattern'``.
        batch_size: number of unmatched messages that triggers clustering.
        eps: DBSCAN ``eps``.
        min_samples: DBSCAN ``min_samples``.
        max_samples_per_template: ``0`` writes only ``{"content": template}``;
            otherwise up to this many original messages are written under
            ``"data"``.

    A missing file is reported on stdout instead of being raised.
    """
    print("--- 开始迭代式模板提取 ---")

    # At least MAX_COLLECTED_SAMPLES are gathered; more when the caller asks
    # for more in the output.
    sample_limit = max(MAX_COLLECTED_SAMPLES, max_samples_per_template)
    final_templates = {}  # template -> list of original contents
    unmatched_batch = []
    batch_num = 1
    skipped = 0

    try:
        print(f"步骤 1: 逐行处理输入文件 {input_files} 并动态构建模板库...")
        total_lines = _count_lines(input_files)

        # One progress bar over all files, so the total is actually reached.
        for line in tqdm(_iter_lines(input_files), total=total_lines, desc="主进程"):
            try:
                content = _read_content(line)
            except ValueError:
                skipped += 1
                continue
            if content is None:
                continue

            normalized_content = normalize_text(content)

            # 1. Already a known template?
            if normalized_content in final_templates:
                _add_sample(final_templates, normalized_content, content, sample_limit)
                continue

            # 2. Matches a predefined rule?
            if any(rule['pattern'].match(content) for rule in rules):
                _add_sample(final_templates, normalized_content, content, sample_limit)
                continue

            # 3. Unknown: queue it for clustering.
            unmatched_batch.append(content)

            # 4. Cluster once the batch is full.
            if len(unmatched_batch) >= batch_size:
                print(f"\n--- 处理批次 #{batch_num} (大小: {len(unmatched_batch)}) ---")
                newly_found_templates = run_dbscan_on_corpus(
                    unmatched_batch, eps, min_samples, sample_limit)
                print(f"批次 #{batch_num}: DBSCAN 发现了 {len(newly_found_templates)} 个潜在模板。")
                _merge_templates(final_templates, newly_found_templates, sample_limit)
                print(f"当前总模板数: {len(final_templates)}")
                unmatched_batch.clear()
                batch_num += 1

        if skipped:
            print(f"警告: 跳过了 {skipped} 行无法解析的输入。")

        # --- Final partial batch ---
        print("\n--- 文件处理完毕,处理最后一批剩余内容 ---")
        if unmatched_batch:
            print(f"处理最后一个批次 (大小: {len(unmatched_batch)})")
            newly_found_templates = run_dbscan_on_corpus(
                unmatched_batch, eps, min_samples, sample_limit)
            print(f"最后一个批次: DBSCAN 发现了 {len(newly_found_templates)} 个潜在模板。")
            _merge_templates(final_templates, newly_found_templates, sample_limit)
        else:
            print("没有剩余内容需要处理。")

        # --- Output ---
        print("\n--- 第 3 部分: 合并结果并保存 ---")
        print(f"总共找到 {len(final_templates)} 个唯一的模板。")

        with open(output_file, 'w', encoding='utf-8') as f:
            for template, data_list in sorted(final_templates.items()):
                if max_samples_per_template == 0:
                    record = {"content": template}
                else:
                    record = {"content": template, "data": data_list[:max_samples_per_template]}
                json.dump(record, f, ensure_ascii=False)
                f.write('\n')

        print(f"所有模板已成功写入到 '{output_file}'。")

    except FileNotFoundError as e:
        print(f"错误:找不到输入文件 {e.filename}。")
        return


def extract_values_with_templates(input_files, template_file, output_file):
    """Extract parameter values from raw messages using generated templates.

    Args:
        input_files: list of JSONL paths; each record's ``content`` is used.
        template_file: JSONL file with one ``{"content": template}`` per line.
        output_file: JSONL file that receives one
            ``{"template", "message", "parameters"}`` record per matched message.

    For every message the templates are tried in file order and the first one
    that yields at least one parameter wins. ``output_file`` should differ from
    ``template_file``, otherwise the templates are overwritten.
    """
    print("--- 开始使用模板提取参数值 ---")

    # Load the templates.
    templates = []
    with open(template_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            template = json.loads(line)['content']
            if isinstance(template, str):
                templates.append(template)

    print(f"已加载 {len(templates)} 个模板")

    # Extract values from the raw data.
    extracted_values = []
    skipped = 0
    total_lines = _count_lines(input_files)

    for line in tqdm(_iter_lines(input_files), total=total_lines, desc="提取参数"):
        try:
            content = _read_content(line)
        except ValueError:
            skipped += 1
            continue
        if content is None:
            continue

        # Try each template; stop at the first one that yields parameters.
        for template in templates:
            parameters = extract_parameters(template, content)
            if parameters:
                extracted_values.append({
                    'template': template,
                    'message': content,
                    'parameters': parameters
                })
                break

    if skipped:
        print(f"警告: 跳过了 {skipped} 行无法解析的输入。")

    # Save the extracted values.
    with open(output_file, 'w', encoding='utf-8') as f:
        for item in extracted_values:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')

    print(f"成功从 {len(extracted_values)} 条消息中提取参数,并保存到 '{output_file}'")


# --- Command-line defaults ---
# Assumes an earlier step produced 'content_filtered.jsonl'.
input_jsonl_files = ['content_filtered.jsonl', 'output.jsonl']  # one or more files
output_template_file = 'templates_iterative.txt'
extracted_values_file = 'extracted_values.jsonl'
BATCH_PROCESSING_SIZE = 10000  # tune to the available memory and data volume


def main():
    """Parse the command line and run template or value extraction."""
    parser = argparse.ArgumentParser(description='Extract templates from GCash transaction data.')
    parser.add_argument('--input_file', type=str, nargs='+', default=input_jsonl_files,
                        help='Input JSONL file paths (multiple files supported)')
    parser.add_argument('--output_file', type=str, default=None,
                        help=f'Output file path (default: {output_template_file}, or '
                             f'{extracted_values_file} with --extract_values)')
    parser.add_argument('--template_file', type=str, default=output_template_file,
                        help='Template file read by --extract_values')
    parser.add_argument('--batch_size', type=int, default=BATCH_PROCESSING_SIZE,
                        help='Batch processing size (data volume)')
    parser.add_argument('--eps', type=float, default=0.4, help='DBSCAN eps parameter')
    parser.add_argument('--min_samples', type=int, default=5, help='DBSCAN min_samples parameter')
    parser.add_argument('--extract_values', action='store_true',
                        help='Extract values using generated templates')

    args = parser.parse_args()

    if args.extract_values:
        # The extraction output used to default to the template file itself,
        # which overwrote the templates it had just read.
        output_file = args.output_file or extracted_values_file
        if os.path.abspath(output_file) == os.path.abspath(args.template_file):
            parser.error('--output_file must differ from --template_file')
        extract_values_with_templates(
            input_files=args.input_file,
            template_file=args.template_file,
            output_file=output_file
        )
    else:
        extract_templates_iterative(
            input_files=args.input_file,
            output_file=args.output_file or output_template_file,
            rules=PREDEFINED_RULES,
            batch_size=args.batch_size,
            eps=args.eps,
            min_samples=args.min_samples
        )


if __name__ == "__main__":
    main()