Re: Changing head item on a (GCD) Queue


Dave
 

I came up with the following (untested). Any ideas on how to replace the sleepForTimeInterval calls with a proper signal mechanism would be greatly appreciated.

All the Best
Dave


// The LTWQueue class ensures that the underlying array is protected by a lock/mutex. I’d love to be able to replace the sleepForTimeInterval calls with a proper signal mechanism.

@interface LTWConsumerTask ()

// Properties must be declared in an @interface (here, a class extension),
// not inside the @implementation block.
@property(nonatomic,strong) LTWQueue *pProcessingQueue;
@property(nonatomic,strong) id pLastObjectRemoved;
@property(nonatomic,assign) NSInteger pLastQueueCount;
@property(nonatomic,assign) BOOL pConsumerTaskSleepingFlag;

@end

@implementation LTWConsumerTask

-(instancetype) init
{
    self = [super init];
    if (self == nil)
        return nil;

    self.pProcessingQueue = [[LTWQueue alloc] init];
    self.pLastQueueCount = 0;
    self.pConsumerTaskSleepingFlag = NO;

    [NSThread detachNewThreadSelector:@selector(consumerTask) toTarget:self withObject:nil];

    return self;
}



// Override point - subclasses process each object removed from the Queue.

-(void) processObject:(id) theObject
{
}

// Background Consumer Task

-(void) consumerTask
{
    NSInteger myObjectCount;

    while (YES)
    {
        // Poll until the Queue is non-empty.
        self.pLastQueueCount = [self.pProcessingQueue queueGetCount];
        while (self.pLastQueueCount == 0)
        {
            self.pConsumerTaskSleepingFlag = YES;
            [NSThread sleepForTimeInterval:0.1]; //****** Ugly
            self.pConsumerTaskSleepingFlag = NO;
            self.pLastQueueCount = [self.pProcessingQueue queueGetCount];
        }

        // Drain the items counted above; stop early if the Queue empties underneath us.
        myObjectCount = self.pLastQueueCount;
        while (myObjectCount != 0)
        {
            self.pLastObjectRemoved = [self.pProcessingQueue queueGetHead];
            if (self.pLastObjectRemoved == nil)
                break;

            [self processObject:self.pLastObjectRemoved];

            myObjectCount--;
        }

        self.pConsumerTaskSleepingFlag = YES;
        [NSThread sleepForTimeInterval:0.1]; //****** Ugly
        self.pConsumerTaskSleepingFlag = NO;
    }
}

// Add an object to the Queue - Processed by the Consumer Task.

-(void) addObject:(id) theObject
{
    [self.pProcessingQueue queuePutTail:theObject];
}
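For what it’s worth, the polling above can be replaced with a condition variable. NSCondition is essentially a pthread mutex/condition pair, so here is a plain-C sketch of the same wait/signal pattern (the names `BlockingQueue` and `bq_*` are invented for illustration, and error checking is omitted):

```c
#include <pthread.h>
#include <stdlib.h>

/* A minimal blocking FIFO: the consumer sleeps inside pthread_cond_wait
 * instead of polling, and the producer wakes it with pthread_cond_signal. */
typedef struct BQNode { void *value; struct BQNode *next; } BQNode;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  nonEmpty;
    BQNode *head, *tail;
} BlockingQueue;

void bq_init(BlockingQueue *q) {
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->nonEmpty, NULL);
    q->head = q->tail = NULL;
}

void bq_put_tail(BlockingQueue *q, void *value) {
    BQNode *n = malloc(sizeof *n);
    n->value = value;
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    pthread_cond_signal(&q->nonEmpty);   /* wake a waiting consumer */
    pthread_mutex_unlock(&q->lock);
}

void *bq_get_head_and_wait(BlockingQueue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->head == NULL)              /* loop guards against spurious wakeups */
        pthread_cond_wait(&q->nonEmpty, &q->lock);
    BQNode *n = q->head;
    q->head = n->next;
    if (q->head == NULL) q->tail = NULL;
    pthread_mutex_unlock(&q->lock);
    void *value = n->value;
    free(n);
    return value;
}
```

In Cocoa terms, NSCondition’s -lock/-wait/-signal/-unlock map one-to-one onto the pthread calls here, so the same shape would drop into LTWConsumerTask with no sleepForTimeInterval and no sleeping flag.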

On 9 Mar 2018, at 16:13, Dave <dave@looktowindward.com> wrote:

Hi,

I’m looking at the NSThread level and I can’t see anything that allows a thread to sleep until an event occurs (other than sleepUntilDate
and sleepForTimeInterval) or signal a thread for that matter. Am I missing something obvious?

All the Best
Dave

On 9 Mar 2018, at 13:39, Sandor Szatmari <admin.szatmari.net@gmail.com> wrote:

Can you have a queue manager class observe the count of items in the array and wake itself up when that increments? Then it would stay awake until the queue is empty.

Something like...

NSMutableArray *queue = ...
QueueManager *mgr = [QueueManager newQueueManagerObserving:queue];

Write QueueManager’s init to observe the queue, and its dealloc to stop observing.

Then when you receive notifications it would wake its thread and process.

Sandor

On Mar 9, 2018, at 08:19, Dave <dave@looktowindward.com> wrote:

Thanks Jens and Quincey. I think the best solution is to do what Jens suggested, which is to have a custom thread type arrangement and share an Array as the Queue, protected by a Lock/Mutex. The only thing I’m not clear on is how to signal to the Consumer thread that an item has been added (e.g. wake the thread up). The way I saw it working is that I’d have one thread that is created at startup and sleeps until an entry has been added to the queue; it would then wake up and process all items on the queue before sleeping again.

A bit of background on what I am trying to do.

I have some C code that works on an embedded system using its own “OS/Kernel”, and I’m trying to do something similar in my App so they can use a Mac instead of the embedded system (which is very hard to maintain since it’s very old and not supported anymore).

This code works with two threads, in a producer/consumer model. The cool thing about this Kernel is that it supports what it calls Queue Semaphores. A Queue Semaphore automatically performs the Thread Switching and Queue Management in a Thread-Safe Manner.

The pseudo code for the task I am trying to reproduce on the Mac allows it to be coded really elegantly, in my opinion.

(int) taskConsumer()
{
    data* myData;

    while (TRUE)
    {
        myData = GetQueueHeadAndWait(queue);

        // Process Data
        Code to Process the Data

        // Free Data
        free(myData);
    }

    // Never Reached
    return 0;
}

(int) taskProducer((data*) theData)
{
    data* myData;
    bool newBlock;

    // Check “theData” against the Head Item of the Queue and either allocate a
    // new block and add it to the Tail, or overwrite the existing Data in place.
    newBlock = CheckData(queue, theData);
    if (newBlock == TRUE)
    {
        // Allocate Buffer
        myData = alloc(sizeof(data));

        // Code to Copy from theData into myData

        // Add Data to Queue; this causes the Consumer Task to “fire” if it is asleep
        PutQueueTail(queue, myData);
    }
    else
    {
        myData = GetQueueHeadNoWait(queue);
        if (myData == NULL)
        {
            // Fatal Error
        }

        // Code to Copy from theData into myData; no need to add it to the queue, because it’s already there
    }

    return 0;
}

The way this works is that:

The Queue Semaphore is created empty, then the two tasks are created. One of the two tasks will get control first; if it’s the taskConsumer, then the call to GetQueueHeadAndWait will cause the task to suspend (since the queue is empty). At some point taskProducer will run, which takes the data from the source and Adds it to the queue. If there are tasks waiting on the queue, then the oldest task is given the data and made ready, so that when it gets control, GetQueueHeadAndWait returns with the Data received. If there are no tasks waiting, then the data is just added to the queue (or overwritten).

There are various other functions in the Kernel that allow it to Lock the Queue, set Task Priorities and things like that. In this case, the two tasks have equal priority and the Queue Semaphore used is private to these two Tasks.
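For comparison, the core of the Queue Semaphore (GetQueueHeadAndWait / PutQueueTail) can be approximated in POSIX terms with a counting semaphore plus a mutex-protected list: the semaphore’s count mirrors the number of queued items, so sem_wait suspends the consumer exactly while the queue is empty. A sketch, with the struct and helper names invented here (note sem_init is deprecated on macOS, where dispatch_semaphore_t plays the same role):

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>

typedef struct QNode { void *data; struct QNode *next; } QNode;

/* A "Queue Semaphore": the semaphore count tracks the number of items,
 * so waiting for an item and managing the list stay in lockstep. */
typedef struct {
    pthread_mutex_t lock;
    sem_t items;
    QNode *head, *tail;
} QueueSem;

void qs_init(QueueSem *q) {
    pthread_mutex_init(&q->lock, NULL);
    sem_init(&q->items, 0, 0);   /* starts empty, like the Kernel's queue */
    q->head = q->tail = NULL;
}

void PutQueueTail(QueueSem *q, void *data) {
    QNode *n = malloc(sizeof *n);
    n->data = data;
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    pthread_mutex_unlock(&q->lock);
    sem_post(&q->items);   /* wakes a waiting consumer (POSIX does not promise
                              the FIFO wakeup order the Kernel guarantees) */
}

void *GetQueueHeadAndWait(QueueSem *q) {
    sem_wait(&q->items);   /* suspends while the queue is empty */
    pthread_mutex_lock(&q->lock);
    QNode *n = q->head;
    q->head = n->next;
    if (q->head == NULL) q->tail = NULL;
    pthread_mutex_unlock(&q->lock);
    void *data = n->data;
    free(n);
    return data;
}
```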

I think this solution is so elegant, and I’m trying to see if it is reproducible on the Mac/Cocoa, but I’m not having much luck so far. The beauty of it is that it is completely self-regulating and self-synchronizing. Also, this mechanism can be re-used for different scenarios with very little tweaking; for instance, if you use two Queue Semaphores you can do things like:

FreeBufferQueue: set to N Buffers when the Queue is created.
ProcessQueue: set Empty.

taskProducer
{
    // Get a Free Buffer, Suspend if there are none (e.g. stop producing).
    GetQueueHeadAndWait(FreeBufferQueue)

    // Fill Buffer

    // Add Buffer to Process Queue
    PutQueueTail(ProcessQueue)
}

taskConsumer
{
    // Get a Buffer to Process, Suspend if there are none (e.g. stop consuming).
    GetQueueHeadAndWait(ProcessQueue)

    // Process Buffer

    // Add Buffer to Free Queue (allow the Producer to start producing again, if it ever stopped)
    PutQueueTail(FreeBufferQueue)
}

This automatically throttles the Producer/Consumer to N buffers!
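This two-queue scheme is the classic bounded-buffer pattern; in POSIX terms the FreeBufferQueue and ProcessQueue collapse into two counting semaphores over a ring of N slots. A sketch, assuming a single producer and a single consumer (so each index is touched by only one thread and no mutex is needed; all names here are invented for illustration):

```c
#include <semaphore.h>

#define NBUF 4   /* the "N Buffers" that throttle the producer */

typedef struct {
    int   slots[NBUF];
    int   putIdx, getIdx;
    sem_t freeSlots;   /* plays the role of FreeBufferQueue: starts at N */
    sem_t filled;      /* plays the role of ProcessQueue:    starts empty */
} Throttle;

void throttle_init(Throttle *t) {
    t->putIdx = t->getIdx = 0;
    sem_init(&t->freeSlots, 0, NBUF);
    sem_init(&t->filled, 0, 0);
}

void produce(Throttle *t, int value) {
    sem_wait(&t->freeSlots);            /* suspend if all N buffers are in use */
    t->slots[t->putIdx] = value;
    t->putIdx = (t->putIdx + 1) % NBUF;
    sem_post(&t->filled);               /* hand the buffer to the consumer */
}

int consume(Throttle *t) {
    sem_wait(&t->filled);               /* suspend if nothing to process */
    int value = t->slots[t->getIdx];
    t->getIdx = (t->getIdx + 1) % NBUF;
    sem_post(&t->freeSlots);            /* let the producer run again */
    return value;
}
```

Because produce blocks once N values are in flight and consume blocks when none are, the pair self-regulates exactly as described above, with no explicit signalling code in either task body.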

Any comments or suggestions greatly appreciated.

All the Best
Dave

On 7 Mar 2018, at 21:17, Quincey Morris <quinceymorris@rivergatesoftware.com> wrote:

On Mar 7, 2018, at 03:16 , Dave <dave@looktowindward.com> wrote:

it stays on the queue until the NSOperation method gets called, which checks the “Cancel” property
You’re right, I misremembered. GCD has a DispatchWorkItem “cancel” method, which doesn’t have any proper documentation but I think dequeues the item (since GCD doesn’t have an actual cancellation protocol).

However, you do have to be careful. There are a lot of thread safety traps involved in removing things from the queue, and/or “overwriting” state in the top queue entry. The NSOperationQueue mechanism is at least safe in that it helps prevent you from doing unsafe things, at the cost of having to flush the cancelled entries from the queue. Since the cancelled items have to be dequeued anyway, I’m not sure the additional cost of letting them run to find out if they’re cancelled really matters (unless there are going to be thousands of them).

Anyway, if you use a solution that allows you to dequeue items, make sure you cover the case where the item has begun executing after you’ve decided to lock the queue but before you succeed in locking the queue.
