melbalabs journal

practical mysticism

25 Jul 2020

debugging distributed systems 2

Here's what the relatively small distributed system we'll discuss looks like, in 2 different views.

[airflow img 1] [airflow img 2]

The execution of multiple tasks is managed by apache airflow, a workflow scheduler. It can schedule tasks, arranged in a dependency graph, to run on different machines. It's useful because tasks don't have to track each other's completion and failures. They can focus on their own problems, in their own environments, with their own dependencies.
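
To make the dependency graph idea concrete, here's a minimal sketch of what such a workflow definition looks like, assuming the Airflow 1.x style API (the version used here is very old). The task names besides setup_jobs are made up.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    # a toy daily workflow: setup_jobs runs first, two independent processing
    # tasks run after it (possibly on different machines), reporting runs last
    dag = DAG(
        'example_pipeline',
        start_date=datetime(2020, 1, 1),
        schedule_interval='@daily',
        default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    )

    setup_jobs = BashOperator(task_id='setup_jobs', bash_command='echo setup', dag=dag)
    process_a = BashOperator(task_id='process_a', bash_command='echo process a', dag=dag)
    process_b = BashOperator(task_id='process_b', bash_command='echo process b', dag=dag)
    report = BashOperator(task_id='report', bash_command='echo report', dag=dag)

    # the dependency graph: airflow starts a task only after its upstream tasks succeed
    setup_jobs >> process_a
    setup_jobs >> process_b
    process_a >> report
    process_b >> report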

In production the setup_jobs task started executing very slowly (1hr+), often leading to timeouts and requiring staff to intervene.

Assuming no prior knowledge of this system, how do we even approach this? The plan is

  • recreate the system in an isolated environment
  • reproduce the issue there
  • find the root cause of the issue
  • investigate different solutions
  • compare the best-looking solution against the original for correctness
  • deploy to prod after you gather all the blessings of the powers that be (management, senior staff, QA team)
  • avoid being called to work on the weekend, because you cut a corner somewhere and nobody actually cared enough to review your change

The plan is conceptually simple, its realization not so much. The system uses 4 different databases, 2 different autoscaling groups of virtual machines, multiple types of worker processes and a web dashboard that requires authentication.

Understanding the dependencies of the system is key to isolating it in its own environment. Your brand new soon-to-exist "isolated" dev environment is pure malware as far as I'm concerned, and you should treat it that way. Review everything for side effects that can potentially destroy production. Re-check and question it at every step. For example, using automatic deployments when setting up a dev env is a very good way to cause an automatic outage, because someone cut a corner and didn't parametrize something.

Sometimes when I give a coworker a completely unknown system to work on, I recreate the basic virtual machines and completely disable VM access to the network and external APIs. This way every external access request initially fails and they have to ask me to open up each one, which gives us a chance to review it and reduces the risk of an outage. Nobody said it's going to be easy.
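
The blocking itself can be as crude as stripping the outbound rules from the dev VMs' security group. A minimal sketch with boto3, assuming AWS and a hypothetical group id; what gets revoked is the default allow-all egress rule:

    import boto3

    ec2 = boto3.client('ec2')

    # hypothetical security group attached only to the dev VMs
    DEV_SG = 'sg-0123456789abcdef0'

    # revoke the default allow-all outbound rule, so every external request
    # fails until an explicit rule is reviewed and added back
    ec2.revoke_security_group_egress(
        GroupId=DEV_SG,
        IpPermissions=[{'IpProtocol': '-1',
                        'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}],
    )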

After investigation, the databases used are as follows

  • a db where apache airflow keeps its own execution graph progress, state, retries and authentication information
  • a db where a custom worker job queue stores progress. During execution the system generates a highly varying number of jobs (think tens of thousands), which the system implementers decided should not be handled by airflow, because that's not its main use case. Airflow is for scheduling and managing relatively static workflows.
  • a db populated by other unrelated external systems. This is basically the external input that requires processing
  • a db where this system stores its results to be consumed by other external systems. External output.

Cloning everything is out of the question: the external input db is a multi-terabyte cluster that costs tens of thousands per month. There are multiple ways to approach this.

  • copy some representative data to a smaller version of the cluster. Relatively hard, as you have to understand what exactly is needed, how to get it, when to get it (data is being cleaned up on schedule), where to put it, how to move it.
  • if you are lucky enough, the cluster stores useful input data (known to cause issues) long enough that you can refer to it on demand from your development setup. You have to make sure the dev environment never modifies that data. This is a well-known best practice in concurrent and distributed systems design: immutable input data; use intermediate storage to generate an output and store it somewhere else. A sketch of the pattern follows this list.
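
A minimal sketch of that pattern, with hypothetical connection strings and table names: the shared input cluster is only ever read (ideally with a read-only user), and everything the dev environment produces lands in storage it owns.

    import pandas as pd
    import sqlalchemy as sa

    # read-only user on the shared input cluster; full access only to a dev-owned db
    input_engine = sa.create_engine('postgresql://readonly_user@input-cluster/analytics')
    dev_engine = sa.create_engine('postgresql://dev_user@dev-db/results')

    # pull the known-problematic slice of input data, never modify it in place
    episodes = pd.read_sql("SELECT * FROM episodes WHERE air_date >= '2020-07-01'", input_engine)

    # ... processing ...

    # intermediate and final results go only to storage the dev environment owns
    episodes.to_sql('episodes_processed', dev_engine, if_exists='replace', index=False)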

The first asg is the airflow setup (airflow worker, web server, scheduler) + the basic custom workflow code specific to the project, which sets up job execution, dependencies and basic processing.

The second asg runs the workers for the custom job queue.

The airflow dashboard is useful for humans to monitor progress and peek and poke at the system. I immediately had issues with it in the dev env, because authentication is based on google oauth. I didn't have access to the google account used to set up the oauth backend, nor did I have the patience to recreate the setup with an account I do have access to, so I changed the config to use plain password authentication instead. I created a new user with a newly generated random password, which only I know. Note how I didn't disable authentication completely, even though it's "just a dev env". Always assume the worst will happen and stick to the principle of least privilege in your distributed systems design and across every other aspect of your life.
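
For reference, roughly what that change looks like, assuming the old Airflow 1.x contrib password backend; the username is a placeholder and the config lines are shown as comments.

    # in airflow.cfg, switch the webserver from the oauth backend to password auth:
    #   [webserver]
    #   authenticate = True
    #   auth_backend = airflow.contrib.auth.backends.password_auth

    from airflow import models, settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser

    # create a single dev user with a long random password instead of turning auth off
    user = PasswordUser(models.User())
    user.username = 'dev_admin'
    user.email = 'dev_admin@example.com'
    user.password = 'paste-a-long-randomly-generated-password-here'

    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()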

After days of archeology, access to the production system for reference, reviewing the internal source code, modifying deployment scripts, reviewing the source of the main dependencies (a very old airflow version) and experimenting with the new dev setup, I end up with something working. Of course it's too early to celebrate. It turns out some configuration has to be manually inserted into some of the databases, so I fix that too. The harder issue to track down was that after everything seemed to be running, the original issue was not being reproduced in the dev environment. Everything is fast.

Turns out one of the outputs of the system becomes part of the input for later runs. It's not realistic to recreate that state from scratch, because the system has been running one workflow per day since 2018-03, so there would be 2.5 years to backfill. Both execution time and the availability of the actual historical data are limiting factors; that data was cleaned up long ago. I copied the accumulated state from prod.

Finally I get that 1hr+ runtime. All the work so far was just to reproduce the issue in a lab environment. Now what?

The slow code under investigation was created by the "data science" team to process a bunch of data and enable better analytical reporting. The code is relatively short, but somewhat messy. It also uses the trendy pandas library, which markets itself as a data manipulation swiss army knife. It is a great tool, but it implements and assumes a lot of domain-specific knowledge, jargon and programming style. I don't use it regularly and I'd have to refer to the docs a lot to get the gist of the processing that's happening. Of course the original authors of the code have already left the company and there's no documentation or project requirements to refer to. Nobody cares about the accumulation of technical debt, as it's too hard to estimate and to decide whether it's worth addressing. Usually by the time it matters, it's someone else's problem. Human civilization has always accumulated debt for future generations to deal with, so it's not a new concept.

Reading the code, it looks relatively simple and I don't notice anything that could be causing such performance problems. I put breakpoints in different places to look around, but in the end I decided I should automate this process and looked for a profiler. I tried a few different profilers and finally got a useful result.
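
For reproducing that kind of measurement, the standard library's cProfile is the usual first pass; it reports time per function rather than per line, so a line-granularity profiler is what produced the trace below. A minimal sketch, with a hypothetical entry point name:

    import cProfile
    import pstats

    from setup_jobs import main  # hypothetical entry point of the slow task

    # record one run and print the 20 most expensive call sites
    profiler = cProfile.Profile()
    profiler.enable()
    main()
    profiler.disable()
    profiler.dump_stats('setup_jobs.prof')
    pstats.Stats('setup_jobs.prof').sort_stats('cumulative').print_stats(20)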

   240|   1402909|      3.36518|  2.39872e-06|  0.05%|    episodes.loc[:, 'start_date'] = episodes.apply(lambda x: x['premier_date'] if (
   241|   1402909|      2657.13|   0.00189402| 36.50%|        (x['algorithm_id'] == 0) & (x['epid'] not in list(tracked_epids.epid))) else x.start_date, axis=1)
(call)|         1|    0.0293341|    0.0293341|  0.00%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/indexing.py:134 __setitem__
(call)|   2798398|      700.157|  0.000250199|  9.62%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/generic.py:2730 __getattr__
(call)|   1402908|      288.418|  0.000205585|  3.96%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/series.py:1088 __iter__
(call)|         1|  1.81198e-05|  1.81198e-05|  0.00%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/generic.py:1348 _indexer
(call)|   1402908|      45.3757|   3.2344e-05|  0.62%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/series.py:461 __len__
(call)|         1|      6191.93|      6191.93| 85.07%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/frame.py:4076 apply
(call)|   2813234|      657.705|   0.00023379|  9.04%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/series.py:598 __getitem__
   242|         1|  9.70364e-05|  9.70364e-05|  0.00%|    episodes.loc[:, 'start_date'] = pd.to_datetime(episodes.start_date)
(call)|         1|   0.00100398|   0.00100398|  0.00%|# /srv/service/12b2a689a8c181325cbe79072c5a69b3aea2e39f/venv/local/lib/python2.7/site-packages/pandas/core/generic.py:2730 __getattr__

Most of the time is spent on 2 lines of code. Basically the code iterates over a big 2d array (a pandas dataframe) and applies a function to each row. The problem is that the applied function compares a value from the row against all values of another big array to check membership. So a fix might be pretty simple, as sketched after this list

  • turn that tracked_epids list into a set, which makes the per-row lookup constant time instead of linear
  • create the set outside of the loop, caching the value
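
Reconstructing from the profiled lines above, the change looks roughly like this; the variable names come from the trace, the rest is a sketch.

    # before: for every one of the ~1.4M rows, list(tracked_epids.epid) rebuilds a
    # list and scans it linearly for the membership check
    episodes.loc[:, 'start_date'] = episodes.apply(lambda x: x['premier_date'] if (
        (x['algorithm_id'] == 0) & (x['epid'] not in list(tracked_epids.epid))) else x.start_date, axis=1)

    # after: build the set once, outside the row-wise apply; membership is now O(1)
    tracked = set(tracked_epids.epid)
    episodes.loc[:, 'start_date'] = episodes.apply(
        lambda x: x['premier_date']
        if (x['algorithm_id'] == 0) and (x['epid'] not in tracked)
        else x['start_date'],
        axis=1)

A fully vectorized version (a boolean mask built with episodes.epid.isin(tracked)) would likely be faster still, but the change above keeps the diff minimal and easy to review.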

Timings dropped from 55min to 8min on the data I tested with.

Algorithmic complexity matters when working with big datasets. Simple problems become hard when there's a lot of data. Things get even harder after a project has been deployed in production and the business depends on it.
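
A quick way to feel the difference; the sizes are made up, but in the ballpark of the ~1.4M rows from the trace above:

    import timeit

    # one membership probe against 50k tracked ids; the original code did a probe
    # like this for every processed row
    setup = 'ids = list(range(50000)); ids_set = set(ids); probe = 49999'
    print(timeit.timeit('probe in ids', setup=setup, number=1000))      # linear scan
    print(timeit.timeit('probe in ids_set', setup=setup, number=1000))  # hash lookup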

Weeks later this fix still hasn't been deployed in production. Management is using generic risk management to approach the problem, even though the fix is 2 lines of code, the change is purely algebraic and there's no need for extensive testing and verification. To an extent they are right, because no matter how simple, a deployment always carries risk. Humans are pretty good at making mistakes.