How to Implement Multi-Device Authentication System with FastAPI, Redis, and JWT
2024-10-10 03:0:20 Author: hackernoon.com(查看原文) 阅读量:2 收藏

Many big tech companies offer a cool feature that allows users to sign in on multiple devices. Users can manage their devices, view which ones are signed in, and even sign out from any device using any of their signed-in devices. Today, I want to explore how I can implement a similar authentication system using Redis and JWT.

So how does this work exactly?

For this, I have decided to use;

  • Fast-API for the back-end

  • Redis for caching

We will be using JWT (JSON Web Tokens) to authorize requests. Since our application is stateless (meaning it has no memory of previous requests), we need a way to send session and user data. JWT is ideal for managing authentication in stateless applications, and it does an excellent job.

However, one downside of JWT is that the more payload you include in a token, the longer it becomes. For our system, I’ve decided to include only the session_id and the username in the token. This information is sufficient for authorizing requests without making the token excessively large.e will be using JWT (JSON Web Tokens) to authorize requests. Since our application is stateless (meaning it has no memory of previous requests), we need a way to send session and user data. JWT is ideal for managing authentication in stateless applications, and it does an excellent job in this regard.

You said session_id? but we are using JWT and our app in stateless

In this context, "session" refers to the device or means through which a user interacts with our app. Essentially, it is the device the user is logged into. Whenever a user makes a login request, we create a new session (device) in our system that contains all the relevant device information. This data will be stored in Redis for future requests.

Alright, let’s get coding 😁

The first thing to do is to make sure you have Redis installed on your local machine. To install Redis, head over to https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/ and follow the instructions specific to your operating system.

Next, we install python. For this, I will be using the python 3.11 (I have not seen the need to upgrade to 3.12 yet, to be frank the only reason I even use 3.11 is because of StrEnum, other than that I still love 3.10)

Next, we need to install poetry, this is the package manager that I use

pip install poetry 
# or
python3.11 -m install poetry 

That is settled, so go ahead and clone the repo

git clone https://github.com/emperorsixpacks/multi_device_sign_in_with_redis.git && cd server
poetry shell && poetry install # create a new virtual environment and install all dependanceies

Setting up our Redis connection


import os
from redis import Redis

load_dotenv(".env")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")

redis_client = Redis(REDIS_HOST, int(REDIS_PORT))

Database setup

I have created a demo database in demo_users.json which is what we are going to be using for this tutorial.

{
    "user124": {
        "username": "user124",
        "email": "[email protected]",
        "password": "1234",
        "bio": "This is my brief bio"
    },
    "user123": {
        "username": "user123",
        "email": "[email protected]",
        "password": "1234",
        "bio": "This is my brief bio"
    }
}

Now, we need to add our schemas and helper functions for our database. For brevity, I will not put all the code here.

@dataclass
class Session:
    """
    A class to represent a user's session.

    Attributes:
    session_id (str): A unique id for the session.
    device_name (str): The name of the device used for the session.
    ip_address (str): The ip address of the device used for the session.
    device_id (str): A unique id for the device.
    date_created (datetime): The date and time the session was created.
    """
    session_id: str = field(default_factory=create_new_session_id)
    device_name: str = field(default=None)
    ip_address: str = field(default=None)
    device_id: str = field(default_factory=generate_new_device_id)
    date_created: datetime = field(default_factory=now_date_time_to_str)


@dataclass
class User:
    """
    A class to represent a user.

    Attributes:
    username (str): The username of the user.
    email (str): The email of the user.
    password (str): The password of the user.
    bio (str): The bio of the user.
    sessions (List[Session] | None): A list of Session objects representing the user's sessions.
    """

    username: str = field(default=None)
    email: str = field(default=None)
    password: str = field(default=None)
    bio: str = field(default=None)
    sessions: List[Session] | None = None

    @property
    def __dict__(self):
        """
        Returns a dictionary representing the user.

        Returns:
        Dict[str, Any]: A dictionary representing the user
        """
        return {
            "username": self.username,
            "email": self.email,
            "password": self.password,
            "bio": self.bio,
            "sessions": self.return_session_dict(),
        }

    def return_session_dict(self):
        """
        Returns a list of dictionaries representing the user's sessions.

        If the sessions field is a list of Session objects, returns a list of dictionaries
        where each dictionary is the __dict__ of a Session object. If the sessions field
        is a list of dictionaries, returns the list as is.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries representing the user's sessions
        """
        try:
            return [session.__dict__ for session in self.sessions]
        except AttributeError:
            return [session for session in self.sessions]


# Utiliy finctions 
def return_user_from_db(username) -> User | None:
    """
    Retrieves a user from the database by their username.

    Args:
    username (str): The username of the user to be retrieved

    Returns:
    User | None: The user if found, None otherwise
    """
    with open("demo_users.json", "r", encoding="utf-8") as file:
        user = json.load(file).get(str(username), None)
        return User(**user) or None

