#1021 backend: more aggressive attempts to contact frontend
Merged 4 years ago by praiskup. Opened 4 years ago by praiskup.

file modified
+1 -1
@@ -586,7 +586,7 @@

  

          try:

              self.frontend_client.update({"actions": [result]})

-         except RequestException:

+         except FrontendClientException:

              self.log.exception("can't post to frontend, retrying indefinitely")

              return False

          return True

@@ -3,9 +3,9 @@

  import time

  import multiprocessing

  from setproctitle import setproctitle

- from requests import get, RequestException

  

  from backend.frontend import FrontendClient

+ from backend.exceptions import FrontendClientException

  

  from ..actions import ActionWorkerManager, ActionQueueTask

  from ..helpers import get_redis_logger, get_redis_connection
@@ -23,6 +23,7 @@

  

          self.opts = opts

          self.log = get_redis_logger(self.opts, "backend.action_dispatcher", "action_dispatcher")

+         self.frontend_client = FrontendClient(self.opts, self.log)

  

      def update_process_title(self, msg=None):

          proc_title = "Action dispatcher"
@@ -36,10 +37,8 @@

          """

  

          try:

-             url = "{0}/backend/pending-actions/".format(self.opts.frontend_base_url)

-             request = get(url, auth=("user", self.opts.frontend_auth))

-             raw_actions = request.json()

-         except (RequestException, ValueError) as error:

+             raw_actions = self.frontend_client.get('pending-actions').json()

+         except (FrontendClientException, ValueError) as error:

              self.log.exception(

                  "Retrieving an action tasks failed with error: %s",

                  error)

@@ -6,10 +6,9 @@

  

  import lockfile

  from daemon import DaemonContext

- from requests import RequestException

  from backend.frontend import FrontendClient

  

- from ..exceptions import CoprBackendError

+ from ..exceptions import CoprBackendError, FrontendClientException

  from ..helpers import BackendConfigReader, get_redis_logger

  

  
@@ -50,8 +49,8 @@

  

          try:

              self.log.info("Rescheduling old unfinished builds")

-             self.frontend_client.reschedule_all_running(120) # 10 minutes

-         except RequestException as err:

+             self.frontend_client.reschedule_all_running()

+         except FrontendClientException as err:

              self.log.exception(err)

              raise CoprBackendError(err)

  

@@ -6,12 +6,12 @@

  from collections import defaultdict

  

  from setproctitle import setproctitle

- from requests import get, RequestException

  

  from backend.frontend import FrontendClient

  

  from ..helpers import get_redis_logger

- from ..exceptions import DispatchBuildError, NoVmAvailable

+ from ..exceptions import (DispatchBuildError, NoVmAvailable,

+                           FrontendClientException)

  from ..job import BuildJob

  from ..vm_manage.manager import VmManager

  from ..constants import BuildStatus
@@ -80,10 +80,8 @@

              self.update_process_title("Waiting for jobs from frontend for {} s"

                                        .format(int(time.time() - get_task_init_time)))

              try:

-                 tasks = get("{0}/backend/pending-jobs/".format(self.opts.frontend_base_url),

-                             auth=("user", self.opts.frontend_auth)).json()

- 

-             except (RequestException, ValueError) as error:

+                 tasks = self.frontend_client.get('pending-jobs').json()

+             except (FrontendClientException, ValueError) as error:

                  self.log.exception("Retrieving build jobs from %s failed with error: %s",

                                     self.opts.frontend_base_url, error)

              finally:
@@ -107,7 +105,7 @@

              job.started_on = time.time()

              job.status = BuildStatus.STARTING

              can_build_start = self.frontend_client.starting_build(job.to_dict())

-         except (RequestException, ValueError) as error:

+         except (FrontendClientException, ValueError) as error:

              self.log.exception("Communication with Frontend to confirm build start failed with error: %s", error)

              return False

  

@@ -358,6 +358,7 @@

      def run(self):

          self.log.info("Starting worker")

          self.init_buses()

+         self.frontend_client.try_indefinitely = True

  

          try:

              self.do_job(self.job)

@@ -138,3 +138,7 @@

  

  class DispatchBuildError(CoprBackendError):

      pass

+ 

+ 

+ class FrontendClientException(Exception):

+     pass

file modified
+99 -49
@@ -1,78 +1,128 @@

  import json

  import time

- from requests import post, get, RequestException

+ import logging

+ from requests import get, post, put, RequestException

+ 

+ from backend.exceptions import FrontendClientException

+ 

+ # prolong the sleep time before asking frontend again

+ SLEEP_INCREMENT_TIME = 5

+ # reasonable timeout for requests that block backend daemon

+ BACKEND_TIMEOUT = 2*60

+ 

+ class FrontendClientRetryError(Exception):

+     pass

  

- RETRY_TIMEOUT = 5

  

  class FrontendClient(object):

      """

      Object to send data back to frontend

      """

  

+     # do we block the main daemon process?

+     try_indefinitely = False

+ 

      def __init__(self, opts, logger=None):

          super(FrontendClient, self).__init__()

          self.frontend_url = "{}/backend".format(opts.frontend_base_url)

          self.frontend_auth = opts.frontend_auth

  

          self.msg = None

-         self.log = logger

- 

-     def _post_to_frontend(self, data, url_path):

-         """

