ssp97 2025-04-04 20:16:51 +08:00
commit 3027dbddc5
9 changed files with 644 additions and 0 deletions

.gitignore (vendored, new file, 3 lines)
@@ -0,0 +1,3 @@
*.db
*.log
__pycache__

src/db/db.py (new file, 201 lines)
@@ -0,0 +1,201 @@
# SQLite ORM initialisation
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from .models.base import Base
from .models.files import Files  # imported so the model registers itself on Base.metadata


class Database:
    def __init__(self, db_path=':memory:'):
        self.db_path = db_path
        self.engine = create_engine(f'sqlite:///{self.db_path}', echo=False)
        self.Base = Base
        self.Session = sessionmaker(bind=self.engine)
        self.session = self.Session()
        self.create_tables()

    def create_tables(self):
        """Create all declared tables (idempotent: existing tables are left untouched)."""
        self.Base.metadata.create_all(self.engine)

    def close(self):
        """Close the session and dispose of the engine."""
        self.session.close()
        self.engine.dispose()

    def commit(self):
        """Commit the current transaction."""
        self.session.commit()

    def rollback(self):
        """Roll back the current transaction."""
        self.session.rollback()

    def add(self, obj):
        """Add a record and commit."""
        self.session.add(obj)
        self.commit()

    def delete(self, obj):
        """Delete a record and commit."""
        self.session.delete(obj)
        self.commit()

    def query(self, model, **kwargs):
        """Query records of `model` filtered by keyword equality."""
        return self.session.query(model).filter_by(**kwargs).all()

    def update(self, obj):
        """Merge an updated record and commit."""
        self.session.merge(obj)
        self.commit()

    def execute(self, sql, params=None):
        """Execute a raw SQL statement; return its rows, or an empty list for statements without results."""
        with self.engine.begin() as conn:
            result = conn.execute(text(sql), params or {})
            return result.fetchall() if result.returns_rows else []

    def execute_many(self, sql, params):
        """Execute a statement once per parameter set (params is a list of dicts)."""
        with self.engine.begin() as conn:
            conn.execute(text(sql), params)

    def execute_script(self, script):
        """Execute a multi-statement SQL script via the underlying sqlite3 connection."""
        raw = self.engine.raw_connection()
        try:
            # sqlite3 connections support executescript(); go through the DBAPI connection
            dbapi_conn = getattr(raw, "driver_connection", raw)
            dbapi_conn.executescript(script)
            raw.commit()
        finally:
            raw.close()

    def create_table(self, table_name, columns):
        """Create a table from a {column: type} mapping."""
        columns_str = ', '.join([f'{col} {typ}' for col, typ in columns.items()])
        sql = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
        self.execute(sql)

    def drop_table(self, table_name):
        """Drop a table if it exists."""
        sql = f'DROP TABLE IF EXISTS {table_name}'
        self.execute(sql)

    def table_exists(self, table_name):
        """Check whether a table exists."""
        sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=:name"
        result = self.execute(sql, {'name': table_name})
        return len(result) > 0

    def get_table_columns(self, table_name):
        """Return the column names of a table."""
        sql = f"PRAGMA table_info({table_name})"
        result = self.execute(sql)
        return [row[1] for row in result]

    def get_table_data(self, table_name):
        """Return all rows of a table."""
        sql = f"SELECT * FROM {table_name}"
        return self.execute(sql)

    def get_table_count(self, table_name):
        """Return the number of rows in a table."""
        sql = f"SELECT COUNT(*) FROM {table_name}"
        result = self.execute(sql)
        return result[0][0] if result else 0

    def get_table_size(self, table_name):
        """Return the on-disk size of a table (requires the dbstat virtual table)."""
        sql = "SELECT SUM(pgsize) FROM dbstat WHERE name=:name"
        result = self.execute(sql, {'name': table_name})
        return result[0][0] if result else 0

    def get_table_schema(self, table_name):
        """Return the table schema."""
        sql = f"PRAGMA table_info({table_name})"
        return self.execute(sql)

    def get_table_indexes(self, table_name):
        """Return the indexes of a table."""
        sql = f"PRAGMA index_list({table_name})"
        return self.execute(sql)

    def get_table_index_info(self, index_name):
        """Return information about an index."""
        sql = f"PRAGMA index_info({index_name})"
        return self.execute(sql)

    def get_table_index_columns(self, index_name):
        """Return the columns covered by an index."""
        sql = f"PRAGMA index_xinfo({index_name})"
        return self.execute(sql)

    def get_table_foreign_keys(self, table_name):
        """Return the foreign keys of a table."""
        sql = f"PRAGMA foreign_key_list({table_name})"
        return self.execute(sql)

    def get_table_triggers(self, table_name):
        """Return the triggers defined on a table (SQLite has no trigger_list pragma, so query sqlite_master)."""
        sql = "SELECT name, sql FROM sqlite_master WHERE type='trigger' AND tbl_name=:name"
        return self.execute(sql, {'name': table_name})

    def get_table_vacuum(self, table_name):
        """Run PRAGMA optimize (it applies to the whole database; table_name is kept for API compatibility)."""
        return self.execute("PRAGMA optimize")

    def get_table_stats(self, table_name):
        """Return table statistics via PRAGMA stats (a debugging pragma; output may vary across SQLite builds)."""
        sql = f"PRAGMA stats({table_name})"
        return self.execute(sql)

    def get_table_info(self, table_name):
        """Return table information."""
        sql = f"PRAGMA table_info({table_name})"
        return self.execute(sql)

    def get_table_info_by_sql(self, sql):
        """Return rows for an arbitrary SQL query."""
        return self.execute(sql)

    def get_table_info_by_sql_with_params(self, sql, params):
        """Return rows for a parameterised SQL query."""
        return self.execute(sql, params)

    def get_table_info_by_sql_with_params_and_limit(self, sql, params, limit):
        """Return rows for a parameterised SQL query with a LIMIT clause."""
        sql = f"{sql} LIMIT {limit}"
        return self.execute(sql, params)

    def get_table_info_by_sql_with_params_and_limit_and_offset(self, sql, params, limit, offset):
        """Return rows for a parameterised SQL query with LIMIT and OFFSET clauses."""
        sql = f"{sql} LIMIT {limit} OFFSET {offset}"
        return self.execute(sql, params)


def get_db():
    """Return a database handle backed by btDedup.db."""
    db = Database(db_path="btDedup.db")
    return db
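
A minimal usage sketch of the Database wrapper (assuming the project root is on the import path so the package resolves as src; the demo values are placeholders):

from src.db.db import get_db
from src.db.models.files import Files

db = get_db()                                    # opens (or creates) btDedup.db
db.add(Files(name="demo.bin", path="/tmp/demo.bin", size=123, abstractHash="ab" * 32))
for row in db.query(Files, size=123):            # filter_by(size=123)
    print(row.path)
print(db.get_table_count("files"))               # raw-SQL helper
db.close()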

src/db/models/base.py (new file, 7 lines)
@@ -0,0 +1,7 @@
from sqlalchemy.orm import declarative_base

Base = declarative_base()

src/db/models/files.py (new file, 21 lines)
@@ -0,0 +1,21 @@
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String

from .base import Base


class Files(Base):
    __tablename__ = 'files'

    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    path = Column(String(255), unique=True)
    size = Column(Integer)
    abstractHash = Column(String(64))
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return (
            f"<Files(id={self.id}, name='{self.name}', path='{self.path}', "
            f"size={self.size}, abstractHash='{self.abstractHash}', "
            f"created_at='{self.created_at}', updated_at='{self.updated_at}')>"
        )

src/fileScan.py (new file, 74 lines)
@@ -0,0 +1,74 @@
import argparse
import os
import sys

from .db.db import get_db
from .db.models.files import Files
from .utils.hash import calculate_sha256


def culculate_abstract_hash(file_path, abstract_len=128 * 1024):
    """Compute the abstract hash of a file (SHA-256 of its first `abstract_len` bytes)."""
    with open(file_path, 'rb') as f:
        block = f.read(abstract_len)
    return calculate_sha256(block)


def scan_directory(directory_path):
    """Walk a directory tree and return a list of file-info dicts."""
    file_infos = []
    for root, _, files in os.walk(directory_path):
        for filename in files:
            file_path = os.path.abspath(os.path.join(root, filename))
            try:
                file_size = os.path.getsize(file_path)
                file_abstract_hash = culculate_abstract_hash(file_path)
                file_infos.append({
                    'name': filename,
                    'path': file_path,
                    'size': file_size,
                    'abstractHash': file_abstract_hash
                })
            except Exception as e:
                print(f"Error while processing {file_path}: {e}")
    return file_infos


def save_to_database(file_infos):
    """Save the collected file information to the database."""
    db = get_db()
    try:
        for info in file_infos:
            file_record = Files(
                name=info['name'],
                path=info['path'],
                size=info['size'],
                abstractHash=info['abstractHash']
            )
            db.add(file_record)
        print(f"Saved {len(file_infos)} file records to the database")
    except Exception as e:
        print(f"Error while saving to the database: {e}")
        db.rollback()
    finally:
        db.close()


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(
        description='Scan all files in a directory and store their metadata in the database')
    parser.add_argument('directory', help='directory to scan')
    args = parser.parse_args()

    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory")
        sys.exit(1)

    print(f"Scanning directory: {args.directory}")
    file_infos = scan_directory(args.directory)
    if file_infos:
        print(f"Found {len(file_infos)} files, saving to database...")
        save_to_database(file_infos)
    else:
        print("No files found")


if __name__ == "__main__":
    main()
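
A short sketch of driving the scan-and-save flow from Python rather than the CLI, under the same import assumption; /data/downloads is a placeholder path:

from src.fileScan import scan_directory, save_to_database

infos = scan_directory("/data/downloads")   # placeholder directory
print(f"{len(infos)} files scanned")
if infos:
    save_to_database(infos)                 # inserts one Files row per scanned file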

src/torrentFindFile.py (new file, 57 lines)
@@ -0,0 +1,57 @@
import os
import sys

try:
    from .db.db import get_db  # package import
    from .db.models.files import Files
    from .utils.torrentParse import TorrentParse
except ImportError:
    from db.db import get_db  # import when run directly as a script
    from db.models.files import Files
    from utils.torrentParse import TorrentParse


def find_files_by_size(file_size):
    """Look up files in the database by exact size."""
    db = get_db()
    try:
        files = db.query(Files, size=file_size)
        return [file.path for file in files]
    finally:
        db.close()


def main():
    if len(sys.argv) != 2:
        print("Usage: torrentFindFile.py <torrent file>")
        return

    torrent_file = sys.argv[1]
    if not os.path.exists(torrent_file):
        print(f"Error: file {torrent_file} does not exist")
        return

    try:
        # Parse the torrent file
        parser = TorrentParse()
        parser.load_torrent_file(torrent_file)
        parser.parse_torrent_file()
        torrent_info = parser.get_torrent_info()

        # Walk the files listed in the torrent
        for file_info in torrent_info['files']:
            file_size = file_info['length']
            matching_files = find_files_by_size(file_size)
            if not matching_files:
                # no match found for this entry
                pass
            elif len(matching_files) == 1:
                print(f"\"{file_info['path']}\" \"{matching_files[0]}\"")
            # more than one match is ambiguous and is silently skipped
    except Exception as e:
        print(f"Error: {str(e)}")


if __name__ == "__main__":
    main()

src/torrenthardlink.py (new file, 134 lines)
@@ -0,0 +1,134 @@
import argparse
import os
from datetime import datetime

try:
    from .db.db import get_db  # package import
    from .db.models.files import Files
    from .utils.torrentParse import TorrentParse
except ImportError:
    from db.db import get_db  # import when run directly as a script
    from db.models.files import Files
    from utils.torrentParse import TorrentParse


def find_files_by_size(file_size):
    """Look up files in the database by exact size."""
    db = get_db()
    try:
        files = db.query(Files, size=file_size)
        return [file.path for file in files]
    finally:
        db.close()


def setup_target_path(torrent_path, dest_dir, file_path):
    """Build the target path and create its parent directories."""
    # Use the torrent file name (without extension) as the top-level directory
    torrent_name = os.path.splitext(os.path.basename(torrent_path))[0]
    target_base = os.path.join(dest_dir, torrent_name)
    # Preserve the original path structure from the torrent
    full_path = os.path.join(target_base, file_path)
    target_dir = os.path.dirname(full_path)
    os.makedirs(target_dir, exist_ok=True)
    return full_path


def create_hardlink(src, dest, force=False, log=None):
    """Create a hard link, handling existing targets and errors."""
    try:
        if os.path.exists(dest):
            if force:
                os.remove(dest)
            else:
                msg = f"target file already exists: {dest}"
                if log:
                    log("skip", src, dest, msg)
                return False
        os.link(src, dest)
        if log:
            log("ok", src, dest)
        return True
    except Exception as e:
        error_msg = f"link failed: {str(e)}"
        if log:
            log("error", src, dest, error_msg)
        return False


def process_torrent(torrent_path, dest_dir, force, log_handler):
    """Process a single torrent file."""
    try:
        parser = TorrentParse()
        parser.load_torrent_file(torrent_path)
        parser.parse_torrent_file()
        torrent_info = parser.get_torrent_info()
    except Exception as e:
        log_handler("error", "", "", f"torrent parse failed: {str(e)}", torrent_path)
        return

    for file_info in torrent_info['files']:
        file_size = file_info['length']
        file_path = file_info['path']
        matching_files = find_files_by_size(file_size)
        dest_path = setup_target_path(torrent_path, dest_dir, file_path)

        if len(matching_files) == 0:
            log_handler("skip", file_path, dest_path, "no matching file", torrent_path)
        elif len(matching_files) > 1:
            log_handler("skip", file_path, dest_path,
                        f"{len(matching_files)} matching files found", torrent_path)
        else:
            create_hardlink(
                src=matching_files[0],
                dest=dest_path,
                force=force,
                log=lambda status, s, d, msg="": log_handler(
                    status, s, d, msg, torrent_path)
            )


def main():
    parser = argparse.ArgumentParser(
        description="Create hard links into the download directory based on torrent files",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("source_dir", help="directory containing torrent files")
    parser.add_argument("dest_dir", help="download target directory")
    parser.add_argument("-f", "--force", action="store_true",
                        help="overwrite existing files")
    parser.add_argument("-l", "--log", default="hardlink.log",
                        help="log file path")
    args = parser.parse_args()

    # Initialise the log file
    log_header = ["time", "status", "torrent", "source file", "target file", "details"]
    with open(args.log, "a", encoding="utf-8") as f:
        # Write the header first if the log file is new
        if f.tell() == 0:
            f.write("\t".join(log_header) + "\n")

        def log_handler(status, src, dest, msg="", torrent=""):
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            record = [
                timestamp,
                status,
                os.path.basename(torrent),
                os.path.basename(src),
                # dest may be empty when torrent parsing failed
                os.path.relpath(dest, args.dest_dir) if dest else "",
                msg
            ]
            f.write("\t".join(record) + "\n")
            print(f"[{status}] {src} => {dest} | {msg}")

        # Walk the torrent directory
        for root, _, files in os.walk(args.source_dir):
            for filename in files:
                if filename.lower().endswith(".torrent"):
                    torrent_path = os.path.join(root, filename)
                    print(f"\nProcessing torrent: {filename}")
                    process_torrent(
                        torrent_path=torrent_path,
                        dest_dir=args.dest_dir,
                        force=args.force,
                        log_handler=log_handler
                    )

    print(f"\nDone, log saved to: {args.log}")


if __name__ == "__main__":
    main()
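
A minimal sketch of calling process_torrent directly with a custom log handler, under the same import assumption; example.torrent and /data/linked are placeholder paths:

from src.torrenthardlink import process_torrent

def print_log(status, src, dest, msg="", torrent=""):
    # print-only handler; the CLI variant writes the same fields to a TSV log
    print(status, torrent, src, dest, msg, sep="\t")

process_torrent("example.torrent", "/data/linked", force=False, log_handler=print_log)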

src/utils/hash.py (new file, 13 lines)
@@ -0,0 +1,13 @@
# SHA-256 helper
import hashlib


def calculate_sha256(input_bytes):
    """
    Calculate the SHA-256 hash of the given input bytes.

    :param input_bytes: The bytes to hash.
    :return: The SHA-256 hash as a hexadecimal string.
    """
    sha256_hash = hashlib.sha256(input_bytes).hexdigest()
    return sha256_hash

src/utils/torrentParse.py (new file, 134 lines)
@@ -0,0 +1,134 @@
import hashlib
import re


def bdecode(data: bytes):
    """Minimal bencode decoder."""
    def decode_next(data, index=0):
        if data[index] == ord('d'):
            # dictionary
            index += 1
            result = {}
            while data[index] != ord('e'):
                key, index = decode_next(data, index)
                value, index = decode_next(data, index)
                result[key.decode() if isinstance(key, bytes) else key] = value
            index += 1
            return result, index
        elif data[index] == ord('l'):
            # list
            index += 1
            result = []
            while data[index] != ord('e'):
                item, index = decode_next(data, index)
                result.append(item)
            index += 1
            return result, index
        elif data[index] == ord('i'):
            # integer
            index += 1
            end = data.index(ord('e'), index)
            num = int(data[index:end])
            return num, end + 1
        else:
            # byte string: "<length>:<bytes>"
            match = re.match(rb'(\d+):', data[index:])
            if not match:
                raise ValueError("Invalid bencode format")
            length = int(match.group(1))
            colon_pos = match.end()
            start = index + colon_pos
            end = start + length
            return data[start:end], end

    d = decode_next(data)
    return d[0]


def parse_torrent(data: bytes):
    """
    Parse a torrent file and return its file and piece information.

    :param data: raw torrent file bytes
    :return: dict with file and piece information
    """
    torrent = bdecode(data)
    info = torrent['info']
    files = []
    if 'files' in info:
        # multi-file torrent
        for file in info['files']:
            files.append({
                'length': file['length'],
                'path': '/'.join([p.decode() for p in file['path']])
            })
    else:
        # single-file torrent
        files.append({
            'length': info['length'],
            'path': info['name'].decode()
        })
    return {
        'name': info['name'].decode(),
        'piece_length': info['piece length'],
        'pieces': info['pieces'],
        'files': files,
        'comment': torrent.get('comment', b'').decode(),
        'created_by': torrent.get('created by', b'').decode(),
        'creation_date': torrent.get('creation date', 0),
        'announce': torrent.get('announce', b'').decode(),
        'url_list': [url.decode() for url in torrent.get('url-list', [])],
        'encoding': torrent.get('encoding', b'').decode(),
    }


# if __name__ == '__main__':
#     with open('test/test.torrent', 'rb') as f:
#         data = f.read()
#     torrent = parse_torrent(data)
#     print(torrent["files"])
#     # Compute the SHA-1 of each piece
#     pieces = torrent['pieces']
#     piece_length = torrent['piece_length']
#     for i in range(len(pieces) // 20):
#         sha1 = pieces[i * 20:(i + 1) * 20]
#         # print(sha1.hex())


class TorrentParse:
    def __init__(self):
        self.torrent_data = None
        self.torrent_info = None

    def load_torrent_file(self, file_path: str):
        """Load a torrent file from disk."""
        with open(file_path, 'rb') as f:
            self.torrent_data = f.read()

    def load_torrent_data(self, data: bytes):
        """Load raw torrent data."""
        self.torrent_data = data

    def parse_torrent_file(self):
        """Parse the loaded torrent data."""
        self.torrent_info = parse_torrent(self.torrent_data)

    def get_torrent_info(self):
        """Return the parsed torrent information."""
        return self.torrent_info

    def get_path_pieces(self, path):
        """Return (length, path) for the file entry matching `path`, or (None, None)."""
        for file in self.torrent_info['files']:
            if file['path'] == path:
                return file['length'], file['path']
        return None, None

    def get_piece_length(self):
        """Return the piece length."""
        return self.torrent_info['piece_length']

    def check_file_integrity(self, data: bytes):
        """
        Verify `data` against the torrent's piece hashes.

        Assumes `data` is the complete content described by the torrent
        (pieces span file boundaries in multi-file torrents).
        """
        pieces = self.torrent_info['pieces']
        piece_length = self.torrent_info['piece_length']
        for i in range(len(pieces) // 20):
            chunk = data[i * piece_length:(i + 1) * piece_length]
            if hashlib.sha1(chunk).digest() != pieces[i * 20:(i + 1) * 20]:
                return False
        return True
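
A brief usage sketch of TorrentParse, under the same import assumption; example.torrent is a placeholder file name:

from src.utils.torrentParse import TorrentParse

parser = TorrentParse()
parser.load_torrent_file("example.torrent")   # placeholder file name
parser.parse_torrent_file()
info = parser.get_torrent_info()
print(info['name'], info['piece_length'])
for entry in info['files']:
    print(entry['length'], entry['path'])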