# ml_data_template/sql_ml.py
# Last modified: 2025-09-09 15:14:44 +08:00
#
# NOTE: the following lines were copied from a web repository viewer and are
# not part of the program:
#   472 lines · 22 KiB · Python · Raw Blame History
#   This file contains ambiguous Unicode characters.
#   This file contains Unicode characters that might be confused with other
#   characters. If you think that this is intentional, you can safely ignore
#   this warning. Use the Escape button to reveal them.

import argparse
import functools
import json
import os
import re

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
# --- Rules and lookup tables ---

# Rules checked against the *raw* message text by extract_templates_iterative():
# a message matching one of these patterns (via ``re.match``) is registered as a
# template directly instead of being queued for DBSCAN.
# Entry shape: {'name': <label>, 'pattern': <compiled regex>}.
# All rules are currently disabled; the examples are kept for reference.
PREDEFINED_RULES = [
    # {'name': 'Transfer', 'pattern': re.compile(r'^Transfer from \d+ to \d+$')},
    # {'name': 'International Remittance', 'pattern': re.compile(r'^International Remittance$')},
    # {'name': 'Bill Payment', 'pattern': re.compile(r'^Bill payment successful for amount \d+$')},
    # {'name': 'New Message', 'pattern': re.compile(r'^You have a new message from \d+\.$')},
    # # New rule: matches e.g. "Sent GCash to GoTyme Bank with account ending in 6784"
    # {'name': 'Sent GCash to Account', 'pattern': re.compile(r'^Sent GCash to .+? with account ending in \d+$')}
]

# Capture groups shared by several placeholders.
_LAZY_TEXT = r'(.+?)'                # any non-empty text, as short as possible
_ID_LIKE = r'([\d\w\+\-\(\)]+)'      # number-like token: word chars plus + - ( )

# Placeholder token -> capture-group regex, used when a template is turned into
# an extraction regex. Every value holds exactly one capture group; the dict's
# insertion order is the order in which placeholders are substituted.
PLACEHOLDER_PATTERNS = {
    '<金额>': r'([\d,.]+)',                   # amount
    '<付款人名称>': _LAZY_TEXT,               # payer name
    '<收款人名称>': _LAZY_TEXT,               # payee name
    '<付款人号码>': _ID_LIKE,                 # payer number
    '<收款人号码>': _ID_LIKE,                 # payee number
    '<银行4位数尾号>': r'(\d{4})',            # last 4 digits of a bank account
    '<参考号>': _LAZY_TEXT,                   # reference number
    '<交易单号>': _LAZY_TEXT,                 # merchant transaction number
    '<日期时间>': _LAZY_TEXT,                 # date and time
    '<日期>': r'(\d{2}-\d{2}-\d{4})',         # date, e.g. 01-02-2024
    '<时间>': r'(\d{1,2}:\d{2}\s[AP]M)',      # time, e.g. 9:05 PM
    '<消息>': _LAZY_TEXT,                     # free-text message
    '<流水号>': _LAZY_TEXT,                   # serial / ref number
    '<网络或发票号>': _LAZY_TEXT,             # network or invoice number
    '<交易类型>': _LAZY_TEXT,                 # transaction type
}
def get_placeholder(action):
    """Return the placeholder name (without angle brackets) for a JSON message action.

    An action that mentions 'Sent' but not 'Received' refers to the recipient,
    so it maps to the payee number. Every other action ('Received ...',
    'Refunded ...', 'Reversed ...' or anything unrecognized) maps to the payer
    number.
    """
    is_outgoing = 'Received' not in action and 'Sent' in action
    return '收款人号码' if is_outgoing else '付款人号码'
