This commit is contained in:
Patrick Asmus
2026-06-27 21:08:42 +02:00
parent 698c7bec45
commit 7a28f665e0
5 changed files with 722 additions and 22 deletions

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
lastfm-scrobbler.env
.lastfm_scrobbler_state.json
lastfm-scrobbler.log
*.log
__pycache__/
*.py[cod]

35
LICENSE
View File

@@ -1,26 +1,25 @@
GNU AFFERO GENERAL PUBLIC LICENSE
Version 3, 19 November 2007
MIT License
---
Copyright (c) 2026 Patrick Asmus
Copyright ©
Name: Patrick Asmus (scriptos)
Email: support@techniverse.net
Website: https://www.patrick-asmus.de
Blog: https://www.cleveradmin.de
---
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

196
README.md
View File

@@ -4,10 +4,10 @@
</a>
</p>
<h1 align="center">Name des Projekts</h1>
<h1 align="center">AzuraCast Last.fm Scrobbler</h1>
<h4 align="center">
Kurzbeschreibung des Projekts/Anwendung, um die es geht
Kleines Python-Script, das aktuell laufende AzuraCast-Tracks an Last.fm scrobbelt.
</h4>
<h6 align="center">
@@ -21,9 +21,197 @@
</h6>
<br><br>
## Überblick
CONTENT BEREICH
Dieses Script liest die öffentlichen Now-Playing-Daten einer AzuraCast-Station und meldet den aktuellen Song an Last.fm. Es ist für Cron gedacht: Jeder Lauf prüft den aktuellen Track, setzt einmal pro Track den Last.fm-Status "Now Playing" und scrobbelt den Track, sobald genug Laufzeit vergangen ist.
## Funktionen
- liest AzuraCast Now-Playing-Daten über die öffentliche API
- setzt Last.fm "Now Playing"
- scrobbelt Tracks nur einmal pro AzuraCast-Play
- speichert den lokalen Zustand in einer kleinen JSON-Datei
- braucht keine externen Python-Pakete
- kann per Cron oder als Dauerprozess laufen
- kann Albumdaten optional an Last.fm mitsenden oder weglassen
## Dateien
- `lastfm_scrobbler.py`: das Script
- `lastfm-scrobbler.env.example`: Beispiel-Konfiguration
- `lastfm-scrobbler.env`: echte Konfiguration, wird nicht ins Git aufgenommen
- `.lastfm_scrobbler_state.json`: lokaler Zustand, wird automatisch erstellt
## Voraussetzungen
- Python 3
- erreichbare AzuraCast-Instanz
- Last.fm API-Key und Shared Secret
- Last.fm Session-Key für den Account, auf den gescrobbelt werden soll
## Installation
Beispiel im AzuraCast-Projektordner:
```bash
cd /mnt/local/dockerdata2/docker-container/azuracast
mkdir -p scripts
cp lastfm_scrobbler.py scripts/
cp lastfm-scrobbler.env.example scripts/lastfm-scrobbler.env
chmod +x scripts/lastfm_scrobbler.py
chmod 600 scripts/lastfm-scrobbler.env
```
Dann `scripts/lastfm-scrobbler.env` ausfüllen:
```bash
AZURACAST_BASE_URL=https://deine-azuracast-url.example
AZURACAST_STATION_SHORTCODE=main
LASTFM_API_KEY=...
LASTFM_SHARED_SECRET=...
LASTFM_SESSION_KEY=...
LASTFM_SEND_ALBUM=1
```
Alternativ kann statt `AZURACAST_BASE_URL` und `AZURACAST_STATION_SHORTCODE` direkt eine Now-Playing-URL gesetzt werden:
```bash
AZURACAST_NOWPLAYING_URL=https://deine-azuracast-url.example/api/nowplaying_static/main.json
```
## Last.fm Session-Key erzeugen
Last.fm API-Key und Shared Secret erstellst du unter:
```text
https://www.last.fm/api/account/create
```
Danach im Script-Verzeichnis:
```bash
./lastfm_scrobbler.py --get-token
```
Das Script gibt einen Token und eine Freigabe-URL aus. Die URL im Browser öffnen und den Zugriff erlauben.
Danach den Token aus der ersten Ausgabe verwenden:
```bash
./lastfm_scrobbler.py --get-session TOKEN_AUS_DER_AUSGABE
```
`TOKEN_AUS_DER_AUSGABE` ist nur ein Platzhalter. Dort muss der echte Token stehen, der vorher bei `--get-token` hinter `Token:` ausgegeben wurde.
Die Ausgabe enthält dann:
```bash
LASTFM_SESSION_KEY=...
```
Diesen Wert in `lastfm-scrobbler.env` eintragen.
## Testlauf
```bash
./lastfm_scrobbler.py --dry-run
```
Wenn das gut aussieht:
```bash
./lastfm_scrobbler.py
```
## Cron
Empfohlen: jede Minute ausführen.
Wenn Script und Env im `scripts/`-Ordner des AzuraCast-Projekts liegen:
```cron
* * * * * cd /mnt/local/dockerdata2/docker-container/azuracast && /usr/bin/env python3 scripts/lastfm_scrobbler.py >> scripts/lastfm-scrobbler.log 2>&1
```
Cron bearbeiten:
```bash
crontab -e
```
## Dauerlauf statt Cron
```bash
./lastfm_scrobbler.py --daemon --interval 60
```
Cron ist für diese Aufgabe meistens einfacher und robuster.
## Scrobble-Logik
Das Script scrobbelt nicht sofort beim ersten Erkennen eines Songs. Es wartet, bis mindestens die Hälfte des Tracks oder maximal 4 Minuten gespielt wurden.
Bei fehlender Track-Länge wartet es standardmäßig 240 Sekunden. Dadurch werden Songs nicht gescrobbelt, wenn sie nur kurz angelaufen sind.
Damit Cron den gleichen Song nicht mehrfach scrobbelt, merkt sich das Script bereits verarbeitete Tracks in:
```bash
.lastfm_scrobbler_state.json
```
Der interne Schlüssel basiert auf Artist, Titel, Album und AzuraCast-`played_at`.
## Albumdaten
Last.fm erlaubt beim Scrobbeln optionale Albumdaten, aber kein Coverbild. Cover werden von Last.fm selbst anhand der Metadaten gematched.
Wenn AzuraCast saubere Albumdaten liefert, kann das Matching besser werden:
```bash
LASTFM_SEND_ALBUM=1
```
Wenn Albumdaten ungenau sind oder Last.fm dadurch schlechter matched, kann man sie abschalten:
```bash
LASTFM_SEND_ALBUM=0
```
## Wichtige Optionen
```bash
LASTFM_MIN_SCROBBLE_ELAPSED_SECONDS=30
```
Untergrenze. Vor dieser Laufzeit wird nie gescrobbelt.
```bash
LASTFM_NO_DURATION_SCROBBLE_SECONDS=240
```
Fallback, falls AzuraCast keine Track-Länge liefert.
```bash
LASTFM_SCROBBLER_INTERVAL_SECONDS=60
```
Nur für `--daemon` relevant. Bei Cron entscheidet der Cron-Eintrag.
## Sicherheit
Die echte Datei `lastfm-scrobbler.env` enthält API-Key, Shared Secret und Session-Key. Sie sollte nicht committed werden.
Empfohlene Rechte auf Linux:
```bash
chmod 600 lastfm-scrobbler.env
```
## Quellen
- AzuraCast Now Playing API: https://www.azuracast.com/docs/developers/now-playing-data/
- Last.fm `track.scrobble`: https://www.last.fm/api/show/track.scrobble
- Last.fm Auth: https://www.last.fm/api/authspec
<br><br>
<p align="center">
@@ -34,4 +222,4 @@ CONTENT BEREICH
<sub>
© Patrick Asmus · Techniverse Network · <a href="./LICENSE">Lizenz</a>
</sub>
</p>
</p>

