Ha vállalkozó vagy, akkor jó ideje már minden általad kiállított és neked kiállított belföldi számla ott van digitálisan a NAV-nál. A probléma csak az, hogy ehhez nincs egy egyszerű interfész, ami leszedné neked az adatokat otthonra játszani, a könyvelőkből meg néha úgy kell ezt kisírni. Ezen segítünk most.
Csinálj egy technikai felhasználót a NAV online számlázóban
Az onlineszamla.nav.gov.hu-n klikkelj a Felhasználókra:

Az itt megnyíló oldalon a felhasználókezelésre:

Itt az új felhasználóra:

Majd a technikai felhasználó blokkban a létrehozásra:

Végül adj meg egy jelszót és engedélyezd az új technikai felhasználónak a számlák lekérdezését:

A mentést követően kapsz egy generált felhasználónevet, XML aláíró- és cserekulcsot:

Készítsd elő a scriptnek a Python környezetet
# Create a working directory for the fetcher and enter it.
mkdir nav_fetcher
cd nav_fetcher
# Create and activate an isolated Python virtual environment.
python3 -m venv venv
source venv/bin/activate
# Install the third-party libraries the script imports, then leave the venv.
pip install requests python-dotenv lxml openpyxl
deactivate
# Create the empty files that the following steps fill in.
touch .env
touch nav_fetch_digest.py
touch run.sh
# Make the runner script executable.
chmod +x run.sh
Definiáld a technikai felhasználó authentikálásához szükséges infókat az előbb létrehozott .env file-ban
NAV_TAX_NUMBER={cég adószám első 8 karakter}
NAV_USER={nav technikai felhasználó név}
NAV_PASSWORD={nav technikai felhasználó jelszó}
NAV_SIGNATURE_KEY={xml aláírókulcs}
NAV_EXCHANGE_KEY={xml cserekulcs}
NAV_API_URL=https://api.onlineszamla.nav.gov.hu/invoiceService/v3
A {} karakterek nem kellenek, csak az adatok.
Írd meg az előbb létrehozott nav_fetch_digest.py scriptet
import os
import uuid
import hashlib
import base64
import gzip
import zlib
import requests
from pathlib import Path
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from lxml import etree
from openpyxl import Workbook, load_workbook
# Directory containing this script; the .env file, the workbook and the raw
# XML dump are all resolved relative to it.
BASE_DIR = Path(__file__).resolve().parent
# Load .env before the os.getenv() calls below so they can see its values.
load_dotenv(BASE_DIR / ".env")
# Credentials and endpoint of the technical user, read from the environment.
NAV_TAX_NUMBER = os.getenv("NAV_TAX_NUMBER")
NAV_USER = os.getenv("NAV_USER")
NAV_PASSWORD = os.getenv("NAV_PASSWORD")
NAV_SIGNATURE_KEY = os.getenv("NAV_SIGNATURE_KEY")
NAV_API_URL = os.getenv("NAV_API_URL")
# Output locations: the Excel workbook and the folder of downloaded invoice XML.
OUTPUT_XLSX = BASE_DIR / "nav_invoice_database.xlsx"
RAW_XML_DIR = BASE_DIR / "raw_xml"
# Earliest issue date that is ever requested: used for the first run and as
# the lower bound of the query window on later runs.
# The query window always ends at the moment the script was started.
HISTORY_START_DATE = datetime(2026, 1, 1)
END_DATE = datetime.now()
# On a re-run the query window starts this many days before the latest issue
# date already stored in the digest sheet.
LOOKBACK_DAYS = 35
# Value sent in the <softwareId> element of every request.
SOFTWARE_ID = "ARADATISZA00000001"
# Header row of the inbound_digest / outbound_digest sheets.
# append_digest_rows() writes values in exactly this order and
# format_digest_sheet() addresses columns by position, so the order matters.
DIGEST_HEADERS = [
    "direction",
    "invoice_number",
    "supplier_tax_number",
    "supplier",
    "customer_tax_number",
    "customer",
    "invoice_issue_date",
    "payment_date",
    "invoice_delivery_date",
    "currency",
    "net_amount",
    "vat_amount",
    "gross_amount",
    "transaction_id",
    "invoice_operation",
    "date_from",
    "date_to",
    "page",
    "fetched_at",
    "raw_xml_file",
]
# Header row of the inbound_items / outbound_items sheets.
# append_item_rows() writes values in exactly this order and
# format_items_sheet() addresses columns by position, so the order matters.
ITEM_HEADERS = [
    "direction",
    "invoice_number",
    "supplier_tax_number",
    "supplier",
    "customer_tax_number",
    "customer",
    "invoice_issue_date",
    "currency",
    "line_number",
    "description",
    "quantity",
    "unit",
    "unit_price",
    "net_amount",
    "vat_rate",
    "vat_amount",
    "gross_amount",
    "raw_xml_file",
]
def sha512_upper(value: str) -> str:
    """Return the SHA-512 digest of *value* (UTF-8 encoded) as upper-case hex."""
    hasher = hashlib.sha512()
    hasher.update(value.encode("utf-8"))
    return hasher.hexdigest().upper()
