Hello,
I am trying to use DAPSession.stream_resource to stream resources/files directly into an S3 bucket.
The idea is to pipe the stream directly into S3 without downloading and storing the entire file locally first, and then reuploading it into S3.
However, I am not sure how to use that method. First, I am not sure how to handle the async nature of it, and how/if/when to await it. Second, I am confused by it returning an iterator of StreamReaders (i.e. potentially several), rather than just one. Since we are passing in one single resource, with that single resource representing one single download URL, shouldn't there be only one single StreamReader, rather than an iterator of them?
On the S3/AWS side of my code, I want to use the boto3 S3 client's upload_fileobj() method (see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj....)
So given a job ID, what would my Python code need to look like to get the resources for that job ID and stream them into S3 (self._dap_session is an instance of DAPSession)?
I tried this, but got an error:
async def stream_job_files_to_s3(self, job_id: str, namespace: str, tablename: str):
    keys = []
    async with self._dap_session as session:
        objects = await session.get_objects(job_id)
        if objects is not None:
            urls = await session.get_resources(objects)
            for key in urls.keys():
                rsrc = urls[key]
                key = key.replace('/', '_')
                aiter = await session.stream_resource(rsrc)
                s3_key = f'{self._prefix}/{namespace}/{tablename}/{key.strip("/")}'
                async for stream in aiter:
                    self._s3_client.upload_fileobj(stream, self._bucket, s3_key)
                    keys.append(s3_key)
    return keys
TypeError: object async_generator can't be used in 'await' expression
aiter = await session.stream_resource(rsrc)
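Based on that error, I am guessing stream_resource returns an async generator that needs to be iterated with async for rather than awaited, something like the fragment below, but I am not sure that is the intended usage, and I still do not see how the async stream it yields is supposed to feed into upload_fileobj, which (as far as I can tell) expects a blocking file-like object:

# guess: iterate the async generator instead of awaiting it
async for stream in session.stream_resource(rsrc):
    ...  # still unclear how to hand the async stream to upload_fileobj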
Hi @msarnold,
We don't use DAPSession.stream_resource, but we use the following code to stream weblogs files to S3 after the query job has finished on Instructure's end. This isn't the complete Lambda function (there is some custom setup, etc.) but hopefully it's helpful! Let me know if you have any questions. (The libraries being used are boto3, json, and requests.)
ETA: To be clear, there's no async happening here. One Lambda function hits the API to start the query job, then another Lambda function polls the server until the job is finished, and then the below function downloads the files. Those functions are all orchestrated in a state machine.
# Get details about the completed job from Instructure
cj_response = loads(requests.get(
    f"https://api-gateway.instructure.com/dap/job/{event['job_id']}",
    headers={"x-instauth": event["access_token"]},
).text)
logger.info(f"Received response from Instructure: {cj_response}")

# Get the list of files for this request
objs_response = loads(requests.post(
    "https://api-gateway.instructure.com/dap/object/url",
    headers={"x-instauth": event["access_token"]},
    json=cj_response["objects"],
).text)

# Stream those files to S3
urls = objs_response["urls"]
for key in urls.keys():
    logger.info(f"Uploading file {key} to S3")
    with requests.get(urls[key]["url"], stream=True) as stream:
        s3_client.upload_fileobj(stream.raw, secret["S3_BUCKET"], secret["S3_PREFIX"] + key.split("/")[1])
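The other two Lambdas are just two more calls to the gateway: a POST to start the query job and a GET to poll it. Stripped down, they look something like this (simplified sketch; the namespace/table, request body, and status handling are illustrative rather than copied from our actual functions):

# 1) Start the query job for a table
namespace, table = "canvas_logs", "web_logs"  # illustrative namespace/table
job = loads(requests.post(
    f"https://api-gateway.instructure.com/dap/query/{namespace}/table/{table}/data",
    headers={"x-instauth": event["access_token"]},
    json={"format": "jsonl"},
).text)

# 2) Poll the job until it reports completion, then hand off to the code above
status = loads(requests.get(
    f"https://api-gateway.instructure.com/dap/job/{job['id']}",
    headers={"x-instauth": event["access_token"]},
).text)
if status["status"] == "complete":
    ...  # trigger the download step shown above with this job's ID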
Relying on the proprietary header parameter X-InstAuth to pass the authentication token is deprecated, and will no longer be available as an authentication option in a future version of Instructure API Gateway. Instead, use the standard HTTP header parameter Authorization, passing the same token. Authorization is the standard header parameter tools like curl would employ.
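In the requests-based code above, that simply means swapping the header; for example (check the current API Gateway documentation for whether a "Bearer " prefix is expected in front of the token):

# before (deprecated proprietary header)
headers = {"x-instauth": event["access_token"]}

# after: pass the same token in the standard Authorization header
headers = {"Authorization": event["access_token"]}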
Thanks very much for flagging that, @LeventeHunyadi. Is there a changelog or something where that was communicated? I appreciate your heroic attention to the forum but I'm also slightly concerned that I would have completely missed this otherwise. I see the change reflected in the API Gateway documentation now, but I wouldn't have revisited that documentation as long as my application continued to work.
Yeah, that's what I had done so far.
I got side-tracked by a dependency conflict issue that I thought was caused by a dependency of requests (rpds). So I was trying to get rid of requests and find some other way of streaming pieces directly into S3 instead of downloading the entire file locally.
I don't really care about sync vs async, because as you said, Lambda is only running short "atomic" steps which are orchestrated outside via Step Functions; but since the dap2 client is doing everything async, that's what you have to deal with...
Although... async might still come in handy if the server decides to split a result for a request into multiple files - using async might result in getting those downloads done concurrently instead of sequentially...
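Something along these lines (untested sketch, with a hypothetical per-file helper wrapping the download-and-upload logic) is what I have in mind:

import asyncio

# untested sketch: run the per-file transfers concurrently instead of in a loop;
# _transfer_resource_to_s3 is a hypothetical helper wrapping the per-file logic
await asyncio.gather(*(
    self._transfer_resource_to_s3(session, key, rsrc)
    for key, rsrc in urls.items()
))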
I finally figured out that the dependency problem had a completely different root cause; so I'm back to that original code now...
I'm still curious what the correct way is to use that stream_resource() function, though...
Thanks,
Mark
Curious if you ever figured it out?
No, I've given up on it and gone back to my original code, which is similar to what jwals had posted here...
If we can find some time here, we'll take a look at it. But pretty strapped for time. Will likely need to approach it similar to y'all.
I found the following snippet while building some of my scripts:
(Found at URL: https://data-access-platform-api.s3.eu-central-1.amazonaws.com/client/README.html#getting-latest-cha...)
import os
from datetime import datetime, timezone
from urllib.parse import ParseResult, urlparse

import aiofiles

from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery

# timestamp returned by last snapshot or incremental query
last_seen = datetime(2023, 2, 1, 0, 0, 0, tzinfo=timezone.utc)

async with DAPClient() as session:
    query = IncrementalQuery(
        format=Format.JSONL,
        mode=None,
        since=last_seen,
        until=None,
    )
    result = await session.get_table_data("canvas", "accounts", query)
    resources = await session.get_resources(result.objects)
    for resource in resources.values():
        components: ParseResult = urlparse(str(resource.url))
        file_path = os.path.join(
            os.getcwd(), "data", os.path.basename(components.path)
        )
        async for stream in session.stream_resource(resource):
            async with aiofiles.open(file_path, "wb") as file:
                # save gzip data to file without decompressing
                async for chunk in stream.iter_chunked(64 * 1024):
                    await file.write(chunk)
I actually wrote my own first, which was less sophisticated, then stumbled upon the snippet above when building my code for incremental updates. Hope it helps! 🙂
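For the S3 case specifically, the same pattern can probably be adapted along these lines (untested sketch: s3_client, bucket, and the key layout are placeholders, and because boto3's upload_fileobj expects a blocking file-like object, this buffers each resource in memory first; for very large files, an S3 multipart upload or an async S3 client would avoid that):

import io

import boto3

from dap.api import DAPClient
from dap.dap_types import Format, SnapshotQuery

s3_client = boto3.client("s3")  # placeholder client
bucket = "my-bucket"            # placeholder bucket

# (run inside an async function / event loop, same as the snippet above)
async with DAPClient() as session:
    query = SnapshotQuery(format=Format.JSONL, mode=None)
    result = await session.get_table_data("canvas", "accounts", query)
    resources = await session.get_resources(result.objects)
    for key, resource in resources.items():
        s3_key = "dap/" + key.replace("/", "_")  # placeholder key layout
        async for stream in session.stream_resource(resource):
            # buffer the gzip payload in memory, since upload_fileobj
            # needs a synchronous file-like object, not an async stream
            buffer = io.BytesIO()
            async for chunk in stream.iter_chunked(64 * 1024):
                buffer.write(chunk)
            buffer.seek(0)
            s3_client.upload_fileobj(buffer, bucket, s3_key)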