View File

@@ -0,0 +1,45 @@
# Kopieren nach: lastfm-scrobbler.env
# Danach schützen: chmod 600 lastfm-scrobbler.env
# Öffentliche AzuraCast-URL ohne Slash am Ende.
# Beispiel: https://radio.example.org
AZURACAST_BASE_URL=
# AzuraCast-Stations-Shortcode. Wenn die Station "Main" heißt, ist das oft "main".
AZURACAST_STATION_SHORTCODE=main
# Optional: direkte Now-Playing-URL statt AZURACAST_BASE_URL + Stations-Shortcode.
# AZURACAST_NOWPLAYING_URL=https://radio.example.org/api/nowplaying_static/main.json
# API-Zugang erstellen unter: https://www.last.fm/api/account/create
LASTFM_API_KEY=
LASTFM_SHARED_SECRET=
# LASTFM_SESSION_KEY wird nicht direkt bei Last.fm angezeigt.
# Der Ablauf ist:
#
# 1. Token und Freigabe-URL erzeugen:
# ./lastfm_scrobbler.py --get-token
#
# 2. Die ausgegebene URL im Browser öffnen und erlauben.
#
# 3. Danach den echten Token aus Schritt 1 einsetzen, nicht den Platzhalter:
# ./lastfm_scrobbler.py --get-session DEIN_ECHTER_TOKEN
#
# 4. Die Ausgabe sieht dann so aus:
# LASTFM_SESSION_KEY=...
# Nur diesen Session-Key hier eintragen.
LASTFM_SESSION_KEY=
# Optionales Feintuning.
# 1 = Album mitsenden, 0 = Album weglassen.
LASTFM_SEND_ALBUM=1
# Untergrenze: vor dieser Laufzeit wird nie gescrobbelt.
LASTFM_MIN_SCROBBLE_ELAPSED_SECONDS=30
# Fallback, falls AzuraCast keine Track-Länge liefert.
LASTFM_NO_DURATION_SCROBBLE_SECONDS=240
# Nur für --daemon relevant. Bei Cron entscheidet der Cron-Intervall.
LASTFM_SCROBBLER_INTERVAL_SECONDS=60

