First functional commit

master
Alexandru Pisarenco 5 years ago
commit 2f8af4c7ba

142
.gitignore vendored

@ -0,0 +1,142 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Local stuff
/config_local.py

@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}

@ -0,0 +1,19 @@
MIT License Copyright (c) <year> <copyright holders>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice (including the next
paragraph) shall be included in all copies or substantial portions of the
Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF
OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@ -0,0 +1,3 @@
# video-compression-handbrake
Computes best way to compress with very little destruction a video library using the AMD VCE H.265. Creates Handbrake queue file that can be imported and ran. Uses MediaInfo CLI

@ -0,0 +1,8 @@
from pathlib import Path
class Config:
QUEUE_FILE = "queue.json"
MEDIAINFO_BINARY=Path("/")
BASE_PATH = Path("/path/to/videos")
DESTINATION_PATH = Path("/where/to/save/result")
LOW_BITRATE_THRESHOLD = 550

@ -0,0 +1,234 @@
from minfo import MediaInfo
from pathlib import Path
class Configuration:
def __init__(self):
pass
def as_dict(self):
return {
"IsDvdNavDisabled": False,
"EnableQuickSyncDecoding": False,
"UseQSVDecodeForNonQSVEnc": False,
"ScalingMode": 0,
"PreviewScanCount": 10,
"Verbosity": 1,
"MinScanDuration": 10,
"SaveLogToCopyDirectory": False,
"SaveLogWithVideo": False,
"SaveLogCopyDirectory": "",
"RemoteServiceEnabled": False,
"RemoteServicePort": 0,
"EnableVceEncoder": True,
"EnableNvencEncoder": False,
"EnableQsvEncoder": False
}
class Statistics:
def __init__(self):
pass
def as_dict(self):
return {
"StartTime": "0001-01-01T00:00:00",
"StartTimeDisplay": "Not Available",
"EndTime": "0001-01-01T00:00:00",
"EndTimeDisplay": "",
"PausedDuration": "00:00:00",
"PausedDisplay": "",
"Duration": "00:00:00",
"DurationDisplay": "",
"FinalFileSizeInMegaBytes": 0,
"FileSizeDisplay": "",
"IsNotifying": True
}
class AllowedPassthruOptions:
def __init__(self):
pass
def as_dict(self):
return {
"AudioAllowAACPass": True,
"AudioAllowAC3Pass": True,
"AudioAllowDTSHDPass": True,
"AudioAllowDTSPass": True,
"AudioAllowMP3Pass": True,
"AudioAllowTrueHDPass": True,
"AudioAllowFlacPass": True,
"AudioAllowEAC3Pass": True,
"AudioEncoderFallback": 5,
"AllowedPassthruOptions": [
13,
8,
11,
10,
14,
12,
18,
9
]
}
class AudioTrack:
def __init__(self, minfo: MediaInfo):
self.minfo = minfo
def as_dict(self):
return {
"DRC": 0.0,
"Gain": 0,
"Encoder": 7,
"SampleRate": 0.0,
"EncoderRateType": 0,
"Bitrate": 160,
"Quality": -1.0,
"ScannedTrack": {
"TrackNumber": 1,
"Language": self.minfo.tracks[0].language,
# "LanguageCode": "und",
"Name": self.minfo.tracks[0].name,
#"Codec": 65536,
# aac is 65536
# mp3 is 524288
#"SampleRate": 32000,
#"Bitrate": 32002,
#"ChannelLayout": 4
},
"IsNotifying": True
}
class Chapters:
def __init__(self, minfo: MediaInfo):
self.minfo = minfo
def as_dict(self):
return [
{
"ChapterNumber": index + 1,
"Duration": c.duration, # self.mediainfo.duration,
"ChapterName": c.name, # "Chapter 1",
"IsNotifying": True
} for index, c in enumerate(self.minfo.menu.items)
]
class Task:
def __init__(self, path: Path, destination: Path):
self.path = path
self.destination = destination / (path.name.rsplit(".", 1)[0] + '.m4v')
self.mediainfo = MediaInfo(self.path)
self.allowed_passthru_options = AllowedPassthruOptions()
self.audio_track = AudioTrack(self.mediainfo)
def as_dict(self):
return {
"Source": str(self.path),
"Title": 1,
"Angle": 1,
"PointToPointMode": 0,
"StartPoint": 1,
"EndPoint": len(self.mediainfo.menu.items),
"Destination": str(self.destination),
"OutputFormat": 0,
"OptimizeMP4": False,
"IPod5GSupport": False,
"AlignAVStart": True,
"Width": self.mediainfo.resolution.width,
"Height": self.mediainfo.resolution.height,
"Cropping": {
"Top": 0,
"Bottom": 0,
"Left": 0,
"Right": 0
},
"HasCropping": False,
"Anamorphic": 4,
"DisplayWidth": self.mediainfo.display_width,
"KeepDisplayAspect": self.mediainfo.keep_display_aspect,
"PixelAspectX": int(self.mediainfo.pixel_aspect_ratio.x),
"PixelAspectY": int(self.mediainfo.pixel_aspect_ratio.y),
"Modulus": 2,
"DeinterlaceFilter": 2,
"DeinterlacePreset": {
"Name": "Default",
"ShortName": "default"
},
"CombDetect": 2,
"CustomDeinterlaceSettings": "",
"CustomCombDetect": "",
"Detelecine": 0,
"CustomDetelecine": "",
"Denoise": 0,
"DenoisePreset": 5,
"DenoiseTune": 0,
"CustomDenoise": "",
"Grayscale": False,
"Rotation": 0,
"FlipVideo": False,
"Sharpen": 0,
"SharpenPreset": {
"DisplayName": "Medium",
"Key": "medium"
},
"SharpenTune": {
"DisplayName": "None",
"Key": "none"
},
"SharpenCustom": "",
"DeblockPreset": {
"DisplayName": "Off",
"Key": "off"
},
"DeblockTune": {
"DisplayName": "Medium (8x8)",
"Key": "medium"
},
"CustomDeblock": "strength=strong:thresh=20:blocksize=8",
"VideoEncodeRateType": 1,
"VideoEncoder": 13,
"VideoProfile": {
"DisplayName": "Main",
"ShortName": "main"
},
"VideoLevel": {
"DisplayName": "Auto",
"ShortName": "auto"
},
"VideoPreset": {
"DisplayName": "Quality",
"ShortName": "quality"
},
"VideoTunes": [],
"ExtraAdvancedArguments": "",
"FramerateMode": 1,
"Quality": 22.0,
"VideoBitrate": self.mediainfo.output_bitrate,
"TwoPass": False,
"TurboFirstPass": False,
"Framerate": self.mediainfo.framerate,
"AudioTracks": [
self.audio_track.as_dict()
],
"AllowedPassthruOptions": self.allowed_passthru_options.as_dict(),
"SubtitleTracks": [],
"IncludeChapterMarkers": len(self.mediainfo.menu.items) > 1,
"ChapterNames": Chapters(self.mediainfo).as_dict(),
"MetaData": {},
"IsPreviewEncode": False
}
class QueueItem:
def __init__(self, path: Path, destination: Path):
self.path = path
self.task = Task(path, destination)
self.configuration = Configuration()
self.statistics = Statistics()
def as_dict(self):
return {
"ScannedSourcePath": str(self.path),
"Status": 0,
"Task": self.task.as_dict(),
"Configuration": self.configuration.as_dict(),
"Statistics": self.statistics.as_dict()
}

