Our filter for retrieving the tasks we are interested in still fetches all data from the database before applying the actual filtering. It is time to fix that and use a more flexible solution that works with the asynchronous SQLAlchemy that we introduced in the last post.
This post is part of my journey to learn Python. You find the code for this post in my PythonFriday repository on GitHub.
FastAPI Filter
There are multiple approaches to filter our data we can choose from. For this post I use FastAPI Filter, because it offers us a lot of flexibility. We can use everything it offers or, as I prefer, start small and add only the keywords we need.
By default, FastAPI Filter supports the following operators that we can append to our fields (field__[operator]
):
- neq – not equals
- gt – greater than
- gte – greater than or equal to
- in – in a list of values
- isnull – is NULL
- lt – less than
- lte – less than or equal to
- not/ne – not or not equal to a specified value
- not_in/nin – not in a list of values
- like/ilike – like with case sensitive or case insensitive
Installation
We can install the fastapi-filter package with this command to support SQLAlchemy:
1 |
pip install fastapi-filter[sqlalchemy] |
Keep in mind that you also need an asynchronous driver, for example aiosqlite, to connect to your database:
1 |
pip install aiosqlite |
New tests for our data store
Even when we want to filter in FastAPI, we must make sure that we can use the filter with our database. Therefore, we start the implementation at the bottom and go upwards through our application.
Here are the tests we need to verify that the first filters work:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
from ..models.task_filter import TaskFilter ... @pytest.mark.asyncio async def test_filter_empty_filter_gives_all_entries(with_db): store = DataStoreDb(with_db) await store.add(TaskInput(name="counter", priority=1, due_date=date.today(), done=False)) await store.add(TaskInput(name="A second entry", priority=2, due_date=date.today(), done=True)) filter = TaskFilter() entries = await store.filter(filter) assert len(entries) >= 2 @pytest.mark.asyncio async def test_filter_with_filter_for_done_gives_only_done_entries(with_db): store = DataStoreDb(with_db) await store.add(TaskInput(name="counter", priority=1, due_date=date.today(), done=False)) await store.add(TaskInput(name="A second entry", priority=2, due_date=date.today(), done=True)) filter = TaskFilter() filter.done = True entries = await store.filter(filter) assert len(entries) >= 1 for entry in entries: assert entry.done == True @pytest.mark.asyncio async def test_filter_with_filter_for_name_gives_only_matching_entries(with_db): store = DataStoreDb(with_db) await store.add(TaskInput(name="counter", priority=1, due_date=date.today(), done=False)) await store.add(TaskInput(name="Create a filter", priority=2, due_date=date.today(), done=True)) filter = TaskFilter() filter.name = "Create a filter" entries = await store.filter(filter) assert len(entries) == 1 assert entries[0].name == "Create a filter" @pytest.mark.asyncio async def test_filter_with_search_gives_only_matching_entries(with_db): store = DataStoreDb(with_db) await store.add(TaskInput(name="Search for item", priority=1, due_date=date.today(), done=False)) await store.add(TaskInput(name="Create a Search", priority=2, due_date=date.today(), done=True)) filter = TaskFilter() filter.search = "Search" entries = await store.filter(filter) assert len(entries) >= 2 for entry in entries: assert "Search" in entry.name |
The tests currently fail, then we have no TaskFilter class and no filter() method on our data store.
Create the TaskFilter
The main component of FastAPI Filter is our filter object. We need to specify what fields we want to filter on and how we want to order the results.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from datetime import date from typing import Optional from fastapi_filter.contrib.sqlalchemy import Filter from ..data.entities import Task class TaskFilter(Filter): id: Optional[int] = None name: Optional[str] = None priority: Optional[list[int]] = None due_date__lte: Optional[date] = None done: Optional[bool] = None order_by: list[str] = ["name"] search: Optional[str] = None class Constants(Filter.Constants): model = Task search_model_fields = ["name"] |
If we specify our field
as it is, we can filter for an exact match. If we want to use an operator from the list above, we can use our field name, add two _ and then use the operator as we did with due_date__lte
.
The order_by uses the field name in ascending order if we do not overwrite it. That way we always get a sorted result.
The search field allows us to search for a part of the name. If we want to include other fields, we can add them to the list of search_model_fields.
As a final point, make sure that you set the default value to None. That way all the filters are optional. If you omit the None value, you turn them into required fields.
Extend the data store
In our data store, we add a new filter() method that uses the TaskFilter to expand the query with a WHERE
and an ORDER BY
clause:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
from ..models.task_filter import TaskFilter ... async def filter(self, filter: TaskFilter): async with self.db() as session: query = select(Task) query = filter.filter(query) query = filter.sort(query) result = await session.execute(query) results = [] for entry in result: results.append(self.__to_output(*entry)) return results |
I had to add an * in front of entry to convert it to our TaskOutput object. Otherwise, I got an error about a missing id attribute.
Our new tests for the data store should now work. If that is the case, we can go one layer up to FastAPI.
Change the endpoint tests
With our new FastAPI Filter package, the syntax to filter the tasks changes a bit. We can change the syntax of the existing tests that filter the tasks and add a few new tests to check that the order_by works as expected:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
@pytest.mark.asyncio async def test_show_all_tasks(): await prepare_task("a first task") await prepare_task("a second task") await prepare_task("a third task") response = client.get("/api/todo") assert response.status_code == 200 tasks = response.json() assert len(tasks) >= 3 @pytest.mark.asyncio async def test_show_all_tasks_that_are_not_done(): await prepare_task("a finished task", done=True) await prepare_task("an open task", done=False) response = client.get("/api/todo?done=false") assert response.status_code == 200 tasks = response.json() done = [task for task in tasks if task['done'] == True] assert len(done) == 0 @pytest.mark.asyncio async def test_show_all_tasks_that_are_due_within_five_days(): await prepare_task("in 10 days", due_date=date.today() + timedelta(days=10)) await prepare_task("in 4 days", due_date=date.today() + timedelta(days=4)) response = client.get(f"/api/todo?done=false&due_date__lte={date.today() + timedelta(days=5)}") assert response.status_code == 200 tasks = response.json() assert len(tasks) >= 1 larger = [task for task in tasks if date.fromisoformat(task['due_date']) > date.today() + timedelta(days=5)] assert len(larger) == 0 @pytest.mark.asyncio async def test_show_all_tasks_that_match_search_criteria_sorted_by_name(): await prepare_task("485960 A", due_date=date.today()) await prepare_task("485960 B", due_date=date.today()) await prepare_task("485960 C", due_date=date.today()) response = client.get(f"/api/todo?search=485960") assert response.status_code == 200 tasks = response.json() assert len(tasks) == 3 assert tasks[0]["name"] == "485960 A" assert tasks[1]["name"] == "485960 B" assert tasks[2]["name"] == "485960 C" @pytest.mark.asyncio async def test_show_all_tasks_that_match_search_criteria_sorted_by_name_descending(): await prepare_task("5780383 A", due_date=date.today()) await prepare_task("5780383 B", due_date=date.today()) await prepare_task("5780383 C", due_date=date.today()) response = client.get(f"/api/todo?search=5780383&order_by=-name") assert response.status_code == 200 tasks = response.json() assert len(tasks) == 3 assert tasks[0]["name"] == "5780383 C" assert tasks[1]["name"] == "5780383 B" assert tasks[2]["name"] == "5780383 A" |
The endpoint tests that filter the tasks should all fail. To make them pass we need to wire-up the filters in our endpoint.
Change the /tasks endpoint
Our show_all_tasks() method needs to use our TaskFilter as a dependency and pass that filter to our data store. Our new Filter package comes with the FilterDepends keyword, that does all the magic to translate the query string to our TaskFilter object:
1 2 3 4 5 6 7 8 |
from fastapi_filter import FilterDepends from ..models.task_filter import TaskFilter @router.get("/") async def show_all_tasks(filter: TaskFilter = FilterDepends(TaskFilter), db: DataStoreDb = Depends(get_db)) -> List[TaskOutput]: result = await db.filter(filter) return result |
We can now run all our tests and they should pass.
Updated documentation
If we start our application and head to the /docs endpoint, we find the filter fields as part of the documentation:
Next
With FastAPI Filter we can create our filter object and let the package write the correct query for us. We can focus on the filters we want and skip the rest for later. That offers us a lot of flexibility and still gives us a much better solution than the hand-written one we started with.
There is one downside of our change: our clients must adapt to the new way, or they can no longer use our service. Next week we look at ways to prevent this with an API endpoint versioning strategy.
1 thought on “Python Friday #241: Filters for SQLAlchemy and FastAPI”