Third Party OIDC Authentication with Airflow 2.x

By Mark deGroat
November 13, 2023

Hello!

In this blog post we dive deep into using OpenID Connect (OIDC) and your third-party identity provider to authenticate and assign permissions to users signing in to Airflow 2.x through the web UI.

Airflow is a platform to programmatically author, schedule, and monitor workflows. It is primarily written in Python and comes with a web-based UI for managing workflows and other UI-driven tasks. The Airflow web UI uses Flask-AppBuilder (FAB) as its primary framework, and Airflow provides ways to customize which FAB authentication method is used. Beyond this, you can customize the logic that runs during login and logout, allowing you to assign users to certain roles in Airflow based on attributes from their OIDC profile.

We have created an example repository on GitHub that you can use to demo this functionality. It uses Docker for running a local Airflow instance, and GitLab as the third-party identity provider. Detailed setup instructions can be found in the README.

Please note your setup might differ drastically from this directory layout, but the important piece really lies in the few resources described below. As long as you can modify environment variables, and make changes to these resources, you should be good to proceed.

Resources Overview:

.example.env - An example env file that shows which values need to be set in the Airflow environment.

.gitignore - A git-specific ignore file that keeps client_secret.json and any real .env files from being committed. Modify as you see fit.

client_secret.example - Used for creating client_secret.json. Because we need to manually register our client with the OpenID provider, we need to craft this file in this format in order to correctly pass the needed tokens to the Flask-OIDC library and the functions from it that we will be using. More info here.

webserver_config.py - This is a base webserver_config.py file that is generated by Airflow, but has been extended to allow us to use our third party OpenID Provider to authenticate logins, and to revoke tokens on logout. 

airflow-etl.yml - This is a docker compose file that defines the services needed in order to emulate a full Airflow instance.  It also correctly mounts the client_secret.json file to the same root directory as webserver_config.py.

Dockerfile - This Dockerfile uses the base Airflow Docker image from Apache and installs the Python libraries used by the logic added in webserver_config.py above. It also loads the webserver_config.py file into the airflow directory of the example Airflow instance, at the same location where the client_secret.json file is mounted.
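
As a rough sketch of what client_secret.json might contain for GitLab: Flask-OIDC reads a top-level "web" object, and the webserver_config.py in this post additionally reads the "issuer" and "name" keys from it. The helper name build_client_secrets, the placeholder credentials, and the default name "gitlab" are assumptions for illustration; check the endpoint paths against your own provider's documentation.

```python
import json


def build_client_secrets(client_id, client_secret, issuer, redirect_uri, name="gitlab"):
    """Return a dict in the client-secrets layout that Flask-OIDC reads.

    Hypothetical helper: the endpoint paths below are the ones GitLab uses
    and may differ for other identity providers.
    """
    base = issuer.rstrip("/")
    return {
        "web": {
            "name": name,                # read as OIDC_PROVIDER_NAME in webserver_config.py
            "issuer": base,              # read as PROVIDER_OIDC_URL in webserver_config.py
            "client_id": client_id,
            "client_secret": client_secret,
            "auth_uri": f"{base}/oauth/authorize",
            "token_uri": f"{base}/oauth/token",
            "userinfo_uri": f"{base}/oauth/userinfo",
            "redirect_uris": [redirect_uri],
        }
    }


if __name__ == "__main__":
    # Placeholder values only; never commit real credentials.
    secrets = build_client_secrets(
        client_id="YOUR_CLIENT_ID",
        client_secret="YOUR_CLIENT_SECRET",
        issuer="https://gitlab.com",
        redirect_uri="http://localhost:8060/oidc_callback",
    )
    print(json.dumps(secrets, indent=2))
```

Printing the result and saving it as client_secret.json next to webserver_config.py is one way to avoid hand-editing the file.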

Our Changes to ENV Variables:

AIRFLOW__API__AUTH_BACKEND='airflow.api.auth.backend.basic_auth'
ALLOWED_PROVIDER_GROUPS='fd_main,another_group'
NICKNAME_OIDC_FIELD='nickname'
FULL_NAME_OIDC_FIELD='name'
GROUPS_OIDC_FIELD='groups'
EMAIL_FIELD='email'
SUB_FIELD='sub'
OIDC_CLIENT_SECRETS='client_secret.json'

Please note that the first ENV var, AIRFLOW__API__AUTH_BACKEND, is only for this example and sets the authentication backend of the Airflow API to basic auth. This is not the same as the authentication method for the webserver, which we are changing to OIDC in this example.

The next one, ALLOWED_PROVIDER_GROUPS, restricts login to certain “groups” from your OIDC provider. If you do not need that restriction, you can leave this ENV var out.

The next set, NICKNAME_OIDC_FIELD through SUB_FIELD, names the fields in the OIDC profile that we read, which may be specific to your third-party OIDC provider. These are the values needed for GitLab, but yours may differ. More info is available here.

OIDC_CLIENT_SECRETS is the path to the client_secret.json file.

A Note on Config Changes in Airflow

Airflow allows you to update configuration in a number of different ways, detailed here for Airflow 2.x.

  1. You can directly modify the auto-generated airflow.cfg file, changing and adding values as you see fit.
  2. You can place a webserver_config.py file (as we are doing in this example) in the {AIRFLOW_HOME} directory to override settings for the [webserver] section of the config
  3. You can create ENV variables following the format AIRFLOW__{SECTION_NAME}__{KEY_NAME}

Please note the double underscores. The first variable in the list above, AIRFLOW__API__AUTH_BACKEND, modifies the [api] section of the configuration and sets the auth_backend setting to the module ‘airflow.api.auth.backend.basic_auth’. Another example is AIRFLOW__WEBSERVER__AUTHENTICATE, which would set the “authenticate” setting in the [webserver] section of the configuration.

There is an order of precedence for these changes, such that if you have changes at both the ENV var level and the airflow.cfg file level that overlap, the precedence is:

  1. set as an environment variable (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN)
  2. set as a command environment variable (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD)
  3. set as a secret environment variable (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_SECRET)
  4. set in airflow.cfg
  5. command in airflow.cfg
  6. secret key in airflow.cfg
  7. Airflow’s built in defaults
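
To make the naming rule and the ordering concrete, here is a deliberately simplified sketch. It is not Airflow's implementation: it covers only the plain environment variable, the airflow.cfg value, and the built-in default, skipping the _CMD and _SECRET variants, and the function names are hypothetical.

```python
def airflow_env_var(section, key):
    """Build the AIRFLOW__{SECTION_NAME}__{KEY_NAME} variable name."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def resolve_setting(section, key, environ, cfg, defaults):
    """Simplified lookup: environment variable, then airflow.cfg, then default.

    `cfg` and `defaults` are plain nested dicts standing in for the parsed
    config file and Airflow's built-in defaults (an assumption of this sketch).
    """
    env_name = airflow_env_var(section, key)
    if env_name in environ:
        return environ[env_name]
    if key in cfg.get(section, {}):
        return cfg[section][key]
    return defaults[section][key]


if __name__ == "__main__":
    environ = {"AIRFLOW__API__AUTH_BACKEND": "airflow.api.auth.backend.basic_auth"}
    cfg = {"api": {"auth_backend": "value.from.airflow.cfg"}}
    defaults = {"api": {"auth_backend": "value.from.defaults"}}
    # The environment variable overrides the value in airflow.cfg.
    print(resolve_setting("api", "auth_backend", environ, cfg, defaults))
```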

webserver_config.py Changes and Overview

A [default webserver_config.py](https://github.com/apache/airflow/blob/main/airflow/config_templates/default_webserver_config.py) is automatically generated when you set up Airflow, and it is where you can modify logic using Python.

This file is where we are going to modify some constants that Airflow uses in order to configure itself. We are extending the AuthOIDView class from FAB to check if the user was authenticated using OIDC, assign them to the correct group on login, and revoke their token on logout. Next we define a custom OIDCSecurityManager class where we configure Airflow to use OpenIDConnect, and use our new extended version of AuthOIDView in place of the legacy one.

First we need to handle necessary imports and setup logging.

import os, logging, json, posixpath

from airflow import configuration as conf
from airflow.www.security import AirflowSecurityManager
from flask import abort, make_response, redirect
from flask_appbuilder.security.manager import AUTH_OID
from flask_appbuilder.security.views import AuthOIDView
from flask_appbuilder.views import ModelView, SimpleFormView, expose
from flask_login import login_user
from flask_oidc import OpenIDConnect

logger = logging.getLogger(__name__)

Next we are going to set the OIDC fields that we are going to be mapping from our third party IDP.

Keep in mind these are currently configured as ENV variables, but we can modify the default values below if we want to instead just have these hardcoded for our instance, then leave the ENV vars blank.

# Set the OIDC field that should be used
NICKNAME_OIDC_FIELD = os.getenv('NICKNAME_OIDC_FIELD', 'nickname')
FULL_NAME_OIDC_FIELD = os.getenv('FULL_NAME_OIDC_FIELD', 'name')
GROUPS_OIDC_FIELD = os.getenv('GROUPS_OIDC_FIELD', 'groups')
EMAIL_FIELD = os.getenv('EMAIL_FIELD', 'email')
SUB_FIELD = os.getenv('SUB_FIELD', 'sub')  # User ID

If we are going to utilize checking the group attribute from the profile of the OIDC user, this next piece handles pulling our comma separated values from the ALLOWED_PROVIDER_GROUPS ENV var and placing them into an array that will be referenced later on in the code. If you don’t want to utilize groups, this code can stay as is and the array will be initialized as empty.

# Convert groups from comma separated string to list
ALLOWED_PROVIDER_GROUPS = os.environ.get('ALLOWED_PROVIDER_GROUPS')
if ALLOWED_PROVIDER_GROUPS:
    ALLOWED_PROVIDER_GROUPS = [g.strip() for g in ALLOWED_PROVIDER_GROUPS.split(',')]
else:
    ALLOWED_PROVIDER_GROUPS = []

if ALLOWED_PROVIDER_GROUPS:
    logger.debug('AirFlow access requires membership to one of the following groups: %s'
        % ', '.join(ALLOWED_PROVIDER_GROUPS))
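
The same parsing can be pulled into a small function, which makes the behavior easy to check in isolation. This is a sketch rather than the repository's code: parse_groups is a hypothetical name, and unlike the snippet above it also drops empty entries left by stray commas.

```python
def parse_groups(raw):
    """Turn a comma separated string into a list of group names.

    Returns an empty list for None or an empty string, and ignores
    blank entries such as the ones produced by a trailing comma.
    """
    if not raw:
        return []
    return [group.strip() for group in raw.split(",") if group.strip()]


if __name__ == "__main__":
    print(parse_groups("fd_main, another_group"))
    print(parse_groups(None))
```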

A new class AuthOIDCView extends the default AuthOIDView.

# Extending AuthOIDView
class AuthOIDCView(AuthOIDView):

We override the existing login() view and define an inner handle_login() function that authenticates users based on the response from the OIDC provider.

Stripping out the extraneous logic gives us:

  @expose('/login/', methods=['GET', 'POST'])
  def login(self, flag=True):
      sm = self.appbuilder.sm
      oidc = sm.oid

      @self.appbuilder.sm.oid.require_login
      def handle_login():
          user = sm.auth_user_oid(oidc.user_getfield(EMAIL_FIELD))
          login_user(user, remember=False)
          return redirect(self.appbuilder.get_url_for_index)

      return handle_login()

We override the login function and expose it for the route “/login/” with the methods GET and POST.

We call auth_user_oid from FAB’s security manager class, passing the email of the user attempting to log in, in order to look up the user (if they exist). If they do, we call login_user, then redirect them to the index of the FAB instance, which is the index of the Airflow application.

But in order to allow our group check to occur, we are going to check if the ALLOWED_PROVIDER_GROUPS constant is defined:

  # Group membership required
  if ALLOWED_PROVIDER_GROUPS:

      # Fetch group membership information from OIDC provider
      groups = oidc.user_getinfo([GROUPS_OIDC_FIELD]).get(GROUPS_OIDC_FIELD, [])
      intersection = set(ALLOWED_PROVIDER_GROUPS) & set(groups)
      logger.debug('AirFlow user member of groups in ACL list: %s' % ', '.join(intersection))

      # Unable to find common groups, prevent login
      if not intersection:
          return abort(403)

Using the list of allowed groups set above, we read the groups claim named by GROUPS_OIDC_FIELD from the user’s OIDC profile. The goal is to make sure at least one of the groups coming back from the IDP is in our list of allowed groups. If not, we abort with a 403 Forbidden. We could extend this to also show a more detailed message to the end user, something like “Please check with your Airflow administrator, it appears you do not belong to the correct permissions group for accessing Airflow.”
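
The check itself can be sketched as a pure function that also carries the friendlier message. The function name and the message constant are hypothetical, and treating an empty allow list as "no restriction" mirrors the `if ALLOWED_PROVIDER_GROUPS:` guard in the snippet above.

```python
DENIED_MESSAGE = (
    "Please check with your Airflow administrator, it appears you do not "
    "belong to the correct permissions group for accessing Airflow."
)


def check_group_access(user_groups, allowed_groups):
    """Return (allowed, message) for a user's OIDC groups.

    An empty allow list means group membership is not required.
    """
    if not allowed_groups:
        return True, ""
    common = set(allowed_groups) & set(user_groups or [])
    if common:
        return True, ""
    return False, DENIED_MESSAGE


if __name__ == "__main__":
    print(check_group_access(["fd_main", "other"], ["fd_main", "another_group"]))
    print(check_group_access(["other"], ["fd_main", "another_group"]))
```

Inside the view, one option is to pass the message along with the status code, for example abort(403, description=message), so the error page can display it.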

After this block comes the logic for creating a new user in Airflow if they did not previously exist in the system. We can create the user in Airflow using the details passed to us from the external IDP via the OIDC profile. You may want to delete this block if it’s not needed for your implementation, or return the appropriate HTTP error code and message to end users instead.

  # Create user (if it doesn't already exist)
  if user is None:
      info = oidc.user_getinfo([
          NICKNAME_OIDC_FIELD,
          FULL_NAME_OIDC_FIELD,
          GROUPS_OIDC_FIELD,
          SUB_FIELD,
          EMAIL_FIELD,
          "profile"
      ])
      full_name = info.get(FULL_NAME_OIDC_FIELD)
      if " " in full_name:
          full_name = full_name.split(" ")
          first_name = full_name[0]
          last_name = full_name[1]
      else:
          first_name = full_name
          last_name = ""
      user = sm.add_user(
          username=info.get(NICKNAME_OIDC_FIELD),
          first_name=first_name,
          last_name=last_name,
          email=info.get(EMAIL_FIELD),
          role=sm.find_role(sm.auth_user_registration_role)
      )
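
One thing to watch in the block above: if the provider returns no name at all, `" " in full_name` raises a TypeError, and names with more than two parts lose everything after the second word. A more defensive split might look like the sketch below; split_full_name is a hypothetical helper, not part of the example repository.

```python
def split_full_name(full_name):
    """Split a display name into (first_name, last_name).

    Everything after the first space is kept as the last name, and a
    missing name yields two empty strings instead of raising.
    """
    first, _, last = (full_name or "").strip().partition(" ")
    return first, last.strip()


if __name__ == "__main__":
    print(split_full_name("Mark deGroat"))
    print(split_full_name(None))
```

The two return values can be passed straight to sm.add_user as first_name and last_name.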

Putting all of this together, our login function becomes:

@expose('/login/', methods=['GET', 'POST'])
def login(self, flag=True):

    sm = self.appbuilder.sm
    oidc = sm.oid

    @self.appbuilder.sm.oid.require_login
    def handle_login():
        user = sm.auth_user_oid(oidc.user_getfield(EMAIL_FIELD))

        # Group membership required
        if ALLOWED_PROVIDER_GROUPS:

            # Fetch group membership information from OIDC provider
            groups = oidc.user_getinfo([GROUPS_OIDC_FIELD]).get(GROUPS_OIDC_FIELD, [])
            intersection = set(ALLOWED_PROVIDER_GROUPS) & set(groups)
            logger.debug('AirFlow user member of groups in ACL list: %s' % ', '.join(intersection))

            # Unable to find common groups, prevent login
            if not intersection:
                return abort(403)

        # Create user (if it doesn't already exist)
        if user is None:
            info = oidc.user_getinfo([
                NICKNAME_OIDC_FIELD,
                FULL_NAME_OIDC_FIELD,
                GROUPS_OIDC_FIELD,
                SUB_FIELD,
                EMAIL_FIELD,
                "profile"
            ])
            full_name = info.get(FULL_NAME_OIDC_FIELD)
            if " " in full_name:
                full_name = full_name.split(" ")
                first_name = full_name[0]
                last_name = full_name[1]
            else:
                first_name = full_name
                last_name = ""
            user = sm.add_user(
                username=info.get(NICKNAME_OIDC_FIELD),
                first_name=first_name,
                last_name=last_name,
                email=info.get(EMAIL_FIELD),
                role=sm.find_role(sm.auth_user_registration_role)
            )

        login_user(user, remember=False)
        return redirect(self.appbuilder.get_url_for_index)

    return handle_login()

Below that is our logout override/extension:

  @expose('/logout/', methods=['GET', 'POST'])
  def logout(self):
      oidc = self.appbuilder.sm.oid
      if not oidc.credentials_store:
          return redirect('/login/')
      self.revoke_token()
      oidc.logout()
      super(AuthOIDCView, self).logout()
      response = make_response("You have been signed out")
      return response

We extend logout with a bit of additional logic. First we check whether the OIDC object has a credentials store at all; if it does not, there is nothing to revoke, so we send the user to the login page.

Otherwise we revoke the token, call logout() on the Flask-OIDC object, then call logout() on the parent of our extended AuthOIDCView class so that the original FAB logout flow still runs.

Next we have the revoke_token function:

def revoke_token(self):
    """ Revokes the provided access token. Sends a POST request to the token revocation endpoint
    """
    import aiohttp
    import asyncio
    import json
    oidc = self.appbuilder.sm.oid
    sub = oidc.user_getfield(SUB_FIELD)
    config = oidc.credentials_store
    config = config.get(str(sub))
    config = json.loads(config)
    payload = {
        "token": config['access_token'],
        "token_type_hint": "access_token"  # the token sent above is the access token
    }
    auth = aiohttp.BasicAuth(config['client_id'], config['client_secret'])
    # Sends an asynchronous POST request to revoke the token
  
    async def revoke():
        async with aiohttp.ClientSession() as session:
            async with session.post(self.appbuilder.app.config.get('OIDC_LOGOUT_URI'), data=payload, auth=auth) as response:
                logging.info(f"Revoke response {response.status}")

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(revoke())

The basic idea is that we revoke the end user’s token and expire their session when they log out. The request is written with asyncio and aiohttp, with the intent that the user should not have to wait on the revocation call. Note, however, that loop.run_until_complete() blocks until the request finishes, so as written the user still waits for the IDP to respond. You can remove the asynchronous code if your use case does not need it, and you should adjust this logic according to the documentation from your IDP.
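
If the asynchronous machinery is not needed, the same POST can be made with the standard library. This is a minimal sketch, not the example repository's code: the helper names are hypothetical, the request uses a form-encoded token and token_type_hint with HTTP Basic client authentication (the same shape as the aiohttp call above), and your IDP may expect something different.

```python
import base64
import urllib.parse
import urllib.request


def build_revocation_request(revoke_url, token, client_id, client_secret,
                             token_type_hint="access_token"):
    """Build (but do not send) a token revocation POST request."""
    body = urllib.parse.urlencode(
        {"token": token, "token_type_hint": token_type_hint}
    ).encode("ascii")
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    return urllib.request.Request(revoke_url, data=body, headers=headers, method="POST")


def send_revocation(request, timeout=5):
    """Send the request synchronously and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    # Placeholder values; nothing is sent over the network here.
    req = build_revocation_request(
        "https://gitlab.example.com/oauth/revoke", "TOKEN", "CLIENT_ID", "CLIENT_SECRET"
    )
    print(req.get_method(), req.full_url)
```

Splitting the request construction from the network call keeps the payload easy to inspect and test without contacting the provider.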

Next we need to create a custom Security Manager class:

class OIDCSecurityManager(AirflowSecurityManager):
    """
    Custom security manager class that allows using the OpenID Connect authentication method.
    """
    def __init__(self, appbuilder):
        super(OIDCSecurityManager, self).__init__(appbuilder)
        if self.auth_type == AUTH_OID:
           self.oid = OpenIDConnect(self.appbuilder.get_app)
           self.authoidview = AuthOIDCView

We are extending the existing AirflowSecurityManager and changing a few of the object level params that control the OID method, and specify which view will be used for authoidview. We are assigning the view we created above (AuthOIDCView) in order to trigger this logic we have created.

We are going to take advantage of a configuration option described in Airflow’s documentation here. Instead of changing these options in airflow.cfg file, we are instead using webserver_config.py in order to update these values:

basedir = os.path.abspath(os.path.dirname(__file__))

SECURITY_MANAGER_CLASS = OIDCSecurityManager
# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')

# Flask-WTF flag for CSRF
CSRF_ENABLED = True

AUTH_TYPE = AUTH_OID
OIDC_CLIENT_SECRETS = os.getenv('OIDC_CLIENT_SECRETS', 'client_secret.json')  # Configuration file for the OIDC provider
OIDC_COOKIE_SECURE = False
OIDC_ID_TOKEN_COOKIE_SECURE = False
OIDC_REQUIRE_VERIFIED_EMAIL = False
OIDC_USER_INFO_ENABLED = True
CUSTOM_SECURITY_MANAGER = OIDCSecurityManager

# Ensure that the secrets file exists
if not os.path.exists(OIDC_CLIENT_SECRETS):
    raise ValueError('Unable to load OIDC client configuration. %s does not exist.' % OIDC_CLIENT_SECRETS)

# Parse client_secret.json for scopes and logout URL
with open(OIDC_CLIENT_SECRETS) as f:
    OIDC_APPCONFIG = json.loads(f.read())

# Ensure that the issuer (the provider's base URL) is specified in the client secrets file;
# the authorize and logout/revoke URLs below are built from it
PROVIDER_OIDC_URL = OIDC_APPCONFIG.get('web', {}).get('issuer')
OIDC_PROVIDER_NAME = OIDC_APPCONFIG.get('web', {}).get('name')
if not PROVIDER_OIDC_URL:
    raise ValueError('Invalid OIDC client configuration, OIDC provider issuer URL not specified.')

# this will change based on the OIDC provider
OIDC_SCOPES = OIDC_APPCONFIG.get('OIDC_SCOPES', ['openid', 'email', 'profile'])  # Scopes that should be requested.
OIDC_LOGOUT_URI = posixpath.join(PROVIDER_OIDC_URL, 'oauth/revoke') # OIDC logout URL

# Allow user self registration
AUTH_USER_REGISTRATION = False

# Default role to provide to new users
AUTH_USER_REGISTRATION_ROLE = os.environ.get('AUTH_USER_REGISTRATION_ROLE', 'Public')

AUTH_ROLE_ADMIN = 'Admin'
AUTH_ROLE_PUBLIC = "Public"

OPENID_PROVIDERS = [
   {'name': OIDC_PROVIDER_NAME, 'url': posixpath.join(PROVIDER_OIDC_URL, 'oauth/authorize')}
]

The important settings to highlight are:

OIDC_CLIENT_SECRETS - the path to the client_secret.json file we created, which is then referenced by other constants.

CUSTOM_SECURITY_MANAGER - points to the class we just built above, which adds OIDC functionality to the base AirflowSecurityManager.

PROVIDER_OIDC_URL - the base URL for the upstream OIDC provider, pulled from client_secret.json.

OIDC_SCOPES - the standard OIDC scopes to request. More info here.

OPENID_PROVIDERS - this is an array of objects, each containing the details of the OpenID Provider you want to support. In here we will need to update these values to be in line with whatever IDP we are connecting to. The url is what will be hit when initially authorizing, and may differ from IDP to IDP.
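
The provider URLs in the config are built by joining the issuer with a relative path using posixpath.join. The short sketch below shows how that join behaves; provider_endpoints is a hypothetical helper, and the oauth/authorize and oauth/revoke paths are the GitLab ones used in this post.

```python
import posixpath


def provider_endpoints(issuer):
    """Derive the authorize and revoke URLs from the issuer base URL."""
    return {
        "authorize": posixpath.join(issuer, "oauth/authorize"),
        "revoke": posixpath.join(issuer, "oauth/revoke"),
    }


if __name__ == "__main__":
    # A trailing slash on the issuer does not produce a double slash.
    print(provider_endpoints("https://gitlab.com"))
    print(provider_endpoints("https://gitlab.com/"))
    # Pitfall: a leading slash on the second argument discards the issuer.
    print(posixpath.join("https://gitlab.com", "/oauth/revoke"))
```

Because of the last case, keep the path segments relative (no leading slash) when adapting these values for another IDP.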

GitLab IDP Setup

Note: Your provider may have a slightly different setup for both the concepts of groups, and steps to generate client IDs and secrets. Please refer to your provider’s documentation about their processes surrounding OIDC.

In GitLab you can make “applications” at the user level, group level or instance level. We are going to cover creating a group application, then demonstrate how the group name is checked during authentication to ensure that user is authenticated through GitLab.

First, make sure you have a GitLab account set up, then create a sample project and place it into a sample group.


Our group for this example is named “fd_main”:

gitlab_groups_img

Click the example group, then in the left-hand nav menu select “Settings”, and in the “General” submenu click on “Applications”:

settings_example_img

Now you will see a form that has some basic options and the ability to set scopes for this application. We want to name the application something we can remember. We chose test_airflow_ouath for the purposes of this demo. Set the Callback URL to the oidc_callback route that our Airflow instance is configured for, which in our case is

http://localhost:8060/oidc_callback

and then we are going to enable only the scopes: openid profile email.

Ensure “Confidential” is also checked.

preferences_and_scope_example

Save the application, and then you will be presented with a screen that shows the client_id and client_secret that we will need to put into our client_secret.json file in our repository.

client_id_example

Go ahead and place these values into your .ENV file at the root of the example directory.

Putting It All Together

Now that your webserver_config.py is customized and ENV vars are set according to your third party IDP’s instructions, you can navigate to

localhost:8060

on your local machine.

You should be redirected to the GitLab login page, and after logging in you will be redirected back to your local Airflow instance. The end user should be authenticated with the details from your GitLab user. You will land on a page that looks like this, telling you your user has no roles/permissions.

no_roles_or_permissions

You can add custom logic in order to handle assigning roles/permissions from Airflow depending on the “group” the user belongs to from the OIDC profile response. For now, let’s run Airflow CLI commands on the docker container to just manually assign this user an admin role.
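
As one possible shape for that custom logic, the sketch below maps OIDC group names to Airflow role names. The mapping, the function name, and the choice of "Public" as the fallback are all assumptions for illustration; the group names are the ones from ALLOWED_PROVIDER_GROUPS earlier in this post.

```python
# Hypothetical mapping from IDP group name to Airflow role name.
GROUP_ROLE_MAP = {
    "fd_main": "Admin",
    "another_group": "Viewer",
}


def roles_for_groups(groups, mapping=GROUP_ROLE_MAP, default_role="Public"):
    """Return the sorted Airflow role names for a user's OIDC groups.

    Groups without a mapping are ignored; users with no mapped group
    fall back to the default role.
    """
    roles = sorted({mapping[group] for group in (groups or []) if group in mapping})
    return roles or [default_role]


if __name__ == "__main__":
    print(roles_for_groups(["fd_main", "unrelated"]))
    print(roles_for_groups([]))
```

Inside handle_login, each returned name could be resolved with sm.find_role, the same call the user creation block already uses. The manual CLI steps that follow do not depend on this.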

Find the container labeled “openid-app-airflow-webserver-*” by entering

docker ps

in the terminal, then copy the ID of the container matching the label above. Now we can open a shell on that container with:

docker exec -it <CONTAINER_ID> bash

Then we can view the list of Airflow users, which will show the user that was just created after logging in with GitLab:

airflow users list

You will see an output similar to this:

docker exec -it 8a47b651a82a bash
airflow@8a47b651a82a:/opt/airflow$ airflow users list
id | username     | email                    | first_name | last_name | roles
===+==============+==========================+============+===========+=============
1  | airflow      | airflowadmin@example.com | Airflow    | Admin     | Admin
2  | mark.degroat | mark.degroat@rearc.io    | Mark       | deGroat   | Admin,Public

Take note of the username, and in that same bash console for the container, assign permissions to that new user:

airflow users add-role -u mark.degroat -r Admin

Now refresh the local Airflow web page and you should see the default home page for admin users for Airflow.

airflow_home

Conclusions

This is just one of many ways you can use OpenID Connect to enhance the login and logout flows of end users signing in to Airflow. You can extend the custom group logic to assign certain permissions based on which group users belong to, set up approval processes for certain groups of users, and even enable self sign-up if it fits your use case. The open nature of Airflow’s setup brings in a lot of complexity, but with that comes extreme flexibility.

Hopefully this guide helps you with your own implementation of OIDC authentication in Airflow. Always remember to work with your third-party IDP to gather the documentation you will need to set up OIDC on their specific platform, as they may use slightly different keys for field names from the profile, or have specific URLs that need to be used at different points in the flow. The details described here are for GitLab specifically, but most IDPs should have major crossover with this setup. Thank you for reading and good luck on your authentication endeavors!