# Ordered (compiled regex, replacement) rules applied by normalize_text().
# ORDER MATTERS: each rule sees the output of all rules before it, and the
# generic rules rely on the specific ones having already run (e.g. pattern 7's
# "[^<]+" only matches text that pattern 8 has not rewritten yet).
# Two rules from the earlier version were dropped because they could never
# match: a second copy of pattern 8 (its input is always already rewritten by
# the first copy) and a "Sent GCash to <收款人名称> ... (\d+)$" rule (pattern 9
# already replaces the trailing digits).
_NORMALIZATION_RULES = (
    # Pattern 8: receipt from a bank account, "... via <network>" or
    # "... and invno:<invoice>". Must run before the generic pattern 7.
    (re.compile(r'(?i)Received GCash from (.+?) with account ending in (\d+) (via .+|and invno:.+)$'),
     r'Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <网络或发票号>'),
    # Pattern 13: money sent to an unverified account (checked before pattern 12).
    (re.compile(r'^You have sent PHP [\d,]+\.\d{2} to an unverified account [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\. Go to GCash Help Center to know how to secure your transactions\.$'),
     r'You have sent PHP <金额> to an unverified account <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>. Go to GCash Help Center to know how to secure your transactions.'),
    # Pattern 12 (strict): recipient written as upper-case/asterisk name parts
    # ending in a one-letter initial followed by ".".
    (re.compile(r'^You have sent PHP [\d,]+\.\d{2} to (?:[A-Z\*]+\s)+[A-Z\*]\.\s[\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\.$'),
     r'You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>.'),
    # Pattern 11 (institution): detailed receipt ending with the
    # "Use now to buy load..." footer.
    # NOTE(review): the template drops the optional "PHP" prefix and inserts
    # ". " between balance and date, so it does not textually match the
    # messages it was built from and extract_parameters() likely cannot pull
    # values out of them. Left unchanged because fixing it alters template output.
    (re.compile(r'^You have received\s+(?:PHP\s+)?[\d,.]+\s+of GCash from\s+.+?\. Your new balance is\s+(?:PHP\s+)?[\d,.]+\s+\d{1,2}-\d{1,2}-\d{2,4}\s+\d{1,2}:\d{1,2}(?::\d{1,2})?\s+[AP]M\. Ref\. No\.\s+.+?\. Use now to buy load, purchase items, send money, pay bills, and a lot more!$'),
     r'You have received <金额> of GCash from <付款人名称>. Your new balance is <金额>. <日期时间>. Ref. No. <流水号>. Use now to buy load, purchase items, send money, pay bills, and a lot more!'),
    # Pattern 11 (person): receipt with "w/ MSG:". The balance sentence and the
    # "To access your funds..." tail are optional in the input but the template
    # always contains the balance sentence.
    (re.compile(r'^You have received PHP [\d,.]+\s+of GCash from .+? w/ MSG: .*\. (?:Your new balance is PHP [\d,.]*\.\s)?Ref\. No\. \d+\.(?: To access your funds,.*)?$'),
     r'You have received PHP <金额> of GCash from <付款人名称> w/ MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>.'),
    # Pattern 12 (lenient): any recipient-name format the strict rule missed.
    # NOTE(review): emits <参考号> where the strict rule emits <流水号>, so the
    # same message shape yields two templates depending on the name format.
    (re.compile(r'^You have sent PHP [\d,]+\.\d{2} to .+? [\d\w\+\-\(\)]+ on \d{2}-\d{2}-\d{4}\s\d{1,2}:\d{2}\s[AP]M with MSG: \..*? Your new balance is PHP [\d,]+\.\d{2}\. Ref\. No\. \d+\.$'),
     r'You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>.'),
    # Pattern 10: QR (QRPH) payment. The template uses <参考号> twice.
    (re.compile(r'^You have paid P[\d,.]+\s+via GCash to .+? on \d{1,2}-\d{1,2}-\d{2,4}\s\d{1,2}:\d{1,2}:\d{1,2}\s+[AP]M\. Ref\. No\.\s+\d+\. QRPH Invoice No\.\s+\d+\.$'),
     r'You have paid P<金额> via GCash to <收款人名称> on <日期时间>. Ref. No. <参考号>. QRPH Invoice No. <参考号>.'),
    # Pattern 9: sent to an institution's account identified by its last digits.
    (re.compile(r'Sent GCash to (.+?) with account ending in (\d+)$'),
     r'Sent GCash to <收款人名称> with account ending in <银行4位数尾号>'),
    # Pattern 7: any "Received GCash from ..." that contains no placeholder yet
    # (i.e. was not handled by pattern 8).
    (re.compile(r'(?i)^Received GCash from [^<]+$'),
     r'Received GCash from <付款人名称>'),
    # Pattern 6: payment carrying a merchant transaction number.
    (re.compile(r'Payment to (.+?), Merchant Transaction Number: (.+)$'),
     r'Payment to <收款人名称>, Merchant Transaction Number: <交易单号>'),
    # Pattern 5: plain payment (no comma after the merchant name).
    (re.compile(r'^Payment to ([^,]+)$'),
     r'Payment to <收款人名称>'),
    # "<type> with Ref. no. / Parent Ref.No. / Reference No. <ref>"; all three
    # spellings are normalized to "Ref. no.".
    (re.compile(r'^(.+?) with (Ref\. no\.|Parent Ref\.No\.|Reference No\.) (.+)$'),
     r'<交易类型> with Ref. no. <参考号>'),
    # Transfer between two whitespace-free tokens.
    (re.compile(r'^Transfer from \S+ to \S+$'),
     r'Transfer from <付款人号码> to <收款人号码>'),
    # Buy Load transaction.
    (re.compile(r'^Buy Load Transaction for .+$'),
     r'Buy Load Transaction for <付款人号码>'),
    # Refund.
    (re.compile(r'^Refund from .+$'),
     r'Refund from <收款人名称>'),
    # JSON payloads embedded as escaped strings: replace the "target" value that
    # follows a known action with the placeholder chosen by get_placeholder().
    (re.compile(r'\\\"(Received money from|Sent money to|Received settlement from|Reversed settlement from|Refunded money via|Sent money via)\\\",\\\"target\\\":\\\"(.+?)\\\"'),
     lambda m: f'\\\"{m.group(1)}\\\",\\\"target\\\":\\\"<{get_placeholder(m.group(1))}>\\\"'),
)