def sha3_512_upper(value: str) -> str:
    """Return the SHA3-512 digest of *value* (UTF-8 encoded) as upper-case hex."""
    hasher = hashlib.sha3_512()
    hasher.update(value.encode("utf-8"))
    return hasher.hexdigest().upper()
def make_request_id() -> str:
    """Return a new request id: ``RID`` followed by 27 upper-case hex characters."""
    random_hex = uuid.uuid4().hex
    return "RID" + random_hex[:27].upper()
def make_timestamps():
    """Return the current UTC time in the two formats a request needs.

    Returns:
        A ``(xml_timestamp, signature_timestamp)`` tuple taken from the same
        instant: the first looks like ``2026-01-05T10:20:30.000Z``, the second
        like ``20260105102030``.
    """
    current = datetime.now(timezone.utc)
    xml_timestamp = current.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    signature_timestamp = current.strftime("%Y%m%d%H%M%S")
    return xml_timestamp, signature_timestamp
def require_env():
    """Check that every required setting is present in the environment.

    Raises:
        RuntimeError: if one or more settings are unset or empty; the message
            lists all of them.
    """
    required = (
        "NAV_TAX_NUMBER",
        "NAV_USER",
        "NAV_PASSWORD",
        "NAV_SIGNATURE_KEY",
        "NAV_API_URL",
    )
    missing = [name for name in required if not os.getenv(name)]
    if missing:
        raise RuntimeError("Missing .env values: " + ", ".join(missing))
def common_header_user():
    """Build the ``<common:header>`` and ``<common:user>`` XML fragments.

    A fresh request id and timestamp are generated on every call. The request
    signature is the upper-case SHA3-512 of request id + compact timestamp +
    signature key; the password is sent as its upper-case SHA-512 hash.
    """
    request_id = make_request_id()
    xml_timestamp, signature_timestamp = make_timestamps()
    password_hash = sha512_upper(NAV_PASSWORD)
    signature_input = request_id + signature_timestamp + NAV_SIGNATURE_KEY
    request_signature = sha3_512_upper(signature_input)
    fragment = f"""
<common:header>
<common:requestId>{request_id}</common:requestId>
<common:timestamp>{xml_timestamp}</common:timestamp>
<common:requestVersion>3.0</common:requestVersion>
<common:headerVersion>1.0</common:headerVersion>
</common:header>
<common:user>
<common:login>{NAV_USER}</common:login>
<common:passwordHash cryptoType="SHA-512">{password_hash}</common:passwordHash>
<common:taxNumber>{NAV_TAX_NUMBER}</common:taxNumber>
<common:requestSignature cryptoType="SHA3-512">{request_signature}</common:requestSignature>
</common:user>
"""
    return fragment
def software_xml():
    """Build the ``<software>`` XML fragment identifying this script.

    Only the software id and the developer tax number are taken from module
    settings; every other field is a fixed literal.
    """
    fragment = f"""
<software>
<softwareId>{SOFTWARE_ID}</softwareId>
<softwareName>HaromKutya importer</softwareName>
<softwareOperation>LOCAL_SOFTWARE</softwareOperation>
<softwareMainVersion>1.0</softwareMainVersion>
<softwareDevName>Harom Kutya</softwareDevName>
<softwareDevContact>local</softwareDevContact>
<softwareDevCountryCode>HU</softwareDevCountryCode>
<softwareDevTaxNumber>{NAV_TAX_NUMBER}</softwareDevTaxNumber>
</software>
"""
    return fragment
