Source code for nc_py_api.loginflow_v2

"""Login flow v2 API wrapper."""

import asyncio
import json
import time
from dataclasses import dataclass

import httpx

from ._exceptions import check_error
from ._session import AsyncNcSession, NcSession

MAX_TIMEOUT = 60 * 20


[docs] @dataclass class LoginFlow: """The Nextcloud Login flow v2 initialization response representation.""" def __init__(self, raw_data: dict) -> None: self.raw_data = raw_data @property def login(self) -> str: """The URL for user authorization. Should be opened by the user in the default browser to authorize in Nextcloud. """ return self.raw_data["login"] @property def token(self) -> str: """Token for a polling for confirmation of user authorization.""" return self.raw_data["poll"]["token"] @property def endpoint(self) -> str: """Endpoint for polling.""" return self.raw_data["poll"]["endpoint"] def __repr__(self) -> str: return f"<{self.__class__.__name__} login_url={self.login}>"
[docs] @dataclass class Credentials: """The Nextcloud Login flow v2 response with app credentials representation.""" def __init__(self, raw_data: dict) -> None: self.raw_data = raw_data @property def server(self) -> str: """The address of Nextcloud to connect to. The server may specify a protocol (http or https). If no protocol is specified https will be used. """ return self.raw_data["server"] @property def login_name(self) -> str: """The username for authenticating with Nextcloud.""" return self.raw_data["loginName"] @property def app_password(self) -> str: """The application password generated for authenticating with Nextcloud.""" return self.raw_data["appPassword"] def __repr__(self) -> str: return f"<{self.__class__.__name__} login={self.login_name} app_password={self.app_password}>"
[docs] class _LoginFlowV2API: """Class implementing Nextcloud Login flow v2.""" _ep_init: str = "/index.php/login/v2" _ep_poll: str = "/index.php/login/v2/poll" def __init__(self, session: NcSession) -> None: self._session = session
[docs] def init(self, user_agent: str = "nc_py_api") -> LoginFlow: """Init a Login flow v2. :param user_agent: Application name. Application password will be associated with this name. """ r = self._session.adapter.post(self._ep_init, headers={"user-agent": user_agent}) return LoginFlow(_res_to_json(r))
[docs] def poll(self, token: str, timeout: int = MAX_TIMEOUT, step: int = 1, overwrite_auth: bool = True) -> Credentials: """Poll the Login flow v2 credentials. :param token: Token for a polling for confirmation of user authorization. :param timeout: Maximum time to wait for polling in seconds, defaults to MAX_TIMEOUT. :param step: Interval for polling in seconds, defaults to 1. :param overwrite_auth: If True current session will be overwritten with new credentials, defaults to True. :raises ValueError: If timeout more than 20 minutes. """ if timeout > MAX_TIMEOUT: msg = "Timeout can't be more than 20 minutes." raise ValueError(msg) for _ in range(timeout // step): r = self._session.adapter.post(self._ep_poll, data={"token": token}) if r.status_code == 200: break time.sleep(step) r_model = Credentials(_res_to_json(r)) if overwrite_auth: self._session.cfg.auth = (r_model.login_name, r_model.app_password) self._session.init_adapter(restart=True) self._session.init_adapter_dav(restart=True) return r_model
class _AsyncLoginFlowV2API: """Class implementing Async Nextcloud Login flow v2.""" _ep_init: str = "/index.php/login/v2" _ep_poll: str = "/index.php/login/v2/poll" def __init__(self, session: AsyncNcSession) -> None: self._session = session async def init(self, user_agent: str = "nc_py_api") -> LoginFlow: """Init a Login flow v2. :param user_agent: Application name. Application password will be associated with this name. """ r = await self._session.adapter.post(self._ep_init, headers={"user-agent": user_agent}) return LoginFlow(_res_to_json(r)) async def poll( self, token: str, timeout: int = MAX_TIMEOUT, step: int = 1, overwrite_auth: bool = True ) -> Credentials: """Poll the Login flow v2 credentials. :param token: Token for a polling for confirmation of user authorization. :param timeout: Maximum time to wait for polling in seconds, defaults to MAX_TIMEOUT. :param step: Interval for polling in seconds, defaults to 1. :param overwrite_auth: If True current session will be overwritten with new credentials, defaults to True. :raises ValueError: If timeout more than 20 minutes. """ if timeout > MAX_TIMEOUT: raise ValueError("Timeout can't be more than 20 minutes.") for _ in range(timeout // step): r = await self._session.adapter.post(self._ep_poll, data={"token": token}) if r.status_code == 200: break await asyncio.sleep(step) r_model = Credentials(_res_to_json(r)) if overwrite_auth: self._session.cfg.auth = (r_model.login_name, r_model.app_password) self._session.init_adapter(restart=True) self._session.init_adapter_dav(restart=True) return r_model def _res_to_json(response: httpx.Response) -> dict: check_error(response) return json.loads(response.text)