def normalize_text(text):
    """Replace the variable parts of known GCash message formats with placeholders.

    Messages of the same format normalize to the same template string (e.g.
    "Payment to Shopee" -> "Payment to <收款人名称>"). The rules in
    _NORMALIZATION_RULES are applied in order, each to the output of the
    previous one; text matching no rule is returned unchanged.

    Args:
        text: raw message string.

    Returns:
        The normalized (template) string.
    """
    for pattern, replacement in _NORMALIZATION_RULES:
        text = pattern.sub(replacement, text)
    return text
def template_to_regex(template):
    """Build an (unanchored) regex pattern string from a template.

    The whole template is passed through re.escape() so its literal text
    matches verbatim. Then every occurrence of each PLACEHOLDER_PATTERNS key
    (in its escaped form) is swapped for that key's capture-group pattern, in
    the dict's insertion order.

    Args:
        template: template text containing placeholders such as "<金额>".

    Returns:
        The regex pattern as a string.
    """
    regex = re.escape(template)
    for token, group_pattern in PLACEHOLDER_PATTERNS.items():
        regex = regex.replace(re.escape(token), group_pattern)
    return regex
@functools.lru_cache(maxsize=8192)
def _compile_template(template):
    """Compile *template* once and list its placeholders in capture-group order.

    Returns ``(compiled_regex, placeholders)``. Only tokens that are keys of
    PLACEHOLDER_PATTERNS are listed, because only those become capture groups
    in template_to_regex(). Any other ``<...>`` text is literal and must not
    shift the name/group alignment.
    """
    compiled = re.compile(template_to_regex(template))
    token_pattern = '|'.join(re.escape(token) for token in PLACEHOLDER_PATTERNS)
    placeholders = tuple(re.findall(token_pattern, template)) if token_pattern else ()
    return compiled, placeholders


def extract_parameters(template, message):
    """Extract the values that fill the placeholders of *template* in *message*.

    The whole message must match the template (``fullmatch``). With an
    unanchored search, a trailing lazy group such as ``(.+?)`` captured only a
    single character, and a template could match inside an unrelated message.

    Args:
        template: template text, e.g. "Payment to <收款人名称>".
        message: raw message text.

    Returns:
        dict mapping placeholder -> captured string, or {} when the message does
        not match (or the template has no placeholders). If a placeholder occurs
        more than once (e.g. "<金额>" for both amount and balance), the first
        value keeps the plain key and later ones are stored as "<金额>_2",
        "<金额>_3", ... instead of overwriting it.
    """
    compiled, placeholders = _compile_template(template)
    match = compiled.fullmatch(message)
    if match is None:
        return {}
    parameters = {}
    occurrences = {}
    for placeholder, value in zip(placeholders, match.groups()):
        count = occurrences.get(placeholder, 0) + 1
        occurrences[placeholder] = count
        key = placeholder if count == 1 else f'{placeholder}_{count}'
        parameters[key] = value
    return parameters