def build_digest_xml(direction: str, date_from: str, date_to: str, page: int):
    """Build the XML body of a ``QueryInvoiceDigestRequest``.

    Args:
        direction: value placed in ``<invoiceDirection>``.
        date_from: first issue date of the queried range.
        date_to: last issue date of the queried range.
        page: number of the result page to request.

    Returns:
        The complete request document as a string.
    """
    header_and_user = common_header_user()
    software = software_xml()
    return f"""<?xml version="1.0" encoding="UTF-8"?>
<QueryInvoiceDigestRequest xmlns:common="http://schemas.nav.gov.hu/NTCA/1.0/common"
xmlns="http://schemas.nav.gov.hu/OSA/3.0/api">
{header_and_user}
{software}
<page>{page}</page>
<invoiceDirection>{direction}</invoiceDirection>
<invoiceQueryParams>
<mandatoryQueryParams>
<invoiceIssueDate>
<dateFrom>{date_from}</dateFrom>
<dateTo>{date_to}</dateTo>
</invoiceIssueDate>
</mandatoryQueryParams>
</invoiceQueryParams>
</QueryInvoiceDigestRequest>
"""
def build_invoice_data_xml(direction: str, invoice_number: str, supplier_tax_number: str = ""):
    """Build the XML body of a ``QueryInvoiceDataRequest`` for one invoice.

    Args:
        direction: value placed in ``<invoiceDirection>``.
        invoice_number: invoice number to look up; it is XML-escaped.
        supplier_tax_number: added as ``<supplierTaxNumber>`` only for
            ``INBOUND`` queries and only when non-empty.

    Returns:
        The complete request document as a string.
    """
    include_supplier = direction == "INBOUND" and supplier_tax_number
    supplier_tag = (
        f"<supplierTaxNumber>{supplier_tax_number}</supplierTaxNumber>"
        if include_supplier
        else ""
    )
    header_and_user = common_header_user()
    software = software_xml()
    escaped_number = escape_xml(invoice_number)
    return f"""<?xml version="1.0" encoding="UTF-8"?>
<QueryInvoiceDataRequest xmlns:common="http://schemas.nav.gov.hu/NTCA/1.0/common"
xmlns="http://schemas.nav.gov.hu/OSA/3.0/api">
{header_and_user}
{software}
<invoiceNumberQuery>
<invoiceNumber>{escaped_number}</invoiceNumber>
<invoiceDirection>{direction}</invoiceDirection>
{supplier_tag}
</invoiceNumberQuery>
</QueryInvoiceDataRequest>
"""
def escape_xml(value: str) -> str:
    """Escape the five XML special characters in *value*.

    Args:
        value: text to embed in an XML document; it is converted with
            ``str()`` first, so non-string values are accepted.

    Returns:
        The text with ``&``, ``<``, ``>``, ``"`` and ``'`` replaced by their
        predefined XML entities.
    """
    return (
        str(value)
        # The ampersand must be handled first, otherwise the ampersands of
        # the entities produced below would be escaped a second time.
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("'", "&apos;")
    )
def post_nav(endpoint: str, xml: str) -> str:
    """POST an XML request to one NAV API operation and return the reply text.

    Args:
        endpoint: operation name appended to ``NAV_API_URL``.
        xml: request document; it is sent UTF-8 encoded.

    Returns:
        The response body as text.

    Raises:
        RuntimeError: if the HTTP status is not 200. The response body is
            printed first so the NAV error details stay visible.
    """
    base_url = NAV_API_URL.rstrip("/")
    request_headers = {
        "Content-Type": "application/xml",
        "Accept": "application/xml",
    }
    response = requests.post(
        base_url + "/" + endpoint,
        data=xml.encode("utf-8"),
        headers=request_headers,
        timeout=60,
    )
    if response.status_code == 200:
        return response.text
    print(response.text)
    raise RuntimeError(f"NAV HTTP error: {response.status_code}")
def text_or_empty(node, xpath: str) -> str:
    """Return the first result of evaluating *xpath* on *node*, or ``""`` if there is none."""
    matches = node.xpath(xpath)
    if not matches:
        return ""
    return matches[0]
def to_float(value):
    """Convert *value* to ``float``, accepting a decimal comma.

    Returns ``None`` for ``None`` or an empty string.
    """
    if value not in (None, ""):
        normalized = str(value).replace(",", ".")
        return float(normalized)
    return None
def to_excel_date(value):
    """Parse the leading ``YYYY-MM-DD`` part of *value* into a ``date``.

    Returns ``None`` when *value* is empty or ``None``.
    """
    if value:
        day_part = value[:10]
        return datetime.strptime(day_part, "%Y-%m-%d").date()
    return None
def safe_filename(value: str) -> str:
    """Turn *value* into a string usable as a file-name component.

    Every character that is neither alphanumeric nor one of ``-``, ``_``,
    ``.`` becomes ``_``; the result is cut to at most 120 characters.
    """
    kept_punctuation = "-_."
    cleaned = [
        ch if (ch.isalnum() or ch in kept_punctuation) else "_"
        for ch in str(value)
    ]
    return "".join(cleaned)[:120]
