#!/usr/bin/env python3
"""
Scrobble AzuraCast "now playing" tracks to Last.fm.

The script is designed for cron: each run fetches the current AzuraCast track,
updates Last.fm "now playing" once per track, and scrobbles once the track has
played long enough.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
# Last.fm web-service endpoint; every API method call is POSTed here.
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"
# Page the user opens in a browser to approve an auth token.
LASTFM_AUTH_URL = "https://www.last.fm/api/auth/"
# Directory containing this script; the default env and state files live here.
SCRIPT_DIR = Path(__file__).resolve().parent
# Default path of the JSON file that remembers already-handled tracks.
DEFAULT_STATE_FILE = SCRIPT_DIR / ".lastfm_scrobbler_state.json"
|
||
|
||
|
||
class ConfigError(RuntimeError):
    """Raised when a required setting is missing or has an invalid value."""
|
||
|
||
|
||
class ApiError(RuntimeError):
    """Raised when an HTTP request fails or a remote API reports an error."""
|
||
|
||
|
||
def log(message: str) -> None:
    """Print *message* to stdout, prefixed with the local date and time.

    The output is flushed immediately.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    print(stamp, message, flush=True)
|
||
|
||
|
||
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. A leading
    ``export`` keyword is accepted so the same file can be sourced by a shell.
    Variables that already exist in the environment are never overwritten.
    A missing file is silently skipped.

    Args:
        path: Location of the env file.
    """
    # Read directly instead of checking existence first: avoids a race between
    # the check and the read.
    try:
        content = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return

    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue

        key, separator, value = line.partition("=")
        if not separator:
            continue

        key = key.strip()
        words = key.split(None, 1)
        if len(words) == 2 and words[0] == "export":
            key = words[1]

        value = value.strip()
        # Remove exactly one pair of matching surrounding quotes. Stripping
        # quote characters independently would corrupt values that merely
        # start or end with a quote (e.g. ``abc'``).
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]

        if key and key not in os.environ:
            os.environ[key] = value
|
||
|
||
|
||
def env(name: str, default: str | None = None) -> str | None:
    """Return environment variable *name*, or *default* if it is unset or empty."""
    raw = os.environ.get(name)
    return raw if raw else default
|
||
|
||
|
||
def require_env(name: str) -> str:
    """Return environment variable *name*.

    Raises:
        ConfigError: if the variable is unset or empty.
    """
    found = env(name)
    if found:
        return found
    raise ConfigError(f"Missing required environment variable: {name}")
|
||
|
||
|
||
def getenv_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an integer.

    *default* is returned when the variable is unset or empty.

    Raises:
        ConfigError: if the value cannot be parsed by ``int``.
    """
    raw = env(name)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except ValueError as exc:
        raise ConfigError(f"{name} must be an integer, got: {raw}") from exc
    return parsed
|
||
|
||
|
||
def getenv_bool(name: str, default: bool) -> bool:
    """Return environment variable *name* parsed as a boolean.

    Accepted spellings (case-insensitive, surrounding whitespace ignored) are
    ``1/true/yes/on`` and ``0/false/no/off``. *default* is returned when the
    variable is unset or empty.

    Raises:
        ConfigError: if the value is not one of the accepted spellings.
    """
    raw = env(name)
    if raw is None:
        return default

    spellings = {
        "1": True,
        "true": True,
        "yes": True,
        "on": True,
        "0": False,
        "false": False,
        "no": False,
        "off": False,
    }
    flag = spellings.get(raw.strip().casefold())
    if flag is None:
        raise ConfigError(f"{name} must be a boolean value, got: {raw}")
    return flag