def run_dbscan_on_corpus(corpus, eps, min_samples, max_samples=10):
    """Cluster one batch of raw messages with DBSCAN and derive candidate templates.

    Every message is normalized with normalize_text(). The normalized texts are
    vectorized with TF-IDF and clustered by DBSCAN using cosine distance.

    * Noise points (label -1) each contribute their own normalized text as a
      template.
    * For every cluster, the member whose TF-IDF vector is most similar to the
      cluster centroid is picked, and its normalized text becomes the template
      for the whole cluster.

    Args:
        corpus: list of raw message strings.
        eps: DBSCAN ``eps`` (cosine distance).
        min_samples: DBSCAN ``min_samples``.
        max_samples: maximum number of original messages kept per template.

    Returns:
        dict mapping template text -> list of at most ``max_samples`` original
        messages. When several groups produce the same template, their samples
        are merged instead of overwritten. If TF-IDF/DBSCAN raise ``ValueError``
        (e.g. an empty vocabulary), every message falls back to its own
        normalized text as a template, with identical texts grouped together.
    """
    if not corpus:
        return {}
    processed_corpus = [normalize_text(text) for text in corpus]

    def add_samples(templates, template, originals):
        # Merge into an existing entry (never overwrite), capped at max_samples.
        bucket = templates.setdefault(template, [])
        room = max_samples - len(bucket)
        if room > 0:
            bucket.extend(originals[:room])

    try:
        X = TfidfVectorizer().fit_transform(processed_corpus)
        labels = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine', n_jobs=-1).fit(X).labels_
        dbscan_templates = {}
        for label in set(labels):
            member_indices = np.where(labels == label)[0]
            if label == -1:
                # Noise: every point is a template on its own.
                for idx in member_indices:
                    add_samples(dbscan_templates, processed_corpus[idx], [corpus[idx]])
                continue
            # Representative member = highest cosine similarity to the centroid.
            cluster_vectors = X[member_indices]
            centroid = np.asarray(cluster_vectors.mean(axis=0))
            similarities = cosine_similarity(cluster_vectors, centroid)
            representative_idx = member_indices[np.argmax(similarities)]
            add_samples(
                dbscan_templates,
                processed_corpus[representative_idx],
                [corpus[idx] for idx in member_indices],
            )
        return dbscan_templates
    except ValueError:
        # TfidfVectorizer raises ValueError when the batch yields no vocabulary
        # (e.g. every message is too short or consists only of stop words).
        print("警告: DBSCAN批次处理失败可能因为内容过于单一或简短。将内容视为独立模板。")
        fallback = {}
        for original, normalized in zip(corpus, processed_corpus):
            add_samples(fallback, normalized, [original])
        return fallback
def _merge_capped_samples(templates, template, originals, cap):
    """Append *originals* to ``templates[template]`` (creating it) without exceeding *cap* samples."""
    bucket = templates.setdefault(template, [])
    room = cap - len(bucket)
    if room > 0:
        bucket.extend(originals[:room])


def _cluster_batch_into(final_templates, batch, eps, min_samples, cap):
    """Run DBSCAN on *batch*, merge the discovered templates into *final_templates*, return how many were found."""
    newly_found = run_dbscan_on_corpus(batch, eps, min_samples, cap)
    for template, originals in newly_found.items():
        _merge_capped_samples(final_templates, template, originals, cap)
    return len(newly_found)