def parse_digest_response(xml_text: str, direction: str):
    """Extract the invoice digests from a queryInvoiceDigest response.

    Args:
        xml_text: response document as text.
        direction: stored unchanged in the ``direction`` field of every row.

    Returns:
        One dict per ``invoiceDigest`` element, all values as strings
        (``""`` where the element is absent).

    Raises:
        RuntimeError: if the response's ``funcCode`` is not ``OK``.
    """
    root = etree.fromstring(xml_text.encode("utf-8"))
    func_code = text_or_empty(root, "//*[local-name()='funcCode']/text()")
    error_code = text_or_empty(root, "//*[local-name()='errorCode']/text()")
    message = text_or_empty(root, "//*[local-name()='message']/text()")
    if func_code != "OK":
        raise RuntimeError(f"NAV error: {error_code} {message}")

    # (row key, XML element name) pairs; elements are matched by local name
    # so the namespace prefix used in the response does not matter.
    field_tags = (
        ("invoice_number", "invoiceNumber"),
        ("supplier_tax_number", "supplierTaxNumber"),
        ("supplier", "supplierName"),
        ("customer_tax_number", "customerTaxNumber"),
        ("customer", "customerName"),
        ("invoice_issue_date", "invoiceIssueDate"),
        ("payment_date", "paymentDate"),
        ("invoice_delivery_date", "invoiceDeliveryDate"),
        ("currency", "currency"),
        ("net_amount", "invoiceNetAmount"),
        ("vat_amount", "invoiceVatAmount"),
        ("gross_amount", "invoiceGrossAmount"),
        ("transaction_id", "transactionId"),
        ("invoice_operation", "invoiceOperation"),
    )
    rows = []
    for inv in root.xpath("//*[local-name()='invoiceDigest']"):
        row = {"direction": direction}
        for key, tag in field_tags:
            row[key] = text_or_empty(inv, f".//*[local-name()='{tag}']/text()")
        rows.append(row)
    return rows
def decode_invoice_data_response(xml_text: str):
    """Return the invoice XML embedded in a queryInvoiceData response.

    The payload is base64-decoded and, when ``compressedContent`` is
    ``true``, decompressed: gzip is tried first, with a fall-back to raw zlib
    if gzip rejects the data.

    Returns:
        The invoice document as text, or ``""`` if the response carries no
        ``invoiceData``.

    Raises:
        RuntimeError: if the response's ``funcCode`` is not ``OK``.
    """
    root = etree.fromstring(xml_text.encode("utf-8"))
    func_code = text_or_empty(root, "//*[local-name()='funcCode']/text()")
    error_code = text_or_empty(root, "//*[local-name()='errorCode']/text()")
    message = text_or_empty(root, "//*[local-name()='message']/text()")
    if func_code != "OK":
        raise RuntimeError(f"NAV queryInvoiceData error: {error_code} {message}")

    payload_b64 = text_or_empty(root, "//*[local-name()='invoiceData']/text()")
    compressed_flag = text_or_empty(root, "//*[local-name()='compressedContent']/text()").lower()
    if not payload_b64:
        return ""

    payload = base64.b64decode(payload_b64)
    if compressed_flag == "true":
        try:
            payload = gzip.decompress(payload)
        except OSError:
            payload = zlib.decompress(payload)
    return payload.decode("utf-8")
def save_raw_xml(direction: str, digest_row: dict, invoice_xml: str) -> str:
    """Write one invoice XML under ``RAW_XML_DIR/<direction>/`` and return its path.

    The file is named ``<issue date>_<supplier tax number>_<invoice number>.xml``,
    each part sanitized with ``safe_filename``. An existing file of the same
    name is overwritten.
    """
    target_dir = RAW_XML_DIR / direction.lower()
    target_dir.mkdir(parents=True, exist_ok=True)
    invoice_number = safe_filename(digest_row["invoice_number"])
    supplier_tax_number = safe_filename(digest_row["supplier_tax_number"])
    issue_date = safe_filename(digest_row["invoice_issue_date"])
    name_parts = (issue_date, supplier_tax_number, invoice_number)
    destination = target_dir / ("_".join(name_parts) + ".xml")
    destination.write_text(invoice_xml, encoding="utf-8")
    return str(destination)
