import os import json from PIL import Image from PIL.ExifTags import TAGS, GPSTAGS from datetime import datetime import piexif import logging import shutil # 设置日志 logging.basicConfig( encoding='utf-8', filename='photo_process.log', level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def get_correct_extension(image_path): """获取图片的真实格式""" try: with Image.open(image_path) as img: format = img.format.lower() # 如果是jpeg/jpg格式,统一返回当前后缀 if format in ['jpeg', 'jpg']: return os.path.splitext(image_path)[1].lower() return '.' + format if format else None except Exception as e: logging.error(f"无法识别图片格式或非图片文件 {image_path}: {str(e)}") return None def fix_image_extension(image_path): """修正图片文件后缀""" # 检查是否为支持的图片格式 supported_formats = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp','.heic'] current_ext = os.path.splitext(image_path)[1].lower() # 如果不是支持的格式,直接返回 if current_ext not in supported_formats: logging.info(f"跳过非图片文件: {image_path}") return image_path, False correct_ext = get_correct_extension(image_path) if not correct_ext: return image_path, False # 如果当前是jpg/jpeg,不进行修改 if current_ext in ['.jpg', '.jpeg'] and correct_ext in ['.jpg', '.jpeg']: return image_path, False if current_ext != correct_ext: new_path = os.path.splitext(image_path)[0] + correct_ext try: # 如果目标文件已存在,先删除 if os.path.exists(new_path): os.remove(new_path) shutil.move(image_path, new_path) logging.info(f"修正文件后缀: {image_path} -> {new_path}") return new_path, True except Exception as e: logging.error(f"修正文件后缀失败 {image_path}: {str(e)}") return image_path, False return image_path, False def get_exif_data(image): """获取图片的EXIF数据""" try: exif_dict = piexif.load(image.info['exif']) except: exif_dict = {'0th': {}, 'Exif': {}, 'GPS': {}} return exif_dict def convert_to_degrees(value): """将GPS坐标转换为度数格式""" d = float(value[0][0]) / float(value[0][1]) m = float(value[1][0]) / float(value[1][1]) s = float(value[2][0]) / float(value[2][1]) return d + (m / 60.0) + (s / 3600.0) def set_gps_location(exif_dict, lat, lon, alt): """设置GPS信息""" if lat == 0 and lon == 0: return exif_dict gps_ifd = { piexif.GPSIFD.GPSVersionID: (2, 2, 0, 0), piexif.GPSIFD.GPSLatitudeRef: 'N' if lat >= 0 else 'S', piexif.GPSIFD.GPSLatitude: piexif.GPSHelper.deg_to_dms(abs(lat)), piexif.GPSIFD.GPSLongitudeRef: 'E' if lon >= 0 else 'W', piexif.GPSIFD.GPSLongitude: piexif.GPSHelper.deg_to_dms(abs(lon)), } if alt != 0: gps_ifd[piexif.GPSIFD.GPSAltitude] = (int(abs(alt) * 100), 100) gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 1 if alt < 0 else 0 exif_dict['GPS'] = gps_ifd return exif_dict def process_image(image_path, json_path): """处理单个图片文件""" try: # 读取JSON文件 with open(json_path, 'r', encoding='utf-8') as f: json_data = json.load(f) # 尝试修正文件后缀 fixed_image_path, was_fixed = fix_image_extension(image_path) if not was_fixed and not os.path.exists(fixed_image_path): logging.error(f"无法处理的图片文件: {image_path}") return False # 打开图片 image = Image.open(fixed_image_path) # 获取现有EXIF数据 exif_dict = get_exif_data(image) # print(f"exif_dict: {exif_dict['Exif']}") modified = False # 检查并更新时间戳 # if 'Exif' not in exif_dict: if exif_dict['Exif'] == {}: # print(f"exif_dict['Exif'] is None") exif_dict['Exif'] = {} # 更新拍摄时间 photo_timestamp = int(json_data['photoTakenTime']['timestamp']) photo_date_time = datetime.fromtimestamp(photo_timestamp).strftime("%Y:%m:%d %H:%M:%S") exif_dict['Exif'][piexif.ExifIFD.DateTimeOriginal] = photo_date_time # 更新创建时间 creation_timestamp = int(json_data['creationTime']['timestamp']) creation_date_time = datetime.fromtimestamp(creation_timestamp).strftime("%Y:%m:%d %H:%M:%S") exif_dict['0th'][piexif.ImageIFD.DateTime] = creation_date_time modified = True # 检查并更新GPS数据 if 'GPS' not in exif_dict or not exif_dict['GPS']: lat = json_data['geoData']['latitude'] lon = json_data['geoData']['longitude'] alt = json_data['geoData']['altitude'] if lat != 0 or lon != 0 or alt != 0: exif_dict = set_gps_location(exif_dict, lat, lon, alt) modified = True # 如果有修改,保存图片 if modified: exif_bytes = piexif.dump(exif_dict) image.save(fixed_image_path, exif=exif_bytes) logging.info(f"已更新: {fixed_image_path}") print(f"已更新.") else: logging.info(f"无需更新: {fixed_image_path}") print(f"无需更新.") return modified except (IOError, json.JSONDecodeError) as e: logging.error(f"文件读取失败 {image_path}: {str(e)}") print(f"文件读取失败.") return False except KeyError as e: logging.error(f"JSON数据格式错误 {image_path}: {str(e)}") return False except Exception as e: logging.error(f"处理失败 {image_path}: {str(e)}") print(f"处理失败.") return False def main(): """主函数""" processed_count = 0 updated_count = 0 error_count = 0 skipped_count = 0 # 新增跳过计数 # 遍历当前目录及所有子目录 for root, dirs, files in os.walk('.'): # 筛选出所有 JSON 文件 json_files = [f for f in files if f.lower().endswith('.json')] for json_file in json_files: try: # 读取 JSON 文件 json_path = os.path.join(root, json_file) with open(json_path, 'r', encoding='utf-8') as f: json_data = json.load(f) # 获取对应的文件名 image_file = json_data.get('title') if not image_file: logging.warning(f"JSON文件缺少title字段: {json_file}") continue # 检查是否为支持的图片格式 if not any(image_file.lower().endswith(ext) for ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']): logging.info(f"跳过非图片文件: {image_file}") skipped_count += 1 continue # 构建图片完整路径 image_path = os.path.join(root, image_file) # 检查图片文件是否存在 if not os.path.exists(image_path): logging.warning(f"图片文件不存在: {image_path}") continue print(f"处理: {image_file}") # 添加进度提示 processed_count += 1 if process_image(image_path, json_path): updated_count += 1 except Exception as e: error_count += 1 logging.error(f"处理失败 {json_file}: {str(e)}") continue # 继续处理下一个文件 logging.info(f"处理完成: 共处理 {processed_count} 个文件,更新了 {updated_count} 个文件," f"跳过 {skipped_count} 个非图片文件,失败 {error_count} 个文件") if __name__ == "__main__": main()