462
lastfm_scrobbler.py Normal file
View File

@@ -0,0 +1,462 @@
#!/usr/bin/env python3
"""
Scrobble AzuraCast "now playing" tracks to Last.fm.
The script is designed for cron: each run fetches the current AzuraCast track,
updates Last.fm "now playing" once per track, and scrobbles once the track has
played long enough.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"
LASTFM_AUTH_URL = "https://www.last.fm/api/auth/"
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_STATE_FILE = SCRIPT_DIR / ".lastfm_scrobbler_state.json"
class ConfigError(RuntimeError):
pass
class ApiError(RuntimeError):
pass
def log(message: str) -> None:
print(time.strftime("%Y-%m-%d %H:%M:%S"), message, flush=True)
def load_env_file(path: Path) -> None:
if not path.exists():
return
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def env(name: str, default: str | None = None) -> str | None:
value = os.environ.get(name)
if value is None or value == "":
return default
return value
def require_env(name: str) -> str:
value = env(name)
if not value:
raise ConfigError(f"Missing required environment variable: {name}")
return value
def getenv_int(name: str, default: int) -> int:
value = env(name)
if value is None:
return default
try:
return int(value)
except ValueError as exc:
raise ConfigError(f"{name} must be an integer, got: {value}") from exc
def getenv_bool(name: str, default: bool) -> bool:
value = env(name)
if value is None:
return default
normalized = value.strip().casefold()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise ConfigError(f"{name} must be a boolean value, got: {value}")
def http_request(
url: str,
data: dict[str, Any] | None = None,
timeout: int = 15,
) -> dict[str, Any]:
encoded_data = None
headers = {"User-Agent": "azuracast-lastfm-scrobbler/1.0"}
if data is not None:
encoded_data = urllib.parse.urlencode(data).encode("utf-8")
headers["Content-Type"] = "application/x-www-form-urlencoded"
request = urllib.request.Request(url, data=encoded_data, headers=headers)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
details = exc.read().decode("utf-8", errors="replace")
raise ApiError(f"HTTP {exc.code} from {url}: {details}") from exc
except urllib.error.URLError as exc:
raise ApiError(f"Cannot reach {url}: {exc}") from exc
try:
payload = json.loads(body)
except json.JSONDecodeError as exc:
raise ApiError(f"Invalid JSON from {url}: {body[:500]}") from exc
if isinstance(payload, dict) and payload.get("error"):
message = payload.get("message", "Unknown Last.fm error")
raise ApiError(f"Last.fm error {payload.get('error')}: {message}")
return payload
def sign_lastfm(params: dict[str, Any], shared_secret: str) -> str:
signature_source = ""
for key in sorted(params):
if key in {"format", "callback"}:
continue
signature_source += f"{key}{params[key]}"
signature_source += shared_secret
return hashlib.md5(signature_source.encode("utf-8")).hexdigest()
def lastfm_call(
method: str,
params: dict[str, Any],
api_key: str,
shared_secret: str,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"method": method,
"api_key": api_key,
"format": "json",
**params,
}
payload["api_sig"] = sign_lastfm(payload, shared_secret)
return http_request(LASTFM_API_URL, data=payload)
def build_nowplaying_urls() -> list[str]:
explicit_url = env("AZURACAST_NOWPLAYING_URL")
if explicit_url:
return [explicit_url]
base_url = require_env("AZURACAST_BASE_URL").rstrip("/")
station = env("AZURACAST_STATION_SHORTCODE", env("AZURACAST_STATION", "main"))
quoted_station = urllib.parse.quote(str(station), safe="")
return [
f"{base_url}/api/nowplaying_static/{quoted_station}.json",
f"{base_url}/api/nowplaying/{quoted_station}",
]
def fetch_now_playing() -> dict[str, Any]:
errors: list[str] = []
for url in build_nowplaying_urls():
try:
return http_request(url)
except ApiError as exc:
errors.append(str(exc))
raise ApiError("; ".join(errors))
def clean_text(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def split_artist_title(text: str) -> tuple[str, str]:
for separator in (" - ", " ", ""):
if separator in text:
artist, title = text.split(separator, 1)
return artist.strip(), title.strip()
return "", text.strip()
def extract_track(now_playing: dict[str, Any]) -> dict[str, Any] | None:
current = now_playing.get("now_playing") or {}
song = current.get("song") or {}
artist = clean_text(song.get("artist"))
title = clean_text(song.get("title"))
album = clean_text(song.get("album"))
text = clean_text(song.get("text"))
if (not artist or not title) and text:
parsed_artist, parsed_title = split_artist_title(text)
artist = artist or parsed_artist
title = title or parsed_title
elapsed = int(current.get("elapsed") or 0)
duration = int(current.get("duration") or 0)
played_at = int(current.get("played_at") or (time.time() - elapsed))
if not artist or not title:
return None
return {
"artist": artist,
"title": title,
"album": album,
"elapsed": max(0, elapsed),
"duration": max(0, duration),
"played_at": max(0, played_at),
}
def load_state(path: Path) -> dict[str, Any]:
if not path.exists():
return {"scrobbled": {}, "now_playing": {}}
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {"scrobbled": {}, "now_playing": {}}
def save_state(path: Path, state: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(path.suffix + ".tmp")
tmp_path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
tmp_path.replace(path)
def track_key(track: dict[str, Any]) -> str:
stable = "|".join(
[
track["artist"].casefold(),
track["title"].casefold(),
track.get("album", "").casefold(),
str(track["played_at"]),
]
)
return hashlib.sha1(stable.encode("utf-8")).hexdigest()
def prune_state(state: dict[str, Any]) -> None:
cutoff = int(time.time()) - (14 * 24 * 60 * 60)
for bucket in ("scrobbled", "now_playing"):
entries = state.setdefault(bucket, {})
stale = [key for key, value in entries.items() if int(value or 0) < cutoff]
for key in stale:
del entries[key]
def scrobble_threshold(track: dict[str, Any]) -> int:
default_no_duration = getenv_int("LASTFM_NO_DURATION_SCROBBLE_SECONDS", 240)
min_elapsed = getenv_int("LASTFM_MIN_SCROBBLE_ELAPSED_SECONDS", 30)
duration = int(track.get("duration") or 0)
if duration <= 0:
return max(min_elapsed, default_no_duration)
return max(min_elapsed, min(duration // 2, 240))
def update_now_playing(track: dict[str, Any], dry_run: bool) -> None:
payload = {
"artist": track["artist"],
"track": track["title"],
"sk": require_env("LASTFM_SESSION_KEY"),
}
if getenv_bool("LASTFM_SEND_ALBUM", True) and track.get("album"):
payload["album"] = track["album"]
if track.get("duration"):
payload["duration"] = str(track["duration"])
if dry_run:
log(f"DRY RUN now playing: {track['artist']} - {track['title']}")
return
lastfm_call(
"track.updateNowPlaying",
payload,
require_env("LASTFM_API_KEY"),
require_env("LASTFM_SHARED_SECRET"),
)
log(f"Updated Last.fm now playing: {track['artist']} - {track['title']}")
def scrobble_track(track: dict[str, Any], dry_run: bool) -> None:
payload = {
"artist": track["artist"],
"track": track["title"],
"timestamp": str(track["played_at"]),
"sk": require_env("LASTFM_SESSION_KEY"),
}
if getenv_bool("LASTFM_SEND_ALBUM", True) and track.get("album"):
payload["album"] = track["album"]
if track.get("duration"):
payload["duration"] = str(track["duration"])
if dry_run:
log(
"DRY RUN scrobble: "
f"{track['artist']} - {track['title']} "
f"(timestamp {track['played_at']})"
)
return
lastfm_call(
"track.scrobble",
payload,
require_env("LASTFM_API_KEY"),
require_env("LASTFM_SHARED_SECRET"),
)
log(f"Scrobbled: {track['artist']} - {track['title']}")
def run_once(state_file: Path, dry_run: bool) -> int:
now_playing = fetch_now_playing()
track = extract_track(now_playing)
if not track:
log("No usable artist/title found in AzuraCast now-playing data.")
return 0
state = load_state(state_file)
prune_state(state)
key = track_key(track)
now_ts = int(time.time())
if key not in state.setdefault("now_playing", {}):
update_now_playing(track, dry_run=dry_run)
state["now_playing"][key] = now_ts
threshold = scrobble_threshold(track)
if track["elapsed"] < threshold:
log(
"Waiting before scrobble: "
f"{track['artist']} - {track['title']} "
f"({track['elapsed']}s/{threshold}s)"
)
save_state(state_file, state)
return 0
if key in state.setdefault("scrobbled", {}):
log(f"Already scrobbled: {track['artist']} - {track['title']}")
save_state(state_file, state)
return 0
scrobble_track(track, dry_run=dry_run)
state["scrobbled"][key] = now_ts
save_state(state_file, state)
return 0
def get_lastfm_token() -> int:
api_key = require_env("LASTFM_API_KEY")
shared_secret = require_env("LASTFM_SHARED_SECRET")
response = lastfm_call("auth.getToken", {}, api_key, shared_secret)
token = response.get("token")
if not token:
raise ApiError(f"Last.fm did not return a token: {response}")
params = urllib.parse.urlencode({"api_key": api_key, "token": token})
print(f"Token: {token}")
print(f"Open and approve: {LASTFM_AUTH_URL}?{params}")
return 0
def get_lastfm_session(token: str) -> int:
response = lastfm_call(
"auth.getSession",
{"token": token},
require_env("LASTFM_API_KEY"),
require_env("LASTFM_SHARED_SECRET"),
)
session = response.get("session") or {}
key = session.get("key")
name = session.get("name", "(unknown)")
if not key:
raise ApiError(f"Last.fm did not return a session key: {response}")
print(f"Last.fm user: {name}")
print(f"LASTFM_SESSION_KEY={key}")
return 0
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Scrobble AzuraCast now-playing tracks to Last.fm."
)
parser.add_argument(
"--env-file",
default=str(SCRIPT_DIR / "lastfm-scrobbler.env"),
help="Path to KEY=VALUE config file.",
)
parser.add_argument(
"--state-file",
default=env("LASTFM_SCROBBLER_STATE_FILE", str(DEFAULT_STATE_FILE)),
help="Path to state JSON file.",
)
parser.add_argument("--dry-run", action="store_true", help="Do not call Last.fm.")
parser.add_argument(
"--daemon",
action="store_true",
help="Run forever instead of exiting after one check.",
)
parser.add_argument(
"--interval",
type=int,
default=getenv_int("LASTFM_SCROBBLER_INTERVAL_SECONDS", 60),
help="Daemon check interval in seconds.",
)
parser.add_argument(
"--get-token",
action="store_true",
help="Create a Last.fm auth token and print the approval URL.",
)
parser.add_argument(
"--get-session",
metavar="TOKEN",
help="Exchange an approved Last.fm token for LASTFM_SESSION_KEY.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
load_env_file(Path(args.env_file))
try:
if args.get_token:
return get_lastfm_token()
if args.get_session:
return get_lastfm_session(args.get_session)
state_file = Path(args.state_file)
if not args.daemon:
return run_once(state_file=state_file, dry_run=args.dry_run)
while True:
try:
run_once(state_file=state_file, dry_run=args.dry_run)
except Exception as exc: # noqa: BLE001 - daemon should keep running.
log(f"ERROR: {exc}")
time.sleep(max(10, args.interval))
except (ConfigError, ApiError) as exc:
print(f"ERROR: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())