def parse_invoice_items(invoice_xml: str, digest_row: dict, raw_xml_file: str):
    """Turn the ``line`` elements of an invoice XML into item rows.

    Args:
        invoice_xml: decoded invoice document; blank input yields ``[]``.
        digest_row: digest of the same invoice; its header fields are copied
            into every item row.
        raw_xml_file: path of the saved XML, stored on every item row.

    Returns:
        One dict per ``line`` element, values as strings.
    """
    if not invoice_xml.strip():
        return []
    root = etree.fromstring(invoice_xml.encode("utf-8"))

    copied_keys = (
        "direction",
        "invoice_number",
        "supplier_tax_number",
        "supplier",
        "customer_tax_number",
        "customer",
        "invoice_issue_date",
        "currency",
    )
    rows = []
    for line in root.xpath("//*[local-name()='line']"):
        # For each amount take the first variant present in the line.
        net_amount = first_number(line, [
            ".//*[local-name()='lineNetAmount']/text()",
            ".//*[local-name()='lineNetAmountHUF']/text()",
        ])
        vat_amount = first_number(line, [
            ".//*[local-name()='lineVatAmount']/text()",
            ".//*[local-name()='lineVatAmountHUF']/text()",
        ])
        gross_amount = first_number(line, [
            ".//*[local-name()='lineGrossAmountNormal']/text()",
            ".//*[local-name()='lineGrossAmountNormalHUF']/text()",
            ".//*[local-name()='lineGrossAmountSimplified']/text()",
            ".//*[local-name()='lineGrossAmountSimplifiedHUF']/text()",
        ])

        item = {key: digest_row[key] for key in copied_keys}
        item["line_number"] = text_or_empty(line, ".//*[local-name()='lineNumber']/text()")
        item["description"] = text_or_empty(line, ".//*[local-name()='lineDescription']/text()")
        item["quantity"] = text_or_empty(line, ".//*[local-name()='quantity']/text()")
        item["unit"] = text_or_empty(line, ".//*[local-name()='unitOfMeasure']/text()")
        item["unit_price"] = text_or_empty(line, ".//*[local-name()='unitPrice']/text()")
        item["net_amount"] = net_amount
        item["vat_rate"] = extract_vat_rate(line)
        item["vat_amount"] = vat_amount
        item["gross_amount"] = gross_amount
        item["raw_xml_file"] = raw_xml_file
        rows.append(item)
    return rows
def first_number(node, xpaths):
    """Return the first non-empty result among *xpaths* evaluated on *node*.

    The expressions are tried in order and evaluation stops at the first
    hit; ``""`` is returned when none of them yields text.
    """
    candidates = (text_or_empty(node, xp) for xp in xpaths)
    return next((text for text in candidates if text != ""), "")
def extract_vat_rate(line):
    """Describe the VAT treatment of one invoice line.

    Returns:
        The text of ``vatPercentage`` when present; otherwise the name of the
        first special-case element found in the line; otherwise ``""``.
    """
    percentage = text_or_empty(line, ".//*[local-name()='vatPercentage']/text()")
    if percentage:
        return percentage

    marker_tags = (
        "vatExemption",
        "vatOutOfScope",
        "vatDomesticReverseCharge",
        "marginSchemeIndicator",
    )
    for tag in marker_tags:
        if line.xpath(f".//*[local-name()='{tag}']"):
            return tag
    return ""
def date_blocks(start: datetime, end: datetime, max_days: int = 35):
    """Split ``start..end`` into consecutive ranges of at most *max_days* days.

    Yields:
        ``(date_from, date_to)`` pairs formatted as ``YYYY-MM-DD``; both ends
        are inclusive and the ranges do not overlap.
    """
    day_format = "%Y-%m-%d"
    span = timedelta(days=max_days - 1)
    one_day = timedelta(days=1)
    block_start = start
    while block_start <= end:
        block_end = min(block_start + span, end)
        yield block_start.strftime(day_format), block_end.strftime(day_format)
        block_start = block_end + one_day
def ensure_workbook():
    """Open the output workbook, creating it and any missing sheet.

    A new workbook starts without the default sheet. Each of the four
    expected sheets that does not exist yet is created with its header row.
    The workbook is saved to ``OUTPUT_XLSX`` before it is returned.
    """
    if not OUTPUT_XLSX.exists():
        wb = Workbook()
        wb.remove(wb.active)
    else:
        wb = load_workbook(OUTPUT_XLSX)

    sheet_headers = {
        "inbound_digest": DIGEST_HEADERS,
        "outbound_digest": DIGEST_HEADERS,
        "inbound_items": ITEM_HEADERS,
        "outbound_items": ITEM_HEADERS,
    }
    for sheet_name, headers in sheet_headers.items():
        if sheet_name in wb.sheetnames:
            continue
        new_sheet = wb.create_sheet(sheet_name)
        new_sheet.append(headers)

    wb.save(OUTPUT_XLSX)
    return wb