-         Make a request to the frontend

-         """

- 

+         self.logger = logger

+ 

+     @property

+     def log(self):

+         'return configured logger object, or no-op logger'

+         if not self.logger:

+             self.logger = logging.getLogger(__name__)

+             self.logger.addHandler(logging.NullHandler())

+         return self.logger

+ 

+     def _frontend_request(self, url_path, data=None, authenticate=True,

+                           method='post'):

          headers = {"content-type": "application/json"}

          url = "{}/{}/".format(self.frontend_url, url_path)

-         auth = ("user", self.frontend_auth)

- 

-         self.msg = None

+         auth = ("user", self.frontend_auth) if authenticate else None

  

          try:

-             response = post(url, data=json.dumps(data), auth=auth, headers=headers)

-             if response.status_code >= 400:

-                 self.msg = "Failed to submit to frontend: {0}: {1}".format(

-                     response.status_code, response.text)

-                 raise RequestException(self.msg)

-         except RequestException as e:

-             self.msg = "Post request failed: {0}".format(e)

-             raise

+             kwargs = {

+                 'auth': auth,

+                 'headers': headers,

+             }

+             method = method.lower()

+             if method in ['post', 'put']:

+                 kwargs['data'] = json.dumps(data)

+                 method = post if method == 'post' else put

+             else:

+                 method = get

+             response = method(url, **kwargs)

+         except RequestException as ex:

+             raise FrontendClientRetryError(

+                 "Requests error on {}: {}".format(url, str(ex)))

+ 

+         if response.status_code >= 500:

+             # Server error.  Hopefully this is only a temporary problem; we want

+             # to re-try, and wait till the server works again.

+             raise FrontendClientRetryError(

+                 "Request server error on {}: {} {}".format(

+                     url, response.status_code, response.reason))

+ 

+         if response.status_code >= 400:

+             # Client error.  The mistake is on our side, it doesn't make sense

+             # to continue with retries.

+             raise FrontendClientException(

+                 "Request client error on {}: {} {}".format(

+                     url, response.status_code, response.reason))

+ 

+         # TODO: Success, but tighten the redirects etc.

          return response

  

-     def get_reliably(self, url_path):

+     def _frontend_request_repeatedly(self, url_path, method='post', data=None,

+                                      authenticate=True):

          """

-         Get the URL response from frontend, try indefinitely till the server

-         gives us answer.

+         Repeat the request until it succeeds, or timeout is reached.

          """

-         url = "{}/{}/".format(self.frontend_url, url_path)

-         auth = ("user", self.frontend_auth)

+         sleep = SLEEP_INCREMENT_TIME

+         start = time.time()

+         stop = start + BACKEND_TIMEOUT

  

-         attempt = 0

+         i = 0

          while True:

-             attempt += 1

-             try:

-                 response = get(url, auth=auth)

-             except RequestException as ex:

-                 self.msg = "Get request {} failed: {}".format(attempt, ex)

-                 time.sleep(RETRY_TIMEOUT)

-                 continue

+             i += 1

+             if not self.try_indefinitely and time.time() > stop:

+                 raise FrontendClientException(

+                     "Attempt to talk to frontend timeouted "

+                     "(we gave it {} attempts)".format(i))

  

-             return response

+             try:

+                 return self._frontend_request(url_path, data=data,

+                                               authenticate=authenticate,

+                                               method=method)

+             except FrontendClientRetryError as ex:

+                 self.log.warning("Retry request #%s on %s: %s", i, url_path,

+                                  str(ex))

+                 time.sleep(sleep)

+                 sleep += SLEEP_INCREMENT_TIME

  

  

-     def _post_to_frontend_repeatedly(self, data, url_path, max_repeats=10):

+     def _post_to_frontend_repeatedly(self, data, url_path):

          """

