ASRunLoopQueue.mm
ASDeallocQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- (void)releaseObjectInBackground:(id _Nullable __strong *)objectPtr
{
NSParameterAssert(objectPtr != NULL);
// Cast to CFType so we can manipulate retain count manually.
const auto cfPtr = (CFTypeRef *)(void *)objectPtr;
if (!cfPtr || !*cfPtr) {
return;
}
_lock.lock();
const auto isFirstEntry = _queue.empty();
// Push the pointer into our queue and clear their pointer.
// This "steals" the +1 from ARC and nils their pointer so they can't
// access or release the object.
_queue.push_back(*cfPtr);
*cfPtr = NULL;
_lock.unlock();
if (isFirstEntry) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.100 * NSEC_PER_SEC)), dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^{
[self drain];
});
}
}
1
2
3
4
5
6
7
8
9
10
- (void)drain
{
_lock.lock();
const auto q = std::move(_queue);
_lock.unlock();
for (CFTypeRef ref : q) {
// NOTE: Could check that retain count is 1 and retry later if not.
CFRelease(ref);
}
}
ASAbstractRunLoopQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
@implementation ASAbstractRunLoopQueue
- (instancetype)init
{
self = [super init];
if (self == nil) {
return nil;
}
ASDisplayNodeAssert(self.class != [ASAbstractRunLoopQueue class], @"Should never create instances of abstract class ASAbstractRunLoopQueue.");
return self;
}
@end
ASRunLoopQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@interface ASRunLoopQueue () {
CFRunLoopRef _runLoop;
CFRunLoopSourceRef _runLoopSource;
CFRunLoopObserverRef _runLoopObserver;
NSPointerArray *_internalQueue; // Use NSPointerArray so we can decide __strong or __weak per-instance.
AS::RecursiveMutex _internalQueueLock;
// In order to not pollute the top-level activities, each queue has 1 root activity.
os_activity_t _rootActivity;
#if ASRunLoopQueueLoggingEnabled
NSTimer *_runloopQueueLoggingTimer;
#endif
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
static void runLoopSourceCallback(void *info) {
// No-op
#if ASRunLoopQueueVerboseLoggingEnabled
NSLog(@"<%@> - Called runLoopSourceCallback", info);
#endif
}
- (instancetype)initWithRunLoop:(CFRunLoopRef)runloop retainObjects:(BOOL)retainsObjects handler:(void (^)(id _Nullable, BOOL))handlerBlock
{
if (self = [super init]) {
_runLoop = runloop;
NSPointerFunctionsOptions options = retainsObjects ? NSPointerFunctionsStrongMemory : NSPointerFunctionsWeakMemory;
_internalQueue = [[NSPointerArray alloc] initWithOptions:options];
_queueConsumer = handlerBlock;
_batchSize = 1;
_ensureExclusiveMembership = YES;
// We don't want to pollute the top-level app activities with run loop batches, so we create one top-level
// activity per queue, and each batch activity joins that one instead.
_rootActivity = as_activity_create("Process run loop queue items", OS_ACTIVITY_NONE, OS_ACTIVITY_FLAG_DEFAULT);
{
// Log a message identifying this queue into the queue's root activity.
as_activity_scope_verbose(_rootActivity);
as_log_verbose(ASDisplayLog(), "Created run loop queue: %@", self);
}
// Self is guaranteed to outlive the observer. Without the high cost of a weak pointer,
// __unsafe_unretained allows us to avoid flagging the memory cycle detector.
__unsafe_unretained __typeof__(self) weakSelf = self;
void (^handlerBlock) (CFRunLoopObserverRef observer, CFRunLoopActivity activity) = ^(CFRunLoopObserverRef observer, CFRunLoopActivity activity) {
[weakSelf processQueue];
};
_runLoopObserver = CFRunLoopObserverCreateWithHandler(NULL, kCFRunLoopBeforeWaiting, true, 0, handlerBlock);
CFRunLoopAddObserver(_runLoop, _runLoopObserver, kCFRunLoopCommonModes);
// It is not guaranteed that the runloop will turn if it has no scheduled work, and this causes processing of
// the queue to stop. Attaching a custom loop source to the run loop and signal it if new work needs to be done
CFRunLoopSourceContext sourceContext = {};
sourceContext.perform = runLoopSourceCallback;
#if ASRunLoopQueueLoggingEnabled
sourceContext.info = (__bridge void *)self;
#endif
_runLoopSource = CFRunLoopSourceCreate(NULL, 0, &sourceContext);
CFRunLoopAddSource(runloop, _runLoopSource, kCFRunLoopCommonModes);
#if ASRunLoopQueueLoggingEnabled
_runloopQueueLoggingTimer = [NSTimer timerWithTimeInterval:1.0 target:self selector:@selector(checkRunLoop) userInfo:nil repeats:YES];
[[NSRunLoop mainRunLoop] addTimer:_runloopQueueLoggingTimer forMode:NSRunLoopCommonModes];
#endif
}
return self;
}
- (void)dealloc
{
if (CFRunLoopContainsSource(_runLoop, _runLoopSource, kCFRunLoopCommonModes)) {
CFRunLoopRemoveSource(_runLoop, _runLoopSource, kCFRunLoopCommonModes);
}
CFRelease(_runLoopSource);
_runLoopSource = nil;
if (CFRunLoopObserverIsValid(_runLoopObserver)) {
CFRunLoopObserverInvalidate(_runLoopObserver);
}
CFRelease(_runLoopObserver);
_runLoopObserver = nil;
}
- (void)checkRunLoop
{
NSLog(@"<%@> - Jobs: %ld", self, _internalQueue.count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
- (void)processQueue
{
BOOL hasExecutionBlock = (_queueConsumer != nil);
// If we have an execution block, this vector will be populated, otherwise remains empty.
// This is to avoid needlessly retaining/releasing the objects if we don't have a block.
std::vector<id> itemsToProcess;
BOOL isQueueDrained = NO;
{
MutexLocker l(_internalQueueLock);
NSInteger internalQueueCount = _internalQueue.count;
// Early-exit if the queue is empty.
if (internalQueueCount == 0) {
return;
}
ASSignpostStart(ASSignpostRunLoopQueueBatch);
// Snatch the next batch of items.
NSInteger maxCountToProcess = MIN(internalQueueCount, self.batchSize);
/**
* For each item in the next batch, if it's non-nil then NULL it out
* and if we have an execution block then add it in.
* This could be written a bunch of different ways but
* this particular one nicely balances readability, safety, and efficiency.
*/
NSInteger foundItemCount = 0;
for (NSInteger i = 0; i < internalQueueCount && foundItemCount < maxCountToProcess; i++) {
/**
* It is safe to use unsafe_unretained here. If the queue is weak, the
* object will be added to the autorelease pool. If the queue is strong,
* it will retain the object until we transfer it (retain it) in itemsToProcess.
*/
__unsafe_unretained id ptr = (__bridge id)[_internalQueue pointerAtIndex:i];
if (ptr != nil) {
foundItemCount++;
if (hasExecutionBlock) {
itemsToProcess.push_back(ptr);
}
[_internalQueue replacePointerAtIndex:i withPointer:NULL];
}
}
if (foundItemCount == 0) {
// If _internalQueue holds weak references, and all of them just become NULL, then the array
// is never marked as needsCompletion, and compact will return early, not removing the NULL's.
// Inserting a NULL here ensures the compaction will take place.
// See http://www.openradar.me/15396578 and https://stackoverflow.com/a/40274426/1136669
[_internalQueue addPointer:NULL];
}
[_internalQueue compact];
if (_internalQueue.count == 0) {
isQueueDrained = YES;
}
}
// itemsToProcess will be empty if _queueConsumer == nil so no need to check again.
const auto count = itemsToProcess.size();
if (count > 0) {
as_activity_scope_verbose(as_activity_create("Process run loop queue batch", _rootActivity, OS_ACTIVITY_FLAG_DEFAULT));
const auto itemsEnd = itemsToProcess.cend();
for (auto iterator = itemsToProcess.begin(); iterator < itemsEnd; iterator++) {
__unsafe_unretained id value = *iterator;
_queueConsumer(value, isQueueDrained && iterator == itemsEnd - 1);
as_log_verbose(ASDisplayLog(), "processed %@", value);
}
if (count > 1) {
as_log_verbose(ASDisplayLog(), "processed %lu items", (unsigned long)count);
}
}
// If the queue is not fully drained yet force another run loop to process next batch of items
if (!isQueueDrained) {
CFRunLoopSourceSignal(_runLoopSource);
CFRunLoopWakeUp(_runLoop);
}
ASSignpostEnd(ASSignpostRunLoopQueueBatch);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (void)enqueue:(id)object
{
if (!object) {
return;
}
MutexLocker l(_internalQueueLock);
// Check if the object exists.
BOOL foundObject = NO;
if (_ensureExclusiveMembership) {
for (id currentObject in _internalQueue) {
if (currentObject == object) {
foundObject = YES;
break;
}
}
}
if (!foundObject) {
[_internalQueue addPointer:(__bridge void *)object];
if (_internalQueue.count == 1) {
CFRunLoopSourceSignal(_runLoopSource);
CFRunLoopWakeUp(_runLoop);
}
}
}
ASCATransactionQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@interface ASCATransactionQueue () {
CFRunLoopSourceRef _runLoopSource;
CFRunLoopObserverRef _preTransactionObserver;
// Current buffer for new entries, only accessed from within its mutex.
std::vector<id<ASCATransactionQueueObserving>> _internalQueue;
// No retain, no release, pointer hash, pointer equality.
// Enforce uniqueness in our queue. std::unordered_set does a heap allocation for each entry – not good.
CFMutableSetRef _internalQueueHashSet;
// Temporary buffer, only accessed from the main thread in -process.
std::vector<id<ASCATransactionQueueObserving>> _batchBuffer;
AS::Mutex _internalQueueLock;
// In order to not pollute the top-level activities, each queue has 1 root activity.
os_activity_t _rootActivity;
#if ASRunLoopQueueLoggingEnabled
NSTimer *_runloopQueueLoggingTimer;
#endif
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
- (instancetype)init
{
if (self = [super init]) {
_internalQueueHashSet = CFSetCreateMutable(NULL, 0, NULL);
// This is going to be a very busy queue – every node in the preload range will enter this queue.
// Save some time on first render by reserving space up front.
static constexpr int kInternalQueueInitialCapacity = 64;
_internalQueue.reserve(kInternalQueueInitialCapacity);
_batchBuffer.reserve(kInternalQueueInitialCapacity);
// We don't want to pollute the top-level app activities with run loop batches, so we create one top-level
// activity per queue, and each batch activity joins that one instead.
_rootActivity = as_activity_create("Process run loop queue items", OS_ACTIVITY_NONE, OS_ACTIVITY_FLAG_DEFAULT);
{
// Log a message identifying this queue into the queue's root activity.
as_activity_scope_verbose(_rootActivity);
as_log_verbose(ASDisplayLog(), "Created run loop queue: %@", self);
}
// Self is guaranteed to outlive the observer. Without the high cost of a weak pointer,
// __unsafe_unretained allows us to avoid flagging the memory cycle detector.
__unsafe_unretained __typeof__(self) weakSelf = self;
_preTransactionObserver = CFRunLoopObserverCreateWithHandler(NULL, kCFRunLoopBeforeWaiting, true, kASASCATransactionQueueOrder, ^(CFRunLoopObserverRef observer, CFRunLoopActivity activity) {
while (!weakSelf->_internalQueue.empty()) {
[weakSelf processQueue];
}
});
CFRunLoopAddObserver(CFRunLoopGetMain(), _preTransactionObserver, kCFRunLoopCommonModes);
// It is not guaranteed that the runloop will turn if it has no scheduled work, and this causes processing of
// the queue to stop. Attaching a custom loop source to the run loop and signal it if new work needs to be done
CFRunLoopSourceContext sourceContext = {};
sourceContext.perform = runLoopSourceCallback;
#if ASRunLoopQueueLoggingEnabled
sourceContext.info = (__bridge void *)self;
#endif
_runLoopSource = CFRunLoopSourceCreate(NULL, 0, &sourceContext);
CFRunLoopAddSource(CFRunLoopGetMain(), _runLoopSource, kCFRunLoopCommonModes);
#if ASRunLoopQueueLoggingEnabled
_runloopQueueLoggingTimer = [NSTimer timerWithTimeInterval:1.0 target:self selector:@selector(checkRunLoop) userInfo:nil repeats:YES];
[[NSRunLoop mainRunLoop] addTimer:_runloopQueueLoggingTimer forMode:NSRunLoopCommonModes];
#endif
}
return self;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (void)dealloc
{
ASDisplayNodeAssertMainThread();
CFRelease(_internalQueueHashSet);
CFRunLoopRemoveSource(CFRunLoopGetMain(), _runLoopSource, kCFRunLoopCommonModes);
CFRelease(_runLoopSource);
_runLoopSource = nil;
if (CFRunLoopObserverIsValid(_preTransactionObserver)) {
CFRunLoopObserverInvalidate(_preTransactionObserver);
}
CFRelease(_preTransactionObserver);
_preTransactionObserver = nil;
}
1
2
3
4
- (void)checkRunLoop
{
NSLog(@"<%@> - Jobs: %ld", self, _internalQueue.count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (void)processQueue
{
ASDisplayNodeAssertMainThread();
AS::UniqueLock l(_internalQueueLock);
NSInteger count = _internalQueue.size();
// Early-exit if the queue is empty.
if (count == 0) {
return;
}
as_activity_scope_verbose(as_activity_create("Process run loop queue batch", _rootActivity, OS_ACTIVITY_FLAG_DEFAULT));
ASSignpostStart(ASSignpostRunLoopQueueBatch);
// Swap buffers, clear our hash table.
_internalQueue.swap(_batchBuffer);
CFSetRemoveAllValues(_internalQueueHashSet);
// Unlock early. We are done with internal queue, and batch buffer is main-thread-only so no lock.
l.unlock();
for (const id<ASCATransactionQueueObserving> &value : _batchBuffer) {
[value prepareForCATransactionCommit];
as_log_verbose(ASDisplayLog(), "processed %@", value);
}
_batchBuffer.clear();
as_log_verbose(ASDisplayLog(), "processed %lu items", (unsigned long)count);
ASSignpostEnd(ASSignpostRunLoopQueueBatch);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- (void)enqueue:(id<ASCATransactionQueueObserving>)object
{
if (!object) {
return;
}
if (!self.enabled) {
[object prepareForCATransactionCommit];
return;
}
MutexLocker l(_internalQueueLock);
if (CFSetContainsValue(_internalQueueHashSet, (__bridge void *)object)) {
return;
}
CFSetAddValue(_internalQueueHashSet, (__bridge void *)object);
_internalQueue.emplace_back(object);
if (_internalQueue.size() == 1) {
CFRunLoopSourceSignal(_runLoopSource);
CFRunLoopWakeUp(CFRunLoopGetMain());
}
}