def extract_templates_iterative(input_files, output_file, rules, batch_size=1000, eps=0.4, min_samples=2, max_samples_per_template=0, content_key='content'):
    """Extract message templates from JSONL files with a mini-batch hybrid strategy.

    Each line is parsed as JSON and the message is read from ``content_key``.
    Each message is normalized with normalize_text(), then:

    1. If the normalized text is already a known template, or the raw text
       matches one of *rules* (``rule['pattern'].match``), the message is
       recorded as a sample of that normalized template.
    2. Otherwise it is queued. Every *batch_size* queued messages (and once
       more at the end) are clustered with run_dbscan_on_corpus(), and the
       templates found are merged in.

    Args:
        input_files: list of JSONL paths; all are checked up front.
        output_file: path of the JSONL template file to write.
        rules: list of dicts with a compiled regex under ``'pattern'``.
        batch_size: number of unmatched messages per DBSCAN run.
        eps, min_samples: DBSCAN parameters.
        max_samples_per_template: 0 writes only ``{"content": template}``;
            otherwise up to this many original messages are written under
            ``"data"``. At least 10 samples are kept per template, more if
            this value is larger.
        content_key: JSON key that holds the message text.

    A missing input file prints an error and returns without writing output.
    Lines that are not valid JSON objects, or whose content is empty or not a
    string, are skipped.
    """
    print("--- 开始迭代式模板提取 ---")
    sample_cap = max(10, max_samples_per_template)
    final_templates = {}  # template -> list of original contents
    unmatched_batch = []
    batch_num = 1
    try:
        print(f"步骤 1: 逐行处理输入文件 {input_files} 并动态构建模板库...")
        # Fail fast on a missing input before any work is done.
        for input_file in input_files:
            with open(input_file, 'r', encoding='utf-8'):
                pass
        for input_file in input_files:
            print(f"\n--- 开始处理文件: {input_file} ---")
            with open(input_file, 'r', encoding='utf-8') as f:
                file_lines = sum(1 for _ in f)  # only used as the tqdm total
            with open(input_file, 'r', encoding='utf-8') as f:
                for line in tqdm(f, total=file_lines, desc=f"处理 {os.path.basename(input_file)}"):
                    try:
                        content = json.loads(line).get(content_key)
                    except (json.JSONDecodeError, AttributeError):
                        continue  # malformed line, or a JSON value that is not an object
                    if not content or not isinstance(content, str):
                        continue
                    normalized_content = normalize_text(content)
                    # 1./2. Known template or predefined rule: record a sample.
                    if normalized_content in final_templates or any(
                            rule['pattern'].match(content) for rule in rules):
                        _merge_capped_samples(final_templates, normalized_content, [content], sample_cap)
                        continue
                    # 3. Unknown: queue for DBSCAN.
                    unmatched_batch.append(content)
                    # 4. Cluster once the batch is full.
                    if len(unmatched_batch) >= batch_size:
                        print(f"\n--- 处理批次 #{batch_num} (大小: {len(unmatched_batch)}) ---")
                        found = _cluster_batch_into(final_templates, unmatched_batch, eps, min_samples, sample_cap)
                        print(f"批次 #{batch_num}: DBSCAN 发现了 {found} 个潜在模板。")
                        print(f"当前总模板数: {len(final_templates)}")
                        unmatched_batch.clear()
                        batch_num += 1
    except FileNotFoundError as e:
        print(f"错误:找不到输入文件 {e.filename}")
        return

    # --- Final partial batch ---
    print("\n--- 文件处理完毕,处理最后一批剩余内容 ---")
    if unmatched_batch:
        print(f"处理最后一个批次 (大小: {len(unmatched_batch)})")
        found = _cluster_batch_into(final_templates, unmatched_batch, eps, min_samples, sample_cap)
        print(f"最后一个批次: DBSCAN 发现了 {found} 个潜在模板。")
    else:
        print("没有剩余内容需要处理。")

    # --- Output (outside the try: an output error must not be reported as a missing input) ---
    print("\n--- 第 3 部分: 合并结果并保存 ---")
    print(f"总共找到 {len(final_templates)} 个唯一的模板。")
    with open(output_file, 'w', encoding='utf-8') as f:
        for template, data_list in sorted(final_templates.items()):
            record = {"content": template}
            if max_samples_per_template != 0:
                record["data"] = data_list[:max_samples_per_template]
            json.dump(record, f, ensure_ascii=False)
            f.write('\n')
    print(f"所有模板已成功写入到 '{output_file}'")
def _find_matching_template(content, templates, known_templates):
    """Return ``(template, parameters)`` for the template that extracts values from *content*, or None.

    The template that normalize_text() produces for *content* is tried first.
    When it is one of the loaded templates it is the most specific candidate;
    a scan in file order (the template file is written sorted) can otherwise
    stop at a more generic template, e.g. "Payment to <收款人名称>" sorts before
    "Payment to <收款人名称>, Merchant Transaction Number: <交易单号>".
    Failing that, the templates are scanned in order and the first one that
    yields parameters wins.
    """
    own_template = normalize_text(content)
    if own_template in known_templates:
        parameters = extract_parameters(own_template, content)
        if parameters:
            return own_template, parameters
    for template in templates:
        parameters = extract_parameters(template, content)
        if parameters:
            return template, parameters
    return None