@ -0,0 +1,43 @@
from config_local import Config
from jobdef import QueueItem
from minfo import MediaInfo
from pathlib import Path
import json
import os
extensions = {
"mp4",
"mkv",
"mpeg",
"mpg",
"m4v",
"avi",
"mov",
"wmv",
"flv",
}
jobs = []
for dir, dirs, files in os.walk(str(Config.BASE_PATH)):
for file in files:
parts = file.rsplit(".", 1)
if len(parts) == 2:
extension = parts[1].lower()
if extension in extensions:
current_path = Path(dir)
relative_path = current_path.relative_to(Config.BASE_PATH)
print(relative_path / file, end=" ")
item = QueueItem(Path(dir) / file, Config.DESTINATION_PATH / relative_path)
info = item.task.mediainfo
if info.codec == 'HEVC':
print("")
continue
if info.bitrate < Config.LOW_BITRATE_THRESHOLD:
print("")
continue
item.task.destination.parent.mkdir(exist_ok=True)
print(" ... added")
jobs.append(item)
with open("queue.json", "w") as fp:
json.dump(jobs, fp, default=lambda o: getattr(o, 'as_dict', str)(), indent=2)

@ -0,0 +1,235 @@
import json
import re
from typing import Any, Dict, Iterable, List, NamedTuple, Optional
from config_local import Config
from subprocess import Popen, PIPE
from pathlib import Path
from io import BytesIO, TextIOWrapper
from pytimeparse.timeparse import timeparse
from datetime import timedelta
from fractions import Fraction
class Translation(NamedTuple):
start: int
end: int
multiplier: float
min_br: int
max_br: int
class Resolution:
def __init__(self, width: int, height: int):
self.width = width
self.height = height
def __repr__(self) -> str:
return f"Resolution(width={self.width},height={self.height})"
def __str__(self) -> str:
return f"{self.width}x{self.height}"
class Ratio:
def __init__(self, x: int, y: int):
self.x = x
self.y = y
def __repr__(self) -> str:
return f"Ratio(x={self.x},y={self.y})"
def __str__(self) -> str:
return f"{self.x}x{self.y}"
class AudioInfo:
def __init__(self, audio_dict: Dict[str, str]):
self.data = audio_dict
@property
def language(self) -> str:
return self.data.get("Language", "Unknown")
@property
def name(self) -> str:
return self.data.get("Title", "Unknown")
class MenuItem:
def __init__(self, *, start: str, duration: str, name: str):
self.start = start
self.duration = duration
self.name = name
class MenuInfo:
REGEX = re.compile(r"^_\d{2}_\d{2}_\d{2}")
def __init__(self, menu_dict: Dict[str, str]):
self.data = menu_dict
@property
def items(self) -> List[MenuItem]:
if not "extra" in self.data:
duration = str(timedelta(seconds=round(timeparse(list(self.data.keys())[0]))))
return [MenuItem(start="00:00:00", duration=f"{duration:0>8}", name='Chapter 1')]
keys = sorted(key for key in self.data["extra"] if type(self).REGEX.match(key))
td0 = timedelta(seconds=0)
td = None
result = []
for key in keys:
td = timedelta(
hours=int(key[1:3]),
minutes=int(key[4:6]),
seconds=int(key[7:9]),
milliseconds=float(key[10:])
)
dur_td = timedelta(seconds=round((td - td0).total_seconds()))
duration = str(dur_td)
duration = f"{duration:0>8}"
td0 = td
name = self.data["extra"][key].replace(" ", " ")
if result:
result[-1].duration = duration
result.append(MenuItem(start=key, duration="", name=name))
if result:
td = timedelta(seconds=float(self.data["Duration"]))
dur_td = timedelta(seconds=round((td - td0).total_seconds()))
duration = str(dur_td)
duration = f"{duration:0>8}"
result[-1].duration = duration
return result
class MediaInfo:
def __init__(self, path: Path):
self.path = path
procinfo = Popen([Config.MEDIAINFO_BINARY, path, "--Output=JSON"], stdout=PIPE)
stdout, _ = procinfo.communicate()
data = json.loads(stdout.decode('utf-8'))["media"]["track"]
self.general: Optional[Dict[str, Any]] = None
self.video: Optional[Dict[str, Any]] = None
self.audio: List[Dict[str, Any]] = []
self.menu_data: Optional[Dict[str, Any]] = None
for track in data:
if track["@type"] == "General":
self.general = track
elif track["@type"] == "Video":
self.video = track
elif track["@type"] == "Audio":
self.audio.append(track)
elif track["@type"] == "Menu":
self.menu_data = track
# print(json.dumps(self.video, indent=2))
@property
def codec(self) -> str:
return self.video["Format"]
@property
def bitrate(self) -> int:
try:
bitrate = self.video["BitRate"]
except KeyError:
bitrate = self.general["OverallBitRate"]
ibitrate = int(bitrate) // 1000
return ibitrate
@property
def framerate(self) -> float:
keys = ["FrameRate_Maximum", "FrameRate", "FrameRate_Nominal"]
for key in keys:
if key in self.video:
return float(self.video[key])
print(json.dumps(self.general, indent=2))
print(json.dumps(self.video, indent=2))
raise Exception("No frame rate for video " + str(self.path))
@property
def duration(self) -> str:
places_to_look = [self.video, self.general]
for place in places_to_look:
if "Duration" in place:
seconds = float(place["Duration"])
td = str(timedelta(seconds=seconds))
return td # f"{td:0>8}"
raise Exception("No duration for video " + str(self.path))
@property
def resolution(self) -> Resolution:
width = int(self.video["Width"])
height = int(self.video["Height"])
return Resolution(width=width, height=height)
@property
def display_aspect_ratio(self) -> Ratio:
ratio = [float(x) for x in str(self.video["DisplayAspectRatio"]).split(':')]
if len(ratio) != 2:
res = self.resolution
if -0.01 <= ratio[0] - res.width / res.height <= 0.01:
frac = Fraction(res.width, res.height)
else:
new_width = round(res.height * ratio[0])
frac = Fraction(new_width, res.height)
ratio = (frac.numerator, frac.denominator)
return Ratio(x=ratio[0], y=ratio[1])
@property
def keep_display_aspect(self) -> bool:
ratio = self.display_aspect_ratio
resolution = self.resolution
return 0.95 < (resolution.width / resolution.height) / (ratio.x / ratio.y) < 1.05
@property
def display_width(self) -> float:
ratio = self.display_aspect_ratio
if not self.keep_display_aspect:
display_width = self.resolution.height * ratio.x / ratio.y
else:
display_width = float(self.resolution.width)
return display_width
@property
def pixel_aspect_ratio(self) -> Ratio:
if self.keep_display_aspect:
return Ratio(1.0, 1.0)
else:
return Ratio(self.display_aspect_ratio.x, self.resolution.width)
@property
def output_bitrate(self) -> int:
print(self.codec)
if self.codec == "MPEG-4 Visual":
translations = [
Translation(0, 800, 0.5, 200, 600),
Translation(800, 1500, 0.4, 400, 800),
Translation(1500, 99999999999, 0.33, 600, 1200),
]
else:
translations = [
Translation(0, 800, 0.7, 450, 700),
Translation(800, 1500, 0.6, 550, 1000),
Translation(1500, 99999999999, 0.45, 800, 5000),
]
bitrate = self.bitrate
for t in translations:
if t.start <= bitrate < t.end:
return int(min(t.max_br, max(t.min_br, bitrate * t.multiplier)))
@property
def tracks(self) -> List[AudioInfo]:
result = []
for audio in self.audio:
track = AudioInfo(audio)
result.append(track)
return result
@property
def menu(self) -> MenuInfo:
return MenuInfo(self.menu_data or { self.duration: "Chapter 1" })
Loading…
Cancel
Save