Meet AsyncStream

AsyncStream is brand new in Xcode 13 beta 3, and allows you to provide a set of values over time using the await syntax. When used correctly, this could replace some basic uses of combine.

If you’re not familiar with the basics of async in Swift, checkout the post on those first.

### How does it work?

To see how AsyncStream can improve your code, let’s take a look at an example of returning an array of content asynchronously, without AsyncStream.

class AsyncAlbumFetcher {
    private var albums = ["Lover" ,"Folklore", "Evermore", "Fearless (Taylor's Version)", "Red (Taylor's Version)"]
    
    func fetchAlbums() async -> [String] {
        var albumsToReturn: [String] = []
        while !albums.isEmpty {
            Thread.sleep(forTimeInterval: 1.0)
            albumsToReturn.append(albums.popLast() ?? "")
        }
        return albumsToReturn
    }
}
class AsyncAlbumFetcher {
    private var albums = ["Lover" ,"Folklore", "Evermore", "Fearless (Taylor's Version)", "Red (Taylor's Version)"]
    
    func fetchAlbums() async -> [String] {
        var albumsToReturn: [String] = []
        while !albums.isEmpty {
            Thread.sleep(forTimeInterval: 1.0)
            albumsToReturn.append(albums.popLast() ?? "")
        }
        return albumsToReturn
    }
}

Here we’re iterating over a local array and adding a fake one second delay, but you could imagine that these are coming from the web instead.

If you’re going to use this, the code would look something like this:

let basicFetcher = AsyncAlbumFetcher()

Task(priority: .userInitiated) {
    let albums = await basicFetcher.fetchAlbums()
    print("Got \(albums.count) albums!")
}

// After 5 seconds output:
"Got 5 albums!"
let basicFetcher = AsyncAlbumFetcher()

Task(priority: .userInitiated) {
    let albums = await basicFetcher.fetchAlbums()
    print("Got \(albums.count) albums!")
}

// After 5 seconds output:
"Got 5 albums!"

For a quick refresher, Task() { } is how you create an async task, and allows you call async code from non-async code, like a ViewController’s viewDidLoad function.

Whilst this is fine, it would be nice to be able to respond to these albums as they come in. We could do that without AsyncStream by creating a separate async task for each album, but that would require knowing the length of the array to begin with, which we wouldn’t if this was fetching from the web.

Let’s create a new fetcher that leans on AsyncStream instead.

class StreamingAlbumFetcher {
    private var albums = ["Lover" ,"Folklore", "Evermore", "Fearless (Taylor's Version)", "Red (Taylor's Version)"]
    
    func fetchAlbums() -> AsyncStream<String> {
        AsyncStream(String.self) { continuation in
            while !albums.isEmpty {
                Thread.sleep(forTimeInterval: 1.0)
                continuation.yield(albums.popLast() ?? "")
            }
        }
    }
}
class StreamingAlbumFetcher {
    private var albums = ["Lover" ,"Folklore", "Evermore", "Fearless (Taylor's Version)", "Red (Taylor's Version)"]
    
    func fetchAlbums() -> AsyncStream<String> {
        AsyncStream(String.self) { continuation in
            while !albums.isEmpty {
                Thread.sleep(forTimeInterval: 1.0)
                continuation.yield(albums.popLast() ?? "")
            }
        }
    }
}

There’s a couple things to look out for here, so let’s break it down. func fetchAlbums() -> AsyncStream<String> The function declaration doesn’t have the async keyword like you might expect, as we’re returning a special type instead.

AsyncStream(String.self) { continuation in

The initialiser for AsyncStream takes the type you’re going to return, and provides you with a continuation. The continuation works like the others you’ll have encountered in Swift concurrency, and allows you to yield a value with continuation.yield(thing) or finish with continuation.finish().

If you don’t add the call to finish, the containing task will keep running forever. We can see this in action if we run our code above.

let fetcher = StreamingAlbumFetcher()

print("Fetching albums")

Task(priority: .userInitiated) {
    var fetchedAlbums: [String] = []
    for await album in fetcher.fetchAlbums() {
        print(album)
        fetchedAlbums.append(album)
    }
    print("Got \(fetchedAlbums.count) albums!")
}
let fetcher = StreamingAlbumFetcher()

print("Fetching albums")

Task(priority: .userInitiated) {
    var fetchedAlbums: [String] = []
    for await album in fetcher.fetchAlbums() {
        print(album)
        fetchedAlbums.append(album)
    }
    print("Got \(fetchedAlbums.count) albums!")
}

There’s something new here, and that’s the for in syntax with an AsyncStream. This allows you to await the result of each item, and execute when you get it. This means the loop will pause between receiving each item.When you run this code, you’ll notice that there’s two problems. The first is that the code waits five seconds and returns the whole array in one go, which means we haven’t actually built a proper stream. The second, is that the final print statement is never called. Let’s fix these one at a time.To fix the print statement never being called, we need to end the stream. To do that, we simply call continuation.finish() at the end of our while loop.

AsyncStream(String.self) { continuation in
    while !albums.isEmpty {
        Thread.sleep(forTimeInterval: 1.0)
        continuation.yield(albums.popLast() ?? "")
    }
    continuation.finish()
}
AsyncStream(String.self) { continuation in
    while !albums.isEmpty {
        Thread.sleep(forTimeInterval: 1.0)
        continuation.yield(albums.popLast() ?? "")
    }
    continuation.finish()
}

To fix the stream not actually streaming, we’ll need to get a little more advanced with our concurrency code. The reason it isn’t streaming is that we’re awaiting within the same task that we’re returning the stream. This means that the code waits until the loop is finished before returning anything.

We’ll need to do something called “detaching” to fix this. This requires making a separate task inside our AsyncStream. To do this, we use the Task initialiser.

func fetchAlbums() -> AsyncStream<String> {
    AsyncStream(String.self) { continuation in
        Task(priority: .background) {
            while !albums.isEmpty {
                Thread.sleep(forTimeInterval: 1.0)
                continuation.yield(albums.popLast() ?? "")
            }
            continuation.finish()
        }
    }
}
func fetchAlbums() -> AsyncStream<String> {
    AsyncStream(String.self) { continuation in
        Task(priority: .background) {
            while !albums.isEmpty {
                Thread.sleep(forTimeInterval: 1.0)
                continuation.yield(albums.popLast() ?? "")
            }
            continuation.finish()
        }
    }
}

This now means that we return a stream from the main task, and then yield values from the detached one. This allows the stream to yield them as they come, without pausing the primary task.

If you run this code now, you’ll get each album iterated on its own, a second delay, and then the next. When it completes, you’ll get the final print statement outputting.

Fetching albums // 1s wait
Red (Taylor's Version) // 1s wait
Fearless (Taylor's Version) // 1s wait
Evermore // 1s wait
Folklore // 1s wait
Lover
Got 5 albums!

Thats the basics of AsyncStream! There’s more to learn here, such as handling errors thrown by the stream, but you should be able to build something simple using this.

Want to chat about Swift, or learn a little more? I’m @SwiftyAlex over on twitter.

The code is available in my samples repository.