def extract_values_with_templates(input_files, template_file, output_file, content_key='content'):
    """Extract placeholder values from raw messages using previously generated templates.

    Args:
        input_files: list of JSONL paths; each line is a JSON object whose
            ``content_key`` field holds the message. All are checked up front.
        template_file: JSONL file written by extract_templates_iterative(); the
            template text is read from each line's ``"content"`` key (blank
            lines are ignored).
        output_file: JSONL output, one ``{"template", "message", "parameters"}``
            object per message that matched a template. Written only after all
            inputs have been read.
        content_key: JSON key that holds the message text.

    Lines that are not valid JSON objects, or whose content is empty or not a
    string, are skipped. Other errors are no longer silently swallowed.
    """
    print("--- 开始使用模板提取参数值 ---")
    templates = []
    with open(template_file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():  # tolerate blank lines, e.g. a trailing newline
                templates.append(json.loads(line)['content'])
    print(f"已加载 {len(templates)} 个模板")
    known_templates = set(templates)

    # Fail fast on a missing input before any work is done.
    for input_file in input_files:
        with open(input_file, 'r', encoding='utf-8'):
            pass

    extracted_values = []
    for input_file in input_files:
        print(f"\n--- 开始处理文件: {input_file} ---")
        with open(input_file, 'r', encoding='utf-8') as f:
            file_lines = sum(1 for _ in f)  # only used as the tqdm total
        with open(input_file, 'r', encoding='utf-8') as f:
            for line in tqdm(f, total=file_lines, desc=f"提取 {os.path.basename(input_file)}"):
                try:
                    content = json.loads(line).get(content_key, '')
                except (json.JSONDecodeError, AttributeError):
                    continue  # malformed line, or a JSON value that is not an object
                if not content or not isinstance(content, str):
                    continue
                found = _find_matching_template(content, templates, known_templates)
                if found is None:
                    continue
                template, parameters = found
                extracted_values.append({
                    'template': template,
                    'message': content,
                    'parameters': parameters
                })

    # Results are buffered and written at the end, so output_file may safely
    # coincide with an input file.
    with open(output_file, 'w', encoding='utf-8') as f:
        for item in extracted_values:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    print(f"成功从 {len(extracted_values)} 条消息中提取参数,并保存到 '{output_file}'")
# --- Usage example ---
# Assumes the previous script has already produced 'content_filtered.jsonl'.
input_jsonl_files = ['content_filtered.jsonl', 'output.jsonl']  # default inputs; more files may be added
output_template_file = 'templates_iterative.txt'
BATCH_PROCESSING_SIZE = 10000  # tune to available memory and data volume
# Default output of --extract_values. It is kept distinct from the template file
# so that extraction never overwrites the templates it reads.
output_values_file = 'extracted_values.jsonl'


def main(argv=None):
    """Command-line entry point.

    By default, extracts templates from the input files into --output_file
    (default 'templates_iterative.txt'). With --extract_values, reads the
    templates from --template_file and writes extracted parameter values to
    --output_file (default 'extracted_values.jsonl'). The two paths must differ.
    """
    parser = argparse.ArgumentParser(description='Extract templates from GCash transaction data.')
    parser.add_argument('--input_file', type=str, nargs='+', default=input_jsonl_files, help='Input JSONL file paths (multiple files supported)')
    parser.add_argument('--output_file', type=str, default=None,
                        help=f"Output file path (default: '{output_template_file}', or '{output_values_file}' with --extract_values)")
    parser.add_argument('--template_file', type=str, default=output_template_file, help='Template file read by --extract_values')
    parser.add_argument('--batch_size', type=int, default=BATCH_PROCESSING_SIZE, help='Batch processing size (data volume)')
    parser.add_argument('--eps', type=float, default=0.4, help='DBSCAN eps parameter')
    parser.add_argument('--min_samples', type=int, default=5, help='DBSCAN min_samples parameter')
    parser.add_argument('--extract_values', action='store_true', help='Extract values using generated templates')
    parser.add_argument('--content_key', type=str, default='content', help='Key to extract content from JSON objects (default: content)')
    args = parser.parse_args(argv)

    if args.extract_values:
        output_file = args.output_file or output_values_file
        if os.path.abspath(output_file) == os.path.abspath(args.template_file):
            parser.error('--output_file must differ from --template_file (it would overwrite the templates)')
        # Extract parameter values.
        extract_values_with_templates(
            input_files=args.input_file,
            template_file=args.template_file,
            output_file=output_file,
            content_key=args.content_key
        )
    else:
        # Extract templates.
        extract_templates_iterative(
            input_files=args.input_file,
            output_file=args.output_file or output_template_file,
            rules=PREDEFINED_RULES,
            batch_size=args.batch_size,
            eps=args.eps,
            min_samples=args.min_samples,
            content_key=args.content_key
        )


if __name__ == "__main__":
    main()