def existing_digest_keys(ws):
    """Collect the de-duplication keys of the rows already in a digest sheet.

    The keys must compare equal to the ones ``append_digest_rows`` builds for
    freshly fetched rows, so values read back from the sheet are normalized:

    * an issue date that comes back as ``datetime`` is reduced to a ``date``,
      because a ``datetime`` never compares equal to a plain ``date`` and the
      rows would otherwise be appended again on every run;
    * an empty text cell that comes back as ``None`` is turned into ``""``
      (NOTE(review): assumes empty strings are not round-tripped by the
      workbook library - confirm).

    Args:
        ws: digest worksheet whose first row holds the column headers.

    Returns:
        A set of ``(direction, invoice_number, supplier_tax_number,
        invoice_issue_date, gross_amount)`` tuples.
    """
    header_map = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}

    def value_at(row, name):
        return ws.cell(row=row, column=header_map[name]).value

    def as_text(value):
        return "" if value is None else value

    def as_date(value):
        return value.date() if isinstance(value, datetime) else value

    keys = set()
    for row in range(2, ws.max_row + 1):
        keys.add((
            as_text(value_at(row, "direction")),
            as_text(value_at(row, "invoice_number")),
            as_text(value_at(row, "supplier_tax_number")),
            as_date(value_at(row, "invoice_issue_date")),
            value_at(row, "gross_amount"),
        ))
    return keys
def existing_item_keys(ws):
    """Collect the de-duplication keys of the rows already in an items sheet.

    ``append_item_rows`` builds its keys from the raw strings parsed out of
    the invoice XML, but stores an all-digit line number in the sheet as an
    ``int``. The values read back here are therefore converted to strings
    (``None`` becomes ``""``) so that a stored ``1`` matches a freshly parsed
    ``"1"``; without this every re-run appends the same item lines again.

    Args:
        ws: items worksheet whose first row holds the column headers.

    Returns:
        A set of ``(direction, invoice_number, supplier_tax_number,
        line_number)`` tuples of strings.
    """
    header_map = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}

    def text_at(row, name):
        value = ws.cell(row=row, column=header_map[name]).value
        return "" if value is None else str(value)

    keys = set()
    for row in range(2, ws.max_row + 1):
        keys.add((
            text_at(row, "direction"),
            text_at(row, "invoice_number"),
            text_at(row, "supplier_tax_number"),
            text_at(row, "line_number"),
        ))
    return keys
def get_last_imported_issue_date(ws):
    """Return the latest ``invoice_issue_date`` stored in *ws*.

    ``datetime`` cell values are reduced to their date part; empty cells are
    skipped.

    Returns:
        The latest value, or ``None`` when the sheet has no such column, no
        data rows, or only empty date cells.
    """
    header_map = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}
    date_col = header_map.get("invoice_issue_date")
    if not date_col or ws.max_row < 2:
        return None

    def stored_dates():
        for row in range(2, ws.max_row + 1):
            value = ws.cell(row=row, column=date_col).value
            if value is None:
                continue
            yield value.date() if isinstance(value, datetime) else value

    return max(stored_dates(), default=None)
def append_digest_rows(ws, rows, date_from, date_to, page, known_keys):
    """Append the digest rows that are not yet known to the worksheet.

    Args:
        ws: digest worksheet to append to.
        rows: digest dicts as returned by ``parse_digest_response``.
        date_from: start of the queried window, stored on every new row.
        date_to: end of the queried window, stored on every new row.
        page: result page the rows came from, stored on every new row.
        known_keys: set of de-duplication keys; updated in place with the key
            of every row that is appended.

    Returns:
        The number of rows appended.
    """
    fetched_at = datetime.now()
    added = 0
    for row in rows:
        issue_date = to_excel_date(row["invoice_issue_date"])
        gross_amount = to_float(row["gross_amount"])
        key = (
            row["direction"],
            row["invoice_number"],
            row["supplier_tax_number"],
            issue_date,
            gross_amount,
        )
        if key in known_keys:
            continue

        cells = [
            row["direction"],
            row["invoice_number"],
            row["supplier_tax_number"],
            row["supplier"],
            row["customer_tax_number"],
            row["customer"],
            issue_date,
            # to_excel_date() already maps an empty value to None.
            to_excel_date(row["payment_date"]),
            to_excel_date(row["invoice_delivery_date"]),
            row["currency"],
            to_float(row["net_amount"]),
            to_float(row["vat_amount"]),
            gross_amount,
            row["transaction_id"],
            row["invoice_operation"],
            to_excel_date(date_from),
            to_excel_date(date_to),
            page,
            fetched_at,
            "",  # raw_xml_file column is left blank on digest rows
        ]
        ws.append(cells)
        known_keys.add(key)
        added += 1
    return added
