Changing head item on a (GCD) Queue


Dave
 

Hi All,

I need to make some changes to a Mac Objective-C project so that it does the following:

1. Data is sent to a delegate method in a class.

2. The data is validated by this method and, if OK, it needs to be processed in the background, e.g. added to a queue.

3. There is a possibility that incoming data makes items already queued for processing invalid; this happens when a burst of data is sent in a short time-frame. In this case, I want to examine the head of the queue (i.e. the next data block to be processed) and, if the new data makes it invalid, overwrite it.

As an example I have this on the queue:

Head: Data.type = NewLocation, Data.Location = 123,

The new Data is

Data.type = NewLocation, Data.Location = 124,

Instead of adding this to the queue, I want to overwrite the existing Head item with the new Data.Location (124).

I’m trying to figure out the best (modern) way to do this, which I assume is to use GCD? Any suggestions, pointers or sample code on how to do this would be greatly appreciated.

All the Best
Dave


Quincey Morris
 

On Mar 5, 2018, at 05:04 , Dave <dave@...> wrote:

I’m trying to figure out the best (modern) way to do this, which I assume is to use GCD?

Not necessarily. Both NSOperationQueue and GCD can cancel un-started operations, but NSOperationQueue has a mechanism for cancelling running operations too. Overall, you probably get more control over the queue with NSOperationQueue.


Dave
 

Hi Quincey,

I thought about using an NSOperationQueue, but from what I can tell, when an operation is cancelled it stays on the queue until the NSOperation method that checks the “Cancel” property gets called. This means that there will still be a lot of useless operations in the queue, unless I misunderstand the documentation?

As an example:

Head: Data 122 (Executing)
Data 123 (Cancelled)
Data 124 (Cancelled)
Data 130 (Cancelled)
Data 136

New Data received: Data 137

This will cause Data 136 to be Cancelled and Data 137 to be added to the end of the queue.

Is this how it would have to work?

All the Best
Dave



Dave
 

A little more on this.

I was thinking that the example in my last email could (maybe) be handled like this:

The NSOperation class defines a method called something like “setNewData:(NSInteger) theNewData”. This overwrites the data currently stored in the NSOperation instance. If handled in this manner, the queue would look like this.

Head: Data 122 (Executing)
Data 137 (Ready) - The Data would change to 123,124,130,136 and finally 137, while Data 122 is executing.

However, I’m not sure if this is possible or not using an NSOperationQueue. If it is, then surely it is much faster than holding all the intermediate Operations on the Queue?
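The "overwrite the pending data" idea can be tried out without NSOperationQueue at all. What follows is only a minimal sketch in plain C (which also compiles as Objective-C), assuming that only the newest value matters and that a single value is ever pending. The names `PendingSlot`, `slot_set_new_data` and `slot_take` are made up for illustration; they are not part of any Apple API.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical single-slot mailbox: holds at most one pending value. */
typedef struct {
    pthread_mutex_t lock;
    bool has_pending;   /* true while a value is waiting to be processed */
    long value;         /* the most recent value written */
} PendingSlot;

void slot_init(PendingSlot *s) {
    assert(s != NULL);
    pthread_mutex_init(&s->lock, NULL);
    s->has_pending = false;
    s->value = 0;
}

/* Producer side: overwrite whatever is pending (123, 124, ... 137). */
void slot_set_new_data(PendingSlot *s, long new_value) {
    pthread_mutex_lock(&s->lock);
    s->value = new_value;
    s->has_pending = true;
    pthread_mutex_unlock(&s->lock);
}

/* Consumer side: take the pending value, if any. Returns false when empty.
   After a successful take the value belongs to the caller, so later
   writes cannot change what is being processed. */
bool slot_take(PendingSlot *s, long *out) {
    pthread_mutex_lock(&s->lock);
    bool had = s->has_pending;
    if (had) {
        *out = s->value;
        s->has_pending = false;
    }
    pthread_mutex_unlock(&s->lock);
    return had;
}
```

With this shape, writing 123, 124, 130, 136 and 137 while Data 122 is executing leaves just 137 to be taken next; nothing intermediate is ever queued.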

Thanks for the input, All the Best
Dave



 

Honestly I would just implement this in a fairly cross-platform way, by creating my own queue in an NSMutableArray. I'd have a thread reading items out of the queue and processing them, and the method that adds/coalesces items into the queue could be called by any other thread. (Of course I'd need a lock/mutex to protect access to the queue.)
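A rough sketch of that idea in plain C with a pthread mutex (a fixed-size array stands in for the NSMutableArray). Everything here is an assumption made for illustration: the `Item` layout, the capacity, the rule that "same type as the head" means "overwrite the head", and the names `cq_put` and `cq_take`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

enum { QUEUE_CAPACITY = 64 };   /* assumed fixed capacity for this sketch */

typedef struct {
    int type;                   /* e.g. 1 = NewLocation (assumed encoding) */
    long location;
} Item;

typedef struct {
    pthread_mutex_t lock;
    Item items[QUEUE_CAPACITY]; /* ring buffer */
    size_t head;                /* index of the next item to process */
    size_t count;
} CoalescingQueue;

void cq_init(CoalescingQueue *q) {
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
}

/* Add an item, or overwrite the head item if it has the same type.
   The check and the overwrite happen under one lock, so the head cannot
   be removed in between. Returns false only if the queue is full. */
bool cq_put(CoalescingQueue *q, Item item) {
    bool stored = true;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0 && q->items[q->head].type == item.type) {
        q->items[q->head] = item;                 /* coalesce in place */
    } else if (q->count < QUEUE_CAPACITY) {
        q->items[(q->head + q->count) % QUEUE_CAPACITY] = item;
        q->count++;
    } else {
        stored = false;
    }
    pthread_mutex_unlock(&q->lock);
    return stored;
}

/* Remove the head item. Once this returns true the copy in *out belongs
   to the caller and can no longer be overwritten by cq_put. */
bool cq_take(CoalescingQueue *q, Item *out) {
    bool ok = false;
    assert(out != NULL);
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}
```

Because the consumer copies the item out under the lock before processing it, a late update either lands in the queue before the take or becomes a new entry afterwards; it never modifies an item that is already being processed.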

—Jens


Quincey Morris
 

On Mar 7, 2018, at 03:16 , Dave <dave@...> wrote:

it stays on the queue until the NSOperation method gets called which checks the “Cancel” property

You’re right, I misremembered. GCD has a DispatchWorkItem “cancel” method, which doesn’t have any proper documentation but I think dequeues the item (since GCD doesn’t have an actual cancellation protocol).

However, you do have to be careful. There are a lot of thread-safety traps involved in removing things from the queue, and/or “overwriting” state in the top queue entry. The NSOperationQueue mechanism is at least safe in that it helps prevent you from doing unsafe things, at the cost of having to flush the cancelled entries from the queue. Since the cancelled items have to be dequeued anyway, I’m not sure the additional cost of letting them run to find out if they’re cancelled really matters (unless there are going to be thousands of them).

Anyway, if you use a solution that allows you to dequeue items, make sure you cover the case where the item has begun executing after you’ve decided to lock the queue but before you succeed in locking the queue.


Dave
 

Thanks Jens and Quincey, I think the best solution is to do what Jens suggested, which is to have a custom thread arrangement, share an Array as the Queue and protect it with a Lock/Mutex. The only thing I’m not clear on is how to signal to the Consumer thread that an item has been added (i.e. wake the thread up). The way I saw it working is that I’d have one thread that is created at startup and sleeps until an entry has been added to the queue; it would then wake up and process all items on the queue before sleeping again.

A bit of background on what I am trying to do.

I have some C code that works on an embedded system using its own “OS/Kernel” and I’m trying to do something similar in my App so they can use a Mac instead of the embedded system (which is very hard to maintain since it’s very old and not supported anymore).

This code works with two threads, in a producer/consumer model. The cool thing about this Kernel is that it supports what it calls Queue Semaphores. A Queue Semaphore automatically performs the Thread Switching and Queue Management in a Thread-Safe Manner.

The Kernel allows the pseudo code for the task I’m trying to reproduce on the Mac to be written really elegantly, in my opinion.

int taskConsumer(void)
{
    data* myData;

    while (TRUE)
    {
        // Suspends this task until the queue has an item
        myData = GetQueueHeadAndWait(queue);

        // Process Data
        // ... code to process the data ...

        // Free Data
        free(myData);
    }

    // Never Reached
    return 0;
}

int taskProducer(data* theData)
{
    data* myData;
    bool newBlock;

    // Check "theData" against the head item of the queue and either allocate a new block and add it to the tail, or overwrite the existing data in place.
    newBlock = CheckData(queue, theData);
    if (newBlock == TRUE)
    {
        // Allocate Buffer
        myData = malloc(sizeof(data));

        // ... code to copy from theData into myData ...

        // Add data to the queue; this wakes the Consumer Task if it is asleep
        PutQueueTail(queue, myData);
    }
    else
    {
        myData = GetQueueHeadNoWait(queue);
        if (myData == NULL)
        {
            // Fatal Error
        }

        // ... code to copy from theData into myData; no need to add it to the queue, because it's already there ...
    }

    return 0;
}

The way this works is that:

The Queue Semaphore is created empty, then the two tasks are created. One of the two tasks will get control first. If it’s taskConsumer, the call to GetQueueHeadAndWait will cause the task to suspend (since the queue is empty). At some point taskProducer will run; it takes the data from the source and adds it to the queue. If there are tasks waiting on the queue, the oldest waiting task is given the data and made ready, so that when it gets control, GetQueueHeadAndWait returns with the data received. If there are no tasks waiting, the data is just added to the queue (or overwritten).
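For comparison, the two core calls can be approximated on the Mac with a mutex and a condition variable. This is a minimal sketch in plain C with pthreads, not the embedded kernel's implementation; `BlockingQueue`, `bq_put_tail` and `bq_get_head_and_wait` are hypothetical names chosen to mirror PutQueueTail and GetQueueHeadAndWait.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct Node {
    void *data;
    struct Node *next;
} Node;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;   /* signalled whenever an item is added */
    Node *head;
    Node *tail;
} BlockingQueue;

void bq_init(BlockingQueue *q) {
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    q->head = NULL;
    q->tail = NULL;
}

/* Rough counterpart of PutQueueTail: append, then wake one waiting consumer. */
void bq_put_tail(BlockingQueue *q, void *data) {
    Node *n = malloc(sizeof *n);
    assert(n != NULL);
    n->data = data;
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail != NULL)
        q->tail->next = n;
    else
        q->head = n;
    q->tail = n;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Rough counterpart of GetQueueHeadAndWait: sleep until an item exists. */
void *bq_get_head_and_wait(BlockingQueue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->head == NULL)     /* loop: condition waits can wake spuriously */
        pthread_cond_wait(&q->not_empty, &q->lock);
    Node *n = q->head;
    q->head = n->next;
    if (q->head == NULL)
        q->tail = NULL;
    pthread_mutex_unlock(&q->lock);
    void *data = n->data;
    free(n);
    return data;
}
```

One difference from the kernel described above: with several consumers waiting, pthreads does not promise that the oldest waiter is the one woken. With a single consumer thread, as in this design, that does not matter.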

There are various other functions in the Kernel that allow it to lock the Queue, set Task Priorities and things like that. In this case, the two tasks have equal priority and the Queue Semaphore used is private to these two Tasks.

I think this solution is so elegant and I’m trying to see if it is reproducible on the Mac/Cocoa, without much luck so far. The beauty of it is that it is completely self-regulating and self-synchronizing. Also, this mechanism can be re-used for different scenarios with very little tweaking. For instance, if you use two Queue Semaphores you can do things like:

FreeBufferQueue: set to N buffers when the queue is created.
ProcessQueue: set empty.

taskProducer
{
    // Get a free buffer; suspend if there are none (i.e. stop producing).
    buffer = GetQueueHeadAndWait(FreeBufferQueue);

    // Fill Buffer

    // Add buffer to the process queue
    PutQueueTail(ProcessQueue, buffer);
}

taskConsumer
{
    // Get a buffer to process; suspend if there are none (i.e. stop consuming).
    buffer = GetQueueHeadAndWait(ProcessQueue);

    // Process Buffer

    // Add buffer to the free queue (allows the Producer to start producing again, if it ever stopped)
    PutQueueTail(FreeBufferQueue, buffer);
}

This automatically throttles the Producer/Consumer to N buffers!
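The two-queue throttle can be sketched the same way with pthreads. The code below is an illustration under assumptions, not the kernel's API: buffers are identified by index, N is fixed at 4, "processing" just sums the values, and `IndexQueue`, `Pipeline` and `run_pipeline` are invented names.

```c
#include <assert.h>
#include <pthread.h>

enum { N_BUFFERS = 4 };             /* assumed pool size */

/* Blocking queue of buffer indices; it never holds more than N_BUFFERS. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
    int slots[N_BUFFERS];
    int head, count;
} IndexQueue;

typedef struct {
    IndexQueue free_queue, process_queue;
    long buffers[N_BUFFERS];
    int items;                      /* how many values the producer emits */
} Pipeline;

void iq_init(IndexQueue *q) {
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    q->head = 0;
    q->count = 0;
}

void iq_put_tail(IndexQueue *q, int index) {
    pthread_mutex_lock(&q->lock);
    assert(q->count < N_BUFFERS);
    q->slots[(q->head + q->count) % N_BUFFERS] = index;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

int iq_get_head_and_wait(IndexQueue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    int index = q->slots[q->head];
    q->head = (q->head + 1) % N_BUFFERS;
    q->count--;
    pthread_mutex_unlock(&q->lock);
    return index;
}

/* Producer thread: blocks here when all N buffers are in flight. */
void *producer(void *arg) {
    Pipeline *p = arg;
    for (int i = 1; i <= p->items; i++) {
        int b = iq_get_head_and_wait(&p->free_queue);
        p->buffers[b] = i;                          /* fill buffer */
        iq_put_tail(&p->process_queue, b);
    }
    return NULL;
}

/* Consumes on the calling thread; returns the sum of all values seen. */
long run_pipeline(int items) {
    Pipeline p;
    pthread_t thread;
    long sum = 0;
    iq_init(&p.free_queue);
    iq_init(&p.process_queue);
    for (int b = 0; b < N_BUFFERS; b++)
        iq_put_tail(&p.free_queue, b);              /* start with N free buffers */
    p.items = items;
    int rc = pthread_create(&thread, NULL, producer, &p);
    assert(rc == 0);
    for (int i = 0; i < items; i++) {
        int b = iq_get_head_and_wait(&p.process_queue);
        sum += p.buffers[b];                        /* process buffer */
        iq_put_tail(&p.free_queue, b);              /* hand it back */
    }
    pthread_join(thread, NULL);
    return sum;
}
```

The producer can never run more than N_BUFFERS items ahead of the consumer, because it has to win a buffer from the free queue before it can fill one.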

Any comments or suggestions greatly appreciated. 

All the Best
Dave


Sandor Szatmari
 

Can you have a queue manager class observe the count of items in the array and wake itself up when that increments?  Then it would stay awake until the queue is empty.

Something like...

NSMutableArray *queue = ...
QueueManager *mgr = [QueueManager newQueueManagerObserving:queue];

Write QueueManager’s init to observe the queue,
and its dealloc to stop observing.

Then when you receive notifications it would wake its thread and process.

Sandor


Dave
 

Hi,

I’m looking at the NSThread level and I can’t see anything that allows a thread to sleep until an event occurs (other than sleepUntilDate and sleepForTimeInterval), or to signal a thread for that matter. Am I missing something obvious?

All the Best
Dave


Jonathan Prescott
 

Look at NSCondition, which works with NSThread.  I’m assuming you are re-implementing GetQueueHeadAndWait and CheckData, along with the queue that they operate on.  The queue would consist of an NSMutableArray and an NSCondition variable, which requires a predicate variable/function (returns a boolean) and a lock.  The NSCondition variable would be the mechanism that the consumer thread waits on and the producer thread signals.

You could do the same with POSIX thread support routines, or C++ threading support in the std library, if you are interested in portability.
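As a rough illustration of the POSIX variant, here is a sketch of a consumer that sleeps on a condition variable, wakes when signalled, drains everything that is pending and then sleeps again. The predicate is "count > 0 or stop requested". The `Worker` type, its capacity, the stop flag and the summing "processing" step are all assumptions for the example.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

enum { MAX_PENDING = 128 };         /* assumed capacity for this sketch */

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t wake;            /* signalled on new work or on stop */
    long pending[MAX_PENDING];
    int count;
    bool stop;
    long processed_sum;             /* stand-in for real processing */
} Worker;

void worker_init(Worker *w) {
    pthread_mutex_init(&w->lock, NULL);
    pthread_cond_init(&w->wake, NULL);
    w->count = 0;
    w->stop = false;
    w->processed_sum = 0;
}

/* Producer side: queue one value and wake the consumer. False if full. */
bool worker_add(Worker *w, long value) {
    pthread_mutex_lock(&w->lock);
    bool ok = w->count < MAX_PENDING;
    if (ok) {
        w->pending[w->count++] = value;
        pthread_cond_signal(&w->wake);
    }
    pthread_mutex_unlock(&w->lock);
    return ok;
}

/* Ask the consumer to exit once the queue has been drained. */
void worker_stop(Worker *w) {
    pthread_mutex_lock(&w->lock);
    w->stop = true;
    pthread_cond_signal(&w->wake);
    pthread_mutex_unlock(&w->lock);
}

/* Consumer thread body: wait for work, drain it all, repeat. */
void *worker_run(void *arg) {
    Worker *w = arg;
    long batch[MAX_PENDING];
    assert(w != NULL);
    for (;;) {
        pthread_mutex_lock(&w->lock);
        while (w->count == 0 && !w->stop)       /* re-check the predicate */
            pthread_cond_wait(&w->wake, &w->lock);
        int n = w->count;
        for (int i = 0; i < n; i++)
            batch[i] = w->pending[i];
        w->count = 0;
        pthread_mutex_unlock(&w->lock);
        if (n == 0)                             /* woken only by stop */
            return NULL;
        for (int i = 0; i < n; i++)             /* process outside the lock */
            w->processed_sum += batch[i];
    }
}
```

In use, `worker_run` would be handed to `pthread_create` (or called from an NSThread entry point) with a `Worker *` as its argument. The same wait-in-a-loop shape applies to NSCondition: lock, loop on the predicate calling `wait`, take the work, unlock.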

Jonathan



Dave
 

I came up with the following (untested). Any ideas on how to replace the sleepForTimeInterval calls with a proper signal mechanism would be greatly appreciated.

All the Best
Dave


// The LTWQueue class ensures that the underlying array is protected by a lock/mutex. I’d love to be able to replace the sleepForTimeInterval calls with a proper signal mechanism.

// Properties have to be declared in an @interface (here a class extension), not inside @implementation.
// This assumes LTWConsumerTask is declared in a header elsewhere.
@interface LTWConsumerTask ()

@property(nonatomic,strong) LTWQueue* pProcessingQueue;
@property(nonatomic,strong) id pLastObjectRemoved;
@property(nonatomic,assign) NSInteger pLastQueueCount;
@property(nonatomic,assign) BOOL pConsumerTaskSleepingFlag;

@end

@implementation LTWConsumerTask

-(instancetype) init
{
    self = [super init];
    if (self == nil)
        return nil;

    self.pProcessingQueue = [[LTWQueue alloc] init];
    self.pLastQueueCount = 0;
    self.pConsumerTaskSleepingFlag = NO;

    // Start the background consumer thread.
    [NSThread detachNewThreadSelector:@selector(consumerTask) toTarget:self withObject:nil];

    return self;
}

-(void) processObject:(id) theObject
{
}

// Background Consumer Task

-(void) consumerTask
{
    NSInteger myObjectCount;

    do
    {
        // Poll until the queue has something in it.
        do
        {
            self.pLastQueueCount = [self.pProcessingQueue queueGetCount];
            if (self.pLastQueueCount == 0)
            {
                self.pConsumerTaskSleepingFlag = YES;
                [NSThread sleepForTimeInterval:0.1];    //****** Ugly
                self.pConsumerTaskSleepingFlag = NO;
            }
        } while (self.pLastQueueCount == 0);

        // Process the items that were queued when we woke up.
        myObjectCount = self.pLastQueueCount;
        while (myObjectCount != 0)
        {
            self.pLastObjectRemoved = [self.pProcessingQueue queueGetHead];
            if (self.pLastObjectRemoved == nil)
                break;

            [self processObject:self.pLastObjectRemoved];

            myObjectCount--;
        }

        self.pConsumerTaskSleepingFlag = YES;
        [NSThread sleepForTimeInterval:0.1];            //****** Ugly
        self.pConsumerTaskSleepingFlag = NO;
    } while (YES);
}

// Add an object to the Queue - Processed by the Consumer Task.

-(void) addObject:(id) theObject
{
    [self.pProcessingQueue queuePutTail:theObject];
}

@end

On 9 Mar 2018, at 16:13, Dave <dave@...> wrote:

Hi,

I’m looking at the NSThread level and I can’t see anything that allows a thread to sleep until an event occurs (other than sleepUntilDate
and sleepForTimeInterval) or signal a thread for that matter. Am I missing something obvious?

All the Best
Dave

On 9 Mar 2018, at 13:39, Sandor Szatmari <admin.szatmari.net@...> wrote:

Can you have a queue manager class observe the count of items in the array and wake itself up when that increments? Then it would stay awake until the queue is empty.

Something like...

NSMutableArray *queue = ...
QueueManager *mgr = [QueueManager newQueueManagerObserving:queue];

Write QueueManager’s init to observe the queue,
and its dealloc to stop observing.

Then when you receive notifications it would wake its thread and process.

Sandor

On Mar 9, 2018, at 08:19, Dave <dave@...> wrote:

Thanks Jens and Quincey, I think the best solution is to do what Jens suggested, which is to have a custom thread-type arrangement, share an Array as the Queue and protect it with a Lock/Mutex. The only thing I’m not clear on is how to signal to the Consumer thread that an item has been added (e.g. wake the thread up). The way I saw it working is that I’d have one thread that is created at startup and sleeps until an entry has been added to the queue; it would then wake up and process all items on the queue before sleeping again.

A bit of background on what I am trying to do.

I have some C code that works on an embedded system using its own “OS/Kernel” and I’m trying to do something similar in my App so they can use a Mac instead of the embedded system (which is very hard to maintain since it’s very old and not supported anymore).

This code works with two threads, in a producer/consumer model. The cool thing about this Kernel is that it supports what it calls Queue Semaphores. A Queue Semaphore automatically performs the Thread Switching and Queue Management in a Thread-Safe Manner.

The pseudo code for the task I’m trying to reproduce on the Mac allows it to be coded really elegantly, in my opinion.

int taskConsumer(void)
{
    data* myData;

    while (TRUE)
    {
        myData = GetQueueHeadAndWait(queue);

        // Process Data
        // ... code to process the data ...

        // Free Data
        free(myData);
    }

    // Never Reached

    return 0;
}

int taskProducer(data* theData)
{
    data* myData;
    bool newBlock;

    // Check “theData” against the Head Item of the Queue and either allocate a new block and add it to the Tail or overwrite the existing Data in place.

    newBlock = CheckData(queue,theData);
    if (newBlock == TRUE)
    {
        // Allocate Buffer
        myData = alloc(sizeof(data));

        // Code to Copy from theData into myData

        // Add Data to Queue, this causes the Consumer Task to “fire” (become ready) if it is asleep
        PutQueueTail(queue,myData);
    }
    else
    {
        myData = GetQueueHeadNoWait(queue);
        if (myData == NULL)
        {
            // Fatal Error
        }

        // Code to Copy from theData into myData, no need to add it to the queue, because it’s already there
    }

    return 0;
}

The way this works is that:

The Queue Semaphore is created empty, then the two tasks are created. One of the two tasks will get control first. If it’s the taskConsumer, then the call to GetQueueHeadAndWait will cause the task to suspend (since the queue is empty). At some point taskProducer will run, which takes the data from the source and adds it to the queue. If there are tasks waiting on the queue, then the oldest waiting task is given the data and is made ready, so that when it gets control, GetQueueHeadAndWait returns with the data received. If there are no tasks waiting, then the data is just added to the queue (or overwritten).

There are various other functions in the Kernel that allow it to lock the Queue, set Task Priorities and things like that. In this case, the two tasks have equal priority and the Queue Semaphore used is private to these two Tasks.

I think this solution is so elegant and I’m trying to see if it is reproducible on the Mac/Cocoa, without much luck so far. The beauty of it is that it is completely self-regulating and self-synchronizing. Also, this mechanism can be re-used for different scenarios with very little tweaking. For instance, if you use two Queue Semaphores you can do things like:

FreeBufferQueue; Set to N Buffers when Queue is created.
ProcessQueue; Set Empty.

taskProducer
{
    // Get a Free Buffer, Suspend if there are none (e.g. stop producing).
    GetQueueHeadAndWait(FreeBufferQueue)

    // Fill Buffer

    // Add Buffer to Process Queue
    PutQueueTail(ProcessQueue)
}

taskConsumer
{
    // Get a Buffer to Process, Suspend if there are none (e.g. stop consuming).
    GetQueueHeadAndWait(ProcessQueue)

    // Process Buffer

    // Add Buffer to Free Queue (allow Producer to start producing again, if it ever stopped)
    PutQueueTail(FreeBufferQueue)
}

This automatically throttles the Producer/Consumer to N buffers!
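
For comparison, the same N-buffer throttling can be sketched with POSIX threads. This is not the embedded kernel's API: a fixed-capacity ring guarded by one mutex and two condition variables stands in for the FreeBufferQueue/ProcessQueue pair, and every name here (BoundedQueue, bq_put, bq_get, bq_demo_sum, BQ_CAPACITY) is hypothetical and chosen only for illustration.

```c
#include <assert.h>
#include <pthread.h>

#define BQ_CAPACITY 4   /* "N buffers": an arbitrary value for this sketch */

typedef struct {
    int             slots[BQ_CAPACITY];
    int             head;      /* index of the oldest queued item */
    int             count;     /* number of items currently queued */
    pthread_mutex_t lock;
    pthread_cond_t  not_empty; /* signalled after every put */
    pthread_cond_t  not_full;  /* signalled after every get */
} BoundedQueue;

void bq_init(BoundedQueue *q)
{
    q->head = 0;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Producer side: suspends while all N slots are in use. */
void bq_put(BoundedQueue *q, int value)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == BQ_CAPACITY)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->slots[(q->head + q->count) % BQ_CAPACITY] = value;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Consumer side: suspends while the queue is empty. */
int bq_get(BoundedQueue *q)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    int value = q->slots[q->head];
    q->head = (q->head + 1) % BQ_CAPACITY;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return value;
}

typedef struct { BoundedQueue *q; int n; } ProducerArgs;

/* Thread entry point: produces the values 1..n. */
void *bq_producer_main(void *arg)
{
    ProducerArgs *a = arg;
    for (int i = 1; i <= a->n; i++)
        bq_put(a->q, i);
    return NULL;
}

/* Runs one producer thread against the calling thread as consumer and
   returns the sum of the n values consumed. */
long bq_demo_sum(int n)
{
    BoundedQueue q;
    ProducerArgs args = { &q, n };
    pthread_t producer;
    long sum = 0;

    bq_init(&q);
    int rc = pthread_create(&producer, NULL, bq_producer_main, &args);
    assert(rc == 0);
    (void)rc;
    for (int i = 0; i < n; i++)
        sum += bq_get(&q);
    pthread_join(producer, NULL);
    pthread_cond_destroy(&q.not_full);
    pthread_cond_destroy(&q.not_empty);
    pthread_mutex_destroy(&q.lock);
    return sum;
}
```

The producer can never get more than BQ_CAPACITY items ahead of the consumer, which is the same back-pressure the two Queue Semaphores provide. Storing the values in the ring rather than passing buffer pointers between two queues is a simplification made for brevity.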

Any comments or suggestions greatly appreciated.

All the Best
Dave

On 7 Mar 2018, at 21:17, Quincey Morris <quinceymorris@...> wrote:

On Mar 7, 2018, at 03:16 , Dave <dave@...> wrote:

it stays on the queue until the NSOperation method gets called which check the “Cancel” property

You’re right, I misremembered. GCD has a DispatchWorkItem “cancel” method, which doesn’t have any proper documentation but I think dequeues the item (since GCD doesn’t have an actual cancellation protocol).

However, you do have to be careful. There are a lot of thread safety traps involved in removing things from the queue, and/or “overwriting” state in the top queue entry. The NSOperationQueue mechanism is at least safe in that it helps prevent you from doing unsafe things, at the cost of having to flush the cancelled entries from the queue. Since the cancelled items have to be dequeued anyway, I’m not sure the additional cost of letting them run to find out if they’re cancelled really matters (unless there are going to be thousands of them).

Anyway, if you use a solution that allows you to dequeue items, make sure you cover the case where the item has begun executing after you’ve decided to lock the queue but before you succeed in locking the queue.
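
One way to cover that case, sketched here in C with POSIX threads, is to make "overwrite the head or append" and "remove the head" both run under the same mutex, and to have the consumer copy the item out before processing it. The types and names (Item, CoalescingQueue, cq_put, cq_take) are hypothetical, and the rule that a new item supersedes a pending head of the same type is an assumption based on the NewLocation example from the original question.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define CQ_CAPACITY 16  /* arbitrary fixed size for this sketch */

typedef enum { kNewLocation, kOther } ItemType;   /* hypothetical types */

typedef struct {
    ItemType type;
    int      location;
} Item;

typedef struct {
    Item            items[CQ_CAPACITY];
    int             head;   /* index of the next item to be processed */
    int             count;  /* number of items still queued */
    pthread_mutex_t lock;
} CoalescingQueue;

void cq_init(CoalescingQueue *q)
{
    q->head = 0;
    q->count = 0;
    int rc = pthread_mutex_init(&q->lock, NULL);
    assert(rc == 0);
    (void)rc;
}

/* Producer: overwrite the pending head if the new item supersedes it
   (assumed here to mean "same type"), otherwise append to the tail.
   Returns false only if the queue is full. */
bool cq_put(CoalescingQueue *q, Item item)
{
    bool ok = true;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0 && q->items[q->head].type == item.type) {
        /* Still queued, so the consumer has not started on it yet. */
        q->items[q->head] = item;
    } else if (q->count < CQ_CAPACITY) {
        q->items[(q->head + q->count) % CQ_CAPACITY] = item;
        q->count++;
    } else {
        ok = false;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Consumer: copy the head out under the lock. Once this returns, the
   item is no longer reachable through the queue, so a later cq_put
   cannot modify it while it is being processed.
   Returns false if the queue is empty. */
bool cq_take(CoalescingQueue *q, Item *out)
{
    bool ok = false;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % CQ_CAPACITY;
        q->count--;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}
```

With this arrangement, an item is either still in the queue (and may be overwritten) or already owned by the consumer (and is left alone); new data that arrives after the head has been taken is simply queued as a fresh entry.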


Dave
 

Hi Jonathan,

I missed NSCondition, thanks a lot. One thing I’m not sure about is the “boolean predicate”: from looking at the documentation, I can’t see that it is needed in this case, unless I’m missing something obvious.

I’ve changed it to use NSCondition:

-(instancetype) init
{
    self = [super init];
    if (self == nil)
        return nil;

    self.pProcessingQueue = [[LTWQueue alloc] init];
    self.pConsumerTaskWakeUpCondition = [[NSCondition alloc] init];
    self.pLastQueueCount = 0;

    [NSThread detachNewThreadSelector:@selector(consumerTask) toTarget:self withObject:nil];

    return self;
}

-(void) consumerTask
{
    NSInteger myObjectCount;

    [self.pConsumerTaskWakeUpCondition lock];
    [self.pConsumerTaskWakeUpCondition wait];

    self.pLastQueueCount = [self.pProcessingQueue queueGetCount];
    myObjectCount = self.pLastQueueCount;
    while (myObjectCount != 0)
    {
        self.pLastObjectRemoved = [self.pProcessingQueue queueGetHead];
        if (self.pLastObjectRemoved == nil)
            break;

        [self processObject:self.pLastObjectRemoved];

        myObjectCount--;
    }

    [self.pConsumerTaskWakeUpCondition unlock];
}

-(void) addObject:(id) theObject
{
    [self.pConsumerTaskWakeUpCondition lock];
    [self.pProcessingQueue queuePutTail:theObject];

    [self.pConsumerTaskWakeUpCondition signal];
    [self.pConsumerTaskWakeUpCondition unlock];
}

In this case, the underlying array is protected by its own mutex, so I don’t need the extra predicate? Although I’m worried that this might cause a deadlock…

All the Best
Dave

On 9 Mar 2018, at 16:50, Jonathan Prescott <jprescott12@...> wrote:

Look at NSCondition, which works with NSThread. I’m assuming you are re-implementing GetQueueHeadAndWait and CheckData, along with the queue that they operate on. The queue would consist of an NSMutableArray and an NSCondition variable, which requires a predicate variable/function (returning a boolean) and a lock. The NSCondition variable would be the mechanism that the consumer thread would wait on, and the producer thread would signal.

You could do the same with POSIX thread support routines, or C++ threading support in the std library, if you’re interested in portability.

Jonathan
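
As a rough illustration of why the predicate matters, here is a minimal POSIX-threads sketch (the Mailbox type and the mailbox_* functions are hypothetical names, and a plain counter stands in for the queue). A condition variable does not remember signals: if the producer signals while no thread is waiting, that signal is lost. Checking the predicate in a loop before and after each wait means the consumer never sleeps while there is work pending, and also copes with spurious wake-ups.

```c
#include <assert.h>
#include <pthread.h>

typedef struct {
    int             pending;  /* the predicate: items waiting to be consumed */
    pthread_mutex_t lock;
    pthread_cond_t  wake;
} Mailbox;

void mailbox_init(Mailbox *m)
{
    m->pending = 0;
    pthread_mutex_init(&m->lock, NULL);
    pthread_cond_init(&m->wake, NULL);
}

/* Producer: change the predicate under the lock, then signal. */
void mailbox_post(Mailbox *m)
{
    pthread_mutex_lock(&m->lock);
    m->pending++;
    pthread_cond_signal(&m->wake);
    pthread_mutex_unlock(&m->lock);
}

/* Consumer: wait only while the predicate says there is nothing to do.
   Returns the number of items still pending after taking one. */
int mailbox_take(Mailbox *m)
{
    pthread_mutex_lock(&m->lock);
    while (m->pending == 0)
        pthread_cond_wait(&m->wake, &m->lock);
    assert(m->pending > 0);   /* guaranteed by the loop above */
    m->pending--;
    int remaining = m->pending;
    pthread_mutex_unlock(&m->lock);
    return remaining;
}
```

If mailbox_post runs twice before mailbox_take is ever called, both takes still return immediately, because the counter records the work even though nobody was waiting when the signals were sent. A version that called pthread_cond_wait unconditionally would block in that situation.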


Dave
 

Hi,

I get it now: in this case the predicate is actually the (protected) Array. I’ve changed it yet again so that it just processes one item at a time. I’ll test it tomorrow, then add higher-level code to make use of it, and then add support for processing the queue and removing useless items.

Cheers
Dave

-(instancetype) init
{
    self = [super init];
    if (self == nil)
        return nil;

    self.pProcessingQueue = [[LTWQueue alloc] init];
    self.pConsumerTaskWakeUpCondition = [[NSCondition alloc] init];
    self.pLastQueueCount = 0;

    [NSThread detachNewThreadSelector:@selector(consumerTask) toTarget:self withObject:nil];

    return self;
}

-(void) processObject:(id) theObject
{
}

-(void) consumerTask
{
    while (YES)
    {
        [self.pConsumerTaskWakeUpCondition lock];

        // The queue is the predicate: only wait while it is empty, and
        // re-check after every wake-up. Waiting unconditionally would miss
        // items added (and signalled) while this thread was not waiting.
        while ([self.pProcessingQueue queueGetCount] == 0)
            [self.pConsumerTaskWakeUpCondition wait];

        self.pLastObjectRemoved = [self.pProcessingQueue queueGetHead];

        [self.pConsumerTaskWakeUpCondition unlock];

        // Process outside the lock so that addObject: is not blocked meanwhile.
        if (self.pLastObjectRemoved != nil)
            [self processObject:self.pLastObjectRemoved];
    }
}

-(void) addObject:(id) theObject
{
    [self.pConsumerTaskWakeUpCondition lock];
    [self.pProcessingQueue queuePutTail:theObject];

    [self.pConsumerTaskWakeUpCondition signal];
    [self.pConsumerTaskWakeUpCondition unlock];
}