The Instructure Community will enter a read-only state on November 22, 2025 as we prepare to migrate to our new Community platform in early December.
Read our blog post for more info about this change.
Found this content helpful? Log in or sign up to leave a like!
"DAPClientError: malformed HTTP response" occurs at least twice a month, and I have been unable to identify the exact cause.
DAPClientError Traceback (most recent call last)
Cell In [35], line 3
1 if __name__ == '__main__':
2 loop = asyncio.get_event_loop()
----> 3 res = loop.run_until_complete(main())
4 try:
5 loop.close()
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/nest_asyncio.py:90, in _patch_loop.<locals>.run_until_complete(self, future)
87 if not f.done():
88 raise RuntimeError(
89 'Event loop stopped before Future completed.')
---> 90 return f.result()
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
199 self.__log_traceback = False
200 if self._exception is not None:
--> 201 raise self._exception.with_traceback(self._exception_tb)
202 return self._result
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the send method directly, because coroutines
231 # don't have __iter__ and __next__ methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [31], line 3, in main()
1 async def main():
2 async with ClientSession() as session:
----> 3 data_d = await asyncio.gather(*[download_and_upload_to_blob(table, session) for table in valid_tables])
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
302 def __wakeup(self, future):
303 try:
--> 304 future.result()
305 except BaseException as exc:
306 # This may also be a cancellation.
307 self.__step(exc)
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the send method directly, because coroutines
231 # don't have __iter__ and __next__ methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [27], line 27, in download_and_upload_to_blob(table, session)
25 await asyncio.sleep(BACKOFF_FACTOR ** attempt)
26 else:
---> 27 raise e
31 # Create a BlobServiceClient
32 blob_service_client = BlobServiceClient(
33 account_url=account_url,
34 credential=account_key
35 )
Cell In [27], line 20, in download_and_upload_to_blob(table, session)
13 async with DAPClient() as session:
14 query = IncrementalQuery(
15 format=Format.Parquet,
16 mode=None,
17 since=last_seen,
18 until=None,
19 )
---> 20 await session.download_table_data("canvas", table, query, output_directory)
21 break
22 except Exception as e:
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:676, in DAPSession.download_table_data(self, namespace, table, query, output_directory, decompress)
673 objects = await self.get_objects(job.id)
674 directory = os.path.join(output_directory, f"job_{job.id}")
--> 676 downloaded_files = await self.download_objects(objects, directory, decompress)
677 logger.info(f"Files from server downloaded to folder: {directory}")
679 if isinstance(job, CompleteSnapshotJob):
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:566, in DAPSession.download_objects(self, objects, output_directory, decompress)
553 """
554 Save data stored remotely into a local directory.
555
(...)
559 :returns: A list of paths to files saved in the local file system.
560 """
562 downloads = [
563 self.download_object(object, output_directory, decompress)
564 for object in objects
565 ]
--> 566 local_files = await gather_n(downloads, concurrency=DOWNLOAD_CONCURRENCY)
568 return local_files
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/concurrency.py:186, in gather_n(coroutines, concurrency, return_exceptions)
169 async def gather_n(
170 coroutines: Iterable[Invokable[T]],
171 *,
172 concurrency: int,
173 return_exceptions: bool = False
174 ) -> Iterable[T]:
175 """
176 Runs awaitable objects, with at most the specified degree of concurrency.
177
(...)
183 :raises asyncio.CancelledError: Raised when one of the tasks in cancelled.
184 """
--> 186 results = await _gather_n(
187 coroutines, concurrency=concurrency, return_exceptions=return_exceptions
188 )
190 if isinstance(coroutines, list):
191 return list(results)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/concurrency.py:111, in _gather_n(coroutines, concurrency, return_exceptions)
109 exc = task.exception()
110 if exc and not return_exceptions:
--> 111 raise exc
113 # schedule some tasks up to degree of concurrency
114 for _, coroutine in zip(range(len(done)), iterator):
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the send method directly, because coroutines
231 # don't have __iter__ and __next__ methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:582, in DAPSession.download_object(self, object, output_directory, decompress)
570 async def download_object(
571 self, object: Object, output_directory: str, decompress: bool = False
572 ) -> str:
573 """
574 Save a single remote file to a local directory.
575
(...)
579 :returns: A path of the file saved in the local file system.
580 """
--> 582 resource = (await self.get_resources([object]))[object.id]
584 return await self.download_resource(resource, output_directory, decompress)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:490, in DAPSession.get_resources(self, objects)
487 logger.debug("Retrieve resource URLs for objects:")
488 logger.debug([o.id for o in objects])
--> 490 response = await self._post("/dap/object/url", objects, ResourceResult)
492 return response.urls
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:311, in DAPSession._post(self, path, request_data, response_type)
304 logger.debug(f"POST request payload:\n{repr(request_payload)}")
306 async with self._session.post(
307 f"{self._base_url}{path}",
308 data=json_dump_string(request_payload),
309 headers={"Content-Type": "application/json"},
310 ) as response:
--> 311 return await self._process(response, response_type)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:348, in DAPSession._process(self, response, response_type, suppress_output)
345 if response_text:
346 logger.error(f"malformed HTTP response:\n{response_text}")
--> 348 raise DAPClientError("malformed HTTP response")
350 if not suppress_output:
351 logger.debug(f"GET/POST response payload:\n{repr(response_payload)}")
DAPClientError: malformed HTTP response
In addition, starting today a new error has begun to occur: 'ProcessingError'.
Operation on target Incremental Load failed: ---------------------------------------------------------------------------
ProcessingError Traceback (most recent call last)
Cell In [51], line 3
1 if __name__ == '__main__':
2 loop = asyncio.get_event_loop()
----> 3 res = loop.run_until_complete(main())
4 try:
5 loop.close()
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/nest_asyncio.py:90, in _patch_loop.<locals>.run_until_complete(self, future)
87 if not f.done():
88 raise RuntimeError(
89 'Event loop stopped before Future completed.')
---> 90 return f.result()
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
199 self.__log_traceback = False
200 if self._exception is not None:
--> 201 raise self._exception.with_traceback(self._exception_tb)
202 return self._result
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the `send` method directly, because coroutines
231 # don't have `__iter__` and `__next__` methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [47], line 3, in main()
1 async def main():
2 async with ClientSession() as session:
----> 3 data_d = await asyncio.gather(*[download_and_upload_to_blob_logs(table, session) for table in valid_tables_logs])
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
302 def __wakeup(self, future):
303 try:
--> 304 future.result()
305 except BaseException as exc:
306 # This may also be a cancellation.
307 self.__step(exc)
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the `send` method directly, because coroutines
231 # don't have `__iter__` and `__next__` methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [45], line 23, in download_and_upload_to_blob_logs(table, session)
21 await asyncio.sleep(BACKOFF_FACTOR ** attempt)
22 else:
---> 23 raise e
25 # Create a BlobServiceClient
26 blob_service_client = BlobServiceClient(
27 account_url=account_url,
28 credential=account_key
29 )
Cell In [45], line 16, in download_and_upload_to_blob_logs(table, session)
9 async with DAPClient() as session:
10 query = IncrementalQuery(
11 format=Format.Parquet,
12 mode=None,
13 since=last_seen,
14 until=None,
15 )
---> 16 await session.download_table_data("canvas_logs", table, query, output_directory)
17 break
18 except Exception as e:
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:668, in DAPSession.download_table_data(self, namespace, table, query, output_directory, decompress)
665 # fail early if output directory does not exist and cannot be created
666 os.makedirs(output_directory, exist_ok=True)
--> 668 job = await self.execute_job(namespace, table, query)
670 if job.status is not JobStatus.Complete:
671 raise DAPClientError(f"query job ended with status: {job.status.value}")
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:636, in DAPSession.execute_job(self, namespace, table, query)
634 job = await self.query_snapshot(namespace, table, query)
635 elif isinstance(query, IncrementalQuery):
--> 636 job = await self.query_incremental(namespace, table, query)
637 else:
638 raise TypeError(f"type mismatch for parameter `query`: {type(query)}")
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:404, in DAPSession.query_incremental(self, namespace, table, query)
399 """
400 Starts an incremental query.
401 """
403 logger.debug(f"Query updates for table: {table}")
--> 404 job = await self._post(f"/dap/query/{namespace}/table/{table}/data", query, Job) # type: ignore
405 return job
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:311, in DAPSession._post(self, path, request_data, response_type)
304 logger.debug(f"POST request payload:\n{repr(request_payload)}")
306 async with self._session.post(
307 f"{self._base_url}{path}",
308 data=json_dump_string(request_payload),
309 headers={"Content-Type": "application/json"},
310 ) as response:
--> 311 return await self._process(response, response_type)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:358, in DAPSession._process(self, response, response_type, suppress_output)
356 error_object = self._map_to_error_type(response.status, response_payload)
357 logger.warning(f"Received error in response: {error_object}")
--> 358 raise error_object
359 else:
360 response_object = json_to_object(response_type, response_payload)
ProcessingError:
Community help
To interact with Panda Bot, our automated chatbot, you need to sign up or log in:
Sign in
To interact with Panda Bot, our automated chatbot, you need to sign up or log in:
Sign in