Files
azuracast-lastfm-scrobbler/lastfm_scrobbler.py
Patrick Asmus 7a28f665e0 Initial
2026-06-27 21:08:42 +02:00

463 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Scrobble AzuraCast "now playing" tracks to Last.fm.
The script is designed for cron: each run fetches the current AzuraCast track,
updates Last.fm "now playing" once per track, and scrobbles once the track has
played long enough.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"
LASTFM_AUTH_URL = "https://www.last.fm/api/auth/"
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_STATE_FILE = SCRIPT_DIR / ".lastfm_scrobbler_state.json"
class ConfigError(RuntimeError):
pass
class ApiError(RuntimeError):
pass
def log(message: str) -> None:
print(time.strftime("%Y-%m-%d %H:%M:%S"), message, flush=True)
def load_env_file(path: Path) -> None:
if not path.exists():
return
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def env(name: str, default: str | None = None) -> str | None:
value = os.environ.get(name)
if value is None or value == "":
return default
return value
def require_env(name: str) -> str:
value = env(name)
if not value:
raise ConfigError(f"Missing required environment variable: {name}")
return value
def getenv_int(name: str, default: int) -> int:
value = env(name)
if value is None:
return default
try:
return int(value)
except ValueError as exc:
raise ConfigError(f"{name} must be an integer, got: {value}") from exc
def getenv_bool(name: str, default: bool) -> bool:
value = env(name)
if value is None:
return default
normalized = value.strip().casefold()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise ConfigError(f"{name} must be a boolean value, got: {value}")
def http_request(
url: str,
data: dict[str, Any] | None = None,
timeout: int = 15,
) -> dict[str, Any]:
encoded_data = None
headers = {"User-Agent": "azuracast-lastfm-scrobbler/1.0"}
if data is not None:
encoded_data = urllib.parse.urlencode(data).encode("utf-8")
headers["Content-Type"] = "application/x-www-form-urlencoded"
request = urllib.request.Request(url, data=encoded_data, headers=headers)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
details = exc.read().decode("utf-8", errors="replace")
raise ApiError(f"HTTP {exc.code} from {url}: {details}") from exc
except urllib.error.URLError as exc:
raise ApiError(f"Cannot reach {url}: {exc}") from exc
try:
payload = json.loads(body)
except json.JSONDecodeError as exc:
raise ApiError(f"Invalid JSON from {url}: {body[:500]}") from exc
if isinstance(payload, dict) and payload.get("error"):
message = payload.get("message", "Unknown Last.fm error")
raise ApiError(f"Last.fm error {payload.get('error')}: {message}")
return payload
def sign_lastfm(params: dict[str, Any], shared_secret: str) -> str:
signature_source = ""
for key in sorted(params):
if key in {"format", "callback"}:
continue
signature_source += f"{key}{params[key]}"
signature_source += shared_secret
return hashlib.md5(signature_source.encode("utf-8")).hexdigest()
def lastfm_call(
method: str,
params: dict[str, Any],
api_key: str,
shared_secret: str,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"method": method,
"api_key": api_key,
"format": "json",
**params,
}
payload["api_sig"] = sign_lastfm(payload, shared_secret)
return http_request(LASTFM_API_URL, data=payload)
def build_nowplaying_urls() -> list[str]:
explicit_url = env("AZURACAST_NOWPLAYING_URL")
if explicit_url:
return [explicit_url]
base_url = require_env("AZURACAST_BASE_URL").rstrip("/")
station = env("AZURACAST_STATION_SHORTCODE", env("AZURACAST_STATION", "main"))
quoted_station = urllib.parse.quote(str(station), safe="")
return [
f"{base_url}/api/nowplaying_static/{quoted_station}.json",
f"{base_url}/api/nowplaying/{quoted_station}",
]
def fetch_now_playing() -> dict[str, Any]:
errors: list[str] = []
for url in build_nowplaying_urls():
try:
return http_request(url)
except ApiError as exc:
errors.append(str(exc))
raise ApiError("; ".join(errors))
def clean_text(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def split_artist_title(text: str) -> tuple[str, str]:
for separator in (" - ", " ", ""):
if separator in text:
artist, title = text.split(separator, 1)
return artist.strip(), title.strip()
return "", text.strip()
def extract_track(now_playing: dict[str, Any]) -> dict[str, Any] | None:
current = now_playing.get("now_playing") or {}
song = current.get("song") or {}
artist = clean_text(song.get("artist"))
title = clean_text(song.get("title"))
album = clean_text(song.get("album"))
text = clean_text(song.get("text"))
if (not artist or not title) and text:
parsed_artist, parsed_title = split_artist_title(text)
artist = artist or parsed_artist
title = title or parsed_title
elapsed = int(current.get("elapsed") or 0)
duration = int(current.get("duration") or 0)
played_at = int(current.get("played_at") or (time.time() - elapsed))
if not artist or not title:
return None
return {
"artist": artist,
"title": title,
"album": album,
"elapsed": max(0, elapsed),
"duration": max(0, duration),
"played_at": max(0, played_at),
}
def load_state(path: Path) -> dict[str, Any]:
if not path.exists():
return {"scrobbled": {}, "now_playing": {}}
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {"scrobbled": {}, "now_playing": {}}
def save_state(path: Path, state: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(path.suffix + ".tmp")
tmp_path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
tmp_path.replace(path)
def track_key(track: dict[str, Any]) -> str:
stable = "|".join(
[
track["artist"].casefold(),
track["title"].casefold(),
track.get("album", "").casefold(),
str(track["played_at"]),
]
)
return hashlib.sha1(stable.encode("utf-8")).hexdigest()
def prune_state(state: dict[str, Any]) -> None:
cutoff = int(time.time()) - (14 * 24 * 60 * 60)
for bucket in ("scrobbled", "now_playing"):
entries = state.setdefault(bucket, {})
stale = [key for key, value in entries.items() if int(value or 0) < cutoff]
for key in stale:
del entries[key]
def scrobble_threshold(track: dict[str, Any]) -> int:
default_no_duration = getenv_int("LASTFM_NO_DURATION_SCROBBLE_SECONDS", 240)
min_elapsed = getenv_int("LASTFM_MIN_SCROBBLE_ELAPSED_SECONDS", 30)
duration = int(track.get("duration") or 0)
if duration <= 0:
return max(min_elapsed, default_no_duration)
return max(min_elapsed, min(duration // 2, 240))
def update_now_playing(track: dict[str, Any], dry_run: bool) -> None:
payload = {
"artist": track["artist"],
"track": track["title"],
"sk": require_env("LASTFM_SESSION_KEY"),
}
if getenv_bool("LASTFM_SEND_ALBUM", True) and track.get("album"):
payload["album"] = track["album"]
if track.get("duration"):
payload["duration"] = str(track["duration"])
if dry_run:
log(f"DRY RUN now playing: {track['artist']} - {track['title']}")
return
lastfm_call(
"track.updateNowPlaying",
payload,
require_env("LASTFM_API_KEY"),
require_env("LASTFM_SHARED_SECRET"),
)
log(f"Updated Last.fm now playing: {track['artist']} - {track['title']}")
def scrobble_track(track: dict[str, Any], dry_run: bool) -> None:
payload = {
"artist": track["artist"],
"track": track["title"],
"timestamp": str(track["played_at"]),
"sk": require_env("LASTFM_SESSION_KEY"),
}
if getenv_bool("LASTFM_SEND_ALBUM", True) and track.get("album"):
payload["album"] = track["album"]
if track.get("duration"):
payload["duration"] = str(track["duration"])
if dry_run:
log(
"DRY RUN scrobble: "
f"{track['artist']} - {track['title']} "
f"(timestamp {track['played_at']})"
)
return
lastfm_call(
"track.scrobble",
payload,
require_env("LASTFM_API_KEY"),
require_env("LASTFM_SHARED_SECRET"),
)
log(f"Scrobbled: {track['artist']} - {track['title']}")
def run_once(state_file: Path, dry_run: bool) -> int:
now_playing = fetch_now_playing()
track = extract_track(now_playing)
if not track:
log("No usable artist/title found in AzuraCast now-playing data.")
return 0
state = load_state(state_file)
prune_state(state)
key = track_key(track)
now_ts = int(time.time())
if key not in state.setdefault("now_playing", {}):
update_now_playing(track, dry_run=dry_run)
state["now_playing"][key] = now_ts
threshold = scrobble_threshold(track)
if track["elapsed"] < threshold:
log(
"Waiting before scrobble: "
f"{track['artist']} - {track['title']} "
f"({track['elapsed']}s/{threshold}s)"
)
save_state(state_file, state)
return 0
if key in state.setdefault("scrobbled", {}):
log(f"Already scrobbled: {track['artist']} - {track['title']}")
save_state(state_file, state)
return 0
scrobble_track(track, dry_run=dry_run)
state["scrobbled"][key] = now_ts
save_state(state_file, state)
return 0
def get_lastfm_token() -> int:
api_key = require_env("LASTFM_API_KEY")
shared_secret = require_env("LASTFM_SHARED_SECRET")
response = lastfm_call("auth.getToken", {}, api_key, shared_secret)
token = response.get("token")
if not token:
raise ApiError(f"Last.fm did not return a token: {response}")
params = urllib.parse.urlencode({"api_key": api_key, "token": token})
print(f"Token: {token}")
print(f"Open and approve: {LASTFM_AUTH_URL}?{params}")
return 0
def get_lastfm_session(token: str) -> int:
response = lastfm_call(
"auth.getSession",
{"token": token},
require_env("LASTFM_API_KEY"),
require_env("LASTFM_SHARED_SECRET"),
)
session = response.get("session") or {}
key = session.get("key")
name = session.get("name", "(unknown)")
if not key:
raise ApiError(f"Last.fm did not return a session key: {response}")
print(f"Last.fm user: {name}")
print(f"LASTFM_SESSION_KEY={key}")
return 0
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Scrobble AzuraCast now-playing tracks to Last.fm."
)
parser.add_argument(
"--env-file",
default=str(SCRIPT_DIR / "lastfm-scrobbler.env"),
help="Path to KEY=VALUE config file.",
)
parser.add_argument(
"--state-file",
default=env("LASTFM_SCROBBLER_STATE_FILE", str(DEFAULT_STATE_FILE)),
help="Path to state JSON file.",
)
parser.add_argument("--dry-run", action="store_true", help="Do not call Last.fm.")
parser.add_argument(
"--daemon",
action="store_true",
help="Run forever instead of exiting after one check.",
)
parser.add_argument(
"--interval",
type=int,
default=getenv_int("LASTFM_SCROBBLER_INTERVAL_SECONDS", 60),
help="Daemon check interval in seconds.",
)
parser.add_argument(
"--get-token",
action="store_true",
help="Create a Last.fm auth token and print the approval URL.",
)
parser.add_argument(
"--get-session",
metavar="TOKEN",
help="Exchange an approved Last.fm token for LASTFM_SESSION_KEY.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
load_env_file(Path(args.env_file))
try:
if args.get_token:
return get_lastfm_token()
if args.get_session:
return get_lastfm_session(args.get_session)
state_file = Path(args.state_file)
if not args.daemon:
return run_once(state_file=state_file, dry_run=args.dry_run)
while True:
try:
run_once(state_file=state_file, dry_run=args.dry_run)
except Exception as exc: # noqa: BLE001 - daemon should keep running.
log(f"ERROR: {exc}")
time.sleep(max(10, args.interval))
except (ConfigError, ApiError) as exc:
print(f"ERROR: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())