Server setup

We are using FastAPI to run our app, so let us go and set that up to


# Setting up server
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI(
    name="Multi device sign in with Redis",
    description="Multi device sign in with Redis in stateless applications",
)


@app.get("/")
def index_route():
    return JSONResponse(content={"Message": "hello, this seems to be working :)"})

if __name__ == "__main__":
    import uvicorn

    uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True, use_colors=True)

Alright this is good our application seems to be coming together nicely

Log-in / sign-in

Each time a user logs into the system, we need a way to generate a session_id and store that session in Redis, along with all their other sessions.

When a user logs in, we will first authenticate the request to ensure it is valid. Once validated, we can retrieve all the device information from the request. After that, we’ll store this information in Redis, generate a new token, and return that token to the user.

@app.post("/login")
def login_route(
    form: Annotated[LoginForm, Depends()], request: Request
) -> JSONResponse:
    """
    Handles a login request.

    Args:
    form (Annotated[LoginForm, Depends()]): The form data containing the username and password
    request (Request): The request containing the User-Agent header and client host

    Returns:
    JSONResponse: A JSON response containing a JWT token if the login is successful, otherwise a JSONResponse with a 404 status code and a message indicating that the username or password is invalid
    """
    username = form.username
    password = form.password

    # Authenticate the user
    user = authenticate_user(username, password)
    if user is None:
        return JSONResponse(
            status_code=404, content={"message": "Invalid username or password"}
        )

    # Create a new session
    session = Session(
        device_name=request.headers.get("User-Agent"), ip_address=request.client.host
    )

    # Get the user from the cache
    user_from_cache = get_user_from_cache(username)

    if user_from_cache is None:
        return JSONResponse(content={"message": "one minute"}, status_code=404)

    # Get the user's sessions
    user_sessions = get_sessions(userid=username)

    # Add the new session to the user's sessions
    try:
        user_sessions.append(session)
    except AttributeError:
        user_sessions = [session]

    # Update the user in the cache
    user_from_cache.sessions = user_sessions
    update_user_cache(userid=username, new_data=user_from_cache)

    # Create a JWT token
    token = create_token(Token(user=username, session_id=session.session_id))

    # Return the JWT token
    return JSONResponse(content={"message": "logged in", "token": token})

Logging out/ sign out

This is the easier part. Each time a user makes a request to our app, we decode the Bearer token to retrieve the session_id and username. We can then query Redis using the username.

If we find a match, we remove the session associated with the session_id from the decoded token. For instance, if the session does not exist, we simply return a message to the user. This indicates that the user has already logged out of that device from a different device, or that the token is invalid.

@app.post("/logout")
def logout_route(request: Request):
    """
    Handles a request to log out the user.

    This endpoint will delete the user's session from the cache and return a JSON response with a message indicating that the user has been logged out.

    Args:
        request (Request): The request containing the Authorization header with the JWT token

    Returns:
        JSONResponse: A JSON response containing the message "logged out" if the token is valid, otherwise a JSONResponse with a 404 status code and a message indicating that the token is invalid
    """
    # Get the JWT token from the Authorization header
    _, token = get_authorization_scheme_param(request.headers.get("Authorization"))

    # Decode the JWT token
    payload = decode_token(token)

    # Check if the token is invalid
    if payload is None:
        return JSONResponse(content={"message": "Invalid token"}, status_code=404)

    # Check if the user or session does not exist
    if get_single_session(userid=payload.user, session_id=payload.session_id) is None or get_user_from_cache(
        userid=payload.user) is None:
        return JSONResponse(content={"message": "Invalid token"}, status_code=404)

    # Delete the session from the cache
    delete_session(payload.user, payload.session_id)

    # Return a JSON response with a message indicating that the user has been logged out
    return JSONResponse(content={"message": "logged out"})

So yeah, that was not so hard, was it? I have had this project in my head for a couple of weeks now, and I wanted to test it out. Although this system is not completely perfect (I mean, no system is without its flaws) we can obviously make this one better. For instance, how do we manage requests from a place like Curl or a console app or even postman? Multiple requests from these sources could lead to a lot of sessions, therefore populating our db with unnecessary data. Yes, we could check to see where the request is coming from and create our logic to handle that, but to be honest, that would be a lot of work. That is why I do not recommend building authorization and authentication systems for production apps, except you are a real “agba” (senior engineer). I’d rather use OAuth 2.0 (Google or Apple) or an external provider like Kinde or Auth0. And if you are broke like me and are using EdgeDB, it comes with an auth system ready to use out of the box. This way, if something happens, you have someone else to blame and not just the intern 🙂.


文章来源: https://hackernoon.com/how-to-implement-multi-device-authentication-system-with-fastapi-redis-and-jwt?source=rss
如有侵权请联系:admin#unsafe.sh