|
||
|
||
|
||
def http_request(
    url: str,
    data: dict[str, Any] | None = None,
    timeout: int = 15,
) -> dict[str, Any]:
    """Perform an HTTP request and return the decoded JSON object.

    When *data* is given it is form-encoded and sent as the request body;
    otherwise the request carries no body.

    Args:
        url: Target URL.
        data: Optional form fields to send.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON object.

    Raises:
        ApiError: on an HTTP error status, a network failure or timeout, a
            body that is not valid JSON, a JSON value that is not an object,
            or an object carrying a truthy ``"error"`` member.
    """
    encoded_data = None
    headers = {"User-Agent": "azuracast-lastfm-scrobbler/1.0"}

    if data is not None:
        encoded_data = urllib.parse.urlencode(data).encode("utf-8")
        headers["Content-Type"] = "application/x-www-form-urlencoded"

    request = urllib.request.Request(url, data=encoded_data, headers=headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            raw_body = response.read()
    except urllib.error.HTTPError as exc:
        details = exc.read().decode("utf-8", errors="replace")
        raise ApiError(f"HTTP {exc.code} from {url}: {details}") from exc
    except OSError as exc:
        # URLError is an OSError subclass. Catching OSError also covers
        # timeouts and resets raised while reading the body, which urllib
        # does not wrap in URLError.
        raise ApiError(f"Cannot reach {url}: {exc}") from exc

    # Decode leniently so a malformed body surfaces as ApiError below rather
    # than as an uncaught UnicodeDecodeError.
    body = raw_body.decode("utf-8", errors="replace")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError as exc:
        raise ApiError(f"Invalid JSON from {url}: {body[:500]}") from exc

    # Callers treat the result as a mapping; reject anything else up front.
    if not isinstance(payload, dict):
        raise ApiError(
            f"Unexpected JSON from {url}: expected an object, "
            f"got {type(payload).__name__}"
        )

    if payload.get("error"):
        message = payload.get("message", "Unknown Last.fm error")
        raise ApiError(f"Last.fm error {payload.get('error')}: {message}")

    return payload
|
||
|
||
|
||
def sign_lastfm(params: dict[str, Any], shared_secret: str) -> str:
    """Compute the ``api_sig`` value for a Last.fm request.

    The signature is the MD5 hex digest of every ``<name><value>`` pair in
    name order (skipping ``format`` and ``callback``) followed by the shared
    secret, encoded as UTF-8.
    """
    unsigned = ("format", "callback")
    pieces = [
        f"{name}{params[name]}" for name in sorted(params) if name not in unsigned
    ]
    pieces.append(shared_secret)
    return hashlib.md5("".join(pieces).encode("utf-8")).hexdigest()
|
||
|
||
|
||
def lastfm_call(
    method: str,
    params: dict[str, Any],
    api_key: str,
    shared_secret: str,
) -> dict[str, Any]:
    """Send a signed, JSON-format request for *method* to the Last.fm API.

    Entries in *params* override the base ``method``/``api_key``/``format``
    fields when names collide. The signature is computed over the merged
    fields before sending.
    """
    fields: dict[str, Any] = {}
    fields["method"] = method
    fields["api_key"] = api_key
    fields["format"] = "json"
    fields.update(params)
    fields["api_sig"] = sign_lastfm(fields, shared_secret)
    return http_request(LASTFM_API_URL, data=fields)
|
||
|
||
|
||
def build_nowplaying_urls() -> list[str]:
    """Return the AzuraCast now-playing URLs to try, in order.

    ``AZURACAST_NOWPLAYING_URL`` wins when set. Otherwise two URLs are built
    from ``AZURACAST_BASE_URL`` and the station shortcode
    (``AZURACAST_STATION_SHORTCODE``, then ``AZURACAST_STATION``, then
    ``"main"``).

    Raises:
        ConfigError: if neither an explicit URL nor a base URL is configured.
    """
    override = env("AZURACAST_NOWPLAYING_URL")
    if override:
        return [override]

    root = require_env("AZURACAST_BASE_URL").rstrip("/")
    legacy_station = env("AZURACAST_STATION", "main")
    station = env("AZURACAST_STATION_SHORTCODE", legacy_station)
    # Quote everything, including "/", so the shortcode stays a single path segment.
    slug = urllib.parse.quote(str(station), safe="")

    first_choice = f"{root}/api/nowplaying_static/{slug}.json"
    second_choice = f"{root}/api/nowplaying/{slug}"
    return [first_choice, second_choice]
|
||
|
||
|
||
def fetch_now_playing() -> dict[str, Any]:
    """Fetch now-playing data from the first candidate URL that responds.

    Raises:
        ApiError: if every candidate URL fails; the message joins all
            individual failures with ``"; "``.
    """
    failures: list[str] = []
    for candidate in build_nowplaying_urls():
        try:
            payload = http_request(candidate)
        except ApiError as exc:
            failures.append(str(exc))
        else:
            return payload
    raise ApiError("; ".join(failures))
|
||
|
||
|
||
def clean_text(value: Any) -> str:
    """Return *value* as a whitespace-trimmed string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip()
|
||
|
||
|
||
def split_artist_title(text: str) -> tuple[str, str]:
    """Split ``"Artist - Title"`` style text into ``(artist, title)``.

    Separators are tried in a fixed order (spaced hyphen, en dash, em dash)
    and the text is split at the first occurrence of the first separator that
    is present. Without any separator the artist is empty and the whole text
    is the title.
    """
    for delimiter in (" - ", " – ", " — "):
        head, found, tail = text.partition(delimiter)
        if found:
            return head.strip(), tail.strip()
    return "", text.strip()
|
||
|
||
|
||
def extract_track(now_playing: dict[str, Any]) -> dict[str, Any] | None:
    """Build a normalized track record from an AzuraCast now-playing payload.

    Artist and title come from the ``song`` entry; whichever is missing is
    filled in by splitting the combined ``text`` field. Timing values are
    coerced to non-negative integers.

    Returns:
        A dict with ``artist``, ``title``, ``album``, ``elapsed``,
        ``duration`` and ``played_at``, or ``None`` when no artist/title pair
        could be determined.
    """
    entry = now_playing.get("now_playing") or {}
    song_info = entry.get("song") or {}

    artist, title, album, combined = (
        clean_text(song_info.get(field))
        for field in ("artist", "title", "album", "text")
    )

    if combined and not (artist and title):
        guessed_artist, guessed_title = split_artist_title(combined)
        artist = artist or guessed_artist
        title = title or guessed_title

    elapsed = int(entry.get("elapsed") or 0)
    duration = int(entry.get("duration") or 0)

    reported_start = entry.get("played_at")
    if reported_start:
        played_at = int(reported_start)
    else:
        # NOTE(review): without a reported start the value is estimated from
        # the current clock, so it may differ from one run to the next.
        # track_key() hashes played_at, so confirm that is acceptable.
        played_at = int(time.time() - elapsed)

    if not (artist and title):
        return None

    return {
        "artist": artist,
        "title": title,
        "album": album,
        "elapsed": max(0, elapsed),
        "duration": max(0, duration),
        "played_at": max(0, played_at),
    }
|
||
|
||
|
||
def load_state(path: Path) -> dict[str, Any]:
    """Load the persisted scrobbler state from *path*.

    The result always has dict-valued ``"scrobbled"`` and ``"now_playing"``
    entries. A missing, unreadable, undecodable or structurally invalid file
    yields a fresh empty state instead of an error.

    Args:
        path: Location of the JSON state file.

    Returns:
        The state mapping.
    """
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: invalid JSON
        # (JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
        loaded = None

    # Valid JSON of the wrong shape (list, null, ...) would otherwise crash
    # later on state.setdefault(...).
    state: dict[str, Any] = loaded if isinstance(loaded, dict) else {}
    for bucket in ("scrobbled", "now_playing"):
        if not isinstance(state.get(bucket), dict):
            state[bucket] = {}
    return state
|
||
|
||
|
||
def save_state(path: Path, state: dict[str, Any]) -> None:
    """Write *state* to *path* as indented, key-sorted JSON.

    Parent directories are created as needed. The data is written to a
    sibling ``<name>.tmp`` file first and then moved over *path*, so an
    interrupted write does not leave a truncated state file behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    scratch = path.with_suffix(f"{path.suffix}.tmp")
    serialized = json.dumps(state, indent=2, sort_keys=True)
    scratch.write_text(serialized, encoding="utf-8")
    scratch.replace(path)
|
||
|
||
|
||
def track_key(track: dict[str, Any]) -> str:
    """Return a SHA-1 hex digest identifying one play of a track.

    The key combines the case-folded artist, title and album with the
    ``played_at`` timestamp.
    """
    parts = (
        track["artist"].casefold(),
        track["title"].casefold(),
        track.get("album", "").casefold(),
        str(track["played_at"]),
    )
    digest = hashlib.sha1()
    digest.update("|".join(parts).encode("utf-8"))
    return digest.hexdigest()
|
||
|
||
|
||
def prune_state(state: dict[str, Any]) -> None:
    """Drop entries older than 14 days from both state buckets, in place.

    Missing buckets are created empty. An entry's value is the integer
    timestamp at which it was recorded; falsy values count as ``0``.
    """
    retention_seconds = 14 * 24 * 60 * 60
    oldest_allowed = int(time.time()) - retention_seconds
    for name in ("scrobbled", "now_playing"):
        bucket = state.setdefault(name, {})
        # Collect first: a dict must not change size while being iterated.
        expired = tuple(
            entry for entry in bucket if int(bucket[entry] or 0) < oldest_allowed
        )
        for entry in expired:
            del bucket[entry]
|
||
|
||
|
||
def scrobble_threshold(track: dict[str, Any]) -> int:
    """Return how many seconds *track* must have played before scrobbling.

    With a known duration this is half the duration, capped at 240 seconds.
    Without one, ``LASTFM_NO_DURATION_SCROBBLE_SECONDS`` (default 240) is
    used. The result is never below ``LASTFM_MIN_SCROBBLE_ELAPSED_SECONDS``
    (default 30).
    """
    no_duration_wait = getenv_int("LASTFM_NO_DURATION_SCROBBLE_SECONDS", 240)
    floor = getenv_int("LASTFM_MIN_SCROBBLE_ELAPSED_SECONDS", 30)

    length = int(track.get("duration") or 0)
    candidate = min(length // 2, 240) if length > 0 else no_duration_wait
    return max(floor, candidate)
|
||
|
||
|
||
def update_now_playing(track: dict[str, Any], dry_run: bool) -> None:
    """Tell Last.fm that *track* is currently playing.

    The album is included only when ``LASTFM_SEND_ALBUM`` is enabled (the
    default) and the track has one; the duration is included when known.
    With *dry_run* the request is only logged, not sent.

    Raises:
        ConfigError: if a required Last.fm credential is missing.
        ApiError: if the Last.fm call fails.
    """
    artist, title = track["artist"], track["title"]
    fields = {
        "artist": artist,
        "track": title,
        "sk": require_env("LASTFM_SESSION_KEY"),
    }
    include_album = getenv_bool("LASTFM_SEND_ALBUM", True)
    if include_album and track.get("album"):
        fields["album"] = track["album"]
    if track.get("duration"):
        fields["duration"] = str(track["duration"])

    label = f"{artist} - {title}"
    if dry_run:
        log(f"DRY RUN now playing: {label}")
        return

    api_key = require_env("LASTFM_API_KEY")
    shared_secret = require_env("LASTFM_SHARED_SECRET")
    lastfm_call("track.updateNowPlaying", fields, api_key, shared_secret)
    log(f"Updated Last.fm now playing: {label}")
|
||
|
||
|
||
def scrobble_track(track: dict[str, Any], dry_run: bool) -> None:
    """Submit *track* to Last.fm as a scrobble stamped with its start time.

    The album is included only when ``LASTFM_SEND_ALBUM`` is enabled (the
    default) and the track has one; the duration is included when known.
    With *dry_run* the scrobble is only logged, not sent.

    Raises:
        ConfigError: if a required Last.fm credential is missing.
        ApiError: if the Last.fm call fails.
    """
    artist, title, started = track["artist"], track["title"], track["played_at"]
    fields = {
        "artist": artist,
        "track": title,
        "timestamp": str(started),
        "sk": require_env("LASTFM_SESSION_KEY"),
    }
    include_album = getenv_bool("LASTFM_SEND_ALBUM", True)
    if include_album and track.get("album"):
        fields["album"] = track["album"]
    if track.get("duration"):
        fields["duration"] = str(track["duration"])

    label = f"{artist} - {title}"
    if dry_run:
        log(f"DRY RUN scrobble: {label} (timestamp {started})")
        return

    api_key = require_env("LASTFM_API_KEY")
    shared_secret = require_env("LASTFM_SHARED_SECRET")
    lastfm_call("track.scrobble", fields, api_key, shared_secret)
    log(f"Scrobbled: {label}")
|
||
|
||
|
||
def run_once(state_file: Path, dry_run: bool) -> int:
    """Perform one check: update "now playing" and scrobble if due.

    "Now playing" is sent once per track play. The scrobble is sent once the
    track's elapsed time reaches the threshold, and at most once per play.
    Handled plays are recorded in *state_file*.

    With *dry_run* nothing is sent to Last.fm and the state file is left
    untouched, so a rehearsal never causes a later real run to skip the
    track.

    Args:
        state_file: Path of the JSON state file.
        dry_run: Log intended actions instead of performing them.

    Returns:
        Process exit code; always ``0``, failures are raised.

    Raises:
        ConfigError: if required configuration is missing or invalid.
        ApiError: if AzuraCast or Last.fm cannot be reached or report an error.
    """
    now_playing = fetch_now_playing()
    track = extract_track(now_playing)
    if not track:
        log("No usable artist/title found in AzuraCast now-playing data.")
        return 0

    state = load_state(state_file)
    prune_state(state)
    key = track_key(track)
    now_ts = int(time.time())

    if key not in state.setdefault("now_playing", {}):
        update_now_playing(track, dry_run=dry_run)
        state["now_playing"][key] = now_ts

    threshold = scrobble_threshold(track)
    if track["elapsed"] < threshold:
        log(
            "Waiting before scrobble: "
            f"{track['artist']} - {track['title']} "
            f"({track['elapsed']}s/{threshold}s)"
        )
    elif key in state.setdefault("scrobbled", {}):
        log(f"Already scrobbled: {track['artist']} - {track['title']}")
    else:
        scrobble_track(track, dry_run=dry_run)
        state["scrobbled"][key] = now_ts

    # Persist only real work: recording a dry run would mark the play as
    # handled and suppress the genuine scrobble on the next run.
    if not dry_run:
        save_state(state_file, state)
    return 0
|
||
|
||
|
||
def get_lastfm_token() -> int:
    """Request a Last.fm auth token and print it with its approval URL.

    Returns:
        Process exit code ``0``.

    Raises:
        ConfigError: if the API key or shared secret is not configured.
        ApiError: if the call fails or the reply contains no token.
    """
    api_key = require_env("LASTFM_API_KEY")
    secret = require_env("LASTFM_SHARED_SECRET")
    reply = lastfm_call("auth.getToken", {}, api_key, secret)

    token = reply.get("token")
    if not token:
        raise ApiError(f"Last.fm did not return a token: {reply}")

    query = urllib.parse.urlencode({"api_key": api_key, "token": token})
    print(
        f"Token: {token}",
        f"Open and approve: {LASTFM_AUTH_URL}?{query}",
        sep="\n",
    )
    return 0
|
||
|
||
|
||
def get_lastfm_session(token: str) -> int:
    """Exchange an approved *token* for a session key and print it.

    The key is printed as a ``LASTFM_SESSION_KEY=...`` line, preceded by the
    Last.fm user name.

    Returns:
        Process exit code ``0``.

    Raises:
        ConfigError: if the API key or shared secret is not configured.
        ApiError: if the call fails or the reply contains no session key.
    """
    api_key = require_env("LASTFM_API_KEY")
    secret = require_env("LASTFM_SHARED_SECRET")
    reply = lastfm_call("auth.getSession", {"token": token}, api_key, secret)

    details = reply.get("session") or {}
    session_key = details.get("key")
    if not session_key:
        raise ApiError(f"Last.fm did not return a session key: {reply}")

    user = details.get("name", "(unknown)")
    print(f"Last.fm user: {user}")
    print(f"LASTFM_SESSION_KEY={session_key}")
    return 0
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    ``--env-file`` is resolved in a preliminary pass and that file is loaded
    before the real parser is built. The defaults of ``--state-file`` and
    ``--interval`` are read from the environment, so loading the file first
    lets ``LASTFM_SCROBBLER_STATE_FILE`` and
    ``LASTFM_SCROBBLER_INTERVAL_SECONDS`` set there take effect. Loading the
    same file again later is harmless because ``load_env_file`` never
    overwrites variables that are already set.

    Returns:
        The parsed arguments.

    Raises:
        ConfigError: if ``LASTFM_SCROBBLER_INTERVAL_SECONDS`` is not an integer.
    """
    default_env_file = str(SCRIPT_DIR / "lastfm-scrobbler.env")

    # Preliminary pass: only --env-file matters here; every other argument,
    # including -h/--help, is left for the full parser below.
    bootstrap = argparse.ArgumentParser(add_help=False)
    bootstrap.add_argument("--env-file", default=default_env_file)
    early_args, _remaining = bootstrap.parse_known_args()
    load_env_file(Path(early_args.env_file))

    parser = argparse.ArgumentParser(
        description="Scrobble AzuraCast now-playing tracks to Last.fm."
    )
    parser.add_argument(
        "--env-file",
        default=default_env_file,
        help="Path to KEY=VALUE config file.",
    )
    parser.add_argument(
        "--state-file",
        default=env("LASTFM_SCROBBLER_STATE_FILE", str(DEFAULT_STATE_FILE)),
        help="Path to state JSON file.",
    )
    parser.add_argument("--dry-run", action="store_true", help="Do not call Last.fm.")
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run forever instead of exiting after one check.",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=getenv_int("LASTFM_SCROBBLER_INTERVAL_SECONDS", 60),
        help="Daemon check interval in seconds.",
    )
    parser.add_argument(
        "--get-token",
        action="store_true",
        help="Create a Last.fm auth token and print the approval URL.",
    )
    parser.add_argument(
        "--get-session",
        metavar="TOKEN",
        help="Exchange an approved Last.fm token for LASTFM_SESSION_KEY.",
    )
    return parser.parse_args()
|
||
|
||
|
||
def main() -> int:
    """Run the command-line entry point and return the process exit code.

    Depending on the arguments this prints an auth token, exchanges a token
    for a session key, performs a single check, or loops forever in daemon
    mode. In daemon mode every failure of a single check is logged and the
    loop continues; the pause between checks is at least 10 seconds.

    Returns:
        ``0`` on success, ``1`` after a configuration or API error.
    """
    try:
        # Parsing happens inside the try block because the argument defaults
        # are read from the environment and can raise ConfigError; that
        # should produce the clean error exit below, not a traceback.
        args = parse_args()
        load_env_file(Path(args.env_file))

        if args.get_token:
            return get_lastfm_token()
        if args.get_session:
            return get_lastfm_session(args.get_session)

        state_file = Path(args.state_file)
        if not args.daemon:
            return run_once(state_file=state_file, dry_run=args.dry_run)

        while True:
            try:
                run_once(state_file=state_file, dry_run=args.dry_run)
            except Exception as exc:  # noqa: BLE001 - daemon should keep running.
                log(f"ERROR: {exc}")
            time.sleep(max(10, args.interval))
    except (ConfigError, ApiError) as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
|
||
|
||
|
||
# Run only when executed as a script; the return value of main() becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|