def append_item_rows(ws, rows, known_keys):
    """Append the invoice line rows that are not yet known to the worksheet.

    Args:
        ws: items worksheet to append to.
        rows: item dicts as returned by ``parse_invoice_items``.
        known_keys: set of de-duplication keys; updated in place with the key
            of every row that is appended.

    Returns:
        The number of rows appended.
    """
    added = 0
    for row in rows:
        line_number = row["line_number"]
        vat_rate = row["vat_rate"]
        key = (
            row["direction"],
            row["invoice_number"],
            row["supplier_tax_number"],
            line_number,
        )
        if key in known_keys:
            continue

        # Store numeric-looking values as numbers, anything else as text.
        if str(line_number).isdigit():
            line_cell = int(line_number)
        else:
            line_cell = line_number
        if str(vat_rate).replace(".", "", 1).isdigit():
            vat_rate_cell = to_float(vat_rate)
        else:
            vat_rate_cell = vat_rate

        cells = [
            row["direction"],
            row["invoice_number"],
            row["supplier_tax_number"],
            row["supplier"],
            row["customer_tax_number"],
            row["customer"],
            to_excel_date(row["invoice_issue_date"]),
            row["currency"],
            line_cell,
            row["description"],
            to_float(row["quantity"]),
            row["unit"],
            to_float(row["unit_price"]),
            to_float(row["net_amount"]),
            vat_rate_cell,
            to_float(row["vat_amount"]),
            to_float(row["gross_amount"]),
            row["raw_xml_file"],
        ]
        ws.append(cells)
        known_keys.add(key)
        added += 1
    return added
def format_digest_sheet(ws):
    """Apply column widths, number formats, a frozen header and a filter to a digest sheet."""
    column_widths = {
        "A": 12,
        "B": 28,
        "C": 16,
        "D": 42,
        "E": 16,
        "F": 42,
        "G": 14,
        "H": 14,
        "I": 14,
        "J": 10,
        "K": 16,
        "L": 16,
        "M": 16,
        "N": 26,
        "O": 16,
        "P": 14,
        "Q": 14,
        "R": 8,
        "S": 20,
        "T": 60,
    }
    for letter, width in column_widths.items():
        ws.column_dimensions[letter].width = width

    # 1-based column index -> number format applied to every data row.
    column_formats = {
        7: "yyyy-mm-dd",
        8: "yyyy-mm-dd",
        9: "yyyy-mm-dd",
        16: "yyyy-mm-dd",
        17: "yyyy-mm-dd",
        11: '#,##0.00',
        12: '#,##0.00',
        13: '#,##0.00',
        19: "yyyy-mm-dd hh:mm:ss",
    }
    for row in range(2, ws.max_row + 1):
        for column, number_format in column_formats.items():
            ws.cell(row=row, column=column).number_format = number_format

    ws.freeze_panes = "A2"
    ws.auto_filter.ref = ws.dimensions
def format_items_sheet(ws):
    """Apply column widths, number formats, a frozen header and a filter to an items sheet."""
    column_widths = {
        "A": 12,
        "B": 28,
        "C": 16,
        "D": 42,
        "E": 16,
        "F": 42,
        "G": 14,
        "H": 10,
        "I": 12,
        "J": 60,
        "K": 14,
        "L": 12,
        "M": 16,
        "N": 16,
        "O": 12,
        "P": 16,
        "Q": 16,
        "R": 60,
    }
    for letter, width in column_widths.items():
        ws.column_dimensions[letter].width = width

    # 1-based column index -> number format applied to every data row.
    column_formats = {
        7: "yyyy-mm-dd",
        11: '#,##0.00',
        13: '#,##0.00',
        14: '#,##0.00',
        15: '#,##0.00',
        16: '#,##0.00',
        17: '#,##0.00',
    }
    for row in range(2, ws.max_row + 1):
        for column, number_format in column_formats.items():
            ws.cell(row=row, column=column).number_format = number_format

    ws.freeze_panes = "A2"
    ws.auto_filter.ref = ws.dimensions
