diff --git a/core/pathex.py b/core/pathex.py index 709b763b..e99348a2 100644 --- a/core/pathex.py +++ b/core/pathex.py @@ -2,6 +2,7 @@ from os import scandir image_extensions = [".jpg", ".jpeg", ".png", ".tif", ".tiff"] +video_extensions = [".mp4", ".mov", ".mpg", ".wmv", ".mkv", ".avi", ".avchd", ".flv", ".f4v", ".swf", "webm"] def write_bytes_safe(p, bytes_data): """ @@ -21,7 +22,7 @@ def scantree(path): else: yield entry -def get_image_paths(dir_path, image_extensions=image_extensions, subdirs=False, return_Path_class=False): +def get_extension_paths(dir_path, extensions, subdirs=False, return_Path_class=False): dir_path = Path (dir_path) result = [] @@ -32,10 +33,16 @@ def get_image_paths(dir_path, image_extensions=image_extensions, subdirs=False, else: gen = scandir(str(dir_path)) - for x in list(gen): - if any([x.name.lower().endswith(ext) for ext in image_extensions]): - result.append( x.path if not return_Path_class else Path(x.path) ) - return sorted(result) + for x in gen: + if any(x.name.lower().endswith(ext) for ext in extensions): # listcomp is not needed, any() can handle gencomp + result.append( Path(x) if return_Path_class else x.path ) # avoid unnecessary negative condition + return result # scandir should already be sorted + +def get_image_paths(dir_path, image_extensions=image_extensions, subdirs=False, return_Path_class=False): + return get_extension_paths(dir_path, image_extensions, subdirs, return_Path_class) + +def get_video_paths(dir_path, video_extensions=video_extensions, subdirs=False, return_Path_class=False): + return get_extension_paths(dir_path, video_extensions, subdirs, return_Path_class) def get_image_unique_filestem_paths(dir_path, verbose_print_func=None): result = get_image_paths(dir_path) @@ -59,7 +66,7 @@ def get_paths(dir_path): return [ Path(x) for x in sorted([ x.path for x in list(scandir(str(dir_path))) ]) ] else: return [] - + def get_file_paths(dir_path): dir_path = Path (dir_path) diff --git a/main.py b/main.py index b8219106..415baaaf 100644 --- a/main.py +++ b/main.py @@ -23,7 +23,7 @@ def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, os.path.abspath(os.path.expanduser(values))) exit_code = 0 - + parser = argparse.ArgumentParser() subparsers = parser.add_subparsers() @@ -52,9 +52,9 @@ def process_extract(arguments): p.add_argument('--output-debug', action="store_true", dest="output_debug", default=None, help="Writes debug images to _debug\ directory.") p.add_argument('--no-output-debug', action="store_false", dest="output_debug", default=None, help="Don't writes debug images to _debug\ directory.") p.add_argument('--face-type', dest="face_type", choices=['half_face', 'full_face', 'whole_face', 'head', 'mark_only'], default=None) - p.add_argument('--max-faces-from-image', type=int, dest="max_faces_from_image", default=None, help="Max faces from image.") + p.add_argument('--max-faces-from-image', type=int, dest="max_faces_from_image", default=None, help="Max faces from image.") p.add_argument('--image-size', type=int, dest="image_size", default=None, help="Output image size.") - p.add_argument('--jpeg-quality', type=int, dest="jpeg_quality", default=None, help="Jpeg quality.") + p.add_argument('--jpeg-quality', type=int, dest="jpeg_quality", default=None, help="Jpeg quality.") p.add_argument('--manual-fix', action="store_true", dest="manual_fix", default=False, help="Enables manual extract only frames where faces were not recognized.") p.add_argument('--manual-output-debug-fix', action="store_true", dest="manual_output_debug_fix", default=False, help="Performs manual reextract input-dir frames which were deleted from [output_dir]_debug\ dir.") p.add_argument('--manual-window-size', type=int, dest="manual_window_size", default=1368, help="Manual fix window size. Default: 1368.") @@ -98,7 +98,7 @@ def process_util(arguments): io.log_info ("Performing faceset unpacking...\r\n") from samplelib import PackedFaceset PackedFaceset.unpack( Path(arguments.input_dir) ) - + if arguments.export_faceset_mask: io.log_info ("Exporting faceset mask..\r\n") Util.export_faceset_mask( Path(arguments.input_dir) ) @@ -149,10 +149,10 @@ def process_train(arguments): p.add_argument('--cpu-only', action="store_true", dest="cpu_only", default=False, help="Train on CPU.") p.add_argument('--force-gpu-idxs', dest="force_gpu_idxs", default=None, help="Force to choose GPU indexes separated by comma.") p.add_argument('--silent-start', action="store_true", dest="silent_start", default=False, help="Silent start. Automatically chooses Best GPU and last used model.") - + p.add_argument('--execute-program', dest="execute_program", default=[], action='append', nargs='+') p.set_defaults (func=process_train) - + def process_exportdfm(arguments): osex.set_process_lowest_prio() from mainscripts import ExportDFM @@ -201,6 +201,17 @@ def process_videoed_extract_video(arguments): p.add_argument('--fps', type=int, dest="fps", default=None, help="How many frames of every second of the video will be extracted. 0 - full fps.") p.set_defaults(func=process_videoed_extract_video) + def process_videoed_batch_extract_video(arguments): + osex.set_process_lowest_prio() + from mainscripts import VideoEd + VideoEd.batch_extract_video (arguments.input_dir, arguments.output_dir, arguments.output_ext, arguments.fps) + p = videoed_parser.add_parser( "batch-extract-video", help="Extract images from folder of video files.") + p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir", help="Input directory of files to be processed.") + p.add_argument('--output-dir', required=True, action=fixPathAction, dest="output_dir", help="Output directory. This is where the extracted images will be stored.") + p.add_argument('--output-ext', dest="output_ext", default=None, help="Image format (extension) of output files.") + p.add_argument('--fps', type=int, dest="fps", default=None, help="How many frames of every second of the video will be extracted. 0 - full fps.") + p.set_defaults(func=process_videoed_batch_extract_video) + def process_videoed_cut_video(arguments): osex.set_process_lowest_prio() from mainscripts import VideoEd @@ -266,8 +277,8 @@ def process_faceset_enhancer(arguments): p.add_argument('--force-gpu-idxs', dest="force_gpu_idxs", default=None, help="Force to choose GPU indexes separated by comma.") p.set_defaults(func=process_faceset_enhancer) - - + + p = facesettool_parser.add_parser ("resize", help="Resize DFL faceset.") p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir", help="Input directory of aligned faces.") @@ -285,10 +296,10 @@ def process_dev_test(arguments): p = subparsers.add_parser( "dev_test", help="") p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir") p.set_defaults (func=process_dev_test) - + # ========== XSeg xseg_parser = subparsers.add_parser( "xseg", help="XSeg tools.").add_subparsers() - + p = xseg_parser.add_parser( "editor", help="XSeg editor.") def process_xsegeditor(arguments): @@ -296,11 +307,11 @@ def process_xsegeditor(arguments): from XSegEditor import XSegEditor global exit_code exit_code = XSegEditor.start (Path(arguments.input_dir)) - + p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir") p.set_defaults (func=process_xsegeditor) - + p = xseg_parser.add_parser( "apply", help="Apply trained XSeg model to the extracted faces.") def process_xsegapply(arguments): @@ -310,8 +321,8 @@ def process_xsegapply(arguments): p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir") p.add_argument('--model-dir', required=True, action=fixPathAction, dest="model_dir") p.set_defaults (func=process_xsegapply) - - + + p = xseg_parser.add_parser( "remove", help="Remove applied XSeg masks from the extracted faces.") def process_xsegremove(arguments): osex.set_process_lowest_prio() @@ -319,8 +330,8 @@ def process_xsegremove(arguments): XSegUtil.remove_xseg (Path(arguments.input_dir) ) p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir") p.set_defaults (func=process_xsegremove) - - + + p = xseg_parser.add_parser( "remove_labels", help="Remove XSeg labels from the extracted faces.") def process_xsegremovelabels(arguments): osex.set_process_lowest_prio() @@ -328,8 +339,8 @@ def process_xsegremovelabels(arguments): XSegUtil.remove_xseg_labels (Path(arguments.input_dir) ) p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir") p.set_defaults (func=process_xsegremovelabels) - - + + p = xseg_parser.add_parser( "fetch", help="Copies faces containing XSeg polygons in _xseg dir.") def process_xsegfetch(arguments): @@ -338,7 +349,7 @@ def process_xsegfetch(arguments): XSegUtil.fetch_xseg (Path(arguments.input_dir) ) p.add_argument('--input-dir', required=True, action=fixPathAction, dest="input_dir") p.set_defaults (func=process_xsegfetch) - + def bad_args(arguments): parser.print_help() exit(0) @@ -349,9 +360,9 @@ def bad_args(arguments): if exit_code == 0: print ("Done.") - + exit(exit_code) - + ''' import code code.interact(local=dict(globals(), **locals())) diff --git a/mainscripts/VideoEd.py b/mainscripts/VideoEd.py index f9fcedd1..557aba02 100644 --- a/mainscripts/VideoEd.py +++ b/mainscripts/VideoEd.py @@ -27,7 +27,7 @@ def extract_video(input_file, output_dir, output_ext=None, fps=None): fps = io.input_int ("Enter FPS", 0, help_message="How many frames of every second of the video will be extracted. 0 - full fps") if output_ext is None: - output_ext = io.input_str ("Output image format", "png", ["png","jpg"], help_message="png is lossless, but extraction is x10 slower for HDD, requires x10 more disk space than jpg.") + output_ext = io.input_str ("Output image format", "jpg", ["png","jpg"], help_message="png is lossless, but extraction is x10 slower for HDD, requires x10 more disk space than jpg.") for filename in pathex.get_image_paths (output_path, ['.'+output_ext]): Path(filename).unlink() @@ -48,6 +48,54 @@ def extract_video(input_file, output_dir, output_ext=None, fps=None): except: io.log_err ("ffmpeg fail, job commandline:" + str(job.compile()) ) +def batch_extract_video(input_dir, output_dir, output_ext=None, fps=None): + import os + input_path = Path(input_dir) + output_path = Path(output_dir) + + if not output_path.exists(): + output_path.mkdir(exist_ok=True) + + if not input_path.exists(): + io.log_err("input_dir not found.") + return + + if not input_path.is_dir(): + io.log_info("input_dir is not a directory.") + return + + file_paths = pathex.get_video_paths(input_path, return_Path_class=True) + io.log_info('\n'.join(f'file found ...... {file.name}' for file in file_paths)) + io.log_info(f'{len(file_paths)} files') + + file_check = io.input_str ("Are these the intended files?", "y", ["y", "n"]) + if not file_check == 'y': return + + if fps is None: + fps = io.input_int ("Enter FPS", 0, help_message="How many frames of every second of the video will be extracted. 0 - full fps") + + if output_ext is None: + output_ext = io.input_str ("Output image format", "jpg", ["png","jpg"], help_message="png is lossless, but extraction is x10 slower for HDD, requires x10 more disk space than jpg.") + + for filename in pathex.get_image_paths (output_path, [f'.{output_ext}']): + Path(filename).unlink() + + kwargs = { + 'pix_fmt': 'rgb24', + **({'r':f'{fps}'} if fps else {}), + **({'q:v':'2'} if output_ext == 'jpg' else {}), + } + + for idx, file_path in enumerate(file_paths): + job = ( ffmpeg + .input(str(file_path)) + .output(str(output_path / (f'{idx:03}_%5d.{output_ext}')), **kwargs) + ) + try: + job = job.run() + except: + io.log_err ("ffmpeg fail, job commandline:" + str(job.compile())) + def cut_video ( input_file, from_time=None, to_time=None, audio_track_id=None, bitrate=None): input_file_path = Path(input_file) if input_file_path is None: