dify/api/controllers/inner_api/wraps.py
-LAN- 13be84e4d4
Some checks are pending
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
chore(api/controllers): Apply Ruff Formatter. (#7645)
2024-08-26 15:29:10 +08:00

63 lines
1.7 KiB
Python

from base64 import b64encode
from functools import wraps
from hashlib import sha1
from hmac import compare_digest
from hmac import new as hmac_new

from flask import abort, request

from configs import dify_config
from extensions.ext_database import db
from models.model import EndUser


def inner_api_only(view):
    """Restrict a Flask view to callers that present the inner API key.

    The wrapped view:

    * aborts with 404 when ``dify_config.INNER_API`` is falsy;
    * aborts with 401 when the ``X-Inner-Api-Key`` request header is missing,
      empty, or different from ``dify_config.INNER_API_KEY``;
    * otherwise calls ``view`` with the original arguments and returns its
      result.

    Args:
        view: The view function to protect.

    Returns:
        The wrapping function, carrying ``view``'s metadata via
        ``functools.wraps``.
    """

    @wraps(view)
    def decorated(*args, **kwargs):
        if not dify_config.INNER_API:
            abort(404)

        provided_key = request.headers.get("X-Inner-Api-Key")
        expected_key = dify_config.INNER_API_KEY

        # A missing/empty key on either side can never authenticate; checking
        # this first also keeps ``None`` away from ``.encode`` below.
        if not provided_key or not expected_key:
            abort(401)

        # Compare in constant time so response timing does not leak how much
        # of the key matched. Bytes are used because ``compare_digest`` rejects
        # non-ASCII ``str`` arguments.
        # Assumes INNER_API_KEY is a ``str``.
        if not compare_digest(provided_key.encode("utf-8"), expected_key.encode("utf-8")):
            abort(401)

        return view(*args, **kwargs)

    return decorated


def inner_api_user_auth(view):
    """Optionally inject the signed end user into a Flask view call.

    This decorator never rejects a request. When every check below passes,
    the view receives an extra keyword argument ``user`` holding the first
    ``EndUser`` row whose ``id`` equals the signed user id (``None`` if no row
    matches). In all other cases the view is called with its original
    arguments only.

    Checks, in order:

    1. ``dify_config.INNER_API`` is truthy.
    2. The ``Authorization`` and ``X-Inner-Api-Key`` headers are present.
    3. ``Authorization`` splits on ``":"`` into exactly ``<user_id>:<token>``;
       if the user-id part contains a space (e.g. a leading scheme word), the
       second space-separated piece is used as the user id.
    4. ``token`` equals ``base64(HMAC-SHA1(key, "DIFY <user_id>"))`` where
       ``key`` is the ``X-Inner-Api-Key`` header value.

    Args:
        view: The view function to wrap.

    Returns:
        The wrapping function, carrying ``view``'s metadata via
        ``functools.wraps``.
    """

    @wraps(view)
    def decorated(*args, **kwargs):
        if not dify_config.INNER_API:
            return view(*args, **kwargs)

        # Credentials arrive as "<user_id>:<token>" in the Authorization header.
        authorization = request.headers.get("Authorization")
        if not authorization:
            return view(*args, **kwargs)

        parts = authorization.split(":")
        if len(parts) != 2:
            return view(*args, **kwargs)

        user_id, token = parts
        if " " in user_id:
            # Drop a leading scheme word such as "Bearer <user_id>".
            user_id = user_id.split(" ")[1]

        # NOTE(review): the HMAC key is taken from the request itself and is
        # not compared with dify_config.INNER_API_KEY in this decorator, so the
        # signature is only meaningful if the key is validated elsewhere
        # (presumably by stacking ``inner_api_only``) - confirm at call sites.
        inner_api_key = request.headers.get("X-Inner-Api-Key")
        if inner_api_key is None:
            # Without a key nothing can be verified; treat it like any other
            # failed check instead of crashing on ``None.encode``.
            return view(*args, **kwargs)

        data_to_sign = f"DIFY {user_id}"
        digest = hmac_new(inner_api_key.encode("utf-8"), data_to_sign.encode("utf-8"), sha1).digest()
        expected_token = b64encode(digest)

        # Constant-time comparison on bytes: avoids leaking signature prefixes
        # through timing and avoids ``compare_digest`` rejecting non-ASCII str.
        if not compare_digest(expected_token, token.encode("utf-8")):
            return view(*args, **kwargs)

        kwargs["user"] = db.session.query(EndUser).filter(EndUser.id == user_id).first()

        return view(*args, **kwargs)

    return decorated