def fetch_direction(direction: str, wb):
    """Import the invoices of one direction into the workbook.

    For every date block of the import window the digest pages are requested
    one after another; each digest row is added to the digest sheet (unless
    already known) and the full invoice XML is downloaded, saved to disk and
    split into item rows.

    Args:
        direction: "INBOUND" selects the inbound sheets, any other value the
            outbound sheets; the value is also sent to NAV as the invoice
            direction.
        wb: workbook that already contains the four digest/items sheets.

    Returns:
        A dict with the direction and the counters ``seen``,
        ``digest_added``, ``xml_downloaded`` and ``items_added``.
    """
    digest_sheet_name = "inbound_digest" if direction == "INBOUND" else "outbound_digest"
    item_sheet_name = "inbound_items" if direction == "INBOUND" else "outbound_items"
    digest_ws = wb[digest_sheet_name]
    item_ws = wb[item_sheet_name]
    # Keys of the rows already stored, used to skip duplicates while appending.
    digest_keys = existing_digest_keys(digest_ws)
    item_keys = existing_item_keys(item_ws)
    last_imported = get_last_imported_issue_date(digest_ws)
    if last_imported:
        # Re-run: start LOOKBACK_DAYS before the latest stored issue date,
        # but never earlier than HISTORY_START_DATE.
        start_date = datetime.combine(
            last_imported - timedelta(days=LOOKBACK_DAYS),
            datetime.min.time()
        )
        if start_date < HISTORY_START_DATE:
            start_date = HISTORY_START_DATE
    else:
        # Empty digest sheet: import the whole history.
        start_date = HISTORY_START_DATE
    print(f"\n{direction} import window: {start_date.date()} -> {END_DATE.date()}")
    total_seen = 0
    total_digest_added = 0
    total_items_added = 0
    total_xml_downloaded = 0
    for date_from, date_to in date_blocks(start_date, END_DATE):
        print(f"\n=== {direction} {date_from} -> {date_to} ===")
        page = 1
        # Keep requesting pages until one comes back without any digest.
        while True:
            digest_xml = build_digest_xml(direction, date_from, date_to, page)
            response_xml = post_nav("queryInvoiceDigest", digest_xml)
            rows = parse_digest_response(response_xml, direction)
            seen = len(rows)
            digest_added = append_digest_rows(
                digest_ws, rows, date_from, date_to, page, digest_keys
            )
            total_seen += seen
            total_digest_added += digest_added
            print(f"page {page}: seen={seen}, digest_added={digest_added}")
            # The invoice data is requested for every digest row of the page,
            # including rows whose digest was already stored.
            for row in rows:
                invoice_data_xml = build_invoice_data_xml(
                    direction,
                    row["invoice_number"],
                    row["supplier_tax_number"],
                )
                try:
                    invoice_response = post_nav("queryInvoiceData", invoice_data_xml)
                    invoice_xml = decode_invoice_data_response(invoice_response)
                    raw_xml_file = save_raw_xml(direction, row, invoice_xml)
                    items = parse_invoice_items(invoice_xml, row, raw_xml_file)
                    items_added = append_item_rows(item_ws, items, item_keys)
                    total_xml_downloaded += 1
                    total_items_added += items_added
                except Exception as e:
                    # Best effort: a failing invoice is reported and skipped
                    # so the remaining invoices are still processed.
                    print(f"QUERY DATA ERROR: {direction} {row['invoice_number']} -> {e}")
            if seen == 0:
                break
            page += 1
    format_digest_sheet(digest_ws)
    format_items_sheet(item_ws)
    return {
        "direction": direction,
        "seen": total_seen,
        "digest_added": total_digest_added,
        "xml_downloaded": total_xml_downloaded,
        "items_added": total_items_added,
    }
def main():
    """Fetch inbound and outbound invoices and store them in the workbook.

    The workbook is saved even when fetching stops with an exception, so the
    rows collected up to that point are kept; the exception still propagates
    and the summary is printed only after a complete run.

    Raises:
        RuntimeError: if a required setting is missing or NAV reports an
            error for a digest query.
    """
    require_env()
    wb = ensure_workbook()
    results = []
    try:
        for direction in ("INBOUND", "OUTBOUND"):
            results.append(fetch_direction(direction, wb))
    finally:
        # Persist whatever was appended so far; the de-duplication keys make
        # a later re-run safe to repeat over the same window.
        wb.save(OUTPUT_XLSX)
    print("")
    print("Done.")
    for r in results:
        print(
            f"{r['direction']}: "
            f"seen={r['seen']}, "
            f"digest_added={r['digest_added']}, "
            f"xml_downloaded={r['xml_downloaded']}, "
            f"items_added={r['items_added']}"
        )
    print(f"Excel: {OUTPUT_XLSX}")
    print(f"Raw XML: {RAW_XML_DIR}")


if __name__ == "__main__":
    main()
Írd meg a futtató scriptet
#!/bin/bash
# Run the NAV invoice fetcher inside its virtual environment.
# Stop at the first failing command instead of running Python without the venv.
set -e
# The venv and the script are addressed by relative path, so switch to the
# directory of this file first; this lets run.sh be started from anywhere.
cd "$(dirname "$0")"
source venv/bin/activate
python nav_fetch_digest.py
Próbacseresznye
Futtasd le a run.sh scriptet shellben:
./run.sh
Ha mindent jól csináltál (és nem változott semmi amióta összeraktam), akkor a script a saját könyvtárában létrehoz egy nav_invoice_database.xlsx nevű Excel file-t, 2 sheeten a bejövő és kimenő számláid fejlécei, 2 másik sheeten pedig a számlák tételei lesznek majd 2026 január 1 óta.
A scriptet újrafuttatva mindig csak a friss számlákat csapja majd hozzá a táblához.
























































































