2025-09-08 23:40:28 +08:00
import json
import re
import numpy as np
from sklearn . feature_extraction . text import TfidfVectorizer
from sklearn . cluster import DBSCAN
from sklearn . metrics . pairwise import cosine_similarity
from tqdm import tqdm
2025-09-09 01:40:25 +08:00
import argparse
2025-09-08 23:40:28 +08:00
# --- 规则和辅助函数 (与之前相同) ---
PREDEFINED_RULES = [
# {'name': 'Transfer', 'pattern': re.compile(r'^Transfer from \d+ to \d+$')},
# {'name': 'International Remittance', 'pattern': re.compile(r'^International Remittance$')},
# {'name': 'Bill Payment', 'pattern': re.compile(r'^Bill payment successful for amount \d+$')},
# {'name': 'New Message', 'pattern': re.compile(r'^You have a new message from \d+\.$')},
# # 新增规则:匹配类似 "Sent GCash to GoTyme Bank with account ending in 6784"
# {'name': 'Sent GCash to Account', 'pattern': re.compile(r'^Sent GCash to .+? with account ending in \d+$')}
]
2025-09-09 01:40:25 +08:00
# 占位符到正则表达式的映射
PLACEHOLDER_PATTERNS = {
' <金额> ' : r ' ([ \ d,.]+) ' ,
' <付款人名称> ' : r ' (.+?) ' ,
' <收款人名称> ' : r ' (.+?) ' ,
' <付款人号码> ' : r ' ([ \ d \ w \ + \ - \ ( \ )]+) ' ,
' <收款人号码> ' : r ' ([ \ d \ w \ + \ - \ ( \ )]+) ' ,
' <银行4位数尾号> ' : r ' ( \ d {4} ) ' ,
' <参考号> ' : r ' (.+?) ' ,
' <交易单号> ' : r ' (.+?) ' ,
' <日期时间> ' : r ' (.+?) ' ,
' <日期> ' : r ' ( \ d {2} - \ d {2} - \ d {4} ) ' ,
' <时间> ' : r ' ( \ d { 1,2}: \ d {2} \ s[AP]M) ' ,
' <消息> ' : r ' (.+?) ' ,
' <流水号> ' : r ' (.+?) ' ,
' <网络或发票号> ' : r ' (.+?) ' ,
' <交易类型> ' : r ' (.+?) ' ,
}
2025-09-08 23:40:28 +08:00
def normalize_text ( text ) :
# 模式 8: 从银行收款 (这条规则必须先运行)
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' (?i)Received GCash from (.+?) with account ending in ( \ d+) (via .+|and invno:.+)$ ' ,
2025-09-08 23:40:28 +08:00
r ' Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <网络或发票号> ' ,
text
)
# 模式 13: 向未验证账户发送凭证
# 结构: You have sent <货币> <金额> to an unverified account <手机号> on <日期> <时间> with MSG: <消息>. Your new balance is <货币> <金额>. Ref. No. <流水号>. Go to...
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' ^You have sent PHP [ \ d,]+ \ . \ d {2} to an unverified account [ \ d \ w \ + \ - \ ( \ )]+ on \ d {2} - \ d {2} - \ d {4} \ s \ d { 1,2}: \ d {2} \ s[AP]M with MSG: \ ..*? Your new balance is PHP [ \ d,]+ \ . \ d {2} \ . Ref \ . No \ . \ d+ \ . Go to GCash Help Center to know how to secure your transactions \ .$ ' ,
2025-09-08 23:40:28 +08:00
r ' You have sent PHP <金额> to an unverified account <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>. Go to GCash Help Center to know how to secure your transactions. ' ,
text
)
# 模式 12: 详细发送凭证 (更新后可处理多段姓名)
# 结构: You have sent <货币> <金额> to <收款人> <手机号> on <日期> <时间> with MSG: <消息>. Your new balance is <货币> <金额>. Ref. No. <流水号>.
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' ^You have sent PHP [ \ d,]+ \ . \ d {2} to (?:[A-Z \ *]+ \ s)+[A-Z \ *] \ . \ s[ \ d \ w \ + \ - \ ( \ )]+ on \ d {2} - \ d {2} - \ d {4} \ s \ d { 1,2}: \ d {2} \ s[AP]M with MSG: \ ..*? Your new balance is PHP [ \ d,]+ \ . \ d {2} \ . Ref \ . No \ . \ d+ \ .$ ' ,
2025-09-08 23:40:28 +08:00
r ' You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <流水号>. ' ,
text
)
# 模式 11 (机构): 详细收款凭证
# 结构: You have received ... of GCash from <来源>. Your new balance is ... <日期时间>. Ref. No. <流水号>. Use now to buy load...
text = re . sub (
r ' ^You have received \ s+(?:PHP \ s+)?[ \ d,.]+ \ s+of GCash from \ s+.+? \ . Your new balance is \ s+(?:PHP \ s+)?[ \ d,.]+ \ s+ \ d { 1,2}- \ d { 1,2}- \ d { 2,4} \ s+ \ d { 1,2}: \ d { 1,2}(?:: \ d { 1,2})? \ s+[AP]M \ . Ref \ . No \ . \ s+.+? \ . Use now to buy load, purchase items, send money, pay bills, and a lot more!$ ' ,
r ' You have received <金额> of GCash from <付款人名称>. Your new balance is <金额>. <日期时间>. Ref. No. <流水号>. Use now to buy load, purchase items, send money, pay bills, and a lot more! ' ,
text
)
# 模式 11 (个人): 详细收款凭证 (最终修正版,兼容多种手机号/余额/结尾格式)
text = re . sub (
r ' ^You have received PHP [ \ d,.]+ \ s+of GCash from .+? w/ MSG: .* \ . (?:Your new balance is PHP [ \ d,.]* \ . \ s)?Ref \ . No \ . \ d+ \ .(?: To access your funds,.*)?$ ' ,
r ' You have received PHP <金额> of GCash from <付款人名称> w/ MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>. ' ,
text
)
# 模式 12: 详细发送凭证 (最终修正版,兼容所有已知姓名格式)
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' ^You have sent PHP [ \ d,]+ \ . \ d {2} to .+? [ \ d \ w \ + \ - \ ( \ )]+ on \ d {2} - \ d {2} - \ d {4} \ s \ d { 1,2}: \ d {2} \ s[AP]M with MSG: \ ..*? Your new balance is PHP [ \ d,]+ \ . \ d {2} \ . Ref \ . No \ . \ d+ \ .$ ' ,
2025-09-08 23:40:28 +08:00
r ' You have sent PHP <金额> to <收款人名称> <收款人号码> on <日期> <时间> with MSG: <消息>. Your new balance is PHP <金额>. Ref. No. <参考号>. ' ,
text
)
# 模式 10 (来自用户最初的模板列表,这里将其具体化)
# 结构: You have paid <金额> via GCash to <接收方> on <日期时间>. Ref. No. <参考号>. QRPH Invoice No. <参考号>.
text = re . sub (
r ' ^You have paid P[ \ d,.]+ \ s+via GCash to .+? on \ d { 1,2}- \ d { 1,2}- \ d { 2,4} \ s \ d { 1,2}: \ d { 1,2}: \ d { 1,2} \ s+[AP]M \ . Ref \ . No \ . \ s+ \ d+ \ . QRPH Invoice No \ . \ s+ \ d+ \ .$ ' ,
r ' You have paid P<金额> via GCash to <收款人名称> on <日期时间>. Ref. No. <参考号>. QRPH Invoice No. <参考号>. ' ,
text
)
# 模式 9 (来自用户最初的模板列表,这里将其具体化)
# 结构: Sent GCash to <机构名> with account ending in <尾号>
text = re . sub (
r ' Sent GCash to (.+?) with account ending in ( \ d+)$ ' ,
r ' Sent GCash to <收款人名称> with account ending in <银行4位数尾号> ' ,
text
)
# 新增规则:模式 7: 从一般来源收款 (这条规则紧随其后)
# 它只会处理没有被上面那条规则匹配到的 "Received GCash from..."
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' (?i)^Received GCash from [^<]+$ ' ,
2025-09-08 23:40:28 +08:00
r ' Received GCash from <付款人名称> ' ,
text
)
# 模式 6: 带商户交易单号的支付
# 结构: Payment to <商户名>, Merchant Transaction Number: <交易单号>
text = re . sub (
r ' Payment to (.+?), Merchant Transaction Number: (.+)$ ' ,
r ' Payment to <收款人名称>, Merchant Transaction Number: <交易单号> ' ,
text
)
# 模式 5 (来自用户最初的模板列表,这里将其具体化)
# 结构: Payment to <商户名>
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' ^Payment to ([^,]+)$ ' ,
2025-09-08 23:40:28 +08:00
r ' Payment to <收款人名称> ' ,
text
)
text = re . sub (
r ' ^(.+?) with (Ref \ . no \ .|Parent Ref \ .No \ .|Reference No \ .) (.+)$ ' ,
2025-09-09 01:32:20 +08:00
r ' <交易类型> with Ref. no. <参考号> ' ,
2025-09-08 23:40:28 +08:00
text
)
text = re . sub ( r ' Sent GCash to <收款人名称> with account ending in ( \ d+)$ ' , r ' Sent GCash to <收款人名称> with account ending in <银行4位数尾号> ' , text )
text = re . sub ( r ' ^Transfer from \ S+ to \ S+$ ' , r ' Transfer from <付款人号码> to <收款人号码> ' , text )
# 模式 8: 从银行收款
# 结构: Received GCash from <机构名> with account ending in <尾号> via <网络> or with invno:<...>
text = re . sub (
2025-09-09 01:32:20 +08:00
r ' (?i)Received GCash from (.+?) with account ending in ( \ d+) (via .+|and invno:.+)$ ' ,
2025-09-08 23:40:28 +08:00
r ' Received GCash from <付款人名称> with account ending in <银行4位数尾号> via <流水号> ' ,
text
)
# 新增规则: Buy Load Transaction
text = re . sub (
r ' ^Buy Load Transaction for .+$ ' ,
r ' Buy Load Transaction for <付款人号码> ' ,
text
)
# 新增规则: Refund
text = re . sub (
r ' ^Refund from .+$ ' ,
r ' Refund from <收款人名称> ' ,
text
)
return text
2025-09-09 01:40:25 +08:00
def template_to_regex ( template ) :
"""
将模板转换为可用于提取参数的正则表达式
"""
# 转义模板中的特殊字符,但保留占位符
escaped_template = re . escape ( template )
# 将占位符映射到对应的正则表达式捕获组
for placeholder , pattern in PLACEHOLDER_PATTERNS . items ( ) :
escaped_placeholder = re . escape ( placeholder )
# 替换占位符为对应的捕获组
escaped_template = escaped_template . replace ( escaped_placeholder , pattern )
return escaped_template
def extract_parameters ( template , message ) :
"""
从消息中提取参数值
"""
# 生成正则表达式
pattern = template_to_regex ( template )
# 匹配消息
match = re . search ( pattern , message )
if match :
# 获取所有捕获组
values = match . groups ( )
# 获取模板中的占位符
placeholders = re . findall ( r ' <[^>]+> ' , template )
# 创建参数字典
parameters = { }
for i , placeholder in enumerate ( placeholders ) :
if i < len ( values ) :
parameters [ placeholder ] = values [ i ]
return parameters
return { }
2025-09-09 01:32:20 +08:00
def run_dbscan_on_corpus ( corpus , eps , min_samples , max_samples = 10 ) :
if not corpus : return { }
2025-09-08 23:40:28 +08:00
processed_corpus = [ normalize_text ( text ) for text in corpus ]
try :
vectorizer = TfidfVectorizer ( )
X = vectorizer . fit_transform ( processed_corpus )
db = DBSCAN ( eps = eps , min_samples = min_samples , metric = ' cosine ' , n_jobs = - 1 ) . fit ( X )
labels = db . labels_
2025-09-09 01:32:20 +08:00
dbscan_templates = { }
2025-09-08 23:40:28 +08:00
unique_labels = set ( labels )
for label in unique_labels :
class_member_indices = np . where ( labels == label ) [ 0 ]
if label == - 1 : # 处理噪声点
for idx in class_member_indices :
2025-09-09 01:32:20 +08:00
original = corpus [ idx ]
normalized = processed_corpus [ idx ]
if normalized not in dbscan_templates :
dbscan_templates [ normalized ] = [ ]
if len ( dbscan_templates [ normalized ] ) < max_samples :
dbscan_templates [ normalized ] . append ( original )
2025-09-08 23:40:28 +08:00
continue
# 处理聚类
cluster_vectors = X [ class_member_indices ]
centroid = np . asarray ( cluster_vectors . mean ( axis = 0 ) )
similarities = cosine_similarity ( cluster_vectors , centroid )
most_representative_idx_in_cluster = np . argmax ( similarities )
original_corpus_idx = class_member_indices [ most_representative_idx_in_cluster ]
2025-09-09 01:32:20 +08:00
most_representative_normalized = processed_corpus [ original_corpus_idx ]
cluster_originals = [ corpus [ idx ] for idx in class_member_indices ]
dbscan_templates [ most_representative_normalized ] = cluster_originals [ : max_samples ]
2025-09-08 23:40:28 +08:00
return dbscan_templates
except ValueError :
# 如果批次中所有词都在停用词表中, TfidfVectorizer会报错
print ( " 警告: DBSCAN批次处理失败, 可能因为内容过于单一或简短。将内容视为独立模板。 " )
2025-09-09 01:32:20 +08:00
return { processed_corpus [ i ] : [ corpus [ i ] ] [ : max_samples ] for i in range ( len ( corpus ) ) }
2025-09-08 23:40:28 +08:00
2025-09-09 01:32:20 +08:00
def extract_templates_iterative ( input_file , output_file , rules , batch_size = 1000 , eps = 0.4 , min_samples = 2 , max_samples_per_template = 0 ) :
2025-09-08 23:40:28 +08:00
"""
2025-09-09 01:32:20 +08:00
使用小批量迭代的混合策略来提取模板 , 并为每个模板收集最多10个原始数据集 。
2025-09-08 23:40:28 +08:00
"""
print ( " --- 开始迭代式模板提取 --- " )
2025-09-09 01:32:20 +08:00
final_templates = { } # template -> list of original contents
2025-09-08 23:40:28 +08:00
unmatched_batch = [ ]
batch_num = 1
try :
print ( f " 步骤 1: 逐行处理 ' { input_file } ' 并动态构建模板库... " )
with open ( input_file , ' r ' , encoding = ' utf-8 ' ) as f :
total_lines = sum ( 1 for _ in f )
with open ( input_file , ' r ' , encoding = ' utf-8 ' ) as f :
for line in tqdm ( f , total = total_lines , desc = " 主进程 " ) :
try :
content = json . loads ( line ) . get ( ' content ' )
if not content : continue
normalized_content = normalize_text ( content )
# 1. 检查是否匹配已发现的任何模板
if normalized_content in final_templates :
2025-09-09 01:32:20 +08:00
if len ( final_templates [ normalized_content ] ) < 10 :
final_templates [ normalized_content ] . append ( content )
2025-09-08 23:40:28 +08:00
continue
# 2. 检查是否匹配预定义规则
matched_by_rule = False
for rule in rules :
if rule [ ' pattern ' ] . match ( content ) :
2025-09-09 01:32:20 +08:00
if normalized_content not in final_templates :
final_templates [ normalized_content ] = [ ]
if len ( final_templates [ normalized_content ] ) < 10 :
final_templates [ normalized_content ] . append ( content )
2025-09-08 23:40:28 +08:00
matched_by_rule = True
break
if matched_by_rule :
continue
# 3. 如果都未匹配,加入批处理列表
unmatched_batch . append ( content )
# 4. 检查是否触发批处理
if len ( unmatched_batch ) > = batch_size :
print ( f " \n --- 处理批次 # { batch_num } (大小: { len ( unmatched_batch ) } ) --- " )
2025-09-09 01:32:20 +08:00
newly_found_templates = run_dbscan_on_corpus ( unmatched_batch , eps , min_samples , 10 )
2025-09-08 23:40:28 +08:00
print ( f " 批次 # { batch_num } : DBSCAN 发现了 { len ( newly_found_templates ) } 个潜在模板。 " )
2025-09-09 01:32:20 +08:00
for template , originals in newly_found_templates . items ( ) :
if template in final_templates :
remaining = 10 - len ( final_templates [ template ] )
final_templates [ template ] . extend ( originals [ : remaining ] )
else :
final_templates [ template ] = originals [ : 10 ]
2025-09-08 23:40:28 +08:00
print ( f " 当前总模板数: { len ( final_templates ) } " )
unmatched_batch . clear ( )
batch_num + = 1
except ( json . JSONDecodeError , AttributeError ) :
continue
# --- 收尾处理 ---
print ( " \n --- 文件处理完毕,处理最后一批剩余内容 --- " )
if unmatched_batch :
print ( f " 处理最后一个批次 (大小: { len ( unmatched_batch ) } ) " )
2025-09-09 01:32:20 +08:00
newly_found_templates = run_dbscan_on_corpus ( unmatched_batch , eps , min_samples , 10 )
2025-09-08 23:40:28 +08:00
print ( f " 最后一个批次: DBSCAN 发现了 { len ( newly_found_templates ) } 个潜在模板。 " )
2025-09-09 01:32:20 +08:00
for template , originals in newly_found_templates . items ( ) :
if template in final_templates :
remaining = 10 - len ( final_templates [ template ] )
final_templates [ template ] . extend ( originals [ : remaining ] )
else :
final_templates [ template ] = originals [ : 10 ]
2025-09-08 23:40:28 +08:00
else :
print ( " 没有剩余内容需要处理。 " )
# --- 输出 ---
print ( " \n --- 第 3 部分: 合并结果并保存 --- " )
print ( f " 总共找到 { len ( final_templates ) } 个唯一的模板。 " )
with open ( output_file , ' w ' , encoding = ' utf-8 ' ) as f :
2025-09-09 01:32:20 +08:00
for template , data_list in sorted ( final_templates . items ( ) ) :
if max_samples_per_template == 0 :
json . dump ( { " content " : template } , f , ensure_ascii = False )
else :
json . dump ( { " content " : template , " data " : data_list [ : max_samples_per_template ] } , f , ensure_ascii = False )
2025-09-08 23:40:28 +08:00
f . write ( ' \n ' )
print ( f " 所有模板已成功写入到 ' { output_file } ' 。 " )
except FileNotFoundError :
print ( f " 错误:找不到输入文件 ' { input_file } ' 。 " )
return
2025-09-09 01:40:25 +08:00
def extract_values_with_templates ( input_file , template_file , output_file ) :
"""
使用DBSCAN生成的模板从原始消息中提取参数值
"""
print ( " --- 开始使用模板提取参数值 --- " )
# 读取模板
templates = [ ]
with open ( template_file , ' r ' , encoding = ' utf-8 ' ) as f :
for line in f :
template_data = json . loads ( line )
templates . append ( template_data [ ' content ' ] )
print ( f " 已加载 { len ( templates ) } 个模板 " )
# 从原始数据中提取值
extracted_values = [ ]
with open ( input_file , ' r ' , encoding = ' utf-8 ' ) as f :
total_lines = sum ( 1 for _ in f )
with open ( input_file , ' r ' , encoding = ' utf-8 ' ) as f :
for line in tqdm ( f , total = total_lines , desc = " 提取参数 " ) :
try :
data = json . loads ( line )
content = data . get ( ' content ' , ' ' )
if not content :
continue
# 尝试匹配每个模板
for template in templates :
parameters = extract_parameters ( template , content )
if parameters :
extracted_values . append ( {
' template ' : template ,
' message ' : content ,
' parameters ' : parameters
} )
# 找到匹配就跳出循环
break
except ( json . JSONDecodeError , Exception ) :
continue
# 保存提取的值
with open ( output_file , ' w ' , encoding = ' utf-8 ' ) as f :
for item in extracted_values :
json . dump ( item , f , ensure_ascii = False )
f . write ( ' \n ' )
print ( f " 成功从 { len ( extracted_values ) } 条消息中提取参数,并保存到 ' { output_file } ' " )
2025-09-08 23:40:28 +08:00
# --- 使用示例 ---
# 假设您已经运行了上一个脚本,生成了 'content_filtered.jsonl'
input_jsonl_file = ' content_filtered.jsonl '
output_template_file = ' templates_iterative.txt '
BATCH_PROCESSING_SIZE = 10000 # 可以根据你的内存和数据量调整
2025-09-09 01:40:25 +08:00
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = ' Extract templates from GCash transaction data. ' )
parser . add_argument ( ' --input_file ' , type = str , default = input_jsonl_file , help = ' Input JSONL file path ' )
parser . add_argument ( ' --output_file ' , type = str , default = output_template_file , help = ' Output template file path ' )
parser . add_argument ( ' --batch_size ' , type = int , default = BATCH_PROCESSING_SIZE , help = ' Batch processing size (data volume) ' )
parser . add_argument ( ' --eps ' , type = float , default = 0.4 , help = ' DBSCAN eps parameter ' )
parser . add_argument ( ' --min_samples ' , type = int , default = 5 , help = ' DBSCAN min_samples parameter ' )
parser . add_argument ( ' --extract_values ' , action = ' store_true ' , help = ' Extract values using generated templates ' )
args = parser . parse_args ( )
if args . extract_values :
# 执行参数提取
extract_values_with_templates (
input_file = args . input_file ,
template_file = args . output_file ,
output_file = ' extracted_parameters.jsonl '
)
else :
# 执行模板提取
extract_templates_iterative (
input_file = args . input_file ,
output_file = args . output_file ,
rules = PREDEFINED_RULES ,
batch_size = args . batch_size ,
eps = args . eps ,
min_samples = args . min_samples
)