-         Make a request max_repeats-time to the frontend

+         Repeat the request until it succeeds, or timeout is reached.

          """

-         for i in range(max_repeats):

-             try:

-                 if i and self.log:

-                     self.log.warning("failed to post data to frontend, repeat #{0}".format(i))

-                 return self._post_to_frontend(data, url_path)

-             except RequestException:

-                 time.sleep(5)

-         else:

-             raise RequestException("Failed to post to frontend for {} times".format(max_repeats))

+         return self._frontend_request_repeatedly(url_path, data=data)

+ 

+     def get(self, url_path):

+         'Issue relentless GET request to Frontend'

+         return self._frontend_request_repeatedly(url_path, method='get')

+ 

+     def post(self, url_path, data):

+         'Issue relentless POST request to Frontend'

+         return self._frontend_request_repeatedly(url_path, data=data)

+ 

+     def put(self, url_path, data):

+         'Issue relentless PUT request to Frontend'

+         return self._frontend_request_repeatedly(url_path, data=data,

+                                                  method='put')

  

      def update(self, data):

          """
@@ -88,7 +138,7 @@

          """

          response = self._post_to_frontend_repeatedly(data, "starting_build")

          if "can_start" not in response.json():

-             raise RequestException("Bad respond from the frontend")

+             raise FrontendClientException("Bad response from the frontend")

          return response.json()["can_start"]

  

      def reschedule_build(self, build_id, task_id, chroot_name):
@@ -98,7 +148,7 @@

          data = {"build_id": build_id, "task_id": task_id, "chroot": chroot_name}

          self._post_to_frontend_repeatedly(data, "reschedule_build_chroot")

  

-     def reschedule_all_running(self, attempts):

-         response = self._post_to_frontend_repeatedly({}, "reschedule_all_running", attempts)

+     def reschedule_all_running(self):

+         response = self._post_to_frontend_repeatedly({}, "reschedule_all_running")

          if response.status_code != 200:

-             raise RequestException("Failed to reschedule all running jobs")

+             raise FrontendClientException("Failed to reschedule builds")

@@ -55,7 +55,7 @@

          redis.hset(args.worker_id, 'started', 1)

          redis.hset(args.worker_id, 'PID', os.getpid())

  

-     resp = frontend_client.get_reliably('action/{}'.format(task_id))

+     resp = frontend_client.get('action/{}'.format(task_id))

      if resp.status_code != 200:

          log.error("failed to download task, apache code %s", resp.status_code)

          sys.exit(1)

@@ -79,7 +79,7 @@

              self.mtime_optimization = not cmdline_opts.no_mtime_optimization

  

      def run(self):

-         response = self.frontend_client._post_to_frontend_repeatedly("", "chroots-prunerepo-status")

+         response = self.frontend_client.get("chroots-prunerepo-status")

          self.chroots = json.loads(response.content)

  

          results_dir = self.opts.destdir
@@ -102,7 +102,7 @@

          for chroot, active in self.chroots.items():

              if not active:

                  chroots_to_prune.append(chroot)

-         self.frontend_client._post_to_frontend_repeatedly(chroots_to_prune, "final-prunerepo-done")

+         self.frontend_client.post(chroots_to_prune, "final-prunerepo-done")

  

          loginfo("--------------------------------------------")

          loginfo("Pruning finished")

file modified
+91 -26
@@ -3,9 +3,11 @@

  import multiprocessing

  

  from munch import Munch

- from requests import RequestException

+ from requests import RequestException, Response

  

- from backend.frontend import FrontendClient

+ from backend.frontend import (FrontendClient, FrontendClientRetryError,

+                               SLEEP_INCREMENT_TIME)

+ from backend.exceptions import FrontendClientException

  

  from unittest import mock

  from unittest.mock import MagicMock
@@ -16,6 +18,12 @@

      with mock.patch("backend.frontend.post") as obj:

          yield obj

  

+ @pytest.fixture(scope='function', params=['get', 'post', 'put'])

+ def f_request_method(request):

+     'mock the requests.{get,post,put} method'

+     with mock.patch("backend.frontend.{}".format(request.param)) as ctx:

+         yield (request.param, ctx)

+ 

  

  @pytest.yield_fixture

  def mc_time():
@@ -43,55 +51,112 @@

          self.chroot_name = "fedora-20-x86_64"

  

      @pytest.fixture

-     def mask_post_to_fe(self):

-         self.ptf = MagicMock()

-         self.fc._post_to_frontend = self.ptf

+     def mask_frontend_request(self):

+         self.f_r = MagicMock()

+         self.fc._frontend_request = self.f_r

  

-     def test_post_to_frontend(self, post_req):

-         post_req.return_value.status_code = 200

-         self.fc._post_to_frontend(self.data, self.url_path)

+     def test_post_to_frontend(self, f_request_method):

+         name, method = f_request_method

+         method.return_value.status_code = 200

+         self.fc._frontend_request(self.url_path, self.data, method=name)

+         assert method.called

  

-         assert post_req.called

+     def test_post_to_frontend_wrappers(self, f_request_method):

+         name, method = f_request_method

+         method.return_value.status_code = 200

+ 

+         call = getattr(self.fc, name)

+         if name == 'get':

+             call(self.url_path)

+         else:

+             call(self.url_path, self.data)

+ 

+         assert method.called

  

      def test_post_to_frontend_not_200(self, post_req):

          post_req.return_value.status_code = 501

-         with pytest.raises(RequestException):

-             self.fc._post_to_frontend(self.data, self.url_path)

+         with pytest.raises(FrontendClientRetryError):

+             self.fc._frontend_request(self.url_path, self.data)

  

          assert post_req.called

  

      def test_post_to_frontend_post_error(self, post_req):

          post_req.side_effect = RequestException()

-         with pytest.raises(RequestException):

-             self.fc._post_to_frontend(self.data, self.url_path)

+         with pytest.raises(FrontendClientRetryError):

+             self.fc._frontend_request(self.url_path, self.data)

  

          assert post_req.called

  

-     def test_post_to_frontend_repeated_first_try_ok(self, mask_post_to_fe, mc_time):

+     def test_post_to_frontend_repeated_first_try_ok(self, mask_frontend_request, mc_time):

          response = "ok\n"

-         self.ptf.return_value = response

+         self.f_r.return_value = response

+         mc_time.time.return_value = 0

  

          assert self.fc._post_to_frontend_repeatedly(self.data, self.url_path) == response

          assert not mc_time.sleep.called

  

-     def test_post_to_frontend_repeated_second_try_ok(self, mask_post_to_fe, mc_time):

+     def test_post_to_frontend_repeated_second_try_ok(self, f_request_method,

+             mask_frontend_request, mc_time):

+         method_name, method = f_request_method

+ 

          response = "ok\n"

-         self.ptf.side_effect = [

-             RequestException(),

+         self.f_r.side_effect = [

+             FrontendClientRetryError(),

              response,

          ]

- 

-         assert self.fc._post_to_frontend_repeatedly(self.data, self.url_path) == response

+         mc_time.time.return_value = 0

+         assert self.fc._frontend_request_repeatedly(

+             self.url_path,

+             data=self.data,

+             method=method_name

+         ) == response

          assert mc_time.sleep.called

  

-     def test_post_to_frontend_repeated_all_attempts_failed(self, mask_post_to_fe, mc_time):

-         self.ptf.side_effect = RequestException()

+     def test_post_to_frontend_err_400(self, post_req, mc_time):

+         response = Response()

+         response.status_code = 404

+         response.reason = 'NOT FOUND'

  

-         with pytest.raises(RequestException):

-             self.fc._post_to_frontend_repeatedly(self.data, self.url_path)

+         post_req.side_effect = [

+             FrontendClientRetryError(),

+             response,

+         ]

  

+         mc_time.time.return_value = 0

+         with pytest.raises(FrontendClientException):

+             assert self.fc._post_to_frontend_repeatedly(self.data, self.url_path) == response

          assert mc_time.sleep.called

  

+     @mock.patch('backend.frontend.BACKEND_TIMEOUT', 100)

+     def test_post_to_frontend_repeated_all_attempts_failed(self,

+             mask_frontend_request, caplog, mc_time):

+         mc_time.time.side_effect = [0, 0, 5, 5+10, 5+10+15, 5+10+15+20, 1000]

+         self.f_r.side_effect = FrontendClientRetryError()

+         with pytest.raises(FrontendClientException):

+             self.fc._post_to_frontend_repeatedly(self.data, self.url_path)

+         assert mc_time.sleep.call_args_list == [mock.call(x) for x in [5, 10, 15, 20, 25]]

+         assert len(caplog.records) == 5

+ 

+     def test_post_to_frontend_repeated_indefinitely(self,

+             mask_frontend_request, caplog, mc_time):

+         mc_time.time.return_value = 1

+         self.fc.try_indefinitely = True

+         self.f_r.side_effect = [FrontendClientRetryError() for _ in range(100)] \

+                              + [FrontendClientException()] # e.g. 404 eventually

+         with pytest.raises(FrontendClientException):

+             self.fc._post_to_frontend_repeatedly(self.data, self.url_path)

+         assert mc_time.sleep.called

+         assert len(caplog.records) == 100

+ 

+     def test_reschedule_300(self, mask_frontend_request, post_req):

+         response = Response()

+         response.status_code = 302

+         response.reason = 'whatever'

+         post_req.side_effect = response

+         with pytest.raises(FrontendClientException) as ex:

+             self.fc.reschedule_all_running()

+         assert 'Failed to reschedule builds' in str(ex)

+ 

      def test_update(self):

          ptfr = MagicMock()

          self.fc._post_to_frontend_repeatedly = ptfr
@@ -110,7 +175,7 @@

          ptfr = MagicMock()

          self.fc._post_to_frontend_repeatedly = ptfr

  

-         with pytest.raises(RequestException):

+         with pytest.raises(FrontendClientException):

              self.fc.starting_build(self.data)

  

      def test_starting_build_err_2(self):
@@ -118,7 +183,7 @@

          self.fc._post_to_frontend_repeatedly = ptfr

          ptfr.return_value.json.return_value = {}

  

-         with pytest.raises(RequestException):

+         with pytest.raises(FrontendClientException):

              self.fc.starting_build(self.data)

  

      def test_reschedule_build(self):

@@ -369,7 +369,7 @@

  

      return flask.jsonify(response)

  

- @backend_ns.route("/chroots-prunerepo-status/", methods=["POST", "PUT"])

+ @backend_ns.route("/chroots-prunerepo-status/")

  def chroots_prunerepo_status():

      return flask.jsonify(MockChrootsLogic.chroots_prunerepo_status())

  

no initial comment

Metadata Update from @praiskup:
- Pull-request tagged with: blocked, needs-work

4 years ago

rebased onto a059b75ff637464c79c6a409229be05565e76a9e

4 years ago

Metadata Update from @praiskup:
- Pull-request untagged with: needs-work

4 years ago

3 new commits added

  • frontend, backend: add FrontendClient.{get,post,put}
  • backend: more aggressive posting to Frontend
  • backend: simplify logging in FrontendClient
4 years ago

rebased onto 2810385f6431b9a8d05e73e47e8bcaf7ffe379ab

4 years ago

rebased onto 2810385f6431b9a8d05e73e47e8bcaf7ffe379ab

4 years ago

Metadata Update from @praiskup:
- Pull-request untagged with: blocked

4 years ago

rebased onto 0912221ff274e647712840b6e30038b93114876b

4 years ago

What does "independent" mean? From what I understand from the code, I don't see anything like spawning a new independent thread for the communication with the frontend, or something like that. Am I missing something? If it is just a variable deciding whether we want to retry indefinitely/infinitely (I am not sure which is the right word for this case), I would just name it accordingly.

Also, I don't like the setter function `def set_independent(self):` very much. It doesn't do anything useful; it just sets this variable. I would remove it and set the variable directly from wherever the client is used.

LGTM, another very appreciated PR.
I have just one comment in the code section.

By independent I meant that the worker doesn't affect backend process(es). I'll change that if you dislike that naming.

rebased onto 9479ba2

4 years ago

Metadata Update from @praiskup:
- Pull-request tagged with: release-blocker

4 years ago

Commit 1f590f7 fixes this pull-request

Pull-Request has been merged by praiskup

4 years ago

Commit 678f797 fixes this pull-request

Pull-Request has been merged by praiskup

4 years ago

Commit 5a6dd38 fixes this pull-request

Pull-Request has been merged by praiskup

4 years ago