添加updaeEXIF.py
This commit is contained in:
parent
45ca37124d
commit
6f3cfa6c87
223
updateEXIF.py
Normal file
223
updateEXIF.py
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
from PIL import Image
|
||||||
|
from PIL.ExifTags import TAGS, GPSTAGS
|
||||||
|
from datetime import datetime
|
||||||
|
import piexif
|
||||||
|
import logging
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
# 设置日志
|
||||||
|
logging.basicConfig(
|
||||||
|
encoding='utf-8',
|
||||||
|
filename='photo_process.log',
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S'
|
||||||
|
)
|
||||||
|
def get_correct_extension(image_path):
|
||||||
|
"""获取图片的真实格式"""
|
||||||
|
try:
|
||||||
|
with Image.open(image_path) as img:
|
||||||
|
format = img.format.lower()
|
||||||
|
# 如果是jpeg/jpg格式,统一返回当前后缀
|
||||||
|
if format in ['jpeg', 'jpg']:
|
||||||
|
return os.path.splitext(image_path)[1].lower()
|
||||||
|
return '.' + format if format else None
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"无法识别图片格式或非图片文件 {image_path}: {str(e)}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def fix_image_extension(image_path):
|
||||||
|
"""修正图片文件后缀"""
|
||||||
|
# 检查是否为支持的图片格式
|
||||||
|
supported_formats = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp','.heic']
|
||||||
|
current_ext = os.path.splitext(image_path)[1].lower()
|
||||||
|
|
||||||
|
# 如果不是支持的格式,直接返回
|
||||||
|
if current_ext not in supported_formats:
|
||||||
|
logging.info(f"跳过非图片文件: {image_path}")
|
||||||
|
return image_path, False
|
||||||
|
|
||||||
|
correct_ext = get_correct_extension(image_path)
|
||||||
|
if not correct_ext:
|
||||||
|
return image_path, False
|
||||||
|
|
||||||
|
# 如果当前是jpg/jpeg,不进行修改
|
||||||
|
if current_ext in ['.jpg', '.jpeg'] and correct_ext in ['.jpg', '.jpeg']:
|
||||||
|
return image_path, False
|
||||||
|
|
||||||
|
if current_ext != correct_ext:
|
||||||
|
new_path = os.path.splitext(image_path)[0] + correct_ext
|
||||||
|
try:
|
||||||
|
# 如果目标文件已存在,先删除
|
||||||
|
if os.path.exists(new_path):
|
||||||
|
os.remove(new_path)
|
||||||
|
shutil.move(image_path, new_path)
|
||||||
|
logging.info(f"修正文件后缀: {image_path} -> {new_path}")
|
||||||
|
return new_path, True
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"修正文件后缀失败 {image_path}: {str(e)}")
|
||||||
|
return image_path, False
|
||||||
|
return image_path, False
|
||||||
|
|
||||||
|
def get_exif_data(image):
|
||||||
|
"""获取图片的EXIF数据"""
|
||||||
|
try:
|
||||||
|
exif_dict = piexif.load(image.info['exif'])
|
||||||
|
except:
|
||||||
|
exif_dict = {'0th': {}, 'Exif': {}, 'GPS': {}}
|
||||||
|
return exif_dict
|
||||||
|
|
||||||
|
def convert_to_degrees(value):
|
||||||
|
"""将GPS坐标转换为度数格式"""
|
||||||
|
d = float(value[0][0]) / float(value[0][1])
|
||||||
|
m = float(value[1][0]) / float(value[1][1])
|
||||||
|
s = float(value[2][0]) / float(value[2][1])
|
||||||
|
return d + (m / 60.0) + (s / 3600.0)
|
||||||
|
|
||||||
|
def set_gps_location(exif_dict, lat, lon, alt):
|
||||||
|
"""设置GPS信息"""
|
||||||
|
if lat == 0 and lon == 0:
|
||||||
|
return exif_dict
|
||||||
|
|
||||||
|
gps_ifd = {
|
||||||
|
piexif.GPSIFD.GPSVersionID: (2, 2, 0, 0),
|
||||||
|
piexif.GPSIFD.GPSLatitudeRef: 'N' if lat >= 0 else 'S',
|
||||||
|
piexif.GPSIFD.GPSLatitude: piexif.GPSHelper.deg_to_dms(abs(lat)),
|
||||||
|
piexif.GPSIFD.GPSLongitudeRef: 'E' if lon >= 0 else 'W',
|
||||||
|
piexif.GPSIFD.GPSLongitude: piexif.GPSHelper.deg_to_dms(abs(lon)),
|
||||||
|
}
|
||||||
|
|
||||||
|
if alt != 0:
|
||||||
|
gps_ifd[piexif.GPSIFD.GPSAltitude] = (int(abs(alt) * 100), 100)
|
||||||
|
gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 1 if alt < 0 else 0
|
||||||
|
|
||||||
|
exif_dict['GPS'] = gps_ifd
|
||||||
|
return exif_dict
|
||||||
|
|
||||||
|
def process_image(image_path, json_path):
|
||||||
|
"""处理单个图片文件"""
|
||||||
|
try:
|
||||||
|
# 读取JSON文件
|
||||||
|
with open(json_path, 'r', encoding='utf-8') as f:
|
||||||
|
json_data = json.load(f)
|
||||||
|
|
||||||
|
# 尝试修正文件后缀
|
||||||
|
fixed_image_path, was_fixed = fix_image_extension(image_path)
|
||||||
|
if not was_fixed and not os.path.exists(fixed_image_path):
|
||||||
|
logging.error(f"无法处理的图片文件: {image_path}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 打开图片
|
||||||
|
image = Image.open(fixed_image_path)
|
||||||
|
|
||||||
|
# 获取现有EXIF数据
|
||||||
|
exif_dict = get_exif_data(image)
|
||||||
|
# print(f"exif_dict: {exif_dict['Exif']}")
|
||||||
|
modified = False
|
||||||
|
|
||||||
|
# 检查并更新时间戳
|
||||||
|
# if 'Exif' not in exif_dict:
|
||||||
|
if exif_dict['Exif'] == {}:
|
||||||
|
# print(f"exif_dict['Exif'] is None")
|
||||||
|
exif_dict['Exif'] = {}
|
||||||
|
|
||||||
|
# 更新拍摄时间
|
||||||
|
photo_timestamp = int(json_data['photoTakenTime']['timestamp'])
|
||||||
|
photo_date_time = datetime.fromtimestamp(photo_timestamp).strftime("%Y:%m:%d %H:%M:%S")
|
||||||
|
exif_dict['Exif'][piexif.ExifIFD.DateTimeOriginal] = photo_date_time
|
||||||
|
|
||||||
|
# 更新创建时间
|
||||||
|
creation_timestamp = int(json_data['creationTime']['timestamp'])
|
||||||
|
creation_date_time = datetime.fromtimestamp(creation_timestamp).strftime("%Y:%m:%d %H:%M:%S")
|
||||||
|
exif_dict['0th'][piexif.ImageIFD.DateTime] = creation_date_time
|
||||||
|
modified = True
|
||||||
|
|
||||||
|
# 检查并更新GPS数据
|
||||||
|
if 'GPS' not in exif_dict or not exif_dict['GPS']:
|
||||||
|
lat = json_data['geoData']['latitude']
|
||||||
|
lon = json_data['geoData']['longitude']
|
||||||
|
alt = json_data['geoData']['altitude']
|
||||||
|
if lat != 0 or lon != 0 or alt != 0:
|
||||||
|
exif_dict = set_gps_location(exif_dict, lat, lon, alt)
|
||||||
|
modified = True
|
||||||
|
|
||||||
|
# 如果有修改,保存图片
|
||||||
|
if modified:
|
||||||
|
exif_bytes = piexif.dump(exif_dict)
|
||||||
|
image.save(fixed_image_path, exif=exif_bytes)
|
||||||
|
logging.info(f"已更新: {fixed_image_path}")
|
||||||
|
print(f"已更新.")
|
||||||
|
else:
|
||||||
|
logging.info(f"无需更新: {fixed_image_path}")
|
||||||
|
print(f"无需更新.")
|
||||||
|
|
||||||
|
return modified
|
||||||
|
|
||||||
|
except (IOError, json.JSONDecodeError) as e:
|
||||||
|
logging.error(f"文件读取失败 {image_path}: {str(e)}")
|
||||||
|
print(f"文件读取失败.")
|
||||||
|
return False
|
||||||
|
except KeyError as e:
|
||||||
|
logging.error(f"JSON数据格式错误 {image_path}: {str(e)}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"处理失败 {image_path}: {str(e)}")
|
||||||
|
print(f"处理失败.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
processed_count = 0
|
||||||
|
updated_count = 0
|
||||||
|
error_count = 0
|
||||||
|
skipped_count = 0 # 新增跳过计数
|
||||||
|
|
||||||
|
# 遍历当前目录及所有子目录
|
||||||
|
for root, dirs, files in os.walk('.'):
|
||||||
|
# 筛选出所有 JSON 文件
|
||||||
|
json_files = [f for f in files if f.lower().endswith('.json')]
|
||||||
|
|
||||||
|
for json_file in json_files:
|
||||||
|
try:
|
||||||
|
# 读取 JSON 文件
|
||||||
|
json_path = os.path.join(root, json_file)
|
||||||
|
with open(json_path, 'r', encoding='utf-8') as f:
|
||||||
|
json_data = json.load(f)
|
||||||
|
|
||||||
|
# 获取对应的文件名
|
||||||
|
image_file = json_data.get('title')
|
||||||
|
if not image_file:
|
||||||
|
logging.warning(f"JSON文件缺少title字段: {json_file}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 检查是否为支持的图片格式
|
||||||
|
if not any(image_file.lower().endswith(ext) for ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']):
|
||||||
|
logging.info(f"跳过非图片文件: {image_file}")
|
||||||
|
skipped_count += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 构建图片完整路径
|
||||||
|
image_path = os.path.join(root, image_file)
|
||||||
|
|
||||||
|
# 检查图片文件是否存在
|
||||||
|
if not os.path.exists(image_path):
|
||||||
|
logging.warning(f"图片文件不存在: {image_path}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"处理: {image_file}") # 添加进度提示
|
||||||
|
processed_count += 1
|
||||||
|
if process_image(image_path, json_path):
|
||||||
|
updated_count += 1
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_count += 1
|
||||||
|
logging.error(f"处理失败 {json_file}: {str(e)}")
|
||||||
|
continue # 继续处理下一个文件
|
||||||
|
|
||||||
|
logging.info(f"处理完成: 共处理 {processed_count} 个文件,更新了 {updated_count} 个文件,"
|
||||||
|
f"跳过 {skipped_count} 个非图片文件,失败 {error_count} 个文件")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
Reference in New Issue
Block a user